Skip to content

DQA (Validation) API Reference

CLIFpy's Data Quality Assessment (DQA) module provides comprehensive validation organized around three pillars: Conformance, Completeness, and Plausibility. All checks support dual backends (Polars and DuckDB) and return structured result objects.

For a user-guide introduction, see Data Quality Assessment (DQA).


Result Classes

clifpy.utils.validator.DQAConformanceResult

DQAConformanceResult(check_type, table_name)

Container for DQA conformance check results.

Source code in clifpy/utils/validator.py
def __init__(self, check_type: str, table_name: str):
    """Create an empty conformance result that starts in the passing state."""
    self.check_type, self.table_name = check_type, table_name
    # Checks flip this to False when an error is recorded.
    self.passed = True
    # Issues bucketed by severity, plus free-form numeric/string metrics.
    self.errors, self.warnings, self.info = [], [], []
    self.metrics: Dict[str, Any] = {}

clifpy.utils.validator.DQACompletenessResult

DQACompletenessResult(check_type, table_name)

Container for DQA completeness check results.

Source code in clifpy/utils/validator.py
def __init__(self, check_type: str, table_name: str):
    """Create an empty completeness result that starts in the passing state."""
    self.check_type, self.table_name = check_type, table_name
    # Checks flip this to False when an error is recorded.
    self.passed = True
    # Issues bucketed by severity, plus free-form numeric/string metrics.
    self.errors, self.warnings, self.info = [], [], []
    self.metrics: Dict[str, Any] = {}

clifpy.utils.validator.DQAPlausibilityResult

DQAPlausibilityResult(check_type, table_name)

Container for DQA plausibility check results.

Source code in clifpy/utils/validator.py
def __init__(self, check_type: str, table_name: str):
    """Create an empty plausibility result that starts in the passing state."""
    self.check_type, self.table_name = check_type, table_name
    # Checks flip this to False when an error is recorded.
    self.passed = True
    # Issues bucketed by severity, plus free-form numeric/string metrics.
    self.errors, self.warnings, self.info = [], [], []
    self.metrics: Dict[str, Any] = {}

Conformance Checks

Conformance checks verify that data matches expected structure, schema, types, and allowed values.

A.1 — Table Presence

clifpy.utils.validator.check_table_exists

check_table_exists(table_path, table_name, filetype='parquet')

Check if a table file exists at the specified path.

Parameters:

Name Type Description Default
table_path str or Path

Directory containing the table files

required
table_name str

Name of the table to check

required
filetype str

File extension (parquet, csv, etc.)

'parquet'

Returns:

Type Description
DQAConformanceResult

Result object with check status

Source code in clifpy/utils/validator.py
def check_table_exists(
    table_path: Union[str, Path],
    table_name: str,
    filetype: str = 'parquet'
) -> DQAConformanceResult:
    """
    Verify that the expected table file exists on disk.

    Parameters
    ----------
    table_path : str or Path
        Directory containing the table files
    table_name : str
        Name of the table to check
    filetype : str
        File extension (parquet, csv, etc.)

    Returns
    -------
    DQAConformanceResult
        Result object with check status
    """
    result = DQAConformanceResult("table_exists", table_name)

    # The file is expected directly under table_path as <table_name>.<filetype>.
    expected_file = Path(table_path) / f"{table_name}.{filetype}"

    if not expected_file.exists():
        result.add_error(
            f"Table file not found: {expected_file}",
            {"expected_path": str(expected_file)}
        )
        return result

    result.add_info(f"Table file found: {expected_file}")
    result.metrics["file_path"] = str(expected_file)
    result.metrics["file_size_mb"] = expected_file.stat().st_size / (1024 * 1024)
    return result

clifpy.utils.validator.check_table_presence

check_table_presence(df, table_name)

Check that a loaded DataFrame has rows and columns.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
table_name str

Name of the table

required
Source code in clifpy/utils/validator.py
def check_table_presence(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str
) -> DQAConformanceResult:
    """
    Confirm that a loaded DataFrame contains rows and columns.

    Dispatches to the Polars or DuckDB implementation based on the
    active backend.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    table_name : str
        Name of the table
    """
    _logger.debug("check_table_presence: starting for table '%s'", table_name)
    backend_fn = (check_table_presence_polars if _ACTIVE_BACKEND == 'polars'
                  else check_table_presence_duckdb)
    result = backend_fn(df, table_name)
    _logger.debug("check_table_presence: table '%s' — rows=%s, cols=%s",
                  table_name, result.metrics.get("row_count"), result.metrics.get("column_count"))
    return result

A.2 — Required Columns

clifpy.utils.validator.check_required_columns

check_required_columns(df, schema, table_name)

Check if all required columns are present.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
schema dict

Table schema containing required_columns

required
table_name str

Name of the table

