215 lines
6.7 KiB
Python
215 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Utility functions for the FHIR to PAD converter.
|
|
"""
|
|
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional
|
|
from pathlib import Path
|
|
import xml.etree.ElementTree as ET
|
|
|
|
def parse_iso_date(s: str) -> Optional[datetime]:
    """
    Parse an ISO 8601 date or datetime string.

    A "Z" (UTC designator) is rewritten to "+00:00" before parsing because
    ``datetime.fromisoformat`` only understands a literal "Z" on newer
    Python versions. If that attempt fails, the unmodified text is tried
    as a fallback.

    Args:
        s: The text to parse. ``None``, empty strings and non-string values
            are tolerated and yield ``None``.

    Returns:
        The parsed ``datetime`` (timezone-aware only if the input carried
        an offset or "Z"), or ``None`` when the value cannot be parsed.
    """
    # Guard against non-string values as well: they have no ``.replace``
    # and would otherwise escape as an AttributeError.
    if not s or not isinstance(s, str):
        return None

    # Try the "Z"-normalised form first, then the raw text.
    for candidate in (s.replace("Z", "+00:00"), s):
        try:
            return datetime.fromisoformat(candidate)
        except (ValueError, TypeError):
            continue
    return None
|
|
|
|
def format_iso_date(d: datetime) -> str:
    """
    Render the calendar-date part of a datetime as ISO text.

    Args:
        d: The datetime whose date component should be formatted. Any time
            or timezone information is dropped.

    Returns:
        The date in ``YYYY-MM-DD`` form.
    """
    calendar_day = d.date()
    return calendar_day.isoformat()
|
|
|
|
def get_ref_id(ref: Optional[str]) -> Optional[str]:
    """
    Extract the trailing segment of a slash-separated reference.

    For example ``"Patient/123"`` yields ``"123"``.

    Args:
        ref: The reference string, or ``None``.

    Returns:
        The text after the last "/", or ``None`` when the reference is
        missing, contains no "/", or ends with "/".
    """
    if ref and "/" in ref:
        # Only the part after the final separator is of interest.
        tail = ref.rsplit("/", 1)[1]
        return tail if tail else None
    return None
|
|
|
|
def ensure_text(el: Optional[ET.Element], default: str = "") -> str:
    """
    Return the whitespace-stripped text of an XML element.

    Args:
        el: The element to read, or ``None``.
        default: Value returned when ``el`` is ``None``. It is NOT used
            when the element exists but has no text.

    Returns:
        ``default`` if there is no element; otherwise the element's text
        with surrounding whitespace removed ("" when the text is absent).
    """
    if el is not None:
        raw = el.text
        return raw.strip() if raw else ""
    return default
|
|
|
|
def collect_effective_dates(resource: Dict[str, Any]) -> List[datetime]:
    """
    Gather every parseable date found in a resource's known date fields.

    The top-level keys ``effectiveDateTime``, ``issued``, ``authoredOn``
    and ``date`` are inspected in that order, followed by
    ``meta.lastUpdated``. Values that are not strings, or that
    ``parse_iso_date`` cannot parse, are skipped.

    Args:
        resource: The resource as a dictionary.

    Returns:
        The parsed datetimes in the inspection order described above;
        empty if nothing usable was found.
    """
    candidates = [
        resource.get(field)
        for field in ("effectiveDateTime", "issued", "authoredOn", "date")
    ]

    # ``meta`` is only consulted when it is actually a dictionary.
    meta_block = resource.get("meta", {})
    if isinstance(meta_block, dict):
        candidates.append(meta_block.get("lastUpdated"))

    parsed = (
        parse_iso_date(raw) for raw in candidates if isinstance(raw, str)
    )
    return [moment for moment in parsed if moment]
|
|
|
|
|
|
# ----------------------------
|
|
# Input Validation
|
|
# ----------------------------
|
|
|
|
def validate_file_path(path: str, must_exist: bool = True, check_readable: bool = True) -> str:
    """
    Validate a file path and return it in absolute, resolved form.

    The path is resolved with ``Path.resolve()``. Paths containing ``..``
    components are NOT rejected: they are only reported through a logged
    warning. Callers that must confine paths to a base directory have to
    enforce that themselves.

    Args:
        path: The file path to validate. ``os.PathLike`` objects such as
            ``pathlib.Path`` are accepted and converted to ``str``.
        must_exist: If True, raise error if the path doesn't exist
        check_readable: If True, check that the path is readable (for input
            files). The check is only performed when ``must_exist`` is
            also True.

    Returns:
        Absolute path to the file

    Raises:
        ValueError: If path is not a string or path-like object, is empty,
            or cannot be resolved
        FileNotFoundError: If must_exist=True and the path doesn't exist
        PermissionError: If must_exist=True, check_readable=True and the
            path isn't readable

    Example:
        >>> validate_file_path("input.json")  # doctest: +SKIP
        '/absolute/path/to/input.json'
    """
    # Imported locally so the function does not depend on a module-level
    # logging import.
    import logging

    # Accept pathlib.Path and friends, like the sibling validators do.
    if isinstance(path, os.PathLike):
        path = os.fspath(path)
    if not isinstance(path, str):
        raise ValueError(f"File path must be a string, got {type(path).__name__}")

    if not path:
        raise ValueError("File path cannot be empty")

    path_obj = Path(path)

    try:
        abs_path = path_obj.resolve()
    except (OSError, RuntimeError, ValueError) as e:
        # Chain the cause so the underlying OS error stays visible.
        raise ValueError(f"Invalid file path '{path}': {e}") from e

    # Path traversal is only reported, not blocked (see docstring). A named
    # logger with lazy arguments avoids implicitly configuring the root
    # logger as the module-level ``logging.warning`` helper can.
    if ".." in path_obj.parts:
        logging.getLogger(__name__).warning(
            "Path contains '..' components: %s -> %s", path, abs_path
        )

    if must_exist and not abs_path.exists():
        raise FileNotFoundError(f"File not found: {abs_path}")

    if check_readable and must_exist and not os.access(abs_path, os.R_OK):
        raise PermissionError(f"File is not readable: {abs_path}")

    return str(abs_path)
|
|
|
|
|
|
def validate_output_path(path: str, overwrite: bool = True) -> str:
    """
    Validate output file path and ensure parent directory exists.

    Side effect: if the parent directory of the resolved path does not
    exist, it is created (including intermediate directories).

    Args:
        path: The output file path. ``os.PathLike`` objects such as
            ``pathlib.Path`` are accepted.
        overwrite: If False, raise error if the path already exists

    Returns:
        Absolute path to the output file

    Raises:
        ValueError: If path is empty, not a string or path-like object, or
            cannot be resolved
        FileExistsError: If overwrite=False and the path exists
        PermissionError: If the parent directory cannot be created or is
            not writable

    Example:
        >>> validate_output_path("output.xml")  # doctest: +SKIP
        '/absolute/path/to/output.xml'
    """
    if not path:
        raise ValueError("Output path cannot be empty")

    # Reject unsupported types with the documented ValueError instead of
    # letting ``Path()`` raise an undocumented TypeError.
    if isinstance(path, os.PathLike):
        path = os.fspath(path)
    if not isinstance(path, str):
        raise ValueError(f"Output path must be a string, got {type(path).__name__}")

    try:
        abs_path = Path(path).resolve()
    except (OSError, RuntimeError, ValueError) as e:
        raise ValueError(f"Invalid output path '{path}': {e}") from e

    if not overwrite and abs_path.exists():
        raise FileExistsError(f"Output file already exists: {abs_path}")

    # Make sure there is a directory to write into.
    parent_dir = abs_path.parent
    if not parent_dir.exists():
        try:
            parent_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            # Any creation failure is surfaced as PermissionError, which is
            # the exception type this function documents.
            raise PermissionError(f"Cannot create directory '{parent_dir}': {e}") from e

    if not os.access(parent_dir, os.W_OK):
        raise PermissionError(f"Directory is not writable: {parent_dir}")

    return str(abs_path)
|
|
|
|
|
|
def validate_directory_path(path: str, must_exist: bool = True, create: bool = False) -> str:
    """
    Validate directory path.

    Args:
        path: The directory path to validate. ``os.PathLike`` objects such
            as ``pathlib.Path`` are accepted.
        must_exist: If True, raise error if directory doesn't exist
        create: If True, create directory (including intermediate
            directories) if it doesn't exist. Creation happens before the
            existence check, so ``create=True`` satisfies ``must_exist``.

    Returns:
        Absolute path to the directory

    Raises:
        ValueError: If path is empty, not a string or path-like object, or
            cannot be resolved
        PermissionError: If create=True and the directory cannot be created
        FileNotFoundError: If must_exist=True and directory doesn't exist
        NotADirectoryError: If path exists but is not a directory

    Example:
        >>> validate_directory_path("samples/fhir")  # doctest: +SKIP
        '/absolute/path/to/samples/fhir'
    """
    if not path:
        raise ValueError("Directory path cannot be empty")

    # Reject unsupported types with the documented ValueError instead of
    # letting ``Path()`` raise an undocumented TypeError.
    if isinstance(path, os.PathLike):
        path = os.fspath(path)
    if not isinstance(path, str):
        raise ValueError(f"Directory path must be a string, got {type(path).__name__}")

    try:
        abs_path = Path(path).resolve()
    except (OSError, RuntimeError, ValueError) as e:
        raise ValueError(f"Invalid directory path '{path}': {e}") from e

    if create and not abs_path.exists():
        try:
            abs_path.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            # Any creation failure is surfaced as PermissionError.
            raise PermissionError(f"Cannot create directory '{abs_path}': {e}") from e

    if must_exist and not abs_path.exists():
        raise FileNotFoundError(f"Directory not found: {abs_path}")

    # An existing non-directory (e.g. a regular file) is always rejected,
    # regardless of ``must_exist``.
    if abs_path.exists() and not abs_path.is_dir():
        raise NotADirectoryError(f"Path is not a directory: {abs_path}")

    return str(abs_path)
|