CLIF Orchestrator¶
The ClifOrchestrator
class provides a centralized interface for managing multiple CLIF tables with consistent configuration. This guide covers how to use the orchestrator effectively.
Overview¶
The orchestrator simplifies working with multiple CLIF tables by:
- Ensuring consistent configuration across all tables
- Providing bulk operations (load, validate)
- Managing shared settings (timezone, file format, output directory)
- Offering a unified interface for multi-table workflows
Basic Usage¶
Initialization¶
from clifpy.clif_orchestrator import ClifOrchestrator
# Create orchestrator with your data configuration
orchestrator = ClifOrchestrator(
data_directory='/path/to/clif/data',
filetype='parquet', # or 'csv'
timezone='US/Central',
output_directory='/path/to/outputs' # Optional
)
Loading Tables¶
# Load specific tables
orchestrator.initialize(tables=['patient', 'labs', 'vitals'])
# Load all available tables
all_tables = ['patient', 'hospitalization', 'adt', 'labs', 'vitals',
'medication_admin_continuous', 'patient_assessments',
'respiratory_support', 'position']
orchestrator.initialize(tables=all_tables)
# Load with sampling (useful for testing)
orchestrator.initialize(
tables=['patient', 'labs'],
sample_size=1000
)
Accessing Tables¶
Once loaded, tables are available as attributes:
# Access individual tables
patient_data = orchestrator.patient
labs_data = orchestrator.labs
vitals_data = orchestrator.vitals
# Get the underlying DataFrames
patient_df = orchestrator.patient.df
labs_df = orchestrator.labs.df
Advanced Usage¶
Selective Column Loading¶
Load only specific columns to reduce memory usage:
orchestrator.initialize(
tables=['labs', 'vitals'],
columns={
'labs': ['hospitalization_id', 'lab_result_dttm', 'lab_value', 'lab_name'],
'vitals': ['hospitalization_id', 'recorded_dttm', 'vital_value']
}
)
Filtered Loading¶
Apply filters during loading:
# Filter by patient IDs
hospitalization_ids = ['P001', 'P002', 'P003']
orchestrator.initialize(
tables=['labs', 'vitals'],
filters={
'labs': {'hospitalization_id': patient_ids},
'vitals': {'hospitalization_id': patient_ids}
}
)
# Filter by categories
orchestrator.initialize(
tables=['labs', 'adt'],
filters={
'labs': {'lab_category': 'lactate'},
'adt': {'location_category': 'icu'}
}
)
Validation Workflow¶
# Validate all loaded tables
orchestrator.validate_all()
# Check which tables are valid
for table_name in orchestrator.get_loaded_tables():
table = getattr(orchestrator, table_name)
if table.isvalid():
print(f"✓ {table_name} passed validation")
else:
print(f"✗ {table_name} has {len(table.errors)} errors")
Utility Methods¶
Get Loaded Tables¶
# List of loaded table names
loaded = orchestrator.get_loaded_tables()
print(f"Loaded tables: {', '.join(loaded)}")
# Get table objects
table_objects = orchestrator.get_tables_obj_list()
for table in table_objects:
print(f"{table.table_name}: {len(table.df)} rows")
Individual Table Loading¶
You can also load tables individually:
# Load tables one at a time
orchestrator.load_patient_data(sample_size=1000)
orchestrator.load_labs_data(
columns=['hospitalization_id', 'lab_result_dttm', 'lab_value']
)
orchestrator.load_vitals_data(
filters={'vital_category': ['heart_rate', 'sbp', 'dbp']}
)
Common Patterns¶
Multi-table Analysis¶
# Load related tables for ICU analysis
orchestrator.initialize(
tables=['patient', 'adt', 'labs', 'vitals', 'respiratory_support']
)
# Get ICU patients
icu_stays = orchestrator.adt.filter_by_location_category('icu')
icu_patient_ids = icu_stays['patient_id'].unique()
# Analyze their labs
icu_labs = orchestrator.labs.df[
orchestrator.labs.df['patient_id'].isin(icu_patient_ids)
]
# Check ventilation status
vent_patients = orchestrator.respiratory_support.df[
orchestrator.respiratory_support.df['device_category'] == 'IMV'
]['patient_id'].unique()
Best Practices¶
- Load only what you need: Use column and filter parameters to reduce memory usage
- Validate early: Run validation immediately after loading to catch issues
- Use consistent timezones: The orchestrator ensures all tables use the same timezone
- Check output directory: Validation reports and logs are saved to the output directory
- Handle missing tables gracefully: Check if a table is loaded before accessing it
Error Handling¶
# Check if table is loaded
if orchestrator.labs is not None:
labs_data = orchestrator.labs.df
else:
print("Labs table not loaded")
# Handle validation errors
orchestrator.validate_all()
for table_name in orchestrator.get_loaded_tables():
table = getattr(orchestrator, table_name)
if not table.isvalid():
# Save errors for review
error_file = f"{table_name}_errors.csv"
pd.DataFrame(table.errors).to_csv(error_file, index=False)
print(f"Saved {len(table.errors)} errors to {error_file}")
Next Steps¶
- Learn about individual table types
- Understand data validation
- See practical examples