required
Source code in clifpy/utils/validator.py
def check_required_columns(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """
    Verify that every schema-required column is present in the data.

    Dispatches to the Polars or DuckDB implementation based on the
    active backend.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    schema : dict
        Table schema containing required_columns
    table_name : str
        Name of the table
    """
    _logger.debug("check_required_columns: starting for table '%s'", table_name)
    backend_fn = (check_required_columns_polars if _ACTIVE_BACKEND == 'polars'
                  else check_required_columns_duckdb)
    result = backend_fn(df, schema, table_name)
    missing = result.metrics.get("total_missing", 0)
    if missing > 0:
        _logger.info("check_required_columns: table '%s' missing %d of %d required columns",
                     table_name, missing, result.metrics.get("total_required", 0))
    _logger.debug("check_required_columns: table '%s' — required=%s, present=%s, missing=%s",
                  table_name, result.metrics.get("total_required"),
                  result.metrics.get("total_present"), result.metrics.get("total_missing"))
    return result

B.1 — Data Types

clifpy.utils.validator.check_column_dtypes

check_column_dtypes(df, schema, table_name)

Check if columns have correct data types.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
schema dict

Table schema

required
table_name str

Name of the table

required
Source code in clifpy/utils/validator.py
def check_column_dtypes(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """
    Validate that column data types match the schema definitions.

    Dispatches to the Polars or DuckDB implementation based on the
    active backend.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    schema : dict
        Table schema
    table_name : str
        Name of the table
    """
    _logger.debug("check_column_dtypes: starting for table '%s'", table_name)
    backend_fn = (check_column_dtypes_polars if _ACTIVE_BACKEND == 'polars'
                  else check_column_dtypes_duckdb)
    result = backend_fn(df, schema, table_name)
    _logger.debug("check_column_dtypes: table '%s' — checked=%s, errors=%s, warnings=%s",
                  table_name, result.metrics.get("columns_checked"),
                  result.metrics.get("dtype_errors"), result.metrics.get("dtype_warnings"))
    return result

B.2 — Datetime Format

clifpy.utils.validator.check_datetime_format

check_datetime_format(df, schema, table_name, expected_tz='UTC')

Validate datetime columns are in correct format.

Source code in clifpy/utils/validator.py
def check_datetime_format(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    expected_tz: str = 'UTC'
) -> DQAConformanceResult:
    """Validate datetime columns are in correct format.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_datetime_format: starting for table '%s'", table_name)
    backend_fn = (check_datetime_format_polars if _ACTIVE_BACKEND == 'polars'
                  else check_datetime_format_duckdb)
    result = backend_fn(df, schema, table_name, expected_tz)
    _logger.debug("check_datetime_format: table '%s' — columns_checked=%s",
                  table_name, result.metrics.get("datetime_columns_checked"))
    return result

B.3 — Lab Reference Units

clifpy.utils.validator.check_lab_reference_units

check_lab_reference_units(df, schema, table_name='labs')

Check if lab reference units match schema definitions.

Source code in clifpy/utils/validator.py
def check_lab_reference_units(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str = 'labs'
) -> DQAConformanceResult:
    """Verify lab reference units against the schema definitions.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_lab_reference_units: starting for table '%s'", table_name)
    backend_fn = (check_lab_reference_units_polars if _ACTIVE_BACKEND == 'polars'
                  else check_lab_reference_units_duckdb)
    result = backend_fn(df, schema, table_name)
    _logger.debug("check_lab_reference_units: table '%s' — valid=%s, invalid_categories=%s",
                  table_name, result.metrics.get("valid_units"),
                  result.metrics.get("invalid_unit_categories"))
    return result

B.4 — Categorical Values

clifpy.utils.validator.check_categorical_values

check_categorical_values(df, schema, table_name)

Check if categorical values match mCIDE permissible values.

Source code in clifpy/utils/validator.py
def check_categorical_values(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """Validate categorical values against the mCIDE permissible values.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_categorical_values: starting for table '%s'", table_name)
    backend_fn = (check_categorical_values_polars if _ACTIVE_BACKEND == 'polars'
                  else check_categorical_values_duckdb)
    result = backend_fn(df, schema, table_name)
    _logger.debug("check_categorical_values: table '%s' — columns_checked=%s, columns_with_invalid=%s",
                  table_name, result.metrics.get("category_columns_checked"),
                  result.metrics.get("columns_with_invalid_values"))
    return result

B.5 — Category-to-Group Mapping

clifpy.utils.validator.check_category_group_mapping

check_category_group_mapping(df, schema, table_name)

Check if category-to-group mappings match schema definitions.

Source code in clifpy/utils/validator.py
def check_category_group_mapping(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQAConformanceResult:
    """Validate category-to-group mappings against the schema definitions.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_category_group_mapping: starting for table '%s'", table_name)
    backend_fn = (check_category_group_mapping_polars if _ACTIVE_BACKEND == 'polars'
                  else check_category_group_mapping_duckdb)
    result = backend_fn(df, schema, table_name)
    _logger.debug("check_category_group_mapping: table '%s' — completed", table_name)
    return result

Completeness Checks

Completeness checks evaluate missing data, conditional requirements, and referential coverage.

A.1 — Missingness

clifpy.utils.validator.check_missingness

check_missingness(df, schema, table_name, error_threshold=50.0, warning_threshold=10.0)

Check missingness in required columns.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate (already loaded)

required
schema dict

Table schema containing required_columns

required
table_name str

Name of the table

required
error_threshold float

Percent missing above which an error is raised

50.0
warning_threshold float

Percent missing above which a warning is raised

10.0

Returns:

Type Description
DQACompletenessResult

Result containing missingness statistics

Source code in clifpy/utils/validator.py
def check_missingness(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    error_threshold: float = 50.0,
    warning_threshold: float = 10.0
) -> DQACompletenessResult:
    """
    Measure missingness in required columns.

    Dispatches to the Polars or DuckDB implementation based on the
    active backend.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate (already loaded)
    schema : dict
        Table schema containing required_columns
    table_name : str
        Name of the table
    error_threshold : float
        Percent missing above which an error is raised
    warning_threshold : float
        Percent missing above which a warning is raised

    Returns
    -------
    DQACompletenessResult
        Result containing missingness statistics
    """
    _logger.debug("check_missingness: starting for table '%s' (error_threshold=%.1f%%, warning_threshold=%.1f%%)",
                  table_name, error_threshold, warning_threshold)
    backend_fn = (check_missingness_polars if _ACTIVE_BACKEND == 'polars'
                  else check_missingness_duckdb)
    result = backend_fn(df, schema, table_name, error_threshold, warning_threshold)
    # Surface each missingness error at INFO level for operator visibility.
    for err in result.errors:
        _logger.info("check_missingness: table '%s' — %s", table_name, err["message"])
    _logger.debug("check_missingness: table '%s' — columns_checked=%s",
                  table_name, result.metrics.get("required_columns_checked"))
    return result

A.2 — Conditional Requirements

clifpy.utils.validator.check_conditional_requirements

check_conditional_requirements(df, table_name, conditions=None)

Check conditional required fields.

Source code in clifpy/utils/validator.py
def check_conditional_requirements(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    conditions: Optional[List[Dict[str, Any]]] = None
) -> DQACompletenessResult:
    """Validate conditionally required fields.

    Dispatches to the backend implementation selected by the active backend.
    """
    n_conditions = len(conditions or [])
    _logger.debug("check_conditional_requirements: starting for table '%s' (%d explicit conditions)",
                  table_name, n_conditions)
    backend_fn = (check_conditional_requirements_polars if _ACTIVE_BACKEND == 'polars'
                  else check_conditional_requirements_duckdb)
    result = backend_fn(df, table_name, conditions)
    if result.warnings:
        _logger.info("check_conditional_requirements: table '%s' — %d violations found",
                     table_name, len(result.warnings))
    _logger.debug("check_conditional_requirements: table '%s' complete", table_name)
    return result

B — mCIDE Value Coverage

clifpy.utils.validator.check_mcide_value_coverage

check_mcide_value_coverage(df, schema, table_name)

Check if all mCIDE standardized values are present in the data.

Source code in clifpy/utils/validator.py
def check_mcide_value_coverage(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> DQACompletenessResult:
    """Verify that all mCIDE standardized values appear in the data.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_mcide_value_coverage: starting for table '%s'", table_name)
    backend_fn = (check_mcide_value_coverage_polars if _ACTIVE_BACKEND == 'polars'
                  else check_mcide_value_coverage_duckdb)
    result = backend_fn(df, schema, table_name)
    _logger.debug("check_mcide_value_coverage: table '%s' — columns_checked=%s",
                  table_name, result.metrics.get("category_columns_checked"))
    return result

C.1 — Relational Integrity

clifpy.utils.validator.check_relational_integrity

check_relational_integrity(target_df, reference_df, target_table, reference_table, key_column)

Check bidirectional relational integrity between tables.

Runs the backend-specific check in both directions:

- Forward (reference → target): What percentage of reference IDs appear in the target table? (e.g., "what % of hospitalizations have labs?")
- Reverse (target → reference): What percentage of target IDs exist in the reference table? (e.g., "what % of lab hosp_ids are valid?")

Parameters:

Name Type Description Default
target_df DataFrame

The target table (e.g., labs).

required
reference_df DataFrame

The reference table (e.g., hospitalization).

required
target_table str

Name of the target table.

required
reference_table str

Name of the reference table.

required
key_column str

The shared key column (e.g., hospitalization_id).

required

Returns:

Type Description
DQACompletenessResult

Consolidated result with forward/reverse coverage metrics.

Source code in clifpy/utils/validator.py
def check_relational_integrity(
    target_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    reference_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    target_table: str,
    reference_table: str,
    key_column: str
) -> DQACompletenessResult:
    """Run the relational-integrity check in both directions between two tables.

    The backend-specific check is executed twice:
    - **Forward** (reference → target): what percentage of reference IDs
      appear in the target table?  (e.g., "what % of hospitalizations
      have labs?")
    - **Reverse** (target → reference): what percentage of target IDs
      exist in the reference table?  (e.g., "what % of lab hosp_ids are
      valid?")

    Parameters
    ----------
    target_df : DataFrame
        The target table (e.g., labs).
    reference_df : DataFrame
        The reference table (e.g., hospitalization).
    target_table : str
        Name of the target table.
    reference_table : str
        Name of the reference table.
    key_column : str
        The shared key column (e.g., ``hospitalization_id``).

    Returns
    -------
    DQACompletenessResult
        Consolidated result with forward/reverse coverage metrics.
    """
    _logger.debug(
        "check_relational_integrity: '%s' <-> '%s' on key '%s'",
        target_table, reference_table, key_column,
    )
    result = DQACompletenessResult(
        "relational_integrity",
        f"{target_table}<->{reference_table}",
    )

    # Select the backend implementation once for both directions.
    backend_check = (check_relational_integrity_polars
                     if _ACTIVE_BACKEND == 'polars'
                     else check_relational_integrity_duckdb)

    try:
        # Forward: reference → target  (source=reference, ref=target)
        forward = backend_check(
            reference_df, target_df, reference_table, target_table, key_column
        )
        # Reverse: target → reference  (source=target, ref=reference)
        reverse = backend_check(
            target_df, reference_df, target_table, reference_table, key_column
        )

        result.metrics["forward_coverage_percent"] = forward.metrics.get("coverage_percent", 0)
        result.metrics["forward_orphan_ids"] = forward.metrics.get("orphan_ids", 0)
        result.metrics["forward_reference_unique_ids"] = forward.metrics.get("source_unique_ids", 0)
        result.metrics["reverse_coverage_percent"] = reverse.metrics.get("coverage_percent", 0)
        result.metrics["reverse_orphan_ids"] = reverse.metrics.get("orphan_ids", 0)
        result.metrics["reverse_target_unique_ids"] = reverse.metrics.get("source_unique_ids", 0)

        # Propagate issues from both directions into the consolidated result.
        for direction in (forward, reverse):
            for warning in direction.warnings:
                result.add_warning(warning['message'], warning.get("details", {}))
        for direction in (forward, reverse):
            for error in direction.errors:
                result.add_error(error['message'], error.get("details", {}))

        # Info only when both directions are clean (no errors AND no warnings).
        clean = (forward.passed and reverse.passed
                 and not forward.warnings and not reverse.warnings)
        if clean:
            result.add_info(
                f"Full bidirectional coverage for {key_column} between "
                f"{target_table} and {reference_table}"
            )

        _logger.info(
            "check_relational_integrity: '%s' <-> '%s' — "
            "fwd_coverage=%.1f%%, rev_coverage=%.1f%%",
            target_table, reference_table,
            result.metrics["forward_coverage_percent"],
            result.metrics["reverse_coverage_percent"],
        )

    except Exception as e:
        _logger.error(
            "Check 'relational_integrity' failed for '%s' <-> '%s': %s",
            target_table, reference_table, e,
        )
        result.add_error(f"Error checking relational integrity: {str(e)}")

    return result

Plausibility Checks

Plausibility checks validate logical consistency, temporal ordering, and clinical reasonableness.

A.1 — Temporal Ordering

clifpy.utils.validator.check_temporal_ordering

check_temporal_ordering(df, table_name, temporal_rules=None, warning_threshold=0.0, error_threshold=10.0)

Check that datetime pairs follow expected temporal ordering.

Source code in clifpy/utils/validator.py
def check_temporal_ordering(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    temporal_rules: Optional[List[Dict[str, str]]] = None,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Verify that datetime pairs follow the expected temporal ordering.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_temporal_ordering: starting for table '%s'", table_name)
    backend_fn = (check_temporal_ordering_polars if _ACTIVE_BACKEND == 'polars'
                  else check_temporal_ordering_duckdb)
    result = backend_fn(df, table_name, temporal_rules,
                        warning_threshold=warning_threshold,
                        error_threshold=error_threshold)
    _logger.debug("check_temporal_ordering: table '%s' — pairs_checked=%s",
                  table_name, result.metrics.get("pairs_checked"))
    return result

A.2 — Numeric Range Plausibility

clifpy.utils.validator.check_numeric_range_plausibility

check_numeric_range_plausibility(df, table_name, outlier_config=None, warning_threshold=0.0, error_threshold=10.0)

Check numeric values are within plausible ranges.

Source code in clifpy/utils/validator.py
def check_numeric_range_plausibility(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    outlier_config: Optional[Dict[str, Any]] = None,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Verify that numeric values fall within plausible ranges.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_numeric_range_plausibility: starting for table '%s'", table_name)
    backend_fn = (check_numeric_range_plausibility_polars if _ACTIVE_BACKEND == 'polars'
                  else check_numeric_range_plausibility_duckdb)
    result = backend_fn(df, table_name, outlier_config,
                        warning_threshold=warning_threshold,
                        error_threshold=error_threshold)
    _logger.debug("check_numeric_range_plausibility: table '%s' — columns_checked=%s",
                  table_name, result.metrics.get("columns_checked"))
    return result

A.3 — Field-Level Plausibility

clifpy.utils.validator.check_field_plausibility

check_field_plausibility(df, table_name, rules=None)

Check field-level plausibility constraints.

Source code in clifpy/utils/validator.py
def check_field_plausibility(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    rules: Optional[List[Dict[str, Any]]] = None,
) -> DQAPlausibilityResult:
    """Evaluate field-level plausibility constraints.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_field_plausibility: starting for table '%s'", table_name)
    backend_fn = (check_field_plausibility_polars if _ACTIVE_BACKEND == 'polars'
                  else check_field_plausibility_duckdb)
    result = backend_fn(df, table_name, rules)
    _logger.debug("check_field_plausibility: table '%s' — rules_checked=%s",
                  table_name, result.metrics.get("rules_checked"))
    return result

A.4 — Medication Dose Unit Consistency

clifpy.utils.validator.check_medication_dose_unit_consistency

check_medication_dose_unit_consistency(df, table_name, warning_threshold=0.0, error_threshold=10.0)

Check medication dose unit consistency.

Source code in clifpy/utils/validator.py
def check_medication_dose_unit_consistency(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Evaluate medication dose unit consistency.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_medication_dose_unit_consistency: starting for table '%s'", table_name)
    backend_fn = (check_medication_dose_unit_consistency_polars if _ACTIVE_BACKEND == 'polars'
                  else check_medication_dose_unit_consistency_duckdb)
    result = backend_fn(df, table_name,
                        warning_threshold=warning_threshold,
                        error_threshold=error_threshold)
    _logger.debug("check_medication_dose_unit_consistency: table '%s' — violations=%s",
                  table_name, result.metrics.get("unit_pattern_violations"))
    return result

B.1 — Cross-Table Temporal Plausibility

clifpy.utils.validator.check_cross_table_temporal_plausibility

check_cross_table_temporal_plausibility(target_df, hospitalization_df, target_table, time_columns, warning_threshold=0.0, error_threshold=10.0)

Check that datetime values fall within hospitalization bounds.

Source code in clifpy/utils/validator.py
def check_cross_table_temporal_plausibility(
    target_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    hospitalization_df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    target_table: str,
    time_columns: List[str],
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Verify that datetime values fall within hospitalization bounds.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_cross_table_temporal_plausibility: starting for table '%s'", target_table)
    backend_fn = (check_cross_table_temporal_plausibility_polars if _ACTIVE_BACKEND == 'polars'
                  else check_cross_table_temporal_plausibility_duckdb)
    result = backend_fn(
        target_df, hospitalization_df, target_table, time_columns,
        warning_threshold=warning_threshold, error_threshold=error_threshold)
    _logger.debug("check_cross_table_temporal_plausibility: table '%s' complete", target_table)
    return result

C.1 — Overlapping Periods

clifpy.utils.validator.check_overlapping_periods

check_overlapping_periods(df, table_name, entity_col='hospitalization_id', start_col='in_dttm', end_col='out_dttm')

Check for overlapping time periods within entities.

Source code in clifpy/utils/validator.py
def check_overlapping_periods(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    entity_col: str = 'hospitalization_id',
    start_col: str = 'in_dttm',
    end_col: str = 'out_dttm',
) -> DQAPlausibilityResult:
    """Detect overlapping time periods within each entity.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_overlapping_periods: starting for table '%s'", table_name)
    backend_fn = (check_overlapping_periods_polars if _ACTIVE_BACKEND == 'polars'
                  else check_overlapping_periods_duckdb)
    result = backend_fn(df, table_name, entity_col, start_col, end_col)
    _logger.debug("check_overlapping_periods: table '%s' — overlaps=%s",
                  table_name, result.metrics.get("overlapping_records"))
    return result

C.2 — Category Temporal Consistency

clifpy.utils.validator.check_category_temporal_consistency

check_category_temporal_consistency(df, schema, table_name, time_column=None, hosp_years=None)

Check category distribution consistency over time.

Source code in clifpy/utils/validator.py
def check_category_temporal_consistency(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    time_column: Optional[str] = None,
    hosp_years: Optional[set] = None,
) -> DQAPlausibilityResult:
    """Evaluate category distribution consistency over time.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_category_temporal_consistency: starting for table '%s'", table_name)
    backend_fn = (check_category_temporal_consistency_polars if _ACTIVE_BACKEND == 'polars'
                  else check_category_temporal_consistency_duckdb)
    result = backend_fn(df, schema, table_name, time_column, hosp_years)
    _logger.debug("check_category_temporal_consistency: table '%s' — columns_checked=%s",
                  table_name, result.metrics.get("category_columns_checked"))
    return result

D.1 — Duplicate Composite Keys

clifpy.utils.validator.check_duplicate_composite_keys

check_duplicate_composite_keys(df, table_name, composite_keys=None, schema=None, warning_threshold=0.0, error_threshold=10.0)

Check for duplicate composite keys.

Source code in clifpy/utils/validator.py
def check_duplicate_composite_keys(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    table_name: str,
    composite_keys: Optional[List[str]] = None,
    schema: Optional[Dict[str, Any]] = None,
    warning_threshold: float = 0.0,
    error_threshold: float = 10.0,
) -> DQAPlausibilityResult:
    """Detect duplicate composite keys.

    Dispatches to the backend implementation selected by the active backend.
    """
    _logger.debug("check_duplicate_composite_keys: starting for table '%s'", table_name)
    backend_fn = (check_duplicate_composite_keys_polars if _ACTIVE_BACKEND == 'polars'
                  else check_duplicate_composite_keys_duckdb)
    result = backend_fn(df, table_name, composite_keys, schema,
                        warning_threshold=warning_threshold,
                        error_threshold=error_threshold)
    _logger.debug("check_duplicate_composite_keys: table '%s' — duplicates=%s",
                  table_name, result.metrics.get("duplicate_records"))
    return result

Cross-Table Checks

These checks operate across multiple loaded tables to validate relational and temporal consistency.

clifpy.utils.validator.run_relational_integrity_checks

run_relational_integrity_checks(tables)

Auto-detect and run relational integrity checks for loaded tables.

Reads foreign-key (FK) rules from validation_rules.yaml and runs check_relational_integrity for every applicable (table, fk_column) pair.

Parameters:

Name Type Description Default
tables list

Objects with .table_name (str) and .df (DataFrame) attributes — typically BaseTable instances.

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

{table_name: {fk_column: DQACompletenessResult}}.

Source code in clifpy/utils/validator.py
def run_relational_integrity_checks(
    tables: list,
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Auto-detect and run relational integrity checks for loaded tables.

    Reads FK rules from ``validation_rules.yaml`` and runs
    :func:`check_relational_integrity` for every applicable
    (table, fk_column) pair.

    Parameters
    ----------
    tables : list
        Objects with ``.table_name`` (str) and ``.df`` (DataFrame)
        attributes — typically :class:`BaseTable` instances.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        ``{table_name: {fk_column: DQACompletenessResult}}``.
    """
    _logger.info("run_relational_integrity_checks: starting with %d tables", len(tables))

    # Build lookup: table_name -> (DataFrame, schema_col_names)
    # Convert pandas DataFrames to Polars when the Polars backend is active,
    # matching the pattern used by run_conformance_checks / run_completeness_checks.
    lookup = {}
    for obj in tables:
        df = obj.df
        if _ACTIVE_BACKEND == 'polars' and isinstance(df, pd.DataFrame):
            _logger.debug("Converting pandas DataFrame to Polars for table '%s'", obj.table_name)
            df = pl.from_pandas(df)
        # Extract schema-defined column names (only check FK columns that belong
        # to the table's schema, not extra columns that happen to be in the data).
        schema = getattr(obj, 'schema', None) or {}
        schema_cols = {c['name'] for c in schema.get('columns', [])} if schema.get('columns') else None
        lookup[obj.table_name] = (df, schema_cols)

    # Load FK rules
    fk_rules = _load_validation_rules().get('relational_integrity', {})
    _logger.debug("run_relational_integrity_checks: loaded %d FK rules", len(fk_rules))

    results: Dict[str, Dict[str, DQACompletenessResult]] = {}

    for table_name, (df, schema_cols) in lookup.items():
        # Use schema columns when available; fall back to DataFrame columns
        if schema_cols is not None:
            col_names = schema_cols
        elif HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame)):
            # collect_schema() reads column names without materializing data
            lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
            col_names = set(lf.collect_schema().names())
        else:
            col_names = set(df.columns.tolist())

        for fk_column, rule in fk_rules.items():
            # A rule applies only when the FK column exists on this table
            if fk_column not in col_names:
                continue

            ref_table_name = rule['references_table']

            # Skip self-references
            if ref_table_name == table_name:
                _logger.debug(
                    "run_relational_integrity_checks: skipping self-ref "
                    "%s.%s -> %s", table_name, fk_column, ref_table_name,
                )
                continue

            # Skip if the reference table isn't loaded
            if ref_table_name not in lookup:
                _logger.debug(
                    "run_relational_integrity_checks: skipping %s.%s — "
                    "reference table '%s' not loaded",
                    table_name, fk_column, ref_table_name,
                )
                continue

            _logger.info(
                "run_relational_integrity_checks: checking %s.%s -> %s",
                table_name, fk_column, ref_table_name,
            )
            # lookup[ref_table_name][0] is the reference table's DataFrame
            result = check_relational_integrity(
                target_df=df,
                reference_df=lookup[ref_table_name][0],
                target_table=table_name,
                reference_table=ref_table_name,
                key_column=fk_column,
            )
            results.setdefault(table_name, {})[fk_column] = result

    # Summary count across all (table, fk_column) pairs actually checked
    checked = sum(len(v) for v in results.values())
    _logger.info(
        "run_relational_integrity_checks: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

clifpy.utils.validator.run_cross_table_completeness_checks

run_cross_table_completeness_checks(tables)

Run cross-table conditional completeness checks (K.5) on full DataFrames.

Parameters:

Name Type Description Default
tables list

Objects with .table_name and .df attributes.

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

Results keyed by target table name.

Source code in clifpy/utils/validator.py
def run_cross_table_completeness_checks(
    tables: list,
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Run cross-table conditional completeness checks (K.5) on full DataFrames.

    Parameters
    ----------
    tables : list
        Objects with ``.table_name`` and ``.df`` attributes.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        Results keyed by **target** table name.
    """
    _logger.info("run_cross_table_completeness_checks: starting with %d tables", len(tables))

    ct_cond_rules = _load_validation_rules().get('cross_table_conditional_requirements', [])
    if not ct_cond_rules:
        return {}

    # Build name -> DataFrame lookup; strip the 'clif_' prefix so rule table
    # names match, and convert pandas frames when the Polars backend is active.
    lookup = {}
    for obj in tables:
        tname = getattr(obj, 'table_name', '').replace('clif_', '')
        tdf = obj.df
        if _ACTIVE_BACKEND == 'polars' and isinstance(tdf, pd.DataFrame):
            tdf = pl.from_pandas(tdf)
        lookup[tname] = tdf

    results: Dict[str, Dict[str, DQACompletenessResult]] = {}

    for rule in ct_cond_rules:
        rule_key = f"{rule['source_column']}_{rule['target_column']}"
        source_table = rule['source_table']
        target_table = rule['target_table']
        join_col = rule['join_column']

        # Both sides of the rule must be loaded for the check to run
        if source_table not in lookup or target_table not in lookup:
            continue

        src_df = lookup[source_table]
        tgt_df = lookup[target_table]
        # Normalize rule values for case/whitespace-insensitive matching
        match_values = [str(v).strip().lower() for v in rule['source_value']]

        # Get source IDs matching condition
        if HAS_POLARS and isinstance(src_df, (pl.DataFrame, pl.LazyFrame)):
            lf = src_df if isinstance(src_df, pl.LazyFrame) else src_df.lazy()
            if rule['source_column'] not in lf.collect_schema().names() or join_col not in lf.collect_schema().names():
                continue
            # Cast to string and normalize before matching, mirroring the
            # pandas branch below
            matched = (
                lf.filter(
                    pl.col(rule['source_column']).cast(pl.Utf8).str.strip_chars().str.to_lowercase().is_in(match_values)
                )
                .select(pl.col(join_col).drop_nulls().unique())
                .collect(streaming=True)
            )
            source_ids = set(matched[join_col].to_list())
        else:
            if rule['source_column'] not in src_df.columns or join_col not in src_df.columns:
                continue
            mask = src_df[rule['source_column']].astype(str).str.strip().str.lower().isin(match_values)
            source_ids = set(src_df.loc[mask, join_col].dropna().unique().tolist())

        # Get target IDs with non-null target column (also exclude empty strings)
        if HAS_POLARS and isinstance(tgt_df, (pl.DataFrame, pl.LazyFrame)):
            lf = tgt_df if isinstance(tgt_df, pl.LazyFrame) else tgt_df.lazy()
            schema_names = lf.collect_schema().names()
            if rule['target_column'] not in schema_names or join_col not in schema_names:
                continue
            tcol = pl.col(rule['target_column'])
            not_null_filter = tcol.is_not_null()
            # For string columns, whitespace-only values also count as missing
            dtype = lf.collect_schema()[rule['target_column']]
            if dtype == pl.Utf8 or dtype == pl.String:
                not_null_filter = not_null_filter & (tcol.str.strip_chars().str.len_chars() > 0)
            matched = (
                lf.filter(not_null_filter)
                .select(pl.col(join_col).drop_nulls().unique())
                .collect(streaming=True)
            )
            target_ids = set(matched[join_col].to_list())
        else:
            if rule['target_column'] not in tgt_df.columns or join_col not in tgt_df.columns:
                continue
            target_col = tgt_df[rule['target_column']]
            mask = target_col.notna()
            # For object-dtype columns, whitespace-only values count as missing
            if hasattr(target_col, 'str') and target_col.dtype == 'object':
                mask = mask & (target_col.astype(str).str.strip() != '')
            target_ids = set(tgt_df.loc[mask, join_col].dropna().unique().tolist())

        result = DQACompletenessResult(
            "cross_table_conditional_completeness",
            f"{source_table}->{target_table}",
        )

        # No rows matched the source condition: record info, not a warning
        if not source_ids:
            result.add_info(
                f"No {rule['source_column']} = {rule['source_value']} found in "
                f"{source_table}; cross-table conditional check not triggered"
            )
            results.setdefault(target_table, {})[rule_key] = result
            continue

        # IDs that satisfy the source condition but lack target-column data
        missing_ids = source_ids - target_ids
        total = len(source_ids)
        missing_count = len(missing_ids)
        pct = round(missing_count / total * 100, 1) if total > 0 else 0

        result.metrics["total_matching_source"] = total
        result.metrics["missing_in_target"] = missing_count
        result.metrics["coverage_percent"] = round(100 - pct, 2)

        if missing_ids:
            # Include a small deterministic sample of offending IDs
            sample = sorted(list(missing_ids))[:10]
            result.add_warning(
                f"{missing_count}/{total} patients discharged as {rule['source_value']} "
                f"in {source_table} are missing {rule['target_column']} in {target_table} "
                f"({pct}% missing)",
                {
                    "column": rule['target_column'],
                    "missing_count": missing_count,
                    "total_matching": total,
                    "percent_missing": pct,
                    "sample_ids": sample,
                    "source_condition": f"{rule['source_column']} in {rule['source_value']}",
                },
            )
        else:
            result.add_info(
                f"All {total} patients discharged as {rule['source_value']} in "
                f"{source_table} have {rule['target_column']} in {target_table}"
            )

        results.setdefault(target_table, {})[rule_key] = result

    checked = sum(len(v) for v in results.values())
    _logger.info(
        "run_cross_table_completeness_checks: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

clifpy.utils.validator.run_cross_table_plausibility_checks

run_cross_table_plausibility_checks(tables, plausibility_thresholds=None)

Run cross-table plausibility checks (B.1).

Parameters:

Name Type Description Default
tables list

Objects with .table_name and .df attributes.

required
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
Dict[str, Dict[str, DQAPlausibilityResult]]

{table_name: {"cross_table_temporal": DQAPlausibilityResult}}.

Source code in clifpy/utils/validator.py
def run_cross_table_plausibility_checks(
    tables: list,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, Dict[str, DQAPlausibilityResult]]:
    """Run cross-table plausibility checks (B.1).

    Parameters
    ----------
    tables : list
        Objects with ``.table_name`` and ``.df`` attributes.
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.

    Returns
    -------
    Dict[str, Dict[str, DQAPlausibilityResult]]
        ``{table_name: {"cross_table_temporal": DQAPlausibilityResult}}``.
    """
    _logger.info("run_cross_table_plausibility_checks: starting with %d tables", len(tables))

    # Merge caller overrides with defaults.  Copy every dict — including
    # caller-supplied dicts for unknown check names — so later mutation of
    # ``thresholds`` can never alias the caller's input.
    thresholds = {k: dict(v) for k, v in _DEFAULT_PLAUSIBILITY_THRESHOLDS.items()}
    if plausibility_thresholds:
        for check_name, overrides in plausibility_thresholds.items():
            if check_name in thresholds:
                thresholds[check_name].update(overrides)
            else:
                thresholds[check_name] = dict(overrides)

    # Build table_name -> DataFrame lookup, converting pandas frames when
    # the Polars backend is active.
    lookup = {}
    for obj in tables:
        tdf = obj.df
        if _ACTIVE_BACKEND == 'polars' and isinstance(tdf, pd.DataFrame):
            tdf = pl.from_pandas(tdf)
        lookup[obj.table_name] = tdf

    # The hospitalization table anchors every temporal comparison
    if 'hospitalization' not in lookup:
        _logger.info("Hospitalization table not loaded; skipping cross-table plausibility")
        return {}

    hosp_df = lookup['hospitalization']
    results: Dict[str, Dict[str, DQAPlausibilityResult]] = {}

    for tbl_name, tdf in lookup.items():
        if tbl_name == 'hospitalization':
            continue
        # Only tables with registered time columns participate
        time_cols = _CROSS_TABLE_TIME_COLUMNS.get(tbl_name, [])
        if not time_cols:
            continue

        if HAS_POLARS and isinstance(tdf, (pl.DataFrame, pl.LazyFrame)):
            lf = tdf if isinstance(tdf, pl.LazyFrame) else tdf.lazy()
            actual_cols = lf.collect_schema().names()
        else:
            actual_cols = tdf.columns.tolist()

        # The check needs at least one time column plus the join key
        available_time_cols = [c for c in time_cols if c in actual_cols]
        if not available_time_cols or 'hospitalization_id' not in actual_cols:
            continue

        result = check_cross_table_temporal_plausibility(
            tdf, hosp_df, tbl_name, available_time_cols,
            **thresholds['cross_table_temporal']
        )
        results.setdefault(tbl_name, {})["cross_table_temporal"] = result

    checked = sum(len(v) for v in results.values())
    _logger.info(
        "run_cross_table_plausibility_checks: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

Orchestration

High-level functions that run groups of checks or the full DQA suite.

Single-Table Orchestration

clifpy.utils.validator.run_conformance_checks

run_conformance_checks(df, schema, table_name)

Run all conformance checks on a table.

Parameters:

Name Type Description Default
df DataFrame

The data to validate

required
schema dict

Schema for the table

required
table_name str

Name of the table

required

Returns:

Type Description
Dict[str, DQAConformanceResult]

Dictionary of check results keyed by check type

Source code in clifpy/utils/validator.py
def run_conformance_checks(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str
) -> Dict[str, DQAConformanceResult]:
    """
    Run all conformance checks on a table.

    Executes the full conformance suite (presence, required columns,
    dtypes, datetime format, categorical values, category→group mapping,
    plus lab reference units for the labs table) in a fixed order.

    Parameters
    ----------
    df : DataFrame
        The data to validate
    schema : dict
        Schema for the table
    table_name : str
        Name of the table

    Returns
    -------
    Dict[str, DQAConformanceResult]
        Dictionary of check results keyed by check type
    """
    _logger.info("Running conformance checks for table '%s' using %s backend", table_name, _ACTIVE_BACKEND)

    # Convert pandas DataFrame to Polars if the active backend is Polars
    if _ACTIVE_BACKEND == 'polars' and isinstance(df, pd.DataFrame):
        _logger.debug("Converting pandas DataFrame to Polars for table '%s'", table_name)
        df = pl.from_pandas(df)

    # Ordered plan of (result key, check callable); the lab-units check
    # applies to the labs table only.
    check_plan = [
        ('table_presence', lambda: check_table_presence(df, table_name)),
        ('required_columns', lambda: check_required_columns(df, schema, table_name)),
        ('column_dtypes', lambda: check_column_dtypes(df, schema, table_name)),
        ('datetime_format', lambda: check_datetime_format(df, schema, table_name)),
    ]
    if table_name == 'labs':
        check_plan.append(
            ('lab_reference_units', lambda: check_lab_reference_units(df, schema, table_name))
        )
    check_plan.extend([
        ('categorical_values', lambda: check_categorical_values(df, schema, table_name)),
        ('category_group_mapping', lambda: check_category_group_mapping(df, schema, table_name)),
    ])

    results = {}
    for key, run_check in check_plan:
        results[key] = run_check()
        gc.collect()  # release intermediates between memory-heavy checks

    passed = sum(1 for r in results.values() if r.passed)
    failed = len(results) - passed
    _logger.info("Conformance checks complete for '%s': %d passed, %d failed", table_name, passed, failed)

    return results

clifpy.utils.validator.run_completeness_checks

run_completeness_checks(df, schema, table_name, error_threshold=50.0, warning_threshold=10.0)

Run all completeness checks on a table.

Parameters:

Name Type Description Default
df DataFrame

The data to validate

required
schema dict

Schema for the table

required
table_name str

Name of the table

required
error_threshold float

Percent missing above which an error is raised

50.0
warning_threshold float

Percent missing above which a warning is raised

10.0

Returns:

Type Description
Dict[str, DQACompletenessResult]

Dictionary of check results keyed by check type

Source code in clifpy/utils/validator.py
def run_completeness_checks(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    error_threshold: float = 50.0,
    warning_threshold: float = 10.0
) -> Dict[str, DQACompletenessResult]:
    """
    Run all completeness checks on a table.

    Executes missingness, conditional-requirement, and mCIDE value
    coverage checks in a fixed order.

    Parameters
    ----------
    df : DataFrame
        The data to validate
    schema : dict
        Schema for the table
    table_name : str
        Name of the table
    error_threshold : float
        Percent missing above which an error is raised
    warning_threshold : float
        Percent missing above which a warning is raised

    Returns
    -------
    Dict[str, DQACompletenessResult]
        Dictionary of check results keyed by check type
    """
    _logger.info("Running completeness checks for table '%s' using %s backend", table_name, _ACTIVE_BACKEND)

    # Convert pandas DataFrame to Polars if the active backend is Polars
    if _ACTIVE_BACKEND == 'polars' and isinstance(df, pd.DataFrame):
        _logger.debug("Converting pandas DataFrame to Polars for table '%s'", table_name)
        df = pl.from_pandas(df)

    # Ordered plan of (result key, check callable)
    check_plan = (
        ('missingness',
         lambda: check_missingness(df, schema, table_name, error_threshold, warning_threshold)),
        ('conditional_requirements',
         lambda: check_conditional_requirements(df, table_name)),
        ('mcide_value_coverage',
         lambda: check_mcide_value_coverage(df, schema, table_name)),
    )

    results = {}
    for key, run_check in check_plan:
        results[key] = run_check()
        gc.collect()  # release intermediates between memory-heavy checks

    passed = sum(1 for r in results.values() if r.passed)
    failed = len(results) - passed
    _logger.info("Completeness checks complete for '%s': %d passed, %d failed", table_name, passed, failed)

    return results

clifpy.utils.validator.run_plausibility_checks

run_plausibility_checks(df, schema, table_name, hosp_years=None, plausibility_thresholds=None)

Run all single-table plausibility checks on a table.

Parameters:

Name Type Description Default
df DataFrame

The data to validate

required
schema dict

Schema for the table

required
table_name str

Name of the table

required
hosp_years set

Pre-extracted hospitalization admission years, forwarded to the category temporal consistency check (C.2).

None
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
Dict[str, DQAPlausibilityResult]

Dictionary of check results keyed by check type

Source code in clifpy/utils/validator.py
def run_plausibility_checks(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: str,
    hosp_years: Optional[set] = None,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, DQAPlausibilityResult]:
    """
    Run all single-table plausibility checks on a table.

    Parameters
    ----------
    df : DataFrame
        The data to validate
    schema : dict
        Schema for the table
    table_name : str
        Name of the table
    hosp_years : set, optional
        Pre-extracted hospitalization admission years; forwarded to the
        category temporal consistency check (C.2) for year context.
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.

    Returns
    -------
    Dict[str, DQAPlausibilityResult]
        Dictionary of check results keyed by check type
    """
    _logger.info("Running plausibility checks for table '%s' using %s backend", table_name, _ACTIVE_BACKEND)

    # Merge caller overrides with defaults.  Copy every dict — including
    # caller-supplied dicts for unknown check names — so later mutation of
    # ``thresholds`` can never alias the caller's input.
    thresholds = {k: dict(v) for k, v in _DEFAULT_PLAUSIBILITY_THRESHOLDS.items()}
    if plausibility_thresholds:
        for check_name, overrides in plausibility_thresholds.items():
            if check_name in thresholds:
                thresholds[check_name].update(overrides)
            else:
                thresholds[check_name] = dict(overrides)

    if _ACTIVE_BACKEND == 'polars' and isinstance(df, pd.DataFrame):
        _logger.debug("Converting pandas DataFrame to Polars for table '%s'", table_name)
        df = pl.from_pandas(df)

    results = {}

    # A.1 Temporal ordering
    results['temporal_ordering'] = check_temporal_ordering(
        df, table_name, **thresholds['temporal_ordering'])
    gc.collect()

    # A.2 Numeric range plausibility
    results['numeric_range_plausibility'] = check_numeric_range_plausibility(
        df, table_name, **thresholds['numeric_range_plausibility'])
    gc.collect()

    # A.3 Field-level plausibility
    results['field_plausibility'] = check_field_plausibility(df, table_name)
    gc.collect()

    # A.4 Medication dose unit consistency (only for med tables)
    if table_name in ('medication_admin_continuous', 'medication_admin_intermittent'):
        results['medication_dose_unit_consistency'] = check_medication_dose_unit_consistency(
            df, table_name, **thresholds['medication_dose_unit_consistency'])
        gc.collect()

    # C.1 Overlapping periods (only when rules define a period for this table)
    overlap_rules = _load_validation_rules().get('overlapping_periods', {}).get(table_name)
    if overlap_rules:
        results['overlapping_periods'] = check_overlapping_periods(
            df, table_name,
            entity_col=overlap_rules.get('entity_column', 'hospitalization_id'),
            start_col=overlap_rules.get('start_column', 'in_dttm'),
            end_col=overlap_rules.get('end_column', 'out_dttm'),
        )
        gc.collect()

    # C.2 Category temporal consistency
    results['category_temporal_consistency'] = check_category_temporal_consistency(df, schema, table_name, hosp_years=hosp_years)
    gc.collect()

    # D.1 Duplicate composite keys
    results['duplicate_composite_keys'] = check_duplicate_composite_keys(
        df, table_name, schema=schema, **thresholds['duplicate_composite_keys'])
    gc.collect()

    passed = sum(1 for r in results.values() if r.passed)
    failed = len(results) - passed
    _logger.info("Plausibility checks complete for '%s': %d passed, %d failed", table_name, passed, failed)
    return results

clifpy.utils.validator.run_full_dqa

run_full_dqa(df, schema=None, table_name='', tables=None, error_threshold=50.0, warning_threshold=10.0, hosp_years=None, plausibility_thresholds=None)

Run the complete DQA suite on a single table.

Orchestrates conformance checks, completeness checks, plausibility checks, and — when tables is provided — auto-detected relational integrity and cross-table plausibility checks.

Parameters:

Name Type Description Default
df DataFrame

The data to validate.

required
schema dict

Schema for the table. When None (the default), the schema is loaded automatically from the built-in schemas using table_name.

None
table_name str

Name of the table.

''
tables list

Objects with .table_name and .df attributes (e.g. :class:BaseTable instances). When provided, relational integrity and cross-table plausibility checks are run.

None
error_threshold float

Percent missing above which an error is raised (default 50).

50.0
warning_threshold float

Percent missing above which a warning is raised (default 10).

10.0
hosp_years set

Pre-extracted hospitalization years for P.6 temporal consistency. When provided, skips scanning the hospitalization table to extract years.

None
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
Dict[str, Any]

Keys: table_name, backend, conformance, completeness, relational, plausibility.

Source code in clifpy/utils/validator.py
def run_full_dqa(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Optional[Dict[str, Any]] = None,
    table_name: str = "",
    tables: Optional[list] = None,
    error_threshold: float = 50.0,
    warning_threshold: float = 10.0,
    hosp_years: Optional[set] = None,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, Any]:
    """Run the complete DQA suite on a single table.

    Orchestrates conformance checks, completeness checks, plausibility
    checks, and — when *tables* is provided — auto-detected relational
    integrity and cross-table plausibility checks.

    Parameters
    ----------
    df : DataFrame
        The data to validate.
    schema : dict, optional
        Schema for the table.  When *None* (the default), the schema is
        loaded automatically from the built-in schemas using *table_name*.
    table_name : str
        Name of the table.
    tables : list, optional
        Objects with ``.table_name`` and ``.df`` attributes (e.g.
        :class:`BaseTable` instances).  When provided, relational
        integrity and cross-table plausibility checks are run.
    error_threshold : float
        Percent missing above which an error is raised (default 50).
    warning_threshold : float
        Percent missing above which a warning is raised (default 10).
    hosp_years : set, optional
        Pre-extracted hospitalization years for P.6 temporal consistency.
        When provided, skips scanning the hospitalization table to
        extract years.
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.

    Returns
    -------
    Dict[str, Any]
        Keys: ``table_name``, ``backend``, ``conformance``,
        ``completeness``, ``relational``, ``plausibility``.

    Raises
    ------
    ValueError
        If *table_name* is empty.
    FileNotFoundError
        If *schema* is None and no built-in schema exists for *table_name*.
    """
    # Validate inputs up front: table_name is mandatory, and a schema must
    # either be supplied or resolvable from the built-in schema files.
    if not table_name:
        raise ValueError("table_name is required")
    if schema is None:
        schema = _load_schema(table_name)
        if schema is None:
            raise FileNotFoundError(
                f"No built-in schema found for table '{table_name}'. "
                "Pass a schema dict explicitly."
            )
    _logger.info("Starting full DQA for table: %s", table_name)

    # Result skeleton; each pillar is filled with plain dicts (via
    # to_dict()) rather than result objects, so it is JSON-serializable.
    results: Dict[str, Any] = {
        'table_name': table_name,
        'backend': _ACTIVE_BACKEND,
        'conformance': {},
        'completeness': {},
        'relational': {},
        'plausibility': {},
    }

    results['conformance'] = {
        k: v.to_dict()
        for k, v in run_conformance_checks(df, schema, table_name).items()
    }

    results['completeness'] = {
        k: v.to_dict()
        for k, v in run_completeness_checks(
            df, schema, table_name, error_threshold, warning_threshold
        ).items()
    }

    # Relational integrity runs across all provided tables; only this
    # table's slice of the results is kept.
    if tables is not None:
        rel_results = run_relational_integrity_checks(tables)
        if table_name in rel_results:
            results['relational'] = {
                k: v.to_dict()
                for k, v in rel_results[table_name].items()
            }

    # Extract hospitalization years for P.6 temporal consistency context
    # (skip scan if hosp_years was provided by caller)
    if hosp_years is None and tables is not None:
        for obj in tables:
            tname = getattr(obj, 'table_name', '').replace('clif_', '')
            if tname == 'hospitalization':
                hdf = obj.df
                if HAS_POLARS and isinstance(hdf, (pl.DataFrame, pl.LazyFrame)):
                    # Lazy scan: pull only the distinct non-null admission years
                    hlf = hdf if isinstance(hdf, pl.LazyFrame) else hdf.lazy()
                    hcols = hlf.collect_schema().names()
                    if 'admission_dttm' in hcols:
                        hosp_years = set(
                            hlf.select(pl.col('admission_dttm').dt.year().alias('yr'))
                            .filter(pl.col('yr').is_not_null())
                            .unique()
                            .collect()
                            .get_column('yr')
                            .to_list()
                        )
                elif isinstance(hdf, pd.DataFrame) and 'admission_dttm' in hdf.columns:
                    hosp_years = set(
                        int(y) for y in hdf['admission_dttm'].dropna().dt.year.unique()
                    )
                break

    # Plausibility checks (single-table)
    results['plausibility'] = {
        k: v.to_dict()
        for k, v in run_plausibility_checks(
            df, schema, table_name, hosp_years=hosp_years,
            plausibility_thresholds=plausibility_thresholds,
        ).items()
    }

    # Cross-table plausibility checks (when tables provided); results are
    # merged into this table's plausibility section under their own keys.
    if tables is not None:
        cross_plaus = run_cross_table_plausibility_checks(
            tables, plausibility_thresholds=plausibility_thresholds)
        if table_name in cross_plaus:
            for k, v in cross_plaus[table_name].items():
                results['plausibility'][k] = v.to_dict()

    gc.collect()
    _logger.info("Completed full DQA for table: %s", table_name)
    return results

Cache-Based Cross-Table Pipeline

For memory-optimized cross-table validation, extract lightweight caches and run checks without keeping full DataFrames in memory.

clifpy.utils.validator.extract_cross_table_cache

extract_cross_table_cache(table_obj)

Extract a lightweight cache from a single table object.

Used by the optimised pipeline in CLIF-TableOne's runner to avoid keeping full DataFrames in memory for cross-table checks.

Parameters:

Name Type Description Default
table_obj BaseTable

Object with .table_name, .df, and .schema attributes.

required

Returns:

Type Description
dict

Keys: table_name, fk_ids, schema_cols, temporal_df, hosp_bounds_df, hosp_years.

Source code in clifpy/utils/validator.py
def extract_cross_table_cache(table_obj) -> Dict[str, Any]:
    """Extract a lightweight cache from a single table object.

    Used by the optimised pipeline in CLIF-TableOne's runner to avoid
    keeping full DataFrames in memory for cross-table checks.

    Supports both Polars (eager or lazy) and pandas DataFrames on
    ``table_obj.df``; Polars extractions collect with ``streaming=True``
    where possible to keep peak memory low.

    Parameters
    ----------
    table_obj : BaseTable
        Object with ``.table_name``, ``.df``, and ``.schema`` attributes.

    Returns
    -------
    dict
        Keys: ``table_name``, ``fk_ids``, ``schema_cols``,
        ``temporal_df``, ``hosp_bounds_df``, ``hosp_years``,
        ``conditional_source_ids``, ``conditional_target_ids``.
    """
    # Strip the 'clif_' prefix so cache keys match the plain table names
    # used by the *_from_cache check runners.
    tname = getattr(table_obj, 'table_name', '').replace('clif_', '')
    df = table_obj.df
    schema = getattr(table_obj, 'schema', None) or {}

    # Schema-defined column names (None when the schema declares no columns,
    # which makes the code fall back to the DataFrame's actual columns below)
    schema_cols = (
        {c['name'] for c in schema.get('columns', [])}
        if schema.get('columns') else None
    )

    # Determine actual columns in the DataFrame
    if HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame)):
        lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
        # collect_schema() reads column names without materialising data
        actual_cols = set(lf.collect_schema().names())
    else:
        actual_cols = set(df.columns.tolist())

    # Use schema cols when available; fall back to actual cols
    col_names = schema_cols if schema_cols is not None else actual_cols

    # --- FK ID sets ---
    # One distinct-non-null ID set per FK column named in the YAML rules;
    # consumed by run_relational_integrity_checks_from_cache.
    fk_rules = _load_validation_rules().get('relational_integrity', {})
    fk_ids: Dict[str, set] = {}
    for fk_column in fk_rules:
        # Column must be both schema-sanctioned and physically present
        if fk_column not in col_names or fk_column not in actual_cols:
            continue
        if HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame)):
            lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
            # NOTE(review): collect(streaming=True) is deprecated in newer
            # Polars in favour of engine="streaming" — confirm pinned version.
            id_series = (
                lf.select(pl.col(fk_column).drop_nulls().unique())
                .collect(streaming=True)
            )
            fk_ids[fk_column] = set(id_series[fk_column].to_list())
        else:
            fk_ids[fk_column] = set(df[fk_column].dropna().unique().tolist())

    # --- Temporal subset for cross-table plausibility ---
    # Keep only hospitalization_id + the table's known time columns so the
    # cross-table temporal check never needs the full DataFrame.
    time_cols = _CROSS_TABLE_TIME_COLUMNS.get(tname, [])
    temporal_df = None
    if time_cols and 'hospitalization_id' in actual_cols:
        needed = ['hospitalization_id'] + [c for c in time_cols if c in actual_cols]
        if HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame)):
            lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
            temporal_df = lf.select(needed).collect()
        else:
            # .copy() detaches the subset so the full frame can be freed
            temporal_df = df[needed].copy()

    # --- Hospitalization-specific caches ---
    # Admission/discharge bounds and the set of admission years are only
    # extracted from the hospitalization table itself.
    hosp_bounds_df = None
    hosp_years = None
    if tname == 'hospitalization':
        if HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame)):
            lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
            hosp_cols_available = set(lf.collect_schema().names())
            bound_cols = [c for c in ['hospitalization_id', 'admission_dttm', 'discharge_dttm']
                          if c in hosp_cols_available]
            # Bounds are only useful when the join key itself is present
            if 'hospitalization_id' in bound_cols:
                hosp_bounds_df = lf.select(bound_cols).collect()
            if 'admission_dttm' in hosp_cols_available:
                hosp_years = set(
                    lf.select(pl.col('admission_dttm').dt.year().alias('yr'))
                    .filter(pl.col('yr').is_not_null())
                    .unique()
                    .collect()
                    .get_column('yr')
                    .to_list()
                )
        elif isinstance(df, pd.DataFrame):
            bound_cols = [c for c in ['hospitalization_id', 'admission_dttm', 'discharge_dttm']
                          if c in df.columns]
            if 'hospitalization_id' in bound_cols:
                hosp_bounds_df = df[bound_cols].copy()
            if 'admission_dttm' in df.columns:
                # assumes admission_dttm is already datetime-typed (needs .dt)
                # — TODO confirm upstream parsing guarantees this
                hosp_years = set(
                    int(y) for y in df['admission_dttm'].dropna().dt.year.unique()
                )

    # --- Cross-table conditional requirements cache ---
    # For each K.5 rule, pre-compute the join IDs on both sides so the
    # completeness check can run as pure set arithmetic later.
    ct_cond_rules = _load_validation_rules().get('cross_table_conditional_requirements', [])
    conditional_source_ids: Dict[str, set] = {}
    conditional_target_ids: Dict[str, set] = {}

    for rule in ct_cond_rules:
        # Rule key must match the one rebuilt by the cache-based K.5 runner
        rule_key = f"{rule['source_column']}_{rule['target_column']}"
        join_col = rule['join_column']

        if tname == rule['source_table'] and join_col in actual_cols and rule['source_column'] in actual_cols:
            # Extract join_column IDs where source_column matches source_value (case-insensitive)
            match_values = [str(v).strip().lower() for v in rule['source_value']]
            if HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame)):
                lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
                matched = (
                    lf.filter(
                        pl.col(rule['source_column']).cast(pl.Utf8).str.strip_chars().str.to_lowercase().is_in(match_values)
                    )
                    .select(pl.col(join_col).drop_nulls().unique())
                    .collect(streaming=True)
                )
                conditional_source_ids[rule_key] = set(matched[join_col].to_list())
            else:
                mask = df[rule['source_column']].astype(str).str.strip().str.lower().isin(match_values)
                conditional_source_ids[rule_key] = set(df.loc[mask, join_col].dropna().unique().tolist())

        if tname == rule['target_table'] and join_col in actual_cols and rule['target_column'] in actual_cols:
            # Extract join_column IDs where target_column is not null (and not empty string)
            if HAS_POLARS and isinstance(df, (pl.DataFrame, pl.LazyFrame)):
                lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
                tcol = pl.col(rule['target_column'])
                # Handle both datetime and string columns: not null AND not empty string
                not_null_filter = tcol.is_not_null()
                dtype = lf.collect_schema()[rule['target_column']]
                if dtype == pl.Utf8 or dtype == pl.String:
                    not_null_filter = not_null_filter & (tcol.str.strip_chars().str.len_chars() > 0)
                matched = (
                    lf.filter(not_null_filter)
                    .select(pl.col(join_col).drop_nulls().unique())
                    .collect(streaming=True)
                )
                conditional_target_ids[rule_key] = set(matched[join_col].to_list())
            else:
                target_col = df[rule['target_column']]
                mask = target_col.notna()
                # If string dtype, also exclude empty/whitespace-only strings
                if hasattr(target_col, 'str') and target_col.dtype == 'object':
                    mask = mask & (target_col.astype(str).str.strip() != '')
                conditional_target_ids[rule_key] = set(df.loc[mask, join_col].dropna().unique().tolist())

    cache = {
        'table_name': tname,
        'fk_ids': fk_ids,
        'schema_cols': schema_cols,
        'temporal_df': temporal_df,
        'hosp_bounds_df': hosp_bounds_df,
        'hosp_years': hosp_years,
        'conditional_source_ids': conditional_source_ids,
        'conditional_target_ids': conditional_target_ids,
    }
    _logger.info(
        "extract_cross_table_cache: table='%s', fk_keys=%s, temporal=%s, hosp_bounds=%s, "
        "cond_source_keys=%s, cond_target_keys=%s",
        tname, list(fk_ids.keys()), temporal_df is not None, hosp_bounds_df is not None,
        list(conditional_source_ids.keys()), list(conditional_target_ids.keys()),
    )
    return cache

clifpy.utils.validator.run_relational_integrity_checks_from_cache

run_relational_integrity_checks_from_cache(caches)

Run relational integrity checks using pre-extracted caches.

Equivalent to :func:run_relational_integrity_checks but operates on Python set objects (FK ID sets) instead of scanning full DataFrames.

Parameters:

Name Type Description Default
caches dict

{table_name: cache_dict} as returned by :func:extract_cross_table_cache.

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

Same structure as :func:run_relational_integrity_checks.

Source code in clifpy/utils/validator.py
def run_relational_integrity_checks_from_cache(
    caches: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Run relational integrity checks from pre-extracted table caches.

    Behaves like :func:`run_relational_integrity_checks`, but compares
    cached foreign-key ID sets (plain Python ``set`` objects) rather than
    scanning full DataFrames.

    Parameters
    ----------
    caches : dict
        ``{table_name: cache_dict}`` produced by
        :func:`extract_cross_table_cache`.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        Same structure as :func:`run_relational_integrity_checks`.
    """
    _logger.info(
        "run_relational_integrity_checks_from_cache: starting with %d cached tables",
        len(caches),
    )

    rules = _load_validation_rules().get('relational_integrity', {})
    out: Dict[str, Dict[str, DQACompletenessResult]] = {}

    def _coverage(total: int, orphan_count: int) -> float:
        # By convention an empty ID set counts as fully covered.
        return ((total - orphan_count) / total * 100) if total > 0 else 100

    for tname, cache in caches.items():
        # Prefer schema-declared columns; otherwise only the cached FK keys.
        cols = cache['schema_cols']
        if cols is None:
            cols = set(cache['fk_ids'].keys())

        for fk_col, rule in rules.items():
            if fk_col not in cols:
                continue

            ref_name = rule['references_table']

            # Self-references are not meaningful here.
            if ref_name == tname:
                continue

            if ref_name not in caches:
                _logger.debug(
                    "run_relational_integrity_checks_from_cache: skipping %s.%s — "
                    "reference table '%s' not cached",
                    tname, fk_col, ref_name,
                )
                continue

            # Both sides must have an extracted ID set for this FK column.
            src_ids = cache['fk_ids'].get(fk_col)
            ref_ids = caches[ref_name]['fk_ids'].get(fk_col)
            if src_ids is None or ref_ids is None:
                continue

            _logger.info(
                "run_relational_integrity_checks_from_cache: checking %s.%s -> %s",
                tname, fk_col, ref_name,
            )

            # Same bidirectional result structure as check_relational_integrity.
            result = DQACompletenessResult(
                "relational_integrity",
                f"{tname}<->{ref_name}",
            )

            try:
                # Forward: IDs present in the reference table but absent here.
                fwd_missing = ref_ids - src_ids
                # Reverse: IDs present here but absent from the reference table.
                rev_missing = src_ids - ref_ids
                fwd_total, rev_total = len(ref_ids), len(src_ids)
                fwd_cov = _coverage(fwd_total, len(fwd_missing))
                rev_cov = _coverage(rev_total, len(rev_missing))

                result.metrics.update({
                    "forward_coverage_percent": round(fwd_cov, 2),
                    "forward_orphan_ids": len(fwd_missing),
                    "forward_reference_unique_ids": fwd_total,
                    "reverse_coverage_percent": round(rev_cov, 2),
                    "reverse_orphan_ids": len(rev_missing),
                    "reverse_target_unique_ids": rev_total,
                })

                if fwd_missing:
                    result.add_warning(
                        f"{len(fwd_missing)}/{fwd_total} {fk_col} values in {ref_name} "
                        f"not found in {tname} ({round(fwd_cov, 1)}% coverage)",
                        {"orphan_count": len(fwd_missing),
                         "sample_orphan_ids": list(fwd_missing)[:10],
                         "coverage_percent": round(fwd_cov, 2)}
                    )

                if rev_missing:
                    result.add_warning(
                        f"{len(rev_missing)}/{rev_total} {fk_col} values in {tname} "
                        f"not found in {ref_name} ({round(rev_cov, 1)}% coverage)",
                        {"orphan_count": len(rev_missing),
                         "sample_orphan_ids": list(rev_missing)[:10],
                         "coverage_percent": round(rev_cov, 2)}
                    )

                if not (fwd_missing or rev_missing):
                    result.add_info(
                        f"Full bidirectional coverage for {fk_col} between "
                        f"{tname} and {ref_name}"
                    )

                _logger.info(
                    "run_relational_integrity_checks_from_cache: '%s' <-> '%s' — "
                    "fwd_coverage=%.1f%%, rev_coverage=%.1f%%",
                    tname, ref_name, fwd_cov, rev_cov,
                )

            except Exception as e:
                _logger.error(
                    "Cached relational check failed for '%s' <-> '%s': %s",
                    tname, ref_name, e,
                )
                result.add_error(f"Error checking relational integrity: {str(e)}")

            out.setdefault(tname, {})[fk_col] = result

    n_checks = sum(len(v) for v in out.values())
    _logger.info(
        "run_relational_integrity_checks_from_cache: completed %d checks across %d tables",
        n_checks, len(out),
    )
    return out

clifpy.utils.validator.run_cross_table_completeness_checks_from_cache

run_cross_table_completeness_checks_from_cache(caches)

Run cross-table conditional completeness checks (K.5) from caches.

For each YAML rule in cross_table_conditional_requirements, computes the set of join-column IDs that satisfy the source condition but are missing the required target column value.

Parameters:

Name Type Description Default
caches dict

{table_name: cache_dict} as returned by :func:extract_cross_table_cache.

required

Returns:

Type Description
Dict[str, Dict[str, DQACompletenessResult]]

Results keyed by target table name, then by a descriptive check key.

Source code in clifpy/utils/validator.py
def run_cross_table_completeness_checks_from_cache(
    caches: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, DQACompletenessResult]]:
    """Run the K.5 cross-table conditional completeness checks from caches.

    For every rule in ``cross_table_conditional_requirements``, compares
    the cached source-condition ID set against the cached target-present
    ID set and reports join IDs that satisfy the source condition but
    lack the required value in the target table.

    Parameters
    ----------
    caches : dict
        ``{table_name: cache_dict}`` produced by
        :func:`extract_cross_table_cache`.

    Returns
    -------
    Dict[str, Dict[str, DQACompletenessResult]]
        Results keyed by **target** table name, then by a descriptive
        check key.
    """
    _logger.info(
        "run_cross_table_completeness_checks_from_cache: starting with %d cached tables",
        len(caches),
    )

    rules = _load_validation_rules().get('cross_table_conditional_requirements', [])
    if not rules:
        return {}

    out: Dict[str, Dict[str, DQACompletenessResult]] = {}

    for rule in rules:
        # Key must mirror the one written by extract_cross_table_cache.
        key = f"{rule['source_column']}_{rule['target_column']}"
        src_tbl = rule['source_table']
        tgt_tbl = rule['target_table']

        # The rule only applies when both tables were cached.
        if src_tbl not in caches or tgt_tbl not in caches:
            _logger.debug(
                "K.5: skipping rule '%s' — source '%s' or target '%s' not cached",
                key, src_tbl, tgt_tbl,
            )
            continue

        src_ids = caches[src_tbl].get('conditional_source_ids', {}).get(key)
        tgt_ids = caches[tgt_tbl].get('conditional_target_ids', {}).get(key)

        res = DQACompletenessResult(
            "cross_table_conditional_completeness",
            f"{src_tbl}->{tgt_tbl}",
        )
        bucket = out.setdefault(tgt_tbl, {})

        # Missing ID sets mean the rule's columns were absent at extraction.
        if src_ids is None or tgt_ids is None:
            res.add_info(
                "No cross-table conditional requirements applicable"
            )
            bucket[key] = res
            continue

        # No rows matched the source condition: nothing to verify.
        if not src_ids:
            res.add_info(
                f"No {rule['source_column']} = {rule['source_value']} found in "
                f"{src_tbl}; cross-table conditional check not triggered"
            )
            bucket[key] = res
            continue

        missing = src_ids - tgt_ids
        n_src = len(src_ids)
        n_missing = len(missing)
        pct = round(n_missing / n_src * 100, 1) if n_src > 0 else 0

        res.metrics["total_matching_source"] = n_src
        res.metrics["missing_in_target"] = n_missing
        res.metrics["coverage_percent"] = round(100 - pct, 2)

        if missing:
            res.add_warning(
                f"{n_missing}/{n_src} patients discharged as {rule['source_value']} "
                f"in {src_tbl} are missing {rule['target_column']} in {tgt_tbl} "
                f"({pct}% missing)",
                {
                    "column": rule['target_column'],
                    "missing_count": n_missing,
                    "total_matching": n_src,
                    "percent_missing": pct,
                    "sample_ids": sorted(list(missing))[:10],
                    "source_condition": f"{rule['source_column']} in {rule['source_value']}",
                },
            )
        else:
            res.add_info(
                f"All {n_src} patients discharged as {rule['source_value']} in "
                f"{src_tbl} have {rule['target_column']} in {tgt_tbl}"
            )

        bucket[key] = res
        _logger.info(
            "K.5: %s->%s rule '%s': %d/%d missing (%.1f%%)",
            src_tbl, tgt_tbl, key, n_missing, n_src, pct,
        )

    n_checks = sum(len(v) for v in out.values())
    _logger.info(
        "run_cross_table_completeness_checks_from_cache: completed %d checks across %d tables",
        n_checks, len(out),
    )
    return out

clifpy.utils.validator.run_cross_table_plausibility_checks_from_cache

run_cross_table_plausibility_checks_from_cache(caches, plausibility_thresholds=None)

Run cross-table plausibility checks using pre-extracted caches.

Equivalent to :func:run_cross_table_plausibility_checks but uses cached temporal subset DataFrames and hospitalization bounds instead of full DataFrames.

Parameters:

Name Type Description Default
caches dict

{table_name: cache_dict} as returned by :func:extract_cross_table_cache.

required
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
Dict[str, Dict[str, DQAPlausibilityResult]]

Same structure as :func:run_cross_table_plausibility_checks.

Source code in clifpy/utils/validator.py
def run_cross_table_plausibility_checks_from_cache(
    caches: Dict[str, Dict[str, Any]],
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, Dict[str, DQAPlausibilityResult]]:
    """Run cross-table plausibility checks using pre-extracted caches.

    Equivalent to :func:`run_cross_table_plausibility_checks` but uses
    cached temporal subset DataFrames and hospitalization bounds instead
    of full DataFrames.

    Parameters
    ----------
    caches : dict
        ``{table_name: cache_dict}`` as returned by
        :func:`extract_cross_table_cache`.
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.

    Returns
    -------
    Dict[str, Dict[str, DQAPlausibilityResult]]
        Same structure as :func:`run_cross_table_plausibility_checks`.
    """
    _logger.info(
        "run_cross_table_plausibility_checks_from_cache: starting with %d cached tables",
        len(caches),
    )

    # Merge caller overrides with defaults
    # (deep-copy one level so defaults are never mutated in place)
    thresholds = {k: dict(v) for k, v in _DEFAULT_PLAUSIBILITY_THRESHOLDS.items()}
    if plausibility_thresholds:
        for check_name, overrides in plausibility_thresholds.items():
            if check_name in thresholds:
                thresholds[check_name].update(overrides)
            else:
                thresholds[check_name] = overrides

    # Need hospitalization bounds
    # — every cross-table temporal check compares against admission/discharge
    hosp_cache = caches.get('hospitalization')
    if hosp_cache is None or hosp_cache.get('hosp_bounds_df') is None:
        _logger.info("Hospitalization cache not available; skipping cross-table plausibility")
        return {}

    hosp_bounds_df = hosp_cache['hosp_bounds_df']
    # Ensure hosp_bounds_df matches active backend
    # (cache may have been extracted while a different backend was active)
    if _ACTIVE_BACKEND == 'polars' and isinstance(hosp_bounds_df, pd.DataFrame):
        hosp_bounds_df = pl.from_pandas(hosp_bounds_df)

    results: Dict[str, Dict[str, DQAPlausibilityResult]] = {}

    for tbl_name, cache in caches.items():
        # The hospitalization table is the reference, not a subject
        if tbl_name == 'hospitalization':
            continue

        temporal_df = cache.get('temporal_df')
        if temporal_df is None:
            continue

        # Ensure temporal_df matches active backend
        if _ACTIVE_BACKEND == 'polars' and isinstance(temporal_df, pd.DataFrame):
            temporal_df = pl.from_pandas(temporal_df)

        time_cols = _CROSS_TABLE_TIME_COLUMNS.get(tbl_name, [])
        if not time_cols:
            continue

        # Determine available time columns in the cached temporal subset
        if HAS_POLARS and isinstance(temporal_df, (pl.DataFrame, pl.LazyFrame)):
            actual_cols = (temporal_df.collect_schema().names()
                          if isinstance(temporal_df, pl.LazyFrame)
                          else temporal_df.columns)
        else:
            actual_cols = temporal_df.columns.tolist()

        available_time_cols = [c for c in time_cols if c in actual_cols]
        # Without the join key or any time column there is nothing to check
        if not available_time_cols or 'hospitalization_id' not in actual_cols:
            continue

        # assumes 'cross_table_temporal' always exists in the default
        # thresholds dict — TODO confirm _DEFAULT_PLAUSIBILITY_THRESHOLDS
        result = check_cross_table_temporal_plausibility(
            temporal_df, hosp_bounds_df, tbl_name, available_time_cols,
            **thresholds['cross_table_temporal']
        )
        results.setdefault(tbl_name, {})["cross_table_temporal"] = result

    checked = sum(len(v) for v in results.values())
    _logger.info(
        "run_cross_table_plausibility_checks_from_cache: completed %d checks across %d tables",
        checked, len(results),
    )
    return results

Report Generation

clifpy.utils.report_generator.collect_dqa_issues

collect_dqa_issues(validation_data)

Collect errors, warnings, and info messages from run_full_dqa output.

Returns (category_scores, all_issues) where each issue is a dict with category, check_type, severity ('error'/'warning'/'info'), message, details, plus enriched fields: rule_code, rule_description, column_field.

Source code in clifpy/utils/report_generator.py
def collect_dqa_issues(validation_data: Dict[str, Any]):
    """Collect errors, warnings, and info messages from run_full_dqa output.

    Returns (category_scores, all_issues) where each issue is a dict with
    category, check_type, severity ('error'/'warning'/'info'), message, details,
    plus enriched fields: rule_code, rule_description, column_field.

    Parameters
    ----------
    validation_data : dict
        Output from run_full_dqa, keyed by DQA category; each category maps
        check names to result dicts with 'check_type', 'errors', 'warnings'
        and 'info' lists.
    """
    # Map each result-dict list name to the severity label it contributes.
    severity_map = (('errors', 'error'), ('warnings', 'warning'), ('info', 'info'))

    all_issues: List[Dict[str, Any]] = []

    for category in DQA_CATEGORIES:
        checks = validation_data.get(category, {})
        if not checks:
            continue
        for check_name, d in checks.items():
            for list_key, severity in severity_map:
                # .get guards against result dicts missing a list entirely
                # (the original indexed d['errors']/d['warnings'] directly,
                # which raised KeyError on partial results).
                for entry in d.get(list_key, []):
                    issue = {
                        'category': category,
                        'check_type': d['check_type'],
                        'severity': severity,
                        'message': entry.get('message', ''),
                        'details': entry.get('details', {}),
                    }
                    # enrich_issue may drop an issue (returns None) — only
                    # keep the ones it accepts.
                    enriched = enrich_issue(issue, check_key=check_name)
                    if enriched is not None:
                        all_issues.append(enriched)

    # Compute scores from enriched findings so summary matches detail counts
    category_scores = {}
    for category in DQA_CATEGORIES:
        cat_issues = [i for i in all_issues if i['category'] == category]
        if cat_issues:
            # Warnings and info both count as "passed"; only errors fail.
            cat_passed = sum(1 for i in cat_issues if i['severity'] in ('info', 'warning'))
            category_scores[category] = (cat_passed, len(cat_issues))

    return category_scores, all_issues

clifpy.utils.report_generator.generate_validation_pdf

generate_validation_pdf(validation_data, table_name, output_path, site_name=None, feedback=None)

Generate a PDF report from DQA validation results.

Parameters:

Name Type Description Default
validation_data dict

Output from run_full_dqa (keys: conformance, completeness, relational, plausibility).

required
table_name str

Name of the table.

required
output_path str

Path where PDF should be saved.

required
site_name str

Name of the site/hospital.

None
feedback dict

User feedback with 'user_decisions' keyed by error_id.

None

Returns:

Type Description
str

Path to generated PDF file.

Source code in clifpy/utils/report_generator.py
(Source line numbers 214–591 from the rendered documentation page omitted — they carried no content.)
def generate_validation_pdf(validation_data: Dict[str, Any],
                            table_name: str, output_path: str,
                            site_name: Optional[str] = None,
                            feedback: Optional[Dict[str, Any]] = None) -> str:
    """
    Generate a PDF report from DQA validation results.

    Parameters
    ----------
    validation_data : dict
        Output from run_full_dqa (keys: conformance, completeness,
        relational, plausibility).
    table_name : str
        Name of the table.
    output_path : str
        Path where PDF should be saved.
    site_name : str, optional
        Name of the site/hospital.
    feedback : dict, optional
        User feedback with 'user_decisions' keyed by error_id.

    Returns
    -------
    str
        Path to generated PDF file.

    Notes
    -----
    When feedback contains rejected errors, those errors are re-counted
    as passed in the summary tables and greyed out in the issue detail
    tables, and a banner summarizing the feedback is added to the report.
    """
    category_scores, all_issues = collect_dqa_issues(validation_data)

    # Build feedback lookup: error_id -> decision dict
    feedback_lookup: Dict[str, Dict[str, Any]] = {}
    if feedback and feedback.get('user_decisions'):
        feedback_lookup = feedback['user_decisions']

    # Build set of rejected error IDs so we can adjust summary counts
    rejected_ids: set = {
        eid for eid, d in feedback_lookup.items()
        if d.get('decision') == 'rejected'
    }

    # Raw score totals before any feedback adjustment
    total_passed = sum(p for p, _ in category_scores.values())
    total_checks = sum(t for _, t in category_scores.values())
    error_count = sum(1 for i in all_issues if i['severity'] == 'error')
    warning_count = sum(1 for i in all_issues if i['severity'] == 'warning')

    # Adjust counts: rejected errors no longer count as errors
    if rejected_ids:
        rejected_error_count = sum(
            1 for i in all_issues
            if i['severity'] == 'error' and _make_error_id(i) in rejected_ids
        )
        error_count -= rejected_error_count
        total_passed += rejected_error_count

    # --- Build PDF ---
    doc = SimpleDocTemplate(output_path, pagesize=letter)
    story: list = []
    styles = getSampleStyleSheet()

    primary_color = colors.HexColor('#1F4E79')
    text_dark = colors.HexColor('#2C3E50')
    text_medium = colors.HexColor('#5D6D7E')
    header_bg = colors.HexColor('#F5F6FA')
    pass_bg = colors.HexColor('#E8F5E8')
    fail_bg = colors.HexColor('#FFEAEA')
    warn_bg = colors.HexColor('#FFF3E0')

    title_style = ParagraphStyle(
        'CustomTitle', parent=styles['Heading1'],
        fontSize=22, textColor=primary_color, spaceAfter=24,
        alignment=TA_CENTER, fontName='Helvetica-Bold',
    )
    heading_style = ParagraphStyle(
        'CustomHeading', parent=styles['Heading2'],
        fontSize=14, textColor=text_dark, spaceAfter=10,
        spaceBefore=16, fontName='Helvetica-Bold',
    )
    timestamp_style = ParagraphStyle(
        'TimestampStyle', parent=styles['Normal'],
        fontSize=8, textColor=text_medium, alignment=1,
        fontName='Helvetica',
    )

    # Cell styles for issue detail tables
    cell_style = ParagraphStyle(
        'CellStyle', parent=styles['Normal'],
        fontSize=7, leading=9, fontName='Helvetica',
        textColor=text_dark,
    )
    cell_bold_style = ParagraphStyle(
        'CellBoldStyle', parent=cell_style,
        fontName='Helvetica-Bold',
    )

    # Timestamp
    ts = Table(
        [[Paragraph(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", timestamp_style)]],
        colWidths=[7.5 * inch],
    )
    # Negative top padding pulls the timestamp up into the page's top margin
    ts.setStyle(TableStyle([
        ('ALIGN', (0, 0), (0, 0), 'RIGHT'),
        ('TOPPADDING', (0, 0), (0, 0), -24),
        ('BOTTOMPADDING', (0, 0), (0, 0), 2),
    ]))
    story.append(ts)

    # Title
    title_text = f"{site_name + ' ' if site_name else ''}CLIF DQA Report Card"
    story.append(Paragraph(title_text, title_style))
    story.append(Paragraph(f"{table_name.title()} Table", heading_style))
    story.append(Spacer(1, 0.2 * inch))

    # --- Feedback banner (only when feedback with decisions exists) ---
    has_feedback = bool(feedback_lookup)
    if has_feedback:
        n_accepted = sum(1 for d in feedback_lookup.values() if d.get('decision') == 'accepted')
        n_rejected = sum(1 for d in feedback_lookup.values() if d.get('decision') == 'rejected')
        if n_accepted > 0 or n_rejected > 0:
            from datetime import datetime as _dt
            _raw_ts = feedback.get('timestamp', '')
            # Fall back to the raw string when the timestamp is not ISO-formatted
            try:
                fb_ts = _dt.fromisoformat(_raw_ts).strftime('%Y-%m-%d %H:%M')
            except (ValueError, TypeError):
                fb_ts = _raw_ts
            banner_style = ParagraphStyle(
                'FeedbackBanner', parent=styles['Normal'],
                fontSize=8, textColor=colors.HexColor('#1F4E79'),
                fontName='Helvetica',
            )
            banner_text = (
                f"<i>This report was updated based on feedback provided on {fb_ts}. "
                f"Accepted: {n_accepted} | Rejected: {n_rejected}</i>"
            )
            banner_tbl = Table(
                [[Paragraph(banner_text, banner_style)]],
                colWidths=[7.5 * inch],
            )
            banner_tbl.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (0, 0), colors.HexColor('#E8F0FE')),
                ('TOPPADDING', (0, 0), (0, 0), 6),
                ('BOTTOMPADDING', (0, 0), (0, 0), 6),
                ('LEFTPADDING', (0, 0), (0, 0), 10),
                ('RIGHTPADDING', (0, 0), (0, 0), 10),
            ]))
            story.append(banner_tbl)
            story.append(Spacer(1, 0.15 * inch))

    # --- DQA Summary Table ---
    story.append(Paragraph("DQA Summary", heading_style))
    summary_header = ['Category', 'Non-Error', 'Total', 'Errors', 'Warnings']
    summary_rows = [summary_header]
    for category in DQA_CATEGORIES:
        if category not in category_scores:
            continue
        passed, total = category_scores[category]
        cat_issues = [i for i in all_issues if i['category'] == category]
        cat_errors = sum(1 for i in cat_issues if i['severity'] == 'error')
        cat_warnings = sum(1 for i in cat_issues if i['severity'] == 'warning')
        # Adjust for rejected errors in this category
        if rejected_ids:
            cat_rejected = sum(
                1 for i in cat_issues
                if i['severity'] == 'error' and _make_error_id(i) in rejected_ids
            )
            cat_errors -= cat_rejected
            passed += cat_rejected
        summary_rows.append([category.title(), str(passed), str(total),
                             str(cat_errors), str(cat_warnings)])
    summary_rows.append(['Overall', str(total_passed), str(total_checks),
                         str(error_count), str(warning_count)])

    summary_tbl = Table(summary_rows, colWidths=[2 * inch, 0.8 * inch, 0.8 * inch,
                                                  0.8 * inch, 0.8 * inch])
    tbl_style = [
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, -1), 9),
        ('BACKGROUND', (0, 0), (-1, 0), header_bg),
        ('TEXTCOLOR', (0, 0), (-1, -1), text_dark),
        ('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#DADADA')),
        ('TOPPADDING', (0, 0), (-1, -1), 6),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
        ('LEFTPADDING', (0, 0), (-1, -1), 10),
        ('RIGHTPADDING', (0, 0), (-1, -1), 10),
    ]
    # Color-code error/warning cells (includes the final 'Overall' row)
    for row_idx in range(1, len(summary_rows)):
        errors_val = int(summary_rows[row_idx][3])
        warnings_val = int(summary_rows[row_idx][4])
        if errors_val > 0:
            tbl_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), fail_bg))
        else:
            tbl_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), pass_bg))
        if warnings_val > 0:
            tbl_style.append(('BACKGROUND', (4, row_idx), (4, row_idx), warn_bg))
        else:
            tbl_style.append(('BACKGROUND', (4, row_idx), (4, row_idx), pass_bg))
    summary_tbl.setStyle(TableStyle(tbl_style))
    story.append(summary_tbl)
    story.append(Spacer(1, 0.3 * inch))

    # --- Data Profile Table ---
    table_stats = validation_data.get('table_stats', [])
    if table_stats:
        story.append(Paragraph("Data Profile", heading_style))
        total_rows = validation_data.get('total_rows', 0)
        profile_subtitle = ParagraphStyle(
            'ProfileSubtitle', parent=styles['Normal'],
            fontSize=9, textColor=text_medium, fontName='Helvetica',
        )
        story.append(Paragraph(f"Total Rows: {total_rows:,}", profile_subtitle))
        story.append(Spacer(1, 0.1 * inch))

        profile_header = ['Column', 'Dtype', 'Null', 'Null%', 'Unique', 'Min', 'Max']
        profile_rows = [profile_header]
        for s in table_stats:
            profile_rows.append([
                s['column'], s['dtype'],
                f"{s['null_count']:,}",
                f"{s['null_pct']:.1f}%", f"{s['unique']:,}",
                s.get('min') or '', s.get('max') or '',
            ])

        profile_col_widths = [1.6 * inch, 0.9 * inch,
                              0.6 * inch, 0.6 * inch, 0.6 * inch,
                              1.3 * inch, 1.3 * inch]
        profile_tbl = Table(profile_rows, colWidths=profile_col_widths)

        profile_style = [
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, -1), 7),
            ('BACKGROUND', (0, 0), (-1, 0), header_bg),
            ('TEXTCOLOR', (0, 0), (-1, -1), text_dark),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#DADADA')),
            ('TOPPADDING', (0, 0), (-1, -1), 3),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
            ('LEFTPADDING', (0, 0), (-1, -1), 4),
            ('RIGHTPADDING', (0, 0), (-1, -1), 4),
            # Right-align everything from the Null column onward
            # (Null, Null%, Unique, Min, Max)
            ('ALIGN', (2, 0), (-1, -1), 'RIGHT'),
        ]
        # Color-code Null% cells: amber above 10%, red above 50%
        amber_bg = colors.HexColor('#FFF3E0')
        red_light_bg = colors.HexColor('#FFEAEA')
        for row_idx, s in enumerate(table_stats, 1):
            if s['null_pct'] > 50:
                profile_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), red_light_bg))
            elif s['null_pct'] > 10:
                profile_style.append(('BACKGROUND', (3, row_idx), (3, row_idx), amber_bg))

        profile_tbl.setStyle(TableStyle(profile_style))
        story.append(profile_tbl)
        story.append(Spacer(1, 0.3 * inch))

    # --- Per-category issue details as structured tables ---
    rejected_bg = colors.HexColor('#E0E0E0')
    rejected_text = colors.HexColor('#999999')

    if all_issues:
        story.append(PageBreak())
        story.append(Paragraph(f"Issue Details ({len(all_issues)})", heading_style))

        # Column widths — add status column only when feedback exists
        if has_feedback:
            detail_col_widths = [1.0 * inch, 0.5 * inch, 1.7 * inch,
                                 1.1 * inch, 0.5 * inch, 2.2 * inch, 0.5 * inch]
        else:
            detail_col_widths = [1.0 * inch, 0.5 * inch, 1.7 * inch,
                                 1.1 * inch, 0.5 * inch, 2.7 * inch]

        for category in DQA_CATEGORIES:
            cat_issues = [i for i in all_issues if i['category'] == category]
            if not cat_issues:
                continue

            story.append(Paragraph(f"{category.title()} ({len(cat_issues)})", heading_style))

            # Header row
            header_row = [
                Paragraph('<b>metric</b>', cell_bold_style),
                Paragraph('<b>rule</b>', cell_bold_style),
                Paragraph('<b>rule_description</b>', cell_bold_style),
                Paragraph('<b>column_field</b>', cell_bold_style),
                Paragraph('<b>severity</b>', cell_bold_style),
                Paragraph('<b>finding</b>', cell_bold_style),
            ]
            if has_feedback:
                header_row.append(Paragraph('<b>status</b>', cell_bold_style))
            table_data = [header_row]

            for issue in cat_issues:
                severity_upper = issue['severity'].upper()
                finding_text = truncate_comment(issue.get('finding', issue['message']))
                # Build finding cell: text + optional sparkline for temporal checks
                # (narrower sparkline when the status column takes space)
                spark_width = 2.0 * inch if has_feedback else 2.5 * inch
                yearly_counts = issue.get('details', {}).get('yearly_counts')
                if yearly_counts:
                    finding_cell = [
                        Paragraph(escape(finding_text), cell_style),
                        Spacer(1, 2),
                        YearlySparkBar(yearly_counts, width=spark_width, height=16),
                    ]
                else:
                    finding_cell = Paragraph(escape(finding_text), cell_style)
                row = [
                    Paragraph(escape(category), cell_style),
                    Paragraph(escape(issue.get('rule_code', '')), cell_style),
                    Paragraph(escape(issue.get('rule_description', '')), cell_style),
                    Paragraph(escape(issue.get('column_field', 'NA')), cell_style),
                    Paragraph(escape(severity_upper), cell_style),
                    finding_cell,
                ]
                if has_feedback:
                    # Status column only carries a decision for error rows;
                    # warnings get an empty cell to keep the grid aligned
                    if issue['severity'] == 'error':
                        error_id = _make_error_id(issue)
                        decision_info = feedback_lookup.get(error_id, {})
                        status_text = decision_info.get('decision', '').upper()
                        row.append(Paragraph(escape(status_text), cell_style))
                    else:
                        row.append(Paragraph('', cell_style))
                table_data.append(row)

            detail_tbl = Table(table_data, colWidths=detail_col_widths,
                               repeatRows=1)

            # Base table style
            detail_style = [
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, -1), 7),
                ('BACKGROUND', (0, 0), (-1, 0), header_bg),
                ('TEXTCOLOR', (0, 0), (-1, -1), text_dark),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#DADADA')),
                ('TOPPADDING', (0, 0), (-1, -1), 3),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
                ('LEFTPADDING', (0, 0), (-1, -1), 4),
                ('RIGHTPADDING', (0, 0), (-1, -1), 4),
                ('VALIGN', (0, 0), (-1, -1), 'TOP'),
            ]

            # Color-code rows by severity, override with grey for rejected
            for row_idx, issue in enumerate(cat_issues, 1):
                error_id = _make_error_id(issue)
                decision = feedback_lookup.get(error_id, {}).get('decision', '')
                if decision == 'rejected':
                    detail_style.append(('BACKGROUND', (0, row_idx), (-1, row_idx), rejected_bg))
                    detail_style.append(('TEXTCOLOR', (0, row_idx), (-1, row_idx), rejected_text))
                elif issue['severity'] == 'error':
                    detail_style.append(('BACKGROUND', (0, row_idx), (-1, row_idx), fail_bg))
                elif issue['severity'] == 'warning':
                    detail_style.append(('BACKGROUND', (0, row_idx), (-1, row_idx), warn_bg))

            detail_tbl.setStyle(TableStyle(detail_style))
            story.append(detail_tbl)

            # Add note about monthly trend CSVs for temporal consistency checks
            temporal_cols = sorted({
                i['column_field'] for i in cat_issues
                if i.get('check_type') == 'category_temporal_consistency'
                and i.get('column_field')
            })
            if temporal_cols:
                file_list = ", ".join(
                    f"{table_name}_{col}_monthly.csv" for col in temporal_cols
                )
                note_text = (
                    f"<i>P.6 Category temporal consistency — monthly breakdown available at: "
                    f"clifpy/monthly_trends/ ({file_list})</i>"
                )
                note_style = ParagraphStyle(
                    'NoteStyle', parent=styles['Normal'],
                    fontSize=7, textColor=colors.HexColor('#555555'),
                )
                story.append(Paragraph(note_text, note_style))

            story.append(Spacer(1, 0.2 * inch))
    else:
        story.append(Paragraph("No validation issues found!", styles['Normal']))

    doc.build(story)
    return output_path

clifpy.utils.report_generator.generate_text_report

generate_text_report(validation_data, table_name, output_path, site_name=None)

Generate a plain-text DQA report.

Parameters match generate_validation_pdf, except that the feedback argument is not supported in the plain-text format.

Source code in clifpy/utils/report_generator.py
def generate_text_report(validation_data: Dict[str, Any],
                         table_name: str, output_path: str,
                         site_name: Optional[str] = None) -> str:
    """
    Generate a plain-text DQA report.

    Parameters match generate_validation_pdf, except that user
    ``feedback`` is not supported in the plain-text format.

    Parameters
    ----------
    validation_data : dict
        Output from run_full_dqa (keys: conformance, completeness,
        relational, plausibility).
    table_name : str
        Name of the table.
    output_path : str
        Path where the text report should be saved.
    site_name : str, optional
        Name of the site/hospital.

    Returns
    -------
    str
        Path to the generated text file.
    """
    category_scores, all_issues = collect_dqa_issues(validation_data)
    total_passed = sum(p for p, _ in category_scores.values())
    total_checks = sum(t for _, t in category_scores.values())
    error_count = sum(1 for i in all_issues if i['severity'] == 'error')
    warning_count = sum(1 for i in all_issues if i['severity'] == 'warning')

    lines = []
    lines.append("=" * 120)
    lines.append("CLIF 2.1 DQA VALIDATION REPORT")
    lines.append(f"{table_name.upper()} TABLE")
    lines.append("=" * 120)
    lines.append("")
    if site_name:
        lines.append(f"Site: {site_name}")
    lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append("")

    # DQA Summary
    lines.append("-" * 120)
    lines.append("DQA SUMMARY")
    lines.append("-" * 120)
    lines.append(f"  {'Category':20s}  {'Passed':>6s}  {'Total':>5s}  {'Errors':>6s}  {'Warnings':>8s}")
    lines.append(f"  {'-'*20}  {'-'*6}  {'-'*5}  {'-'*6}  {'-'*8}")
    for category in DQA_CATEGORIES:
        if category not in category_scores:
            continue
        passed, total = category_scores[category]
        cat_issues = [i for i in all_issues if i['category'] == category]
        cat_errors = sum(1 for i in cat_issues if i['severity'] == 'error')
        cat_warnings = sum(1 for i in cat_issues if i['severity'] == 'warning')
        lines.append(f"  {category.title():20s}  {passed:>6d}  {total:>5d}  {cat_errors:>6d}  {cat_warnings:>8d}")
    lines.append(f"  {'Overall':20s}  {total_passed:>6d}  {total_checks:>5d}  {error_count:>6d}  {warning_count:>8d}")
    lines.append("")

    # Data Profile
    table_stats = validation_data.get('table_stats', [])
    if table_stats:
        lines.append("-" * 120)
        lines.append("DATA PROFILE")
        lines.append("-" * 120)
        total_rows = validation_data.get('total_rows', 0)
        lines.append(f"  Total Rows: {total_rows:,}")
        lines.append("")
        w_col, w_dtype, w_null, w_pct, w_uniq, w_min, w_max = 25, 12, 8, 8, 10, 20, 20
        hdr = (f"  {'Column':<{w_col}}"
               f"{'Dtype':<{w_dtype}}"
               f"{'Null':>{w_null}}"
               f"{'Null%':>{w_pct}}"
               f"{'Unique':>{w_uniq}}"
               f"  {'Min':<{w_min}}"
               f"{'Max':<{w_max}}")
        lines.append(hdr)
        lines.append("  " + "-" * (w_col + w_dtype + w_null + w_pct + w_uniq + 2 + w_min + w_max))
        for s in table_stats:
            col_name = s['column']
            if len(col_name) > w_col - 2:
                col_name = col_name[:w_col - 4] + '..'
            # Coerce min/max to str (source values may be dates/numbers) and
            # truncate so long values cannot push later columns out of line.
            col_min = str(s.get('min') or '')
            col_max = str(s.get('max') or '')
            if len(col_min) > w_min - 2:
                col_min = col_min[:w_min - 4] + '..'
            if len(col_max) > w_max - 2:
                col_max = col_max[:w_max - 4] + '..'
            # Format "<pct>%" as one string right-aligned in w_pct chars so the
            # trailing '%' does not widen the cell past the header width.
            pct_cell = f"{s['null_pct']:.1f}%".rjust(w_pct)
            lines.append(
                f"  {col_name:<{w_col}}"
                f"{s['dtype']:<{w_dtype}}"
                f"{s['null_count']:>{w_null},}"
                f"{pct_cell}"
                f"{s['unique']:>{w_uniq},}"
                f"  {col_min:<{w_min}}"
                f"{col_max:<{w_max}}"
            )
        lines.append("")

    # Issue details as tabular text
    if all_issues:
        lines.append("=" * 120)
        lines.append(f"ISSUE DETAILS ({len(all_issues)})")
        lines.append("=" * 120)

        # Column widths for text alignment
        w_metric, w_rule, w_desc, w_col, w_sev = 14, 6, 30, 18, 10

        for category in DQA_CATEGORIES:
            cat_issues = [i for i in all_issues if i['category'] == category]
            if not cat_issues:
                continue

            lines.append("")
            lines.append(f"-- {category.title()} ({len(cat_issues)}) --")
            lines.append("")

            # Header
            hdr = (f"  {'metric':<{w_metric}}"
                   f"{'rule':<{w_rule}}"
                   f"{'rule_description':<{w_desc}}"
                   f"{'column_field':<{w_col}}"
                   f"{'severity':<{w_sev}}"
                   f"finding")
            lines.append(hdr)
            lines.append("  " + "-" * 116)

            for issue in cat_issues:
                severity_upper = issue['severity'].upper()
                rule_code = issue.get('rule_code', '')
                rule_desc = issue.get('rule_description', '')
                col_field = issue.get('column_field', 'NA')
                finding = issue.get('finding', issue['message'])

                # Truncate long fields for text display
                if len(rule_desc) > w_desc - 2:
                    rule_desc = rule_desc[:w_desc - 4] + '..'
                if len(col_field) > w_col - 2:
                    col_field = col_field[:w_col - 4] + '..'

                line = (f"  {category:<{w_metric}}"
                        f"{rule_code:<{w_rule}}"
                        f"{rule_desc:<{w_desc}}"
                        f"{col_field:<{w_col}}"
                        f"{severity_upper:<{w_sev}}"
                        f"{finding}")
                lines.append(line)
    else:
        lines.append("No validation issues found!")

    lines.append("")
    lines.append("=" * 120)
    lines.append("END OF REPORT")
    lines.append("=" * 120)

    # Explicit encoding so the report is portable across platforms
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines))

    return output_path

Backward Compatibility

clifpy.utils.validator.validate_dataframe

validate_dataframe(df, schema, table_name=None, plausibility_thresholds=None)

Validate a dataframe against schema and return list of errors.

This function provides compatibility with CLIF-TableOne's expected validation interface.

Parameters:

Name Type Description Default
df pd.DataFrame, pl.DataFrame, or pl.LazyFrame

Data to validate

required
schema dict

Table schema containing columns, required_columns, etc.

required
table_name str

Name of the table (inferred from schema if not provided)

None
plausibility_thresholds dict

Override default plausibility thresholds per check.

None

Returns:

Type Description
List[Dict[str, Any]]

List of error dictionaries with keys: - type: str - Error type/check name - description: str - Human-readable error description - details: dict - Additional error details - category: str - 'schema' or 'data_quality'

Source code in clifpy/utils/validator.py
def _append_check_issues(
    errors: List[Dict[str, Any]],
    results: Dict[str, Any],
    schema_checks: Optional[List[str]] = None,
) -> None:
    """Append errors and warnings from a check-result mapping to *errors*.

    Each error is categorized 'schema' when its check name appears in
    *schema_checks*, otherwise 'data_quality'. Warnings are always
    'data_quality' and carry severity 'warning'.
    """
    schema_set = set(schema_checks or ())
    for check_name, result in results.items():
        category = 'schema' if check_name in schema_set else 'data_quality'
        for err in result.errors:
            errors.append({
                'type': _format_check_type(check_name),
                'description': err['message'],
                'details': err.get('details', {}),
                'category': category
            })
        # Warnings are surfaced as data quality issues regardless of check
        for warn in result.warnings:
            errors.append({
                'type': _format_check_type(check_name),
                'description': warn['message'],
                'details': warn.get('details', {}),
                'category': 'data_quality',
                'severity': 'warning'
            })


def validate_dataframe(
    df: Union[pd.DataFrame, 'pl.DataFrame', 'pl.LazyFrame'],
    schema: Dict[str, Any],
    table_name: Optional[str] = None,
    plausibility_thresholds: Optional[Dict[str, Dict[str, float]]] = None,
) -> List[Dict[str, Any]]:
    """
    Validate a dataframe against schema and return list of errors.

    This function provides compatibility with CLIF-TableOne's expected
    validation interface.

    Parameters
    ----------
    df : pd.DataFrame, pl.DataFrame, or pl.LazyFrame
        Data to validate
    schema : dict
        Table schema containing columns, required_columns, etc.
    table_name : str, optional
        Name of the table (inferred from schema if not provided)
    plausibility_thresholds : dict, optional
        Override default plausibility thresholds per check.

    Returns
    -------
    List[Dict[str, Any]]
        List of error dictionaries with keys:
        - type: str - Error type/check name
        - description: str - Human-readable error description
        - details: dict - Additional error details
        - category: str - 'schema' or 'data_quality'
    """
    table_name = table_name or schema.get('table_name', 'unknown')
    _logger.info("validate_dataframe: starting validation for table '%s'", table_name)
    errors: List[Dict[str, Any]] = []

    # Schema-level checks (affect 'incomplete' status)
    schema_checks = ['required_columns', 'column_dtypes']

    # Run conformance checks — the only pillar whose errors can be 'schema'
    _append_check_issues(
        errors,
        run_conformance_checks(df, schema, table_name),
        schema_checks=schema_checks,
    )

    # Run completeness checks
    _append_check_issues(
        errors,
        run_completeness_checks(df, schema, table_name),
    )

    # Run plausibility checks
    _append_check_issues(
        errors,
        run_plausibility_checks(
            df, schema, table_name,
            plausibility_thresholds=plausibility_thresholds,
        ),
    )

    gc.collect()
    error_count = sum(1 for e in errors if e.get('severity', 'error') == 'error')
    warning_count = sum(1 for e in errors if e.get('severity') == 'warning')
    _logger.info("validate_dataframe: table '%s' complete — %d errors, %d warnings",
                 table_name, error_count, warning_count)
    return errors

clifpy.utils.validator.format_clifpy_error

format_clifpy_error(error, row_count, table_name)

Format a validation error for display.

Parameters:

Name Type Description Default
error dict

Error dictionary from validate_dataframe()

required
row_count int

Total row count of the table

required
table_name str

Name of the table

required

Returns:

Type Description
dict

Formatted error with type, description, category, and details

Source code in clifpy/utils/validator.py
def format_clifpy_error(
    error: Dict[str, Any],
    row_count: int,
    table_name: str
) -> Dict[str, Any]:
    """
    Format a validation error for display.

    Parameters
    ----------
    error : dict
        Error dictionary from validate_dataframe()
    row_count : int
        Total row count of the table
    table_name : str
        Name of the table

    Returns
    -------
    dict
        Formatted error with type, description, category, and details,
        plus table_name/row_count context; severity is carried over only
        when present on the input.
    """
    # Start from the error's displayable fields, falling back to defaults
    # when a key is absent, then attach the table context.
    display: Dict[str, Any] = {
        'type': error.get('type', 'Unknown Error'),
        'description': error.get('description', str(error)),
        'category': error.get('category', 'other'),
        'details': error.get('details', {}),
    }
    display['table_name'] = table_name
    display['row_count'] = row_count

    # Preserve an explicit severity key verbatim (even a None value),
    # but do not invent one when the input has none.
    if 'severity' in error:
        display['severity'] = error['severity']

    return display

clifpy.utils.validator.determine_validation_status

determine_validation_status(errors, required_columns=None, table_name=None)

Determine validation status based on errors.

Status Logic: - INCOMPLETE (red): Missing required columns OR non-castable datatype errors OR 100% null in required columns - PARTIAL (yellow): Required columns present but has data quality issues (missing categorical values, high missingness, etc.) - COMPLETE (green): All required columns present, no critical issues

Parameters:

Name Type Description Default
errors list

List of formatted error dictionaries

required
required_columns list

List of required column names

None
table_name str

Name of the table (for table-specific logic)

None

Returns:

Type Description
str

'complete', 'partial', or 'incomplete'

Source code in clifpy/utils/validator.py
def determine_validation_status(
    errors: List[Dict[str, Any]],
    required_columns: Optional[List[str]] = None,
    table_name: Optional[str] = None
) -> str:
    """
    Determine validation status based on errors.

    Status Logic:
    - INCOMPLETE (red): Missing required columns OR non-castable datatype errors
      OR 100% null in required columns
    - PARTIAL (yellow): Required columns present but has data quality issues
      (missing categorical values, high missingness, etc.)
    - COMPLETE (green): All required columns present, no critical issues

    Parameters
    ----------
    errors : list
        List of formatted error dictionaries
    required_columns : list, optional
        List of required column names
    table_name : str, optional
        Name of the table (for table-specific logic)

    Returns
    -------
    str
        'complete', 'partial', or 'incomplete'
    """
    # No errors at all: trivially complete.
    if not errors:
        _logger.debug("determine_validation_status: no errors — status='complete'")
        return 'complete'

    # First pass: look for any schema-level problem that forces 'incomplete'.
    for err in errors:
        etype = err.get('type', '').lower()
        cat = err.get('category', '')
        det = err.get('details', {})
        sev = err.get('severity', 'error')

        # Missing required columns -> incomplete
        if 'missing required columns' in etype or 'missing_columns' in det:
            _logger.debug("determine_validation_status: missing required columns — status='incomplete'")
            return 'incomplete'

        # Non-castable data type errors -> incomplete
        if 'data type' in etype and cat == 'schema' and det.get('castable') is False:
            _logger.debug("determine_validation_status: non-castable dtype — status='incomplete'")
            return 'incomplete'

        # 100% missingness in a required column -> incomplete
        if 'missingness' in etype and sev != 'warning':
            column = det.get('column', '')
            fully_null = det.get('percent_missing', 0) >= 100
            if fully_null and required_columns and column in required_columns:
                _logger.debug("determine_validation_status: 100%% null in '%s' — status='incomplete'", column)
                return 'incomplete'

    # Second pass: any error-severity entry (non-critical) downgrades to 'partial'.
    if any(e.get('severity', 'error') == 'error' for e in errors):
        _logger.debug("determine_validation_status: has non-critical errors — status='partial'")
        return 'partial'

    # Only warnings remain -> complete
    _logger.debug("determine_validation_status: warnings only — status='complete'")
    return 'complete'

clifpy.utils.validator.classify_errors_by_status_impact

classify_errors_by_status_impact(errors, required_columns, table_name, config_timezone=None)

Classify errors into status-affecting and informational categories.

Used by PDF/report generators to separate critical errors from informational messages.

Parameters:

Name Type Description Default
errors dict

Dictionary with keys 'schema_errors', 'data_quality_issues', 'other_errors'

required
required_columns list

List of required column names

required
table_name str

Name of the table

required
config_timezone str

Configured timezone (to filter timezone-related errors)

None

Returns:

Type Description
dict

Dictionary with 'status_affecting' and 'informational', each containing 'schema_errors', 'data_quality_issues', and 'other_errors' lists

Source code in clifpy/utils/validator.py
def classify_errors_by_status_impact(
    errors: Dict[str, List[Dict[str, Any]]],
    required_columns: List[str],
    table_name: str,
    config_timezone: Optional[str] = None
) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
    """
    Classify errors into status-affecting and informational categories.

    Used by PDF/report generators to separate critical errors from
    informational messages.

    Parameters
    ----------
    errors : dict
        Dictionary with keys 'schema_errors', 'data_quality_issues', 'other_errors'
    required_columns : list
        List of required column names
    table_name : str
        Name of the table
    config_timezone : str, optional
        Configured timezone (to filter timezone-related errors)

    Returns
    -------
    dict
        Dictionary with 'status_affecting' and 'informational', each containing
        'schema_errors', 'data_quality_issues', and 'other_errors' lists
    """
    buckets = ('schema_errors', 'data_quality_issues', 'other_errors')
    status_affecting: Dict[str, List[Dict[str, Any]]] = {b: [] for b in buckets}
    informational: Dict[str, List[Dict[str, Any]]] = {b: [] for b in buckets}

    # Columns that are optional and shouldn't affect status
    optional_columns = {
        'patient': ['race', 'ethnicity', 'language'],
        'hospitalization': ['discharge_category'],
    }
    table_optional = optional_columns.get(table_name, [])

    # Process each error category
    for category_key in buckets:
        for error in errors.get(category_key, []):
            error_type = error.get('type', '').lower()
            details = error.get('details', {})
            description = error.get('description', '').lower()

            is_informational = False

            # Timezone errors that merely restate the configured timezone
            # are expected and carry no signal.
            if config_timezone and 'timezone' in error_type:
                if config_timezone.lower() in description:
                    is_informational = True

            # Issues on optional columns never affect status.
            if details.get('column', '') in table_optional:
                is_informational = True

            # mCIDE coverage gaps are informational
            if 'mcide' in error_type or 'coverage' in error_type:
                is_informational = True

            # Warnings of any kind — including plausibility warnings
            # (temporal ordering, numeric range, etc.) — are informational;
            # plausibility entries at error severity still affect status.
            if error.get('severity') == 'warning':
                is_informational = True

            # Classify into appropriate bucket
            target = informational if is_informational else status_affecting
            target[category_key].append(error)

    return {
        'status_affecting': status_affecting,
        'informational': informational
    }

clifpy.utils.validator.get_validation_summary

get_validation_summary(validation_results)

Generate a text summary of validation results.

Parameters:

Name Type Description Default
validation_results dict

Validation results from validate() method

required

Returns:

Type Description
str

Human-readable summary string

Source code in clifpy/utils/validator.py
def get_validation_summary(validation_results: Dict[str, Any]) -> str:
    """
    Generate a text summary of validation results.

    Parameters
    ----------
    validation_results : dict
        Validation results from validate() method

    Returns
    -------
    str
        Human-readable summary string
    """
    status = validation_results.get('status', 'unknown')
    error_groups = validation_results.get('errors', {})

    # Count issues per category; insertion order fixes display order.
    counts = {
        'Schema': len(error_groups.get('schema_errors', [])),
        'Data Quality': len(error_groups.get('data_quality_issues', [])),
        'Other': len(error_groups.get('other_errors', [])),
    }
    total = sum(counts.values())

    icons = {
        'complete': '✓',
        'partial': '⚠',
        'incomplete': '✗'
    }

    lines = [f"Status: {icons.get(status, '?')} {status.upper()}"]

    if total > 0:
        lines.append(f"Issues: {total} total")
        # Only categories that actually have issues are listed.
        lines.extend(
            f"  - {label}: {count}"
            for label, count in counts.items()
            if count
        )
    else:
        lines.append("No issues found")

    return '\n'.join(lines)