Skip to content

Table Classes

Patient

clifpy.tables.patient.Patient

Patient(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Patient table wrapper inheriting from BaseTable.

This class handles patient-specific data and validations while leveraging the common functionality provided by BaseTable.

Initialize the patient table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/patient.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the patient table.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # For backward compatibility, handle the old signature patient(data).
    # Mirrors the identical shim in the Adt/Hospitalization/Labs tables so
    # all table wrappers accept a pre-loaded DataFrame the same way.
    if data_directory is None and filetype is None and data is not None:
        # Use dummy values for required parameters
        data_directory = "."
        filetype = "parquet"

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data
    )

ADT (Admission, Discharge, Transfer)

clifpy.tables.adt.Adt

Adt(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

ADT (Admission/Discharge/Transfer) table wrapper inheriting from BaseTable.

This class handles ADT-specific data and validations while leveraging the common functionality provided by BaseTable.

Initialize the ADT table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/adt.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the ADT table.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Legacy call pattern adt(data): only a DataFrame was supplied, so
    # substitute placeholder directory/filetype values for BaseTable.
    legacy_call = data is not None and data_directory is None and filetype is None
    if legacy_call:
        data_directory = "."
        filetype = "parquet"

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

check_overlapping_admissions

check_overlapping_admissions(save_overlaps=False, overlaps_output_directory=None)

Check for overlapping admissions within the same hospitalization.

Identifies cases where a patient has overlapping stays in different locations within the same hospitalization (i.e., the out_dttm of one location is after the in_dttm of the next location).

Parameters: save_overlaps (bool): If True, save detailed overlap information to CSV. Default is False. overlaps_output_directory (str, optional): Directory for saving the overlaps CSV file. If None, uses the output_directory provided at initialization.

Returns: int: Count of unique hospitalizations that have overlapping admissions

Raises: RuntimeError: If an error occurs during processing

Source code in clifpy/tables/adt.py
def check_overlapping_admissions(self, save_overlaps: bool = False, overlaps_output_directory: Optional[str] = None) -> int:
    """
    Check for overlapping admissions within the same hospitalization.

    Identifies cases where a patient has overlapping stays in different locations
    within the same hospitalization (i.e., the out_dttm of one location is after
    the in_dttm of the next location).

    Parameters:
        save_overlaps (bool): If True, save detailed overlap information to CSV. Default is False.
        overlaps_output_directory (str, optional): Directory for saving the overlaps CSV file.
            If None, uses the output_directory provided at initialization.

    Returns:
        int: Count of unique hospitalizations that have overlapping admissions

    Raises:
        RuntimeError: If an error occurs during processing
    """
    try:
        if self.df is None:
            return 0

        if 'hospitalization_id' not in self.df.columns:
            error = "hospitalization_id is missing."
            raise ValueError(error)

        # Sort by hospitalization_id and in_dttm so consecutive rows within a
        # hospitalization are in chronological order of admission start
        data = self.df.sort_values(by=['hospitalization_id', 'in_dttm'])

        overlaps = []
        overlapping_hospitalizations = set()

        # Group by hospitalization_id and compare each stay with the stay that
        # starts next within the same hospitalization
        for hospitalization_id, group in data.groupby('hospitalization_id'):
            for i in range(len(group) - 1):
                # Current and following bookings
                # (renamed from `next` to avoid shadowing the builtin)
                current = group.iloc[i]
                nxt = group.iloc[i + 1]

                # Overlap = different locations and the current stay ends
                # strictly after the next stay begins
                if (
                    current['location_name'] != nxt['location_name'] and
                    current['out_dttm'] > nxt['in_dttm']
                ):
                    overlapping_hospitalizations.add(hospitalization_id)

                    if save_overlaps:
                        overlaps.append({
                            'hospitalization_id': hospitalization_id,
                            'Initial Location': current['location_name'],
                            'Initial Location Category': current['location_category'],
                            'Overlapping Location': nxt['location_name'],
                            'Overlapping Location Category': nxt['location_category'],
                            'Admission Start': current['in_dttm'],
                            'Admission End': current['out_dttm'],
                            'Next Admission Start': nxt['in_dttm']
                        })

        # Save overlaps to CSV if requested
        if save_overlaps and overlaps:
            overlaps_df = pd.DataFrame(overlaps)
            # Determine the directory to save the overlaps file
            save_dir = overlaps_output_directory if overlaps_output_directory is not None else self.output_directory
            if save_dir is not None:
                os.makedirs(save_dir, exist_ok=True)
                file_path = os.path.join(save_dir, 'overlapping_admissions.csv')
                overlaps_df.to_csv(file_path, index=False)
            else:
                # Fallback to original method if no directory is specified
                self.save_dataframe(overlaps_df, 'overlapping_admissions')

        return len(overlapping_hospitalizations)

    except Exception as e:
        # Handle errors gracefully
        raise RuntimeError(f"Error checking time overlap: {str(e)}")

Hospitalization

clifpy.tables.hospitalization.Hospitalization

Hospitalization(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Hospitalization table wrapper inheriting from BaseTable.

This class handles hospitalization-specific data and validations while leveraging the common functionality provided by BaseTable.

Initialize the hospitalization table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/hospitalization.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the hospitalization table.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Legacy call pattern hospitalization(data): only a DataFrame was
    # supplied, so substitute placeholder directory/filetype for BaseTable.
    legacy_call = data is not None and data_directory is None and filetype is None
    if legacy_call:
        data_directory = "."
        filetype = "parquet"

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

calculate_length_of_stay

calculate_length_of_stay()

Calculate length of stay for each hospitalization and return DataFrame with LOS column.

Source code in clifpy/tables/hospitalization.py
def calculate_length_of_stay(self) -> pd.DataFrame:
    """Calculate length of stay for each hospitalization and return DataFrame with LOS column."""
    if self.df is None:
        return pd.DataFrame()

    required_cols = ['admission_dttm', 'discharge_dttm']
    missing = [col for col in required_cols if col not in self.df.columns]
    if missing:
        print(f"Missing required columns: {missing}")
        return pd.DataFrame()

    result = self.df.copy()
    # Coerce both timestamps so subtraction yields a Timedelta series
    for col in required_cols:
        result[col] = pd.to_datetime(result[col])

    # LOS in (possibly fractional) days: seconds in stay / seconds per day
    stay = result['discharge_dttm'] - result['admission_dttm']
    result['length_of_stay_days'] = stay.dt.total_seconds() / 86400.0

    return result

get_mortality_rate

get_mortality_rate()

Calculate in-hospital mortality rate.

Source code in clifpy/tables/hospitalization.py
def get_mortality_rate(self) -> float:
    """Calculate in-hospital mortality rate."""
    # Graceful zero when there is no data or no discharge disposition column
    if self.df is None or 'discharge_category' not in self.df.columns:
        return 0.0

    total = len(self.df)
    if not total:
        return 0.0

    # 'Expired' marks an in-hospital death; express as percent of all stays
    expired = int((self.df['discharge_category'] == 'Expired').sum())
    return expired / total * 100

get_patient_hospitalization_counts

get_patient_hospitalization_counts()

Return DataFrame with hospitalization counts per patient.

Source code in clifpy/tables/hospitalization.py
def get_patient_hospitalization_counts(self) -> pd.DataFrame:
    """Return DataFrame with hospitalization counts per patient."""
    if self.df is None or 'patient_id' not in self.df.columns:
        return pd.DataFrame()

    # Named aggregation yields flat column names directly
    patient_counts = (
        self.df
        .groupby('patient_id')
        .agg(
            hospitalization_count=('hospitalization_id', 'count'),
            first_admission=('admission_dttm', 'min'),
            last_admission=('admission_dttm', 'max'),
        )
        .reset_index()
    )

    # Span of care = days between a patient's first and last admission
    for col in ('first_admission', 'last_admission'):
        patient_counts[col] = pd.to_datetime(patient_counts[col])
    span = patient_counts['last_admission'] - patient_counts['first_admission']
    patient_counts['care_span_days'] = span.dt.total_seconds() / 86400.0

    # Most frequently admitted patients first
    return patient_counts.sort_values('hospitalization_count', ascending=False)

get_summary_stats

get_summary_stats()

Return comprehensive summary statistics for hospitalization data.

Source code in clifpy/tables/hospitalization.py
def get_summary_stats(self) -> Dict:
    """Return comprehensive summary statistics for hospitalization data."""
    if self.df is None:
        return {}

    df = self.df
    cols = set(df.columns)

    # Core counts and the admission/discharge date window; every optional
    # column degrades to an empty/None value rather than raising.
    stats = {
        'total_hospitalizations': len(df),
        'unique_patients': df['patient_id'].nunique() if 'patient_id' in cols else 0,
        'discharge_category_counts': df['discharge_category'].value_counts().to_dict() if 'discharge_category' in cols else {},
        'admission_type_counts': df['admission_type_category'].value_counts().to_dict() if 'admission_type_category' in cols else {},
        'date_range': {
            'earliest_admission': df['admission_dttm'].min() if 'admission_dttm' in cols else None,
            'latest_admission': df['admission_dttm'].max() if 'admission_dttm' in cols else None,
            'earliest_discharge': df['discharge_dttm'].min() if 'discharge_dttm' in cols else None,
            'latest_discharge': df['discharge_dttm'].max() if 'discharge_dttm' in cols else None,
        },
    }

    # Age distribution (only when the column exists and has non-null values)
    if 'age_at_admission' in cols:
        ages = df['age_at_admission'].dropna()
        if not ages.empty:
            stats['age_stats'] = {
                'mean': round(ages.mean(), 1),
                'median': ages.median(),
                'min': ages.min(),
                'max': ages.max(),
                'std': round(ages.std(), 1),
            }

    # Length-of-stay distribution, derived via calculate_length_of_stay()
    if 'admission_dttm' in cols and 'discharge_dttm' in cols:
        los_frame = self.calculate_length_of_stay()
        if 'length_of_stay_days' in los_frame.columns:
            los_values = los_frame['length_of_stay_days'].dropna()
            if not los_values.empty:
                stats['length_of_stay_stats'] = {
                    'mean_days': round(los_values.mean(), 1),
                    'median_days': round(los_values.median(), 1),
                    'min_days': round(los_values.min(), 1),
                    'max_days': round(los_values.max(), 1),
                    'std_days': round(los_values.std(), 1),
                }

    # In-hospital mortality, as a percentage rounded to 2 decimals
    stats['mortality_rate_percent'] = round(self.get_mortality_rate(), 2)

    return stats

Labs

clifpy.tables.labs.Labs

Labs(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Labs table wrapper inheriting from BaseTable.

This class handles laboratory data and validations including reference unit validation while leveraging the common functionality provided by BaseTable.

Initialize the labs table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/labs.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the labs table.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Legacy call pattern labs(data): only a DataFrame was supplied, so
    # substitute placeholder directory/filetype values for BaseTable.
    legacy_call = data is not None and data_directory is None and filetype is None
    if legacy_call:
        data_directory = "."
        filetype = "parquet"

    # Populated later from the labs schema (see _load_labs_schema_data)
    self._lab_reference_units = None

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

    # Pull lab-specific metadata (e.g. reference units) from the schema
    self._load_labs_schema_data()

lab_reference_units property

lab_reference_units

Get the lab reference units mapping from the schema.

get_lab_category_stats

get_lab_category_stats()

Return summary statistics for each lab category, including missingness and unique hospitalization_id counts.

Source code in clifpy/tables/labs.py
def get_lab_category_stats(self) -> pd.DataFrame:
    """Return summary statistics for each lab category, including missingness and unique hospitalization_id counts."""
    # Guard every column the aggregation touches, including the groupby key:
    # 'lab_category' was previously unchecked, so its absence raised a
    # KeyError instead of returning the graceful sentinel below.
    required_cols = ('lab_category', 'lab_value_numeric', 'hospitalization_id')
    if self.df is None or any(col not in self.df.columns for col in required_cols):
        # Dict sentinel preserved for backward compatibility with callers
        # that check for the "Missing columns" status.
        return {"status": "Missing columns"}

    stats = (
        self.df
        .groupby('lab_category')
        .agg(
            count=('lab_value_numeric', 'count'),
            unique=('hospitalization_id', 'nunique'),
            missing_pct=('lab_value_numeric', lambda x: 100 * x.isna().mean()),
            mean=('lab_value_numeric', 'mean'),
            std=('lab_value_numeric', 'std'),
            min=('lab_value_numeric', 'min'),
            q1=('lab_value_numeric', lambda x: x.quantile(0.25)),
            median=('lab_value_numeric', 'median'),
            q3=('lab_value_numeric', lambda x: x.quantile(0.75)),
            max=('lab_value_numeric', 'max'),
        )
        .round(2)
    )

    return stats

get_lab_specimen_stats

get_lab_specimen_stats()

Return summary statistics for each lab specimen category, including missingness and unique hospitalization_id counts.

Source code in clifpy/tables/labs.py
def get_lab_specimen_stats(self) -> pd.DataFrame:
    """Return summary statistics for each lab specimen category, including missingness and unique hospitalization_id counts."""
    # BUG FIX: the guard previously checked the misspelled column name
    # 'lab_speciment_category' while the groupby below uses
    # 'lab_specimen_category' — so valid frames were always rejected and a
    # frame truly missing the column raised KeyError in groupby instead.
    required_cols = ('lab_specimen_category', 'lab_value_numeric', 'hospitalization_id')
    if self.df is None or any(col not in self.df.columns for col in required_cols):
        # Dict sentinel preserved for backward compatibility with callers
        # that check for the "Missing columns" status.
        return {"status": "Missing columns"}

    stats = (
        self.df
        .groupby('lab_specimen_category')
        .agg(
            count=('lab_value_numeric', 'count'),
            unique=('hospitalization_id', 'nunique'),
            missing_pct=('lab_value_numeric', lambda x: 100 * x.isna().mean()),
            mean=('lab_value_numeric', 'mean'),
            std=('lab_value_numeric', 'std'),
            min=('lab_value_numeric', 'min'),
            q1=('lab_value_numeric', lambda x: x.quantile(0.25)),
            median=('lab_value_numeric', 'median'),
            q3=('lab_value_numeric', lambda x: x.quantile(0.75)),
            max=('lab_value_numeric', 'max'),
        )
        .round(2)
    )

    return stats

Microbiology Culture

clifpy.tables.microbiology_culture.MicrobiologyCulture

MicrobiologyCulture(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Microbiology Culture table wrapper inheriting from BaseTable.

This class handles microbiology culture-specific data and validations including organism identification validation and culture method validation.

Initialize the microbiology culture table.

Parameters: data_directory (str): Path to the directory containing data files filetype (str): Type of data file (csv, parquet, etc.) timezone (str): Timezone for datetime columns output_directory (str, optional): Directory for saving output files and logs data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file

Source code in clifpy/tables/microbiology_culture.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Initialize the microbiology culture table.

    Parameters:
        data_directory (str): Path to the directory containing data files
        filetype (str): Type of data file (csv, parquet, etc.)
        timezone (str): Timezone for datetime columns
        output_directory (str, optional): Directory for saving output files and logs
        data (pd.DataFrame, optional): Pre-loaded data to use instead of loading from file
    """
    # Bucket for findings produced by validate_timestamp_order(); must exist
    # before BaseTable initialization so isvalid() can always consult it.
    self.time_order_validation_errors: List[dict] = []

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

cat_vs_name_map staticmethod

cat_vs_name_map(df, category_col, name_col, *, group_col=None, dropna=True, sort='freq_then_alpha', max_names_per_cat=None, include_counts=False)

Build mappings from category→names (2-level) or group→category→names (3-level).

Returns: - if group_col is None: { category: [names...] } or { category: [{"name":..., "n":...}, ...] } - if group_col is provided: { group: { category: [names...] } } or { group: { category: [{"name":..., "n":...}, ...] } }

Notes - Names are unique per (category[, group]) and sorted by: freq desc, then alpha (default), or alpha only if sort="alpha" - Set include_counts=True to return [{"name":..., "n":...}] instead of plain strings. - Set max_names_per_cat to truncate long lists per category.

Source code in clifpy/tables/microbiology_culture.py
@staticmethod
def cat_vs_name_map(
    df: pd.DataFrame,
    category_col: str,
    name_col: str,
    *,
    group_col: Optional[str] = None,                 # ← if provided, returns {group: {cat: [names...]}}
    dropna: bool = True,
    sort: Literal["freq_then_alpha", "alpha"] = "freq_then_alpha",
    max_names_per_cat: Optional[int] = None,
    include_counts: bool = False,                    # if True → lists of {"name":..., "n":...}
) -> Union[Dict[str, List[str]], Dict[str, Dict[str, List[str]]],
        Dict[str, Dict[str, List[Dict[str, int]]]], Dict[str, List[Dict[str, int]]]]:
    """
    Build mappings from category→names (2-level) or group→category→names (3-level).

    Returns:
    - if group_col is None:
            { category: [names...] }  or  { category: [{"name":..., "n":...}, ...] }
    - if group_col is provided:
            { group: { category: [names...] } }  or
            { group: { category: [{"name":..., "n":...}, ...] } }

    Notes
    - Names are unique per (category[, group]) and sorted by:
        freq desc, then alpha  (default), or alpha only if sort="alpha"
    - Set include_counts=True to return [{"name":..., "n":...}] instead of plain strings.
    - Set max_names_per_cat to truncate long lists per category.
    """
    # No input frame → nothing to map
    if df is None:
        return {}

    # Every column we touch must exist; otherwise return an empty mapping
    required = [category_col, name_col] + ([group_col] if group_col else [])
    if any(col not in df.columns for col in required):
        return {}

    sub = df[required].copy()
    if dropna:
        sub = sub.dropna(subset=required)

    # frequency at the most granular level available
    # (one row per unique (group?, category, name) with its occurrence count "n")
    group_by_cols = ([group_col] if group_col else []) + [category_col, name_col]
    counts = (
        sub.groupby(group_by_cols)
        .size()
        .reset_index(name="n")
    )

    def _sort_block(block: pd.DataFrame) -> pd.DataFrame:
        # kind="mergesort" is a stable sort, so equal keys keep their prior order
        if sort == "alpha":
            return block.sort_values([name_col], ascending=[True], kind="mergesort")
        # default: freq desc then alpha
        return block.sort_values(["n", name_col], ascending=[False, True], kind="mergesort")

    def _emit_names(block: pd.DataFrame):
        # Render a sorted block as plain names or {"name", "n"} records,
        # truncating to max_names_per_cat when requested
        if include_counts:
            out = [{"name": str(r[name_col]), "n": int(r["n"])} for _, r in block.iterrows()]
        else:
            out = block[name_col].astype(str).tolist()
        if max_names_per_cat is not None:
            out = out[:max_names_per_cat]
        return out

    if group_col:
        # 3-level: group → category → [names or {"name","n"}]
        # sort=False preserves the order in which keys first appear in `counts`
        result: Dict[str, Dict[str, List[Union[str, Dict[str, int]]]]] = {}
        for grp_val, grp_block in counts.groupby(group_col, sort=False):
            cat_map: Dict[str, List[Union[str, Dict[str, int]]]] = {}
            for cat_val, cat_block in grp_block.groupby(category_col, sort=False):
                sorted_block = _sort_block(cat_block)
                cat_map[str(cat_val)] = _emit_names(sorted_block)
            result[str(grp_val)] = cat_map
        return result
    else:
        # 2-level: category → [names or {"name","n"}]
        result2: Dict[str, List[Union[str, Dict[str, int]]]] = {}
        for cat_val, cat_block in counts.groupby(category_col, sort=False):
            sorted_block = _sort_block(cat_block)
            result2[str(cat_val)] = _emit_names(sorted_block)
        return result2

isvalid

isvalid()

Return True if the last validation finished without errors.

Source code in clifpy/tables/microbiology_culture.py
def isvalid(self) -> bool:
    """Return ``True`` if the last validation finished without errors."""
    # Both the schema-level and the timestamp-order buckets must be empty
    no_schema_errors = not self.errors
    no_time_order_errors = not self.time_order_validation_errors
    return no_schema_errors and no_time_order_errors

top_fluid_org_outliers

top_fluid_org_outliers(level='organism_group', min_count=0, top_k=10)

Identify top positive and negative outliers in fluid_category vs organism_group or organism_category.

Parameters: level (str): "organism_group" or "organism_category" (non-standard) min_count (int): Minimum observed count to consider top_k (int): Number of top positive and negative outliers to return

Returns: Dict with keys "top_positive" and "top_negative", each containing a DataFrame of outliers.

Source code in clifpy/tables/microbiology_culture.py
def top_fluid_org_outliers(
    self,
    level: Literal["organism_group", "organism_category"] = "organism_group",
    min_count: int = 0,
    top_k: int = 10,
) -> Dict[str, pd.DataFrame]:
    """
    Identify top positive and negative outliers in fluid_category vs organism_group or organism_category.

    Cells of the fluid x organism contingency table are ranked by their
    standardized (Pearson) residual: (observed - expected) / sqrt(expected),
    with expected counts taken under row/column independence.

    Parameters:
        level (str): "organism_group" or "organism_category" (non-standard)
        min_count (int): Minimum observed count to consider
        top_k (int): Number of top positive and negative outliers to return

    Returns:
        Dict with keys "top_positive" and "top_negative", each containing a DataFrame of outliers.
    """
    # Degrade gracefully when data is absent or lacks the needed columns,
    # consistent with the other table helpers (previously this raised).
    if (
        self.df is None
        or "fluid_category" not in self.df.columns
        or level not in self.df.columns
    ):
        return {"top_positive": pd.DataFrame(), "top_negative": pd.DataFrame()}

    tbl = pd.crosstab(self.df["fluid_category"], self.df[level])
    if tbl.empty:
        return {"top_positive": pd.DataFrame(), "top_negative": pd.DataFrame()}

    # Expected cell counts under independence: outer product of margins / total
    total = tbl.values.sum()
    exp = (tbl.sum(1).values.reshape(-1,1) @ tbl.sum(0).values.reshape(1,-1)) / total
    with np.errstate(divide="ignore", invalid="ignore"):
        z = (tbl.values - exp) / np.sqrt(exp)

    # Long (tidy) form: one row per (fluid, organism) cell; dropna removes
    # cells whose expected count was 0 (residual undefined)
    long = pd.DataFrame({
        "fluid_category": np.repeat(tbl.index.values, tbl.shape[1]),
        level: np.tile(tbl.columns.values, tbl.shape[0]),
        "observed": tbl.values.ravel().astype(float),
        "expected": exp.ravel().astype(float),
        "std_resid": z.ravel().astype(float),
    }).dropna()

    long = long[long["observed"] >= min_count]
    top_pos = long.sort_values("std_resid", ascending=False).head(top_k).reset_index(drop=True)
    top_neg = long.sort_values("std_resid", ascending=True).head(top_k).reset_index(drop=True)
    return {"top_positive": top_pos, "top_negative": top_neg}

validate_timestamp_order

validate_timestamp_order()

Check that order_dttm ≤ collect_dttm ≤ result_dttm. - Resets self.time_order_validation_errors - Adds one entry per violated rule - Extends self.errors and logs: 'Found {len(self.time_order_validation_errors)} time order validation errors' Returns a dataframe of all violating rows (union of both rules) or None if OK.

Source code in clifpy/tables/microbiology_culture.py
def validate_timestamp_order(self):
    """
    Check that order_dttm ≤ collect_dttm ≤ result_dttm.
    - Resets self.time_order_validation_errors
    - Adds one entry per violated rule
    - Extends self.errors and logs: 'Found {len(self.time_order_validation_errors)} time order validation errors'
    Returns a dataframe of all violating rows (union of both rules) or None if OK.
    """
    # Reset time order validation bucket
    self.time_order_validation_errors = []

    df = self.df
    key_cols = ["patient_id", "hospitalization_id", "organism_id"]
    time_cols = ["order_dttm", "collect_dttm", "result_dttm"]

    # Check for missing columns
    missing = [col for col in time_cols if col not in df.columns]
    if missing:
        msg = (
            f"Missing required timestamp columns for time order validation: {', '.join(missing)}"
        )
        self.time_order_validation_errors.append({
            "type": "missing_time_order_columns",
            "columns": missing,
            "message": msg,
            "table": getattr(self, "table_name", "unknown"),
        })
        if hasattr(self, "errors"):
            self.errors.extend(self.time_order_validation_errors)
        self.logger.warning(msg)
        return None

    # One-minute grace window tolerates clock jitter between systems
    grace = pd.Timedelta(minutes=1)

    # Flag if order is ≥ 1 minute after collect (allow small jitter where collect ≥ order within 1 min)
    m_order_ge_collect = (df["order_dttm"] - df["collect_dttm"]) >= grace

    # Flag if collect is ≥ 1 minute after result (allow small jitter where result ≥ collect within 1 min)
    m_collect_ge_result = (df["collect_dttm"] - df["result_dttm"]) >= grace

    n1 = int(m_order_ge_collect.sum())
    n2 = int(m_collect_ge_result.sum())

    if n1 > 0:
        self.time_order_validation_errors.append({
            "type": "time_order_validation",
            "rule": "order_dttm <= collect_dttm, grace 1 min",
            "message": f"{n1} rows have order_dttm > collect_dttm",
            "rows": n1,
            "table": getattr(self, "table_name", "unknown"),
        })
    if n2 > 0:
        self.time_order_validation_errors.append({
            "type": "time_order_validation",
            "rule": "collect_dttm <= result_dttm, grace 1 min",
            "message": f"{n2} rows have collect_dttm > result_dttm",
            "rows": n2,
            "table": getattr(self, "table_name", "unknown"),
        })

    # Add time order validation errors to the main errors list.
    # BUG FIX: the log message previously said "range validation errors",
    # contradicting the documented contract above; it now reports
    # "time order validation errors".
    if self.time_order_validation_errors:
        if hasattr(self, "errors"):
            self.errors.extend(self.time_order_validation_errors)
        self.logger.warning(f"Found {len(self.time_order_validation_errors)} time order validation errors")

    # Return violating rows (union), showing keys + timestamps
    any_bad = m_order_ge_collect | m_collect_ge_result
    if any_bad.any():
        show_cols = [*key_cols, "order_dttm", "collect_dttm", "result_dttm"]
        return df.loc[any_bad, [c for c in show_cols if c in df.columns]].copy()

    # Nothing to report
    self.logger.info("validate_timestamp_order: passed (no violations)")
    return None

Vitals

clifpy.tables.vitals.Vitals

Vitals(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Vitals table wrapper inheriting from BaseTable.

This class handles vitals-specific data and validations including range validation for vital signs.

Initialize the vitals table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/vitals.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create a vitals table instance.

    Parameters
    ----------
    data_directory : str
        Directory that holds the data files
    filetype : str
        File format of the data (csv, parquet, etc.)
    timezone : str
        Timezone applied to datetime columns
    data : pd.DataFrame, optional
        Already-loaded DataFrame used in place of reading from disk
    output_directory : str, optional
        Where output files and logs are written
    """
    # Findings from range validation accumulate here across runs.
    self.range_validation_errors: List[dict] = []

    # Schema-derived lookups; populated by _load_vitals_schema_data() below.
    self._vital_units = None
    self._vital_ranges = None

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

    # Pull the vital-specific ranges and units out of the schema.
    self._load_vitals_schema_data()

vital_ranges property

vital_ranges

Get the vital ranges from the schema.

vital_units property

vital_units

Get the vital units mapping from the schema.

filter_by_vital_category

filter_by_vital_category(vital_category)

Return all records for a specific vital category (e.g., 'heart_rate', 'temp_c').

Source code in clifpy/tables/vitals.py
def filter_by_vital_category(self, vital_category: str) -> pd.DataFrame:
    """Return a copy of every record whose vital_category matches (e.g. 'heart_rate', 'temp_c')."""
    frame = self.df
    # No data loaded, or the category column is absent: nothing to filter.
    if frame is None or 'vital_category' not in frame.columns:
        return pd.DataFrame()

    matches = frame['vital_category'] == vital_category
    return frame[matches].copy()

get_vital_summary_stats

get_vital_summary_stats()

Return summary statistics for each vital category.

Source code in clifpy/tables/vitals.py
def get_vital_summary_stats(self) -> pd.DataFrame:
    """Return per-category summary statistics (count, mean, std, min, max, quartiles) for vital values."""
    if self.df is None or 'vital_value' not in self.df.columns:
        return pd.DataFrame()

    # Work on a numeric copy; anything non-numeric becomes NaN.
    frame = self.df.copy()
    frame['vital_value'] = pd.to_numeric(frame['vital_value'], errors='coerce')

    # Named-tuple entries give the quantile columns their labels.
    aggregations = [
        'count', 'mean', 'std', 'min', 'max',
        ('q1', lambda s: s.quantile(0.25)),
        ('median', lambda s: s.quantile(0.5)),
        ('q3', lambda s: s.quantile(0.75)),
    ]
    grouped = frame.groupby('vital_category')['vital_value']
    return grouped.agg(aggregations).round(2)

isvalid

isvalid()

Return True if the last validation finished without errors.

Source code in clifpy/tables/vitals.py
def isvalid(self) -> bool:
    """Return ``True`` if the last validation finished without errors."""
    # Both the generic error list and the range-specific list must be empty.
    has_findings = bool(self.errors) or bool(self.range_validation_errors)
    return not has_findings

Respiratory Support

clifpy.tables.respiratory_support.RespiratorySupport

RespiratorySupport(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Respiratory support table wrapper inheriting from BaseTable.

This class handles respiratory support data and validations while leveraging the common functionality provided by BaseTable.

Initialize the respiratory_support table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/respiratory_support.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create a respiratory_support table.

    Parameters
    ----------
    data_directory : str
        Directory that holds the data files
    filetype : str
        File format of the data (csv, parquet, etc.)
    timezone : str
        Timezone applied to datetime columns
    output_directory : str, optional
        Where output files and logs are written
    data : pd.DataFrame, optional
        Already-loaded DataFrame used in place of reading from disk
    """
    # All loading, schema and logging setup is delegated to BaseTable.
    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

waterfall

waterfall(*, id_col='hospitalization_id', bfill=False, verbose=True, return_dataframe=False)

Clean + waterfall-fill the respiratory_support table.

Parameters:

Name Type Description Default
id_col str

Encounter-level identifier column (default: hospitalization_id)

'hospitalization_id'
bfill bool

If True, numeric setters are back-filled after forward-fill

False
verbose bool

Print progress messages

True
return_dataframe bool

If True, returns DataFrame instead of RespiratorySupport instance

False

Returns:

Type Description
RespiratorySupport

New instance with processed data (or DataFrame if return_dataframe=True)

Notes

The waterfall function expects data in UTC timezone. If your data is in a different timezone, it will be converted to UTC for processing, then converted back to the original timezone on return. The original object is not modified.

Source code in clifpy/tables/respiratory_support.py
    def waterfall(
        self,
        *,
        id_col: str = "hospitalization_id",
        bfill: bool = False,
        verbose: bool = True,
        return_dataframe: bool = False
    ) -> Union['RespiratorySupport', pd.DataFrame]:
        """
        Clean + waterfall-fill the respiratory_support table.

        Parameters
        ----------
        id_col : str
            Encounter-level identifier column (default: hospitalization_id)
        bfill : bool
            If True, numeric setters are back-filled after forward-fill
        verbose : bool
            Print progress messages
        return_dataframe : bool
            If True, returns DataFrame instead of RespiratorySupport instance

        Returns
        -------
        RespiratorySupport
            New instance with processed data (or DataFrame if return_dataframe=True)

        Raises
        ------
        ValueError
            If no data is loaded or the DataFrame is empty.

        Notes
        -----
        The waterfall function expects data in UTC timezone. If your data is in a
        different timezone, it will be converted to UTC for processing, then converted
        back to the original timezone on return. The original object is not modified.
        """
        if self.df is None or self.df.empty:
            raise ValueError("No data available to process. Load data first.")

        # Work on a copy so the current object's DataFrame is never mutated.
        df_copy = self.df.copy()

        # --- Capture original tz (if any), convert to UTC for processing.
        # isinstance(dtype, pd.DatetimeTZDtype) replaces the deprecated
        # pd.api.types.is_datetime64tz_dtype helper.
        original_tz = None
        if 'recorded_dttm' in df_copy.columns:
            if isinstance(df_copy['recorded_dttm'].dtype, pd.DatetimeTZDtype):
                original_tz = df_copy['recorded_dttm'].dt.tz
                if verbose and str(original_tz) != 'UTC':
                    print(f"Converting timezone from {original_tz} to UTC for waterfall processing")
                # Convert to UTC (no-op if already UTC)
                df_copy['recorded_dttm'] = df_copy['recorded_dttm'].dt.tz_convert('UTC')
            else:
                # tz-naive; leave as-is (function expects UTC semantics already)
                original_tz = None

        # --- Run the waterfall (expects UTC)
        processed_df = process_resp_support_waterfall(
            df_copy,
            id_col=id_col,
            bfill=bfill,
            verbose=verbose
        )

        # --- Convert back to original tz if we had one
        if original_tz is not None:
            # Guard: ensure tz-aware before tz_convert
            if isinstance(processed_df['recorded_dttm'].dtype, pd.DatetimeTZDtype):
                if verbose and str(original_tz) != 'UTC':
                    print(f"Converting timezone from UTC back to {original_tz} after processing")
                processed_df = processed_df.copy()
                processed_df['recorded_dttm'] = processed_df['recorded_dttm'].dt.tz_convert(original_tz)
            else:
                # If something made it tz-naive, localize then convert (shouldn't happen, but safe)
                processed_df = processed_df.copy()
                processed_df['recorded_dttm'] = (
                    processed_df['recorded_dttm']
                    .dt.tz_localize('UTC')
                    .dt.tz_convert(original_tz)
                )

        # Return DataFrame if requested
        if return_dataframe:
            return processed_df

        # Otherwise, return a new wrapped instance; the package-level timezone
        # field is intentionally kept unchanged.
        return RespiratorySupport(
            data_directory=self.data_directory,
            filetype=self.filetype,
            timezone=self.timezone,
            output_directory=self.output_directory,
            data=processed_df
        )

Medication Administration (Continuous)

clifpy.tables.medication_admin_continuous.MedicationAdminContinuous

MedicationAdminContinuous(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Medication administration continuous table wrapper inheriting from BaseTable.

This class handles medication administration continuous data and validations while leveraging the common functionality provided by BaseTable.

Initialize the MedicationAdminContinuous table.

This class handles continuous medication administration data, including validation, dose unit standardization, and unit conversion capabilities.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files. If None and data is provided, defaults to current directory.

None
filetype str

Type of data file (csv, parquet, etc.). If None and data is provided, defaults to 'parquet'.

None
timezone str

Timezone for datetime columns. Used for proper timestamp handling.

"UTC"
output_directory str

Directory for saving output files and logs. If not specified, outputs are saved to the current working directory.

None
data DataFrame

Pre-loaded DataFrame to use instead of loading from file. Supports backward compatibility with direct DataFrame initialization.

None
Notes

The class supports two initialization patterns: 1. Loading from file: provide data_directory and filetype 2. Direct DataFrame: provide data parameter (legacy support)

Upon initialization, the class loads medication schema data including category-to-group mappings from the YAML schema.

Source code in clifpy/tables/medication_admin_continuous.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create a MedicationAdminContinuous table.

    Handles continuous medication administration data, including validation,
    dose unit standardization, and unit conversion capabilities.

    Parameters
    ----------
    data_directory : str, optional
        Directory that holds the data files. If None while data is given,
        a dummy current-directory value is substituted.
    filetype : str, optional
        File format of the data (csv, parquet, etc.). If None while data is
        given, defaults to 'parquet'.
    timezone : str, default="UTC"
        Timezone applied to datetime columns.
    output_directory : str, optional
        Where output files and logs are written; defaults to the current
        working directory when omitted.
    data : pd.DataFrame, optional
        Already-loaded DataFrame used in place of reading from disk
        (legacy initialization path).

    Notes
    -----
    Two initialization patterns are supported:
    1. File-based: pass data_directory and filetype.
    2. Direct DataFrame: pass data (backward compatibility).

    Category-to-group medication mappings are read from the YAML schema
    once the base initialization completes.
    """
    # Legacy call pattern MedicationAdminContinuous(data): substitute
    # placeholder file parameters so BaseTable accepts the call.
    if data is not None and data_directory is None and filetype is None:
        data_directory = "."
        filetype = "parquet"

    # Mapping is filled in by _load_medication_schema_data() below.
    self._med_category_to_group = None

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

    # Read medication category/group mappings from the YAML schema.
    self._load_medication_schema_data()

med_category_to_group_mapping property

med_category_to_group_mapping

Get the medication category to group mapping from the schema.

Returns:

Type Description
Dict[str, str]

A dictionary mapping medication categories to their therapeutic groups. Returns a copy to prevent external modification of the internal mapping. Returns an empty dict if no mappings are loaded.

Examples:

>>> mac = MedicationAdminContinuous(data)
>>> mappings = mac.med_category_to_group_mapping
>>> mappings['Antibiotics']
'Antimicrobials'

Patient Assessments

clifpy.tables.patient_assessments.PatientAssessments

PatientAssessments(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Patient assessments table wrapper inheriting from BaseTable.

This class handles patient assessment data and validations while leveraging the common functionality provided by BaseTable.

Initialize the patient_assessments table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/patient_assessments.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create a patient_assessments table.

    Parameters
    ----------
    data_directory : str
        Directory that holds the data files
    filetype : str
        File format of the data (csv, parquet, etc.)
    timezone : str
        Timezone applied to datetime columns
    output_directory : str, optional
        Where output files and logs are written
    data : pd.DataFrame, optional
        Already-loaded DataFrame used in place of reading from disk
        (legacy initialization path)
    """
    # Legacy call pattern patient_assessments(data): substitute placeholder
    # file parameters so BaseTable accepts the call.
    if data is not None and data_directory is None and filetype is None:
        data_directory = "."
        filetype = "parquet"

    # Mapping is filled in by _load_assessment_schema_data() below.
    self._assessment_category_to_group = None

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

    # Read assessment category/group mappings from the schema.
    self._load_assessment_schema_data()

assessment_category_to_group_mapping property

assessment_category_to_group_mapping

Get the assessment category to group mapping from the schema.

Position

clifpy.tables.position.Position

Position(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Position table wrapper inheriting from BaseTable.

This class handles patient position data and validations while leveraging the common functionality provided by BaseTable.

Initialize the position table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/position.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create a position table.

    Parameters
    ----------
    data_directory : str
        Directory that holds the data files
    filetype : str
        File format of the data (csv, parquet, etc.)
    timezone : str
        Timezone applied to datetime columns
    output_directory : str, optional
        Where output files and logs are written
    data : pd.DataFrame, optional
        Already-loaded DataFrame used in place of reading from disk
        (legacy initialization path)
    """
    # Legacy call pattern position(data): substitute placeholder file
    # parameters so BaseTable accepts the call.
    if data is not None and data_directory is None and filetype is None:
        data_directory = "."
        filetype = "parquet"

    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

get_position_category_stats

get_position_category_stats()

Return summary statistics for each position category: record counts and unique hospitalization counts. Requires columns: 'position_category' and 'hospitalization_id'.

Source code in clifpy/tables/position.py
def get_position_category_stats(self) -> pd.DataFrame:
    """
    Return summary statistics for each position category.

    For every position_category value the result has:
    - count: number of records in that category
    - unique: number of distinct hospitalization_id values

    Requires the 'position_category' and 'hospitalization_id' columns.

    Returns
    -------
    pd.DataFrame
        Stats indexed by position_category. An empty DataFrame is returned
        when no data is loaded or a required column is missing — this honors
        the declared return type (the previous implementation returned a
        dict here) and matches the empty-result convention of the other
        table helpers.
    """
    if self.df is None:
        return pd.DataFrame()

    required = {'position_category', 'hospitalization_id'}
    if not required.issubset(self.df.columns):
        return pd.DataFrame()

    agg_dict = {
        'count': ('position_category', 'count'),
        'unique': ('hospitalization_id', 'nunique'),
    }

    stats = (
        self.df
        .groupby('position_category')
        .agg(**agg_dict)
        .round(2)
    )

    return stats

Medication Administration (Intermittent)

clifpy.tables.medication_admin_intermittent.MedicationAdminIntermittent

MedicationAdminIntermittent(data_directory, filetype, timezone, output_directory=None, data=None)

Bases: BaseTable

Medication administration intermittent table wrapper inheriting from BaseTable.

This class handles medication administration intermittent data and validations while leveraging the common functionality provided by BaseTable.

Source code in clifpy/tables/base_table.py
def __init__(
    self, 
    data_directory: str,
    filetype: str,
    timezone: str,
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create a BaseTable.

    Parameters
    ----------
    data_directory : str
        Directory that holds the data files
    filetype : str
        File format of the data (csv, parquet, etc.)
    timezone : str
        Timezone applied to datetime columns
    output_directory : str, optional
        Where output files and logs are written. Defaults to an 'output'
        directory under the current working directory.
    data : pd.DataFrame, optional
        Already-loaded DataFrame used in place of reading from disk
    """
    # Record the configuration exactly as supplied.
    self.data_directory = data_directory
    self.filetype = filetype
    self.timezone = timezone

    # Default the output location to ./output and make sure it exists.
    if output_directory is None:
        output_directory = os.path.join(os.getcwd(), 'output')
    self.output_directory = output_directory
    os.makedirs(self.output_directory, exist_ok=True)

    # Initialize centralized logging
    setup_logging(output_directory=self.output_directory)

    # Derive a snake_case table name from the PascalCase class name,
    # e.g. Adt -> adt, RespiratorySupport -> respiratory_support.
    pieces = []
    for ch in type(self).__name__:
        if ch.isupper():
            pieces.append('_')
            pieces.append(ch.lower())
        else:
            pieces.append(ch)
    self.table_name = ''.join(pieces).lstrip('_')

    # Initialize data and validation state.
    self.df: Optional[pd.DataFrame] = data
    self.errors: List[Dict[str, Any]] = []
    self.schema: Optional[Dict[str, Any]] = None
    self.outlier_config: Optional[Dict[str, Any]] = None
    self._validated: bool = False

    # Table-specific logger first, then schema and outlier configuration.
    self._setup_logging()
    self._load_schema()
    self._load_outlier_config()

Hospital Diagnosis

clifpy.tables.hospital_diagnosis.HospitalDiagnosis

HospitalDiagnosis(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Hospital diagnosis table wrapper inheriting from BaseTable.

This class handles hospital diagnosis-specific data and validations while leveraging the common functionality provided by BaseTable. Hospital diagnosis codes are finalized billing diagnosis codes for hospital reimbursement, appropriate for calculation of comorbidity scores but should not be used as input features into a prediction model for an inpatient event.

Initialize the hospital diagnosis table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/hospital_diagnosis.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create a hospital diagnosis table.

    Parameters
    ----------
    data_directory : str
        Directory that holds the data files
    filetype : str
        File format of the data (csv, parquet, etc.)
    timezone : str
        Timezone applied to datetime columns
    output_directory : str, optional
        Where output files and logs are written
    data : pd.DataFrame, optional
        Already-loaded DataFrame used in place of reading from disk
    """
    super().__init__(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )

    # Eagerly read the table when no DataFrame was injected but the file
    # location and format are both known.
    if data is None and data_directory is not None and filetype is not None:
        self.load_table()

get_diagnosis_by_format

get_diagnosis_by_format()

Group diagnoses by format (ICD9/ICD10) and return summary statistics.

Source code in clifpy/tables/hospital_diagnosis.py
def get_diagnosis_by_format(self) -> Dict:
    """Group diagnoses by code format (ICD9/ICD10) and return summary statistics per format."""
    if self.df is None or 'diagnosis_code_format' not in self.df.columns:
        return {}

    summary = {}
    for fmt in self.df['diagnosis_code_format'].unique():
        rows = self.df[self.df['diagnosis_code_format'] == fmt]

        entry = {
            'total_diagnoses': len(rows),
            'unique_diagnosis_codes': rows['diagnosis_code'].nunique() if 'diagnosis_code' in rows.columns else 0,
            'unique_hospitalizations': rows['hospitalization_id'].nunique() if 'hospitalization_id' in rows.columns else 0,
        }

        # Primary vs secondary split for this format, when available.
        if 'diagnosis_primary' in rows.columns:
            counts = rows['diagnosis_primary'].value_counts().to_dict()
            entry['primary_count'] = counts.get(1, 0)
            entry['secondary_count'] = counts.get(0, 0)

        # Present-on-admission split for this format, when available.
        if 'poa_present' in rows.columns:
            counts = rows['poa_present'].value_counts().to_dict()
            entry['poa_present_count'] = counts.get(1, 0)
            entry['poa_not_present_count'] = counts.get(0, 0)

        summary[fmt] = entry

    return summary

get_diagnosis_summary

get_diagnosis_summary()

Return comprehensive summary statistics for hospital diagnosis data.

Source code in clifpy/tables/hospital_diagnosis.py
def get_diagnosis_summary(self) -> Dict:
    """Return comprehensive summary statistics for hospital diagnosis data."""
    frame = self.df
    if frame is None:
        return {}

    def _distinct(col):
        # Distinct-count helper that tolerates a missing column.
        return frame[col].nunique() if col in frame.columns else 0

    summary = {
        'total_diagnoses': len(frame),
        'unique_hospitalizations': _distinct('hospitalization_id'),
        'unique_diagnosis_codes': _distinct('diagnosis_code'),
    }

    # Diagnosis code format distribution.
    if 'diagnosis_code_format' in frame.columns:
        summary['diagnosis_format_counts'] = frame['diagnosis_code_format'].value_counts().to_dict()

    # Primary vs secondary diagnosis distribution.
    if 'diagnosis_primary' in frame.columns:
        counts = frame['diagnosis_primary'].value_counts().to_dict()
        summary['primary_diagnosis_counts'] = {
            'primary': counts.get(1, 0),
            'secondary': counts.get(0, 0),
        }

    # Present-on-admission distribution.
    if 'poa_present' in frame.columns:
        counts = frame['poa_present'].value_counts().to_dict()
        summary['poa_counts'] = {
            'present_on_admission': counts.get(1, 0),
            'not_present_on_admission': counts.get(0, 0),
        }

    return summary

get_hospitalization_diagnosis_counts

get_hospitalization_diagnosis_counts()

Return DataFrame with diagnosis counts per hospitalization.

Source code in clifpy/tables/hospital_diagnosis.py
def get_hospitalization_diagnosis_counts(self) -> pd.DataFrame:
    """
    Return per-hospitalization diagnosis counts.

    Output columns: hospitalization_id, total_diagnoses, primary_diagnoses,
    poa_present_diagnoses, secondary_diagnoses; sorted by total_diagnoses
    descending.

    The optional 'diagnosis_primary' and 'poa_present' columns are treated
    as all-zero when absent. (Previously their absence raised KeyError from
    ``groupby().agg`` before the in-lambda column guard could ever run —
    that guard was dead code.)
    """
    if self.df is None or 'hospitalization_id' not in self.df.columns:
        return pd.DataFrame()

    # Only aggregate the optional columns that actually exist.
    agg_spec = {'diagnosis_code': 'count'}
    if 'diagnosis_primary' in self.df.columns:
        agg_spec['diagnosis_primary'] = lambda x: (x == 1).sum()
    if 'poa_present' in self.df.columns:
        agg_spec['poa_present'] = lambda x: (x == 1).sum()

    hosp_counts = (self.df.groupby('hospitalization_id')
                  .agg(agg_spec)
                  .reset_index())

    # Backfill zero counts for whichever optional columns were absent.
    for optional_col in ('diagnosis_primary', 'poa_present'):
        if optional_col not in hosp_counts.columns:
            hosp_counts[optional_col] = 0

    # Fix column order, then rename to the public names.
    hosp_counts = hosp_counts[['hospitalization_id', 'diagnosis_code', 'diagnosis_primary', 'poa_present']]
    hosp_counts.columns = ['hospitalization_id', 'total_diagnoses', 'primary_diagnoses', 'poa_present_diagnoses']
    hosp_counts['secondary_diagnoses'] = hosp_counts['total_diagnoses'] - hosp_counts['primary_diagnoses']

    return hosp_counts.sort_values('total_diagnoses', ascending=False)

get_poa_statistics

get_poa_statistics()

Calculate present on admission statistics by diagnosis type.

Source code in clifpy/tables/hospital_diagnosis.py
def get_poa_statistics(self) -> Dict:
    """Calculate present-on-admission statistics overall and split by primary/secondary diagnosis."""
    frame = self.df
    needed = ('poa_present', 'diagnosis_primary')
    if frame is None or any(col not in frame.columns for col in needed):
        return {}

    def _poa_block(rows):
        # Build the count/rate dict for one slice of the data.
        total = len(rows)
        present = int((rows['poa_present'] == 1).sum())
        absent = int((rows['poa_present'] == 0).sum())
        return {
            'total_diagnoses': total,
            'poa_present_count': present,
            'poa_not_present_count': absent,
            'poa_present_rate': (present / total * 100) if total > 0 else 0,
        }

    # Overall statistics first, then a block per diagnosis type.
    stats = {'overall': _poa_block(frame)}
    for label, flag in (('primary', 1), ('secondary', 0)):
        subset = frame[frame['diagnosis_primary'] == flag]
        if not subset.empty:
            stats[label] = _poa_block(subset)

    return stats

get_primary_diagnosis_counts

get_primary_diagnosis_counts()

Return DataFrame with counts of primary diagnoses by diagnosis code.

Source code in clifpy/tables/hospital_diagnosis.py
def get_primary_diagnosis_counts(self) -> pd.DataFrame:
    """Return counts of primary diagnoses per (diagnosis_code, format), most frequent first."""
    if self.df is None or 'diagnosis_primary' not in self.df.columns:
        return pd.DataFrame()

    primary_only = self.df[self.df['diagnosis_primary'] == 1]
    if primary_only.empty:
        return pd.DataFrame()

    counted = (
        primary_only
        .groupby(['diagnosis_code', 'diagnosis_code_format'])
        .size()
        .reset_index(name='count')
    )
    return counted.sort_values('count', ascending=False)

load_table

load_table()

Load hospital diagnosis table data from the configured data directory.

Source code in clifpy/tables/hospital_diagnosis.py
def load_table(self):
    """Read the hospital diagnosis table from the configured data directory into self.df."""
    from ..utils.io import load_data

    # Both pieces of file information must be configured before loading.
    if self.data_directory is None or self.filetype is None:
        raise ValueError("data_directory and filetype must be set to load data")

    loaded = load_data(
        self.table_name,
        self.data_directory,
        self.filetype,
        site_tz=self.timezone,
    )
    self.df = loaded

    if self.logger:
        self.logger.info(f"Loaded {len(self.df)} rows from {self.table_name} table")

CRRT Therapy

clifpy.tables.crrt_therapy.CrrtTherapy

CrrtTherapy(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

CRRT (Continuous Renal Replacement Therapy) table wrapper inheriting from BaseTable.

This class handles CRRT therapy data including dialysis modes, flow rates, and ultrafiltration parameters while leveraging the common functionality provided by BaseTable.

Initialize the CRRT therapy table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/crrt_therapy.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create the CRRT therapy table wrapper.

    All arguments are forwarded unchanged to ``BaseTable``.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Delegate all setup (loading, logging, validation wiring) to BaseTable.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)

Patient Procedures

clifpy.tables.patient_procedures.PatientProcedures

PatientProcedures(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Patient procedures table wrapper inheriting from BaseTable.

This class handles patient procedure data including CPT, ICD10PCS, and HCPCS codes while leveraging the common functionality provided by BaseTable.

Initialize the patient procedures table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/patient_procedures.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create the patient procedures table wrapper.

    All arguments are forwarded unchanged to ``BaseTable``.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Delegate all setup (loading, logging, validation wiring) to BaseTable.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)

Microbiology Susceptibility

clifpy.tables.microbiology_susceptibility.MicrobiologySusceptibility

MicrobiologySusceptibility(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Microbiology susceptibility table wrapper inheriting from BaseTable.

This class handles antimicrobial susceptibility testing data including antimicrobial categories and susceptibility results while leveraging the common functionality provided by BaseTable.

Initialize the microbiology susceptibility table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/microbiology_susceptibility.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create the microbiology susceptibility table wrapper.

    All arguments are forwarded unchanged to ``BaseTable``.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Delegate all setup (loading, logging, validation wiring) to BaseTable.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)

ECMO MCS

clifpy.tables.ecmo_mcs.EcmoMcs

EcmoMcs(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

ECMO (Extracorporeal Membrane Oxygenation) and MCS (Mechanical Circulatory Support) table wrapper inheriting from BaseTable.

This class handles ECMO/MCS device data including device types, flow rates, and support parameters while leveraging the common functionality provided by BaseTable.

Initialize the ECMO/MCS table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/ecmo_mcs.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create the ECMO/MCS table wrapper.

    All arguments are forwarded unchanged to ``BaseTable``.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Delegate all setup (loading, logging, validation wiring) to BaseTable.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)

Microbiology Non-Culture

clifpy.tables.microbiology_nonculture.MicrobiologyNonculture

MicrobiologyNonculture(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Microbiology non-culture table wrapper inheriting from BaseTable.

This class handles microbiology non-culture test data including PCR and other molecular diagnostic results while leveraging the common functionality provided by BaseTable.

Initialize the microbiology non-culture table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/microbiology_nonculture.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create the microbiology non-culture table wrapper.

    All arguments are forwarded unchanged to ``BaseTable``.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Delegate all setup (loading, logging, validation wiring) to BaseTable.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)

Code Status

clifpy.tables.code_status.CodeStatus

CodeStatus(data_directory=None, filetype=None, timezone='UTC', output_directory=None, data=None)

Bases: BaseTable

Code status table wrapper inheriting from BaseTable.

This class handles patient code status data including DNR, DNAR, DNR/DNI, Full Code, and other resuscitation preferences while leveraging the common functionality provided by BaseTable.

Initialize the code status table.

Parameters:

Name Type Description Default
data_directory str

Path to the directory containing data files

None
filetype str

Type of data file (csv, parquet, etc.)

None
timezone str

Timezone for datetime columns

'UTC'
output_directory str

Directory for saving output files and logs

None
data DataFrame

Pre-loaded data to use instead of loading from file

None
Source code in clifpy/tables/code_status.py
def __init__(
    self,
    data_directory: str = None,
    filetype: str = None,
    timezone: str = "UTC",
    output_directory: Optional[str] = None,
    data: Optional[pd.DataFrame] = None
):
    """
    Create the code status table wrapper.

    All arguments are forwarded unchanged to ``BaseTable``.

    Parameters
    ----------
    data_directory : str
        Path to the directory containing data files
    filetype : str
        Type of data file (csv, parquet, etc.)
    timezone : str
        Timezone for datetime columns
    output_directory : str, optional
        Directory for saving output files and logs
    data : pd.DataFrame, optional
        Pre-loaded data to use instead of loading from file
    """
    # Delegate all setup (loading, logging, validation wiring) to BaseTable.
    base_kwargs = dict(
        data_directory=data_directory,
        filetype=filetype,
        timezone=timezone,
        output_directory=output_directory,
        data=data,
    )
    super().__init__(**base_kwargs)