
Utilities API Reference

CLIFpy provides several utility modules to support data processing and analysis tasks.

Med Unit Converter

The unit converter module provides comprehensive medication dose unit conversion functionality.

clifpy.utils.unit_converter.convert_dose_units_by_med_category

convert_dose_units_by_med_category(med_df, vitals_df=None, preferred_units=None, show_intermediate=False, override=False)

Convert medication dose units to user-defined preferred units for each med_category.

This function performs a two-step conversion process:

  1. Standardizes all dose units to a base set of standard units (mcg/min, ml/min, u/min for rates)
  2. Converts from base units to medication-specific preferred units if provided

The conversion maintains unit class consistency (rates stay rates, amounts stay amounts) and handles weight-based dosing appropriately using patient weights.
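
The two stages map onto the public functions documented on this page, so they can also be run separately. A minimal sketch, assuming clifpy is installed and med_df / vitals_df are DataFrames shaped as described under Parameters below:

from clifpy.utils.unit_converter import (
    standardize_dose_to_base_units,
    convert_dose_units_by_med_category,
)

# Step 1 only: standardize every dose to the base units (mcg/min, ml/min, u/min, ...)
base_df, base_counts = standardize_dose_to_base_units(med_df, vitals_df)

# Steps 1 and 2 together: standardize, then convert to per-category preferred units
converted_df, counts = convert_dose_units_by_med_category(
    med_df,
    vitals_df=vitals_df,
    preferred_units={'propofol': 'mcg/kg/min'},
)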

Parameters:

med_df : DataFrame, required

Medication DataFrame with required columns:

  • med_dose: Original dose values (numeric)
  • med_dose_unit: Original dose unit strings (e.g., 'MCG/KG/HR', 'mL/hr')
  • med_category: Medication category identifier (e.g., 'propofol', 'fentanyl')
  • weight_kg: Patient weight in kg (optional, will be extracted from vitals_df if missing)

vitals_df : DataFrame, optional (default None)

Vitals DataFrame for extracting patient weights if not in med_df. Required columns if weight_kg missing from med_df:

  • hospitalization_id: Patient identifier
  • recorded_dttm: Timestamp of vital recording
  • vital_category: Must include 'weight_kg' values
  • vital_value: Weight values

preferred_units : dict, optional (default None)

Dictionary mapping medication categories to their preferred units. Keys are medication category names, values are target unit strings. Example: {'propofol': 'mcg/kg/min', 'fentanyl': 'mcg/hr', 'insulin': 'u/hr'}. If None, uses base units (mcg/min, ml/min, u/min) as defaults.

show_intermediate : bool, default False

If False, excludes intermediate calculation columns (multipliers) from output. If True, retains all columns including conversion multipliers for debugging.

override : bool, default False

If True, prints warning messages for unacceptable preferred units but continues processing. If False, raises ValueError when encountering unacceptable preferred units.

Returns:

Tuple[DataFrame, DataFrame]

A tuple containing:

  • [0] Converted medication DataFrame with additional columns:

    • _clean_unit: Cleaned unit format
    • _base_unit: Base unit after first conversion
    • _base_dose: Dose value in base units
    • _preferred_unit: Target unit for medication category
    • med_dose_converted: Final dose value in preferred units
    • med_dose_unit_converted: Final unit string after conversion
    • _unit_class: Classification ('rate', 'amount', or 'unrecognized')
    • _convert_status: Status message indicating success or reason for failure

    If show_intermediate=True, also includes conversion multipliers.

  • [1] Summary counts DataFrame with conversion statistics grouped by medication category

Raises:

ValueError

If required columns (med_dose_unit, med_dose) are missing from med_df, if standardization to base units fails, or if conversion to preferred units fails.

Examples:

>>> import pandas as pd
>>> med_df = pd.DataFrame({
...     'med_category': ['propofol', 'fentanyl', 'insulin'],
...     'med_dose': [200, 2, 5],
...     'med_dose_unit': ['MCG/KG/MIN', 'mcg/kg/hr', 'units/hr'],
...     'weight_kg': [70, 80, 75]
... })
>>> preferred = {
...     'propofol': 'mcg/kg/min',
...     'fentanyl': 'mcg/hr',
...     'insulin': 'u/hr'
... }
>>> result_df, counts_df = convert_dose_units_by_med_category(med_df, preferred_units=preferred)
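
Continuing the example above, the converted values and the conversion audit trail live in the documented output columns (a hedged continuation; output omitted):

>>> result_df[['med_category', 'med_dose_converted', 'med_dose_unit_converted', '_convert_status']]
>>> counts_df  # conversion statistics grouped by med_category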
Notes

The function handles various unit formats including:

  • Weight-based dosing: /kg, /lb (uses patient weight for conversion)
  • Time conversions: /hr to /min
  • Volume conversions: L to mL
  • Mass conversions: mg, ng, g to mcg
  • Unit conversions: milli-units (mu) to units (u)

Unrecognized units are preserved but flagged in the _unit_class column.

Todo

Implement config file parsing for default preferred_units.

Source code in clifpy/utils/unit_converter.py
def convert_dose_units_by_med_category(
    med_df: pd.DataFrame,
    vitals_df: pd.DataFrame = None,
    preferred_units: dict = None,
    show_intermediate: bool = False,
    override: bool = False
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Convert medication dose units to user-defined preferred units for each med_category.

    This function performs a two-step conversion process:

    1. Standardizes all dose units to a base set of standard units (mcg/min, ml/min, u/min for rates)
    2. Converts from base units to medication-specific preferred units if provided

    The conversion maintains unit class consistency (rates stay rates, amounts stay amounts)
    and handles weight-based dosing appropriately using patient weights.

    Parameters
    ----------
    med_df : pd.DataFrame
        Medication DataFrame with required columns:

        - med_dose: Original dose values (numeric)
        - med_dose_unit: Original dose unit strings (e.g., 'MCG/KG/HR', 'mL/hr')
        - med_category: Medication category identifier (e.g., 'propofol', 'fentanyl')
        - weight_kg: Patient weight in kg (optional, will be extracted from vitals_df if missing)
    vitals_df : pd.DataFrame, optional
        Vitals DataFrame for extracting patient weights if not in med_df.
        Required columns if weight_kg missing from med_df:

        - hospitalization_id: Patient identifier
        - recorded_dttm: Timestamp of vital recording
        - vital_category: Must include 'weight_kg' values
        - vital_value: Weight values
    preferred_units : dict, optional
        Dictionary mapping medication categories to their preferred units.
        Keys are medication category names, values are target unit strings.
        Example: {'propofol': 'mcg/kg/min', 'fentanyl': 'mcg/hr', 'insulin': 'u/hr'}
        If None, uses base units (mcg/min, ml/min, u/min) as defaults.
    show_intermediate : bool, default False
        If False, excludes intermediate calculation columns (multipliers) from output.
        If True, retains all columns including conversion multipliers for debugging.
    override : bool, default False
        If True, prints warning messages for unacceptable preferred units but continues processing.
        If False, raises ValueError when encountering unacceptable preferred units.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        A tuple containing:

        - [0] Converted medication DataFrame with additional columns:

            * _clean_unit: Cleaned unit format
            * _base_unit: Base unit after first conversion
            * _base_dose: Dose value in base units
            * _preferred_unit: Target unit for medication category
            * med_dose_converted: Final dose value in preferred units
            * med_dose_unit_converted: Final unit string after conversion
            * _unit_class: Classification ('rate', 'amount', or 'unrecognized')
            * _convert_status: Status message indicating success or reason for failure

            If show_intermediate=True, also includes conversion multipliers.

        - [1] Summary counts DataFrame with conversion statistics grouped by medication category

    Raises
    ------
    ValueError
        If required columns (med_dose_unit, med_dose) are missing from med_df,
        if standardization to base units fails, or if conversion to preferred units fails.

    Examples
    --------
    >>> import pandas as pd
    >>> med_df = pd.DataFrame({
    ...     'med_category': ['propofol', 'fentanyl', 'insulin'],
    ...     'med_dose': [200, 2, 5],
    ...     'med_dose_unit': ['MCG/KG/MIN', 'mcg/kg/hr', 'units/hr'],
    ...     'weight_kg': [70, 80, 75]
    ... })
    >>> preferred = {
    ...     'propofol': 'mcg/kg/min',
    ...     'fentanyl': 'mcg/hr',
    ...     'insulin': 'u/hr'
    ... }
    >>> result_df, counts_df = convert_dose_units_by_med_category(med_df, preferred_units=preferred)

    Notes
    -----
    The function handles various unit formats including:

    - Weight-based dosing: /kg, /lb (uses patient weight for conversion)
    - Time conversions: /hr to /min
    - Volume conversions: L to mL
    - Mass conversions: mg, ng, g to mcg
    - Unit conversions: milli-units (mu) to units (u)

    Unrecognized units are preserved but flagged in the _unit_class column.

    Todo
    ----
    Implement config file parsing for default preferred_units.
    """
    try:
        med_df_base, _ = standardize_dose_to_base_units(med_df, vitals_df)
    except ValueError as e:
        raise ValueError(f"Error standardizing dose units to base units: {e}")

    # check if the requested med_categories are in the input med_df
    requested_med_categories = set(preferred_units.keys())
    extra_med_categories = requested_med_categories - set(med_df_base['med_category'])
    if extra_med_categories:
        error_msg = f"The following med_categories are given a preferred unit but not found in the input med_df: {extra_med_categories}"
        if override:
            print(error_msg)
        else:
            raise ValueError(error_msg)

    try:
        # join the preferred units to the df
        preferred_units_df = pd.DataFrame(preferred_units.items(), columns=['med_category', '_preferred_unit'])
        q = """
        SELECT l.*
            -- for unspecified preferred units, use the base units by default
            , _preferred_unit: COALESCE(r._preferred_unit, l._base_unit)
        FROM med_df_base l
        LEFT JOIN preferred_units_df r USING (med_category)
        """
        med_df_preferred = duckdb.sql(q).to_df()

        med_df_converted = _convert_base_units_to_preferred_units(med_df_preferred, override=override)
    except ValueError as e:
        raise ValueError(f"Error converting dose units to preferred units: {e}")

    try:
        convert_counts_df = _create_unit_conversion_counts_table(
            med_df_converted, 
            group_by=[
                'med_category',
                'med_dose_unit', '_clean_unit', '_base_unit', '_unit_class',
                '_preferred_unit', 'med_dose_unit_converted', '_convert_status'
                ]
            )
    except ValueError as e:
        raise ValueError(f"Error creating unit conversion counts table: {e}")

    if show_intermediate:
        return med_df_converted, convert_counts_df
    else:
        # the default (show_intermediate=False) is to drop multiplier columns, which are likely not useful for the user
        multiplier_cols = [col for col in med_df_converted.columns if 'multiplier' in col]
        qa_cols = [
            '_rn',
            '_weight_recorded_dttm',
            '_weighted', '_weighted_preferred',
            '_base_dose', '_base_unit',
            '_preferred_unit',
            '_unit_class_preferred',
            '_unit_subclass', '_unit_subclass_preferred'
            ]

        cols_to_drop = [c for c in multiplier_cols + qa_cols if c in med_df_converted.columns]

        return med_df_converted.drop(columns=cols_to_drop), convert_counts_df
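
When med_df has no weight_kg column, weights are pulled from vitals_df as described above; per the source, the join keys on hospitalization_id and admin_dttm and takes the most recent weight recorded at or before administration. A minimal sketch with synthetic data (assumes clifpy is installed; values are illustrative only):

import pandas as pd
from clifpy.utils.unit_converter import convert_dose_units_by_med_category

med_df = pd.DataFrame({
    'hospitalization_id': ['H1'],
    'admin_dttm': [pd.Timestamp('2024-01-01 10:00')],
    'med_category': ['propofol'],
    'med_dose': [200],
    'med_dose_unit': ['mcg/kg/min'],
})  # note: no weight_kg column

vitals_df = pd.DataFrame({
    'hospitalization_id': ['H1'],
    'recorded_dttm': [pd.Timestamp('2024-01-01 08:00')],  # latest weight before admin_dttm is used
    'vital_category': ['weight_kg'],
    'vital_value': [70],
})

converted_df, counts_df = convert_dose_units_by_med_category(
    med_df, vitals_df=vitals_df, preferred_units={'propofol': 'mcg/min'}
)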

clifpy.utils.unit_converter.standardize_dose_to_base_units

standardize_dose_to_base_units(med_df, vitals_df=None)

Standardize medication dose units to a base set of standard units.

Main public API function that performs complete dose unit standardization pipeline: format cleaning, name cleaning, and unit conversion. Returns both base data and a summary table of conversions.

Parameters:

med_df : DataFrame, required

Medication DataFrame with required columns:

  • med_dose_unit: Original dose unit strings
  • med_dose: Dose values
  • weight_kg: Patient weights (optional, can be added from vitals_df)

Additional columns are preserved in output.

vitals_df : DataFrame, optional (default None)

Vitals DataFrame for extracting patient weights if not in med_df. Required columns if weight_kg missing from med_df:

  • hospitalization_id: Patient identifier
  • recorded_dttm: Timestamp of vital recording
  • vital_category: Must include 'weight_kg' values
  • vital_value: Weight values

Returns:

Tuple[DataFrame, DataFrame]

A tuple containing:

  • [0] base medication DataFrame with additional columns:

    • _clean_unit: Cleaned unit string
    • _unit_class: 'rate', 'amount', or 'unrecognized'
    • _base_dose: base dose value
    • _base_unit: base unit
    • amount_multiplier, time_multiplier, weight_multiplier: Conversion factors
  • [1] Summary counts DataFrame showing conversion patterns and frequencies

Raises:

ValueError

If required columns are missing from med_df.

Examples:

>>> import pandas as pd
>>> med_df = pd.DataFrame({
...     'med_dose': [6, 100, 500],
...     'med_dose_unit': ['MCG/KG/HR', 'mL / hr', 'mg'],
...     'weight_kg': [70, 80, 75]
... })
>>> base_df, counts_df = standardize_dose_to_base_units(med_df)
>>> '_base_unit' in base_df.columns
True
>>> 'count' in counts_df.columns
True
Notes

Standard units for conversion:

  • Rate units: mcg/min, ml/min, u/min (all per minute)
  • Amount units: mcg, ml, u (base units)

The function automatically handles:

  • Weight-based dosing (/kg, /lb) using patient weights
  • Time conversions (per hour to per minute)
  • Volume conversions (L to mL)
  • Mass conversions (mg, ng, g to mcg)
  • Unit conversions (milli-units to units)

Unrecognized units are flagged but preserved in the output.
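
Because unrecognized units are flagged rather than dropped, they are easy to surface after standardization. A short hedged sketch using the documented _unit_class column on the first returned DataFrame:

unrecognized = base_df[base_df['_unit_class'] == 'unrecognized']
print(unrecognized[['med_dose_unit', '_clean_unit']].drop_duplicates())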

Source code in clifpy/utils/unit_converter.py
def standardize_dose_to_base_units(
    med_df: pd.DataFrame,
    vitals_df: pd.DataFrame = None
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Standardize medication dose units to a base set of standard units.

    Main public API function that performs complete dose unit standardization
    pipeline: format cleaning, name cleaning, and unit conversion.
    Returns both base data and a summary table of conversions.

    Parameters
    ----------
    med_df : pd.DataFrame
        Medication DataFrame with required columns:

        - med_dose_unit: Original dose unit strings
        - med_dose: Dose values
        - weight_kg: Patient weights (optional, can be added from vitals_df)

        Additional columns are preserved in output.
    vitals_df : pd.DataFrame, optional
        Vitals DataFrame for extracting patient weights if not in med_df.
        Required columns if weight_kg missing from med_df:

        - hospitalization_id: Patient identifier
        - recorded_dttm: Timestamp of vital recording
        - vital_category: Must include 'weight_kg' values
        - vital_value: Weight values

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        A tuple containing:

        - [0] base medication DataFrame with additional columns:

            * _clean_unit: Cleaned unit string
            * _unit_class: 'rate', 'amount', or 'unrecognized'
            * _base_dose: base dose value
            * _base_unit: base unit
            * amount_multiplier, time_multiplier, weight_multiplier: Conversion factors

        - [1] Summary counts DataFrame showing conversion patterns and frequencies

    Raises
    ------
    ValueError
        If required columns are missing from med_df.

    Examples
    --------
    >>> import pandas as pd
    >>> med_df = pd.DataFrame({
    ...     'med_dose': [6, 100, 500],
    ...     'med_dose_unit': ['MCG/KG/HR', 'mL / hr', 'mg'],
    ...     'weight_kg': [70, 80, 75]
    ... })
    >>> base_df, counts_df = standardize_dose_to_base_units(med_df)
    >>> '_base_unit' in base_df.columns
    True
    >>> 'count' in counts_df.columns
    True

    Notes
    -----
    Standard units for conversion:

    - Rate units: mcg/min, ml/min, u/min (all per minute)
    - Amount units: mcg, ml, u (base units)

    The function automatically handles:

    - Weight-based dosing (/kg, /lb) using patient weights
    - Time conversions (per hour to per minute)
    - Volume conversions (L to mL)
    - Mass conversions (mg, ng, g to mcg)
    - Unit conversions (milli-units to units)

    Unrecognized units are flagged but preserved in the output.
    """
    if 'weight_kg' not in med_df.columns:
        print("No weight_kg column found, adding the most recent from vitals")
        query = """
        SELECT m.*
            , v.vital_value as weight_kg
            , v.recorded_dttm as _weight_recorded_dttm
            , ROW_NUMBER() OVER (
                PARTITION BY m.hospitalization_id, m.admin_dttm, m.med_category
                ORDER BY v.recorded_dttm DESC
                ) as _rn
        FROM med_df m
        LEFT JOIN vitals_df v 
            ON m.hospitalization_id = v.hospitalization_id 
            AND v.vital_category = 'weight_kg' AND v.vital_value IS NOT NULL
            AND v.recorded_dttm <= m.admin_dttm  -- only past weights
        -- rn = 1 for the weight w/ the latest recorded_dttm (and thus most recent)
        QUALIFY (_rn = 1 OR _rn IS NULL) 
        ORDER BY m.hospitalization_id, m.admin_dttm, m.med_category, _rn
        """
        med_df = duckdb.sql(query).to_df()

    # check if the required columns are present
    required_columns = {'med_dose_unit', 'med_dose', 'weight_kg'}
    missing_columns = required_columns - set(med_df.columns)
    if missing_columns:
        raise ValueError(f"The following column(s) are required but not found: {missing_columns}")

    med_df['_clean_unit'] = (
        med_df['med_dose_unit'].pipe(_clean_dose_unit_formats)
        .pipe(_clean_dose_unit_names)
    )

    med_df_base = _convert_clean_units_to_base_units(med_df)
    convert_counts_df = _create_unit_conversion_counts_table(
        med_df_base, 
        group_by=['med_dose_unit', '_clean_unit', '_base_unit', '_unit_class']
        )

    return med_df_base, convert_counts_df

Constants and Data Structures

Acceptable Units

clifpy.utils.unit_converter.ACCEPTABLE_AMOUNT_UNITS module-attribute

ACCEPTABLE_AMOUNT_UNITS = {'ml', 'l', 'mu', 'u', 'mcg', 'mg', 'ng', 'g'}

clifpy.utils.unit_converter.ACCEPTABLE_RATE_UNITS module-attribute

ACCEPTABLE_RATE_UNITS = _acceptable_rate_units()

clifpy.utils.unit_converter.ALL_ACCEPTABLE_UNITS module-attribute

ALL_ACCEPTABLE_UNITS = ACCEPTABLE_RATE_UNITS | ACCEPTABLE_AMOUNT_UNITS
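
Since preferred units outside this set raise ValueError unless override=True, it can be useful to screen a preferred_units mapping against these module attributes first. A hedged sketch (the exact membership of ACCEPTABLE_RATE_UNITS is generated at import time, so check against the installed version):

from clifpy.utils.unit_converter import ALL_ACCEPTABLE_UNITS

preferred = {'propofol': 'mcg/kg/min', 'insulin': 'u/hr', 'vasopressin': 'bogus/unit'}
unsupported = {cat: unit for cat, unit in preferred.items() if unit not in ALL_ACCEPTABLE_UNITS}
if unsupported:
    print(f"Preferred units not supported: {unsupported}")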

Unit Patterns

The following constants define regex patterns for unit classification:

clifpy.utils.unit_converter.MASS_REGEX module-attribute

MASS_REGEX = f'^(mcg|mg|ng|g){AMOUNT_ENDER}'

clifpy.utils.unit_converter.VOLUME_REGEX module-attribute

VOLUME_REGEX = f'^(l|ml){AMOUNT_ENDER}'

clifpy.utils.unit_converter.UNIT_REGEX module-attribute

UNIT_REGEX = f'^(u|mu){AMOUNT_ENDER}'

clifpy.utils.unit_converter.HR_REGEX module-attribute

HR_REGEX = f'/hr$'

clifpy.utils.unit_converter.WEIGHT_REGEX module-attribute

WEIGHT_REGEX = f'/(lb|kg)/'

Conversion Mappings

clifpy.utils.unit_converter.UNIT_NAMING_VARIANTS module-attribute

UNIT_NAMING_VARIANTS = {'/hr': '/h(r|our)?$', '/min': '/m(in|inute)?$', 'u': 'u(nits|nit)?', 'm': 'milli-?', 'l': 'l(iters|itres|itre|iter)?', 'mcg': '^(u|µ|μ)g', 'g': '^g(rams|ram)?'}

clifpy.utils.unit_converter.REGEX_TO_FACTOR_MAPPER module-attribute

REGEX_TO_FACTOR_MAPPER = {HR_REGEX: '1/60', L_REGEX: '1000', MU_REGEX: '1/1000', MG_REGEX: '1000', NG_REGEX: '1/1000', G_REGEX: '1000000', KG_REGEX: 'weight_kg', LB_REGEX: 'weight_kg * 2.20462'}
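
These patterns and factors drive the SQL CASE expressions assembled by the internal builders below. A rough illustration in plain Python, using re only to mimic the DuckDB regexp_matches calls on a cleaned unit string:

import re

clean_unit = 'mcg/kg/hr'
bool(re.search(r'/hr$', clean_unit))       # True: HR_REGEX matches, so REGEX_TO_FACTOR_MAPPER supplies 1/60
bool(re.search(r'/(lb|kg)/', clean_unit))  # True: WEIGHT_REGEX marks the unit as weight-based (factor uses weight_kg)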

Internal Functions

The following functions are used internally by the main conversion functions. They are documented here for completeness and advanced usage.

clifpy.utils.unit_converter._clean_dose_unit_formats

_clean_dose_unit_formats(s)

Clean dose unit formatting by removing spaces and converting to lowercase.

This is the first step in the cleaning pipeline. It standardizes the basic formatting of dose units before applying name cleaning.

Parameters:

s : Series, required

Series containing dose unit strings to clean.

Returns:

Series

Series with cleaned formatting (no spaces, lowercase).

Examples:

>>> import pandas as pd
>>> s = pd.Series(['mL / hr', 'MCG/KG/MIN', ' Mg/Hr '])
>>> result = _clean_dose_unit_formats(s)
>>> list(result)
['ml/hr', 'mcg/kg/min', 'mg/hr']
Notes

This function is typically used as the first step in the cleaning pipeline, followed by _clean_dose_unit_names().

Source code in clifpy/utils/unit_converter.py
def _clean_dose_unit_formats(s: pd.Series) -> pd.Series:
    """Clean dose unit formatting by removing spaces and converting to lowercase.

    This is the first step in the cleaning pipeline. It standardizes
    the basic formatting of dose units before applying name cleaning.

    Parameters
    ----------
    s : pd.Series
        Series containing dose unit strings to clean.

    Returns
    -------
    pd.Series
        Series with cleaned formatting (no spaces, lowercase).

    Examples
    --------
    >>> import pandas as pd
    >>> s = pd.Series(['mL / hr', 'MCG/KG/MIN', ' Mg/Hr '])
    >>> result = _clean_dose_unit_formats(s)
    >>> list(result)
    ['ml/hr', 'mcg/kg/min', 'mg/hr']

    Notes
    -----
    This function is typically used as the first step in the cleaning
    pipeline, followed by _clean_dose_unit_names().
    """
    return s.str.replace(r'\s+', '', regex=True).str.lower().replace('', None, regex=False)

clifpy.utils.unit_converter._clean_dose_unit_names

_clean_dose_unit_names(s)

Clean dose unit name variants to standard abbreviations.

Applies regex patterns to convert various unit name variants to their standard abbreviated forms (e.g., 'milliliter' -> 'ml', 'hour' -> 'hr').

Parameters:

s : Series, required

Series containing dose unit strings with name variants. Should already be format-cleaned (lowercase, no spaces).

Returns:

Series

Series with clean unit names.

Examples:

>>> import pandas as pd
>>> s = pd.Series(['milliliter/hour', 'units/minute', 'µg/kg/h'])
>>> result = _clean_dose_unit_names(s)
>>> list(result)
['ml/hr', 'u/min', 'mcg/kg/hr']
Notes

Handles conversions including:

  • Time: hour/h -> hr, minute/m -> min
  • Volume: liter/liters/litre/litres -> l
  • Units: units/unit -> u, milli-units -> mu
  • Mass: µg/ug -> mcg, gram -> g

This function should be applied after _clean_dose_unit_formats().

Source code in clifpy/utils/unit_converter.py
def _clean_dose_unit_names(s: pd.Series) -> pd.Series:
    """Clean dose unit name variants to standard abbreviations.

    Applies regex patterns to convert various unit name variants to their
    standard abbreviated forms (e.g., 'milliliter' -> 'ml', 'hour' -> 'hr').

    Parameters
    ----------
    s : pd.Series
        Series containing dose unit strings with name variants.
        Should already be format-cleaned (lowercase, no spaces).

    Returns
    -------
    pd.Series
        Series with clean unit names.

    Examples
    --------
    >>> import pandas as pd
    >>> s = pd.Series(['milliliter/hour', 'units/minute', 'µg/kg/h'])
    >>> result = _clean_dose_unit_names(s)
    >>> list(result)
    ['ml/hr', 'u/min', 'mcg/kg/hr']

    Notes
    -----
    Handles conversions including:

    - Time: hour/h -> hr, minute/m -> min
    - Volume: liter/liters/litre/litres -> l
    - Units: units/unit -> u, milli-units -> mu
    - Mass: µg/ug -> mcg, gram -> g

    This function should be applied after _clean_dose_unit_formats().
    """
    for repl, pattern in UNIT_NAMING_VARIANTS.items():
        s = s.str.replace(pattern, repl, regex=True)
    return s

clifpy.utils.unit_converter._convert_clean_units_to_base_units

_convert_clean_units_to_base_units(med_df)

Convert clean dose units to base units.

Core conversion function that transforms various dose units into a base set of standard units (mcg/min, ml/min, u/min for rates; mcg, ml, u for amounts). Uses DuckDB for efficient SQL-based transformations.

Parameters:

med_df : DataFrame, required

DataFrame containing medication data with required columns:

  • _clean_unit: Cleaned unit strings
  • med_dose: Original dose values
  • weight_kg: Patient weight (used for /kg and /lb conversions)

Returns:

DataFrame

Original DataFrame with additional columns:

  • _unit_class: 'rate', 'amount', or 'unrecognized'
  • _amount_multiplier: Factor for amount conversion
  • _time_multiplier: Factor for time conversion (hr to min)
  • _weight_multiplier: Factor for weight-based conversion
  • _base_dose: base dose value
  • _base_unit: base unit string

Examples:

>>> import pandas as pd
>>> df = pd.DataFrame({
...     'med_dose': [6, 100],
...     '_clean_unit': ['mcg/kg/hr', 'ml/hr'],
...     'weight_kg': [70, 80]
... })
>>> result = _convert_clean_units_to_base_units(df)
>>> 'mcg/min' in result['_base_unit'].values
True
>>> 'ml/min' in result['_base_unit'].values
True
Notes

Conversion targets:

  • Rate units: mcg/min, ml/min, u/min
  • Amount units: mcg, ml, u
  • Unrecognized units: original dose and (cleaned) unit will be preserved

Weight-based conversions use patient weight from weight_kg column. Time conversions: /hr -> /min (divide by 60).
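
As a worked example of the multipliers described above (plain arithmetic, not a call into the internals): a cleaned unit of 'mcg/kg/hr' for a 70 kg patient gets an amount multiplier of 1, a time multiplier of 1/60, and a weight multiplier of 70, so

dose = 6                              # 6 mcg/kg/hr
base_dose = dose * 1 * (1 / 60) * 70  # 7.0, reported with _base_unit 'mcg/min'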

Source code in clifpy/utils/unit_converter.py
def _convert_clean_units_to_base_units(med_df: pd.DataFrame) -> pd.DataFrame:
    """Convert clean dose units to base units.

    Core conversion function that transforms various dose units into a base
    set of standard units (mcg/min, ml/min, u/min for rates; mcg, ml, u for amounts).
    Uses DuckDB for efficient SQL-based transformations.

    Parameters
    ----------
    med_df : pd.DataFrame
        DataFrame containing medication data with required columns:

        - _clean_unit: Cleaned unit strings
        - med_dose: Original dose values
        - weight_kg: Patient weight (used for /kg and /lb conversions)

    Returns
    -------
    pd.DataFrame
        Original DataFrame with additional columns:

        - _unit_class: 'rate', 'amount', or 'unrecognized'
        - _amount_multiplier: Factor for amount conversion
        - _time_multiplier: Factor for time conversion (hr to min)
        - _weight_multiplier: Factor for weight-based conversion
        - _base_dose: base dose value
        - _base_unit: base unit string

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     'med_dose': [6, 100],
    ...     '_clean_unit': ['mcg/kg/hr', 'ml/hr'],
    ...     'weight_kg': [70, 80]
    ... })
    >>> result = _convert_clean_units_to_base_units(df)
    >>> 'mcg/min' in result['_base_unit'].values
    True
    >>> 'ml/min' in result['_base_unit'].values
    True

    Notes
    -----
    Conversion targets:

    - Rate units: mcg/min, ml/min, u/min
    - Amount units: mcg, ml, u
    - Unrecognized units: original dose and (cleaned) unit will be preserved

    Weight-based conversions use patient weight from weight_kg column.
    Time conversions: /hr -> /min (divide by 60).
    """

    amount_clause = _concat_builders_by_patterns(
        builder=_pattern_to_factor_builder_for_base,
        patterns=[L_REGEX, MU_REGEX, MG_REGEX, NG_REGEX, G_REGEX],
        else_case='1'
        )

    time_clause = _concat_builders_by_patterns(
        builder=_pattern_to_factor_builder_for_base,
        patterns=[HR_REGEX],
        else_case='1'
        )

    weight_clause = _concat_builders_by_patterns(
        builder=_pattern_to_factor_builder_for_base,
        patterns=[KG_REGEX, LB_REGEX],
        else_case='1'
        )

    q = f"""
    SELECT *
        -- classify and check acceptability first
        , _unit_class: CASE
            WHEN _clean_unit IN ('{RATE_UNITS_STR}') THEN 'rate' 
            WHEN _clean_unit IN ('{AMOUNT_UNITS_STR}') THEN 'amount'
            ELSE 'unrecognized' END
        -- mark if the input unit is adjusted by weight (e.g. 'mcg/kg/hr')
        , _weighted: CASE
            WHEN regexp_matches(_clean_unit, '{WEIGHT_REGEX}') THEN 1 ELSE 0 END
        -- parse and generate multipliers
        , _amount_multiplier: CASE
            WHEN _unit_class = 'unrecognized' THEN 1 ELSE ({amount_clause}) END 
        , _time_multiplier: CASE
            WHEN _unit_class = 'unrecognized' THEN 1 ELSE ({time_clause}) END 
        , _weight_multiplier: CASE
            WHEN _unit_class = 'unrecognized' THEN 1 ELSE ({weight_clause}) END
        -- calculate the base dose
        , _base_dose: CASE
            -- when the input unit is weighted but weight_kg is missing, keep the original dose
            WHEN _weighted = 1 AND weight_kg IS NULL THEN med_dose
            ELSE med_dose * _amount_multiplier * _time_multiplier * _weight_multiplier 
            END
        -- id the base unit
        , _base_unit: CASE 
            -- when the input unit is weighted but weight_kg is missing, keep the original dose
            WHEN _weighted = 1 AND weight_kg IS NULL THEN _clean_unit
            WHEN _unit_class = 'unrecognized' THEN _clean_unit
            WHEN _unit_class = 'rate' AND regexp_matches(_clean_unit, '{MASS_REGEX}') THEN 'mcg/min'
            WHEN _unit_class = 'rate' AND regexp_matches(_clean_unit, '{VOLUME_REGEX}') THEN 'ml/min'
            WHEN _unit_class = 'rate' AND regexp_matches(_clean_unit, '{UNIT_REGEX}') THEN 'u/min'
            WHEN _unit_class = 'amount' AND regexp_matches(_clean_unit, '{MASS_REGEX}') THEN 'mcg'
            WHEN _unit_class = 'amount' AND regexp_matches(_clean_unit, '{VOLUME_REGEX}') THEN 'ml'
            WHEN _unit_class = 'amount' AND regexp_matches(_clean_unit, '{UNIT_REGEX}') THEN 'u'
            END
    FROM med_df 
    """
    return duckdb.sql(q).to_df()

clifpy.utils.unit_converter._convert_base_units_to_preferred_units

_convert_base_units_to_preferred_units(med_df, override=False)

Convert base standardized units to user-preferred units.

Performs the second stage of unit conversion, transforming from standardized base units (mcg/min, ml/min, u/min) to medication-specific preferred units while maintaining unit class consistency.

Parameters:

med_df : DataFrame, required

DataFrame with required columns from first-stage conversion:

  • _base_dose: Dose values in standardized units
  • _base_unit: Standardized unit strings (may be NULL)
  • _preferred_unit: Target unit strings for each medication
  • weight_kg: Patient weights (optional, used for weight-based conversions)

override : bool, default False

If True, prints warnings but continues when encountering:

  • Unacceptable preferred units not in ALL_ACCEPTABLE_UNITS
  • Cross-class conversions (e.g., rate to amount)
  • Cross-subclass conversions (e.g., mass to volume)

If False, raises ValueError for these conditions.

Returns:

DataFrame

Original DataFrame with additional columns:

  • _unit_class: Classification of base unit ('rate', 'amount', 'unrecognized')
  • _unit_subclass: Subclassification ('mass', 'volume', 'unit', 'unrecognized')
  • _unit_class_preferred: Classification of preferred unit
  • _unit_subclass_preferred: Subclassification of preferred unit
  • _convert_status: Success or failure reason message
  • _amount_multiplier_preferred: Conversion factor for amount units
  • _time_multiplier_preferred: Conversion factor for time units
  • _weight_multiplier_preferred: Conversion factor for weight-based units
  • med_dose_converted: Final converted dose value
  • med_dose_unit_converted: Final unit string after conversion

Raises:

ValueError

If required columns are missing from med_df or if preferred units are not in ALL_ACCEPTABLE_UNITS (when override=False).

Notes

Conversion rules enforced:

  • Conversions only allowed within same unit class (rate→rate, amount→amount)
  • Cannot convert between incompatible subclasses (e.g., mass→volume)
  • When conversion fails, falls back to base units and dose values
  • Missing units (NULL) are handled with 'original unit is missing' status

The function uses DuckDB SQL for efficient processing and applies regex pattern matching to classify units and calculate conversion factors.

See Also

_convert_clean_units_to_base_units : First-stage conversion
convert_dose_units_by_med_category : Public API for the complete conversion pipeline
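
Cross-class and cross-subclass requests are not raised as errors here; they are reported in _convert_status and the output falls back to the base dose and unit, as described in Notes. A hedged sketch of how that surfaces through the public API (asking for a volume rate for a mass-rate infusion):

converted_df, _ = convert_dose_units_by_med_category(
    med_df, preferred_units={'propofol': 'ml/hr'}  # propofol charted in mcg/kg/min: mass rate -> volume rate
)
# Expect a _convert_status along the lines of 'cannot convert mass to volume' for those rows,
# with med_dose_converted / med_dose_unit_converted falling back to the original dose and unit.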

Source code in clifpy/utils/unit_converter.py
def _convert_base_units_to_preferred_units(
    med_df: pd.DataFrame,
    override: bool = False
    ) -> pd.DataFrame:
    """Convert base standardized units to user-preferred units.

    Performs the second stage of unit conversion, transforming from standardized
    base units (mcg/min, ml/min, u/min) to medication-specific preferred units
    while maintaining unit class consistency.

    Parameters
    ----------
    med_df : pd.DataFrame
        DataFrame with required columns from first-stage conversion:

        - _base_dose: Dose values in standardized units
        - _base_unit: Standardized unit strings (may be NULL)
        - _preferred_unit: Target unit strings for each medication
        - weight_kg: Patient weights (optional, used for weight-based conversions)
    override : bool, default False
        If True, prints warnings but continues when encountering:

        - Unacceptable preferred units not in ALL_ACCEPTABLE_UNITS
        - Cross-class conversions (e.g., rate to amount)
        - Cross-subclass conversions (e.g., mass to volume)

        If False, raises ValueError for these conditions.

    Returns
    -------
    pd.DataFrame
        Original DataFrame with additional columns:

        - _unit_class: Classification of base unit ('rate', 'amount', 'unrecognized')
        - _unit_subclass: Subclassification ('mass', 'volume', 'unit', 'unrecognized')
        - _unit_class_preferred: Classification of preferred unit
        - _unit_subclass_preferred: Subclassification of preferred unit
        - _convert_status: Success or failure reason message
        - _amount_multiplier_preferred: Conversion factor for amount units
        - _time_multiplier_preferred: Conversion factor for time units
        - _weight_multiplier_preferred: Conversion factor for weight-based units
        - med_dose_converted: Final converted dose value
        - med_dose_unit_converted: Final unit string after conversion

    Raises
    ------
    ValueError
        If required columns are missing from med_df or if preferred units are not
        in ALL_ACCEPTABLE_UNITS (when override=False).

    Notes
    -----
    Conversion rules enforced:

    - Conversions only allowed within same unit class (rate→rate, amount→amount)
    - Cannot convert between incompatible subclasses (e.g., mass→volume)
    - When conversion fails, falls back to base units and dose values
    - Missing units (NULL) are handled with 'original unit is missing' status

    The function uses DuckDB SQL for efficient processing and applies regex
    pattern matching to classify units and calculate conversion factors.

    See Also
    --------
    _convert_clean_units_to_base_units : First-stage conversion
    convert_dose_units_by_med_category : Public API for complete conversion pipeline
    """
    # check presence of all required columns
    required_columns = {'_base_dose', '_preferred_unit'}
    missing_columns = required_columns - set(med_df.columns)
    if missing_columns:
        raise ValueError(f"The following column(s) are required but not found: {missing_columns}")

    # check user-defined _preferred_unit are in the set of acceptable units
    unacceptable_preferred_units = set(med_df['_preferred_unit']) - ALL_ACCEPTABLE_UNITS - {None}
    if unacceptable_preferred_units:
        error_msg = f"Cannot accommodate the conversion to the following preferred units: {unacceptable_preferred_units}. Consult the function documentation for a list of acceptable units."
        if override:
            print(error_msg)
        else:
            raise ValueError(error_msg)

    amount_clause = _concat_builders_by_patterns(
        builder=_pattern_to_factor_builder_for_preferred,
        patterns=[L_REGEX, MU_REGEX, MG_REGEX, NG_REGEX, G_REGEX],
        else_case='1'
        )

    time_clause = _concat_builders_by_patterns(
        builder=_pattern_to_factor_builder_for_preferred,
        patterns=[HR_REGEX],
        else_case='1'
        )

    weight_clause = _concat_builders_by_patterns(
        builder=_pattern_to_factor_builder_for_preferred,
        patterns=[KG_REGEX, LB_REGEX],
        else_case='1'
        )

    unit_class_clause = f"""
    , _unit_class: CASE
        WHEN _base_unit IN ('{RATE_UNITS_STR}') THEN 'rate' 
        WHEN _base_unit IN ('{AMOUNT_UNITS_STR}') THEN 'amount'
        ELSE 'unrecognized' END
    """ if '_unit_class' not in med_df.columns else ''

    weighted_clause = f"""
    , _weighted: CASE
        WHEN regexp_matches(_clean_unit, '{WEIGHT_REGEX}') THEN 1 ELSE 0 END
    """ if '_weighted' not in med_df.columns else ''

    dose_converted_name = "med_dose" if "med_dose" in med_df.columns else "_base_dose"
    unit_converted_name = "_clean_unit" if "_clean_unit" in med_df.columns else "_base_unit"

    q = f"""
    SELECT l.*
        {unit_class_clause}
        , _unit_subclass: CASE 
            WHEN regexp_matches(_base_unit, '{MASS_REGEX}') THEN 'mass'
            WHEN regexp_matches(_base_unit, '{VOLUME_REGEX}') THEN 'volume'
            WHEN regexp_matches(_base_unit, '{UNIT_REGEX}') THEN 'unit'
            ELSE 'unrecognized' END
        , _unit_class_preferred: CASE 
            WHEN _preferred_unit IN ('{RATE_UNITS_STR}') THEN 'rate' 
            WHEN _preferred_unit IN ('{AMOUNT_UNITS_STR}') THEN 'amount'
            ELSE 'unrecognized' END
        , _unit_subclass_preferred: CASE 
            WHEN regexp_matches(_preferred_unit, '{MASS_REGEX}') THEN 'mass'
            WHEN regexp_matches(_preferred_unit, '{VOLUME_REGEX}') THEN 'volume'
            WHEN regexp_matches(_preferred_unit, '{UNIT_REGEX}') THEN 'unit'
            ELSE 'unrecognized' END
        , _weighted_preferred: CASE
            WHEN regexp_matches(_preferred_unit, '{WEIGHT_REGEX}') THEN 1 ELSE 0 END
        , _convert_status: CASE 
            WHEN _weighted_preferred = 1 AND weight_kg IS NULL 
                THEN 'cannot convert to a weighted unit if weight_kg is missing'
            WHEN _base_unit IS NULL THEN 'original unit is missing'
            WHEN _unit_class == 'unrecognized' OR _unit_subclass == 'unrecognized'
                THEN 'original unit ' || _base_unit || ' is not recognized'
            WHEN _unit_class_preferred == 'unrecognized' OR _unit_subclass_preferred == 'unrecognized'
                THEN 'user-preferred unit ' || _preferred_unit || ' is not recognized'
            WHEN _unit_class != _unit_class_preferred 
                THEN 'cannot convert ' || _unit_class || ' to ' || _unit_class_preferred
            WHEN _unit_subclass != _unit_subclass_preferred
                THEN 'cannot convert ' || _unit_subclass || ' to ' || _unit_subclass_preferred
            WHEN _unit_class == _unit_class_preferred AND _unit_subclass == _unit_subclass_preferred
                -- AND _unit_class != 'unrecognized' AND _unit_subclass != 'unrecognized'
                THEN 'success'
            ELSE 'other error - please report'
            END
        , _amount_multiplier_preferred: {amount_clause}
        , _time_multiplier_preferred: {time_clause}
        , _weight_multiplier_preferred: {weight_clause}
        -- fall back to the base units and dose (i.e. the input) if conversion cannot be accommodated
        , med_dose_converted: CASE
            WHEN _convert_status == 'success' THEN _base_dose * _amount_multiplier_preferred * _time_multiplier_preferred * _weight_multiplier_preferred
            ELSE {dose_converted_name}
            END
        , med_dose_unit_converted: CASE
            WHEN _convert_status == 'success' THEN _preferred_unit
            ELSE {unit_converted_name}
            END
    FROM med_df l
    """
    return duckdb.sql(q).to_df()

clifpy.utils.unit_converter._create_unit_conversion_counts_table

_create_unit_conversion_counts_table(med_df, group_by)

Create summary table of unit conversion counts.

Generates a grouped summary showing the frequency of each unit conversion pattern, useful for data quality assessment and identifying common or problematic unit patterns.

Parameters:

med_df : DataFrame, required

DataFrame with required columns from conversion process:

  • med_dose_unit: Original unit string
  • _clean_unit: Cleaned unit string
  • _base_unit: base standard unit
  • _unit_class: Classification (rate/amount/unrecognized)

group_by : List[str], required

List of columns to group by.

Returns:

DataFrame

Summary DataFrame with columns:

  • med_dose_unit: Original unit
  • _clean_unit: After cleaning
  • _base_unit: After conversion
  • _unit_class: Classification
  • count: Number of occurrences

Raises:

ValueError

If required columns are missing from input DataFrame.

Examples:

>>> import pandas as pd
>>> # df_base = standardize_dose_to_base_units(med_df)[0]
>>> # counts = _create_unit_conversion_counts_table(df_base, ['med_dose_unit'])
>>> # 'count' in counts.columns  # True
Notes

This table is particularly useful for:

  • Identifying unrecognized units that need handling
  • Understanding the distribution of unit types in your data
  • Quality control and validation of conversions
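
For example, the most frequent conversions and any unrecognized units can be read straight off the returned table. A short hedged sketch against the documented columns:

counts_df.sort_values('count', ascending=False).head(10)  # most common original -> base unit mappings
counts_df[counts_df['_unit_class'] == 'unrecognized']     # unit strings that were not recognized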
Source code in clifpy/utils/unit_converter.py
def _create_unit_conversion_counts_table(
    med_df: pd.DataFrame,
    group_by: List[str]
    ) -> pd.DataFrame:
    """Create summary table of unit conversion counts.

    Generates a grouped summary showing the frequency of each unit conversion
    pattern, useful for data quality assessment and identifying common or
    problematic unit patterns.

    Parameters
    ----------
    med_df : pd.DataFrame
        DataFrame with required columns from conversion process:

        - med_dose_unit: Original unit string
        - _clean_unit: Cleaned unit string
        - _base_unit: base standard unit
        - _unit_class: Classification (rate/amount/unrecognized)
    group_by : List[str]
        List of columns to group by.

    Returns
    -------
    pd.DataFrame
        Summary DataFrame with columns:

        - med_dose_unit: Original unit
        - _clean_unit: After cleaning
        - _base_unit: After conversion
        - _unit_class: Classification
        - count: Number of occurrences

    Raises
    ------
    ValueError
        If required columns are missing from input DataFrame.

    Examples
    --------
    >>> import pandas as pd
    >>> # df_base = standardize_dose_to_base_units(med_df)[0]
    >>> # counts = _create_unit_conversion_counts_table(df_base, ['med_dose_unit'])
    >>> # 'count' in counts.columns  # True

    Notes
    -----
    This table is particularly useful for:

    - Identifying unrecognized units that need handling
    - Understanding the distribution of unit types in your data
    - Quality control and validation of conversions
    """
    # check presence of all the group by columns
    # required_columns = {'med_dose_unit', 'med_dose_unit_normalized', 'med_dose_unit_limited', 'unit_class'}
    missing_columns = set(group_by) - set(med_df.columns)
    if missing_columns:
        raise ValueError(f"The following column(s) are required but not found: {missing_columns}")

    # build the string that enumerates the group by columns 
    # e.g. 'med_dose_unit, med_dose_unit_normalized, unit_class'
    cols_enum_str = f"{', '.join(group_by)}"
    order_by_clause = f"med_category, count DESC" if 'med_category' in group_by else "count DESC"

    q = f"""
    SELECT {cols_enum_str}   
        , COUNT(*) as count
    FROM med_df
    GROUP BY {cols_enum_str}
    ORDER BY {order_by_clause}
    """
    return duckdb.sql(q).to_df()

clifpy.utils.unit_converter._convert_set_to_str_for_sql

_convert_set_to_str_for_sql(s)

Convert a set of strings to SQL IN clause format.

Transforms a Python set into a comma-separated string suitable for use in SQL IN clauses within DuckDB queries.

Parameters:

s : Set[str], required

Set of strings to be formatted for SQL.

Returns:

str

Comma-separated string with items separated by "','". Does not include outer quotes - those are added in SQL query.

Examples:

>>> units = {'ml/hr', 'mcg/min', 'u/hr'}
>>> _convert_set_to_str_for_sql(units)
"ml/hr','mcg/min','u/hr"

Usage in SQL queries:

>>> # f"WHERE unit IN ('{_convert_set_to_str_for_sql(units)}')"
Notes

This is a helper function for building DuckDB SQL queries that need to check if values are in a set of acceptable units.

Source code in clifpy/utils/unit_converter.py
def _convert_set_to_str_for_sql(s: Set[str]) -> str:
    """Convert a set of strings to SQL IN clause format.

    Transforms a Python set into a comma-separated string suitable for use
    in SQL IN clauses within DuckDB queries.

    Parameters
    ----------
    s : Set[str]
        Set of strings to be formatted for SQL.

    Returns
    -------
    str
        Comma-separated string with items separated by "','".
        Does not include outer quotes - those are added in SQL query.

    Examples
    --------
    >>> units = {'ml/hr', 'mcg/min', 'u/hr'}
    >>> _convert_set_to_str_for_sql(units)
    "ml/hr','mcg/min','u/hr"

    Usage in SQL queries:

    >>> # f"WHERE unit IN ('{_convert_set_to_str_for_sql(units)}')"

    Notes
    -----
    This is a helper function for building DuckDB SQL queries that need to check
    if values are in a set of acceptable units.
    """
    return "','".join(s)

clifpy.utils.unit_converter._concat_builders_by_patterns

_concat_builders_by_patterns(builder, patterns, else_case='1')

Concatenate multiple SQL CASE WHEN statements from patterns.

Helper function that combines multiple regex pattern builders into a single SQL CASE statement for DuckDB queries. Used internally to build conversion factor calculations for different unit components (amount, time, weight).

Parameters:

builder : callable, required

Function that generates CASE WHEN clauses from regex patterns. Should accept a pattern string and return a WHEN...THEN clause.

patterns : list, required

List of regex patterns to process with the builder function.

else_case : str, default '1'

Value to use in the ELSE clause when no patterns match. Default is '1' (no conversion factor).

Returns:

str

Complete SQL CASE statement with all pattern conditions.

Examples:

>>> patterns = ['/hr$', '/min$']
>>> builder = lambda p: f"WHEN regexp_matches(col, '{p}') THEN factor"
>>> result = _concat_builders_by_patterns(builder, patterns)
>>> 'CASE WHEN' in result and 'ELSE 1 END' in result
True
Notes

This function is used internally by conversion functions to build SQL queries that apply different conversion factors based on unit patterns.

Source code in clifpy/utils/unit_converter.py
def _concat_builders_by_patterns(builder: callable, patterns: list, else_case: str = '1') -> str:
    """Concatenate multiple SQL CASE WHEN statements from patterns.

    Helper function that combines multiple regex pattern builders into a single
    SQL CASE statement for DuckDB queries. Used internally to build conversion
    factor calculations for different unit components (amount, time, weight).

    Parameters
    ----------
    builder : callable
        Function that generates CASE WHEN clauses from regex patterns.
        Should accept a pattern string and return a WHEN...THEN clause.
    patterns : list
        List of regex patterns to process with the builder function.
    else_case : str, default '1'
        Value to use in the ELSE clause when no patterns match.
        Default is '1' (no conversion factor).

    Returns
    -------
    str
        Complete SQL CASE statement with all pattern conditions.

    Examples
    --------
    >>> patterns = ['/hr$', '/min$']
    >>> builder = lambda p: f"WHEN regexp_matches(col, '{p}') THEN factor"
    >>> result = _concat_builders_by_patterns(builder, patterns)
    >>> 'CASE WHEN' in result and 'ELSE 1 END' in result
    True

    Notes
    -----
    This function is used internally by conversion functions to build
    SQL queries that apply different conversion factors based on unit patterns.
    """
    return "CASE " + " ".join([builder(pattern) for pattern in patterns]) + f" ELSE {else_case} END"

clifpy.utils.unit_converter._pattern_to_factor_builder_for_base

_pattern_to_factor_builder_for_base(pattern)

Build SQL CASE WHEN statement for regex pattern matching.

Helper function that generates SQL CASE WHEN clauses for DuckDB queries based on regex patterns and their corresponding conversion factors.

Parameters:

pattern : str, required

Regex pattern to match (must exist in REGEX_TO_FACTOR_MAPPER).

Returns:

str

SQL CASE WHEN clause string.

Raises:

ValueError

If the pattern is not found in REGEX_TO_FACTOR_MAPPER.

Examples:

>>> clause = _pattern_to_factor_builder_for_base(HR_REGEX)
>>> 'WHEN regexp_matches' in clause and 'THEN' in clause
True
Notes

This function is used internally by _convert_clean_units_to_base_units to build the SQL query for unit conversion.

Source code in clifpy/utils/unit_converter.py
def _pattern_to_factor_builder_for_base(pattern: str) -> str:
    """Build SQL CASE WHEN statement for regex pattern matching.

    Helper function that generates SQL CASE WHEN clauses for DuckDB queries
    based on regex patterns and their corresponding conversion factors.

    Parameters
    ----------
    pattern : str
        Regex pattern to match (must exist in REGEX_TO_FACTOR_MAPPER).

    Returns
    -------
    str
        SQL CASE WHEN clause string.

    Raises
    ------
    ValueError
        If the pattern is not found in REGEX_TO_FACTOR_MAPPER.

    Examples
    --------
    >>> clause = _pattern_to_factor_builder_for_base(HR_REGEX)
    >>> 'WHEN regexp_matches' in clause and 'THEN' in clause
    True

    Notes
    -----
    This function is used internally by _convert_clean_units_to_base_units
    to build the SQL query for unit conversion.
    """
    if pattern in REGEX_TO_FACTOR_MAPPER:
        return f"WHEN regexp_matches(_clean_unit, '{pattern}') THEN {REGEX_TO_FACTOR_MAPPER.get(pattern)}"
    raise ValueError(f"regex pattern {pattern} not found in REGEX_TO_FACTOR_MAPPER dict")

clifpy.utils.unit_converter._pattern_to_factor_builder_for_preferred

_pattern_to_factor_builder_for_preferred(pattern)

Build SQL CASE WHEN statement for preferred unit conversion.

Generates SQL clauses for converting from base units back to preferred units by applying the inverse of the original conversion factor. Used when converting from standardized base units to medication-specific preferred units.

Parameters:

pattern : str, required

Regex pattern to match in _preferred_unit column. Must exist in REGEX_TO_FACTOR_MAPPER dictionary.

Returns:

str

SQL CASE WHEN clause with inverse conversion factor.

Raises:

ValueError

If the pattern is not found in REGEX_TO_FACTOR_MAPPER.

Examples:

>>> clause = _pattern_to_factor_builder_for_preferred('/hr$')
>>> 'WHEN regexp_matches(_preferred_unit' in clause and 'THEN 1/' in clause
True
Notes

This function applies the inverse of the factor used in _pattern_to_factor_builder_for_base, allowing bidirectional conversion between unit systems. The inverse is calculated as 1/(original_factor).

See Also

_pattern_to_factor_builder_for_base : Builds patterns for base unit conversion

Source code in clifpy/utils/unit_converter.py
def _pattern_to_factor_builder_for_preferred(pattern: str) -> str:
    """Build SQL CASE WHEN statement for preferred unit conversion.

    Generates SQL clauses for converting from base units back to preferred units
    by applying the inverse of the original conversion factor. Used when converting
    from standardized base units to medication-specific preferred units.

    Parameters
    ----------
    pattern : str
        Regex pattern to match in _preferred_unit column.
        Must exist in REGEX_TO_FACTOR_MAPPER dictionary.

    Returns
    -------
    str
        SQL CASE WHEN clause with inverse conversion factor.

    Raises
    ------
    ValueError
        If the pattern is not found in REGEX_TO_FACTOR_MAPPER.

    Examples
    --------
    >>> clause = _pattern_to_factor_builder_for_preferred('/hr$')
    >>> 'WHEN regexp_matches(_preferred_unit' in clause and 'THEN 1/' in clause
    True

    Notes
    -----
    This function applies the inverse of the factor used in
    _pattern_to_factor_builder_for_base, allowing bidirectional conversion
    between unit systems. The inverse is calculated as 1/(original_factor).

    See Also
    --------
    _pattern_to_factor_builder_for_base : Builds patterns for base unit conversion
    """
    if pattern in REGEX_TO_FACTOR_MAPPER:
        return f"WHEN regexp_matches(_preferred_unit, '{pattern}') THEN 1/({REGEX_TO_FACTOR_MAPPER.get(pattern)})"
    raise ValueError(f"regex pattern {pattern} not found in REGEX_TO_FACTOR_MAPPER dict")

This section documents the utility functions available in CLIFpy for data processing, validation, and specialized operations.

Core Data Processing

Encounter Stitching

Stitch together hospital encounters that occur within a specified time window, useful for treating rapid readmissions as a single continuous encounter.

clifpy.utils.stitching_encounters.stitch_encounters

stitch_encounters(hospitalization, adt, time_interval=6)

Stitches together related hospital encounters that occur within a specified time interval.

This function identifies and groups hospitalizations that occur within a specified time window of each other (default 6 hours), treating them as a single continuous encounter. This is useful for handling cases where patients are discharged and readmitted quickly (e.g., ED to inpatient transfers).

Parameters:

hospitalization : DataFrame, required

Hospitalization table with required columns:

  • patient_id
  • hospitalization_id
  • admission_dttm
  • discharge_dttm
  • age_at_admission
  • admission_type_category
  • discharge_category

adt : DataFrame, required

ADT (Admission/Discharge/Transfer) table with required columns:

  • hospitalization_id
  • in_dttm
  • out_dttm
  • location_category
  • hospital_id

time_interval : int, default 6

Number of hours between discharge and next admission to consider encounters linked. If a patient is readmitted within this window, the encounters are stitched together.

Returns:

Tuple[DataFrame, DataFrame, DataFrame]

  • hospitalization_stitched (DataFrame): Enhanced hospitalization data with encounter_block column
  • adt_stitched (DataFrame): Enhanced ADT data with encounter_block column
  • encounter_mapping (DataFrame): Mapping of hospitalization_id to encounter_block

Raises:

ValueError

If required columns are missing from input DataFrames

Examples:

>>> hosp_stitched, adt_stitched, mapping = stitch_encounters(
...     hospitalization_df, 
...     adt_df, 
...     time_interval=12  # 12-hour window
... )
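
The returned mapping can then carry encounter_block into other hospitalization-level tables. A hedged sketch (labs_df is a hypothetical table keyed by hospitalization_id):

labs_with_blocks = labs_df.merge(
    mapping[['hospitalization_id', 'encounter_block']],
    on='hospitalization_id',
    how='left',
)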
Source code in clifpy/utils/stitching_encounters.py
def stitch_encounters(
    hospitalization: pd.DataFrame, 
    adt: pd.DataFrame, 
    time_interval: int = 6
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Stitches together related hospital encounters that occur within a specified time interval.

    This function identifies and groups hospitalizations that occur within a specified time window
    of each other (default 6 hours), treating them as a single continuous encounter. This is useful
    for handling cases where patients are discharged and readmitted quickly (e.g., ED to inpatient
    transfers).

    Parameters
    ----------
    hospitalization : pd.DataFrame
        Hospitalization table with required columns:
        - patient_id
        - hospitalization_id
        - admission_dttm
        - discharge_dttm
        - age_at_admission
        - admission_type_category
        - discharge_category

    adt : pd.DataFrame
        ADT (Admission/Discharge/Transfer) table with required columns:
        - hospitalization_id
        - in_dttm
        - out_dttm
        - location_category
        - hospital_id

    time_interval : int, default=6
        Number of hours between discharge and next admission to consider encounters linked.
        If a patient is readmitted within this window, the encounters are stitched together.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
        hospitalization_stitched : pd.DataFrame
            Enhanced hospitalization data with encounter_block column
        adt_stitched : pd.DataFrame
            Enhanced ADT data with encounter_block column
        encounter_mapping : pd.DataFrame
            Mapping of hospitalization_id to encounter_block

    Raises
    ------
    ValueError
        If required columns are missing from input DataFrames

    Examples
    --------
    >>> hosp_stitched, adt_stitched, mapping = stitch_encounters(
    ...     hospitalization_df, 
    ...     adt_df, 
    ...     time_interval=12  # 12-hour window
    ... )
    """
    # Validate input DataFrames
    hosp_required_cols = [
        "patient_id", "hospitalization_id", "admission_dttm", 
        "discharge_dttm", "age_at_admission", "admission_type_category", 
        "discharge_category"
    ]
    adt_required_cols = [
        "hospitalization_id", "in_dttm", "out_dttm", 
        "location_category", "hospital_id"
    ]

    missing_hosp_cols = [col for col in hosp_required_cols if col not in hospitalization.columns]
    if missing_hosp_cols:
        raise ValueError(f"Missing required columns in hospitalization DataFrame: {missing_hosp_cols}")

    missing_adt_cols = [col for col in adt_required_cols if col not in adt.columns]
    if missing_adt_cols:
        raise ValueError(f"Missing required columns in ADT DataFrame: {missing_adt_cols}")
    hospitalization_filtered = hospitalization[["patient_id","hospitalization_id","admission_dttm",
                                                "discharge_dttm","age_at_admission", "admission_type_category", "discharge_category"]].copy()
    hospitalization_filtered['admission_dttm'] = pd.to_datetime(hospitalization_filtered['admission_dttm'])
    hospitalization_filtered['discharge_dttm'] = pd.to_datetime(hospitalization_filtered['discharge_dttm'])

    hosp_adt_join = pd.merge(hospitalization_filtered[["patient_id","hospitalization_id","age_at_admission","admission_type_category",
                                                       "admission_dttm","discharge_dttm",
                                                        "discharge_category"]], 
                      adt[["hospitalization_id","in_dttm","out_dttm","location_category","hospital_id"]],
                 on="hospitalization_id",how="left")

    hospital_cat = hosp_adt_join[["hospitalization_id","in_dttm","out_dttm","hospital_id"]]

    # Step 1: Sort by patient_id and admission_dttm
    hospital_block = hosp_adt_join[["patient_id","hospitalization_id","admission_dttm","discharge_dttm", "age_at_admission",  "discharge_category", "admission_type_category"]]
    hospital_block = hospital_block.drop_duplicates()
    hospital_block = hospital_block.sort_values(by=["patient_id", "admission_dttm"]).reset_index(drop=True)
    hospital_block = hospital_block[["patient_id","hospitalization_id","admission_dttm","discharge_dttm", "age_at_admission",  "discharge_category", "admission_type_category"]]

    # Step 2: Calculate time between discharge and next admission
    hospital_block["next_admission_dttm"] = hospital_block.groupby("patient_id")["admission_dttm"].shift(-1)
    hospital_block["discharge_to_next_admission_hrs"] = (
        (hospital_block["next_admission_dttm"] - hospital_block["discharge_dttm"]).dt.total_seconds() / 3600
    )

    # Step 3: Create linked column based on time_interval
    eps = 1e-6  # tiny tolerance for float rounding
    hospital_block["linked_hrs"] = (
        hospital_block["discharge_to_next_admission_hrs"].le(time_interval + eps).fillna(False)
    )

    # Sort values to ensure correct order
    hospital_block = hospital_block.sort_values(by=["patient_id", "admission_dttm"]).reset_index(drop=True)

    # Initialize encounter_block with row indices + 1
    hospital_block['encounter_block'] = hospital_block.index + 1

    # Iteratively propagate the encounter_block values
    while True:
      shifted = hospital_block['encounter_block'].shift(-1)
      mask = hospital_block['linked_hrs'] & (hospital_block['patient_id'] == hospital_block['patient_id'].shift(-1))
      old_values = hospital_block['encounter_block'].copy()
      hospital_block.loc[mask, 'encounter_block'] = shifted[mask]
      if hospital_block['encounter_block'].equals(old_values):
          break

    hospital_block['encounter_block'] = hospital_block['encounter_block'].bfill().astype('int32')
    hospital_block = pd.merge(hospital_block,hospital_cat,how="left",on="hospitalization_id")
    hospital_block = hospital_block.sort_values(by=["patient_id", "admission_dttm","in_dttm","out_dttm"]).reset_index(drop=True)
    hospital_block = hospital_block.drop_duplicates()

    hospital_block2 = hospital_block.groupby(['patient_id','encounter_block']).agg(
        admission_dttm=pd.NamedAgg(column='admission_dttm', aggfunc='min'),
        discharge_dttm=pd.NamedAgg(column='discharge_dttm', aggfunc='max'),
        admission_type_category=pd.NamedAgg(column='admission_type_category', aggfunc='first'),
        discharge_category=pd.NamedAgg(column='discharge_category', aggfunc='last'),
        hospital_id = pd.NamedAgg(column='hospital_id', aggfunc='last'),
        age_at_admission=pd.NamedAgg(column='age_at_admission', aggfunc='last'),
        list_hospitalization_id=pd.NamedAgg(column='hospitalization_id', aggfunc=lambda x: sorted(x.unique()))
    ).reset_index()

    df = pd.merge(hospital_block[["patient_id",
                                  "hospitalization_id",
                                  "encounter_block"]].drop_duplicates(),
             hosp_adt_join[["hospitalization_id","location_category","in_dttm","out_dttm"]], on="hospitalization_id",how="left")

    df = pd.merge(df,hospital_block2[["encounter_block",
                                      "admission_dttm",
                                      "discharge_dttm",
                                      "discharge_category",
                                      "admission_type_category",
                                      "age_at_admission",
                                      "hospital_id",
                                     "list_hospitalization_id"]],on="encounter_block",how="left")
    df = df.drop_duplicates(subset=["patient_id","encounter_block","in_dttm","out_dttm","location_category"])

    # Create the mapping DataFrame
    encounter_mapping = hospital_block[["hospitalization_id", "encounter_block"]].drop_duplicates()

    # Create hospitalization_stitched DataFrame
    hospitalization_stitched = hospitalization.merge(
        encounter_mapping, 
        on="hospitalization_id", 
        how="left"
    )

    # Create adt_stitched DataFrame  
    adt_stitched = adt.merge(
        encounter_mapping,
        on="hospitalization_id",
        how="left"
    )

    return hospitalization_stitched, adt_stitched, encounter_mapping

Wide Dataset Creation

Transform CLIF tables into wide format for analysis, with automatic pivoting and high-performance processing.

clifpy.utils.wide_dataset.create_wide_dataset

create_wide_dataset(clif_instance, optional_tables=None, category_filters=None, sample=False, hospitalization_ids=None, cohort_df=None, output_format='dataframe', save_to_data_location=False, output_filename=None, return_dataframe=True, base_table_columns=None, batch_size=1000, memory_limit=None, threads=None, show_progress=True)

Create a wide dataset by joining multiple CLIF tables with pivoting support.

Parameters:

Name Type Description Default
clif_instance

CLIF object with loaded data

required
optional_tables List[str]

DEPRECATED - use category_filters to specify tables

None
category_filters Dict[str, List[str]]

Dict specifying filtering/selection for each table. Behavior differs by table type:

PIVOT TABLES (narrow to wide conversion): - Values are category values to filter and pivot into columns - Example: {'vitals': ['heart_rate', 'sbp', 'spo2'], 'labs': ['hemoglobin', 'sodium', 'creatinine']} - Acceptable values come from the category column's permissible values defined in each table's schema file (clifpy/schemas/*_schema.yaml)

WIDE TABLES (already in wide format): - Values are column names to keep from the table - Example: {'respiratory_support': ['device_category', 'fio2_set', 'peep_set']} - Acceptable values are any column names from the table schema

Supported tables and their types are defined in: clifpy/schemas/wide_tables_config.yaml

Table presence in this dict determines if it will be loaded. For complete lists of acceptable category values, see: - Table schemas: clifpy/schemas/*_schema.yaml - Wide dataset config: clifpy/schemas/wide_tables_config.yaml

None
sample bool

if True, randomly select 20 hospitalizations

False
hospitalization_ids List[str]

List of specific hospitalization IDs to filter

None
cohort_df DataFrame

DataFrame with columns ['hospitalization_id', 'start_time', 'end_time'] If provided, data will be filtered to only include events within the specified time windows for each hospitalization

None
output_format str

'dataframe', 'csv', or 'parquet'

'dataframe'
save_to_data_location bool

save output to data directory

False
output_filename str

Custom filename (default: 'wide_dataset_YYYYMMDD_HHMMSS')

None
return_dataframe bool

return DataFrame even when saving to file

True
base_table_columns Dict[str, List[str]]

DEPRECATED - columns are selected automatically

None
batch_size int

Number of hospitalizations to process in each batch

1000
memory_limit str

DuckDB memory limit (e.g., '8GB')

None
threads int

Number of threads for DuckDB to use

None
show_progress bool

Show progress bars for long operations

True

Returns:

Type Description
DataFrame or None

DataFrame if return_dataframe=True, None otherwise

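A minimal usage sketch, assuming a loaded CLIF object named clif; the category values below are the illustrative ones from the parameter description and must exist in your table schemas:

>>> wide_df = create_wide_dataset(
...     clif,
...     category_filters={
...         'vitals': ['heart_rate', 'sbp', 'spo2'],
...         'labs': ['hemoglobin', 'sodium', 'creatinine'],
...         'respiratory_support': ['device_category', 'fio2_set', 'peep_set']
...     },
...     sample=True  # quick test on 20 random hospitalizations
... )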
Source code in clifpy/utils/wide_dataset.py
def create_wide_dataset(
    clif_instance,
    optional_tables: Optional[List[str]] = None,
    category_filters: Optional[Dict[str, List[str]]] = None,
    sample: bool = False,
    hospitalization_ids: Optional[List[str]] = None,
    cohort_df: Optional[pd.DataFrame] = None,
    output_format: str = 'dataframe',
    save_to_data_location: bool = False,
    output_filename: Optional[str] = None,
    return_dataframe: bool = True,
    base_table_columns: Optional[Dict[str, List[str]]] = None,
    batch_size: int = 1000,
    memory_limit: Optional[str] = None,
    threads: Optional[int] = None,
    show_progress: bool = True
) -> Optional[pd.DataFrame]:
    """
    Create a wide dataset by joining multiple CLIF tables with pivoting support.

    Parameters
    ----------
    clif_instance
        CLIF object with loaded data
    optional_tables : List[str], optional
        DEPRECATED - use category_filters to specify tables
    category_filters : Dict[str, List[str]], optional
        Dict specifying filtering/selection for each table. Behavior differs by table type:

        **PIVOT TABLES** (narrow to wide conversion):
        - Values are **category values** to filter and pivot into columns
        - Example: {'vitals': ['heart_rate', 'sbp', 'spo2'],
                    'labs': ['hemoglobin', 'sodium', 'creatinine']}
        - Acceptable values come from the category column's permissible values
          defined in each table's schema file (clifpy/schemas/*_schema.yaml)

        **WIDE TABLES** (already in wide format):
        - Values are **column names** to keep from the table
        - Example: {'respiratory_support': ['device_category', 'fio2_set', 'peep_set']}
        - Acceptable values are any column names from the table schema

        **Supported tables and their types are defined in:**
        clifpy/schemas/wide_tables_config.yaml

        Table presence in this dict determines if it will be loaded.
        For complete lists of acceptable category values, see:
        - Table schemas: clifpy/schemas/*_schema.yaml
        - Wide dataset config: clifpy/schemas/wide_tables_config.yaml
    sample : bool, default=False
        if True, randomly select 20 hospitalizations
    hospitalization_ids : List[str], optional
        List of specific hospitalization IDs to filter
    cohort_df : pd.DataFrame, optional
        DataFrame with columns ['hospitalization_id', 'start_time', 'end_time']
        If provided, data will be filtered to only include events within the specified
        time windows for each hospitalization
    output_format : str, default='dataframe'
        'dataframe', 'csv', or 'parquet'
    save_to_data_location : bool, default=False
        save output to data directory
    output_filename : str, optional
        Custom filename (default: 'wide_dataset_YYYYMMDD_HHMMSS')
    return_dataframe : bool, default=True
        return DataFrame even when saving to file
    base_table_columns : Dict[str, List[str]], optional
        DEPRECATED - columns are selected automatically
    batch_size : int, default=1000
        Number of hospitalizations to process in each batch
    memory_limit : str, optional
        DuckDB memory limit (e.g., '8GB')
    threads : int, optional
        Number of threads for DuckDB to use
    show_progress : bool, default=True
        Show progress bars for long operations

    Returns
    -------
    pd.DataFrame or None
        DataFrame if return_dataframe=True, None otherwise
    """


    logger.info("Phase 4: Wide Dataset Processing (utility function)")
    logger.debug("  4.1: Starting wide dataset creation")

    # Validate cohort_df if provided
    if cohort_df is not None:
        required_cols = ['hospitalization_id', 'start_time', 'end_time']
        missing_cols = [col for col in required_cols if col not in cohort_df.columns]
        if missing_cols:
            raise ValueError(f"cohort_df must contain columns: {required_cols}. Missing: {missing_cols}")

        # Ensure hospitalization_id is string type to match with other tables
        cohort_df['hospitalization_id'] = cohort_df['hospitalization_id'].astype(str)

        # Ensure time columns are datetime
        for time_col in ['start_time', 'end_time']:
            if not pd.api.types.is_datetime64_any_dtype(cohort_df[time_col]):
                cohort_df[time_col] = pd.to_datetime(cohort_df[time_col])

        logger.info("  === SPECIAL: COHORT TIME WINDOW FILTERING ===")
        logger.info(f"       - Processing {len(cohort_df)} hospitalizations with time windows")
        logger.debug(f"       - Ensuring datetime types for start_time, end_time")

    # Get table types from config
    PIVOT_TABLES = _get_supported_tables(table_type='pivot')
    WIDE_TABLES = _get_supported_tables(table_type='wide')

    # Determine which tables to load from category_filters
    if category_filters is None:
        category_filters = {}

    # For backward compatibility with optional_tables
    if optional_tables and not category_filters:
        logger.warning("optional_tables parameter is deprecated. Converting to category_filters format")
        category_filters = {table: [] for table in optional_tables}

    tables_to_load = list(category_filters.keys())

    # Create DuckDB connection with optimized settings
    conn_config = {
        'preserve_insertion_order': 'false'
    }

    if memory_limit:
        conn_config['memory_limit'] = memory_limit
    if threads:
        conn_config['threads'] = str(threads)

    # Use context manager for connection
    with duckdb.connect(':memory:', config=conn_config) as conn:
        # Preserve timezone from clif_instance configuration
        conn.execute(f"SET timezone = '{clif_instance.timezone}'")
        # Set additional optimization settings
        conn.execute("SET preserve_insertion_order = false")

        # Get hospitalization IDs to process
        hospitalization_df = clif_instance.hospitalization.df.copy()

        if hospitalization_ids is not None:
            logger.info(f"Filtering to specific hospitalization IDs: {len(hospitalization_ids)} encounters")
            required_ids = hospitalization_ids
        elif cohort_df is not None:
            # Use hospitalization IDs from cohort_df
            required_ids = cohort_df['hospitalization_id'].unique().tolist()
            logger.info(f"Using {len(required_ids)} hospitalization IDs from cohort_df")
        elif sample:
            logger.info("Sampling 20 random hospitalizations")
            all_ids = hospitalization_df['hospitalization_id'].unique()
            required_ids = np.random.choice(all_ids, size=min(20, len(all_ids)), replace=False).tolist()
            logger.info(f"Selected {len(required_ids)} hospitalizations for sampling")
        else:
            required_ids = hospitalization_df['hospitalization_id'].unique().tolist()
            logger.info(f"Processing all {len(required_ids)} hospitalizations")

        # Filter all base tables by required IDs immediately
        logger.info("Loading and filtering base tables")
        # Only keep required columns from hospitalization table
        hosp_required_cols = ['hospitalization_id', 'patient_id', 'age_at_admission']
        hosp_available_cols = [col for col in hosp_required_cols if col in hospitalization_df.columns]
        hospitalization_df = hospitalization_df[hosp_available_cols]
        hospitalization_df = hospitalization_df[hospitalization_df['hospitalization_id'].isin(required_ids)]
        patient_df = clif_instance.patient.df[['patient_id']].copy()

        # Get ADT with selected columns
        adt_df = clif_instance.adt.df.copy()
        adt_df = adt_df[adt_df['hospitalization_id'].isin(required_ids)]

        # Apply time filtering to ADT if cohort_df is provided
        if cohort_df is not None and 'in_dttm' in adt_df.columns:
            pre_filter_count = len(adt_df)
            # Merge with cohort_df to get time windows
            adt_df = pd.merge(
                adt_df,
                cohort_df[['hospitalization_id', 'start_time', 'end_time']],
                on='hospitalization_id',
                how='inner'
            )

            # Ensure in_dttm column is datetime
            if not pd.api.types.is_datetime64_any_dtype(adt_df['in_dttm']):
                adt_df['in_dttm'] = pd.to_datetime(adt_df['in_dttm'])

            # Filter to time window
            adt_df = adt_df[
                (adt_df['in_dttm'] >= adt_df['start_time']) &
                (adt_df['in_dttm'] <= adt_df['end_time'])
            ].copy()

            # Drop the time window columns
            adt_df = adt_df.drop(columns=['start_time', 'end_time'])

            logger.info(f"  ADT time filtering: {pre_filter_count} → {len(adt_df)} records")

        # Remove duplicate columns and _name columns
        adt_cols = [col for col in adt_df.columns if not col.endswith('_name') and col != 'patient_id']
        adt_df = adt_df[adt_cols]

        logger.info(f"       - Base tables filtered - Hospitalization: {len(hospitalization_df)}, Patient: {len(patient_df)}, ADT: {len(adt_df)}")

        logger.info("  4.2: Determining processing mode")
        # Process in batches to avoid memory issues
        if batch_size > 0 and len(required_ids) > batch_size:
            logger.info(f"       - Batch mode: {len(required_ids)} hospitalizations in {len(required_ids)//batch_size + 1} batches of {batch_size}")
            logger.info("  4.B: === BATCH PROCESSING MODE ===")
            return _process_in_batches(
                conn, clif_instance, required_ids, patient_df, hospitalization_df, adt_df,
                tables_to_load, category_filters, PIVOT_TABLES, WIDE_TABLES,
                batch_size, show_progress, save_to_data_location, output_filename,
                output_format, return_dataframe, cohort_df
            )
        else:
            logger.info(f"       - Single mode: Processing all {len(required_ids)} hospitalizations at once")
            logger.info("  4.S: === SINGLE PROCESSING MODE ===")
            # Process all at once for small datasets
            return _process_hospitalizations(
                conn, clif_instance, required_ids, patient_df, hospitalization_df, adt_df,
                tables_to_load, category_filters, PIVOT_TABLES, WIDE_TABLES,
                show_progress, cohort_df
            )

clifpy.utils.wide_dataset.convert_wide_to_hourly

convert_wide_to_hourly(wide_df, aggregation_config, id_name='hospitalization_id', hourly_window=1, fill_gaps=False, memory_limit='4GB', temp_directory=None, batch_size=None, timezone='UTC')

Convert a wide dataset to temporal aggregation with user-defined aggregation methods.

This function uses DuckDB for high-performance aggregation with event-based windowing.

Parameters:

Name Type Description Default
wide_df DataFrame

Wide dataset DataFrame from create_wide_dataset()

required
aggregation_config Dict[str, List[str]]

Dict mapping aggregation methods to list of columns Example: { 'max': ['map', 'temp_c', 'sbp'], 'mean': ['heart_rate', 'respiratory_rate'], 'min': ['spo2'], 'median': ['glucose'], 'first': ['gcs_total', 'rass'], 'last': ['assessment_value'], 'boolean': ['norepinephrine', 'propofol'], 'one_hot_encode': ['medication_name', 'assessment_category'] }

required
id_name str

Column name to use for grouping aggregation - 'hospitalization_id': Group by individual hospitalizations (default) - 'encounter_block': Group by encounter blocks (after encounter stitching) - Any other ID column present in the wide dataset

'hospitalization_id'
hourly_window int

Time window for aggregation in hours (1-72).

Windows are event-based (relative to each group's first event): - Window 0: [first_event, first_event + hourly_window hours) - Window 1: [first_event + hourly_window, first_event + 2*hourly_window) - Window N: [first_event + N*hourly_window, ...)

Common values: 1 (hourly), 2 (bi-hourly), 6 (quarter-day), 12 (half-day), 24 (daily), 72 (3-day - maximum)

1
fill_gaps bool

Whether to create rows for time windows with no data.

  • False (default): Sparse output - only windows with actual data appear
  • True: Dense output - create ALL windows from 0 to max_window per group, filling gaps with NaN values (no forward-filling)

Example with events at windows 0, 1, and 5: - fill_gaps=False: Output has 3 rows (windows 0, 1, 5) - fill_gaps=True: Output has 6 rows (windows 0, 1, 2, 3, 4, 5); windows 2, 3, 4 have NaN for all aggregated columns

False
memory_limit str

DuckDB memory limit (e.g., '4GB', '8GB')

'4GB'
temp_directory str

Directory for temporary files (default: system temp)

None
batch_size int

Process in batches if dataset is large (auto-determined if None)

None
timezone str

Timezone for datetime operations in DuckDB (e.g., 'UTC', 'America/New_York')

'UTC'

Returns:

Type Description
DataFrame

Aggregated dataset with columns:

Group & Window Identifiers: - {id_name}: Group identifier (hospitalization_id or encounter_block) - window_number: Sequential window index (0-indexed, starts at 0 for each group) - window_start_dttm: Window start timestamp (inclusive) - window_end_dttm: Window end timestamp (exclusive)

Context Columns: - patient_id: Patient identifier - day_number: Day number within hospitalization

Aggregated Columns: - All columns specified in aggregation_config with appropriate suffixes (_max, _min, _mean, _median, _first, _last, _boolean, one-hot encoded)

Notes:
- Windows are relative to each group's first event, not calendar boundaries
- window_end_dttm - window_start_dttm = hourly_window hours (always)
- When fill_gaps=True, gap windows contain NaN (not forward-filled)
- When fill_gaps=False, only windows with data appear (sparse output)

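A minimal usage sketch, assuming wide_df comes from create_wide_dataset() and contains the columns named in the aggregation config:

>>> hourly_df = convert_wide_to_hourly(
...     wide_df,
...     aggregation_config={
...         'mean': ['heart_rate'],
...         'max': ['sbp'],
...         'boolean': ['norepinephrine']
...     },
...     hourly_window=6,   # 6-hour windows relative to each group's first event
...     fill_gaps=True     # emit rows with NaN for windows that have no data
... )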
Source code in clifpy/utils/wide_dataset.py
def convert_wide_to_hourly(
    wide_df: pd.DataFrame,
    aggregation_config: Dict[str, List[str]],
    id_name: str = 'hospitalization_id',
    hourly_window: int = 1,
    fill_gaps: bool = False,
    memory_limit: str = '4GB',
    temp_directory: Optional[str] = None,
    batch_size: Optional[int] = None,
    timezone: str = 'UTC'
) -> pd.DataFrame:
    """
    Convert a wide dataset to temporal aggregation with user-defined aggregation methods.

    This function uses DuckDB for high-performance aggregation with event-based windowing.

    Parameters
    ----------
    wide_df : pd.DataFrame
        Wide dataset DataFrame from create_wide_dataset()
    aggregation_config : Dict[str, List[str]]
        Dict mapping aggregation methods to list of columns
        Example: {
            'max': ['map', 'temp_c', 'sbp'],
            'mean': ['heart_rate', 'respiratory_rate'],
            'min': ['spo2'],
            'median': ['glucose'],
            'first': ['gcs_total', 'rass'],
            'last': ['assessment_value'],
            'boolean': ['norepinephrine', 'propofol'],
            'one_hot_encode': ['medication_name', 'assessment_category']
        }
    id_name : str, default='hospitalization_id'
        Column name to use for grouping aggregation
        - 'hospitalization_id': Group by individual hospitalizations (default)
        - 'encounter_block': Group by encounter blocks (after encounter stitching)
        - Any other ID column present in the wide dataset
    hourly_window : int, default=1
        Time window for aggregation in hours (1-72).

        Windows are event-based (relative to each group's first event):
        - Window 0: [first_event, first_event + hourly_window hours)
        - Window 1: [first_event + hourly_window, first_event + 2*hourly_window)
        - Window N: [first_event + N*hourly_window, ...)

        Common values: 1 (hourly), 2 (bi-hourly), 6 (quarter-day), 12 (half-day),
                       24 (daily), 72 (3-day - maximum)
    fill_gaps : bool, default=False
        Whether to create rows for time windows with no data.

        - False (default): Sparse output - only windows with actual data appear
        - True: Dense output - create ALL windows from 0 to max_window per group,
                filling gaps with NaN values (no forward-filling)

        Example with events at window 0, 1, 5:
        - fill_gaps=False: Output has 3 rows (windows 0, 1, 5)
        - fill_gaps=True: Output has 6 rows (windows 0, 1, 2, 3, 4, 5)
                          Windows 2, 3, 4 have NaN for all aggregated columns
    memory_limit : str, default='4GB'
        DuckDB memory limit (e.g., '4GB', '8GB')
    temp_directory : str, optional
        Directory for temporary files (default: system temp)
    batch_size : int, optional
        Process in batches if dataset is large (auto-determined if None)
    timezone : str, default='UTC'
        Timezone for datetime operations in DuckDB (e.g., 'UTC', 'America/New_York')

    Returns
    -------
    pd.DataFrame
        Aggregated dataset with columns:

        **Group & Window Identifiers:**
        - {id_name}: Group identifier (hospitalization_id or encounter_block)
        - window_number: Sequential window index (0-indexed, starts at 0 for each group)
        - window_start_dttm: Window start timestamp (inclusive)
        - window_end_dttm: Window end timestamp (exclusive)

        **Context Columns:**
        - patient_id: Patient identifier
        - day_number: Day number within hospitalization

        **Aggregated Columns:**
        - All columns specified in aggregation_config with appropriate suffixes
          (_max, _min, _mean, _median, _first, _last, _boolean, one-hot encoded)

        **Notes:**
        - Windows are relative to each group's first event, not calendar boundaries
        - window_end_dttm - window_start_dttm = hourly_window hours (always)
        - When fill_gaps=True, gap windows contain NaN (not forward-filled)
        - When fill_gaps=False, only windows with data appear (sparse output)
    """

    # Validate hourly_window parameter
    if not isinstance(hourly_window, int):
        raise ValueError(f"hourly_window must be an integer, got: {type(hourly_window).__name__}")
    if hourly_window < 1 or hourly_window > 72:
        raise ValueError(f"hourly_window must be between 1 and 72 hours, got: {hourly_window}")

    # Validate fill_gaps parameter
    if not isinstance(fill_gaps, bool):
        raise ValueError(f"fill_gaps must be a boolean, got: {type(fill_gaps).__name__}")

    # Strip timezone from datetime columns (no conversion, just remove tz metadata)
    wide_df = wide_df.copy()
    for col in wide_df.columns:
        if pd.api.types.is_datetime64_any_dtype(wide_df[col]):
            if hasattr(wide_df[col].dtype, 'tz') and wide_df[col].dtype.tz is not None:
                wide_df[col] = wide_df[col].dt.tz_localize(None)

    # Update log statements
    window_label = "hourly" if hourly_window == 1 else f"{hourly_window}-hour"
    gap_handling = "with gap filling" if fill_gaps else "sparse (no gap filling)"
    logger.info(f"Starting optimized {window_label} aggregation using DuckDB {gap_handling}")
    logger.info(f"Input dataset shape: {wide_df.shape}")
    logger.debug(f"Memory limit: {memory_limit}")
    logger.debug(f"Aggregation window: {hourly_window} hour(s)")
    logger.debug(f"Gap filling: {'enabled' if fill_gaps else 'disabled'}")

    # Validate input
    required_columns = ['event_time', id_name, 'day_number']
    for col in required_columns:
        if col not in wide_df.columns:
            raise ValueError(f"wide_df must contain '{col}' column")

    # Auto-determine batch size for very large datasets
    if batch_size is None:
        n_rows = len(wide_df)
        n_ids = wide_df[id_name].nunique()

        # Use batching if dataset is very large
        if n_rows > 1_000_000 or n_ids > 10_000:
            batch_size = min(5000, n_ids // 4)
            logger.info(f"Large dataset detected ({n_rows:,} rows, {n_ids:,} {id_name}s)")
            logger.info(f"Will process in batches of {batch_size} {id_name}s")
        else:
            batch_size = 0  # Process all at once

    # Configure DuckDB connection
    config = {
        'memory_limit': memory_limit,
        'temp_directory': temp_directory or '/tmp/duckdb_temp',
        'preserve_insertion_order': 'false',
        'threads': '4'
    }

    # Remove None values from config
    config = {k: v for k, v in config.items() if v is not None}

    try:
        # Create DuckDB connection with error handling
        with duckdb.connect(':memory:', config=config) as conn:
            # Use timezone from parameter (passed from orchestrator)
            conn.execute(f"SET timezone = '{timezone}'")
            # Set additional optimization settings
            conn.execute("SET preserve_insertion_order = false")

            if batch_size > 0:
                return _process_hourly_in_batches(conn, wide_df, aggregation_config, id_name, batch_size, hourly_window, fill_gaps)
            else:
                return _process_hourly_single_batch(conn, wide_df, aggregation_config, id_name, hourly_window, fill_gaps)

    except Exception as e:
        logger.error(f"DuckDB processing failed: {str(e)}")
        raise

Respiratory Support Processing

Waterfall Processing

Apply sophisticated data cleaning and imputation to respiratory support data for complete ventilator timelines.

clifpy.utils.waterfall.process_resp_support_waterfall

process_resp_support_waterfall(resp_support, *, id_col='hospitalization_id', bfill=False, verbose=True)

Clean + waterfall-fill the CLIF resp_support table (Python port of Nick's reference R pipeline).

Parameters:

Name Type Description Default
resp_support DataFrame

Raw CLIF respiratory-support table already in UTC.

required
id_col str

Encounter-level identifier column.

'hospitalization_id'
bfill bool

If True, numeric setters are back-filled after forward-fill. If False (default) only forward-fill is used.

False
verbose bool

Prints progress banners when True.

True

Returns:

Type Description
DataFrame

Fully processed table with

  • hourly scaffold rows (HH:59:59) inserted,
  • device / mode heuristics applied,
  • hierarchical episode IDs (device_cat_id → …),
  • numeric waterfall fill inside each mode_name_id block (forward-only or bi-directional per bfill),
  • tracheostomy flag forward-filled,
  • one unique row per (id_col, recorded_dttm) in chronological order.
Notes

The function does not change time-zones; convert before calling if needed.

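A minimal usage sketch; resp_support_df is a placeholder for a raw CLIF respiratory_support DataFrame already in UTC:

>>> from clifpy.utils.waterfall import process_resp_support_waterfall
>>> rs_filled = process_resp_support_waterfall(
...     resp_support_df,
...     bfill=False,   # forward-fill only (the default)
...     verbose=True
... )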
Source code in clifpy/utils/waterfall.py
def process_resp_support_waterfall(
    resp_support: pd.DataFrame,
    *,
    id_col: str = "hospitalization_id",
    bfill: bool = False,                
    verbose: bool = True,
) -> pd.DataFrame:
    """
    Clean + waterfall-fill the CLIF **`resp_support`** table
    (Python port of Nick's reference R pipeline).

    Parameters
    ----------
    resp_support : pd.DataFrame
        Raw CLIF respiratory-support table **already in UTC**.
    id_col : str, default ``"hospitalization_id"``
        Encounter-level identifier column.
    bfill : bool, default ``False``
        If *True*, numeric setters are back-filled after forward-fill.
        If *False* (default) only forward-fill is used.
    verbose : bool, default ``True``
        Prints progress banners when *True*.

    Returns
    -------
    pd.DataFrame
        Fully processed table with

        * hourly scaffold rows (``HH:59:59``) inserted,
        * device / mode heuristics applied,
        * hierarchical episode IDs (``device_cat_id → …``),
        * numeric waterfall fill inside each ``mode_name_id`` block
          (forward-only or bi-directional per *bfill*),
        * tracheostomy flag forward-filled,
        * one unique row per ``(id_col, recorded_dttm)`` in
          chronological order.

    Notes
    -----
    The function **does not** change time-zones; convert before
    calling if needed.
    """

    p = print if verbose else (lambda *_, **__: None)

    # ------------------------------------------------------------------ #
    # Helper: forward-fill only or forward + back depending on flag      #
    # ------------------------------------------------------------------ #
    def fb(obj):
        if isinstance(obj, (pd.DataFrame, pd.Series)):
            return obj.ffill().bfill() if bfill else obj.ffill()
        raise TypeError("obj must be a pandas DataFrame or Series")

    # ------------------------------------------------------------------ #
    # Small helper to build the hourly scaffold                          #
    #   - tries DuckDB (fast), falls back to pandas                      #
    # ------------------------------------------------------------------ #
    def _build_hourly_scaffold(rs: pd.DataFrame) -> pd.DataFrame:
        # Try DuckDB first
        try:
            # local import so package doesn't hard-depend on it
            if verbose:
                p("  • Building hourly scaffold via DuckDB")

            con = duckdb.connect()
            # Only need id + timestamps for bounds
            con.register("rs", rs[[id_col, "recorded_dttm"]].dropna(subset=["recorded_dttm"]))

            # Generate hourly series from floor(min) to floor(max), then add :59:59
            sql = f"""
            WITH bounds AS (
              SELECT
                {id_col} AS id,
                date_trunc('hour', MIN(recorded_dttm)) AS tmin_h,
                date_trunc('hour', MAX(recorded_dttm)) AS tmax_h
              FROM rs
              GROUP BY 1
            ),
            hour_sequence AS (
              SELECT
                b.id AS {id_col},
                gs.ts + INTERVAL '59 minutes 59 seconds' AS recorded_dttm
              FROM bounds b,
                   LATERAL generate_series(b.tmin_h, b.tmax_h, INTERVAL 1 HOUR) AS gs(ts)
            )
            SELECT {id_col}, recorded_dttm
            FROM hour_sequence
            ORDER BY {id_col}, recorded_dttm
            """
            scaffold = con.execute(sql).df()
            con.close()

            # Ensure pandas datetime with UTC if input was tz-aware
            # (function contract says already UTC; this keeps dtype consistent)
            scaffold["recorded_dttm"] = pd.to_datetime(scaffold["recorded_dttm"], utc=True, errors="coerce")
            scaffold["recorded_date"] = scaffold["recorded_dttm"].dt.date
            scaffold["recorded_hour"] = scaffold["recorded_dttm"].dt.hour
            scaffold["is_scaffold"]   = True
            return scaffold

        except Exception as e:
            if verbose:
                p(f"  • DuckDB scaffold unavailable ({type(e).__name__}: {e}). Falling back to pandas...")
            # ---- Original pandas scaffold (ground truth) ----
            rs_copy = rs.copy()
            rs_copy["recorded_date"] = rs_copy["recorded_dttm"].dt.date
            rs_copy["recorded_hour"] = rs_copy["recorded_dttm"].dt.hour

            min_max = rs_copy.groupby(id_col)["recorded_dttm"].agg(["min", "max"]).reset_index()
            tqdm.pandas(disable=not verbose, desc="Creating hourly scaffolds")
            scaffold = (
                min_max.progress_apply(
                    lambda r: pd.date_range(
                        r["min"].floor("h"),
                        r["max"].floor("h"),
                        freq="1h", tz="UTC"
                    ),
                    axis=1,
                )
                .explode()
                .rename("recorded_dttm")
            )
            scaffold = (
                min_max[[id_col]].join(scaffold)
                .assign(recorded_dttm=lambda d: d["recorded_dttm"].dt.floor("h")
                                               + pd.Timedelta(minutes=59, seconds=59))
            )
            scaffold["recorded_date"] = scaffold["recorded_dttm"].dt.date
            scaffold["recorded_hour"] = scaffold["recorded_dttm"].dt.hour
            scaffold["is_scaffold"]   = True
            return scaffold

    # ------------------------------------------------------------------ #
    # Phase 0 – set-up & hourly scaffold                                 #
    # ------------------------------------------------------------------ #
    p("✦ Phase 0: initialise & create hourly scaffold")
    rs = resp_support.copy()

    # Lower-case categorical strings
    for c in ["device_category", "device_name", "mode_category", "mode_name"]:
        if c in rs.columns:
            rs[c] = rs[c].str.lower()

    # Numeric coercion
    num_cols = [
        "tracheostomy", "fio2_set", "lpm_set", "peep_set",
        "tidal_volume_set", "resp_rate_set", "resp_rate_obs",
        "pressure_support_set", "peak_inspiratory_pressure_set",
    ]
    num_cols = [c for c in num_cols if c in rs.columns]
    if num_cols:
        rs[num_cols] = rs[num_cols].apply(pd.to_numeric, errors="coerce")

    # FiO₂ scaling if documented 40 → 0.40
    if "fio2_set" in rs.columns:
        fio2_mean = rs["fio2_set"].mean(skipna=True)
        if pd.notna(fio2_mean) and fio2_mean > 1.0:
            rs.loc[rs["fio2_set"] > 1, "fio2_set"] /= 100
            p("  • Scaled FiO₂ values > 1 down by /100")

    # Build hourly scaffold (DuckDB if available, else pandas)
    scaffold = _build_hourly_scaffold(rs)
    if verbose:
        p(f"  • Scaffold rows created: {len(scaffold):,}")

    # We keep recorded_date/hour on rs only for temporary ops below
    rs["recorded_date"] = rs["recorded_dttm"].dt.date
    rs["recorded_hour"] = rs["recorded_dttm"].dt.hour

    # ------------------------------------------------------------------ #
    # Phase 1 – heuristic device / mode inference                        #
    # ------------------------------------------------------------------ #
    p("✦ Phase 1: heuristic inference of device & mode")

    # Most-frequent fall-back labels
    device_counts = rs[["device_name", "device_category"]].value_counts().reset_index()

    imv_devices = device_counts.loc[device_counts["device_category"] == "imv", "device_name"]
    most_common_imv_name = imv_devices.iloc[0] if len(imv_devices) > 0 else "ventilator"

    nippv_devices = device_counts.loc[device_counts["device_category"] == "nippv", "device_name"]
    most_common_nippv_name = nippv_devices.iloc[0] if len(nippv_devices) > 0 else "bipap"

    mode_counts = rs[["mode_name", "mode_category"]].value_counts().reset_index()
    cmv_modes = mode_counts.loc[
        mode_counts["mode_category"] == "assist control-volume control", "mode_name"
    ]
    most_common_cmv_name = cmv_modes.iloc[0] if len(cmv_modes) > 0 else "AC/VC"

    # --- 1-a IMV from mode_category
    mask = (
        rs["device_category"].isna() & rs["device_name"].isna()
        & rs["mode_category"].str.contains(
            r"(?:assist control-volume control|simv|pressure control)", na=False, regex=True
            )
    )
    rs.loc[mask, ["device_category", "device_name"]] = ["imv", most_common_imv_name]

    # --- 1-b IMV look-behind/ahead
    rs = rs.sort_values([id_col, "recorded_dttm"])
    prev_cat = rs.groupby(id_col)["device_category"].shift()
    next_cat = rs.groupby(id_col)["device_category"].shift(-1)
    imv_like = (
        rs["device_category"].isna()
        & ((prev_cat == "imv") | (next_cat == "imv"))
        & rs["peep_set"].gt(1) & rs["resp_rate_set"].gt(1) & rs["tidal_volume_set"].gt(1)
    )
    rs.loc[imv_like, ["device_category", "device_name"]] = ["imv", most_common_imv_name]

    # --- 1-c NIPPV heuristics
    prev_cat = rs.groupby(id_col)["device_category"].shift()
    next_cat = rs.groupby(id_col)["device_category"].shift(-1)
    nippv_like = (
        rs["device_category"].isna()
        & ((prev_cat == "nippv") | (next_cat == "nippv"))
        & rs["peak_inspiratory_pressure_set"].gt(1)
        & rs["pressure_support_set"].gt(1)
    )
    rs.loc[nippv_like, "device_category"] = "nippv"
    rs.loc[nippv_like & rs["device_name"].isna(), "device_name"] = most_common_nippv_name

    # --- 1-d Clean duplicates & empty rows
    rs = rs.sort_values([id_col, "recorded_dttm"])
    rs["dup_count"] = rs.groupby([id_col, "recorded_dttm"])["recorded_dttm"].transform("size")
    rs = rs[~((rs["dup_count"] > 1) & (rs["device_category"] == "nippv"))]
    rs["dup_count"] = rs.groupby([id_col, "recorded_dttm"])["recorded_dttm"].transform("size")
    rs = rs[~((rs["dup_count"] > 1) & rs["device_category"].isna())].drop(columns="dup_count")

    # --- 1-e Guard: nasal-cannula rows must never carry PEEP
    if "peep_set" in rs.columns:
        mask_bad_nc = (rs["device_category"] == "nasal cannula") & rs["peep_set"].gt(0)
        if mask_bad_nc.any():
            rs.loc[mask_bad_nc, "device_category"] = np.nan
            p(f"{mask_bad_nc.sum():,} rows had PEEP>0 on nasal cannula → device_category reset")

    # Drop rows with nothing useful
    all_na_cols = [
        "device_category", "device_name", "mode_category", "mode_name",
        "tracheostomy", "fio2_set", "lpm_set", "peep_set", "tidal_volume_set",
        "resp_rate_set", "resp_rate_obs", "pressure_support_set",
        "peak_inspiratory_pressure_set",
    ]
    rs = rs.dropna(subset=[c for c in all_na_cols if c in rs.columns], how="all")

    # Unique per timestamp
    rs = rs.drop_duplicates(subset=[id_col, "recorded_dttm"], keep="first")

    # Merge scaffold (exactly like original)
    rs["is_scaffold"] = False
    rs = pd.concat([rs, scaffold], ignore_index=True).sort_values(
        [id_col, "recorded_dttm", "recorded_date", "recorded_hour"]
    )

    # ------------------------------------------------------------------ #
    # Phase 2 – hierarchical IDs                                         #
    # ------------------------------------------------------------------ #
    p("✦ Phase 2: build hierarchical IDs")

    def change_id(col: pd.Series, by: pd.Series) -> pd.Series:
        return (
            col.fillna("missing")
            .groupby(by)
            .transform(lambda s: s.ne(s.shift()).cumsum())
            .astype("int32")
        )

    rs["device_category"] = rs.groupby(id_col)["device_category"].ffill()
    rs["device_cat_id"]   = change_id(rs["device_category"], rs[id_col])

    rs["device_name"] = (
        rs.sort_values("recorded_dttm")
          .groupby([id_col, "device_cat_id"])["device_name"]
          .transform(fb).infer_objects(copy=False)
    )
    rs["device_id"] = change_id(rs["device_name"], rs[id_col])

    rs = rs.sort_values([id_col, "recorded_dttm"])
    rs["mode_category"] = (
        rs.groupby([id_col, "device_id"])["mode_category"]
          .transform(fb).infer_objects(copy=False)
    )
    rs["mode_cat_id"] = change_id(
        rs["mode_category"].fillna("missing"), rs[id_col]
    )

    rs["mode_name"] = (
        rs.groupby([id_col, "mode_cat_id"])["mode_name"]
          .transform(fb).infer_objects(copy=False)
    )
    rs["mode_name_id"] = change_id(
        rs["mode_name"].fillna("missing"), rs[id_col]
    )

    # ------------------------------------------------------------------ #
    # Phase 3 – numeric waterfall                                        #
    # ------------------------------------------------------------------ #
    fill_type = "bi-directional" if bfill else "forward-only"
    p(f"✦ Phase 3: {fill_type} numeric fill inside mode_name_id blocks")

    # FiO₂ default for room-air
    if "fio2_set" in rs.columns:
        rs.loc[(rs["device_category"] == "room air") & rs["fio2_set"].isna(), "fio2_set"] = 0.21

    # Tidal-volume clean-up
    if "tidal_volume_set" in rs.columns:
        bad_tv = (
            ((rs["mode_category"] == "pressure support/cpap") & rs.get("pressure_support_set").notna())
            | (rs["mode_category"].isna() & rs.get("device_name").str.contains("trach", na=False))
            | ((rs["mode_category"] == "pressure support/cpap") & rs.get("device_name").str.contains("trach", na=False))
        )
        rs.loc[bad_tv, "tidal_volume_set"] = np.nan

    num_cols_fill = [
        c for c in [
            "fio2_set", "lpm_set", "peep_set", "tidal_volume_set",
            "pressure_support_set", "resp_rate_set", "resp_rate_obs",
            "peak_inspiratory_pressure_set",
        ] if c in rs.columns
    ]

    def fill_block(g: pd.DataFrame) -> pd.DataFrame:
        if (g["device_category"] == "trach collar").any():
            breaker = (g["device_category"] == "trach collar").cumsum()
            return g.groupby(breaker)[num_cols_fill].apply(fb)
        return fb(g[num_cols_fill])

    p(f"  • applying waterfall fill to {rs[id_col].nunique():,} encounters")
    tqdm.pandas(disable=not verbose, desc="Waterfall fill by mode_name_id")
    rs[num_cols_fill] = (
        rs.groupby([id_col, "mode_name_id"], group_keys=False, sort=False)
          .progress_apply(fill_block)
    )

    # “T-piece” → classify as blow-by
    tpiece = rs["mode_category"].isna() & rs.get("device_name").str.contains("t-piece", na=False)
    rs.loc[tpiece, "mode_category"] = "blow by"

    # Tracheostomy flag forward-fill per encounter
    if "tracheostomy" in rs.columns:
        rs["tracheostomy"] = rs.groupby(id_col)["tracheostomy"].ffill()

    # ------------------------------------------------------------------ #
    # Phase 4 – final tidy-up                                            #
    # ------------------------------------------------------------------ #
    p("✦ Phase 4: final dedup & ordering")
    rs = (
        rs.drop_duplicates()
          .sort_values([id_col, "recorded_dttm"])
          .reset_index(drop=True)
    )

    # Drop helper cols
    rs = rs.drop(columns=[c for c in ["recorded_date", "recorded_hour"] if c in rs.columns])

    p("[OK] Respiratory-support waterfall complete.")
    return rs

Clinical Calculations

Comorbidity Indices

Calculate Charlson and Elixhauser comorbidity indices from diagnosis data.

clifpy.utils.comorbidity.calculate_cci

calculate_cci(hospital_diagnosis, hierarchy=True)

Calculate Charlson Comorbidity Index (CCI) for hospitalizations.

This function processes hospital diagnosis data to calculate CCI scores using the Quan (2011) adaptation with ICD-10-CM codes.

Parameters:

Name Type Description Default
hospital_diagnosis HospitalDiagnosis object, pandas DataFrame, or polars DataFrame

containing diagnosis data with columns: - hospitalization_id - diagnosis_code - diagnosis_code_format

required
hierarchy bool

Apply assign0 logic to prevent double counting of conditions when both mild and severe forms are present

True

Returns:

Type Description
DataFrame

DataFrame with columns: - hospitalization_id (index) - 17 binary condition columns (0/1) - cci_score (weighted sum)

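A minimal usage sketch; hospital_diagnosis_df is a placeholder DataFrame containing the three required columns listed above:

>>> from clifpy.utils.comorbidity import calculate_cci
>>> cci = calculate_cci(hospital_diagnosis_df, hierarchy=True)
>>> cci['cci_score'].describe()  # one weighted score per hospitalization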
Source code in clifpy/utils/comorbidity.py
def calculate_cci(
    hospital_diagnosis: Union['HospitalDiagnosis', pd.DataFrame, pl.DataFrame],
    hierarchy: bool = True
) -> pd.DataFrame:
    """
    Calculate Charlson Comorbidity Index (CCI) for hospitalizations.

    This function processes hospital diagnosis data to calculate CCI scores
    using the Quan (2011) adaptation with ICD-10-CM codes.

    Parameters
    ----------
    hospital_diagnosis : HospitalDiagnosis object, pandas DataFrame, or polars DataFrame
        containing diagnosis data with columns:
        - hospitalization_id
        - diagnosis_code
        - diagnosis_code_format
    hierarchy : bool, default=True
        Apply assign0 logic to prevent double counting
        of conditions when both mild and severe forms are present

    Returns
    -------
    pd.DataFrame
        DataFrame with columns:
        - hospitalization_id (index)
        - 17 binary condition columns (0/1)
        - cci_score (weighted sum)
    """

    # Load CCI configuration
    cci_config = _load_cci_config()

    # Print configuration info as requested
    print(f"name: \"{cci_config['name']}\"")
    print(f"version: \"{cci_config['version']}\"")
    print(f"supported_formats:")
    for fmt in cci_config['supported_formats']:
        print(f"  - {fmt}")

    # Convert input to polars DataFrame
    if hasattr(hospital_diagnosis, 'df'):
        # HospitalDiagnosis object
        df = pl.from_pandas(hospital_diagnosis.df)
    elif isinstance(hospital_diagnosis, pd.DataFrame):
        df = pl.from_pandas(hospital_diagnosis)
    elif isinstance(hospital_diagnosis, pl.DataFrame):
        df = hospital_diagnosis
    else:
        raise ValueError("hospital_diagnosis must be HospitalDiagnosis object, pandas DataFrame, or polars DataFrame")

    # Filter to only ICD10CM codes (discard other formats)
    df_filtered = df.filter(pl.col("diagnosis_code_format") == "ICD10CM")

    # Preprocess diagnosis codes: remove decimal parts (e.g., "I21.45" -> "I21")
    df_processed = df_filtered.with_columns([
        pl.col("diagnosis_code").str.split(".").list.get(0).alias("diagnosis_code_clean")
    ])

    # Map diagnosis codes to CCI conditions
    condition_mappings = cci_config['diagnosis_code_mappings']['ICD10CM']
    weights = cci_config['weights']

    # Create condition presence indicators
    condition_columns = []

    for condition_name, condition_info in tqdm(condition_mappings.items(), desc="Mapping ICD codes to CCI conditions"):
        condition_codes = condition_info['codes']

        # Create a boolean expression for this condition
        condition_expr = pl.lit(False)
        for code in condition_codes:
            condition_expr = condition_expr | pl.col("diagnosis_code_clean").str.starts_with(code)

        condition_columns.append(condition_expr.alias(f"{condition_name}_present"))

    # Add condition indicators to dataframe
    df_with_conditions = df_processed.with_columns(condition_columns)

    # Group by hospitalization_id and aggregate condition presence
    condition_names = list(condition_mappings.keys())

    # Create aggregation expressions
    agg_exprs = []
    for condition_name in condition_names:
        agg_exprs.append(
            pl.col(f"{condition_name}_present").max().alias(condition_name)
        )

    # Group by hospitalization and get condition presence
    df_grouped = df_with_conditions.group_by("hospitalization_id").agg(agg_exprs)

    # Apply hierarchy logic if enabled (assign0)
    if hierarchy:
        df_grouped = _apply_hierarchy_logic(df_grouped, cci_config['hierarchies'])

    # Calculate CCI score
    df_with_score = _calculate_cci_score(df_grouped, weights)

    # Convert boolean columns to integers for consistency
    condition_names = list(condition_mappings.keys())
    cast_exprs = []
    for col in df_with_score.columns:
        if col in condition_names:
            cast_exprs.append(pl.col(col).cast(pl.Int32).alias(col))
        else:
            cast_exprs.append(pl.col(col))

    df_with_score = df_with_score.select(cast_exprs)

    # Convert to pandas DataFrame before returning
    return df_with_score.to_pandas()

Data Quality Management

Outlier Handling

Detect and handle physiologically implausible values using configurable ranges.

clifpy.utils.outlier_handler.apply_outlier_handling

apply_outlier_handling(table_obj, outlier_config_path=None)

Apply outlier handling to a table object's dataframe.

This function identifies numeric values that fall outside acceptable ranges and converts them to NaN. For category-dependent columns (vitals, labs, medications, assessments), ranges are applied based on the category value.

Uses ultra-fast Polars implementation with progress tracking.

Parameters:

Name Type Description Default
table_obj

A pyCLIF table object with .df (DataFrame) and .table_name attributes

required
outlier_config_path str

Path to custom outlier configuration YAML. If None, uses internal CLIF standard config.

None

Returns:

Type Description
None

modifies table_obj.df in-place

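A minimal usage sketch, mirroring the get_outlier_summary example further below; 'my_outlier_ranges.yaml' is a hypothetical custom configuration file:

>>> from clifpy.tables.vitals import Vitals
>>> from clifpy.utils.outlier_handler import apply_outlier_handling
>>> vitals = Vitals.from_file()
>>> apply_outlier_handling(vitals)  # CLIF standard ranges; modifies vitals.df in place
>>> apply_outlier_handling(vitals, outlier_config_path='my_outlier_ranges.yaml')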
Source code in clifpy/utils/outlier_handler.py
def apply_outlier_handling(table_obj, outlier_config_path: Optional[str] = None) -> None:
    """
    Apply outlier handling to a table object's dataframe.

    This function identifies numeric values that fall outside acceptable ranges
    and converts them to NaN. For category-dependent columns (vitals, labs,
    medications, assessments), ranges are applied based on the category value.

    Uses ultra-fast Polars implementation with progress tracking.

    Parameters
    ----------
    table_obj
        A pyCLIF table object with .df (DataFrame) and .table_name attributes
    outlier_config_path : str, optional
        Path to custom outlier configuration YAML.
        If None, uses internal CLIF standard config.

    Returns
    -------
    None
        modifies table_obj.df in-place
    """
    if table_obj.df is None or table_obj.df.empty:
        print("No data to process for outlier handling.")
        return

    # Load outlier configuration
    config = _load_outlier_config(outlier_config_path)
    if not config:
        print("Failed to load outlier configuration.")
        return

    # Print which configuration is being used
    if outlier_config_path is None:
        print("Using CLIF standard outlier ranges\n")
    else:
        print(f"Using custom outlier ranges from: {outlier_config_path}\n")

    # Get table-specific configuration
    table_config = config.get('tables', {}).get(table_obj.table_name, {})
    if not table_config:
        print(f"No outlier configuration found for table: {table_obj.table_name}")
        return

    # Filter columns that exist in the dataframe
    existing_columns = {col: conf for col, conf in table_config.items() if col in table_obj.df.columns}

    if not existing_columns:
        print("No configured columns found in dataframe.")
        return

    # Ultra-fast processing with single conversion
    _process_all_columns_ultra_fast(table_obj, existing_columns)

clifpy.utils.outlier_handler.get_outlier_summary

get_outlier_summary(table_obj, outlier_config_path=None)

Get a summary of potential outliers without modifying the data.

This is a convenience wrapper around validate_numeric_ranges_from_config() for interactive use with table objects. It provides actual outlier counts and percentages without modifying the data.

Parameters:

Name Type Description Default
table_obj

A pyCLIF table object with .df, .table_name, and .schema attributes

required
outlier_config_path str

Path to custom outlier configuration. If None, uses CLIF standard config.

None

Returns:

Type Description
dict

Summary of outliers with keys: - table_name: Name of the table - total_rows: Total number of rows - config_source: "CLIF standard" or "Custom" - outliers: List of outlier validation results with counts and percentages

See Also

clifpy.utils.validator.validate_numeric_ranges_from_config : Core validation function

Examples:

>>> from clifpy.tables.vitals import Vitals
>>> from clifpy.utils.outlier_handler import get_outlier_summary
>>>
>>> vitals = Vitals.from_file()
>>> summary = get_outlier_summary(vitals)
>>> print(f"Found {len(summary['outliers'])} outlier patterns")
Source code in clifpy/utils/outlier_handler.py
def get_outlier_summary(table_obj, outlier_config_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Get a summary of potential outliers without modifying the data.

    This is a convenience wrapper around validate_numeric_ranges_from_config()
    for interactive use with table objects. It provides actual outlier counts
    and percentages without modifying the data.

    Parameters
    ----------
    table_obj
        A pyCLIF table object with .df, .table_name, and .schema attributes
    outlier_config_path : str, optional
        Path to custom outlier configuration. If None, uses CLIF standard config.

    Returns
    -------
    dict
        Summary of outliers with keys:
        - table_name: Name of the table
        - total_rows: Total number of rows
        - config_source: "CLIF standard" or "Custom"
        - outliers: List of outlier validation results with counts and percentages

    See Also
    --------
    clifpy.utils.validator.validate_numeric_ranges_from_config : Core validation function

    Examples
    --------
    >>> from clifpy.tables.vitals import Vitals
    >>> from clifpy.utils.outlier_handler import get_outlier_summary
    >>>
    >>> vitals = Vitals.from_file()
    >>> summary = get_outlier_summary(vitals)
    >>> print(f"Found {len(summary['outliers'])} outlier patterns")
    """
    if table_obj.df is None or table_obj.df.empty:
        return {"status": "No data to analyze"}

    # Load outlier configuration
    config = _load_outlier_config(outlier_config_path)
    if not config:
        return {"status": "Failed to load configuration"}

    # Check if table has schema
    if not hasattr(table_obj, 'schema') or table_obj.schema is None:
        return {"status": "Table schema not available"}

    # Check if table has outlier configuration
    table_config = config.get('tables', {}).get(table_obj.table_name, {})
    if not table_config:
        return {"status": f"No outlier configuration for table: {table_obj.table_name}"}

    # Use the validator to get actual outlier analysis
    from clifpy.utils import validator

    outlier_results = validator.validate_numeric_ranges_from_config(
        table_obj.df,
        table_obj.table_name,
        table_obj.schema,
        config
    )

    # Build summary
    summary = {
        "table_name": table_obj.table_name,
        "total_rows": len(table_obj.df),
        "config_source": "CLIF standard" if outlier_config_path is None else "Custom",
        "outliers": outlier_results
    }

    return summary
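
The returned dictionary exposes the documented keys directly, which makes it easy to compare the standard and a custom configuration. A brief sketch, assuming both configurations load and the table has an outlier configuration (the custom path is illustrative):

standard = get_outlier_summary(vitals)
custom = get_outlier_summary(vitals, outlier_config_path="custom_outlier_config.yaml")

print(standard["config_source"], len(standard["outliers"]))   # "CLIF standard", number of validation results
print(custom["config_source"], len(custom["outliers"]))       # "Custom", number of validation results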

Data Validation

Comprehensive validation functions for ensuring data quality and CLIF compliance.

clifpy.utils.validator.validate_dataframe

validate_dataframe(df, spec)

Validate df against spec.

Returns a list of error dictionaries. An empty list means success.

For datatype validation:

  • If a column doesn't match the expected type exactly, the validator checks if the data can be cast to the correct type
  • Castable type mismatches return warnings with type "datatype_castable"
  • Non-castable type mismatches return errors with type "datatype_mismatch"
  • Both include descriptive messages about the casting capability
Source code in clifpy/utils/validator.py
def validate_dataframe(df: pd.DataFrame, spec: dict[str, Any]) -> List[dict[str, Any]]:
    """Validate *df* against *spec*.

    Returns a list of error dictionaries. An empty list means success.

    For datatype validation:

    - If a column doesn't match the expected type exactly, the validator checks
      if the data can be cast to the correct type
    - Castable type mismatches return warnings with type "datatype_castable"
    - Non-castable type mismatches return errors with type "datatype_mismatch"
    - Both include descriptive messages about the casting capability
    """

    errors: List[dict[str, Any]] = []

    # 1. Required columns present ------------------------------------------------
    req_cols = set(spec.get("required_columns", []))
    missing = req_cols - set(df.columns)
    if missing:
        missing_list = sorted(missing)
        errors.append({
            "type": "missing_columns",
            "columns": missing_list,
            "message": f"Missing required columns: {', '.join(missing_list)}"
        })

    # 2. Per-column checks -------------------------------------------------------
    for col_spec in spec.get("columns", []):
        name = col_spec["name"]
        if name not in df.columns:
            # If it's required the above block already captured the issue.
            continue

        series = df[name]

        # 2a. NULL checks -----------------------------------------------------
        if col_spec.get("required", False):
            null_cnt = int(series.isna().sum())
            total_cnt = int(len(series))
            null_pct = (null_cnt / total_cnt * 100) if total_cnt > 0 else 0.0
            if null_cnt:
                errors.append({
                    "type": "null_values",
                    "column": name,
                    "count": null_cnt,
                    "percent": round(null_pct, 2),
                    "message": f"Column '{name}' has {null_cnt} null values ({null_pct:.2f}%) in required field"
                })

        # 2b. Datatype checks -------------------------------------------------
        expected_type = col_spec.get("data_type")
        checker = _DATATYPE_CHECKERS.get(expected_type)
        cast_checker = _DATATYPE_CAST_CHECKERS.get(expected_type)

        if checker and not checker(series):
            # Check if data can be cast to the correct type
            if cast_checker and cast_checker(series):
                # Data can be cast - this is a warning, not an error
                errors.append({
                    "type": "datatype_castable",
                    "column": name,
                    "expected": expected_type,
                    "actual": str(series.dtype),
                    "message": f"Column '{name}' has type {series.dtype} but can be cast to {expected_type}"
                })
            else:
                # Data cannot be cast - this is an error
                errors.append({
                    "type": "datatype_mismatch",
                    "column": name,
                    "expected": expected_type,
                    "actual": str(series.dtype),
                    "message": f"Column '{name}' has type {series.dtype} and cannot be cast to {expected_type}"
                })

        # # 2c. Category values -------------------------------------------------
        # if col_spec.get("is_category_column") and col_spec.get("permissible_values"):
        #     allowed = set(col_spec["permissible_values"])
        #     actual_values = set(series.dropna().unique())

        #     # Check for missing expected values (permissible values not present in data)
        #     missing_values = [v for v in allowed if v not in actual_values]
        #     if missing_values:
        #         errors.append({
        #             "type": "missing_category_values",
        #             "column": name,
        #             "missing_values": missing_values,
        #             "message": f"Column '{name}' is missing expected category values: {missing_values}"
        #         })

    return errors
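
A self-contained sketch of the spec structure the function consumes. The spec below is hypothetical (real specs ship with CLIFpy as per-table JSON files), and data_type entries are omitted because the recognised type strings are defined by those packaged specs:

import pandas as pd
from clifpy.utils.validator import validate_dataframe

spec = {
    "required_columns": ["hospitalization_id", "vital_value"],
    "columns": [
        {"name": "hospitalization_id", "required": True},
        {"name": "vital_value", "required": True},
    ],
}

df = pd.DataFrame({"hospitalization_id": ["1", "2"], "vital_value": [80.0, None]})

errors = validate_dataframe(df, spec)
# One "null_values" entry is reported for vital_value; an empty list would mean the frame passed.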

clifpy.utils.validator.validate_table

validate_table(df, table_name, spec_dir=None)

Validate df using the JSON spec for table_name.

Convenience wrapper combining _load_spec and validate_dataframe.

Source code in clifpy/utils/validator.py
def validate_table(
    df: pd.DataFrame, table_name: str, spec_dir: str | None = None
) -> List[dict[str, Any]]:
    """Validate *df* using the JSON spec for *table_name*.

    Convenience wrapper combining :pyfunc:`_load_spec` and
    :pyfunc:`validate_dataframe`.
    """

    spec = _load_spec(table_name, spec_dir)
    return validate_dataframe(df, spec)
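
Equivalently, for a table whose spec ships with the package; a brief sketch assuming vitals_df is an already-loaded vitals DataFrame and that the packaged spec is keyed by the table name:

from clifpy.utils.validator import validate_table

errors = validate_table(vitals_df, "vitals")   # loads the packaged "vitals" JSON spec, then validates
for err in errors:
    print(err["type"], err.get("message"))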

clifpy.utils.validator.check_required_columns

check_required_columns(df, column_names, table_name)

Validate that required columns are present in the dataframe.

Parameters:

Name Type Description Default
df DataFrame

The dataframe to validate

required
column_names List[str]

List of required column names

required
table_name str

Name of the table being validated

required

Returns:

Type Description
dict

Dictionary with validation results including missing columns

Source code in clifpy/utils/validator.py
def check_required_columns(
    df: pd.DataFrame, 
    column_names: List[str], 
    table_name: str
) -> Dict[str, Any]:
    """
    Validate that required columns are present in the dataframe.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to validate
    column_names : List[str]
        List of required column names
    table_name : str
        Name of the table being validated

    Returns
    -------
    dict
        Dictionary with validation results including missing columns
    """
    try:
        missing_columns = [col for col in column_names if col not in df.columns]

        if missing_columns:
            return {
                "type": "missing_required_columns",
                "table": table_name,
                "missing_columns": missing_columns,
                "status": "error",
                "message": f"Table '{table_name}' is missing required columns: {', '.join(missing_columns)}"
            }

        return {
            "type": "missing_required_columns",
            "table": table_name,
            "status": "success",
            "message": f"Table '{table_name}' has all required columns"
        }

    except Exception as e:
        return {
            "type": "missing_required_columns",
            "table": table_name,
            "status": "error",
            "error_message": str(e),
            "message": f"Error checking required columns for table '{table_name}': {str(e)}"
        }
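
A short sketch with illustrative column and table names:

import pandas as pd
from clifpy.utils.validator import check_required_columns

adt = pd.DataFrame({"hospitalization_id": ["1"], "in_dttm": ["2024-01-01 00:00:00"]})

result = check_required_columns(adt, ["hospitalization_id", "in_dttm", "location_category"], "adt")
print(result["status"])            # "error"
print(result["missing_columns"])   # ["location_category"]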

clifpy.utils.validator.verify_column_dtypes

verify_column_dtypes(df, schema)

Ensure columns have correct data types per schema.

Parameters:

Name Type Description Default
df DataFrame

The dataframe to validate

required
schema dict

Schema containing column definitions

required

Returns:

Type Description
List[dict]

List of datatype mismatch errors

Source code in clifpy/utils/validator.py
def verify_column_dtypes(df: pd.DataFrame, schema: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Ensure columns have correct data types per schema.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to validate
    schema : dict
        Schema containing column definitions

    Returns
    -------
    List[dict]
        List of datatype mismatch errors
    """
    errors = []

    try:
        for col_spec in schema.get("columns", []):
            name = col_spec["name"]
            if name not in df.columns:
                continue

            expected_type = col_spec.get("data_type")
            if not expected_type:
                continue

            series = df[name]
            checker = _DATATYPE_CHECKERS.get(expected_type)
            cast_checker = _DATATYPE_CAST_CHECKERS.get(expected_type)

            if checker and not checker(series):
                # Check if data can be cast to the correct type
                if cast_checker and cast_checker(series):
                    # Data can be cast - this is a warning, not an error
                    errors.append({
                        "type": "datatype_verification_castable",
                        "column": name,
                        "expected": expected_type,
                        "actual": str(series.dtype),
                        "status": "warning",
                        "message": f"Column '{name}' has type {series.dtype} but can be cast to {expected_type}"
                    })
                else:
                    # Data cannot be cast - this is an error
                    errors.append({
                        "type": "datatype_verification",
                        "column": name,
                        "expected": expected_type,
                        "actual": str(series.dtype),
                        "status": "error",
                        "message": f"Column '{name}' has type {series.dtype} and cannot be cast to {expected_type}"
                    })

    except Exception as e:
        errors.append({
            "type": "datatype_verification",
            "status": "error",
            "error_message": str(e),
            "message": f"Error during datatype verification: {str(e)}"
        })

    return errors
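
In practice the schema argument is typically the .schema attribute of a loaded table object (the same structure get_outlier_summary expects); a brief sketch, assuming vitals was loaded as in the earlier examples:

from clifpy.utils.validator import verify_column_dtypes

dtype_issues = verify_column_dtypes(vitals.df, vitals.schema)
for issue in dtype_issues:
    print(issue["status"], issue["message"])   # "warning" for castable mismatches, "error" otherwise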

clifpy.utils.validator.validate_categorical_values

validate_categorical_values(df, schema)

Check values against permitted categories.

Parameters:

Name Type Description Default
df DataFrame

The dataframe to validate

required
schema dict

Schema containing category definitions

required

Returns:

Type Description
List[dict]

List of invalid category value errors

Source code in clifpy/utils/validator.py
def validate_categorical_values(
    df: pd.DataFrame, 
    schema: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Check values against permitted categories.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to validate
    schema : dict
        Schema containing category definitions

    Returns
    -------
    List[dict]
        List of invalid category value errors
    """
    errors = []

    try:
        category_columns = schema.get("category_columns") or []

        for col_spec in schema.get("columns", []):
            name = col_spec["name"]

            if name not in df.columns or name not in category_columns:
                continue

            if col_spec.get("permissible_values"):
                allowed = set(col_spec["permissible_values"])

                # Get unique values in the column (excluding NaN)
                unique_values = set(df[name].dropna().unique())
                # Check for missing expected values (permissible values not present in data)
                missing_values = [v for v in allowed if v not in unique_values]
                if missing_values:
                    errors.append({
                                "type": "missing_categorical_values",
                                "column": name,
                                "missing_values": missing_values,
                                "total_missing": len(missing_values),
                                "message": f"Column '{name}' is missing {len(missing_values)} expected category values: {missing_values}"
                            })

    except Exception as e:
        errors.append({
            "type": "categorical_validation",
            "status": "error",
            "error_message": str(e),
            "message": f"Error validating categorical values: {str(e)}"
        })

    return errors
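
Note that, as implemented, this check reports permissible values that are absent from the data (type "missing_categorical_values") rather than unexpected values present in it. A self-contained sketch with an illustrative schema:

import pandas as pd
from clifpy.utils.validator import validate_categorical_values

schema = {
    "category_columns": ["vital_category"],
    "columns": [
        {"name": "vital_category", "permissible_values": ["heart_rate", "sbp", "weight_kg"]},
    ],
}

df = pd.DataFrame({"vital_category": ["heart_rate", "heart_rate", "sbp"]})

errors = validate_categorical_values(df, schema)
# One "missing_categorical_values" entry listing weight_kg, the permissible value not seen in the data.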

Configuration and I/O

Configuration Management

Load and manage CLIF configuration files for consistent settings.

clifpy.utils.config.load_config

load_config(config_path=None)

Load CLIF configuration from JSON or YAML file.

Parameters:

Name Type Description Default
config_path str

Path to the configuration file. If None, looks for 'config.json' or 'config.yaml' in current directory.

None

Returns:

Type Description
dict

Configuration dictionary with required fields validated

Raises:

Type Description
FileNotFoundError

If config file doesn't exist

ValueError

If required fields are missing or invalid

JSONDecodeError

If JSON config file is not valid

YAMLError

If YAML config file is not valid

Source code in clifpy/utils/config.py
def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Load CLIF configuration from JSON or YAML file.

    Parameters
    ----------
    config_path : str, optional
        Path to the configuration file.
        If None, looks for 'config.json' or 'config.yaml' in current directory.

    Returns
    -------
    dict
        Configuration dictionary with required fields validated

    Raises
    ------
    FileNotFoundError
        If config file doesn't exist
    ValueError
        If required fields are missing or invalid
    json.JSONDecodeError
        If JSON config file is not valid
    yaml.YAMLError
        If YAML config file is not valid
    """
    # Determine config file path
    if config_path is None:
        # Look for config files in order of preference: JSON, YAML, YML
        cwd = os.getcwd()
        for filename in ['config.json', 'config.yaml', 'config.yml']:
            potential_path = os.path.join(cwd, filename)
            if os.path.exists(potential_path):
                config_path = potential_path
                break

        if config_path is None:
            raise FileNotFoundError(
                f"Configuration file not found in {cwd}\n"
                "Please either:\n"
                "  1. Create a config.json or config.yaml file in the current directory\n"
                "  2. Provide config_path parameter pointing to your config file\n"
                "  3. Provide data_directory, filetype, and timezone parameters directly"
            )

    # Check if config file exists
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            f"Configuration file not found: {config_path}\n"
            "Please either:\n"
            "  1. Create a config.json or config.yaml file in the current directory\n"
            "  2. Provide config_path parameter pointing to your config file\n"
            "  3. Provide data_directory, filetype, and timezone parameters directly"
        )

    # Load configuration using helper function
    config = _load_config_file(config_path)

    # Validate required fields
    required_fields = ['data_directory', 'filetype', 'timezone']
    missing_fields = [field for field in required_fields if field not in config]

    if missing_fields:
        raise ValueError(
            f"Missing required fields in configuration file {config_path}: {missing_fields}\n"
            f"Required fields are: {required_fields}"
        )

    # Validate data_directory exists
    data_dir = config['data_directory']
    if not os.path.exists(data_dir):
        raise ValueError(
            f"Data directory specified in config does not exist: {data_dir}\n"
            f"Please check the 'data_directory' path in {config_path}"
        )

    # Validate filetype
    supported_filetypes = ['csv', 'parquet']
    if config['filetype'] not in supported_filetypes:
        raise ValueError(
            f"Unsupported filetype '{config['filetype']}' in {config_path}\n"
            f"Supported filetypes are: {supported_filetypes}"
        )

    logger.info(f"Configuration loaded from {config_path}")
    return config
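
A minimal sketch with an illustrative config.yaml; the directory path is a placeholder:

# config.yaml (illustrative contents)
#   data_directory: /path/to/clif/data
#   filetype: parquet
#   timezone: America/Chicago

from clifpy.utils.config import load_config

config = load_config("config.yaml")   # or load_config() to auto-detect config.json / config.yaml / config.yml
print(config["data_directory"], config["filetype"], config["timezone"])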

clifpy.utils.config.get_config_or_params

get_config_or_params(config_path=None, data_directory=None, filetype=None, timezone=None, output_directory=None)

Get configuration from either config file or direct parameters.

Loading priority:

  1. If all required params provided directly → use them
  2. If config_path provided → load from that path, allow param overrides
  3. If no params and no config_path → auto-detect config.json/yaml/yml
  4. Parameters override config file values when both are provided

Parameters:

Name Type Description Default
config_path str

Path to configuration file

None
data_directory str

Direct parameter

None
filetype str

Direct parameter

None
timezone str

Direct parameter

None
output_directory str

Direct parameter

None

Returns:

Type Description
dict

Final configuration dictionary

Raises:

Type Description
ValueError

If neither config nor required params are provided

Source code in clifpy/utils/config.py
def get_config_or_params(
    config_path: Optional[str] = None,
    data_directory: Optional[str] = None,
    filetype: Optional[str] = None,
    timezone: Optional[str] = None,
    output_directory: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get configuration from either config file or direct parameters.

    Loading priority:

    1. If all required params provided directly → use them
    2. If config_path provided → load from that path, allow param overrides
    3. If no params and no config_path → auto-detect config.json/yaml/yml
    4. Parameters override config file values when both are provided

    Parameters
    ----------
    config_path : str, optional
        Path to configuration file
    data_directory : str, optional
        Direct parameter
    filetype : str, optional
        Direct parameter  
    timezone : str, optional
        Direct parameter
    output_directory : str, optional
        Direct parameter

    Returns
    -------
    dict
        Final configuration dictionary

    Raises
    ------
    ValueError
        If neither config nor required params are provided
    """
    # Check if all required params are provided directly
    required_params = [data_directory, filetype, timezone]
    if all(param is not None for param in required_params):
        # All required params provided - use them directly
        config = {
            'data_directory': data_directory,
            'filetype': filetype,
            'timezone': timezone
        }
        if output_directory is not None:
            config['output_directory'] = output_directory
        logger.debug("Using directly provided parameters")
        return config

    # Try to load from config file
    try:
        config = load_config(config_path)
    except FileNotFoundError:
        # If no config file and incomplete params, raise helpful error
        if any(param is not None for param in required_params):
            # Some params provided but not all
            missing = []
            if data_directory is None:
                missing.append('data_directory')
            if filetype is None:
                missing.append('filetype') 
            if timezone is None:
                missing.append('timezone')
            raise ValueError(
                f"Incomplete parameters provided. Missing: {missing}\n"
                "Please either:\n"
                "  1. Provide all required parameters (data_directory, filetype, timezone)\n"
                "  2. Create a config.json or config.yaml file\n"
                "  3. Provide a config_path parameter"
            )
        else:
            # No params and no config file - re-raise the original error
            raise

    # Override config values with any provided parameters
    if data_directory is not None:
        config['data_directory'] = data_directory
        logger.debug(f"Overriding data_directory: {data_directory}")

    if filetype is not None:
        config['filetype'] = filetype
        logger.debug(f"Overriding filetype: {filetype}")

    if timezone is not None:
        config['timezone'] = timezone
        logger.debug(f"Overriding timezone: {timezone}")

    if output_directory is not None:
        config['output_directory'] = output_directory
        logger.debug(f"Overriding output_directory: {output_directory}")

    return config
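
Two brief sketches of the loading priority described above; paths are illustrative:

from clifpy.utils.config import get_config_or_params

# All required parameters supplied directly: no config file is read
cfg = get_config_or_params(
    data_directory="/path/to/clif/data",
    filetype="parquet",
    timezone="America/Chicago",
)

# Config file plus a single override: file values are used, with timezone replaced
cfg = get_config_or_params(config_path="config.yaml", timezone="UTC")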

Data Loading

Core data loading functionality with timezone and filtering support.

clifpy.utils.io.load_data

load_data(table_name, table_path, table_format_type, sample_size=None, columns=None, filters=None, site_tz=None, verbose=False)

Load data from a file in the specified directory with the option to select specific columns and apply filters.

Parameters:

Name Type Description Default
table_name str

The name of the table to load.

required
table_path str

Path to the directory containing the data file.

required
table_format_type str

Format of the data file (e.g., 'csv', 'parquet').

required
sample_size int

Number of rows to load.

None
columns list of str

List of column names to load.

None
filters dict

Dictionary of filters to apply.

None
site_tz str

Timezone string for datetime conversion, e.g., "America/New_York".

None
verbose bool

If True, show detailed loading messages. Default is False.

False

Returns:

Type Description
DataFrame

DataFrame containing the requested data.

Source code in clifpy/utils/io.py
def load_data(table_name, table_path, table_format_type, sample_size=None, columns=None, filters=None, site_tz=None, verbose=False) -> pd.DataFrame:
    """
    Load data from a file in the specified directory with the option to select specific columns and apply filters.

    Parameters
    ----------
    table_name : str
        The name of the table to load.
    table_path : str
        Path to the directory containing the data file.
    table_format_type : str
        Format of the data file (e.g., 'csv', 'parquet').
    sample_size : int, optional
        Number of rows to load.
    columns : list of str, optional
        List of column names to load.
    filters : dict, optional
        Dictionary of filters to apply.
    site_tz : str, optional
        Timezone string for datetime conversion, e.g., "America/New_York".
    verbose : bool, optional
        If True, show detailed loading messages. Default is False.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the requested data.
    """
    # Determine the file path based on the directory and filetype

    file_path = os.path.join(table_path, 'clif_'+ table_name + '.' + table_format_type)

    # Load the data based on filetype
    if os.path.exists(file_path):
        if  table_format_type == 'csv':
            if verbose:
                logger.info('Loading CSV file')
            # For CSV, we can use DuckDB to read specific columns and apply filters efficiently
            con = duckdb.connect()
            # Build the SELECT clause
            select_clause = "*" if not columns else ", ".join(columns)
            # Start building the query
            query = f"SELECT {select_clause} FROM read_csv_auto('{file_path}')"
            # Apply filters
            if filters:
                filter_clauses = []
                for column, values in filters.items():
                    if isinstance(values, list):
                        # Escape single quotes and wrap values in quotes
                        values_list = ', '.join(["'" + str(value).replace("'", "''") + "'" for value in values])
                        filter_clauses.append(f"{column} IN ({values_list})")
                    else:
                        value = str(values).replace("'", "''")
                        filter_clauses.append(f"{column} = '{value}'")
                if filter_clauses:
                    query += " WHERE " + " AND ".join(filter_clauses)
            # Apply sample size limit
            if sample_size:
                query += f" LIMIT {sample_size}"
            # Execute the query and fetch the data
            df = con.execute(query).fetchdf()
            con.close()
        elif table_format_type == 'parquet':
            df = load_parquet_with_tz(file_path, columns, filters, sample_size, verbose)
        else:
            raise ValueError("Unsupported filetype. Only 'csv' and 'parquet' are supported.")
        # Extract just the filename for cleaner output
        filename = os.path.basename(file_path)
        if verbose:
            logger.info(f"Data loaded successfully from {filename}")
        df = _cast_id_cols_to_string(df) # Cast id columns to string

        # Convert datetime columns to site timezone if specified
        if site_tz:
            df = convert_datetime_columns_to_site_tz(df, site_tz, verbose)

        return df
    else:
        raise FileNotFoundError(f"The file {file_path} does not exist in the specified directory.")
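
A usage sketch; the file loaded is clif_<table_name>.<table_format_type> inside table_path, and the directory and identifiers below are illustrative:

from clifpy.utils.io import load_data

# Reads /path/to/clif/data/clif_vitals.parquet, keeping two hospitalizations and four columns,
# and converts datetime columns to the site timezone.
vitals_df = load_data(
    table_name="vitals",
    table_path="/path/to/clif/data",
    table_format_type="parquet",
    columns=["hospitalization_id", "recorded_dttm", "vital_category", "vital_value"],
    filters={"hospitalization_id": ["12345", "67890"]},
    site_tz="America/Chicago",
)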

Simplified Import Paths

As of version 0.0.1, commonly used utilities are available directly from the clifpy package:

# Direct imports from clifpy
import clifpy

# Encounter stitching
hospitalization_stitched, adt_stitched, mapping = clifpy.stitch_encounters(
    hospitalization_df, 
    adt_df,
    time_interval=6
)

# Wide dataset creation
wide_df = clifpy.create_wide_dataset(
    clif_instance=orchestrator,
    optional_tables=['vitals', 'labs'],
    category_filters={'vitals': ['heart_rate', 'sbp']}
)

# Calculate comorbidity index
cci_scores = clifpy.calculate_cci(
    hospital_diagnosis_df,
    hospitalization_df
)

# Apply outlier handling
clifpy.apply_outlier_handling(table_object)

For backward compatibility, the original import paths (clifpy.utils.module.function) remain available.