typer integration and header for PAD AUF

This commit is contained in:
Alexander Domene
2025-10-27 09:44:07 +01:00
parent 8650bd09a3
commit 7c07d80747
48 changed files with 15010 additions and 145 deletions

View File

@@ -12,11 +12,12 @@ Usage:
[--report-json report.json] [--fhir-json-schema fhir.schema.json] [--pad-xsd pad.xsd]
"""
import argparse
import json
import logging
from collections import Counter, defaultdict
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path
import typer
from translator import CodeTranslator
from utils import (
parse_iso_date, format_iso_date, get_ref_id, ensure_text, collect_effective_dates,
@@ -77,6 +78,14 @@ except Exception as e:
PAD_NS = "http://padinfo.de/ns/pad"
# Create Typer CLI application
app = typer.Typer(
name="fhir2pad",
help="FHIR JSON to PADneXt XML converter with validation & statistics",
add_completion=True,
rich_markup_mode="rich"
)
# ----------------------------
# FHIR validation & stats
# ----------------------------
@@ -1393,51 +1402,118 @@ def run(input_json: str, output_xml: str, report_json: Optional[str] = None,
return report
def main():
p = argparse.ArgumentParser(description="FHIR JSON -> PAD XML converter with validation & stats")
p.add_argument("--input-json", required=True, help="Path to FHIR Bundle JSON")
p.add_argument("--output-dir", default=".", help="Directory to save output files")
p.add_argument("--verbose", action="store_true", help="Show detailed output on console (same as log file)")
p.add_argument("--fhir-json-schema", default=None, help="Optional path to FHIR JSON Schema")
p.add_argument("--pad-xsd", default=None, help="Optional path to PAD XML XSD")
p.add_argument("--header-cfg", default=None, help="Optional path to header config JSON (fills static fields)")
p.add_argument("--placeholder-cfg", default=None, help="Optional path to placeholder config JSON (fills missing required fields)")
p.add_argument("--mapping-config", default="mapping_config.json", help="Optional path to mapping config JSON")
p.add_argument("--concept-maps", default=None, help="Path to a directory or a single file for FHIR ConceptMaps")
args = p.parse_args()
@app.command()
def convert(
input_json: Path = typer.Option(
...,
"--input-json",
"-i",
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
help="Path to FHIR Bundle JSON file",
),
output_dir: Path = typer.Option(
".",
"--output-dir",
"-o",
file_okay=False,
dir_okay=True,
resolve_path=True,
help="Directory to save output files",
),
verbose: bool = typer.Option(
False,
"--verbose",
"-v",
help="Show detailed output on console (same as log file)",
),
fhir_json_schema: Optional[Path] = typer.Option(
None,
"--fhir-json-schema",
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
help="Optional path to FHIR JSON Schema",
),
pad_xsd: Optional[Path] = typer.Option(
None,
"--pad-xsd",
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
help="Optional path to PAD XML XSD schema",
),
header_cfg: Optional[Path] = typer.Option(
None,
"--header-cfg",
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
help="Optional path to header config JSON (fills static fields)",
),
placeholder_cfg: Optional[Path] = typer.Option(
None,
"--placeholder-cfg",
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
help="Optional path to placeholder config JSON (fills missing required fields)",
),
mapping_config: Optional[Path] = typer.Option(
"mapping_config.json",
"--mapping-config",
"-m",
file_okay=True,
dir_okay=False,
resolve_path=True,
help="Optional path to mapping config JSON",
),
concept_maps: Optional[Path] = typer.Option(
None,
"--concept-maps",
help="Path to a directory or file containing FHIR ConceptMaps",
),
):
"""
Convert FHIR Bundle JSON to PADneXt 2.12 XML format.
Validates both input FHIR data and output PAD XML, producing detailed
diagnostic reports with statistics and auto-filled field tracking.
Example:
python fhir_to_pad_converter.py -i input.json -o ./results
"""
# Enable verbose logging if requested
if args.verbose:
if verbose:
logger.setLevel(logging.DEBUG)
try:
# Validate input file path
logger.info(f"Validating input file: {args.input_json}")
input_json = validate_file_path(args.input_json, must_exist=True, check_readable=True)
logger.info(f"Input file validated: {input_json}")
# Convert Path objects to strings for compatibility
input_json_str = str(input_json)
fhir_schema_str = str(fhir_json_schema) if fhir_json_schema else None
pad_xsd_str = str(pad_xsd) if pad_xsd else None
# Validate schema paths if provided
fhir_schema = None
if args.fhir_json_schema:
logger.info(f"Validating FHIR schema path: {args.fhir_json_schema}")
fhir_schema = validate_file_path(args.fhir_json_schema, must_exist=True)
logger.info(f"FHIR schema validated: {fhir_schema}")
pad_xsd = None
if args.pad_xsd:
logger.info(f"Validating PAD XSD path: {args.pad_xsd}")
pad_xsd = validate_file_path(args.pad_xsd, must_exist=True)
logger.info(f"PAD XSD validated: {pad_xsd}")
except (ValueError, FileNotFoundError, PermissionError) as e:
logger.error(f"Input validation failed: {e}")
print(f"ERROR: {e}")
return 1
logger.info(f"Input file: {input_json_str}")
if fhir_schema_str:
logger.info(f"FHIR schema: {fhir_schema_str}")
if pad_xsd_str:
logger.info(f"PAD XSD: {pad_xsd_str}")
# Create timestamped output directory
import os
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_dir_path = os.path.join(args.output_dir, f"result__{timestamp}")
output_dir_path = os.path.join(str(output_dir), f"result__{timestamp}")
try:
logger.info(f"Creating output directory: {output_dir_path}")
@@ -1454,128 +1530,132 @@ def main():
output_auf_xml = os.path.join(output_dir, "output_auf.xml")
# Load and validate header config
header_cfg = None
if args.header_cfg:
header_cfg_data = None
if header_cfg:
try:
logger.info(f"Loading header config: {args.header_cfg}")
header_cfg_path = validate_file_path(args.header_cfg, must_exist=True)
with open(header_cfg_path, "r", encoding="utf-8") as hf:
header_cfg = json.load(hf)
logger.info(f"Loading header config: {header_cfg}")
with open(header_cfg, "r", encoding="utf-8") as hf:
header_cfg_data = json.load(hf)
# Validate config if validation is available
if HAS_CONFIG_VALIDATION:
logger.info("Validating header configuration")
warnings = validate_header_config(header_cfg)
warnings = validate_header_config(header_cfg_data)
for warning in warnings:
logger.warning(f"Header config: {warning}")
logger.info("Header config loaded successfully")
except FileNotFoundError as e:
logger.error(f"Header config file not found: {e}")
print(f"ERROR: Header config file not found: {args.header_cfg}")
return 1
typer.echo(f"ERROR: Header config file not found: {header_cfg}", err=True)
raise typer.Exit(code=1)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in header config: {e}")
print(f"ERROR: Invalid JSON in header config file: {e}")
return 1
typer.echo(f"ERROR: Invalid JSON in header config file: {e}", err=True)
raise typer.Exit(code=1)
except ValueError as e:
logger.error(f"Header config validation failed: {e}")
print(f"ERROR: Header config validation failed: {e}")
return 1
typer.echo(f"ERROR: Header config validation failed: {e}", err=True)
raise typer.Exit(code=1)
# Load and validate placeholder config
placeholder_cfg = None
if args.placeholder_cfg:
placeholder_cfg_data = None
if placeholder_cfg:
try:
logger.info(f"Loading placeholder config: {args.placeholder_cfg}")
placeholder_cfg_path = validate_file_path(args.placeholder_cfg, must_exist=True)
with open(placeholder_cfg_path, "r", encoding="utf-8") as pf:
placeholder_cfg = json.load(pf)
logger.info(f"Loading placeholder config: {placeholder_cfg}")
with open(placeholder_cfg, "r", encoding="utf-8") as pf:
placeholder_cfg_data = json.load(pf)
# Validate config if validation is available
if HAS_CONFIG_VALIDATION:
logger.info("Validating placeholder configuration")
warnings = validate_placeholder_config(placeholder_cfg)
warnings = validate_placeholder_config(placeholder_cfg_data)
for warning in warnings:
logger.warning(f"Placeholder config: {warning}")
logger.info("Placeholder config loaded successfully")
except FileNotFoundError as e:
logger.error(f"Placeholder config file not found: {e}")
print(f"ERROR: Placeholder config file not found: {args.placeholder_cfg}")
return 1
typer.echo(f"ERROR: Placeholder config file not found: {placeholder_cfg}", err=True)
raise typer.Exit(code=1)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in placeholder config: {e}")
print(f"ERROR: Invalid JSON in placeholder config file: {e}")
return 1
typer.echo(f"ERROR: Invalid JSON in placeholder config file: {e}", err=True)
raise typer.Exit(code=1)
except ValueError as e:
logger.error(f"Placeholder config validation failed: {e}")
print(f"ERROR: Placeholder config validation failed: {e}")
return 1
typer.echo(f"ERROR: Placeholder config validation failed: {e}", err=True)
raise typer.Exit(code=1)
# Load and validate mapping config
mapping_cfg = None
if args.mapping_config:
mapping_cfg_data = None
if mapping_config:
try:
logger.info(f"Loading mapping config: {args.mapping_config}")
mapping_cfg_path = validate_file_path(args.mapping_config, must_exist=True)
with open(mapping_cfg_path, "r", encoding="utf-8") as mf:
mapping_cfg = json.load(mf)
logger.info(f"Loading mapping config: {mapping_config}")
with open(mapping_config, "r", encoding="utf-8") as mf:
mapping_cfg_data = json.load(mf)
# Validate config if validation is available
if HAS_CONFIG_VALIDATION:
logger.info("Validating mapping configuration")
warnings = validate_mapping_config(mapping_cfg)
warnings = validate_mapping_config(mapping_cfg_data)
for warning in warnings:
logger.warning(f"Mapping config: {warning}")
logger.info("Mapping config loaded successfully")
except FileNotFoundError:
logger.warning(f"Mapping config file not found at {args.mapping_config}. Using empty mapping.")
print(f"Warning: Mapping config file not found at {args.mapping_config}. Using empty mapping.")
mapping_cfg = {}
logger.warning(f"Mapping config file not found at {mapping_config}. Using empty mapping.")
typer.echo(f"Warning: Mapping config file not found at {mapping_config}. Using empty mapping.")
mapping_cfg_data = {}
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in mapping config: {e}")
print(f"ERROR: Invalid JSON in mapping config file: {e}")
return 1
typer.echo(f"ERROR: Invalid JSON in mapping config file: {e}", err=True)
raise typer.Exit(code=1)
except ValueError as e:
logger.error(f"Mapping config validation failed: {e}")
print(f"ERROR: Mapping config validation failed: {e}")
return 1
typer.echo(f"ERROR: Mapping config validation failed: {e}", err=True)
raise typer.Exit(code=1)
# Sensible defaults if no header config is provided
if header_cfg is None:
header_cfg = {
"nachrichtentyp_version": "1.0",
"rechnungsersteller_name": "",
"rechnungsersteller_kundennr": "12345",
"rechnungsersteller_strasse": "",
"rechnungsersteller_plz": "",
"rechnungsersteller_ort": "",
"rechnungsersteller_iknr": "",
"leistungserbringer_id": "",
"leistungserbringer_titel": "",
"leistungserbringer_vorname": "",
"leistungserbringer_name": "",
"empfaenger_anrede": "",
"empfaenger_vorname": "",
"empfaenger_name": "",
"empfaenger_strasse": "",
"empfaenger_plz": "",
"empfaenger_ort": "",
"behandlungsart": "",
"vertragsart": "",
"aktenzeichen": "",
"diagnose_text": "",
"diagnose_datum": "",
"eabgabe": "",
"aisaktenzeichen": "",
"aisendbetrag": "",
}
# Use header from mapping config if available, otherwise use defaults
if header_cfg_data is None:
# Check if mapping_config has header section
if mapping_cfg_data and "header" in mapping_cfg_data:
logger.info("Using header configuration from mapping_config.json")
header_cfg_data = mapping_cfg_data["header"]
else:
# Fallback to empty defaults
logger.info("No header configuration provided, using empty defaults")
header_cfg_data = {
"nachrichtentyp_version": "1.0",
"rechnungsersteller_name": "",
"rechnungsersteller_kundennr": "12345",
"rechnungsersteller_strasse": "",
"rechnungsersteller_plz": "",
"rechnungsersteller_ort": "",
"rechnungsersteller_iknr": "",
"leistungserbringer_id": "",
"leistungserbringer_titel": "",
"leistungserbringer_vorname": "",
"leistungserbringer_name": "",
"empfaenger_anrede": "",
"empfaenger_vorname": "",
"empfaenger_name": "",
"empfaenger_strasse": "",
"empfaenger_plz": "",
"empfaenger_ort": "",
"behandlungsart": "",
"vertragsart": "",
"aktenzeichen": "",
"diagnose_text": "",
"diagnose_datum": "",
"eabgabe": "",
"aisaktenzeichen": "",
"aisendbetrag": "",
}
# Default placeholder config if none provided
if placeholder_cfg is None:
placeholder_cfg = {
if placeholder_cfg_data is None:
placeholder_cfg_data = {
"rechnungsersteller": {
"name": "UNKNOWN",
"plz": "00000",
@@ -1612,39 +1692,38 @@ def main():
try:
logger.info("Starting FHIR to PADneXt conversion")
run(
input_json=input_json,
input_json=input_json_str,
output_xml=output_xml,
report_json=report_json,
output_auf_xml=output_auf_xml,
fhir_json_schema=fhir_schema,
pad_xsd=pad_xsd,
header_cfg=header_cfg,
placeholder_cfg=placeholder_cfg,
mapping_config=mapping_cfg,
concept_maps=args.concept_maps,
fhir_json_schema=fhir_schema_str,
pad_xsd=pad_xsd_str,
header_cfg=header_cfg_data,
placeholder_cfg=placeholder_cfg_data,
mapping_config=mapping_cfg_data,
concept_maps=str(concept_maps) if concept_maps else None,
log_file=log_file,
verbose=args.verbose,
verbose=verbose,
)
logger.info("Conversion completed successfully")
return 0
except FileNotFoundError as e:
logger.error(f"File not found: {e}")
print(f"ERROR: File not found: {e}")
return 1
typer.echo(f"ERROR: File not found: {e}", err=True)
raise typer.Exit(code=1)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in input file: {e}")
print(f"ERROR: Invalid JSON in input file: {e}")
return 1
typer.echo(f"ERROR: Invalid JSON in input file: {e}", err=True)
raise typer.Exit(code=1)
except PermissionError as e:
logger.error(f"Permission denied: {e}")
print(f"ERROR: Permission denied: {e}")
return 1
typer.echo(f"ERROR: Permission denied: {e}", err=True)
raise typer.Exit(code=1)
except Exception as e:
logger.exception(f"Unexpected error during conversion: {e}")
print(f"ERROR: Unexpected error during conversion: {e}")
print("See log file for detailed traceback")
return 1
typer.echo(f"ERROR: Unexpected error during conversion: {e}", err=True)
typer.echo("See log file for detailed traceback", err=True)
raise typer.Exit(code=1)
if __name__ == "__main__":
main()
app()