Skip to content

Table Classes

Patient

clifpy.tables.patient.Patient

Patient(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Patient table wrapper inheriting from BaseTable.

This class handles patient-specific data and validations while leveraging the common functionality provided by BaseTable.

Initialize the patient table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/patient.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the patient table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # All construction work happens in BaseTable; this subclass adds no
    # patient-specific initialization of its own.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)

ADT (Admission, Discharge, Transfer)

clifpy.tables.adt.Adt

Adt(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

ADT (Admission/Discharge/Transfer) table wrapper inheriting from BaseTable.

This class handles ADT-specific data and validations while leveraging the common functionality provided by BaseTable.

Initialize the ADT table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/adt.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the ADT table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # Legacy call pattern Adt(data): substitute placeholder values for the
    # file-based parameters so BaseTable's constructor is satisfied.
    legacy_call = data is not None and data_directory is None and filetype is None
    if legacy_call:
        data_directory = "."
        filetype = "parquet"

    super().__init__(**dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    ))

filter_by_date_range

filter_by_date_range(
    start_date, end_date, date_column="in_dttm"
)

Return records within a specific date range for a given datetime column.

Source code in clifpy/tables/adt.py
def filter_by_date_range(self, start_date: datetime, end_date: datetime, 
                       date_column: str = 'in_dttm') -> pd.DataFrame:
    """Return records within a specific date range (inclusive) for a given datetime column."""
    if self.df is None or date_column not in self.df.columns:
        return pd.DataFrame()

    # Normalise the column to datetime on a copy before comparing.
    subset = self.df.copy()
    subset[date_column] = pd.to_datetime(subset[date_column])

    in_range = subset[date_column].between(start_date, end_date)
    return subset[in_range]

filter_by_hospitalization

filter_by_hospitalization(hospitalization_id)

Return all ADT records for a specific hospitalization.

Source code in clifpy/tables/adt.py
def filter_by_hospitalization(self, hospitalization_id: str) -> pd.DataFrame:
    """Return all ADT records for a specific hospitalization."""
    if self.df is None:
        return pd.DataFrame()

    # Copy so callers can mutate the result without touching self.df.
    matches = self.df['hospitalization_id'] == hospitalization_id
    return self.df[matches].copy()

filter_by_location_category

filter_by_location_category(location_category)

Return all records for a specific location category (e.g., 'icu', 'ward').

Source code in clifpy/tables/adt.py
def filter_by_location_category(self, location_category: str) -> pd.DataFrame:
    """Return all records for a specific location category (e.g., 'icu', 'ward')."""
    if self.df is None or 'location_category' not in self.df.columns:
        return pd.DataFrame()

    # Copy so callers can mutate the result without touching self.df.
    selected = self.df['location_category'] == location_category
    return self.df[selected].copy()

get_hospital_types

get_hospital_types()

Return unique hospital types in the dataset.

Source code in clifpy/tables/adt.py
def get_hospital_types(self) -> List[str]:
    """Return unique hospital types in the dataset (nulls excluded, first-seen order)."""
    # DataFrame.get returns None for a missing column, collapsing both
    # guard conditions into one check.
    column = self.df.get('hospital_type') if self.df is not None else None
    if column is None:
        return []
    return column.dropna().unique().tolist()

get_location_categories

get_location_categories()

Return unique location categories in the dataset.

Source code in clifpy/tables/adt.py
def get_location_categories(self) -> List[str]:
    """Return unique location categories in the dataset (nulls excluded, first-seen order)."""
    # DataFrame.get returns None for a missing column, collapsing both
    # guard conditions into one check.
    column = self.df.get('location_category') if self.df is not None else None
    if column is None:
        return []
    return column.dropna().unique().tolist()

get_summary_stats

get_summary_stats()

Return summary statistics for the ADT data.

Source code in clifpy/tables/adt.py
def get_summary_stats(self) -> Dict:
    """Return summary statistics for the ADT data.

    Each entry degrades gracefully (0 / {} / None) when its source
    column is absent; returns {} when no data is loaded.
    """
    if self.df is None:
        return {}

    present = self.df.columns

    def distinct(name):
        # Distinct non-null count, or 0 when the column is absent.
        return self.df[name].nunique() if name in present else 0

    def tally(name):
        return self.df[name].value_counts().to_dict() if name in present else {}

    def earliest(name):
        return self.df[name].min() if name in present else None

    def latest(name):
        return self.df[name].max() if name in present else None

    return {
        'total_records': len(self.df),
        'unique_hospitalizations': distinct('hospitalization_id'),
        'unique_hospitals': distinct('hospital_id'),
        'location_category_counts': tally('location_category'),
        'hospital_type_counts': tally('hospital_type'),
        'date_range': {
            'earliest_in': earliest('in_dttm'),
            'latest_in': latest('in_dttm'),
            'earliest_out': earliest('out_dttm'),
            'latest_out': latest('out_dttm'),
        },
    }

Hospitalization

clifpy.tables.hospitalization.Hospitalization

Hospitalization(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Hospitalization table wrapper inheriting from BaseTable.

This class handles hospitalization-specific data and validations while leveraging the common functionality provided by BaseTable.

Initialize the hospitalization table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/hospitalization.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the hospitalization table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # Legacy call pattern Hospitalization(data): substitute placeholder
    # values for the file-based parameters required by BaseTable.
    legacy_call = data is not None and data_directory is None and filetype is None
    if legacy_call:
        data_directory = "."
        filetype = "parquet"

    super().__init__(**dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    ))

calculate_length_of_stay

calculate_length_of_stay()

Calculate length of stay for each hospitalization and return DataFrame with LOS column.

Source code in clifpy/tables/hospitalization.py
def calculate_length_of_stay(self) -> pd.DataFrame:
    """Calculate length of stay for each hospitalization and return DataFrame with LOS column."""
    if self.df is None:
        return pd.DataFrame()

    needed = ['admission_dttm', 'discharge_dttm']
    absent = [c for c in needed if c not in self.df.columns]
    if absent:
        print(f"Missing required columns: {absent}")
        return pd.DataFrame()

    result = self.df.copy()
    for col in needed:
        result[col] = pd.to_datetime(result[col])

    # LOS expressed in (possibly fractional) days.
    stay = result['discharge_dttm'] - result['admission_dttm']
    result['length_of_stay_days'] = stay.dt.total_seconds() / 86400

    return result

get_mortality_rate

get_mortality_rate()

Calculate in-hospital mortality rate.

Source code in clifpy/tables/hospitalization.py
def get_mortality_rate(self) -> float:
    """Calculate in-hospital mortality rate as a percentage of all hospitalizations."""
    if self.df is None or 'discharge_category' not in self.df.columns:
        return 0.0

    total = len(self.df)
    if total == 0:
        # Avoid division by zero on an empty frame.
        return 0.0

    expired = int((self.df['discharge_category'] == 'Expired').sum())
    return expired / total * 100

get_patient_hospitalization_counts

get_patient_hospitalization_counts()

Return DataFrame with hospitalization counts per patient.

Source code in clifpy/tables/hospitalization.py
def get_patient_hospitalization_counts(self) -> pd.DataFrame:
    """Return DataFrame with hospitalization counts per patient.

    Columns: patient_id, hospitalization_count, first_admission,
    last_admission, care_span_days; sorted by count, descending.
    """
    if self.df is None or 'patient_id' not in self.df.columns:
        return pd.DataFrame()

    # Named aggregation yields flat column names directly, so no manual
    # column flattening is required.
    summary = (
        self.df
        .groupby('patient_id')
        .agg(
            hospitalization_count=('hospitalization_id', 'count'),
            first_admission=('admission_dttm', 'min'),
            last_admission=('admission_dttm', 'max'),
        )
        .reset_index()
    )

    for col in ('first_admission', 'last_admission'):
        summary[col] = pd.to_datetime(summary[col])

    # Span of care in (possibly fractional) days.
    span = summary['last_admission'] - summary['first_admission']
    summary['care_span_days'] = span.dt.total_seconds() / 86400

    return summary.sort_values('hospitalization_count', ascending=False)

get_summary_stats

get_summary_stats()

Return comprehensive summary statistics for hospitalization data.

Source code in clifpy/tables/hospitalization.py
def get_summary_stats(self) -> Dict:
    """Return comprehensive summary statistics for hospitalization data.

    Returns:
        Dict: Totals, discharge/admission-type breakdowns, and admission/
        discharge date bounds; age, length-of-stay, and mortality stats are
        added only when their source columns exist. Empty dict when no data
        is loaded.
    """
    if self.df is None:
        return {}

    # Core counts and date bounds; each key degrades gracefully (0/{}/None)
    # when its source column is absent.
    stats = {
        'total_hospitalizations': len(self.df),
        'unique_patients': self.df['patient_id'].nunique() if 'patient_id' in self.df.columns else 0,
        'discharge_category_counts': self.df['discharge_category'].value_counts().to_dict() if 'discharge_category' in self.df.columns else {},
        'admission_type_counts': self.df['admission_type_category'].value_counts().to_dict() if 'admission_type_category' in self.df.columns else {},
        'date_range': {
            'earliest_admission': self.df['admission_dttm'].min() if 'admission_dttm' in self.df.columns else None,
            'latest_admission': self.df['admission_dttm'].max() if 'admission_dttm' in self.df.columns else None,
            'earliest_discharge': self.df['discharge_dttm'].min() if 'discharge_dttm' in self.df.columns else None,
            'latest_discharge': self.df['discharge_dttm'].max() if 'discharge_dttm' in self.df.columns else None
        }
    }

    # Age statistics (only when the column exists and holds non-null values)
    if 'age_at_admission' in self.df.columns:
        age_data = self.df['age_at_admission'].dropna()
        if not age_data.empty:
            stats['age_stats'] = {
                'mean': round(age_data.mean(), 1),
                'median': age_data.median(),
                'min': age_data.min(),
                'max': age_data.max(),
                'std': round(age_data.std(), 1)
            }

    # Length of stay statistics, delegated to calculate_length_of_stay()
    if all(col in self.df.columns for col in ['admission_dttm', 'discharge_dttm']):
        los_df = self.calculate_length_of_stay()
        if 'length_of_stay_days' in los_df.columns:
            los_data = los_df['length_of_stay_days'].dropna()
            if not los_data.empty:
                stats['length_of_stay_stats'] = {
                    'mean_days': round(los_data.mean(), 1),
                    'median_days': round(los_data.median(), 1),
                    'min_days': round(los_data.min(), 1),
                    'max_days': round(los_data.max(), 1),
                    'std_days': round(los_data.std(), 1)
                }

    # Mortality rate (get_mortality_rate returns 0.0 when it cannot compute)
    stats['mortality_rate_percent'] = round(self.get_mortality_rate(), 2)

    return stats

Labs

clifpy.tables.labs.Labs

Labs(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Labs table wrapper inheriting from BaseTable.

This class handles laboratory data and validations including reference unit validation while leveraging the common functionality provided by BaseTable.

Initialize the labs table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/labs.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the labs table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # Legacy call pattern Labs(data): substitute placeholder values for the
    # file-based parameters required by BaseTable.
    legacy_call = data is not None and data_directory is None and filetype is None
    if legacy_call:
        data_directory = "."
        filetype = "parquet"

    # Placeholder until the labs schema is loaded below.
    self._lab_reference_units = None

    super().__init__(**dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    ))

    # Populate lab reference units and related metadata from the table schema.
    self._load_labs_schema_data()

lab_reference_units property

lab_reference_units

Get the lab reference units mapping from the schema.

get_lab_category_stats

get_lab_category_stats()

Return summary statistics for each lab category, including missingness and unique hospitalization_id counts.

Source code in clifpy/tables/labs.py
def get_lab_category_stats(self) -> pd.DataFrame:
    """Return summary statistics for each lab category, including missingness and unique hospitalization_id counts.

    Returns:
        pd.DataFrame: One row per lab_category with count, unique
        hospitalization count, missingness percentage, and distribution
        statistics (mean/std/min/q1/median/q3/max), rounded to 2 decimals.
        NOTE: returns a status dict (not a DataFrame) when required
        columns are absent, for backward compatibility with existing callers.
    """
    # 'lab_category' is included in the guard: without it the groupby
    # below would raise a KeyError even though the old guard passed.
    required = ('lab_category', 'lab_value_numeric', 'hospitalization_id')
    if self.df is None or any(col not in self.df.columns for col in required):
        return {"status": "Missing columns"}

    stats = (
        self.df
        .groupby('lab_category')
        .agg(
            count=('lab_value_numeric', 'count'),
            unique=('hospitalization_id', 'nunique'),
            missing_pct=('lab_value_numeric', lambda x: 100 * x.isna().mean()),
            mean=('lab_value_numeric', 'mean'),
            std=('lab_value_numeric', 'std'),
            min=('lab_value_numeric', 'min'),
            q1=('lab_value_numeric', lambda x: x.quantile(0.25)),
            median=('lab_value_numeric', 'median'),
            q3=('lab_value_numeric', lambda x: x.quantile(0.75)),
            max=('lab_value_numeric', 'max'),
        )
        .round(2)
    )

    return stats

get_lab_specimen_stats

get_lab_specimen_stats()

Return summary statistics for each lab specimen category, including missingness and unique hospitalization_id counts.

Source code in clifpy/tables/labs.py
def get_lab_specimen_stats(self) -> pd.DataFrame:
    """Return summary statistics for each lab specimen category, including missingness and unique hospitalization_id counts.

    Returns:
        pd.DataFrame: One row per lab_specimen_category with count, unique
        hospitalization count, missingness percentage, and distribution
        statistics (mean/std/min/q1/median/q3/max), rounded to 2 decimals.
        NOTE: returns a status dict (not a DataFrame) when required
        columns are absent, for backward compatibility with existing callers.
    """
    # Bug fix: the guard previously checked the misspelled column name
    # 'lab_speciment_category', so frames carrying the correctly named
    # 'lab_specimen_category' column were wrongly rejected.
    required = ('lab_specimen_category', 'lab_value_numeric', 'hospitalization_id')
    if self.df is None or any(col not in self.df.columns for col in required):
        return {"status": "Missing columns"}

    stats = (
        self.df
        .groupby('lab_specimen_category')
        .agg(
            count=('lab_value_numeric', 'count'),
            unique=('hospitalization_id', 'nunique'),
            missing_pct=('lab_value_numeric', lambda x: 100 * x.isna().mean()),
            mean=('lab_value_numeric', 'mean'),
            std=('lab_value_numeric', 'std'),
            min=('lab_value_numeric', 'min'),
            q1=('lab_value_numeric', lambda x: x.quantile(0.25)),
            median=('lab_value_numeric', 'median'),
            q3=('lab_value_numeric', lambda x: x.quantile(0.75)),
            max=('lab_value_numeric', 'max'),
        )
        .round(2)
    )

    return stats

Vitals

clifpy.tables.vitals.Vitals

Vitals(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Vitals table wrapper inheriting from BaseTable.

This class handles vitals-specific data and validations including range validation for vital signs.

Initialize the vitals table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/vitals.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the vitals table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # Range-check findings accumulate here (see validate_vital_ranges).
    self.range_validation_errors: List[dict] = []

    # Placeholders until the vitals schema is loaded below.
    self._vital_units = None
    self._vital_ranges = None

    super().__init__(**dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    ))

    # Populate vital units and ranges from the table schema.
    self._load_vitals_schema_data()

vital_ranges property

vital_ranges

Get the vital ranges from the schema.

vital_units property

vital_units

Get the vital units mapping from the schema.

filter_by_vital_category

filter_by_vital_category(vital_category)

Return all records for a specific vital category (e.g., 'heart_rate', 'temp_c').

Source code in clifpy/tables/vitals.py
def filter_by_vital_category(self, vital_category: str) -> pd.DataFrame:
    """Return all records for a specific vital category (e.g., 'heart_rate', 'temp_c')."""
    if self.df is None or 'vital_category' not in self.df.columns:
        return pd.DataFrame()

    # Copy so callers can mutate the result without touching self.df.
    selected = self.df['vital_category'] == vital_category
    return self.df[selected].copy()

get_vital_summary_stats

get_vital_summary_stats()

Return summary statistics for each vital category.

Source code in clifpy/tables/vitals.py
def get_vital_summary_stats(self) -> pd.DataFrame:
    """Return summary statistics for each vital category.

    Non-numeric vital_value entries are coerced to NaN before aggregation.
    """
    if self.df is None or 'vital_value' not in self.df.columns:
        return pd.DataFrame()

    # Coerce on a copy so the stored frame keeps its original dtypes.
    numeric = self.df.copy()
    numeric['vital_value'] = pd.to_numeric(numeric['vital_value'], errors='coerce')

    aggregations = [
        'count', 'mean', 'std', 'min', 'max',
        ('q1', lambda s: s.quantile(0.25)),
        ('median', lambda s: s.quantile(0.5)),
        ('q3', lambda s: s.quantile(0.75)),
    ]
    return numeric.groupby('vital_category')['vital_value'].agg(aggregations).round(2)

isvalid

isvalid()

Return True if the last validation finished without errors.

Source code in clifpy/tables/vitals.py
def isvalid(self) -> bool:
    """Return ``True`` if the last validation finished without errors."""
    # Valid only when both the schema errors and range-check errors are empty.
    return not (self.errors or self.range_validation_errors)

validate_vital_ranges

validate_vital_ranges()

Validate vital values against expected ranges using grouped data for efficiency.

Source code in clifpy/tables/vitals.py
def validate_vital_ranges(self):
    """Validate vital values against expected ranges using grouped data for efficiency.

    Aggregates observed min/max/count per vital_category and compares the
    extremes against the ranges in ``self._vital_ranges``. Findings are
    appended to ``self.range_validation_errors`` and mirrored into
    ``self.errors``; nothing is returned.
    """
    self.range_validation_errors = []

    # Nothing to validate without data or a loaded range schema.
    if self.df is None or not self._vital_ranges:
        return

    # Cleanup: the original kept two identical copies of this list
    # (required_columns / required_columns_for_df); one suffices.
    required_columns = ['vital_category', 'vital_value']
    if not all(col in self.df.columns for col in required_columns):
        self.range_validation_errors.append({
            "error_type": "missing_columns_for_range_validation",
            "columns": [col for col in required_columns if col not in self.df.columns],
            "message": "vital_category or vital_value column missing, cannot perform range validation."
        })
        return

    # Work on a copy to safely convert vital_value to numeric for aggregation.
    df_for_stats = self.df[required_columns].copy()
    df_for_stats['vital_value'] = pd.to_numeric(df_for_stats['vital_value'], errors='coerce')

    # Drop rows whose vital_value could not be coerced to a number.
    df_for_stats.dropna(subset=['vital_value'], inplace=True)

    if df_for_stats.empty:
        # No numeric vital_value data to perform range validation on.
        return

    # One row per category keeps the comparison loop small regardless of data size.
    vital_stats = (df_for_stats
                   .groupby('vital_category')['vital_value']
                   .agg(['min', 'max', 'mean', 'count'])
                   .reset_index())

    if vital_stats.empty:
        return

    # Compare each category's observed extremes against its expected range.
    for _, row in vital_stats.iterrows():
        vital_category = row['vital_category']
        min_val = row['min']
        max_val = row['max']
        count = row['count']

        # Categories absent from the schema are flagged rather than skipped silently.
        if vital_category not in self._vital_ranges:
            self.range_validation_errors.append({
                'error_type': 'unknown_vital_category',
                'vital_category': vital_category,
                'affected_rows': count,
                'observed_min': min_val,
                'observed_max': max_val,
                'message': f"Unknown vital category '{vital_category}' found in data."
            })
            continue

        expected_range = self._vital_ranges[vital_category]
        expected_min = expected_range.get('min')
        expected_max = expected_range.get('max')

        # A None bound means "unbounded" on that side of the range.
        if expected_min is not None and min_val < expected_min:
            self.range_validation_errors.append({
                'error_type': 'below_range',
                'vital_category': vital_category,
                'observed_min': min_val,
                'expected_min': expected_min,
                'message': f"Values below expected minimum for {vital_category}"
            })

        if expected_max is not None and max_val > expected_max:
            self.range_validation_errors.append({
                'error_type': 'above_range',
                'vital_category': vital_category,
                'observed_max': max_val,
                'expected_max': expected_max,
                'message': f"Values above expected maximum for {vital_category}"
            })

    # Surface range findings through the main error list as well.
    if self.range_validation_errors:
        self.errors.extend(self.range_validation_errors)
        self.logger.warning(f"Found {len(self.range_validation_errors)} range validation errors")

Respiratory Support

clifpy.tables.respiratory_support.RespiratorySupport

RespiratorySupport(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Respiratory support table wrapper inheriting from BaseTable.

This class handles respiratory support data and validations while leveraging the common functionality provided by BaseTable.

Initialize the respiratory_support table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/respiratory_support.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the respiratory_support table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # All construction work happens in BaseTable; this subclass adds no
    # extra initialization of its own.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)

waterfall

waterfall(
    *,
    id_col="hospitalization_id",
    bfill=False,
    verbose=True,
    return_dataframe=False
)

Clean + waterfall-fill the respiratory_support table.

Parameters:

Name Type Description Default
id_col str

Encounter-level identifier column (default: hospitalization_id)

'hospitalization_id'
bfill bool

If True, numeric setters are back-filled after forward-fill

False
verbose bool

Print progress messages

True
return_dataframe bool

If True, returns DataFrame instead of RespiratorySupport instance

False

Returns:

Name Type Description
RespiratorySupport Union[RespiratorySupport, DataFrame]

New instance with processed data (or DataFrame if return_dataframe=True)

Note

The waterfall function expects data in UTC timezone. If your data is in a different timezone, it will be converted to UTC for processing. The original object is not modified; a new instance is returned.

Example

processed = resp_support.waterfall()
processed.validate()  # Can run validation on processed data
df = processed.df     # Access the DataFrame

Source code in clifpy/tables/respiratory_support.py
def waterfall(
    self,
    *,
    id_col: str = "hospitalization_id",
    bfill: bool = False,
    verbose: bool = True,
    return_dataframe: bool = False
) -> Union['RespiratorySupport', pd.DataFrame]:
    """
    Clean + waterfall-fill the respiratory_support table.

    Parameters:
        id_col (str): Encounter-level identifier column (default: hospitalization_id)
        bfill (bool): If True, numeric setters are back-filled after forward-fill
        verbose (bool): Print progress messages
        return_dataframe (bool): If True, returns DataFrame instead of RespiratorySupport instance

    Returns:
        RespiratorySupport: New instance with processed data (or DataFrame if return_dataframe=True)

    Raises:
        ValueError: If no data has been loaded on this instance.

    Note:
        The waterfall function expects data in UTC timezone. If your data is in a
        different timezone, it will be converted to UTC for processing.
        The original object is not modified; a new instance is returned.

    Example:
        >>> processed = resp_support.waterfall()
        >>> processed.validate()  # Can run validation on processed data
        >>> df = processed.df     # Access the DataFrame
    """
    if self.df is None or self.df.empty:
        raise ValueError("No data available to process. Load data first.")

    # Never mutate the caller's data; everything below works on this copy.
    frame = self.df.copy()

    # Waterfall processing assumes UTC; convert tz-aware timestamps first.
    if 'recorded_dttm' in frame.columns and frame['recorded_dttm'].dt.tz is not None:
        source_tz = frame['recorded_dttm'].dt.tz
        frame['recorded_dttm'] = frame['recorded_dttm'].dt.tz_convert('UTC')
        if verbose:
            print(f"Converting timezone from {source_tz} to UTC for waterfall processing")

    processed = process_resp_support_waterfall(
        frame,
        id_col=id_col,
        bfill=bfill,
        verbose=verbose
    )

    if return_dataframe:
        return processed

    # Wrap the processed frame in a fresh instance so the original object
    # stays untouched and the result supports validate()/df access.
    return RespiratorySupport(
        data_directory=self.data_directory,
        filetype=self.filetype,
        timezone=self.timezone,
        output_directory=self.output_directory,
        data=processed
    )

Medication Administration (Continuous)

clifpy.tables.medication_admin_continuous.MedicationAdminContinuous

MedicationAdminContinuous(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Medication administration continuous table wrapper inheriting from BaseTable.

This class handles medication administration continuous data and validations while leveraging the common functionality provided by BaseTable.

Initialize the MedicationAdminContinuous table.

This class handles continuous medication administration data, including validation, dose unit standardization, and unit conversion capabilities.

Parameters

data_directory : str, optional Path to the directory containing data files. If None and data is provided, defaults to current directory. filetype : str, optional Type of data file (csv, parquet, etc.). If None and data is provided, defaults to 'parquet'. timezone : str, default="UTC" Timezone for datetime columns. Used for proper timestamp handling. output_directory : str, optional Directory for saving output files and logs. If not specified, outputs are saved to the current working directory. data : pd.DataFrame, optional Pre-loaded DataFrame to use instead of loading from file. Supports backward compatibility with direct DataFrame initialization.

Notes

The class supports two initialization patterns: 1. Loading from file: provide data_directory and filetype 2. Direct DataFrame: provide data parameter (legacy support)

Upon initialization, the class loads medication schema data including category-to-group mappings from the YAML schema.

Source code in clifpy/tables/medication_admin_continuous.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the MedicationAdminContinuous table.

    Handles continuous medication administration data, including validation,
    dose unit standardization, and unit conversion capabilities.

    Parameters
    ----------
    data_directory : str, optional
        Path to the directory containing data files. If None and data is
        provided, defaults to the current directory.
    filetype : str, optional
        Type of data file (csv, parquet, etc.). If None and data is
        provided, defaults to 'parquet'.
    timezone : str, default="UTC"
        Timezone for datetime columns.
    output_directory : str, optional
        Directory for saving output files and logs.
    data : pd.DataFrame, optional
        Pre-loaded DataFrame to use instead of loading from file (legacy
        single-argument call pattern).

    Notes
    -----
    Two initialization patterns are supported:
    1. Loading from file: provide data_directory and filetype
    2. Direct DataFrame: provide data (backward compatibility)

    After BaseTable setup, medication category-to-group mappings are
    loaded from the YAML schema.
    """
    # Legacy call pattern: only a DataFrame was supplied. Substitute
    # placeholder location/filetype values so BaseTable can initialize.
    legacy_call = data is not None and data_directory is None and filetype is None
    if legacy_call:
        data_directory = "."
        filetype = "parquet"

    # Populated later by _load_medication_schema_data().
    self._med_category_to_group = None

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

    # Pull category -> therapeutic-group mappings from the YAML schema.
    self._load_medication_schema_data()

med_category_to_group_mapping property

med_category_to_group_mapping

Get the medication category to group mapping from the schema.

Returns

Dict[str, str] A dictionary mapping medication categories to their therapeutic groups. Returns a copy to prevent external modification of the internal mapping. Returns an empty dict if no mappings are loaded.

Examples

>>> mac = MedicationAdminContinuous(data)
>>> mappings = mac.med_category_to_group_mapping
>>> mappings['Antibiotics']
'Antimicrobials'

convert_dose_to_limited_units

convert_dose_to_limited_units(vitals_df, med_df=None)

Convert medication doses to standardized units per minute.

This method converts all medication doses to one of three standard units: - mcg/min for mass-based medications - ml/min for volume-based medications
- units/min for unit-based medications

The conversion handles different time scales (per hour vs per minute) and weight-based dosing (per kg) by incorporating patient weights from vitals.

Parameters

vitals_df : pd.DataFrame DataFrame containing patient vital signs, must include: - hospitalization_id: Patient identifier - recorded_dttm: Timestamp of vital recording - vital_category: Type of vital (looks for 'weight_kg') - vital_value: Numeric value of the vital med_df : pd.DataFrame, optional DataFrame containing medication administration data. If None, uses self.df. Required columns: - hospitalization_id: Patient identifier - admin_dttm: Medication administration timestamp - med_dose_unit: Original dose unit (case-insensitive) - med_dose: Original dose value - med_category: Medication category (used for SQL query) Optional columns: - weight_kg: Patient weight; if absent, pulled from vitals_df

Returns

pd.DataFrame Original med_df with additional columns: - med_dose_unit_clean: Standardized unit pattern - weight_kg: Patient weight used for conversion (if applicable) - med_dose_converted: Dose value in standardized units - med_dose_unit_converted: Standardized unit ('mcg/min', 'ml/min', or 'units/min') - Additional calculation columns (time_multiplier, pt_weight_multiplier, amount_multiplier)

Raises

ValueError If med_df is None and self.df is also None, or if required columns are missing.

Warnings

Logs warnings for unrecognized dose units that cannot be converted.

Notes
  • Weight-based dosing (/kg) uses the most recent weight prior to administration
  • Unrecognized dose units result in NULL converted values
  • The conversion preserves the original columns and adds new ones
Examples

>>> vitals = pd.DataFrame({
...     'hospitalization_id': ['H001'],
...     'recorded_dttm': pd.to_datetime(['2023-01-01']),
...     'vital_category': ['weight_kg'],
...     'vital_value': [70.0]
... })
>>> meds = pd.DataFrame({
...     'hospitalization_id': ['H001'],
...     'admin_dttm': pd.to_datetime(['2023-01-02']),
...     'med_dose': [5.0],
...     'med_dose_unit': ['mcg/kg/hr'],
...     'med_category': ['Vasopressors']
... })
>>> result = mac.convert_dose_to_limited_units(vitals, meds)
>>> result['med_dose_converted'].iloc[0]
5.833333...  # 5 * 70 / 60 (mcg/kg/hr to mcg/min with 70kg patient)

Source code in clifpy/tables/medication_admin_continuous.py
def convert_dose_to_limited_units(self, vitals_df: pd.DataFrame, med_df: pd.DataFrame = None) -> pd.DataFrame:
    """
    Convert medication doses to standardized units per minute.

    This method converts all medication doses to one of three standard units:
    - mcg/min for mass-based medications
    - ml/min for volume-based medications  
    - units/min for unit-based medications

    The conversion handles different time scales (per hour vs per minute) and
    weight-based dosing (per kg) by incorporating patient weights from vitals.

    Parameters
    ----------
    vitals_df : pd.DataFrame
        DataFrame containing patient vital signs, must include:
        - hospitalization_id: Patient identifier
        - recorded_dttm: Timestamp of vital recording
        - vital_category: Type of vital (looks for 'weight_kg')
        - vital_value: Numeric value of the vital
    med_df : pd.DataFrame, optional
        DataFrame containing medication administration data. If None, uses self.df.
        Required columns:
        - hospitalization_id: Patient identifier
        - admin_dttm: Medication administration timestamp
        - med_dose_unit: Original dose unit (case-insensitive)
        - med_dose: Original dose value
        - med_category: Medication category (used for SQL query)
        Optional columns:
        - weight_kg: Patient weight; if absent, pulled from vitals_df

    Returns
    -------
    pd.DataFrame
        Original med_df with additional columns:
        - med_dose_unit_clean: Standardized unit pattern
        - weight_kg: Patient weight used for conversion (if applicable)
        - med_dose_converted: Dose value in standardized units
        - med_dose_unit_converted: Standardized unit ('mcg/min', 'ml/min', or 'units/min')
        - Additional calculation columns (time_multiplier, pt_weight_multiplier, amount_multiplier)

    Raises
    ------
    ValueError
        If med_df is None and self.df is also None, or if required columns are missing.

    Warnings
    --------
    Logs warnings for unrecognized dose units that cannot be converted.

    Notes
    -----
    - Weight-based dosing (/kg) uses the most recent weight prior to administration
    - Unrecognized dose units result in NULL converted values
    - The conversion preserves the original columns and adds new ones

    Examples
    --------
    >>> vitals = pd.DataFrame({
    ...     'hospitalization_id': ['H001'],
    ...     'recorded_dttm': pd.to_datetime(['2023-01-01']),
    ...     'vital_category': ['weight_kg'],
    ...     'vital_value': [70.0]
    ... })
    >>> meds = pd.DataFrame({
    ...     'hospitalization_id': ['H001'],
    ...     'admin_dttm': pd.to_datetime(['2023-01-02']),
    ...     'med_dose': [5.0],
    ...     'med_dose_unit': ['mcg/kg/hr'],
    ...     'med_category': ['Vasopressors']
    ... })
    >>> result = mac.convert_dose_to_limited_units(vitals, meds)
    >>> result['med_dose_converted'].iloc[0]
    5.833333...  # 5 * 70 / 60 (mcg/kg/hr to mcg/min with 70kg patient)
    """
    # Fall back to the instance's own loaded data when no frame is supplied.
    if med_df is None:
        med_df = self.df
    if med_df is None:
        raise ValueError("No data provided")

    if 'weight_kg' not in med_df.columns:
        self.logger.info("No weight_kg column found, adding the most recent from vitals")
        # DuckDB resolves `med_df` / `vitals_df` by name from the enclosing
        # Python scope. ROW_NUMBER over weights recorded at or before each
        # admin_dttm, combined with QUALIFY (rn = 1), keeps exactly one row
        # per administration: the one joined to the most recent prior weight.
        query = """
        SELECT m.*
            , v.vital_value as weight_kg
            , v.recorded_dttm as weight_recorded_dttm
            , ROW_NUMBER() OVER (
                PARTITION BY m.hospitalization_id, m.admin_dttm, m.med_category
                ORDER BY v.recorded_dttm DESC
                ) as rn
        FROM med_df m
        LEFT JOIN vitals_df v 
            ON m.hospitalization_id = v.hospitalization_id 
            AND v.vital_category = 'weight_kg' AND v.vital_value IS NOT NULL
            AND v.recorded_dttm <= m.admin_dttm  -- only past weights
        -- rn = 1 for the weight w/ the latest recorded_dttm (and thus most recent)
        QUALIFY (rn = 1) 
        ORDER BY m.hospitalization_id, m.admin_dttm, m.med_category, rn
        """
        med_df = duckdb.sql(query).to_df()

    # check if the required columns are present
    required_columns = {'med_dose_unit', 'med_dose', 'weight_kg'}
    missing_columns = required_columns - set(med_df.columns)
    if missing_columns:
        raise ValueError(f"The following column(s) are required but not found: {missing_columns}")

    # Adds med_dose_unit_clean and reports any units it could not map.
    med_df, unrecognized = self._normalize_dose_unit_pattern(med_df)
    if not unrecognized:
        self.logger.info("No unrecognized dose units found, continuing with conversion")
    else:
        self.logger.warning(f"Unrecognized dose units found: {unrecognized}")

    # Build the quoted SQL IN-list of acceptable cleaned unit patterns.
    acceptable_unit_patterns_str = "','".join(self._acceptable_dose_unit_patterns)

    # The query relies on DuckDB's lateral column aliases: aliases defined in
    # this SELECT (time_multiplier, etc.) are referenced by later expressions
    # in the same SELECT to compute med_dose_converted. Any multiplier that
    # stays NULL propagates NULL into med_dose_converted.
    query = f"""
    SELECT *
        , CASE WHEN regexp_matches(med_dose_unit_clean, '/h(r|our)?\\b') THEN 1/60.0
            WHEN regexp_matches(med_dose_unit_clean, '/m(in|inute)?\\b') THEN 1.0
            ELSE NULL END as time_multiplier
        , CASE WHEN contains(med_dose_unit_clean, '/kg/') THEN weight_kg
            ELSE 1 END AS pt_weight_multiplier
        , CASE WHEN contains(med_dose_unit_clean, 'mcg/') THEN 1.0
            WHEN contains(med_dose_unit_clean, 'mg/') THEN 1000.0
            WHEN contains(med_dose_unit_clean, 'ng/') THEN 0.001
            WHEN contains(med_dose_unit_clean, 'milli') THEN 0.001
            WHEN contains(med_dose_unit_clean, 'units/') THEN 1
            WHEN contains(med_dose_unit_clean, 'ml/') THEN 1.0
            WHEN contains(med_dose_unit_clean, 'l/') AND NOT contains(med_dose_unit_clean, 'ml/') THEN 1000.0
            ELSE NULL END as amount_multiplier
        , med_dose * time_multiplier * pt_weight_multiplier * amount_multiplier as med_dose_converted
        , CASE WHEN med_dose_unit_clean NOT IN ('{acceptable_unit_patterns_str}') THEN NULL
            WHEN contains(med_dose_unit_clean, 'units/') THEN 'units/min'
            WHEN contains(med_dose_unit_clean, 'l/') THEN 'ml/min'
            ELSE 'mcg/min' END as med_dose_unit_converted
    FROM med_df
    """
    return duckdb.sql(query).to_df()

Patient Assessments

clifpy.tables.patient_assessments.PatientAssessments

PatientAssessments(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Patient assessments table wrapper inheriting from BaseTable.

This class handles patient assessment data and validations while leveraging the common functionality provided by BaseTable.

Initialize the patient_assessments table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/patient_assessments.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the patient_assessments table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # Legacy single-argument call: patient_assessments(data). Substitute
    # placeholder location/filetype values so BaseTable can initialize.
    if data is not None and data_directory is None and filetype is None:
        data_directory = "."
        filetype = "parquet"

    # Filled in by _load_assessment_schema_data() after BaseTable setup.
    self._assessment_category_to_group = None

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

    # Read assessment category -> group mappings from the table schema.
    self._load_assessment_schema_data()

assessment_category_to_group_mapping property

assessment_category_to_group_mapping

Get the assessment category to group mapping from the schema.

Position

clifpy.tables.position.Position

Position(
    data_directory=None,
    filetype=None,
    timezone="UTC",
    output_directory=None,
    data=None,
)

Bases: BaseTable

Position table wrapper inheriting from BaseTable.

This class handles patient position data and validations while leveraging the common functionality provided by BaseTable.

Initialize the position table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/position.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the position table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # Legacy single-argument call: position(data). Substitute placeholder
    # location/filetype values so BaseTable can initialize.
    if data is not None and data_directory is None and filetype is None:
        data_directory = "."
        filetype = "parquet"

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

get_position_category_stats

get_position_category_stats()

Return summary statistics for each position category, including missingness and unique patient counts. Expects columns: 'position_category', 'position_name', and optionally 'hospitalization_id'.

Source code in clifpy/tables/position.py
def get_position_category_stats(self) -> "pd.DataFrame | dict":
    """
    Return summary statistics for each position category.

    For every ``position_category`` value, computes:
    - ``count``: number of rows with that category
    - ``unique``: number of distinct ``hospitalization_id`` values

    Expects columns: 'position_category' and 'hospitalization_id'.

    Returns
    -------
    pd.DataFrame
        Indexed by ``position_category`` with ``count`` and ``unique`` columns.
    dict
        ``{"status": "Missing columns"}`` when ``self.df`` is None or a
        required column is absent (kept for backward compatibility with
        existing callers that check for this sentinel).
    """
    required = ('position_category', 'hospitalization_id')
    if self.df is None or any(col not in self.df.columns for col in required):
        # Sentinel dict rather than an exception, to preserve the existing
        # caller contract.
        return {"status": "Missing columns"}

    # Named aggregations: row count and distinct-patient count per category.
    # (No rounding needed: both aggregates are integers.)
    return (
        self.df
        .groupby('position_category')
        .agg(
            count=('position_category', 'count'),
            unique=('hospitalization_id', 'nunique'),
        )
    )