Skip to content

DQA (Validation) API Reference

CLIFpy's Data Quality Assessment (DQA) module provides comprehensive validation organized around three pillars: Conformance, Completeness, and Plausibility. All checks support dual backends (Polars and DuckDB) and return structured result objects.

For a user-guide introduction, see Data Quality Assessment (DQA).


Result Classes

clifpy.utils.validator.DQAConformanceResult

DQAConformanceResult(check_type, table_name)

Container for DQA conformance check results.

Source code in clifpy/utils/validator.py
def __init__(self, check_type: str, table_name: str):
    """Create an empty conformance result for one check on one table.

    Parameters
    ----------
    check_type : str
        Identifier of the check that produced this result
    table_name : str
        Name of the table the check was run against
    """
    self.check_type, self.table_name = check_type, table_name

    # A fresh result is in the passing state with nothing recorded yet.
    self.passed = True
    self.errors: List[Dict[str, Any]] = []
    self.warnings: List[Dict[str, Any]] = []
    self.info: List[Dict[str, Any]] = []
    self.metrics: Dict[str, Any] = {}

    # Atomic-granularity scoring.
    # A check may examine N atomic units (e.g. permissible values,
    # rule×column pairs) yet emit fewer messages than that. The check
    # records the real unit counts here, derived from its own iteration
    # (never hardcoded), so downstream scores reflect work done rather
    # than message count. Leaving these as None means "fall back to
    # message count" in collect_dqa_issues.
    self.atomic_total: Optional[int] = None
    self.atomic_passed: Optional[int] = None

clifpy.utils.validator.DQACompletenessResult

DQACompletenessResult(check_type, table_name)

Container for DQA completeness check results.

Source code in clifpy/utils/validator.py
def __init__(self, check_type: str, table_name: str):
    """Create an empty completeness result for one check on one table.

    Parameters
    ----------
    check_type : str
        Identifier of the check that produced this result
    table_name : str
        Name of the table the check was run against
    """
    self.check_type, self.table_name = check_type, table_name

    # Starts out passing, with empty message buckets and no metrics.
    self.passed = True
    self.errors: List[Dict[str, Any]] = []
    self.warnings: List[Dict[str, Any]] = []
    self.info: List[Dict[str, Any]] = []
    self.metrics: Dict[str, Any] = {}

    # Atomic unit counters; None until a check fills them in.
    self.atomic_total: Optional[int] = None
    self.atomic_passed: Optional[int] = None

clifpy.utils.validator.DQAPlausibilityResult

DQAPlausibilityResult(check_type, table_name)

Container for DQA plausibility check results.

Source code in clifpy/utils/validator.py
def __init__(self, check_type: str, table_name: str):
    """Create an empty plausibility result for one check on one table.

    Parameters
    ----------
    check_type : str
        Identifier of the check that produced this result
    table_name : str
        Name of the table the check was run against
    """
    self.check_type, self.table_name = check_type, table_name

    # Starts out passing, with empty message buckets and no metrics.
    self.passed = True
    self.errors: List[Dict[str, Any]] = []
    self.warnings: List[Dict[str, Any]] = []
    self.info: List[Dict[str, Any]] = []
    self.metrics: Dict[str, Any] = {}

    # Atomic unit counters; None until a check fills them in.
    self.atomic_total: Optional[int] = None
    self.atomic_passed: Optional[int] = None

Conformance Checks

Conformance checks verify that data matches expected structure, schema, types, and allowed values.

A.1 — Table Presence

clifpy.utils.validator.check_table_exists

check_table_exists(table_path, table_name, filetype='parquet')

Check if a table file exists at the specified path.

Parameters:

Name Type Description Default
table_path str or Path

Directory containing the table files

required
table_name str

Name of the table to check

required
filetype str

File extension (parquet, csv, etc.)

'parquet'

Returns:

Type Description
DQAConformanceResult

Result object with check status

Source code in clifpy/utils/validator.py
def check_table_exists(
    table_path: Union[str, Path],
    table_name: str,
    filetype: str = 'parquet'
) -> DQAConformanceResult:
    """
    Check if a table file exists at the specified path.

    The expected location is ``<table_path>/<table_name>.<filetype>``.
    When it exists, its path and size in MiB are stored in the result
    metrics; otherwise an error carrying the expected path is recorded.

    Parameters
    ----------
    table_path : str or Path
        Directory containing the table files
    table_name : str
        Name of the table to check
    filetype : str
        File extension (parquet, csv, etc.)

    Returns
    -------
    DQAConformanceResult
        Result object with check status
    """
    outcome = DQAConformanceResult("table_exists", table_name)
    candidate = Path(table_path) / f"{table_name}.{filetype}"

    if not candidate.exists():
        outcome.add_error(
            f"Table file not found: {candidate}",
            {"expected_path": str(candidate)}
        )
    else:
        outcome.add_info(f"Table file found: {candidate}")
        outcome.metrics["file_path"] = str(candidate)
        outcome.metrics["file_size_mb"] = candidate.stat().st_size / (1024 * 1024)

    # Exactly one atomic unit (the file); it passes only if no error was recorded.
    outcome.atomic_total = 1
    outcome.atomic_passed = 0 if outcome.errors else 1
    return outcome

clifpy.utils.validator.check_table_presence

check_table_presence(df, table_name)

Check that a loaded DataFrame has rows and columns.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
table_name str

Name of the table

required
Source code in clifpy/utils/validator.py
def check_table_presence(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str
) -> DQAConformanceResult:
    """
    Check that a loaded DataFrame has rows and columns.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    table_name : str
        Name of the table

    Returns
    -------
    DQAConformanceResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_table_presence: starting for table '%s'", table_name)
    run_check = (check_table_presence_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_table_presence_duckdb)
    outcome = run_check(df, table_name)
    metrics = outcome.metrics
    _logger.debug("check_table_presence: table '%s' — rows=%s, cols=%s",
                  table_name, metrics.get("row_count"), metrics.get("column_count"))
    return outcome

A.2 — Required Columns

clifpy.utils.validator.check_required_columns

check_required_columns(df, schema, table_name)

Check if all required columns are present.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
schema dict

Table schema containing required_columns

required
table_name str

Name of the table

required
Source code in clifpy/utils/validator.py
def check_required_columns(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """
    Check if all required columns are present.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation. A summary is logged at INFO level when the
    backend reports at least one missing column.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    schema : dict
        Table schema containing required_columns
    table_name : str
        Name of the table

    Returns
    -------
    DQAConformanceResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_required_columns: starting for table '%s'", table_name)
    run_check = (check_required_columns_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_required_columns_duckdb)
    outcome = run_check(df, schema, table_name)
    metrics = outcome.metrics

    n_missing = metrics.get("total_missing", 0)
    if n_missing > 0:
        _logger.info("check_required_columns: table '%s' missing %d of %d required columns",
                     table_name, n_missing, metrics.get("total_required", 0))
    _logger.debug("check_required_columns: table '%s' — required=%s, present=%s, missing=%s",
                  table_name, metrics.get("total_required"),
                  metrics.get("total_present"), metrics.get("total_missing"))
    return outcome

B.1 — Data Types

clifpy.utils.validator.check_column_dtypes

check_column_dtypes(df, schema, table_name)

Check if columns have correct data types.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
schema dict

Table schema

required
table_name str

Name of the table

required
Source code in clifpy/utils/validator.py
def check_column_dtypes(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """
    Check if columns have correct data types.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    schema : dict
        Table schema
    table_name : str
        Name of the table

    Returns
    -------
    DQAConformanceResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_column_dtypes: starting for table '%s'", table_name)
    run_check = (check_column_dtypes_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_column_dtypes_duckdb)
    outcome = run_check(df, schema, table_name)
    metrics = outcome.metrics
    _logger.debug("check_column_dtypes: table '%s' — checked=%s, errors=%s, warnings=%s",
                  table_name, metrics.get("columns_checked"),
                  metrics.get("dtype_errors"), metrics.get("dtype_warnings"))
    return outcome

B.2 — Datetime Format

clifpy.utils.validator.check_datetime_format

check_datetime_format(df, schema, table_name, expected_tz='UTC')

Validate datetime columns are in correct format.

Source code in clifpy/utils/validator.py
def check_datetime_format(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    expected_tz: str = 'UTC'
) -> DQAConformanceResult:
    """Validate datetime columns are in correct format.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    schema : dict
        Table schema, forwarded unchanged to the backend
    table_name : str
        Name of the table
    expected_tz : str
        Forwarded unchanged to the backend; defaults to ``'UTC'``

    Returns
    -------
    DQAConformanceResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_datetime_format: starting for table '%s'", table_name)
    run_check = (check_datetime_format_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_datetime_format_duckdb)
    outcome = run_check(df, schema, table_name, expected_tz)
    _logger.debug("check_datetime_format: table '%s' — columns_checked=%s",
                  table_name, outcome.metrics.get("datetime_columns_checked"))
    return outcome

B.3 — Lab Reference Units

clifpy.utils.validator.check_lab_reference_units

check_lab_reference_units(df, schema, table_name='labs')

Check if lab reference units match schema definitions.

Source code in clifpy/utils/validator.py
def check_lab_reference_units(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str = 'labs'
) -> DQAConformanceResult:
    """Check if lab reference units match schema definitions.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    schema : dict
        Table schema, forwarded unchanged to the backend
    table_name : str
        Name of the table; defaults to ``'labs'``

    Returns
    -------
    DQAConformanceResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_lab_reference_units: starting for table '%s'", table_name)
    run_check = (check_lab_reference_units_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_lab_reference_units_duckdb)
    outcome = run_check(df, schema, table_name)
    metrics = outcome.metrics
    _logger.debug("check_lab_reference_units: table '%s' — valid=%s, invalid_categories=%s",
                  table_name, metrics.get("valid_units"),
                  metrics.get("invalid_unit_categories"))
    return outcome

B.4 — Categorical Values

clifpy.utils.validator.check_categorical_values

check_categorical_values(df, schema, table_name)

Check if categorical values match mCIDE permissible values.

Source code in clifpy/utils/validator.py
def check_categorical_values(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """Check if categorical values match mCIDE permissible values.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    schema : dict
        Table schema, forwarded unchanged to the backend
    table_name : str
        Name of the table

    Returns
    -------
    DQAConformanceResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_categorical_values: starting for table '%s'", table_name)
    run_check = (check_categorical_values_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_categorical_values_duckdb)
    outcome = run_check(df, schema, table_name)
    metrics = outcome.metrics
    _logger.debug("check_categorical_values: table '%s' — columns_checked=%s, columns_with_invalid=%s",
                  table_name, metrics.get("category_columns_checked"),
                  metrics.get("columns_with_invalid_values"))
    return outcome

B.5 — Category-to-Group Mapping

clifpy.utils.validator.check_category_group_mapping

check_category_group_mapping(df, schema, table_name)

Check if category-to-group mappings match schema definitions.

Source code in clifpy/utils/validator.py
def check_category_group_mapping(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """Check if category-to-group mappings match schema definitions.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    schema : dict
        Table schema, forwarded unchanged to the backend
    table_name : str
        Name of the table

    Returns
    -------
    DQAConformanceResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_category_group_mapping: starting for table '%s'", table_name)
    run_check = (check_category_group_mapping_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_category_group_mapping_duckdb)
    outcome = run_check(df, schema, table_name)
    _logger.debug("check_category_group_mapping: table '%s' — completed", table_name)
    return outcome

Completeness Checks

Completeness checks evaluate missing data, conditional requirements, and referential coverage.

A.1 — Missingness

clifpy.utils.validator.check_missingness

check_missingness(df, schema, table_name, error_threshold=50.0, warning_threshold=10.0)

Check missingness in required columns.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
schema dict

Table schema containing required_columns

required
table_name str

Name of the table

required
error_threshold float

Percent missing above which an error is raised

50.0
warning_threshold float

Percent missing above which a warning is raised

10.0

Returns:

Type Description
DQACompletenessResult

Result containing missingness statistics

Source code in clifpy/utils/validator.py
def check_missingness(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    error_threshold: float = 50.0,
    warning_threshold: float = 10.0
) -> DQACompletenessResult:
    """
    Check missingness in required columns.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation. Every error message on the returned result is
    also logged at INFO level.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    schema : dict
        Table schema containing required_columns
    table_name : str
        Name of the table
    error_threshold : float
        Percent missing above which an error is raised
    warning_threshold : float
        Percent missing above which a warning is raised

    Returns
    -------
    DQACompletenessResult
        Result containing missingness statistics
    """
    _logger.debug("check_missingness: starting for table '%s' (error_threshold=%.1f%%, warning_threshold=%.1f%%)",
                  table_name, error_threshold, warning_threshold)
    run_check = (check_missingness_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_missingness_duckdb)
    outcome = run_check(df, schema, table_name, error_threshold, warning_threshold)

    # `or ()` keeps the loop a no-op for any falsy `errors` value.
    for failure in outcome.errors or ():
        _logger.info("check_missingness: table '%s' — %s", table_name, failure["message"])
    _logger.debug("check_missingness: table '%s' — columns_checked=%s",
                  table_name, outcome.metrics.get("required_columns_checked"))
    return outcome

A.2 — Conditional Requirements

clifpy.utils.validator.check_conditional_requirements

check_conditional_requirements(df, table_name, conditions=None)

Check conditional required fields.

Source code in clifpy/utils/validator.py
def check_conditional_requirements(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    conditions: Optional[List[Dict[str, Any]]] = None
) -> DQACompletenessResult:
    """Check conditional required fields.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation. When the returned result carries warnings,
    their count is logged at INFO level.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    table_name : str
        Name of the table
    conditions : list of dict, optional
        Explicit conditions, forwarded unchanged to the backend
        (``None`` is forwarded as-is)

    Returns
    -------
    DQACompletenessResult
        Result returned by the selected backend implementation
    """
    # None and an empty list are both reported as zero explicit conditions.
    explicit_count = len(conditions or ())
    _logger.debug("check_conditional_requirements: starting for table '%s' (%d explicit conditions)",
                  table_name, explicit_count)
    run_check = (check_conditional_requirements_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_conditional_requirements_duckdb)
    outcome = run_check(df, table_name, conditions)

    if outcome.warnings:
        _logger.info("check_conditional_requirements: table '%s' — %d violations found",
                     table_name, len(outcome.warnings))
    _logger.debug("check_conditional_requirements: table '%s' complete", table_name)
    return outcome

B — mCIDE Value Coverage

clifpy.utils.validator.check_mcide_value_coverage

check_mcide_value_coverage(df, schema, table_name)

Check if all mCIDE standardized values are present in the data.

Source code in clifpy/utils/validator.py
def check_mcide_value_coverage(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQACompletenessResult:
    """Check if all mCIDE standardized values are present in the data.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    schema : dict
        Table schema, forwarded unchanged to the backend
    table_name : str
        Name of the table

    Returns
    -------
    DQACompletenessResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_mcide_value_coverage: starting for table '%s'", table_name)
    run_check = (check_mcide_value_coverage_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_mcide_value_coverage_duckdb)
    outcome = run_check(df, schema, table_name)
    _logger.debug("check_mcide_value_coverage: table '%s' — columns_checked=%s",
                  table_name, outcome.metrics.get("category_columns_checked"))
    return outcome

C.1 — Relational Integrity

clifpy.utils.validator.check_relational_integrity

check_relational_integrity(target_df, reference_df, target_table, reference_table, key_column)

Check bidirectional relational integrity between tables.

Runs the backend-specific check in both directions:

- Forward (reference → target): What percentage of reference IDs appear in the target table? (e.g., "what % of hospitalizations have labs?")
- Reverse (target → reference): What percentage of target IDs exist in the reference table? (e.g., "what % of lab hosp_ids are valid?")

Parameters:

Name Type Description Default
target_df DataFrame

The target table (e.g., labs).

required
reference_df DataFrame

The reference table (e.g., hospitalization).

required
target_table str

Name of the target table.

required
reference_table str

Name of the reference table.

required
key_column str

The shared key column (e.g., hospitalization_id).

required

Returns:

Type Description
DQACompletenessResult

Consolidated result with forward/reverse coverage metrics.

Source code in clifpy/utils/validator.py
def check_relational_integrity(
    target_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    reference_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    target_table: str,
    reference_table: str,
    key_column: str
) -> DQACompletenessResult:
    """Check bidirectional relational integrity between tables.

    Runs the backend-specific check in both directions:
    - **Forward** (reference → target): What percentage of reference IDs
      appear in the target table?  (e.g., "what % of hospitalizations
      have labs?")
    - **Reverse** (target → reference): What percentage of target IDs
      exist in the reference table?  (e.g., "what % of lab hosp_ids are
      valid?")

    The two backend results are merged into one result named
    ``"<target_table><-><reference_table>"``:

    - Forward warnings are kept as warnings, with ``atomic_count``
      defaulting to 0 in their details; forward errors are kept as errors.
    - Reverse warnings are re-classified by the reverse orphan percentage
      (``100 - reverse_coverage_percent``): above 10% they become errors,
      above 1% they stay warnings, otherwise they are dropped.
      Reverse errors are kept as errors.

    Any exception raised while running or merging the checks is caught,
    logged, and recorded as an error on the returned result.

    Parameters
    ----------
    target_df : DataFrame
        The target table (e.g., labs).
    reference_df : DataFrame
        The reference table (e.g., hospitalization).
    target_table : str
        Name of the target table.
    reference_table : str
        Name of the reference table.
    key_column : str
        The shared key column (e.g., ``hospitalization_id``).

    Returns
    -------
    DQACompletenessResult
        Consolidated result with forward/reverse coverage metrics.
    """
    _logger.debug(
        "check_relational_integrity: '%s' <-> '%s' on key '%s'",
        target_table, reference_table, key_column,
    )
    result = DQACompletenessResult(
        "relational_integrity",
        f"{target_table}<->{reference_table}",
    )

    # Select the backend implementation once; it is called twice below with
    # the table roles swapped. Any backend value other than 'polars' uses DuckDB.
    _backend_fn = (check_relational_integrity_polars
                   if _ACTIVE_BACKEND == 'polars'
                   else check_relational_integrity_duckdb)

    try:
        # Forward: reference → target  (source=reference, ref=target)
        fwd = _backend_fn(
            reference_df, target_df, reference_table, target_table, key_column
        )
        # Reverse: target → reference  (source=target, ref=reference)
        rev = _backend_fn(
            target_df, reference_df, target_table, reference_table, key_column
        )

        # Copy each direction's metrics under a direction-specific key.
        # A metric missing from a backend result is recorded as 0.
        result.metrics["forward_coverage_percent"] = fwd.metrics.get("coverage_percent", 0)
        result.metrics["forward_orphan_ids"] = fwd.metrics.get("orphan_ids", 0)
        result.metrics["forward_reference_unique_ids"] = fwd.metrics.get("source_unique_ids", 0)
        result.metrics["reverse_coverage_percent"] = rev.metrics.get("coverage_percent", 0)
        result.metrics["reverse_orphan_ids"] = rev.metrics.get("orphan_ids", 0)
        result.metrics["reverse_target_unique_ids"] = rev.metrics.get("source_unique_ids", 0)

        # `fwd` answers "does every reference ID have at least one row in
        # the target table?". On inpatient data this commonly fails
        # legitimately, so its warnings stay warnings and are tagged with
        # atomic_count=0 (unless the backend already set atomic_count).
        for w in fwd.warnings:
            # Copy so the backend's own details dict is not mutated;
            # `or {}` also covers details explicitly set to None.
            fwd_details = dict(w.get("details", {}) or {})
            fwd_details.setdefault('atomic_count', 0)
            result.add_warning(w['message'], fwd_details)
        for e in fwd.errors:
            # An error reported by the forward check still signals a real
            # problem; propagate as-is.
            result.add_error(e['message'], e.get("details", {}))

        # `rev` is the FK integrity direction ("does every target key value
        # resolve in the reference table?"). Its warnings are re-classified
        # by orphan percentage: >10% orphans → ERROR, >1% → WARNING,
        # else silent pass.
        rev_coverage = result.metrics["reverse_coverage_percent"]
        rev_orphan_pct = 100 - rev_coverage
        for w in rev.warnings:
            if rev_orphan_pct > 10.0:
                result.add_error(w['message'], w.get("details", {}))
            elif rev_orphan_pct > 1.0:
                result.add_warning(w['message'], w.get("details", {}))
            # else: silent pass (≤1% orphans is acceptable drift)
        for e in rev.errors:
            result.add_error(e['message'], e.get("details", {}))

        # Info when both directions are clean (no errors AND no warnings).
        # This looks at the raw backend results, so a reverse warning that
        # was silently dropped above still suppresses the info message.
        if fwd.passed and rev.passed and not fwd.warnings and not rev.warnings:
            result.add_info(
                f"Full bidirectional coverage for {key_column} between "
                f"{target_table} and {reference_table}"
            )

        _logger.info(
            "check_relational_integrity: '%s' <-> '%s' — "
            "fwd_coverage=%.1f%%, rev_coverage=%.1f%%",
            target_table, reference_table,
            result.metrics["forward_coverage_percent"],
            result.metrics["reverse_coverage_percent"],
        )

        # Atomic granularity: 1 per FK. The intent is that only the FK
        # integrity direction (`rev`) contributes to the scored atom.
        # NOTE(review): the atom is derived from `result.errors`, which also
        # contains any errors propagated from `fwd` above, so a forward
        # error fails the atom too — confirm this is intended.
        result.atomic_total = 1
        result.atomic_passed = 1 if not result.errors else 0
    except Exception as e:
        _logger.error(
            "Check 'relational_integrity' failed for '%s' <-> '%s': %s",
            target_table, reference_table, e,
        )
        result.add_error(f"Error checking relational integrity: {str(e)}")
        # Scoring is the last step of the try body, so it is still unset if
        # the failure happened earlier; count the check as one failed atom.
        if result.atomic_total is None:
            result.atomic_total = 1
            result.atomic_passed = 0

    return result

Plausibility Checks

Plausibility checks validate logical consistency, chronological order, and clinical reasonableness.

A.1 — Chronological Order

clifpy.utils.validator.check_chronological_order

check_chronological_order(df, table_name, chronological_rules=None, warning_threshold=0.0, error_threshold=10.0)

Check that datetime pairs follow expected chronological order.

Source code in clifpy/utils/validator.py
def check_chronological_order(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    chronological_rules: Optional[List[Dict[str, str]]] = None,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Check that datetime pairs follow expected chronological order.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    table_name : str
        Name of the table
    chronological_rules : list of dict, optional
        Rules forwarded unchanged to the backend (``None`` is forwarded as-is)
    warning_threshold : float
        Forwarded to the backend as a keyword argument
    error_threshold : float
        Forwarded to the backend as a keyword argument

    Returns
    -------
    DQAPlausibilityResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_chronological_order: starting for table '%s'", table_name)
    run_check = (check_chronological_order_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_chronological_order_duckdb)
    thresholds = {"warning_threshold": warning_threshold,
                  "error_threshold": error_threshold}
    outcome = run_check(df, table_name, chronological_rules, **thresholds)
    _logger.debug("check_chronological_order: table '%s' — pairs_checked=%s",
                  table_name, outcome.metrics.get("pairs_checked"))
    return outcome

A.2 — Numeric Range Plausibility

clifpy.utils.validator.check_numeric_range_plausibility

check_numeric_range_plausibility(df, table_name, outlier_config=None, warning_threshold=0.0, error_threshold=10.0)

Check numeric values are within plausible ranges.

Source code in clifpy/utils/validator.py
def check_numeric_range_plausibility(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    outlier_config: Optional[Dict[str, Any]] = None,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Check numeric values are within plausible ranges.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    table_name : str
        Name of the table
    outlier_config : dict, optional
        Configuration forwarded unchanged to the backend
        (``None`` is forwarded as-is)
    warning_threshold : float
        Forwarded to the backend as a keyword argument
    error_threshold : float
        Forwarded to the backend as a keyword argument

    Returns
    -------
    DQAPlausibilityResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_numeric_range_plausibility: starting for table '%s'", table_name)
    run_check = (check_numeric_range_plausibility_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_numeric_range_plausibility_duckdb)
    thresholds = {"warning_threshold": warning_threshold,
                  "error_threshold": error_threshold}
    outcome = run_check(df, table_name, outlier_config, **thresholds)
    _logger.debug("check_numeric_range_plausibility: table '%s' — columns_checked=%s",
                  table_name, outcome.metrics.get("columns_checked"))
    return outcome

A.3 — Field-Level Plausibility

clifpy.utils.validator.check_field_plausibility

check_field_plausibility(df, table_name, rules=None)

Check field-level plausibility constraints.

Source code in clifpy/utils/validator.py
def check_field_plausibility(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    rules: Optional[List[Dict[str, Any]]] = None,
) -> DQAPlausibilityResult:
    """Check field-level plausibility constraints.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    table_name : str
        Name of the table
    rules : list of dict, optional
        Rules forwarded unchanged to the backend (``None`` is forwarded as-is)

    Returns
    -------
    DQAPlausibilityResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_field_plausibility: starting for table '%s'", table_name)
    run_check = (check_field_plausibility_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_field_plausibility_duckdb)
    outcome = run_check(df, table_name, rules)
    _logger.debug("check_field_plausibility: table '%s' — rules_checked=%s",
                  table_name, outcome.metrics.get("rules_checked"))
    return outcome

A.4 — Medication Dose Unit Consistency

clifpy.utils.validator.check_medication_dose_unit_consistency

check_medication_dose_unit_consistency(df, table_name, warning_threshold=0.0, error_threshold=10.0)

Check medication dose unit consistency.

Source code in clifpy/utils/validator.py
def check_medication_dose_unit_consistency(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Check medication dose unit consistency.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    table_name : str
        Name of the table
    warning_threshold : float
        Forwarded to the backend as a keyword argument
    error_threshold : float
        Forwarded to the backend as a keyword argument

    Returns
    -------
    DQAPlausibilityResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_medication_dose_unit_consistency: starting for table '%s'", table_name)
    run_check = (check_medication_dose_unit_consistency_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_medication_dose_unit_consistency_duckdb)
    thresholds = {"warning_threshold": warning_threshold,
                  "error_threshold": error_threshold}
    outcome = run_check(df, table_name, **thresholds)
    _logger.debug("check_medication_dose_unit_consistency: table '%s' — violations=%s",
                  table_name, outcome.metrics.get("unit_pattern_violations"))
    return outcome

B.1 — Cross-Table Temporal Plausibility

clifpy.utils.validator.check_cross_table_temporal_plausibility

check_cross_table_temporal_plausibility(target_df, hospitalization_df, target_table, time_columns, warning_threshold=0.0, error_threshold=10.0)

Check that datetime values fall within hospitalization bounds.

Source code in clifpy/utils/validator.py
def check_cross_table_temporal_plausibility(
    target_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    hospitalization_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    target_table: str,
    time_columns: List[str],
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Check that datetime values fall within hospitalization bounds.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    target_df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Table whose datetime values are checked
    hospitalization_df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Hospitalization table, forwarded unchanged to the backend
    target_table : str
        Name of the target table
    time_columns : list of str
        Column names forwarded unchanged to the backend
    warning_threshold : float
        Forwarded to the backend as a keyword argument
    error_threshold : float
        Forwarded to the backend as a keyword argument

    Returns
    -------
    DQAPlausibilityResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_cross_table_temporal_plausibility: starting for table '%s'", target_table)
    run_check = (check_cross_table_temporal_plausibility_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_cross_table_temporal_plausibility_duckdb)
    thresholds = {"warning_threshold": warning_threshold,
                  "error_threshold": error_threshold}
    outcome = run_check(target_df, hospitalization_df, target_table, time_columns,
                        **thresholds)
    _logger.debug("check_cross_table_temporal_plausibility: table '%s' complete", target_table)
    return outcome

C.1 — Overlapping Periods

clifpy.utils.validator.check_overlapping_periods

check_overlapping_periods(df, table_name, entity_col='hospitalization_id', start_col='in_dttm', end_col='out_dttm')

Check for overlapping time periods within entities.

Source code in clifpy/utils/validator.py
def check_overlapping_periods(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    entity_col: str = 'hospitalization_id',
    start_col: str = 'in_dttm',
    end_col: str = 'out_dttm',
) -> DQAPlausibilityResult:
    """Check for overlapping time periods within entities.

    Thin dispatcher: the Polars implementation runs when
    ``_ACTIVE_BACKEND`` equals ``'polars'``; any other value selects the
    DuckDB implementation.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    table_name : str
        Name of the table
    entity_col : str
        Entity column name; defaults to ``'hospitalization_id'``
    start_col : str
        Period start column name; defaults to ``'in_dttm'``
    end_col : str
        Period end column name; defaults to ``'out_dttm'``

    Returns
    -------
    DQAPlausibilityResult
        Result returned by the selected backend implementation
    """
    _logger.debug("check_overlapping_periods: starting for table '%s'", table_name)
    run_check = (check_overlapping_periods_polars
                 if _ACTIVE_BACKEND == 'polars'
                 else check_overlapping_periods_duckdb)
    outcome = run_check(df, table_name, entity_col, start_col, end_col)
    _logger.debug("check_overlapping_periods: table '%s' — overlaps=%s",
                  table_name, outcome.metrics.get("overlapping_records"))
    return outcome

C.2 — Category Temporal Consistency

clifpy.utils.validator.check_category_temporal_consistency

check_category_temporal_consistency(df, schema, table_name, time_column=None, hosp_years=None)

Check category distribution consistency over time.

Source code in clifpy/utils/validator.py
def check_category_temporal_consistency(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    time_column: Optional[str] = None,
    hosp_years: Optional[set] = None,
) -> DQAPlausibilityResult:
    """Check category distribution consistency over time.

    Thin dispatcher: selects the Polars or DuckDB implementation based on
    the module-level active backend and forwards all arguments unchanged.

    Parameters
    ----------
    df : DataFrame
        The data to check.
    schema : dict
        Schema for the table.
    table_name : str
        Name of the table (also used in log messages).
    time_column : str, optional
        Time column name forwarded to the backend implementation.
    hosp_years : set, optional
        Hospitalization years forwarded to the backend implementation.

    Returns
    -------
    DQAPlausibilityResult
        The result object produced by the selected backend.
    """
    _logger.debug("check_category_temporal_consistency: starting for table '%s'", table_name)

    # Only the implementation for the active backend is looked up.
    backend_impl = (
        check_category_temporal_consistency_polars
        if _ACTIVE_BACKEND == 'polars'
        else check_category_temporal_consistency_duckdb
    )
    outcome = backend_impl(df, schema, table_name, time_column, hosp_years)

    columns_checked = outcome.metrics.get("category_columns_checked")
    _logger.debug("check_category_temporal_consistency: table '%s' — columns_checked=%s",
                  table_name, columns_checked)
    return outcome

D.1 — Duplicate Composite Keys

clifpy.utils.validator.check_duplicate_composite_keys

check_duplicate_composite_keys(df, table_name, composite_keys=None, schema=None, warning_threshold=0.0, error_threshold=10.0)

Check for duplicate composite keys.

Source code in clifpy/utils/validator.py
def check_duplicate_composite_keys(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    composite_keys: Optional[List[str]] = None,
    schema: Optional[Dict[str, Any]] = None,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Check for duplicate composite keys.

    Thin dispatcher: selects the Polars or DuckDB implementation based on
    the module-level active backend. Positional arguments are forwarded
    as-is and both thresholds are forwarded as keyword arguments.

    Parameters
    ----------
    df : DataFrame
        The data to check.
    table_name : str
        Name of the table (also used in log messages).
    composite_keys : list of str, optional
        Key columns forwarded to the backend implementation.
    schema : dict, optional
        Schema for the table, forwarded to the backend implementation.
    warning_threshold : float
        Warning threshold forwarded to the backend (default 0.0).
    error_threshold : float
        Error threshold forwarded to the backend (default 10.0).

    Returns
    -------
    DQAPlausibilityResult
        The result object produced by the selected backend.
    """
    _logger.debug("check_duplicate_composite_keys: starting for table '%s'", table_name)

    # Only the implementation for the active backend is looked up.
    backend_impl = (
        check_duplicate_composite_keys_polars
        if _ACTIVE_BACKEND == 'polars'
        else check_duplicate_composite_keys_duckdb
    )
    outcome = backend_impl(
        df, table_name, composite_keys, schema,
        warning_threshold=warning_threshold,
        error_threshold=error_threshold,
    )

    duplicate_count = outcome.metrics.get("duplicate_records")
    _logger.debug("check_duplicate_composite_keys: table '%s' — duplicates=%s",
                  table_name, duplicate_count)
    return outcome

Cross-Table Checks

These checks operate across multiple loaded tables to validate relational and temporal consistency.

clifpy.utils.validator.run_relational_integrity_checks

run_relational_integrity_checks(tables)

Auto-detect and run relational integrity checks for loaded tables.

Reads FK rules from validation_rules.yaml and runs check_relational_integrity for every applicable (table, fk_column) pair.

Parameters:

Name Type Description Default
tables list

Objects with .table_name (str) and .df (DataFrame) attributes — typically BaseTable instances.

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

{table_name: {fk_column: DQACompletenessResult}}.

Source code in clifpy/utils/validator.py
def run_relational_integrity_checks(
    tables: list,
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Auto-detect and run relational integrity checks for loaded tables.

    FK rules are read from the ``relational_integrity`` section of the
    loaded validation rules. For each loaded table and each FK rule whose
    column belongs to that table, :func:`check_relational_integrity` is
    run against the referenced table, provided that table is also loaded
    and is not the table itself.

    Parameters
    ----------
    tables : list
        Objects with ``.table_name`` (str) and ``.df`` (DataFrame)
        attributes — typically :class:`BaseTable` instances. An optional
        ``.schema`` attribute restricts which columns are considered.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        ``{table_name: {fk_column: DQACompletenessResult}}``. Tables for
        which no check ran are absent from the mapping.
    """
    _logger.info("run_relational_integrity_checks: starting with %d tables", len(tables))

    # Map table_name -> (normalized frame, schema column names or None).
    loaded = {}
    for table_obj in tables:
        frame = table_obj.df
        # Same pandas -> Polars conversion as the single-table runners.
        if _ACTIVE_BACKEND == 'polars' and isinstance(frame, pd.DataFrame):
            _logger.debug("Converting pandas DataFrame to Polars for table '%s'", table_obj.table_name)
            frame = pl.from_pandas(frame)
        frame = _normalize_for_validation(frame)

        # Restrict FK detection to schema-declared columns when a schema
        # with a non-empty 'columns' list is attached to the table object.
        table_schema = getattr(table_obj, 'schema', None) or {}
        if table_schema.get('columns'):
            declared_cols = {col['name'] for col in table_schema.get('columns', [])}
        else:
            declared_cols = None
        loaded[table_obj.table_name] = (frame, declared_cols)

    fk_rules = _load_validation_rules().get('relational_integrity', {})
    _logger.debug("run_relational_integrity_checks: loaded %d FK rules", len(fk_rules))

    results: Dict[str, Dict[str, DQACompletenessResult]] = {}

    for table_name, (frame, declared_cols) in loaded.items():
        # Schema columns win; otherwise inspect the frame itself.
        if declared_cols is not None:
            candidate_cols = declared_cols
        elif HAS_POLARS and isinstance(frame, (pl.DataFrame, pl.LazyFrame)):
            lazy = frame if isinstance(frame, pl.LazyFrame) else frame.lazy()
            candidate_cols = set(lazy.collect_schema().names())
        else:
            candidate_cols = set(frame.columns.tolist())

        for fk_column, rule in fk_rules.items():
            if fk_column not in candidate_cols:
                continue

            ref_name = rule['references_table']

            if ref_name == table_name:
                # A table is never validated against itself.
                _logger.debug(
                    "run_relational_integrity_checks: skipping self-ref %s.%s -> %s",
                    table_name, fk_column, ref_name,
                )
                continue

            if ref_name not in loaded:
                _logger.debug(
                    "run_relational_integrity_checks: skipping %s.%s — reference table '%s' not loaded",
                    table_name, fk_column, ref_name,
                )
                continue

            _logger.info(
                "run_relational_integrity_checks: checking %s.%s -> %s",
                table_name, fk_column, ref_name,
            )
            reference_frame = loaded[ref_name][0]
            outcome = check_relational_integrity(
                target_df=frame,
                reference_df=reference_frame,
                target_table=table_name,
                reference_table=ref_name,
                key_column=fk_column,
            )
            per_table = results.setdefault(table_name, {})
            per_table[fk_column] = outcome

    total_checks = sum(map(len, results.values()))
    _logger.info(
        "run_relational_integrity_checks: completed %d checks across %d tables",
        total_checks, len(results),
    )
    return results

clifpy.utils.validator.run_cross_table_completeness_checks

run_cross_table_completeness_checks(tables)

Run cross-table conditional completeness checks (K.5) on full DataFrames.

Parameters:

Name Type Description Default
tables list

Objects with .table_name and .df attributes.

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

Results keyed by target table name.

Source code in clifpy/utils/validator.py
def run_cross_table_completeness_checks(
    tables: list,
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Run cross-table conditional completeness checks (K.5) on full DataFrames.

    Each rule under ``cross_table_conditional_requirements`` names a
    source table/column/value(s), a target table/column and a join
    column. The check collects the join IDs of source rows whose source
    column matches one of the rule's values, collects the join IDs of
    target rows whose target column is populated, and reports the source
    IDs that are absent from the target set.

    Rules are skipped silently when either table is not loaded or when a
    required column is missing from its table.

    Parameters
    ----------
    tables : list
        Objects with ``.table_name`` and ``.df`` attributes. A leading
        ``clif_`` in the table name is removed before lookup.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        Results keyed by **target** table name, then by
        ``"<source_column>_<target_column>"``. Empty when no rules are
        configured.
    """
    _logger.info("run_cross_table_completeness_checks: starting with %d tables", len(tables))

    ct_cond_rules = _load_validation_rules().get('cross_table_conditional_requirements', [])
    if not ct_cond_rules:
        return {}

    lookup = {}
    for obj in tables:
        tname = getattr(obj, 'table_name', '').replace('clif_', '')
        tdf = obj.df
        if _ACTIVE_BACKEND == 'polars' and isinstance(tdf, pd.DataFrame):
            tdf = pl.from_pandas(tdf)
        # Case-normalize for validation (see run_conformance_checks).
        tdf = _normalize_for_validation(tdf)
        lookup[tname] = tdf

    results: Dict[str, Dict[str, DQACompletenessResult]] = {}

    for rule in ct_cond_rules:
        rule_key = f"{rule['source_column']}_{rule['target_column']}"
        source_table = rule['source_table']
        target_table = rule['target_table']
        join_col = rule['join_column']

        if source_table not in lookup or target_table not in lookup:
            continue

        src_df = lookup[source_table]
        tgt_df = lookup[target_table]

        # Accept a single scalar as well as a list of values. Iterating a
        # bare string would otherwise compare against its individual
        # characters, and a non-iterable scalar would raise TypeError.
        source_values = rule['source_value']
        if isinstance(source_values, (str, bytes)) or not hasattr(source_values, '__iter__'):
            source_values = [source_values]
        match_values = [str(v).strip().lower() for v in source_values]

        # Get source IDs matching condition. `src_df` has already been passed
        # through `_normalize_for_validation` at the top of this function, so
        # string columns are lowercased + stripped — skip the redundant per-row
        # transformation when the column is already string-typed (common case).
        if HAS_POLARS and isinstance(src_df, (pl.DataFrame, pl.LazyFrame)):
            lf = src_df if isinstance(src_df, pl.LazyFrame) else src_df.lazy()
            schema = lf.collect_schema()
            if rule['source_column'] not in schema.names() or join_col not in schema.names():
                continue
            src_dtype = schema[rule['source_column']]
            if src_dtype == pl.Utf8 or src_dtype == pl.String:
                filter_expr = pl.col(rule['source_column']).is_in(match_values)
            else:
                filter_expr = pl.col(rule['source_column']).cast(pl.Utf8).str.strip_chars().str.to_lowercase().is_in(match_values)
            matched = (
                lf.filter(filter_expr)
                .select(pl.col(join_col).drop_nulls().unique())
                .collect(streaming=True)
            )
            source_ids = set(matched[join_col].to_list())
        else:
            if rule['source_column'] not in src_df.columns or join_col not in src_df.columns:
                continue
            src_col = src_df[rule['source_column']]
            if pd.api.types.is_string_dtype(src_col):
                mask = src_col.isin(match_values)
            else:
                mask = src_col.astype(str).str.strip().str.lower().isin(match_values)
            source_ids = set(src_df.loc[mask, join_col].dropna().unique().tolist())

        # Get target IDs with non-null target column (also exclude empty strings)
        if HAS_POLARS and isinstance(tgt_df, (pl.DataFrame, pl.LazyFrame)):
            lf = tgt_df if isinstance(tgt_df, pl.LazyFrame) else tgt_df.lazy()
            # Resolve the schema once and reuse it for both the column
            # presence test and the dtype lookup (mirrors the source branch).
            tgt_schema = lf.collect_schema()
            schema_names = tgt_schema.names()
            if rule['target_column'] not in schema_names or join_col not in schema_names:
                continue
            tcol = pl.col(rule['target_column'])
            not_null_filter = tcol.is_not_null()
            dtype = tgt_schema[rule['target_column']]
            if dtype == pl.Utf8 or dtype == pl.String:
                not_null_filter = not_null_filter & (tcol.str.strip_chars().str.len_chars() > 0)
            matched = (
                lf.filter(not_null_filter)
                .select(pl.col(join_col).drop_nulls().unique())
                .collect(streaming=True)
            )
            target_ids = set(matched[join_col].to_list())
        else:
            if rule['target_column'] not in tgt_df.columns or join_col not in tgt_df.columns:
                continue
            target_col = tgt_df[rule['target_column']]
            mask = target_col.notna()
            if hasattr(target_col, 'str') and target_col.dtype == 'object':
                mask = mask & (target_col.astype(str).str.strip() != '')
            target_ids = set(tgt_df.loc[mask, join_col].dropna().unique().tolist())

        result = DQACompletenessResult(
            "cross_table_conditional_completeness",
            f"{source_table}->{target_table}",
        )

        if not source_ids:
            # Nothing in the source satisfies the condition: record an
            # informational result with zero atomic units and move on.
            result.add_info(
                f"No {rule['source_column']} = {rule['source_value']} found in "
                f"{source_table}; cross-table conditional check not triggered"
            )
            result.atomic_total = 0
            result.atomic_passed = 0
            results.setdefault(target_table, {})[rule_key] = result
            continue

        missing_ids = source_ids - target_ids
        total = len(source_ids)
        missing_count = len(missing_ids)
        pct = round(missing_count / total * 100, 1) if total > 0 else 0

        result.metrics["total_matching_source"] = total
        result.metrics["missing_in_target"] = missing_count
        result.metrics["coverage_percent"] = round(100 - pct, 2)

        if missing_ids:
            # Keep the payload small: report at most ten example IDs.
            sample = sorted(missing_ids)[:10]
            result.add_warning(
                f"{missing_count}/{total} patients discharged as {rule['source_value']} "
                f"in {source_table} are missing {rule['target_column']} in {target_table} "
                f"({pct}% missing)",
                {
                    "column": rule['target_column'],
                    "missing_count": missing_count,
                    "total_matching": total,
                    "percent_missing": pct,
                    "sample_ids": sample,
                    "source_condition": f"{rule['source_column']} in {rule['source_value']}",
                },
            )
        else:
            result.add_info(
                f"All {total} patients discharged as {rule['source_value']} in "
                f"{source_table} have {rule['target_column']} in {target_table}"
            )

        # One rule == one atomic unit.
        # NOTE(review): only add_warning/add_info are called above; whether
        # either of them populates `result.errors` is not visible here.
        result.atomic_total = 1
        result.atomic_passed = 1 if not result.errors else 0
        results.setdefault(target_table, {})[rule_key] = result

    checked = sum(len(v) for v in results.values())
    _logger.info(
        "run_cross_table_completeness_checks: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

clifpy.utils.validator.run_cross_table_plausibility_checks

run_cross_table_plausibility_checks(tables, plausibility_thresholds=None)

Run cross-table plausibility checks (B.1).

Parameters:

Name Type Description Default
tables list

Objects with .table_name and .df attributes.

required
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
Dict[str, Dict[str, DQAPlausibilityResult]]

{table_name: {"cross_table_temporal": DQAPlausibilityResult}}.

Source code in clifpy/utils/validator.py
def run_cross_table_plausibility_checks(
    tables: list,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, Dict[str, DQAPlausibilityResult]]:
    """Run cross-table plausibility checks (B.1).

    Every loaded table other than ``hospitalization`` that has configured
    cross-table time columns, at least one of which is present, plus a
    ``hospitalization_id`` column, is checked against the hospitalization
    table via :func:`check_cross_table_temporal_plausibility`.

    Parameters
    ----------
    tables : list
        Objects with ``.table_name`` and ``.df`` attributes.
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check. Entries for
        known checks are merged into a copy of the defaults; entries for
        unknown checks are added as given.

    Returns
    -------
    Dict[str, Dict[str, DQAPlausibilityResult]]
        ``{table_name: {"cross_table_temporal": DQAPlausibilityResult}}``.
        Empty when the hospitalization table is not loaded.
    """
    _logger.info("run_cross_table_plausibility_checks: starting with %d tables", len(tables))

    # Start from per-check copies of the defaults, then layer overrides on top.
    merged = {name: dict(defaults) for name, defaults in _DEFAULT_PLAUSIBILITY_THRESHOLDS.items()}
    for name, custom in (plausibility_thresholds or {}).items():
        if name not in merged:
            merged[name] = custom
        else:
            merged[name].update(custom)

    frames = {}
    for table_obj in tables:
        frame = table_obj.df
        if _ACTIVE_BACKEND == 'polars' and isinstance(frame, pd.DataFrame):
            frame = pl.from_pandas(frame)
        # Case-normalize for validation (see run_conformance_checks).
        frame = _normalize_for_validation(frame)
        frames[table_obj.table_name] = frame

    if 'hospitalization' not in frames:
        _logger.info("Hospitalization table not loaded; skipping cross-table plausibility")
        return {}

    hosp_frame = frames['hospitalization']
    results: Dict[str, Dict[str, DQAPlausibilityResult]] = {}

    for name, frame in frames.items():
        if name == 'hospitalization':
            continue

        expected_time_cols = _CROSS_TABLE_TIME_COLUMNS.get(name, [])
        if not expected_time_cols:
            continue

        # Column discovery differs between Polars frames and everything else.
        if HAS_POLARS and isinstance(frame, (pl.DataFrame, pl.LazyFrame)):
            lazy = frame if isinstance(frame, pl.LazyFrame) else frame.lazy()
            present = lazy.collect_schema().names()
        else:
            present = frame.columns.tolist()

        usable_time_cols = [col for col in expected_time_cols if col in present]
        if 'hospitalization_id' not in present or not usable_time_cols:
            continue

        outcome = check_cross_table_temporal_plausibility(
            frame, hosp_frame, name, usable_time_cols,
            **merged['cross_table_temporal']
        )
        # Each table name is visited once, so this is always a fresh entry.
        results[name] = {"cross_table_temporal": outcome}

    total_checks = sum(map(len, results.values()))
    _logger.info(
        "run_cross_table_plausibility_checks: completed %d checks across %d tables",
        total_checks, len(results),
    )
    return results

Orchestration

High-level functions that run groups of checks or the full DQA suite.

Single-Table Orchestration

clifpy.utils.validator.run_conformance_checks

run_conformance_checks(df, schema, table_name)

Run all conformance checks on a table.

Parameters:

Name Type Description Default
df DataFrame

The data to validate

required
schema dict

Schema for the table

required
table_name str

Name of the table

required

Returns:

Type Description
Dict[str, DQAConformanceResult]

Dictionary of check results keyed by check type

Source code in clifpy/utils/validator.py
def run_conformance_checks(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> Dict[str, DQAConformanceResult]:
    """
    Run all conformance checks on a table.

    Checks run in this order: table presence, required columns, column
    dtypes, datetime format, lab reference units (``labs`` table only),
    categorical values, category/group mapping. A garbage collection is
    triggered after each check.

    Parameters
    ----------
    df : DataFrame
        The data to validate
    schema : dict
        Schema for the table
    table_name : str
        Name of the table

    Returns
    -------
    Dict[str, DQAConformanceResult]
        Dictionary of check results keyed by check type
    """
    _logger.info("Running conformance checks for table '%s' using %s backend", table_name, _ACTIVE_BACKEND)

    # The Polars backend expects Polars input; convert pandas frames first.
    if _ACTIVE_BACKEND == 'polars' and isinstance(df, pd.DataFrame):
        _logger.debug("Converting pandas DataFrame to Polars for table '%s'", table_name)
        df = pl.from_pandas(df)

    # Case-normalize for validation: lowercase column names + string values,
    # with __orig_<col> sidecars preserving original case for error messages.
    df = _normalize_for_validation(df)

    # Ordered plan of (result key, deferred call). Thunks keep each check
    # lazily evaluated so they execute one at a time in the loop below.
    plan = [
        ('table_presence', lambda: check_table_presence(df, table_name)),
        ('required_columns', lambda: check_required_columns(df, schema, table_name)),
        ('column_dtypes', lambda: check_column_dtypes(df, schema, table_name)),
        ('datetime_format', lambda: check_datetime_format(df, schema, table_name)),
    ]
    if table_name == 'labs':
        plan.append(('lab_reference_units', lambda: check_lab_reference_units(df, schema, table_name)))
    plan.extend([
        ('categorical_values', lambda: check_categorical_values(df, schema, table_name)),
        ('category_group_mapping', lambda: check_category_group_mapping(df, schema, table_name)),
    ])

    results = {}
    for check_key, run_check in plan:
        results[check_key] = run_check()
        gc.collect()

    failed = sum(1 for outcome in results.values() if not outcome.passed)
    passed = len(results) - failed
    _logger.info("Conformance checks complete for '%s': %d passed, %d failed", table_name, passed, failed)

    return results

clifpy.utils.validator.run_completeness_checks

run_completeness_checks(df, schema, table_name, error_threshold=50.0, warning_threshold=10.0)

Run all completeness checks on a table.

Parameters:

Name Type Description Default
df DataFrame

The data to validate

required
schema dict

Schema for the table

required
table_name str

Name of the table

required
error_threshold float

Percent missing above which an error is raised

50.0
warning_threshold float

Percent missing above which a warning is raised

10.0

Returns:

Type Description
Dict[str, DQACompletenessResult]

Dictionary of check results keyed by check type

Source code in clifpy/utils/validator.py
def run_completeness_checks(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    error_threshold: float = 50.0,
    warning_threshold: float = 10.0
) -> Dict[str, DQACompletenessResult]:
    """
    Run all completeness checks on a table.

    Checks run in this order: missingness, conditional requirements,
    mCIDE value coverage. A garbage collection is triggered after each
    check.

    Parameters
    ----------
    df : DataFrame
        The data to validate
    schema : dict
        Schema for the table
    table_name : str
        Name of the table
    error_threshold : float
        Percent missing above which an error is raised
    warning_threshold : float
        Percent missing above which a warning is raised

    Returns
    -------
    Dict[str, DQACompletenessResult]
        Dictionary of check results keyed by check type
    """
    _logger.info("Running completeness checks for table '%s' using %s backend", table_name, _ACTIVE_BACKEND)

    # The Polars backend expects Polars input; convert pandas frames first.
    if _ACTIVE_BACKEND == 'polars' and isinstance(df, pd.DataFrame):
        _logger.debug("Converting pandas DataFrame to Polars for table '%s'", table_name)
        df = pl.from_pandas(df)

    # Case-normalize for validation (see run_conformance_checks).
    df = _normalize_for_validation(df)

    # Ordered plan of (result key, deferred call); each runs in turn below.
    plan = (
        ('missingness', lambda: check_missingness(
            df, schema, table_name, error_threshold, warning_threshold
        )),
        ('conditional_requirements', lambda: check_conditional_requirements(df, table_name)),
        ('mcide_value_coverage', lambda: check_mcide_value_coverage(df, schema, table_name)),
    )

    results = {}
    for check_key, run_check in plan:
        results[check_key] = run_check()
        gc.collect()

    failed = sum(1 for outcome in results.values() if not outcome.passed)
    passed = len(results) - failed
    _logger.info("Completeness checks complete for '%s': %d passed, %d failed", table_name, passed, failed)

    return results

clifpy.utils.validator.run_plausibility_checks

run_plausibility_checks(df, schema, table_name, hosp_years=None, plausibility_thresholds=None)

Run all single-table plausibility checks on a table.

Parameters:

Name Type Description Default
df DataFrame

The data to validate

required
schema dict

Schema for the table

required
table_name str

Name of the table

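required
hosp_years set

Pre-extracted hospitalization years, forwarded to check_category_temporal_consistency.

None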
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
Dict[str, DQAPlausibilityResult]

Dictionary of check results keyed by check type

Source code in clifpy/utils/validator.py
def run_plausibility_checks(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    hosp_years: Optional[set] = None,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, DQAPlausibilityResult]:
    """
    Run all single-table plausibility checks on a table.

    Checks run in this order: chronological order (A.1), numeric range
    plausibility (A.2), field-level plausibility (A.3), medication dose
    unit consistency (A.4, medication admin tables only), overlapping
    periods (C.1, only when rules exist for the table), category temporal
    consistency (C.2), duplicate composite keys (D.1). A garbage
    collection is triggered after each check.

    Parameters
    ----------
    df : DataFrame
        The data to validate
    schema : dict
        Schema for the table
    table_name : str
        Name of the table
    hosp_years : set, optional
        Hospitalization years, forwarded unchanged to
        :func:`check_category_temporal_consistency`.
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.

    Returns
    -------
    Dict[str, DQAPlausibilityResult]
        Dictionary of check results keyed by check type
    """
    _logger.info("Running plausibility checks for table '%s' using %s backend", table_name, _ACTIVE_BACKEND)

    # Start from per-check copies of the defaults, then layer overrides on top.
    merged = {name: dict(defaults) for name, defaults in _DEFAULT_PLAUSIBILITY_THRESHOLDS.items()}
    for name, custom in (plausibility_thresholds or {}).items():
        if name not in merged:
            merged[name] = custom
        else:
            merged[name].update(custom)

    if _ACTIVE_BACKEND == 'polars' and isinstance(df, pd.DataFrame):
        _logger.debug("Converting pandas DataFrame to Polars for table '%s'", table_name)
        df = pl.from_pandas(df)

    # Case-normalize for validation (see run_conformance_checks).
    df = _normalize_for_validation(df)

    results = {}

    def _store(check_key, outcome):
        # Record one check's result, then release memory before the next.
        results[check_key] = outcome
        gc.collect()

    # A.1 Chronological order
    _store('chronological_order', check_chronological_order(
        df, table_name, **merged['chronological_order']))

    # A.2 Numeric range plausibility
    _store('numeric_range_plausibility', check_numeric_range_plausibility(
        df, table_name, **merged['numeric_range_plausibility']))

    # A.3 Field-level plausibility
    _store('field_plausibility', check_field_plausibility(df, table_name))

    # A.4 Medication dose unit consistency (only for med tables)
    if table_name in ('medication_admin_continuous', 'medication_admin_intermittent'):
        _store('medication_dose_unit_consistency', check_medication_dose_unit_consistency(
            df, table_name, **merged['medication_dose_unit_consistency']))

    # C.1 Overlapping periods (only when the rules file configures this table)
    period_rules = _load_validation_rules().get('overlapping_periods', {}).get(table_name)
    if period_rules:
        entity = period_rules.get('entity_column', 'hospitalization_id')
        start = period_rules.get('start_column', 'in_dttm')
        end = period_rules.get('end_column', 'out_dttm')
        _store('overlapping_periods', check_overlapping_periods(
            df, table_name, entity_col=entity, start_col=start, end_col=end))

    # C.2 Category temporal consistency
    _store('category_temporal_consistency', check_category_temporal_consistency(
        df, schema, table_name, hosp_years=hosp_years))

    # D.1 Duplicate composite keys
    _store('duplicate_composite_keys', check_duplicate_composite_keys(
        df, table_name, schema=schema, **merged['duplicate_composite_keys']))

    failed = sum(1 for outcome in results.values() if not outcome.passed)
    passed = len(results) - failed
    _logger.info("Plausibility checks complete for '%s': %d passed, %d failed", table_name, passed, failed)
    return results

clifpy.utils.validator.run_full_dqa

run_full_dqa(df, schema=None, table_name='', tables=None, error_threshold=50.0, warning_threshold=10.0, hosp_years=None, plausibility_thresholds=None, clif_version=DEFAULT_CLIF_VERSION)

Run the complete DQA suite on a single table.

Orchestrates conformance checks, completeness checks, plausibility checks, and — when tables is provided — auto-detected relational integrity and cross-table plausibility checks.

Parameters:

Name Type Description Default
df DataFrame

The data to validate.

required
schema dict

Schema for the table. When None (the default), the schema is loaded automatically from the built-in schemas using table_name.

None
table_name str

Name of the table.

''
tables list

Objects with .table_name and .df attributes (e.g. BaseTable instances). When provided, relational integrity and cross-table plausibility checks are run.

None
error_threshold float

Percent missing above which an error is raised (default 50).

50.0
warning_threshold float

Percent missing above which a warning is raised (default 10).

10.0
hosp_years set

Pre-extracted hospitalization years for P.6 temporal consistency. When provided, skips scanning the hospitalization table to extract years.

None
plausibility_thresholds dict

Override default plausibility thresholds per check.

None
clif_version str

CLIF schema version to auto-load when schema is None (e.g. "2.1", "3.0"). Ignored when schema is passed explicitly. Defaults to the package default (2.1).

DEFAULT_CLIF_VERSION

Returns:

Type Description
Dict[str, Any]

Keys: table_name, backend, conformance, completeness, relational, plausibility.

Source code in clifpy/utils/validator.py
def run_full_dqa(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Optional[Dict[str, Any]] = None,
    table_name: str = "",
    tables: Optional[list] = None,
    error_threshold: float = 50.0,
    warning_threshold: float = 10.0,
    hosp_years: Optional[set] = None,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
    clif_version: str = DEFAULT_CLIF_VERSION,
) -> Dict[str, Any]:
    """Run the complete DQA suite on a single table.

    Runs, in order: conformance checks, completeness checks, relational
    integrity checks (only when *tables* is given), single-table
    plausibility checks, and cross-table plausibility checks (only when
    *tables* is given). Every result object is converted with
    ``to_dict()`` before being stored in the report.

    Parameters
    ----------
    df : DataFrame
        The data to validate.
    schema : dict, optional
        Schema for the table.  When *None* (the default), the schema is
        loaded automatically from the built-in schemas using *table_name*.
    table_name : str
        Name of the table. Must be non-empty.
    tables : list, optional
        Objects with ``.table_name`` and ``.df`` attributes (e.g.
        :class:`BaseTable` instances).  When provided, relational
        integrity and cross-table plausibility checks are run.
    error_threshold : float
        Percent missing above which an error is raised (default 50).
    warning_threshold : float
        Percent missing above which a warning is raised (default 10).
    hosp_years : set, optional
        Pre-extracted hospitalization years, passed on to the
        single-table plausibility checks. When omitted and *tables* is
        given, the years are derived from the ``admission_dttm`` column
        of the hospitalization table, if present.
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.
    clif_version : str, optional
        CLIF schema version to auto-load when *schema* is None
        (e.g. "2.1", "3.0"). Ignored when *schema* is passed explicitly.

    Returns
    -------
    Dict[str, Any]
        Keys: ``table_name``, ``backend``, ``conformance``,
        ``completeness``, ``relational``, ``plausibility``.

    Raises
    ------
    ValueError
        If *table_name* is empty.
    FileNotFoundError
        If *schema* is None and no built-in schema can be loaded.
    """
    if not table_name:
        raise ValueError("table_name is required")
    if schema is None:
        schema = _load_schema(table_name, clif_version=clif_version)
        if schema is None:
            raise FileNotFoundError(
                f"No built-in schema found for table '{table_name}' "
                f"(CLIF {clif_version}). Pass a schema dict explicitly."
            )
    _logger.info("Starting full DQA for table: %s", table_name)

    def _serialise(check_results):
        # Convert a {name: result object} mapping to {name: plain dict}.
        return {name: outcome.to_dict() for name, outcome in check_results.items()}

    have_tables = tables is not None

    report: Dict[str, Any] = {
        'table_name': table_name,
        'backend': _ACTIVE_BACKEND,
        'conformance': {},
        'completeness': {},
        'relational': {},
        'plausibility': {},
    }

    report['conformance'] = _serialise(
        run_conformance_checks(df, schema, table_name)
    )
    report['completeness'] = _serialise(
        run_completeness_checks(
            df, schema, table_name, error_threshold, warning_threshold
        )
    )

    if have_tables:
        relational = run_relational_integrity_checks(tables)
        if table_name in relational:
            report['relational'] = _serialise(relational[table_name])

    # Derive hospitalization years unless the caller already supplied them.
    if hosp_years is None and have_tables:
        # Only the first table named 'hospitalization' (after dropping any
        # 'clif_' in the name) is consulted.
        hosp_obj = next(
            (
                candidate for candidate in tables
                if getattr(candidate, 'table_name', '').replace('clif_', '') == 'hospitalization'
            ),
            None,
        )
        if hosp_obj is not None:
            hdf = hosp_obj.df
            if HAS_POLARS and isinstance(hdf, (pl.DataFrame, pl.LazyFrame)):
                hlf = hdf if isinstance(hdf, pl.LazyFrame) else hdf.lazy()
                if 'admission_dttm' in hlf.collect_schema().names():
                    year_values = (
                        hlf.select(pl.col('admission_dttm').dt.year().alias('yr'))
                        .filter(pl.col('yr').is_not_null())
                        .unique()
                        .collect()
                        .get_column('yr')
                        .to_list()
                    )
                    hosp_years = set(year_values)
            elif isinstance(hdf, pd.DataFrame) and 'admission_dttm' in hdf.columns:
                hosp_years = {
                    int(y) for y in hdf['admission_dttm'].dropna().dt.year.unique()
                }

    # Plausibility checks (single-table)
    report['plausibility'] = _serialise(
        run_plausibility_checks(
            df, schema, table_name, hosp_years=hosp_years,
            plausibility_thresholds=plausibility_thresholds,
        )
    )

    # Cross-table plausibility checks (when tables provided) are merged
    # into the same 'plausibility' section.
    if have_tables:
        cross_plaus = run_cross_table_plausibility_checks(
            tables, plausibility_thresholds=plausibility_thresholds)
        for check_key, outcome in cross_plaus.get(table_name, {}).items():
            report['plausibility'][check_key] = outcome.to_dict()

    gc.collect()
    _logger.info("Completed full DQA for table: %s", table_name)
    return report

Cache-Based Cross-Table Pipeline

For memory-optimized cross-table validation, extract lightweight caches and run checks without keeping full DataFrames in memory.

clifpy.utils.validator.extract_cross_table_cache

extract_cross_table_cache(table_obj)

Extract a lightweight cache from a single table object.

Used by the optimised pipeline in CLIF-TableOne's runner to avoid keeping full DataFrames in memory for cross-table checks.

Parameters:

Name Type Description Default
table_obj BaseTable

Object with .table_name, .df, and .schema attributes.

required

Returns:

Type Description
dict

Keys: table_name, fk_ids, schema_cols, temporal_df, hosp_bounds_df, hosp_years.

Source code in clifpy/utils/validator.py
def extract_cross_table_cache(table_obj) -> Dict[str, Any]:
    """Extract a lightweight cache from a single table object.

    Used by the optimised pipeline in CLIF-TableOne's runner to avoid
    keeping full DataFrames in memory for cross-table checks.

    Parameters
    ----------
    table_obj : BaseTable
        Object with ``.table_name``, ``.df``, and ``.schema`` attributes.

    Returns
    -------
    dict
        Keys:

        ``table_name``
            Table name with any ``clif_`` substring removed.
        ``fk_ids``
            ``{fk_column: set_of_unique_non_null_ids}`` for every
            relational-integrity rule column present in the table.
        ``schema_cols``
            Set of schema-declared column names, or ``None`` when the
            schema declares no columns.
        ``temporal_df``
            ``hospitalization_id`` plus the table's cross-table time
            columns that are present, or ``None``.
        ``hosp_bounds_df``
            Hospitalization ID/admission/discharge columns (only for the
            ``hospitalization`` table), otherwise ``None``.
        ``hosp_years``
            Set of admission years (only for the ``hospitalization``
            table), otherwise ``None``.
        ``conditional_source_ids``
            ``{rule_key: set_of_join_ids}`` whose source column matches
            one of the rule's source values.
        ``conditional_target_ids``
            ``{rule_key: set_of_join_ids}`` whose target column is
            populated (non-null and, for strings, non-blank).
    """
    tname = getattr(table_obj, 'table_name', '').replace('clif_', '')
    df = table_obj.df
    schema = getattr(table_obj, 'schema', None) or {}

    # Case-normalize for validation (column names + string values; sidecars preserved).
    # This makes downstream FK set-membership checks case-insensitive too.
    df = _normalize_for_validation(df)

    # Schema-defined column names
    schema_cols = (
        {c['name'] for c in schema.get('columns', [])}
        if schema.get('columns') else None
    )

    # Resolve the backend once; every polars section below shares one lazy frame.
    is_polars = HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame))
    lf = None
    if is_polars:
        lf = df if isinstance(df, pl.LazyFrame) else df.lazy()

    # Determine actual columns in the DataFrame (exclude __orig_ sidecars)
    if is_polars:
        actual_cols = set(_strip_sidecars(lf.collect_schema().names()))
    else:
        actual_cols = set(_strip_sidecars(df.columns.tolist()))

    # Use schema cols when available; fall back to actual cols
    col_names = schema_cols if schema_cols is not None else actual_cols

    # Load the rule set once; both the FK and the conditional sections read it.
    validation_rules = _load_validation_rules()

    # --- FK ID sets ---
    fk_rules = validation_rules.get('relational_integrity', {})
    fk_ids: Dict[str, set] = {}
    for fk_column in fk_rules:
        # The column must be both expected (schema) and physically present.
        if fk_column not in col_names or fk_column not in actual_cols:
            continue
        if is_polars:
            id_series = (
                lf.select(pl.col(fk_column).drop_nulls().unique())
                .collect(streaming=True)
            )
            fk_ids[fk_column] = set(id_series[fk_column].to_list())
        else:
            fk_ids[fk_column] = set(df[fk_column].dropna().unique().tolist())

    # --- Temporal subset for cross-table plausibility ---
    time_cols = _CROSS_TABLE_TIME_COLUMNS.get(tname, [])
    temporal_df = None
    if time_cols and 'hospitalization_id' in actual_cols:
        needed = ['hospitalization_id'] + [c for c in time_cols if c in actual_cols]
        if is_polars:
            temporal_df = lf.select(needed).collect()
        else:
            temporal_df = df[needed].copy()

    # --- Hospitalization-specific caches ---
    hosp_bounds_df = None
    hosp_years = None
    if tname == 'hospitalization':
        if is_polars:
            hosp_cols_available = set(lf.collect_schema().names())
            bound_cols = [c for c in ['hospitalization_id', 'admission_dttm', 'discharge_dttm']
                          if c in hosp_cols_available]
            if 'hospitalization_id' in bound_cols:
                hosp_bounds_df = lf.select(bound_cols).collect()
            if 'admission_dttm' in hosp_cols_available:
                hosp_years = set(
                    lf.select(pl.col('admission_dttm').dt.year().alias('yr'))
                    .filter(pl.col('yr').is_not_null())
                    .unique()
                    .collect()
                    .get_column('yr')
                    .to_list()
                )
        elif isinstance(df, pd.DataFrame):
            bound_cols = [c for c in ['hospitalization_id', 'admission_dttm', 'discharge_dttm']
                          if c in df.columns]
            if 'hospitalization_id' in bound_cols:
                hosp_bounds_df = df[bound_cols].copy()
            if 'admission_dttm' in df.columns:
                hosp_years = set(
                    int(y) for y in df['admission_dttm'].dropna().dt.year.unique()
                )

    # --- Cross-table conditional requirements cache ---
    ct_cond_rules = validation_rules.get('cross_table_conditional_requirements', [])
    conditional_source_ids: Dict[str, set] = {}
    conditional_target_ids: Dict[str, set] = {}

    for rule in ct_cond_rules:
        rule_key = f"{rule['source_column']}_{rule['target_column']}"
        join_col = rule['join_column']

        if tname == rule['source_table'] and join_col in actual_cols and rule['source_column'] in actual_cols:
            # Extract join_column IDs where source_column matches source_value (case-insensitive).
            # `df` has already passed through `_normalize_for_validation` above, so string
            # columns are lowercased + stripped; skip the redundant per-row transformation
            # when the column is already string-typed (the common case).
            match_values = [str(v).strip().lower() for v in rule['source_value']]
            if is_polars:
                src_dtype = lf.collect_schema()[rule['source_column']]
                if src_dtype == pl.Utf8 or src_dtype == pl.String:
                    filter_expr = pl.col(rule['source_column']).is_in(match_values)
                else:
                    filter_expr = pl.col(rule['source_column']).cast(pl.Utf8).str.strip_chars().str.to_lowercase().is_in(match_values)
                matched = (
                    lf.filter(filter_expr)
                    .select(pl.col(join_col).drop_nulls().unique())
                    .collect(streaming=True)
                )
                conditional_source_ids[rule_key] = set(matched[join_col].to_list())
            else:
                src_col = df[rule['source_column']]
                if pd.api.types.is_string_dtype(src_col):
                    mask = src_col.isin(match_values)
                else:
                    mask = src_col.astype(str).str.strip().str.lower().isin(match_values)
                conditional_source_ids[rule_key] = set(df.loc[mask, join_col].dropna().unique().tolist())

        if tname == rule['target_table'] and join_col in actual_cols and rule['target_column'] in actual_cols:
            # Extract join_column IDs where target_column is not null (and not empty string)
            if is_polars:
                tcol = pl.col(rule['target_column'])
                # Handle both datetime and string columns: not null AND not empty string
                not_null_filter = tcol.is_not_null()
                dtype = lf.collect_schema()[rule['target_column']]
                if dtype == pl.Utf8 or dtype == pl.String:
                    not_null_filter = not_null_filter & (tcol.str.strip_chars().str.len_chars() > 0)
                matched = (
                    lf.filter(not_null_filter)
                    .select(pl.col(join_col).drop_nulls().unique())
                    .collect(streaming=True)
                )
                conditional_target_ids[rule_key] = set(matched[join_col].to_list())
            else:
                target_col = df[rule['target_column']]
                mask = target_col.notna()
                # Exclude empty/whitespace-only strings for every string-like column:
                # plain ``object`` columns as well as pandas' dedicated string dtypes.
                # This mirrors the Utf8/String handling of the polars branch and the
                # ``is_string_dtype`` test applied to the source column above.
                # ``astype(str)`` keeps the comparison safe for mixed object columns.
                if target_col.dtype == 'object' or pd.api.types.is_string_dtype(target_col):
                    mask = mask & (target_col.astype(str).str.strip() != '')
                conditional_target_ids[rule_key] = set(df.loc[mask, join_col].dropna().unique().tolist())

    cache = {
        'table_name': tname,
        'fk_ids': fk_ids,
        'schema_cols': schema_cols,
        'temporal_df': temporal_df,
        'hosp_bounds_df': hosp_bounds_df,
        'hosp_years': hosp_years,
        'conditional_source_ids': conditional_source_ids,
        'conditional_target_ids': conditional_target_ids,
    }
    _logger.info(
        "extract_cross_table_cache: table='%s', fk_keys=%s, temporal=%s, hosp_bounds=%s, "
        "cond_source_keys=%s, cond_target_keys=%s",
        tname, list(fk_ids.keys()), temporal_df is not None, hosp_bounds_df is not None,
        list(conditional_source_ids.keys()), list(conditional_target_ids.keys()),
    )
    return cache

clifpy.utils.validator.run_relational_integrity_checks_from_cache

run_relational_integrity_checks_from_cache(caches)

Run relational integrity checks using pre-extracted caches.

Equivalent to run_relational_integrity_checks() but operates on Python set objects (FK ID sets) instead of scanning full DataFrames.

Parameters:

Name Type Description Default
caches dict

{table_name: cache_dict} as returned by extract_cross_table_cache().

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

Same structure as run_relational_integrity_checks().

Source code in clifpy/utils/validator.py
def run_relational_integrity_checks_from_cache(
    caches: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Check foreign-key coverage between cached tables using ID sets only.

    Counterpart of :func:`run_relational_integrity_checks` that works on
    the Python ``set`` objects stored under ``fk_ids`` in each cache rather
    than on full DataFrames.

    Parameters
    ----------
    caches : dict
        ``{table_name: cache_dict}`` as returned by
        :func:`extract_cross_table_cache`.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        ``{table_name: {fk_column: result}}``; same structure as
        :func:`run_relational_integrity_checks`.
    """
    _logger.info(
        "run_relational_integrity_checks_from_cache: starting with %d cached tables",
        len(caches),
    )

    fk_rules = _load_validation_rules().get('relational_integrity', {})
    results: Dict[str, Dict[str, DQACompletenessResult]] = {}

    for table_name, cache in caches.items():
        # Prefer the schema's column list; otherwise use the FK columns that
        # were actually cached for this table.
        known_cols = cache['schema_cols']
        if known_cols is None:
            known_cols = set(cache['fk_ids'].keys())

        for fk_column, rule in fk_rules.items():
            if fk_column not in known_cols:
                continue

            ref_table_name = rule['references_table']

            # A table is never checked against itself.
            if ref_table_name == table_name:
                continue

            if ref_table_name not in caches:
                _logger.debug(
                    "run_relational_integrity_checks_from_cache: skipping %s.%s — "
                    "reference table '%s' not cached",
                    table_name, fk_column, ref_table_name,
                )
                continue

            # IDs held by the table under test.
            table_ids = cache['fk_ids'].get(fk_column)
            if table_ids is None:
                continue

            # IDs held by the referenced table; skip when it has no cached set.
            reference_ids = caches[ref_table_name]['fk_ids'].get(fk_column)
            if reference_ids is None:
                continue

            _logger.info(
                "run_relational_integrity_checks_from_cache: checking %s.%s -> %s",
                table_name, fk_column, ref_table_name,
            )

            # One bidirectional result per (table, FK column) pair.
            result = DQACompletenessResult(
                "relational_integrity",
                f"{table_name}<->{ref_table_name}",
            )

            try:
                # "fwd": reference-table IDs that never appear in this table.
                fwd_missing = reference_ids - table_ids
                fwd_total = len(reference_ids)
                fwd_missing_n = len(fwd_missing)
                if fwd_total > 0:
                    fwd_coverage = (fwd_total - fwd_missing_n) / fwd_total * 100
                else:
                    fwd_coverage = 100

                # "rev": IDs in this table that do not resolve in the reference table.
                rev_missing = table_ids - reference_ids
                rev_total = len(table_ids)
                rev_missing_n = len(rev_missing)
                if rev_total > 0:
                    rev_coverage = (rev_total - rev_missing_n) / rev_total * 100
                else:
                    rev_coverage = 100

                result.metrics.update({
                    "forward_coverage_percent": round(fwd_coverage, 2),
                    "forward_orphan_ids": fwd_missing_n,
                    "forward_reference_unique_ids": fwd_total,
                    "reverse_coverage_percent": round(rev_coverage, 2),
                    "reverse_orphan_ids": rev_missing_n,
                    "reverse_target_unique_ids": rev_total,
                })

                # The "fwd" question ("does every reference row have a child
                # here?") routinely fails on normal inpatient data, so it is
                # only ever a warning and carries atomic_count=0 to stay out
                # of the FK's one-atom denominator.
                if fwd_missing:
                    result.add_warning(
                        f"{fwd_missing_n}/{fwd_total} {fk_column} values in {ref_table_name} "
                        f"not found in {table_name} ({round(fwd_coverage, 1)}% coverage)",
                        {"orphan_count": fwd_missing_n,
                         "coverage_percent": round(fwd_coverage, 2),
                         "atomic_count": 0}
                    )

                # The "rev" question ("does every FK value here resolve in the
                # reference?") is thresholded: >10% orphans is an error,
                # >1% a warning, anything lower passes silently.
                if rev_missing:
                    orphan_pct = 100 - rev_coverage
                    if orphan_pct > 10.0:
                        report = result.add_error
                    elif orphan_pct > 1.0:
                        report = result.add_warning
                    else:
                        report = None
                    if report is not None:
                        report(
                            f"{rev_missing_n}/{rev_total} {fk_column} values in {table_name} "
                            f"not found in {ref_table_name} ({round(rev_coverage, 1)}% coverage)",
                            {"orphan_count": rev_missing_n,
                             "coverage_percent": round(rev_coverage, 2),
                             "orphan_percent": round(orphan_pct, 2)}
                        )

                if not (fwd_missing or rev_missing):
                    result.add_info(
                        f"Full bidirectional coverage for {fk_column} between "
                        f"{table_name} and {ref_table_name}"
                    )

                _logger.info(
                    "run_relational_integrity_checks_from_cache: '%s' <-> '%s' — "
                    "fwd_coverage=%.1f%%, rev_coverage=%.1f%%",
                    table_name, ref_table_name, fwd_coverage, rev_coverage,
                )

                # Atomic granularity: one unit per FK. It passes unless an
                # error was recorded, and only the thresholded "rev" branch
                # above records errors on this path.
                result.atomic_total = 1
                result.atomic_passed = 0 if result.errors else 1
            except Exception as e:
                _logger.error(
                    "Cached relational check failed for '%s' <-> '%s': %s",
                    table_name, ref_table_name, e,
                )
                result.add_error(f"Error checking relational integrity: {e!s}")
                # Count the failed check as one failed atom unless counts were
                # already assigned before the exception.
                if result.atomic_total is None:
                    result.atomic_total = 1
                    result.atomic_passed = 0

            results.setdefault(table_name, {})[fk_column] = result

    checked = sum(len(per_table) for per_table in results.values())
    _logger.info(
        "run_relational_integrity_checks_from_cache: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

clifpy.utils.validator.run_cross_table_completeness_checks_from_cache

run_cross_table_completeness_checks_from_cache(caches)

Run cross-table conditional completeness checks (K.5) from caches.

For each YAML rule in cross_table_conditional_requirements, computes the set of join-column IDs that satisfy the source condition but are missing the required target column value.

Parameters:

Name Type Description Default
caches dict

{table_name: cache_dict} as returned by extract_cross_table_cache().

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

Results keyed by target table name, then by a descriptive check key.

Source code in clifpy/utils/validator.py
def run_cross_table_completeness_checks_from_cache(
    caches: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Evaluate cross-table conditional completeness rules (K.5) from caches.

    Each YAML rule under ``cross_table_conditional_requirements`` is
    evaluated as a set difference: join-column IDs that satisfy the source
    condition minus the IDs whose target column is populated.

    Parameters
    ----------
    caches : dict
        ``{table_name: cache_dict}`` as returned by
        :func:`extract_cross_table_cache`.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        Results keyed by **target** table name, then by the rule key
        ``"<source_column>_<target_column>"``. Empty when no rules are
        configured.
    """
    _logger.info(
        "run_cross_table_completeness_checks_from_cache: starting with %d cached tables",
        len(caches),
    )

    ct_cond_rules = _load_validation_rules().get('cross_table_conditional_requirements', [])
    if not ct_cond_rules:
        return {}

    results: Dict[str, Dict[str, DQACompletenessResult]] = {}

    for rule in ct_cond_rules:
        rule_key = f"{rule['source_column']}_{rule['target_column']}"
        source_table = rule['source_table']
        target_table = rule['target_table']

        # The rule can only be evaluated when both sides were cached.
        if not (source_table in caches and target_table in caches):
            _logger.debug(
                "K.5: skipping rule '%s' — source '%s' or target '%s' not cached",
                rule_key, source_table, target_table,
            )
            continue

        source_ids = caches[source_table].get('conditional_source_ids', {}).get(rule_key)
        target_ids = caches[target_table].get('conditional_target_ids', {}).get(rule_key)

        result = DQACompletenessResult(
            "cross_table_conditional_completeness",
            f"{source_table}->{target_table}",
        )
        # Register the result now; every path below only fills it in.
        results.setdefault(target_table, {})[rule_key] = result

        # Either side did not produce an ID set for this rule: nothing to score.
        if source_ids is None or target_ids is None:
            result.add_info(
                "No cross-table conditional requirements applicable"
            )
            result.atomic_total = 0
            result.atomic_passed = 0
            continue

        # No row met the source condition, so the requirement is never triggered.
        if not source_ids:
            result.add_info(
                f"No {rule['source_column']} = {rule['source_value']} found in "
                f"{source_table}; cross-table conditional check not triggered"
            )
            result.atomic_total = 0
            result.atomic_passed = 0
            continue

        missing_ids = source_ids - target_ids
        total = len(source_ids)
        missing_count = len(missing_ids)
        pct = round(missing_count / total * 100, 1) if total > 0 else 0

        result.metrics.update({
            "total_matching_source": total,
            "missing_in_target": missing_count,
            "coverage_percent": round(100 - pct, 2),
        })

        if not missing_ids:
            result.add_info(
                f"All {total} patients discharged as {rule['source_value']} in "
                f"{source_table} have {rule['target_column']} in {target_table}"
            )
        else:
            # Attach at most ten IDs (lowest in sort order) as a sample.
            sample = sorted(missing_ids)[:10]
            result.add_warning(
                f"{missing_count}/{total} patients discharged as {rule['source_value']} "
                f"in {source_table} are missing {rule['target_column']} in {target_table} "
                f"({pct}% missing)",
                {
                    "column": rule['target_column'],
                    "missing_count": missing_count,
                    "total_matching": total,
                    "percent_missing": pct,
                    "sample_ids": sample,
                    "source_condition": f"{rule['source_column']} in {rule['source_value']}",
                },
            )

        # One atomic unit per evaluated rule; it fails only if an error was recorded.
        result.atomic_total = 1
        result.atomic_passed = 0 if result.errors else 1
        _logger.info(
            "K.5: %s->%s rule '%s': %d/%d missing (%.1f%%)",
            source_table, target_table, rule_key, missing_count, total, pct,
        )

    checked = sum(len(per_table) for per_table in results.values())
    _logger.info(
        "run_cross_table_completeness_checks_from_cache: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

clifpy.utils.validator.run_cross_table_plausibility_checks_from_cache

run_cross_table_plausibility_checks_from_cache(caches, plausibility_thresholds=None)

Run cross-table plausibility checks using pre-extracted caches.

Equivalent to run_cross_table_plausibility_checks() but uses cached temporal subset DataFrames and hospitalization bounds instead of full DataFrames.

Parameters:

Name Type Description Default
caches dict

{table_name: cache_dict} as returned by extract_cross_table_cache().

required
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
Dict[str, Dict[str, DQAPlausibilityResult]]

Same structure as run_cross_table_plausibility_checks().

Source code in clifpy/utils/validator.py
def run_cross_table_plausibility_checks_from_cache(
    caches: Dict[str, Dict[str, Any]],
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, Dict[str, DQAPlausibilityResult]]:
    """Run cross-table temporal plausibility checks from pre-extracted caches.

    Counterpart of :func:`run_cross_table_plausibility_checks` that reads
    each table's cached ``temporal_df`` and the hospitalization table's
    cached ``hosp_bounds_df`` instead of full DataFrames.

    Parameters
    ----------
    caches : dict
        ``{table_name: cache_dict}`` as returned by
        :func:`extract_cross_table_cache`.
    plausibility_thresholds : dict, optional
        Per-check overrides merged over the default thresholds.

    Returns
    -------
    Dict[str, Dict[str, DQAPlausibilityResult]]
        Same structure as :func:`run_cross_table_plausibility_checks`.
        Empty when no hospitalization bounds are cached.
    """
    _logger.info(
        "run_cross_table_plausibility_checks_from_cache: starting with %d cached tables",
        len(caches),
    )

    # Start from a per-check copy of the defaults, then layer caller overrides on top.
    thresholds = {name: dict(values) for name, values in _DEFAULT_PLAUSIBILITY_THRESHOLDS.items()}
    for check_name, overrides in (plausibility_thresholds or {}).items():
        if check_name not in thresholds:
            thresholds[check_name] = overrides
        else:
            thresholds[check_name].update(overrides)

    # Hospitalization bounds are mandatory; without them nothing can be compared.
    hosp_cache = caches.get('hospitalization')
    hosp_bounds_df = None if hosp_cache is None else hosp_cache.get('hosp_bounds_df')
    if hosp_bounds_df is None:
        _logger.info("Hospitalization cache not available; skipping cross-table plausibility")
        return {}

    # Convert pandas bounds to polars when polars is the active backend.
    if _ACTIVE_BACKEND == 'polars' and isinstance(hosp_bounds_df, pd.DataFrame):
        hosp_bounds_df = pl.from_pandas(hosp_bounds_df)

    results: Dict[str, Dict[str, DQAPlausibilityResult]] = {}

    for tbl_name, cache in caches.items():
        # The hospitalization table supplies the bounds and is not checked itself.
        temporal_df = None if tbl_name == 'hospitalization' else cache.get('temporal_df')
        if temporal_df is None:
            continue

        # Same backend alignment for the per-table temporal subset.
        if _ACTIVE_BACKEND == 'polars' and isinstance(temporal_df, pd.DataFrame):
            temporal_df = pl.from_pandas(temporal_df)

        time_cols = _CROSS_TABLE_TIME_COLUMNS.get(tbl_name, [])
        if not time_cols:
            continue

        # Column names actually present in the cached subset.
        if HAS_POLARS and isinstance(temporal_df, (pl.DataFrame, pl.LazyFrame)):
            if isinstance(temporal_df, pl.LazyFrame):
                actual_cols = temporal_df.collect_schema().names()
            else:
                actual_cols = temporal_df.columns
        else:
            actual_cols = temporal_df.columns.tolist()

        available_time_cols = [col for col in time_cols if col in actual_cols]
        if available_time_cols and 'hospitalization_id' in actual_cols:
            outcome = check_cross_table_temporal_plausibility(
                temporal_df, hosp_bounds_df, tbl_name, available_time_cols,
                **thresholds['cross_table_temporal']
            )
            results.setdefault(tbl_name, {})["cross_table_temporal"] = outcome

    checked = sum(len(per_table) for per_table in results.values())
    _logger.info(
        "run_cross_table_plausibility_checks_from_cache: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

Report Generation

clifpy.utils.report_generator.collect_dqa_issues

collect_dqa_issues(validation_data)

Collect errors, warnings, and info messages from run_full_dqa output.

Returns (category_scores, all_issues) where each issue is a dict with category, check_type, severity ('error'/'warning'/'info'), message, details, plus enriched fields: rule_code, rule_description, column_field.

Scoring reads atomic_total/atomic_passed on each check's result. Both fields are required on every DQAResult producer — the check's "natural atomic unit" decides the granularity (per-column, per-rule, per-permissible-value, or 1 for binary checks). If a check result is missing atomic counts, this function raises ValueError rather than silently approximating the score from message counts; populate the fields in the check itself (see clifpy/utils/validator.py).

Source code in clifpy/utils/report_generator.py
def collect_dqa_issues(validation_data: Dict[str, Any]):
    """Gather enriched issues and per-category scores from run_full_dqa output.

    Returns ``(category_scores, all_issues)``. ``category_scores`` maps a
    category to ``(passed, total)`` atomic counts and only contains
    categories whose total is positive. Each issue is a dict with category,
    check_type, severity ('error'/'warning'/'info'), message and details,
    as returned by ``enrich_issue`` (which adds rule_code, rule_description
    and column_field).

    Scoring reads ``atomic_total``/``atomic_passed`` on each check's result.
    Both fields are **required**: a check result lacking either one raises
    ``ValueError`` instead of being scored from message counts. Populate the
    fields in the check itself (see ``clifpy/utils/validator.py``).
    """
    category_scores = {}
    all_issues: List[Dict[str, Any]] = []

    for category in DQA_CATEGORIES:
        checks = validation_data.get(category, {})
        if not checks:
            continue

        passed_sum = 0
        total_sum = 0

        for check_name, d in checks.items():
            # Enrich this check's messages in severity order: errors, then
            # warnings, then info. 'errors' and 'warnings' are mandatory keys;
            # 'info' is optional. Lookups stay lazy so a missing key surfaces
            # only once that severity is reached.
            check_enriched: List[Dict[str, Any]] = []
            for severity in ('error', 'warning', 'info'):
                if severity == 'error':
                    messages = d['errors']
                elif severity == 'warning':
                    messages = d['warnings']
                else:
                    messages = d.get('info', [])

                for entry in messages:
                    enriched = enrich_issue(
                        {
                            'category': category,
                            'check_type': d['check_type'],
                            'severity': severity,
                            'message': entry.get('message', ''),
                            'details': entry.get('details', {}),
                        },
                        check_key=check_name,
                    )
                    # enrich_issue may return None; such rows are omitted.
                    if enriched is not None:
                        check_enriched.append(enriched)

            # Atomic counts are mandatory on every check result; a check
            # arriving here without them is a bug in the check itself, so
            # fail loudly rather than absorb it.
            atomic_t = d.get('atomic_total')
            atomic_p = d.get('atomic_passed')
            if atomic_t is None or atomic_p is None:
                raise ValueError(
                    f"Check '{check_name}' in category '{category}' is missing "
                    f"atomic_total/atomic_passed. Every DQA check must populate "
                    f"these fields; see clifpy/utils/validator.py for the pattern."
                )
            total_sum += atomic_t
            passed_sum += atomic_p

            collapsed = _collapse_info_rows(check_enriched)
            _reconcile_atomic_counts(
                collapsed, atomic_t, atomic_p, category, d['check_type'],
                check_key=check_name,
            )
            all_issues.extend(collapsed)

        # Categories with nothing scorable are left out of the score map.
        if total_sum > 0:
            category_scores[category] = (passed_sum, total_sum)

    return category_scores, all_issues

clifpy.utils.report_generator.generate_validation_pdf

generate_validation_pdf(validation_data, table_name, output_path, site_name=None, feedback=None)

Generate a PDF report from DQA validation results.

Parameters:

Name Type Description Default
validation_data dict

Output from run_full_dqa (keys: conformance, completeness, relational, plausibility).

required
table_name str

Name of the table.

required
output_path str

Path where PDF should be saved.

required
site_name str

Name of the site/hospital.

None
feedback dict

User feedback with 'user_decisions' keyed by error_id.

None

Returns:

Type Description
str

Path to generated PDF file.

Source code in clifpy/utils/report_generator.py (lines 418–805)
def generate_validation_pdf(validation_data: Dict[str, Any],
                            table_name: str, output_path: str,
                            site_name: Optional[str] = None,
                            feedback: Optional[Dict[str, Any]] = None) -> str:
    """
    Generate a PDF report from DQA validation results.

    The document is assembled in this order: generation timestamp, title,
    optional feedback banner, per-category summary table, optional data
    profile table (only when ``validation_data['table_stats']`` is
    non-empty), and one detail table per category that has issues.

    When ``feedback['user_decisions']`` marks an error id (as computed by
    ``_make_error_id``) as ``'rejected'``, that error's atomic count is
    subtracted from the error totals and added to the "Non-Error" totals in
    the summary table, and the matching detail row is greyed out.

    Parameters
    ----------
    validation_data : dict
        Output from run_full_dqa (keys: conformance, completeness,
        relational, plausibility). The optional 'table_stats' and
        'total_rows' keys are read for the data profile section.
    table_name : str
        Name of the table.
    output_path : str
        Path where PDF should be saved.
    site_name : str, optional
        Name of the site/hospital. Prepended to the report title when given.
    feedback : dict, optional
        User feedback with 'user_decisions' keyed by error_id. An optional
        'timestamp' entry is shown in the feedback banner.

    Returns
    -------
    str
        Path to generated PDF file (``output_path``, returned unchanged).
    """
    category_scores, all_issues = collect_dqa_issues(validation_data)

    # Build feedback lookup: error_id -> decision dict
    feedback_lookup: Dict[str, Dict[str, Any]] = {}
    if feedback and feedback.get('user_decisions'):
        feedback_lookup = feedback['user_decisions']

    # Build set of rejected error IDs so we can adjust summary counts
    rejected_ids: set = {
        eid for eid, d in feedback_lookup.items()
        if d.get('decision') == 'rejected'
    }

    # Counts are weighted by each issue's 'atomic_count' (defaulting to 1),
    # not by the number of issue rows.
    total_passed = sum(p for p, _ in category_scores.values())
    total_checks = sum(t for _, t in category_scores.values())
    error_count = sum(i.get('atomic_count', 1) for i in all_issues if i['severity'] == 'error')
    warning_count = sum(i.get('atomic_count', 1) for i in all_issues if i['severity'] == 'warning')

    # Adjust counts: rejected errors no longer count as errors
    if rejected_ids:
        rejected_error_count = sum(
            i.get('atomic_count', 1) for i in all_issues
            if i['severity'] == 'error' and _make_error_id(i) in rejected_ids
        )
        error_count -= rejected_error_count
        total_passed += rejected_error_count

    # --- Build PDF ---
    doc = SimpleDocTemplate(output_path, pagesize=letter)
    story = []
    styles = getSampleStyleSheet()

    # Shared colour palette for text and cell backgrounds
    primary_color = colors.HexColor('#1F4E79')
    text_dark = colors.HexColor('#2C3E50')
    text_medium = colors.HexColor('#5D6D7E')
    header_bg = colors.HexColor('#F5F6FA')
    pass_bg = colors.HexColor('#E8F5E8')
    fail_bg = colors.HexColor('#FFEAEA')
    warn_bg = colors.HexColor('#FFF3E0')

    title_style = ParagraphStyle(
        'CustomTitle', parent=styles['Heading1'],
        fontSize=22, textColor=primary_color, spaceAfter=24,
        alignment=TA_CENTER, fontName='Helvetica-Bold',
    )
    heading_style = ParagraphStyle(
        'CustomHeading', parent=styles['Heading2'],
        fontSize=14, textColor=text_dark, spaceAfter=10,
        spaceBefore=16, fontName='Helvetica-Bold',
    )
    timestamp_style = ParagraphStyle(
        'TimestampStyle', parent=styles['Normal'],
        fontSize=8, textColor=text_medium, alignment=1,
        fontName='Helvetica',
    )

    # Cell styles for issue detail tables
    cell_style = ParagraphStyle(
        'CellStyle', parent=styles['Normal'],
        fontSize=7, leading=9, fontName='Helvetica',
        textColor=text_dark,
    )
    cell_bold_style = ParagraphStyle(
        'CellBoldStyle', parent=cell_style,
        fontName='Helvetica-Bold',
    )

    # Timestamp
    ts = Table(
        [[Paragraph(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", timestamp_style)]],
        colWidths=[7.5 * inch],
    )
    ts.setStyle(TableStyle([
        ('ALIGN', (0, 0), (0, 0), 'RIGHT'),
        ('TOPPADDING', (0, 0), (0, 0), -24),
        ('BOTTOMPADDING', (0, 0), (0, 0), 2),
    ]))
    story.append(ts)

    # Title
    # NOTE(review): site_name and table_name are handed to Paragraph without
    # escape(), unlike the detail cells further down — confirm they can never
    # contain markup characters such as '<' or '&'.
    title_text = f"{site_name + ' ' if site_name else ''}CLIF DQA Report Card"
    story.append(Paragraph(title_text, title_style))
    story.append(Paragraph(f"{table_name.title()} Table", heading_style))
    story.append(Spacer(1, 0.2 * inch))

    # --- Feedback banner (only when feedback with decisions exists) ---
    has_feedback = bool(feedback_lookup)
    if has_feedback:
        n_accepted = sum(1 for d in feedback_lookup.values() if d.get('decision') == 'accepted')
        n_rejected = sum(1 for d in feedback_lookup.values() if d.get('decision') == 'rejected')
        if n_accepted > 0 or n_rejected > 0:
            from datetime import datetime as _dt
            _raw_ts = feedback.get('timestamp', '')
            # Show the feedback timestamp as 'YYYY-MM-DD HH:MM' when it parses
            # as ISO format; otherwise fall back to the raw value as given.
            try:
                fb_ts = _dt.fromisoformat(_raw_ts).strftime('%Y-%m-%d %H:%M')
            except (ValueError, TypeError):
                fb_ts = _raw_ts
            banner_style = ParagraphStyle(
                'FeedbackBanner', parent=styles['Normal'],
                fontSize=8, textColor=colors.HexColor('#1F4E79'),
                fontName='Helvetica',
            )
            banner_text = (
                f"<i>This report was updated based on feedback provided on {fb_ts}. "
                f"Accepted: {n_accepted} | Rejected: {n_rejected}</i>"
            )
            banner_tbl = Table(
                [[Paragraph(banner_text, banner_style)]],
                colWidths=[7.5 * inch],
            )
            banner_tbl.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (0, 0), colors.HexColor('#E8F0FE')),
                ('TOPPADDING', (0, 0), (0, 0), 6),
                ('BOTTOMPADDING', (0, 0), (0, 0), 6),
                ('LEFTPADDING', (0, 0), (0, 0), 10),
                ('RIGHTPADDING', (0, 0), (0, 0), 10),
            ]))
            story.append(banner_tbl)
            story.append(Spacer(1, 0.15 * inch))

    # --- DQA Summary Table ---
    # One row per category present in category_scores (in DQA_CATEGORIES
    # order), followed by an 'Overall' row built from the totals above.
    story.append(Paragraph("DQA Summary", heading_style))
    summary_header = ['Category', 'Non-Error', 'Total', 'Errors', 'Warnings']
    summary_rows = [summary_header]
    for category in DQA_CATEGORIES:
        if category not in category_scores:
            continue
        passed, total = category_scores[category]
        cat_issues = [i for i in all_issues if i['category'] == category]
        cat_errors = sum(i.get('atomic_count', 1) for i in cat_issues if i['severity'] == 'error')
        cat_warnings = sum(i.get('atomic_count', 1) for i in cat_issues if i['severity'] == 'warning')
        # Adjust for rejected errors in this category
        if rejected_ids:
            cat_rejected = sum(
                i.get('atomic_count', 1) for i in cat_issues
                if i['severity'] == 'error' and _make_error_id(i) in rejected_ids
            )
            cat_errors -= cat_rejected
            passed += cat_rejected
        summary_rows.append([category.title(), str(passed), str(total),
                             str(cat_errors), str(cat_warnings)])
    summary_rows.append(['Overall', str(total_passed), str(total_checks),
                         str(error_count), str(warning_count)])

    summary_tbl = Table(summary_rows, colWidths=[2 * inch, 0.8 * inch, 0.8 * inch,
                                                  0.8 * inch, 0.8 * inch])
    tbl_style = [
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, -1), 9),
        ('BACKGROUND', (0, 0), (-1, 0), header_bg),
        ('TEXTCOLOR', (0, 0), (-1, -1), text_dark),
        ('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#DADADA')),
        ('TOPPADDING', (0, 0), (-1, -1), 6),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
        ('LEFTPADDING', (0, 0), (-1, -1), 10),
        ('RIGHTPADDING', (0, 0), (-1, -1), 10),
    ]
    # Color-code error/warning cells (columns 3 and 4); row 0 is the header
    for row_idx in range(1, len(summary_rows)):
        errors_val = int(summary_rows[row_idx][3])
        warnings_val = int(summary_rows[row_idx][4])
        if errors_val > 0:
            tbl_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), fail_bg))
        else:
            tbl_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), pass_bg))
        if warnings_val > 0:
            tbl_style.append(('BACKGROUND', (4, row_idx), (4, row_idx), warn_bg))
        else:
            tbl_style.append(('BACKGROUND', (4, row_idx), (4, row_idx), pass_bg))
    summary_tbl.setStyle(TableStyle(tbl_style))
    story.append(summary_tbl)
    story.append(Spacer(1, 0.3 * inch))

    # --- Data Profile Table ---
    table_stats = validation_data.get('table_stats', [])
    if table_stats:
        story.append(Paragraph("Data Profile", heading_style))
        total_rows = validation_data.get('total_rows', 0)
        profile_subtitle = ParagraphStyle(
            'ProfileSubtitle', parent=styles['Normal'],
            fontSize=9, textColor=text_medium, fontName='Helvetica',
        )
        story.append(Paragraph(f"Total Rows: {total_rows:,}", profile_subtitle))
        story.append(Spacer(1, 0.1 * inch))

        profile_header = ['Column', 'Dtype', 'Null', 'Null%', 'Unique', 'Min', 'Max']
        profile_rows = [profile_header]
        # NOTE(review): `or ''` blanks any falsy min/max (e.g. a numeric 0),
        # not only missing ones — confirm min/max arrive as formatted strings.
        for s in table_stats:
            profile_rows.append([
                s['column'], s['dtype'],
                f"{s['null_count']:,}",
                f"{s['null_pct']:.1f}%", f"{s['unique']:,}",
                s.get('min') or '', s.get('max') or '',
            ])

        profile_col_widths = [1.6 * inch, 0.9 * inch,
                              0.6 * inch, 0.6 * inch, 0.6 * inch,
                              1.3 * inch, 1.3 * inch]
        profile_tbl = Table(profile_rows, colWidths=profile_col_widths)

        profile_style = [
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, -1), 7),
            ('BACKGROUND', (0, 0), (-1, 0), header_bg),
            ('TEXTCOLOR', (0, 0), (-1, -1), text_dark),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#DADADA')),
            ('TOPPADDING', (0, 0), (-1, -1), 3),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
            ('LEFTPADDING', (0, 0), (-1, -1), 4),
            ('RIGHTPADDING', (0, 0), (-1, -1), 4),
            # Right-align every column from index 2 onward
            # (Null, Null%, Unique, Min, Max)
            ('ALIGN', (2, 0), (-1, -1), 'RIGHT'),
        ]
        # Color-code Null% cells: red above 50%, amber above 10%
        amber_bg = colors.HexColor('#FFF3E0')
        red_light_bg = colors.HexColor('#FFEAEA')
        for row_idx, s in enumerate(table_stats, 1):
            if s['null_pct'] > 50:
                profile_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), red_light_bg))
            elif s['null_pct'] > 10:
                profile_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), amber_bg))

        profile_tbl.setStyle(TableStyle(profile_style))
        story.append(profile_tbl)
        story.append(Spacer(1, 0.3 * inch))

    # --- Per-category issue details as structured tables ---
    rejected_bg = colors.HexColor('#E0E0E0')
    rejected_text = colors.HexColor('#999999')

    if all_issues:
        story.append(PageBreak())
        story.append(Paragraph("Details", heading_style))

        # Column order: rule, rule_description, column_field, severity, finding,
        # checks, [status]. Metric is omitted because each sub-table is already
        # a single category.
        # With feedback, the finding column is narrowed by 0.5 inch to make
        # room for the extra status column.
        if has_feedback:
            detail_col_widths = [0.5 * inch, 1.7 * inch, 1.1 * inch,
                                 0.5 * inch, 2.7 * inch, 0.5 * inch, 0.5 * inch]
        else:
            detail_col_widths = [0.5 * inch, 1.7 * inch, 1.1 * inch,
                                 0.5 * inch, 3.2 * inch, 0.5 * inch]
        checks_col_idx = 5

        for category in DQA_CATEGORIES:
            cat_issues = [i for i in all_issues if i['category'] == category]
            if not cat_issues:
                continue

            # Section heading shows "(passed/total)" when a score exists for
            # the category, otherwise the number of issue rows.
            cs = category_scores.get(category)
            header_tail = f"({cs[0]}/{cs[1]})" if cs else f"({len(cat_issues)})"
            story.append(Paragraph(f"{category.title()} {header_tail}", heading_style))

            # Header row
            header_row = [
                Paragraph('<b>rule</b>', cell_bold_style),
                Paragraph('<b>rule_description</b>', cell_bold_style),
                Paragraph('<b>column_field</b>', cell_bold_style),
                Paragraph('<b>severity</b>', cell_bold_style),
                Paragraph('<b>finding</b>', cell_bold_style),
                Paragraph('<b>checks</b>', cell_bold_style),
            ]
            if has_feedback:
                header_row.append(Paragraph('<b>status</b>', cell_bold_style))
            table_data = [header_row]

            for issue in cat_issues:
                severity_upper = issue['severity'].upper()
                finding_text = truncate_comment(issue.get('finding', issue['message']))
                # Build finding cell: text + optional sparkline for temporal checks
                spark_width = 2.5 * inch if has_feedback else 3.0 * inch
                yearly_counts = issue.get('details', {}).get('yearly_counts')
                if yearly_counts:
                    finding_cell = [
                        Paragraph(escape(finding_text), cell_style),
                        Spacer(1, 2),
                        YearlySparkBar(yearly_counts, width=spark_width, height=16),
                    ]
                else:
                    finding_cell = Paragraph(escape(finding_text), cell_style)
                # The checks cell shows an em dash when atomic_count is 0
                row = [
                    Paragraph(escape(issue.get('rule_code', '')), cell_style),
                    Paragraph(escape(issue.get('rule_description', '')), cell_style),
                    Paragraph(escape(issue.get('column_field', 'NA')), cell_style),
                    Paragraph(escape(severity_upper), cell_style),
                    finding_cell,
                    Paragraph(
                        '—' if issue.get('atomic_count', 1) == 0
                        else str(issue.get('atomic_count', 1)),
                        cell_style,
                    ),
                ]
                # Status text is only filled in for error rows; other
                # severities get an empty status cell.
                if has_feedback:
                    if issue['severity'] == 'error':
                        error_id = _make_error_id(issue)
                        decision_info = feedback_lookup.get(error_id, {})
                        status_text = decision_info.get('decision', '').upper()
                        row.append(Paragraph(escape(status_text), cell_style))
                    else:
                        row.append(Paragraph('', cell_style))
                table_data.append(row)

            detail_tbl = Table(table_data, colWidths=detail_col_widths,
                               repeatRows=1)

            # Base table style
            detail_style = [
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, -1), 7),
                ('BACKGROUND', (0, 0), (-1, 0), header_bg),
                ('TEXTCOLOR', (0, 0), (-1, -1), text_dark),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#DADADA')),
                ('TOPPADDING', (0, 0), (-1, -1), 3),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
                ('LEFTPADDING', (0, 0), (-1, -1), 4),
                ('RIGHTPADDING', (0, 0), (-1, -1), 4),
                ('VALIGN', (0, 0), (-1, -1), 'TOP'),
                ('ALIGN', (checks_col_idx, 0), (checks_col_idx, -1), 'RIGHT'),
            ]

            # Color-code rows by severity, override with grey for rejected.
            # NOTE(review): the rejected lookup here is applied to every row
            # regardless of severity, whereas the status column above is only
            # populated for errors — confirm this is intended.
            for row_idx, issue in enumerate(cat_issues, 1):
                error_id = _make_error_id(issue)
                decision = feedback_lookup.get(error_id, {}).get('decision', '')
                if decision == 'rejected':
                    detail_style.append(('BACKGROUND', (0, row_idx), (-1, row_idx), rejected_bg))
                    detail_style.append(('TEXTCOLOR', (0, row_idx), (-1, row_idx), rejected_text))
                elif issue['severity'] == 'error':
                    detail_style.append(('BACKGROUND', (0, row_idx), (-1, row_idx), fail_bg))
                elif issue['severity'] == 'warning':
                    detail_style.append(('BACKGROUND', (0, row_idx), (-1, row_idx), warn_bg))

            detail_tbl.setStyle(TableStyle(detail_style))
            story.append(detail_tbl)

            # Add note about monthly trend CSVs for temporal consistency checks
            temporal_cols = sorted({
                i['column_field'] for i in cat_issues
                if i.get('check_type') == 'category_temporal_consistency'
                and i.get('column_field')
            })
            if temporal_cols:
                file_list = ", ".join(
                    f"{table_name}_{col}_monthly.csv" for col in temporal_cols
                )
                note_text = (
                    f"<i>P.6 Category temporal consistency — monthly breakdown available at: "
                    f"output/final/validation/monthly_trends/ ({file_list})</i>"
                )
                note_style = ParagraphStyle(
                    'NoteStyle', parent=styles['Normal'],
                    fontSize=7, textColor=colors.HexColor('#555555'),
                )
                story.append(Paragraph(note_text, note_style))

            story.append(Spacer(1, 0.2 * inch))
    else:
        story.append(Paragraph("No validation issues found!", styles['Normal']))

    doc.build(story)
    return output_path

clifpy.utils.report_generator.generate_text_report

generate_text_report(validation_data, table_name, output_path, site_name=None)

Generate a plain-text DQA report.

Parameters match generate_validation_pdf, except that generate_text_report does not accept a feedback argument.

Source code in clifpy/utils/report_generator.py
def generate_text_report(validation_data: Dict[str, Any],
                         table_name: str, output_path: str,
                         site_name: Optional[str] = None) -> str:
    """
    Generate a plain-text DQA report.

    The report contains a header, a per-category summary table, an optional
    data profile section (only when ``validation_data['table_stats']`` is
    non-empty) and one tabular detail section per category that has issues.
    The file is always written as UTF-8 because the report can contain
    non-ASCII characters (for example the em dash used for zero-check rows).

    Parameters
    ----------
    validation_data : dict
        Output from run_full_dqa (keys: conformance, completeness,
        relational, plausibility). The optional 'table_stats' and
        'total_rows' keys are read for the data profile section.
    table_name : str
        Name of the table.
    output_path : str
        Path where the text report should be saved.
    site_name : str, optional
        Name of the site/hospital. Adds a "Site:" line when given.

    Returns
    -------
    str
        Path to the generated text file (``output_path``, unchanged).
    """
    category_scores, all_issues = collect_dqa_issues(validation_data)
    # Counts are weighted by each issue's 'atomic_count' (default 1).
    total_passed = sum(p for p, _ in category_scores.values())
    total_checks = sum(t for _, t in category_scores.values())
    error_count = sum(i.get('atomic_count', 1) for i in all_issues if i['severity'] == 'error')
    warning_count = sum(i.get('atomic_count', 1) for i in all_issues if i['severity'] == 'warning')

    lines = []
    lines.append("=" * 120)
    lines.append("CLIF 2.1 DQA VALIDATION REPORT")
    lines.append(f"{table_name.upper()} TABLE")
    lines.append("=" * 120)
    lines.append("")
    if site_name:
        lines.append(f"Site: {site_name}")
    lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append("")

    # DQA Summary
    lines.append("-" * 120)
    lines.append("DQA SUMMARY")
    lines.append("-" * 120)
    lines.append(f"  {'Category':20s}  {'Passed':>6s}  {'Total':>5s}  {'Errors':>6s}  {'Warnings':>8s}")
    lines.append(f"  {'-'*20}  {'-'*6}  {'-'*5}  {'-'*6}  {'-'*8}")
    for category in DQA_CATEGORIES:
        if category not in category_scores:
            continue
        passed, total = category_scores[category]
        cat_issues = [i for i in all_issues if i['category'] == category]
        cat_errors = sum(i.get('atomic_count', 1) for i in cat_issues if i['severity'] == 'error')
        cat_warnings = sum(i.get('atomic_count', 1) for i in cat_issues if i['severity'] == 'warning')
        lines.append(f"  {category.title():20s}  {passed:>6d}  {total:>5d}  {cat_errors:>6d}  {cat_warnings:>8d}")
    lines.append(f"  {'Overall':20s}  {total_passed:>6d}  {total_checks:>5d}  {error_count:>6d}  {warning_count:>8d}")
    lines.append("")

    # Data Profile
    table_stats = validation_data.get('table_stats', [])
    if table_stats:
        lines.append("-" * 120)
        lines.append("DATA PROFILE")
        lines.append("-" * 120)
        total_rows = validation_data.get('total_rows', 0)
        lines.append(f"  Total Rows: {total_rows:,}")
        lines.append("")
        w_col, w_dtype, w_null, w_pct, w_uniq, w_min, w_max = 25, 12, 8, 8, 10, 20, 20
        hdr = (f"  {'Column':<{w_col}}"
               f"{'Dtype':<{w_dtype}}"
               f"{'Null':>{w_null}}"
               f"{'Null%':>{w_pct}}"
               f"{'Unique':>{w_uniq}}"
               f"  {'Min':<{w_min}}"
               f"{'Max':<{w_max}}")
        lines.append(hdr)
        lines.append("  " + "-" * (w_col + w_dtype + w_null + w_pct + w_uniq + 2 + w_min + w_max))
        for s in table_stats:
            col_name = s['column']
            if len(col_name) > w_col - 2:
                col_name = col_name[:w_col - 4] + '..'
            col_min = s.get('min') or ''
            col_max = s.get('max') or ''
            lines.append(
                f"  {col_name:<{w_col}}"
                f"{s['dtype']:<{w_dtype}}"
                f"{s['null_count']:>{w_null},}"
                # The trailing '%' takes one of the w_pct characters, so the
                # number itself is padded to w_pct - 1; otherwise every
                # following column drifts one character right of its header.
                f"{s['null_pct']:>{w_pct - 1}.1f}%"
                f"{s['unique']:>{w_uniq},}"
                f"  {col_min:<{w_min}}"
                f"{col_max:<{w_max}}"
            )
        lines.append("")

    # Issue details as tabular text
    if all_issues:
        lines.append("=" * 120)
        lines.append("DETAILS")
        lines.append("=" * 120)

        # Column widths for text alignment
        w_rule, w_desc, w_col, w_sev, w_checks = 6, 30, 18, 10, 7

        for category in DQA_CATEGORIES:
            cat_issues = [i for i in all_issues if i['category'] == category]
            if not cat_issues:
                continue

            lines.append("")
            # Heading shows "(passed/total)" when a score exists for the
            # category, otherwise the number of issue rows.
            cs = category_scores.get(category)
            header_tail = f"({cs[0]}/{cs[1]})" if cs else f"({len(cat_issues)})"
            lines.append(f"-- {category.title()} {header_tail} --")
            lines.append("")

            # Header
            hdr = (f"  {'rule':<{w_rule}}"
                   f"{'rule_description':<{w_desc}}"
                   f"{'column_field':<{w_col}}"
                   f"{'severity':<{w_sev}}"
                   f"{'checks':>{w_checks}}  "
                   f"finding")
            lines.append(hdr)
            lines.append("  " + "-" * 116)

            for issue in cat_issues:
                severity_upper = issue['severity'].upper()
                rule_code = issue.get('rule_code', '')
                rule_desc = issue.get('rule_description', '')
                col_field = issue.get('column_field', 'NA')
                finding = issue.get('finding', issue['message'])
                checks = issue.get('atomic_count', 1)
                # An em dash marks rows that carry no atomic checks
                checks_str = '—' if checks == 0 else f"{checks:d}"

                # Truncate long fields for text display
                if len(rule_desc) > w_desc - 2:
                    rule_desc = rule_desc[:w_desc - 4] + '..'
                if len(col_field) > w_col - 2:
                    col_field = col_field[:w_col - 4] + '..'

                line = (f"  {rule_code:<{w_rule}}"
                        f"{rule_desc:<{w_desc}}"
                        f"{col_field:<{w_col}}"
                        f"{severity_upper:<{w_sev}}"
                        f"{checks_str:>{w_checks}s}  "
                        f"{finding}")
                lines.append(line)
    else:
        lines.append("No validation issues found!")

    lines.append("")
    lines.append("=" * 120)
    lines.append("END OF REPORT")
    lines.append("=" * 120)

    # Explicit UTF-8: the platform default encoding is locale-dependent and
    # may be unable to encode the em dash or non-ASCII finding text.
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines))

    return output_path

Backward Compatibility

clifpy.utils.validator.validate_dataframe

validate_dataframe(df, schema, table_name=None, plausibility_thresholds=None)

Validate a dataframe against schema and return list of errors.

This function provides compatibility with CLIF-TableOne's expected validation interface.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate

required
schema dict

Table schema containing columns, required_columns, etc.

required
table_name str

Name of the table (inferred from schema if not provided)

None
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
List[Dict[str, Any]]

List of error dictionaries with keys: type (str): error type/check name; description (str): human-readable error description; details (dict): additional error details; category (str): 'schema' or 'data_quality'. Entries produced from warnings additionally carry severity: 'warning'.

Source code in clifpy/utils/validator.py
def validate_dataframe(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: Optional[str] = None,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> List[Dict[str, Any]]:
    """
    Run all DQA checks on a dataframe and flatten the findings into one list.

    Kept for compatibility with the validation interface that
    CLIF-TableOne expects. Conformance, completeness and plausibility checks
    are run in that order, and each check contributes its errors followed by
    its warnings.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    schema : dict
        Table schema containing columns, required_columns, etc.
    table_name : str, optional
        Name of the table (inferred from schema if not provided)
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.

    Returns
    -------
    List[Dict[str, Any]]
        List of error dictionaries with keys:
        - type: str - Error type/check name
        - description: str - Human-readable error description
        - details: dict - Additional error details
        - category: str - 'schema' or 'data_quality'
        Entries built from warnings also carry ``severity: 'warning'``.
    """
    table_name = table_name or schema.get('table_name', 'unknown')
    _logger.info("validate_dataframe: starting validation for table '%s'", table_name)

    # Conformance checks whose errors count as schema-level problems
    # (these affect the 'incomplete' status)
    schema_level_checks = ['required_columns', 'column_dtypes']
    findings: List[Dict[str, Any]] = []

    def _absorb(results, schema_aware):
        """Append every error, then every warning, of each check result."""
        for name, outcome in results.items():
            if schema_aware and name in schema_level_checks:
                error_category = 'schema'
            else:
                error_category = 'data_quality'

            findings.extend(
                {
                    'type': _format_check_type(name),
                    'description': item['message'],
                    'details': item.get('details', {}),
                    'category': error_category,
                }
                for item in outcome.errors
            )
            # Warnings are always reported as data quality issues
            findings.extend(
                {
                    'type': _format_check_type(name),
                    'description': item['message'],
                    'details': item.get('details', {}),
                    'category': 'data_quality',
                    'severity': 'warning',
                }
                for item in outcome.warnings
            )

    # Each pillar is run and absorbed before the next one starts.
    _absorb(run_conformance_checks(df, schema, table_name), True)
    _absorb(run_completeness_checks(df, schema, table_name), False)
    _absorb(
        run_plausibility_checks(
            df, schema, table_name,
            plausibility_thresholds=plausibility_thresholds,
        ),
        False,
    )

    gc.collect()
    # Entries without a 'severity' key are errors.
    n_errors = len([f for f in findings if f.get('severity', 'error') == 'error'])
    n_warnings = len([f for f in findings if f.get('severity') == 'warning'])
    _logger.info("validate_dataframe: table '%s' complete — %d errors, %d warnings",
                 table_name, n_errors, n_warnings)
    return findings

clifpy.utils.validator.format_clifpy_error

format_clifpy_error(error, row_count, table_name)

Format a validation error for display.

Parameters:

Name Type Description Default
error dict

Error dictionary from validate_dataframe()

required
row_count int

Total row count of the table

required
table_name str

Name of the table

required

Returns:

Type Description
dict

Formatted error with type, description, category, details, table_name, and row_count, plus severity when the input error carries one

Source code in clifpy/utils/validator.py
def format_clifpy_error(
    error: Dict[str, Any],
    row_count: int,
    table_name: str
) -> Dict[str, Any]:
    """
    Normalise a validation error into a display-ready dictionary.

    Parameters
    ----------
    error : dict
        Error dictionary from validate_dataframe()
    row_count : int
        Total row count of the table
    table_name : str
        Name of the table

    Returns
    -------
    dict
        Dictionary with the keys type, description, category, details,
        table_name and row_count; severity is included only when the
        input error has it.
    """
    # (key, fallback) pairs copied over from the incoming error, in output order
    fallbacks = (
        ('type', 'Unknown Error'),
        ('description', str(error)),
        ('category', 'other'),
        ('details', {}),
    )
    result: Dict[str, Any] = {
        key: error.get(key, fallback) for key, fallback in fallbacks
    }
    result['table_name'] = table_name
    result['row_count'] = row_count

    # Severity is passed through only if the caller supplied one
    if 'severity' in error:
        result['severity'] = error['severity']

    return result

clifpy.utils.validator.determine_validation_status

determine_validation_status(errors, required_columns=None, table_name=None)

Determine validation status based on errors.

Status logic: INCOMPLETE (red) means missing required columns, non-castable datatype errors, or 100% null in required columns. PARTIAL (yellow) means required columns are present but there are data quality issues (missing categorical values, high missingness, etc.). COMPLETE (green) means all required columns are present with no critical issues.

Parameters:

Name Type Description Default
errors list

List of formatted error dictionaries

required
required_columns list

List of required column names

None
table_name str

Name of the table (for table-specific logic)

None

Returns:

Type Description
str

'complete', 'partial', or 'incomplete'

Source code in clifpy/utils/validator.py
def determine_validation_status(
    errors: List[Dict[str, Any]],
    required_columns: Optional[List[str]] = None,
    table_name: Optional[str] = None
) -> str:
    """
    Reduce a list of formatted errors to a single validation status.

    Status Logic:
    - INCOMPLETE (red): Missing required columns OR non-castable datatype errors
      OR 100% null in required columns
    - PARTIAL (yellow): Required columns present but has data quality issues
      (missing categorical values, high missingness, etc.)
    - COMPLETE (green): All required columns present, no critical issues

    Parameters
    ----------
    errors : list
        List of formatted error dictionaries
    required_columns : list, optional
        List of required column names
    table_name : str, optional
        Name of the table (for table-specific logic)

    Returns
    -------
    str
        'complete', 'partial', or 'incomplete'
    """
    if not errors:
        _logger.debug("determine_validation_status: no errors — status='complete'")
        return 'complete'

    # First pass: any single critical finding short-circuits to 'incomplete'.
    for issue in errors:
        kind = issue.get('type', '').lower()
        group = issue.get('category', '')
        info = issue.get('details', {})
        level = issue.get('severity', 'error')

        # A required column is absent altogether.
        if 'missing required columns' in kind or 'missing_columns' in info:
            _logger.debug("determine_validation_status: missing required columns — status='incomplete'")
            return 'incomplete'

        # A schema dtype mismatch explicitly flagged as not castable.
        if 'data type' in kind and group == 'schema' and info.get('castable') is False:
            _logger.debug("determine_validation_status: non-castable dtype — status='incomplete'")
            return 'incomplete'

        # A required column that is entirely null (warning-level findings
        # are exempt from this rule).
        if 'missingness' in kind and level != 'warning':
            null_pct = info.get('percent_missing', 0)
            col_name = info.get('column', '')
            if null_pct >= 100 and required_columns and col_name in required_columns:
                _logger.debug("determine_validation_status: 100%% null in '%s' — status='incomplete'", col_name)
                return 'incomplete'

    # Second pass: nothing critical, so one error-severity entry (severity
    # defaults to 'error' when absent) downgrades the status to 'partial'.
    for issue in errors:
        if issue.get('severity', 'error') == 'error':
            _logger.debug("determine_validation_status: has non-critical errors — status='partial'")
            return 'partial'

    # Only warnings -> complete
    _logger.debug("determine_validation_status: warnings only — status='complete'")
    return 'complete'

clifpy.utils.validator.classify_errors_by_status_impact

classify_errors_by_status_impact(errors, required_columns, table_name, config_timezone=None)

Classify errors into status-affecting and informational categories.

Used by PDF/report generators to separate critical errors from informational messages.

Parameters:

Name Type Description Default
errors dict

Dictionary with keys 'schema_errors', 'data_quality_issues', 'other_errors'

required
required_columns list

List of required column names

required
table_name str

Name of the table

required
config_timezone str

Configured timezone (to filter timezone-related errors)

None

Returns:

Type Description
dict

Dictionary with 'status_affecting' and 'informational', each containing 'schema_errors', 'data_quality_issues', and 'other_errors' lists

Source code in clifpy/utils/validator.py
def classify_errors_by_status_impact(
    errors: Dict[str, List[Dict[str, Any]]],
    required_columns: List[str],
    table_name: str,
    config_timezone: Optional[str] = None
) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
    """
    Classify errors into status-affecting and informational categories.

    Used by PDF/report generators to separate critical errors from
    informational messages.

    An error is informational when at least one of these holds:

    - its severity is 'warning' (this includes plausibility warnings;
      plausibility findings with any other severity stay status-affecting),
    - its type mentions 'mcide' or 'coverage',
    - ``details['column']`` is one of the optional columns of ``table_name``,
    - its type mentions 'timezone' and its description mentions
      ``config_timezone`` (both compared case-insensitively).

    Every other error is status-affecting. Errors keep their input order
    within each bucket.

    Parameters
    ----------
    errors : dict
        Dictionary with keys 'schema_errors', 'data_quality_issues', 'other_errors'.
        Missing keys (or keys mapped to None) are treated as empty lists.
    required_columns : list
        List of required column names. Not consulted by the current rules;
        kept so the call signature stays stable.
    table_name : str
        Name of the table
    config_timezone : str, optional
        Configured timezone (to filter timezone-related errors)

    Returns
    -------
    dict
        Dictionary with 'status_affecting' and 'informational', each containing
        'schema_errors', 'data_quality_issues', and 'other_errors' lists
    """
    category_keys = ('schema_errors', 'data_quality_issues', 'other_errors')
    status_affecting: Dict[str, List[Dict[str, Any]]] = {key: [] for key in category_keys}
    informational: Dict[str, List[Dict[str, Any]]] = {key: [] for key in category_keys}

    # Columns that are optional and shouldn't affect status
    optional_columns = {
        'patient': ['race', 'ethnicity', 'language'],
        'hospitalization': ['discharge_category'],
    }
    table_optional = optional_columns.get(table_name, [])

    # Loop-invariant: lower-case the configured timezone once.
    timezone_needle = config_timezone.lower() if config_timezone else None

    for category_key in category_keys:
        for error in errors.get(category_key) or []:
            # `or` fallbacks tolerate keys that are present but set to None.
            error_type = (error.get('type') or '').lower()
            details = error.get('details') or {}
            description = (error.get('description') or '').lower()

            is_informational = (
                # Warnings never affect status, whatever check produced them.
                error.get('severity') == 'warning'
                # mCIDE coverage gaps are informational
                or 'mcide' in error_type
                or 'coverage' in error_type
                # Issues on optional columns are informational
                or details.get('column', '') in table_optional
                # Timezone errors that refer to the configured timezone
                or (
                    timezone_needle is not None
                    and 'timezone' in error_type
                    and timezone_needle in description
                )
            )

            target = informational if is_informational else status_affecting
            target[category_key].append(error)

    return {
        'status_affecting': status_affecting,
        'informational': informational
    }

clifpy.utils.validator.get_validation_summary

get_validation_summary(validation_results)

Generate a text summary of validation results.

Parameters:

Name Type Description Default
validation_results dict

Validation results from validate() method

required

Returns:

Type Description
str

Human-readable summary string

Source code in clifpy/utils/validator.py
def get_validation_summary(validation_results: Dict[str, Any]) -> str:
    """
    Render validation results as a short multi-line text summary.

    The first line shows the status with a marker symbol. It is followed
    either by the total issue count plus one line per non-empty error
    category, or by a single "No issues found" line.

    Parameters
    ----------
    validation_results : dict
        Validation results from validate() method

    Returns
    -------
    str
        Human-readable summary string
    """
    status = validation_results.get('status', 'unknown')
    errors = validation_results.get('errors', {})

    schema_n = len(errors.get('schema_errors', []))
    quality_n = len(errors.get('data_quality_issues', []))
    other_n = len(errors.get('other_errors', []))
    grand_total = schema_n + quality_n + other_n

    # Unrecognized statuses fall back to '?'.
    marker = {
        'complete': '✓',
        'partial': '⚠',
        'incomplete': '✗'
    }.get(status, '?')

    lines = [f"Status: {marker} {status.upper()}"]

    if grand_total > 0:
        lines.append(f"Issues: {grand_total} total")
        # One detail line per category, skipping categories with no issues.
        breakdown = (
            (schema_n, f"  - Schema: {schema_n}"),
            (quality_n, f"  - Data Quality: {quality_n}"),
            (other_n, f"  - Other: {other_n}"),
        )
        lines.extend(text for count, text in breakdown if count)
    else:
        lines.append("No issues found")

    return '\n'.join(lines)