#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ FHIR JSON Bundle -> PAD XML converter with validation and detailed stats. - Validates FHIR JSON (schema if provided, otherwise lightweight checks) - Converts to PAD XML based on example structure - Validates PAD XML (XSD if provided, otherwise well-formedness check) - Produces a JSON report with diagnostics before and after conversion Usage: python fhir_to_pad_converter.py --input-json input.json --output-xml output.xml \ [--report-json report.json] [--fhir-json-schema fhir.schema.json] [--pad-xsd pad.xsd] """ import json import logging from collections import Counter, defaultdict from typing import Any, Dict, List, Optional, Tuple from pathlib import Path import typer from translator import CodeTranslator from utils import ( parse_iso_date, format_iso_date, get_ref_id, ensure_text, collect_effective_dates, validate_file_path, validate_output_path, validate_directory_path ) from datetime import datetime import random # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('fhir_to_pad_converter') # Optional deps with better error handling try: import jsonschema # type: ignore HAS_JSONSCHEMA = True logger.debug("jsonschema module loaded successfully") except ImportError as e: HAS_JSONSCHEMA = False logger.warning("jsonschema not available - FHIR JSON Schema validation will be skipped") logger.warning("To enable JSON Schema validation, install with: pip install jsonschema") except Exception as e: HAS_JSONSCHEMA = False logger.error(f"Unexpected error loading jsonschema module: {e}") try: from lxml import etree # type: ignore HAS_LXML = True logger.debug("lxml module loaded successfully") except ImportError as e: HAS_LXML = False logger.warning("lxml not available - XSD validation will be skipped") logger.warning("To enable XSD validation, install with: pip install lxml") except Exception as e: HAS_LXML = False logger.error(f"Unexpected error loading lxml module: {e}") import xml.etree.ElementTree as ET # Config validation (optional, with graceful degradation) try: from config_schemas import ( validate_header_config, validate_placeholder_config, validate_mapping_config ) HAS_CONFIG_VALIDATION = True logger.debug("Config validation schemas loaded successfully") except ImportError: HAS_CONFIG_VALIDATION = False logger.debug("Config validation not available (config_schemas.py not found)") except Exception as e: HAS_CONFIG_VALIDATION = False logger.error(f"Error loading config validation: {e}") PAD_NS = "http://padinfo.de/ns/pad" # Create Typer CLI application app = typer.Typer( name="fhir2pad", help="FHIR JSON to PADneXt XML converter with validation & statistics", add_completion=True, rich_markup_mode="rich" ) # ---------------------------- # FHIR validation & stats # ---------------------------- def validate_fhir_json(bundle: Dict[str, Any], fhir_json_schema_path: Optional[str] = None) -> Tuple[bool, List[str]]: messages: List[str] = [] if fhir_json_schema_path and HAS_JSONSCHEMA: try: with open(fhir_json_schema_path, "r", encoding="utf-8") as f: schema = json.load(f) jsonschema.validate(bundle, schema) return True, [f"FHIR JSON validated against schema: {fhir_json_schema_path}"] except Exception as e: return False, [f"FHIR JSON schema validation failed: {e}"] # Lightweight checks if bundle.get("resourceType") != "Bundle": messages.append("Expected resourceType='Bundle'.") if bundle.get("type") not in {"searchset", "collection", "document", "message"}: messages.append("Bundle.type should be one of: searchset|collection|document|message.") if not isinstance(bundle.get("entry", []), list): messages.append("Bundle.entry must be an array.") ok = not messages if ok: messages.append("FHIR JSON passed lightweight structural checks (no JSON Schema provided/available).") else: messages.insert(0, "FHIR JSON failed lightweight structural checks.") return ok, messages def compute_fhir_stats(bundle: Dict[str, Any]) -> Dict[str, Any]: entries = bundle.get("entry", []) or [] n_entries = len(entries) resource_counts = Counter() missing_subject = 0 with_dates = 0 all_dates: List[datetime] = [] eob_stats = { "count": 0, "total_submitted": 0.0, "outcomes": Counter(), } for e in entries: res = e.get("resource", {}) if isinstance(e, dict) else {} rtype = res.get("resourceType") or "Unknown" resource_counts[rtype] += 1 if rtype == "ExplanationOfBenefit": eob_stats["count"] += 1 eob_stats["outcomes"][res.get("outcome")] += 1 total = res.get("total", []) for t in total: if t.get("category", {}).get("coding", [{}])[0].get("code") == "submitted": eob_stats["total_submitted"] += t.get("amount", {}).get("value", 0.0) subject = res.get("subject") or res.get("patient") if not (isinstance(subject, dict) and isinstance(subject.get("reference"), str)): missing_subject += 1 dates = collect_effective_dates(res) if dates: with_dates += 1 all_dates.extend(dates) date_min = min(all_dates).isoformat() if all_dates else None date_max = max(all_dates).isoformat() if all_dates else None warnings: List[str] = [] if missing_subject > 0: warnings.append(f"{missing_subject} / {n_entries} resources missing subject/patient reference.") if n_entries > 0 and with_dates == 0: warnings.append("No effective/issued/authoredOn/date fields found in any resource.") return { "bundle_type": bundle.get("type"), "total_entries": n_entries, "resource_type_counts": dict(resource_counts), "eob_stats": eob_stats, "entries_missing_subject": missing_subject, "entries_with_any_date": with_dates, "date_range": {"min": date_min, "max": date_max}, "warnings": warnings, } # ---------------------------- # Grouping & mapping # ---------------------------- def group_entries(bundle: Dict[str, Any]) -> Dict[Tuple[Optional[str], Optional[str]], List[Dict[str, Any]]]: groups: Dict[Tuple[Optional[str], Optional[str]], List[Dict[str, Any]]] = defaultdict(list) # First, check for Claim resources to determine grouping strategy has_claims = any( e.get("resource", {}).get("resourceType") == "Claim" for e in bundle.get("entry", []) if e is not None # Filter out None entries ) if has_claims: # Group by (patient_id, claim_id) for e in bundle.get("entry", []): if e is None: # Skip None entries continue res = e.get("resource", {}) if not isinstance(res, dict): continue patient_id = None subject = res.get("subject") or res.get("patient") if isinstance(subject, dict): patient_id = get_ref_id(subject.get("reference")) claim_id = None if res.get("resourceType") == "Claim": claim_id = res.get("id") elif "claim" in res: # For ExplanationOfBenefit claim_ref = res.get("claim", {}).get("reference") if claim_ref: claim_id = get_ref_id(claim_ref) if patient_id and claim_id: groups[(patient_id, claim_id)].append(res) elif patient_id: # For resources not directly linked to a claim but to a patient # This part needs careful handling. For now, let's add to all claim groups for that patient for key in list(groups.keys()): if key[0] == patient_id: groups[key].append(res) else: # Fallback to encounter-based grouping for e in bundle.get("entry", []): if e is None: # Skip None entries continue res = e.get("resource", {}) if not isinstance(res, dict): continue patient_id = None subject = res.get("subject") or res.get("patient") if isinstance(subject, dict): patient_id = get_ref_id(subject.get("reference")) encounter_id = None enc = res.get("encounter") or res.get("context") if isinstance(enc, dict): encounter_id = get_ref_id(enc.get("reference")) groups[(patient_id, encounter_id)].append(res) return groups def get_value_from_path(resource: Dict[str, Any], path: str) -> Optional[Any]: """Gets a value from a nested dict using a dot-separated path.""" keys = path.split('.') value = resource for key in keys: if isinstance(value, dict): value = value.get(key) elif isinstance(value, list): try: idx = int(key) if 0 <= idx < len(value): value = value[idx] else: return None except (ValueError, IndexError): return None else: return None return value def map_resource_to_position(res: Dict[str, Any], mapping_config: Dict[str, Any], translator: Optional[CodeTranslator] = None) -> Optional[Dict[str, Any]]: """Maps a FHIR resource to a PAD position using a configurable mapping.""" rtype = res.get("resourceType") if not rtype or rtype not in mapping_config.get("resources", {}): return None mapping = mapping_config["resources"][rtype] position = { "id": res.get("id", ""), "faktor": "", "umsatzsteuer": "", "minderungssatz": "", "aisbewertung": {"punktwert": "", "punktzahl": "", "einzelbetrag": ""}, } for field, rules in mapping.get("fields", {}).items(): value = None if "source" in rules: value = get_value_from_path(res, rules["source"]) if "translate" in rules and translator: translate_rules = rules["translate"] source_system_field = translate_rules.get("source_system_field") source_code_field = translate_rules.get("source_code_field") if source_system_field and source_code_field: coding_object = get_value_from_path(res, rules["source"]) if isinstance(coding_object, dict): system = coding_object.get(source_system_field) code = coding_object.get(source_code_field) if system and code: translated_code = translator.translate(system, code) if translated_code: value = translated_code if value is None and "default" in rules: value = rules["default"] if value is None and rules.get("required"): value = rules.get("placeholder", "") position[field] = value if value is not None else "" # Fallback for text if not position.get("text"): position["text"] = rtype # Handle date separately for now if 'datum' in position and position['datum']: dt = parse_iso_date(position['datum']) position['datum'] = format_iso_date(dt) if dt else "" else: # Fallback to collect_effective_dates if no specific date is mapped dates = collect_effective_dates(res) position['datum'] = format_iso_date(sorted(dates)[0]) if dates else "" return position def claim_item_to_position(item: Dict[str, Any]) -> Dict[str, Any]: """Converts a FHIR Claim.item to a PAD position dictionary.""" serviced_date = item.get("servicedDate", "") product_or_service = item.get("productOrService", {}) code = "" disp = "" if isinstance(product_or_service, dict): codings = product_or_service.get("coding", []) if codings: code = codings[0].get("code", "") disp = codings[0].get("display", "") return { "id": item.get("sequence", ""), "go": "EBM", # Default to EBM (Einheitlicher Bewertungsmaßstab) for general medical services "ziffer": code, "datum": serviced_date, "anzahl": "1", "text": disp or "", "faktor": "", "umsatzsteuer": "", "minderungssatz": "", "aisbewertung": {"punktwert": "", "punktzahl": "", "einzelbetrag": ""}, } def find_resource_by_ref(bundle: Dict[str, Any], ref: str) -> Optional[Dict[str, Any]]: """Finds a resource in the bundle by its reference.""" ref_parts = ref.split('/') if len(ref_parts) != 2: return None res_type, res_id = ref_parts for entry in bundle.get("entry", []): resource = entry.get("resource") if isinstance(resource, dict) and resource.get("resourceType") == res_type and resource.get("id") == res_id: return resource return None def claim_to_rechnung_header(claim: Dict[str, Any], bundle: Dict[str, Any]) -> Dict[str, Any]: """Extracts header info from a Claim resource and referenced resources.""" header = {} # Provider information provider_ref = claim.get("provider", {}).get("reference") if provider_ref: provider_res = find_resource_by_ref(bundle, provider_ref) if provider_res: header["leistungserbringer_name"] = provider_res.get("name", "") # Insurer/Recipient information insurer_ref = claim.get("insurer", {}).get("reference") if insurer_ref: insurer_res = find_resource_by_ref(bundle, insurer_ref) if insurer_res: header["empfaenger_name"] = insurer_res.get("name", "") # Patient information patient_ref = claim.get("patient", {}).get("reference") if patient_ref: patient_res = find_resource_by_ref(bundle, patient_ref) if patient_res: name = patient_res.get("name", [{}])[0] header["behandelter_vorname"] = " ".join(name.get("given", [])) header["behandelter_name"] = name.get("family", "") header["behandelter_gebdatum"] = patient_res.get("birthDate", "") # Diagnosis diagnosis = claim.get("diagnosis", []) if diagnosis: diag1 = diagnosis[0].get("diagnosisCodeableConcept", {}).get("coding", [{}])[0] header["diagnose_text"] = diag1.get("display", "") header["diagnose_datum"] = claim.get("created", "").split("T")[0] return header # ---------------------------- # XML helpers & builder # ---------------------------- def E(tag: str, text: Optional[str] = None, attrib: Optional[Dict[str, str]] = None) -> ET.Element: if attrib is None: attrib = {} el = ET.Element(f"{{{PAD_NS}}}{tag}", attrib) if text is not None: el.text = text return el def Sub(parent: ET.Element, tag: str, text: Optional[str] = None, attrib: Optional[Dict[str, str]] = None) -> ET.Element: if attrib is None: attrib = {} el = ET.SubElement(parent, f"{{{PAD_NS}}}{tag}", attrib) if text is not None: el.text = text return el def build_person(parent: ET.Element, tag: str, *, anrede: str = "", vorname: str = "", name: str = "", gebdatum: str = "", geschlecht: str = "", anschrift: Optional[Dict[str, str]] = None, extra_attrib: Optional[Dict[str, str]] = None) -> ET.Element: if extra_attrib is None: extra_attrib = {} person_el = Sub(parent, tag, attrib=extra_attrib) if anrede: Sub(person_el, "anrede", anrede) if vorname: Sub(person_el, "vorname", vorname) if name: Sub(person_el, "name", name) if gebdatum: Sub(person_el, "gebdatum", gebdatum) # Only add geschlecht if provided (not allowed for all person types) if geschlecht: Sub(person_el, "geschlecht", geschlecht) # Only add anschrift if provided if anschrift and any(anschrift.get(k) for k in ["strasse", "plz", "ort"]): adr = Sub(person_el, "anschrift") hausadresse_el = Sub(adr, "hausadresse") # XSD requires order: land, zusatz, plz, ort, strasse, hausnr if anschrift.get("plz"): Sub(hausadresse_el, "plz", anschrift["plz"]) if anschrift.get("ort"): Sub(hausadresse_el, "ort", anschrift["ort"]) if anschrift.get("strasse"): Sub(hausadresse_el, "strasse", anschrift["strasse"]) return person_el # ---------------------------- # Placeholder helpers # ---------------------------- def get_with_placeholder(value: str, placeholder: str, field_path: str, auto_filled: List[str]) -> str: """Get value with placeholder fallback. Track if placeholder was used.""" if value and value.strip(): return value if placeholder is not None: auto_filled.append(f"{field_path} = '{placeholder}'") return placeholder # Should not happen if placeholders are configured correctly return "" def sub_optional(parent: ET.Element, tag: str, value: Any, attrib: Optional[Dict[str, str]] = None) -> Optional[ET.Element]: """Create subelement only if value is not None/empty. For optional fields.""" if value is None or (isinstance(value, str) and not value.strip()): return None return Sub(parent, tag, str(value), attrib=attrib) def sub_required(parent: ET.Element, tag: str, value: str, placeholder: str, field_path: str, auto_filled: List[str], attrib: Optional[Dict[str, str]] = None) -> ET.Element: """Create subelement with placeholder fallback for required fields.""" actual_value = get_with_placeholder(value, placeholder, field_path, auto_filled) return Sub(parent, tag, actual_value, attrib=attrib) def validate_ziffer(ziffer: str, placeholder: str, field_path: str, auto_filled: List[str]) -> str: """Validate and fix ziffer billing code. Args: ziffer: Original billing code from FHIR placeholder: Placeholder code to use if invalid field_path: Path for tracking (e.g., "position[1].ziffer") auto_filled: List to track auto-filled fields Returns: Valid ziffer code (max 8 chars, min 1 char) """ # Empty or None - use placeholder if not ziffer or not str(ziffer).strip(): auto_filled.append(f"{field_path} = '{placeholder}' (empty code)") return placeholder ziffer_str = str(ziffer).strip() # Too long - truncate to 8 chars and track if len(ziffer_str) > 8: truncated = ziffer_str[:8] auto_filled.append(f"{field_path} = '{truncated}' (original: '{ziffer_str}' - {len(ziffer_str)} chars, truncated to 8)") return truncated # Valid - return as is return ziffer_str def build_person_with_placeholders(parent: ET.Element, tag: str, anrede: str, vorname: str, name: str, anschrift: Dict[str, str], gebdatum: str, geschlecht: str, ph_person: Dict[str, Any], field_prefix: str, auto_filled: List[str], include_geschlecht: bool = True) -> ET.Element: """Build person element with placeholder fallback for required fields. Args: include_geschlecht: Whether to include geschlecht field (not allowed for empfaenger, required for behandelter) """ # Apply placeholders for required fields actual_anrede = get_with_placeholder(anrede, ph_person.get("anrede", "Ohne Anrede"), f"{field_prefix}.anrede", auto_filled) actual_vorname = get_with_placeholder(vorname, ph_person.get("vorname", "UNKNOWN"), f"{field_prefix}.vorname", auto_filled) actual_name = get_with_placeholder(name, ph_person.get("name", "UNKNOWN"), f"{field_prefix}.name", auto_filled) actual_gebdatum = get_with_placeholder(gebdatum, ph_person.get("gebdatum", "1900-01-01"), f"{field_prefix}.gebdatum", auto_filled) # geschlecht is only included for certain person types (behandelter, not empfaenger) if include_geschlecht: actual_geschlecht = get_with_placeholder(geschlecht, ph_person.get("geschlecht", "u"), f"{field_prefix}.geschlecht", auto_filled) else: actual_geschlecht = "" # Will be ignored by build_person # Address placeholders - only if anschrift is expected (not for behandelter) actual_anschrift = None if anschrift is not None and anschrift: # Only process if anschrift dict is provided and not empty actual_anschrift = { "plz": get_with_placeholder(anschrift.get("plz", ""), ph_person.get("plz", "00000"), f"{field_prefix}.anschrift.plz", auto_filled), "ort": get_with_placeholder(anschrift.get("ort", ""), ph_person.get("ort", "UNKNOWN"), f"{field_prefix}.anschrift.ort", auto_filled), "strasse": get_with_placeholder(anschrift.get("strasse", ""), ph_person.get("strasse", "UNKNOWN"), f"{field_prefix}.anschrift.strasse", auto_filled), } return build_person(parent, tag, anrede=actual_anrede, vorname=actual_vorname, name=actual_name, anschrift=actual_anschrift, gebdatum=actual_gebdatum, geschlecht=actual_geschlecht if include_geschlecht else "") def build_pad_xml(bundle: Dict[str, Any], header_cfg: Optional[Dict[str, Any]] = None, placeholder_cfg: Optional[Dict[str, Any]] = None, mapping_config: Optional[Dict[str, Any]] = None, translator: Optional[CodeTranslator] = None) -> Tuple[ET.Element, List[str], Dict[str, Any], List[str]]: """FULL implementation (no stubs) - returns a valid XML root element, a list of validation warnings, the header info, and auto-filled fields.""" if header_cfg is None: header_cfg = {} if placeholder_cfg is None: placeholder_cfg = {} if mapping_config is None: mapping_config = {} all_validation_warnings = [] auto_filled: List[str] = [] final_header = header_cfg.copy() rechnungen = E("rechnungen", attrib={"anzahl": "0"}) rechnungen.set("xmlns", PAD_NS) Sub(rechnungen, "nachrichtentyp", "ADL", attrib={"version": header_cfg.get("nachrichtentyp_version", "1.0")}) # Rechnungsersteller with placeholders ph_re = placeholder_cfg.get("rechnungsersteller", {}) rechnungsersteller = Sub(rechnungen, "rechnungsersteller") sub_required(rechnungsersteller, "name", header_cfg.get("rechnungsersteller_name", ""), ph_re.get("name", "UNKNOWN"), "rechnungsersteller.name", auto_filled) Sub(rechnungsersteller, "kundennr", header_cfg.get("rechnungsersteller_kundennr", "")) anschrift_el = Sub(rechnungsersteller, "anschrift") hausadresse_el = Sub(anschrift_el, "hausadresse") # XSD requires order: land, zusatz, plz, ort, strasse, hausnr sub_required(hausadresse_el, "plz", header_cfg.get("rechnungsersteller_plz", ""), ph_re.get("plz", "00000"), "rechnungsersteller.anschrift.hausadresse.plz", auto_filled) sub_required(hausadresse_el, "ort", header_cfg.get("rechnungsersteller_ort", ""), ph_re.get("ort", "UNKNOWN"), "rechnungsersteller.anschrift.hausadresse.ort", auto_filled) sub_required(hausadresse_el, "strasse", header_cfg.get("rechnungsersteller_strasse", ""), ph_re.get("strasse", "UNKNOWN"), "rechnungsersteller.anschrift.hausadresse.strasse", auto_filled) # iknr is optional - only create if value exists iknr_val = header_cfg.get("rechnungsersteller_iknr", "") if iknr_val and iknr_val.strip(): Sub(rechnungsersteller, "iknr", iknr_val) # Leistungserbringer with placeholders ph_le = placeholder_cfg.get("leistungserbringer", {}) leistungserbringer = Sub(rechnungen, "leistungserbringer", attrib={"id": header_cfg.get("leistungserbringer_id", "")}) # titel is optional sub_optional(leistungserbringer, "titel", header_cfg.get("leistungserbringer_titel", "")) # vorname and name are required sub_required(leistungserbringer, "vorname", header_cfg.get("leistungserbringer_vorname", ""), ph_le.get("vorname", "UNKNOWN"), "leistungserbringer.vorname", auto_filled) sub_required(leistungserbringer, "name", header_cfg.get("leistungserbringer_name", ""), ph_le.get("name", "UNKNOWN"), "leistungserbringer.name", auto_filled) groups = group_entries(bundle) rechnung_count = 0 for (patient_id, group_id), entries in groups.items(): if not patient_id: continue # Run validation checks for the group group_warnings = run_validation(entries) if group_warnings: all_validation_warnings.extend(group_warnings) claim_resource = next((e for e in entries if e.get("resourceType") == "Claim"), None) current_header = header_cfg.copy() if claim_resource: claim_header_info = claim_to_rechnung_header(claim_resource, bundle) current_header.update(claim_header_info) final_header = current_header rechnung_count += 1 # Build rechnung attributes - skip optional empty ones ph_rech = placeholder_cfg.get("rechnung", {}) rechnung_attrib = {"id": f"R{rechnung_count:05d}", "aisrechnungsnr": str(random.randint(100000000, 999999999))} # Optional attributes - only add if they have values eabgabe_val = current_header.get("eabgabe", "") if eabgabe_val and str(eabgabe_val).strip(): rechnung_attrib["eabgabe"] = str(eabgabe_val) aisaktenzeichen_val = current_header.get("aisaktenzeichen", "") if aisaktenzeichen_val and str(aisaktenzeichen_val).strip(): rechnung_attrib["aisaktenzeichen"] = str(aisaktenzeichen_val) aisendbetrag_val = current_header.get("aisendbetrag", "") if aisendbetrag_val and str(aisendbetrag_val).strip(): rechnung_attrib["aisendbetrag"] = str(aisendbetrag_val) rechnung = Sub(rechnungen, "rechnung", attrib=rechnung_attrib) # Optional recipient block ph_emp = placeholder_cfg.get("empfaenger", {}) empfaenger = Sub(rechnung, "rechnungsempfaenger") build_person_with_placeholders( empfaenger, "person", anrede=current_header.get("empfaenger_anrede", ""), vorname=current_header.get("empfaenger_vorname", ""), name=current_header.get("empfaenger_name", ""), anschrift={ "strasse": current_header.get("empfaenger_strasse", ""), "plz": current_header.get("empfaenger_plz", ""), "ort": current_header.get("empfaenger_ort", ""), }, gebdatum=current_header.get("empfaenger_gebdatum", ""), geschlecht=current_header.get("empfaenger_geschlecht", ""), ph_person=ph_emp, field_prefix="empfaenger", auto_filled=auto_filled, include_geschlecht=False # empfaenger does not include geschlecht per XSD ) fall = Sub(rechnung, "abrechnungsfall") # Per XSD, abrechnungsfall must contain one of: bema, bmgnormprivat, humanmedizin, etc. # We use humanmedizin for general medical billing humanmedizin = Sub(fall, "humanmedizin") # behandelter - per XSD: NO anschrift field! ph_behandelter = placeholder_cfg.get("behandelter", {}) build_person_with_placeholders(humanmedizin, "behandelter", anrede=current_header.get("behandelter_anrede", ""), vorname=current_header.get("behandelter_vorname", ""), name=current_header.get("behandelter_name", ""), anschrift=None, # behandelter does NOT have anschrift per XSD gebdatum=current_header.get("behandelter_gebdatum", ""), geschlecht=current_header.get("behandelter_geschlecht", ""), ph_person=ph_behandelter, field_prefix="behandelter", auto_filled=auto_filled, include_geschlecht=True) # behandelter DOES include geschlecht # versicherter - has similar structure to behandelter but with anschrift (optional) ph_versicherter = placeholder_cfg.get("versicherter", {}) build_person_with_placeholders(humanmedizin, "versicherter", anrede=current_header.get("versicherter_anrede", ""), vorname=current_header.get("versicherter_vorname", ""), name=current_header.get("versicherter_name", ""), anschrift=None, # We don't have versicherter address data gebdatum=current_header.get("versicherter_gebdatum", ""), geschlecht=current_header.get("versicherter_geschlecht", ""), ph_person=ph_versicherter, field_prefix="versicherter", auto_filled=auto_filled, include_geschlecht=True) # versicherter includes geschlecht # Zeitraum from min/max dates in group all_dates: List[datetime] = [] for res in entries: all_dates.extend(collect_effective_dates(res)) start = min(all_dates) if all_dates else None end = max(all_dates) if all_dates else None ph_zeitraum = placeholder_cfg.get("zeitraum", {}) zr = Sub(humanmedizin, "zeitraum") sub_required(zr, "startdatum", format_iso_date(start) if start else "", ph_zeitraum.get("startdatum", "1900-01-01"), "zeitraum.startdatum", auto_filled) sub_required(zr, "endedatum", format_iso_date(end) if end else "", ph_zeitraum.get("endedatum", "1900-01-01"), "zeitraum.endedatum", auto_filled) # Scalars with placeholders for required fields ph_fall = placeholder_cfg.get("abrechnungsfall", {}) sub_required(humanmedizin, "behandlungsart", current_header.get("behandlungsart", ""), ph_fall.get("behandlungsart", "UNKNOWN"), "abrechnungsfall.behandlungsart", auto_filled) sub_required(humanmedizin, "vertragsart", current_header.get("vertragsart", ""), ph_fall.get("vertragsart", "UNKNOWN"), "abrechnungsfall.vertragsart", auto_filled) # aktenzeichen is optional sub_optional(humanmedizin, "aktenzeichen", current_header.get("aktenzeichen", "")) # Diagnose with placeholder for missing datum ph_diagnose = placeholder_cfg.get("diagnose", {}) diag = Sub(humanmedizin, "diagnose") Sub(diag, "text", current_header.get("diagnose_text", "")) sub_required(diag, "datum", current_header.get("diagnose_datum", ""), ph_diagnose.get("datum", "1900-01-01"), "diagnose.datum", auto_filled) # Positions positions: List[Dict[str, Any]] = [] if claim_resource: for item in claim_resource.get("item", []): positions.append(claim_item_to_position(item)) else: for res in entries: rtype = res.get("resourceType") if rtype in mapping_config.get("resources", {}): position = map_resource_to_position(res, mapping_config, translator) if position: positions.append(position) ph_goziffer = placeholder_cfg.get("goziffer", {}) # If no positions, create at least one placeholder position (XSD requires at least one child) if not positions: positions.append({ "id": "1", "go": ph_goziffer.get("go", "EBM"), "ziffer": ph_goziffer.get("ziffer", "99999"), "datum": ph_goziffer.get("datum", "1900-01-01"), "anzahl": "1", "text": "Placeholder position - no billing data in FHIR", "faktor": "1.0", "mwstsatz": "", "minderungssatz": "", "aisbewertung": {"punktwert": "", "punktzahl": "", "einzelbetrag": ""} }) auto_filled.append("position[1] = complete placeholder (no positions found in FHIR data)") pos_el = Sub(humanmedizin, "positionen", attrib={"posanzahl": str(len(positions))}) for idx, p in enumerate(positions, start=1): # Validate and fix ziffer code (max 8 chars, min 1 char) valid_ziffer = validate_ziffer( p["ziffer"], ph_goziffer.get("ziffer", "99999"), f"position[{idx}].ziffer", auto_filled ) goz = Sub(pos_el, "goziffer", attrib={ "id": str(p["id"]), "positionsnr": str(idx), # Required: sequential position number "go": p["go"], "ziffer": valid_ziffer }) # datum is required - apply placeholder if empty datum_val = p.get("datum", "") if not datum_val or not str(datum_val).strip(): datum_val = ph_goziffer.get("datum", "1900-01-01") auto_filled.append(f"position[{idx}].datum = '{datum_val}' (empty date)") Sub(goz, "datum", datum_val) Sub(goz, "anzahl", p["anzahl"]) Sub(goz, "text", p["text"]) # Choice: faktor OR einzelbetrag (one required) # Using faktor with default 1.0 for standard rate faktor_val = p.get("faktor", "") if faktor_val and str(faktor_val).strip(): Sub(goz, "faktor", str(faktor_val)) else: Sub(goz, "faktor", "1.0") # Default factor for standard billing rate # Optional elements (only output if present) mwstsatz_val = p.get("mwstsatz", "") if mwstsatz_val and str(mwstsatz_val).strip(): Sub(goz, "mwstsatz", str(mwstsatz_val)) minderungssatz_val = p.get("minderungssatz", "") if minderungssatz_val and str(minderungssatz_val).strip(): Sub(goz, "minderungssatz", str(minderungssatz_val)) # punktwert and punktzahl are direct children, not in aisbewertung punktwert_val = p.get("aisbewertung", {}).get("punktwert", "") if punktwert_val and str(punktwert_val).strip(): Sub(goz, "punktwert", str(punktwert_val)) punktzahl_val = p.get("aisbewertung", {}).get("punktzahl", "") if punktzahl_val and str(punktzahl_val).strip(): Sub(goz, "punktzahl", str(punktzahl_val)) # gesamtbetrag is REQUIRED - use einzelbetrag or default to 0.00 einzelbetrag_val = p.get("aisbewertung", {}).get("einzelbetrag", "") gesamtbetrag = einzelbetrag_val if einzelbetrag_val and str(einzelbetrag_val).strip() else "0.00" Sub(goz, "gesamtbetrag", str(gesamtbetrag)) rechnungen.set("anzahl", str(rechnung_count)) return rechnungen, all_validation_warnings, final_header, auto_filled def build_auf_xml(header: Dict[str, Any], stats: Dict[str, Any], output_xml_filename: str) -> ET.Element: """Builds the AUF XML file.""" now = datetime.now() auftrag = E("auftrag", attrib={ "erstellungsdatum": now.isoformat(), "transfernr": str(random.randint(100000, 999999)), "echtdaten": "true", "dateianzahl": "1" }) auftrag.set("xmlns", PAD_NS) empfaenger = Sub(auftrag, "empfaenger") logischer_empfaenger = Sub(empfaenger, "logisch") Sub(logischer_empfaenger, "name", header.get("empfaenger_name", "UNKNOWN")) physikalisch_empfaenger = Sub(empfaenger, "physikalisch") Sub(physikalisch_empfaenger, "name", header.get("empfaenger_name", "UNKNOWN")) absender = Sub(auftrag, "absender") logischer_absender = Sub(absender, "logisch") Sub(logischer_absender, "name", header.get("leistungserbringer_name", "UNKNOWN")) Sub(logischer_absender, "kundennr", header.get("rechnungsersteller_kundennr", "")) physikalisch_absender = Sub(absender, "physikalisch") Sub(physikalisch_absender, "name", header.get("leistungserbringer_name", "UNKNOWN")) Sub(physikalisch_absender, "kundennr", header.get("rechnungsersteller_kundennr", "")) Sub(auftrag, "nachrichtentyp", "ADL", attrib={"version": header.get("nachrichtentyp_version", "1.0")}) system = Sub(auftrag, "system") Sub(system, "produkt", "fhir_to_pad_converter") Sub(system, "version", "1.0") Sub(system, "hersteller", "Gemini") verschluesselung = Sub(auftrag, "verschluesselung", attrib={"verfahren": "0", "idcert": "none"}) empfangsquittung = Sub(auftrag, "empfangsquittung", "false") datei = Sub(auftrag, "datei", attrib={"id": "1", "erstellungsdatum": now.isoformat()}) Sub(datei, "dokumententyp", "PADneXt", attrib={"format": "pdf"}) Sub(datei, "name", output_xml_filename) Sub(datei, "dateilaenge", attrib={"laenge": "0", "pruefsumme": "0" * 40}) return auftrag # ---------------------------- # PAD validation & stats # ---------------------------- def validate_pad_xml(root: ET.Element, pad_xsd_path: Optional[str] = None) -> Tuple[bool, List[str]]: """Enhanced PADneXt XML validation with detailed error reporting.""" messages: List[str] = [] # Step 1: Well-formedness check try: ET.tostring(root, encoding="utf-8", xml_declaration=True) messages.append("✓ XML is well-formed") except Exception as e: return False, [f"PAD XML is not well-formed: {e}"] # Step 2: Namespace check if root.tag == f"{{{PAD_NS}}}rechnungen": messages.append(f"✓ Root element has correct namespace: {PAD_NS}") else: messages.append(f"✗ WARNING: Root element namespace mismatch. Expected: {{{PAD_NS}}}rechnungen, Got: {root.tag}") # Step 3: XSD validation with detailed error reporting if pad_xsd_path and HAS_LXML: try: xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=True) xml_doc = etree.fromstring(xml_bytes) with open(pad_xsd_path, "rb") as f: xsd_doc = etree.parse(f) xmlschema = etree.XMLSchema(xsd_doc) # Validate and collect ALL errors is_valid = xmlschema.validate(xml_doc) if is_valid: messages.append(f"✓ PAD XML fully complies with XSD schema: {pad_xsd_path}") return True, messages else: # Collect all validation errors error_log = xmlschema.error_log messages.append(f"✗ XSD validation FAILED with {len(error_log)} error(s):") messages.append(f" Schema: {pad_xsd_path}") messages.append("") messages.append("Detailed validation errors:") for idx, error in enumerate(error_log, 1): messages.append(f" Error {idx}:") messages.append(f" Line {error.line}, Column {error.column}") messages.append(f" Type: {error.type_name}") messages.append(f" Domain: {error.domain_name}") messages.append(f" Message: {error.message}") if error.path: messages.append(f" Path: {error.path}") messages.append("") return False, messages except Exception as e: messages.append(f"✗ XSD validation error: {e}") return False, messages elif pad_xsd_path and not HAS_LXML: messages.append("⚠ lxml not available; cannot perform XSD validation. Performed well-formedness check only.") else: messages.append("⚠ No PAD XSD provided; performed well-formedness check only.") return True, messages def verify_padnext_compliance(root: ET.Element) -> Dict[str, Any]: """Perform detailed PADneXt 2.12 compliance checks beyond XSD validation.""" ns = {"p": PAD_NS} compliance_checks = [] compliance_issues = [] # Check 1: Required header elements nachrichtentyp = root.find("p:nachrichtentyp", ns) if nachrichtentyp is not None and nachrichtentyp.text == "ADL": compliance_checks.append("✓ Nachrichtentyp is ADL (billing data)") version = nachrichtentyp.get("version") if version: compliance_checks.append(f"✓ ADL version: {version}") else: compliance_issues.append("✗ Missing or invalid nachrichtentyp") # Check 2: Rechnungsersteller rechnungsersteller = root.find("p:rechnungsersteller", ns) if rechnungsersteller is not None: name = rechnungsersteller.find("p:name", ns) iknummer = rechnungsersteller.find("p:iknummer", ns) if name is not None and name.text: compliance_checks.append(f"✓ Rechnungsersteller name: {name.text}") else: compliance_issues.append("✗ Missing rechnungsersteller name") if iknummer is not None and iknummer.text: compliance_checks.append(f"✓ Rechnungsersteller IK-Nummer: {iknummer.text}") else: compliance_issues.append("✗ Missing rechnungsersteller element") # Check 3: Leistungserbringer leistungserbringer = root.find("p:leistungserbringer", ns) if leistungserbringer is not None: lb_id = leistungserbringer.get("id") name = leistungserbringer.find("p:name", ns) if name is not None and name.text: compliance_checks.append(f"✓ Leistungserbringer name: {name.text}") if lb_id: compliance_checks.append(f"✓ Leistungserbringer ID: {lb_id}") else: compliance_issues.append("✗ Missing leistungserbringer element") # Check 4: Invoice structure rechnung_nodes = root.findall(".//p:rechnung", ns) if rechnung_nodes: compliance_checks.append(f"✓ Found {len(rechnung_nodes)} Rechnung(en)") # Check each invoice for idx, rechnung in enumerate(rechnung_nodes, 1): rng = rechnung.get("aisrechnungsnr") if rng: compliance_checks.append(f" ✓ Rechnung {idx} has RNG: {rng}") else: compliance_issues.append(f" ✗ Rechnung {idx} missing RNG attribute") # Check for patient patient = rechnung.find(".//p:patient", ns) if patient is not None: pat_name = patient.find("p:name", ns) pat_gebdatum = patient.find("p:gebdatum", ns) if pat_name is not None and pat_name.text: compliance_checks.append(f" ✓ Rechnung {idx} has patient name") if pat_gebdatum is not None and pat_gebdatum.text: compliance_checks.append(f" ✓ Rechnung {idx} has patient birth date") else: compliance_issues.append("✗ No Rechnung elements found") return { "compliance_checks": compliance_checks, "compliance_issues": compliance_issues, "total_checks": len(compliance_checks), "total_issues": len(compliance_issues) } def compute_pad_stats(root: ET.Element) -> Dict[str, Any]: """Compute detailed statistics about the PADneXt XML structure.""" ns = {"p": PAD_NS} rechnung_nodes = root.findall(".//p:rechnung", ns) fall_nodes = root.findall(".//p:abrechnungsfall", ns) pos_nodes = root.findall(".//p:abrechnungsfall/p:humanmedizin/p:positionen", ns) goz_nodes = root.findall(".//p:abrechnungsfall/p:humanmedizin/p:positionen/p:goziffer", ns) patient_nodes = root.findall(".//p:abrechnungsfall/p:humanmedizin/p:behandelter", ns) kostentraeger_nodes = root.findall(".//p:rechnung/p:rechnungsempfaenger", ns) total_positions_declared = sum(int(p.get("posanzahl") or "0") for p in pos_nodes) total_goziffer = len(goz_nodes) missing_behandlungsart = 0 missing_vertragsart = 0 missing_zeitraum = 0 for fall in fall_nodes: behandlungsart = fall.find(f"{{{PAD_NS}}}behandlungsart") vertragsart = fall.find(f"{{{PAD_NS}}}vertragsart") zr = fall.find(f"{{{PAD_NS}}}zeitraum") if not behandlungsart or not ensure_text(behandlungsart): missing_behandlungsart += 1 if not vertragsart or not ensure_text(vertragsart): missing_vertragsart += 1 if not zr or zr.find(f"{{{PAD_NS}}}startdatum") is None or zr.find(f"{{{PAD_NS}}}endedatum") is None: missing_zeitraum += 1 warnings: List[str] = [] root_anzahl = int(root.get("anzahl") or "0") if root_anzahl != len(rechnung_nodes): warnings.append(f"Root anzahl={root_anzahl} differs from actual Rechnung elements={len(rechnung_nodes)}.") if total_positions_declared != total_goziffer: warnings.append(f"Sum of posanzahl={total_positions_declared} differs from goziffer count={total_goziffer}.") if total_goziffer == 0: warnings.append("No positions were generated (goziffer count is 0).") return { "rechnungen_declared": root_anzahl, "rechnungen_actual": len(rechnung_nodes), "abrechnungsfaelle": len(fall_nodes), "position_groups": len(pos_nodes), "positions_declared_sum": total_positions_declared, "goziffer_count": total_goziffer, "patient_count": len(patient_nodes), "kostentraeger_count": len(kostentraeger_nodes), "missing_behandlungsart": missing_behandlungsart, "missing_vertragsart": missing_vertragsart, "missing_zeitraum": missing_zeitraum, "warnings": warnings, } # ---------------------------- # Logging Infrastructure # ---------------------------- class ConversionLogger: """Handles dual logging to console and file.""" def __init__(self, log_file: Optional[str] = None, verbose: bool = False): self.log_file = log_file self.verbose = verbose self.log_buffer: List[str] = [] def log(self, message: str, to_console: bool = False): """Log to file (and optionally console).""" self.log_buffer.append(message) if to_console or self.verbose: print(message) def console(self, message: str): """Log only to console.""" print(message) def both(self, message: str): """Log to both console and file.""" self.log_buffer.append(message) print(message) def section(self, title: str, to_console: bool = False): """Add section header.""" separator = "=" * 70 self.log(separator, to_console) self.log(title, to_console) self.log(separator, to_console) def write_log_file(self): """Write buffered log to file.""" if self.log_file: with open(self.log_file, 'w', encoding='utf-8') as f: f.write('\n'.join(self.log_buffer)) # ---------------------------- # Runner # ---------------------------- from validation import run_validation import os def run(input_json: str, output_xml: str, report_json: Optional[str] = None, output_auf_xml: Optional[str] = None, fhir_json_schema: Optional[str] = None, pad_xsd: Optional[str] = None, header_cfg: Optional[Dict[str, Any]] = None, placeholder_cfg: Optional[Dict[str, Any]] = None, mapping_config: Optional[Dict[str, Any]] = None, concept_maps: Optional[str] = None, log_file: Optional[str] = None, verbose: bool = False) -> Dict[str, Any]: # Initialize logger logger = ConversionLogger(log_file=log_file, verbose=verbose) logger.log(f"FHIR to PADneXt Conversion - {datetime.now().isoformat()}") logger.log(f"Input: {input_json}") logger.log(f"Output: {output_xml}") logger.log("") translator = CodeTranslator() if concept_maps: translator.load_concept_maps(concept_maps) with open(input_json, "r", encoding="utf-8") as f: bundle = json.load(f) # Input validation & stats fhir_ok, fhir_msgs = validate_fhir_json(bundle, fhir_json_schema) fhir_stat = compute_fhir_stats(bundle) # Build output XML root, validation_warnings, final_header, auto_filled = build_pad_xml(bundle, header_cfg=header_cfg, placeholder_cfg=placeholder_cfg, mapping_config=mapping_config, translator=translator) # Output validation & stats pad_ok, pad_msgs = validate_pad_xml(root, pad_xsd) pad_stat = compute_pad_stats(root) pad_compliance = verify_padnext_compliance(root) # Save XML ET.ElementTree(root).write(output_xml, encoding="utf-8", xml_declaration=True) # Build and save AUF XML if output_auf_xml: auf_root = build_auf_xml(final_header, pad_stat, os.path.basename(output_xml)) ET.ElementTree(auf_root).write(output_auf_xml, encoding="utf-8", xml_declaration=True) auf_ok, auf_msgs = validate_pad_xml(auf_root, "specs/padnext/padx_auf_v2.12.xsd") else: auf_ok, auf_msgs = None, [] report = { "input": { "file": input_json, "schema_validation_ok": fhir_ok, "schema_messages": fhir_msgs, "stats": fhir_stat, }, "output": { "adl_file": output_xml, "adl_schema_validation_ok": pad_ok, "adl_schema_messages": pad_msgs, "adl_stats": pad_stat, "padnext_compliance": pad_compliance, "auf_file": output_auf_xml, "auf_schema_validation_ok": auf_ok, "auf_schema_messages": auf_msgs, "auto_filled_fields": auto_filled, }, "validation_warnings": validation_warnings, } if report_json: with open(report_json, "w", encoding="utf-8") as rf: json.dump(report, rf, indent=2, ensure_ascii=False) # ========== DETAILED LOG OUTPUT (to file) ========== logger.section("FHIR INPUT VALIDATION") logger.log(f"Validation: {'OK' if fhir_ok else 'FAILED'}") for m in fhir_msgs: logger.log(f" - {m}") resource_counts = fhir_stat.get("resource_type_counts", {}) claim_count = resource_counts.get("Claim", 0) encounter_count = resource_counts.get("Encounter", 0) logger.log(f"\nAnalysis: Found {claim_count} Claim(s) and {encounter_count} Encounter(s).") if resource_counts: logger.log("\nResource Type Counts:") sorted_counts = sorted(resource_counts.items(), key=lambda x: x[1], reverse=True) for resource_type, count in sorted_counts: logger.log(f" {resource_type}: {count}") logger.log(f"\nFull Stats:\n{json.dumps(fhir_stat, indent=2, ensure_ascii=False)}") logger.section("PAD OUTPUT VALIDATION") logger.log(f"Validation: {'OK' if pad_ok else 'FAILED'}") for m in pad_msgs: logger.log(m) logger.log(f"\nFull Stats:\n{json.dumps(pad_stat, indent=2, ensure_ascii=False)}") logger.section("PADneXt 2.12 COMPLIANCE VERIFICATION") if pad_compliance["total_issues"] == 0: logger.log(f"✓ FULLY COMPLIANT - All {pad_compliance['total_checks']} compliance checks passed") else: logger.log(f"⚠ PARTIAL COMPLIANCE - {pad_compliance['total_issues']} issue(s) found, {pad_compliance['total_checks']} check(s) passed") if pad_compliance["compliance_checks"]: logger.log("\nCompliance Checks:") for check in pad_compliance["compliance_checks"]: logger.log(f" {check}") if pad_compliance["compliance_issues"]: logger.log("\nCompliance Issues:") for issue in pad_compliance["compliance_issues"]: logger.log(f" {issue}") logger.section("AUTO-FILLED FIELDS") if auto_filled: logger.log(f"⚠ {len(auto_filled)} required field(s) were missing and filled with placeholders:") for filled in auto_filled: logger.log(f" {filled}") logger.log("\nℹ These fields should be populated from FHIR data for production use.") else: logger.log("✓ All required fields had values from FHIR data - no placeholders used") logger.section("PAD AUF (Order) Declarative Info") logger.log(f"Erstellungsdatum: {datetime.now().isoformat()}") logger.log(f"Transfer-Nr: {random.randint(100, 999)}") logger.log(f"Empfänger: {final_header.get('empfaenger_name', 'N/A')}") logger.log(f"Absender: {final_header.get('leistungserbringer_name', 'N/A')}") logger.log(f"Datei: {output_xml}") logger.log(f"Anzahl Rechnungen: {pad_stat['rechnungen_actual']}") # ========== CONSOLE OUTPUT (simple summary) ========== logger.console("") logger.console("=" * 70) logger.console("FHIR to PADneXt Conversion") logger.console("=" * 70) # Input summary total_resources = fhir_stat.get("total_entries", 0) logger.console(f"Input: {input_json}") claims_encounters = f"{claim_count} Claim(s), {encounter_count} Encounter(s)" logger.console(f" • {total_resources} resources ({claims_encounters})") logger.console("") # Output summary logger.console(f"Output: {output_xml}") rechnungen_count = pad_stat.get("rechnungen_actual", 0) positions_count = pad_stat.get("goziffer_count", 0) logger.console(f" • {rechnungen_count} Rechnung(en) generated") logger.console(f" • {positions_count} position(s)") logger.console("") if log_file: logger.console(f"Log: {log_file}") logger.console("") # Determine overall status has_xsd_errors = not pad_ok has_compliance_issues = pad_compliance["total_issues"] > 0 has_placeholders = len(auto_filled) > 0 has_fhir_errors = not fhir_ok xsd_error_count = sum(1 for msg in pad_msgs if msg.strip().startswith("Error ")) # Status if not has_fhir_errors and not has_xsd_errors and not has_compliance_issues and not has_placeholders: logger.console("Status: ✓ SUCCESS") elif not has_fhir_errors and not has_xsd_errors and not has_compliance_issues and has_placeholders: logger.console("Status: ✓ SUCCESS (with placeholders)") elif not has_fhir_errors and not has_xsd_errors and has_compliance_issues: logger.console("Status: ⚠ PARTIAL SUCCESS") elif not has_fhir_errors and has_xsd_errors: logger.console("Status: ✗ FAILED") else: logger.console("Status: ✗ FAILED") logger.console("") logger.console("Validation Results:") # Validation details logger.console(f" {'✓' if fhir_ok else '✗'} FHIR structure {'valid' if fhir_ok else 'invalid'}") if has_xsd_errors: logger.console(f" ✗ XSD validation failed ({xsd_error_count} error(s))") else: logger.console(f" ✓ XSD schema compliant") if has_compliance_issues: logger.console(f" ⚠ {pad_compliance['total_issues']} compliance issue(s) found") if has_placeholders: logger.console(f" ⚠ {len(auto_filled)} field(s) auto-filled with placeholders") # Show top errors if there are XSD validation errors if has_xsd_errors and xsd_error_count > 0: logger.console("") logger.console(f"Top Errors (first 3 of {xsd_error_count}):") error_count = 0 in_error_block = False for msg in pad_msgs: msg_stripped = msg.strip() if msg_stripped.startswith("Error ") and ":" in msg_stripped: in_error_block = True elif in_error_block and msg_stripped.startswith("Message:") and error_count < 3: error_msg = msg_stripped.replace("Message:", "").strip() logger.console(f" • {error_msg[:100]}{'...' if len(error_msg) > 100 else ''}") error_count += 1 in_error_block = False if error_count >= 3: break # Next steps logger.console("") logger.console("Next Steps:") if not has_fhir_errors and not has_xsd_errors and not has_compliance_issues and not has_placeholders: logger.console(" → File is ready for production use") elif not has_fhir_errors and not has_xsd_errors and not has_compliance_issues and has_placeholders: logger.console(" → Review auto-filled fields in log file") logger.console(" → Update FHIR data with missing information") elif not has_fhir_errors and not has_xsd_errors and has_compliance_issues: logger.console(" → Review compliance issues in log file") logger.console(" → File can be imported but may require manual review") elif not has_fhir_errors and has_xsd_errors: logger.console(" → Check log file for complete error list") logger.console(" → Ensure FHIR data has valid billing codes") logger.console(" → File cannot be imported - fix validation errors") else: logger.console(" → Fix FHIR input validation errors") logger.console(" → Cannot proceed with conversion") if log_file: logger.console("") logger.console(f"Details: {log_file}") logger.console("=" * 70) # Write log file logger.write_log_file() return report @app.command() def convert( input_json: Path = typer.Option( ..., "--input-json", "-i", exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True, help="Path to FHIR Bundle JSON file", ), output_dir: Path = typer.Option( ".", "--output-dir", "-o", file_okay=False, dir_okay=True, resolve_path=True, help="Directory to save output files", ), verbose: bool = typer.Option( False, "--verbose", "-v", help="Show detailed output on console (same as log file)", ), fhir_json_schema: Optional[Path] = typer.Option( None, "--fhir-json-schema", exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True, help="Optional path to FHIR JSON Schema", ), pad_xsd: Optional[Path] = typer.Option( None, "--pad-xsd", exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True, help="Optional path to PAD XML XSD schema", ), header_cfg: Optional[Path] = typer.Option( None, "--header-cfg", exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True, help="Optional path to header config JSON (fills static fields)", ), placeholder_cfg: Optional[Path] = typer.Option( None, "--placeholder-cfg", exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True, help="Optional path to placeholder config JSON (fills missing required fields)", ), mapping_config: Optional[Path] = typer.Option( "mapping_config.json", "--mapping-config", "-m", file_okay=True, dir_okay=False, resolve_path=True, help="Optional path to mapping config JSON", ), concept_maps: Optional[Path] = typer.Option( None, "--concept-maps", help="Path to a directory or file containing FHIR ConceptMaps", ), ): """ Convert FHIR Bundle JSON to PADneXt 2.12 XML format. Validates both input FHIR data and output PAD XML, producing detailed diagnostic reports with statistics and auto-filled field tracking. Example: python fhir_to_pad_converter.py -i input.json -o ./results """ # Enable verbose logging if requested if verbose: logger.setLevel(logging.DEBUG) # Convert Path objects to strings for compatibility input_json_str = str(input_json) fhir_schema_str = str(fhir_json_schema) if fhir_json_schema else None pad_xsd_str = str(pad_xsd) if pad_xsd else None logger.info(f"Input file: {input_json_str}") if fhir_schema_str: logger.info(f"FHIR schema: {fhir_schema_str}") if pad_xsd_str: logger.info(f"PAD XSD: {pad_xsd_str}") # Create timestamped output directory import os timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") output_dir_path = os.path.join(str(output_dir), f"result__{timestamp}") try: logger.info(f"Creating output directory: {output_dir_path}") output_dir = validate_directory_path(output_dir_path, must_exist=False, create=True) logger.info(f"Output directory created: {output_dir}") except (ValueError, PermissionError) as e: logger.error(f"Failed to create output directory: {e}") print(f"ERROR: Cannot create output directory: {e}") return 1 output_xml = os.path.join(output_dir, "output.xml") report_json = os.path.join(output_dir, "report.json") log_file = os.path.join(output_dir, "output.log") output_auf_xml = os.path.join(output_dir, "output_auf.xml") # Load and validate header config header_cfg_data = None if header_cfg: try: logger.info(f"Loading header config: {header_cfg}") with open(header_cfg, "r", encoding="utf-8") as hf: header_cfg_data = json.load(hf) # Validate config if validation is available if HAS_CONFIG_VALIDATION: logger.info("Validating header configuration") warnings = validate_header_config(header_cfg_data) for warning in warnings: logger.warning(f"Header config: {warning}") logger.info("Header config loaded successfully") except FileNotFoundError as e: logger.error(f"Header config file not found: {e}") typer.echo(f"ERROR: Header config file not found: {header_cfg}", err=True) raise typer.Exit(code=1) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in header config: {e}") typer.echo(f"ERROR: Invalid JSON in header config file: {e}", err=True) raise typer.Exit(code=1) except ValueError as e: logger.error(f"Header config validation failed: {e}") typer.echo(f"ERROR: Header config validation failed: {e}", err=True) raise typer.Exit(code=1) # Load and validate placeholder config placeholder_cfg_data = None if placeholder_cfg: try: logger.info(f"Loading placeholder config: {placeholder_cfg}") with open(placeholder_cfg, "r", encoding="utf-8") as pf: placeholder_cfg_data = json.load(pf) # Validate config if validation is available if HAS_CONFIG_VALIDATION: logger.info("Validating placeholder configuration") warnings = validate_placeholder_config(placeholder_cfg_data) for warning in warnings: logger.warning(f"Placeholder config: {warning}") logger.info("Placeholder config loaded successfully") except FileNotFoundError as e: logger.error(f"Placeholder config file not found: {e}") typer.echo(f"ERROR: Placeholder config file not found: {placeholder_cfg}", err=True) raise typer.Exit(code=1) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in placeholder config: {e}") typer.echo(f"ERROR: Invalid JSON in placeholder config file: {e}", err=True) raise typer.Exit(code=1) except ValueError as e: logger.error(f"Placeholder config validation failed: {e}") typer.echo(f"ERROR: Placeholder config validation failed: {e}", err=True) raise typer.Exit(code=1) # Load and validate mapping config mapping_cfg_data = None if mapping_config: try: logger.info(f"Loading mapping config: {mapping_config}") with open(mapping_config, "r", encoding="utf-8") as mf: mapping_cfg_data = json.load(mf) # Validate config if validation is available if HAS_CONFIG_VALIDATION: logger.info("Validating mapping configuration") warnings = validate_mapping_config(mapping_cfg_data) for warning in warnings: logger.warning(f"Mapping config: {warning}") logger.info("Mapping config loaded successfully") except FileNotFoundError: logger.warning(f"Mapping config file not found at {mapping_config}. Using empty mapping.") typer.echo(f"Warning: Mapping config file not found at {mapping_config}. Using empty mapping.") mapping_cfg_data = {} except json.JSONDecodeError as e: logger.error(f"Invalid JSON in mapping config: {e}") typer.echo(f"ERROR: Invalid JSON in mapping config file: {e}", err=True) raise typer.Exit(code=1) except ValueError as e: logger.error(f"Mapping config validation failed: {e}") typer.echo(f"ERROR: Mapping config validation failed: {e}", err=True) raise typer.Exit(code=1) # Use header from mapping config if available, otherwise use defaults if header_cfg_data is None: # Check if mapping_config has header section if mapping_cfg_data and "header" in mapping_cfg_data: logger.info("Using header configuration from mapping_config.json") header_cfg_data = mapping_cfg_data["header"] else: # Fallback to empty defaults logger.info("No header configuration provided, using empty defaults") header_cfg_data = { "nachrichtentyp_version": "1.0", "rechnungsersteller_name": "", "rechnungsersteller_kundennr": "12345", "rechnungsersteller_strasse": "", "rechnungsersteller_plz": "", "rechnungsersteller_ort": "", "rechnungsersteller_iknr": "", "leistungserbringer_id": "", "leistungserbringer_titel": "", "leistungserbringer_vorname": "", "leistungserbringer_name": "", "empfaenger_anrede": "", "empfaenger_vorname": "", "empfaenger_name": "", "empfaenger_strasse": "", "empfaenger_plz": "", "empfaenger_ort": "", "behandlungsart": "", "vertragsart": "", "aktenzeichen": "", "diagnose_text": "", "diagnose_datum": "", "eabgabe": "", "aisaktenzeichen": "", "aisendbetrag": "", } # Default placeholder config if none provided if placeholder_cfg_data is None: placeholder_cfg_data = { "rechnungsersteller": { "name": "UNKNOWN", "plz": "00000", "ort": "UNKNOWN", "strasse": "UNKNOWN" }, "leistungserbringer": { "vorname": "UNKNOWN", "name": "UNKNOWN", "titel": None }, "empfaenger": { "anrede": "Ohne Anrede", "vorname": "UNKNOWN", "name": "UNKNOWN", "plz": "00000", "ort": "UNKNOWN", "strasse": "UNKNOWN", "gebdatum": "1900-01-01", "geschlecht": "u" }, "rechnung": { "eabgabe": None, "aisaktenzeichen": None, "aisendbetrag": None }, "abrechnungsfall": { "behandlungsart": "0", "vertragsart": "1" } } # Run conversion with error handling try: logger.info("Starting FHIR to PADneXt conversion") run( input_json=input_json_str, output_xml=output_xml, report_json=report_json, output_auf_xml=output_auf_xml, fhir_json_schema=fhir_schema_str, pad_xsd=pad_xsd_str, header_cfg=header_cfg_data, placeholder_cfg=placeholder_cfg_data, mapping_config=mapping_cfg_data, concept_maps=str(concept_maps) if concept_maps else None, log_file=log_file, verbose=verbose, ) logger.info("Conversion completed successfully") except FileNotFoundError as e: logger.error(f"File not found: {e}") typer.echo(f"ERROR: File not found: {e}", err=True) raise typer.Exit(code=1) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in input file: {e}") typer.echo(f"ERROR: Invalid JSON in input file: {e}", err=True) raise typer.Exit(code=1) except PermissionError as e: logger.error(f"Permission denied: {e}") typer.echo(f"ERROR: Permission denied: {e}", err=True) raise typer.Exit(code=1) except Exception as e: logger.exception(f"Unexpected error during conversion: {e}") typer.echo(f"ERROR: Unexpected error during conversion: {e}", err=True) typer.echo("See log file for detailed traceback", err=True) raise typer.Exit(code=1) if __name__ == "__main__": app()