Wide Dataset Creation¶
The wide dataset functionality enables you to create a comprehensive time-series dataset by joining multiple CLIF tables, with automatic pivoting and high-performance processing using DuckDB. This feature transforms narrow, category-based data (such as vitals and labs) into a wide format suitable for machine learning and analysis.
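Conceptually, pivoting turns long, category-based rows into one column per category, aligned on time. Below is a toy pandas sketch of the idea (illustrative only: the column names recorded_dttm and vital_value are assumptions, and create_wide_dataset performs this step with DuckDB rather than pandas):
import pandas as pd
# Toy narrow, category-based vitals
narrow = pd.DataFrame({
    'hospitalization_id': ['12345', '12345', '12345'],
    'recorded_dttm': pd.to_datetime(['2023-01-01 08:00'] * 3),
    'vital_category': ['heart_rate', 'sbp', 'spo2'],
    'vital_value': [88, 112, 97],
})
# Wide format: one column per vital_category
wide = narrow.pivot_table(
    index=['hospitalization_id', 'recorded_dttm'],
    columns='vital_category',
    values='vital_value'
).reset_index()
print(wide)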
Overview¶
Wide dataset creation through the ClifOrchestrator provides:
- Automatic table joining across patient, hospitalization, ADT, and optional tables
- Intelligent pivoting of category-based data (vitals, labs, medications, assessments)
- High-performance processing using DuckDB for large datasets
- Memory-efficient batch processing to handle datasets of any size
- Flexible filtering by hospitalization IDs, time windows, and categories
- System resource optimization with configurable memory and thread settings
Important: Wide dataset functionality is only available through the ClifOrchestrator
and requires specific tables to be loaded.
Quick Start¶
from clifpy.clif_orchestrator import ClifOrchestrator
# Initialize orchestrator
co = ClifOrchestrator(
data_directory='/path/to/clif/data',
filetype='parquet',
timezone='US/Central'
)
# Create wide dataset with sample data
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs'],
category_filters={
'vitals': ['heart_rate', 'sbp', 'spo2'],
'labs': ['hemoglobin', 'sodium', 'glucose']
},
sample=True # Use 20 random hospitalizations for testing
)
Function Parameters¶
The create_wide_dataset() method accepts the following parameters:

Parameter | Type | Default | Description |
---|---|---|---|
tables_to_load | List[str] | None | Tables to include: 'vitals', 'labs', 'medication_admin_continuous', 'patient_assessments', 'respiratory_support' |
category_filters | Dict[str, List[str]] | None | Specific categories to pivot for each table |
sample | bool | False | If True, randomly select 20 hospitalizations for testing |
hospitalization_ids | List[str] | None | Specific hospitalization IDs to process |
cohort_df | DataFrame | None | DataFrame with time windows (requires 'hospitalization_id', 'start_time', 'end_time' columns) |
output_format | str | 'dataframe' | Output format: 'dataframe', 'csv', or 'parquet' |
save_to_data_location | bool | False | Save output to data directory |
output_filename | str | None | Custom filename (auto-generated if None) |
return_dataframe | bool | True | Return DataFrame even when saving to file |
batch_size | int | 1000 | Number of hospitalizations per batch (use -1 to disable batching) |
memory_limit | str | None | DuckDB memory limit (e.g., '8GB', '16GB') |
threads | int | None | Number of threads for DuckDB processing |
show_progress | bool | True | Show progress bars for long operations |
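For example, the output parameters can be combined to write the result as a Parquet file to the data directory without holding a DataFrame in memory (a minimal sketch; the filename is illustrative):
# Save the wide dataset as Parquet instead of returning a DataFrame
co.create_wide_dataset(
    tables_to_load=['vitals', 'labs'],
    category_filters={
        'vitals': ['heart_rate', 'sbp'],
        'labs': ['hemoglobin']
    },
    save_to_data_location=True,
    output_format='parquet',
    output_filename='wide_dataset_cohort',  # illustrative name
    return_dataframe=False
)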
Best Practices: System Resource Management¶
Always check your system resources before running wide dataset creation on large datasets:
# Check system resources first
resources = co.get_sys_resource_info()
print(f"Available RAM: {resources['memory_available_gb']:.1f} GB")
print(f"Recommended threads: {resources['max_recommended_threads']}")
# Configure based on available resources
memory_limit = f"{int(resources['memory_available_gb'] * 0.7)}GB" # Use 70% of available RAM
threads = resources['max_recommended_threads']
# Create wide dataset with optimized settings
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs'],
category_filters={
'vitals': ['heart_rate', 'sbp', 'spo2'],
'labs': ['hemoglobin', 'sodium']
},
memory_limit=memory_limit,
threads=threads,
batch_size=1000 # Adjust based on dataset size
)
Usage Examples¶
Example 1: Development and Testing¶
Use sampling for initial development and testing:
# Sample mode for development
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs'],
category_filters={
'vitals': ['heart_rate', 'sbp', 'dbp', 'spo2', 'respiratory_rate'],
'labs': ['hemoglobin', 'wbc', 'sodium', 'potassium', 'creatinine']
},
sample=True, # Only 20 random hospitalizations
show_progress=True
)
print(f"Sample dataset shape: {wide_df.shape}")
print(f"Unique hospitalizations: {wide_df['hospitalization_id'].nunique()}")
Example 2: Production Use with Resource Optimization¶
For production use with large datasets:
# Get system info and configure accordingly
resources = co.get_sys_resource_info(print_summary=False)
# Production settings
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs', 'medication_admin_continuous'],
category_filters={
'vitals': ['heart_rate', 'sbp', 'dbp', 'spo2', 'temp_c'],
'labs': ['hemoglobin', 'wbc', 'sodium', 'potassium'],
'medication_admin_continuous': ['norepinephrine', 'propofol', 'fentanyl']
},
batch_size=500, # Smaller batches for large datasets
memory_limit="12GB",
threads=resources['max_recommended_threads'],
save_to_data_location=True,
output_format='parquet',
output_filename='wide_dataset_production'
)
Example 3: Targeted Analysis with Specific IDs¶
Process specific hospitalizations:
# Analyze specific patient cohort
target_ids = ['12345', '67890', '11111', '22222']
wide_df = co.create_wide_dataset(
hospitalization_ids=target_ids,
tables_to_load=['vitals', 'labs', 'patient_assessments'],
category_filters={
'vitals': ['heart_rate', 'sbp', 'spo2'],
'labs': ['lactate', 'hemoglobin'],
'patient_assessments': ['gcs_total', 'rass']
}
)
print(f"Analyzed {len(target_ids)} specific hospitalizations")
Example 4: Time Window Filtering with Cohort DataFrame¶
Filter data to specific time windows:
import pandas as pd
# Define cohort with time windows
cohort_df = pd.DataFrame({
'hospitalization_id': ['12345', '67890', '11111'],
'start_time': ['2023-01-01 08:00:00', '2023-01-05 12:00:00', '2023-01-10 06:00:00'],
'end_time': ['2023-01-03 18:00:00', '2023-01-07 20:00:00', '2023-01-12 14:00:00']
})
# Convert to datetime
cohort_df['start_time'] = pd.to_datetime(cohort_df['start_time'])
cohort_df['end_time'] = pd.to_datetime(cohort_df['end_time'])
# Create wide dataset with time filtering
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs'],
category_filters={
'vitals': ['heart_rate', 'sbp'],
'labs': ['hemoglobin', 'sodium']
},
cohort_df=cohort_df # Only include data within specified time windows
)
Example 5: No Batch Processing for Small Datasets¶
Disable batching for small datasets:
# Small dataset - process all at once
wide_df = co.create_wide_dataset(
hospitalization_ids=small_id_list, # < 100 hospitalizations
tables_to_load=['vitals', 'labs'],
category_filters={
'vitals': ['heart_rate', 'sbp'],
'labs': ['hemoglobin']
},
batch_size=-1 # Disable batching
)
Memory Management and Batch Processing¶
Understanding Batch Processing¶
Batch processing divides hospitalizations into smaller groups so that larger-than-memory datasets can be processed without out-of-memory (OOM) errors:
# Large dataset - use batching
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs'],
category_filters={...},
batch_size=1000, # Process 1000 hospitalizations at a time
memory_limit="8GB"
)
# Small dataset - disable batching for better performance
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs'],
category_filters={...},
batch_size=-1 # Process all at once
)
Memory Optimization Guidelines¶
Dataset Size | Batch Size | Memory Limit | Threads |
---|---|---|---|
< 1,000 hospitalizations | -1 (no batching) | 4GB | 2-4 |
1,000 - 10,000 hospitalizations | 1000 | 8GB | 4-8 |
> 10,000 hospitalizations | 500 | 16GB+ | 6-12 |
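If you want to apply these guidelines programmatically, a small helper can map cohort size to settings (a sketch; pick_batch_settings is not part of clifpy, and target_ids reuses the list from Example 3):
def pick_batch_settings(n_hospitalizations):
    """Suggest batch_size and memory_limit based on the guideline table above."""
    if n_hospitalizations < 1000:
        return {'batch_size': -1, 'memory_limit': '4GB'}
    elif n_hospitalizations <= 10000:
        return {'batch_size': 1000, 'memory_limit': '8GB'}
    return {'batch_size': 500, 'memory_limit': '16GB'}

settings = pick_batch_settings(len(target_ids))
wide_df = co.create_wide_dataset(
    hospitalization_ids=target_ids,
    tables_to_load=['vitals', 'labs'],
    category_filters={'vitals': ['heart_rate'], 'labs': ['hemoglobin']},
    **settings
)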
Hourly Aggregation¶
Convert the wide dataset to hourly aggregation:
# Create aggregation configuration
aggregation_config = {
'max': ['sbp', 'map'],
'mean': ['heart_rate', 'respiratory_rate'],
'min': ['spo2'],
'median': ['temp_c'],
'first': ['gcs_total', 'rass'],
'last': ['assessment_value'],
'boolean': ['norepinephrine', 'propofol'], # 1 if present, 0 if absent
}
# Convert to hourly
hourly_df = co.convert_wide_to_hourly(
wide_df,
aggregation_config=aggregation_config,
memory_limit='8GB'
)
print(f"Hourly dataset: {hourly_df.shape}")
print(f"Hour range: {hourly_df['nth_hour'].min()} to {hourly_df['nth_hour'].max()}")
Output Structure¶
The wide dataset includes:
Core Columns¶
- patient_id: Patient identifier
- hospitalization_id: Hospitalization identifier
- event_time: Timestamp for each event
- day_number: Sequential day within the hospitalization
- hosp_id_day_key: Unique hospitalization-day identifier
Patient Demographics¶
- age_at_admission: Patient age at admission
- Additional patient table columns
ADT Information¶
- Location and transfer data from ADT table
Pivoted Data Columns¶
- Individual columns for each category provided in category_filters (e.g., heart_rate, hemoglobin, norepinephrine)
- Values aligned by timestamp and hospitalization
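A quick way to inspect the resulting structure with standard pandas operations (a sketch; the heart_rate column only exists if it was requested in category_filters):
# Inspect the wide dataset structure
print(wide_df.columns.tolist())
print(wide_df[['hospitalization_id', 'event_time', 'day_number']].head())
# Pivoted columns are typically NaN at timestamps where that category was not recorded
print(f"heart_rate coverage: {wide_df['heart_rate'].notna().mean():.1%}")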
Troubleshooting¶
Common Issues and Solutions¶
Memory Errors
# Solution: Reduce batch size and set memory limit
wide_df = co.create_wide_dataset(
    tables_to_load=['vitals'],                    # your tables as before
    category_filters={'vitals': ['heart_rate']},  # your categories as before
    batch_size=250,       # Smaller batches
    memory_limit="4GB",   # Conservative limit
    sample=True           # Test with sample first
)
System Crashes
# Solution: Check resources first and configure accordingly
resources = co.get_sys_resource_info()
if resources['memory_available_gb'] < 8:
print("Warning: Low memory available. Consider using smaller batch_size.")
batch_size = 250
else:
batch_size = 1000
Empty Results
# Check if tables and categories exist
print("Available tables:", co.get_loaded_tables())
if hasattr(co, 'vitals') and co.vitals is not None:
print("Vital categories:", co.vitals.df['vital_category'].unique())
Slow Performance
# Use optimized settings
wide_df = co.create_wide_dataset(
tables_to_load=['vitals'], # Start with one table
category_filters={
'vitals': ['heart_rate', 'sbp'] # Limit categories
},
threads=co.get_sys_resource_info(print_summary=False)['max_recommended_threads']
)
Error Messages¶
Error | Cause | Solution |
---|---|---|
"Memory limit exceeded" | Dataset too large for available RAM | Reduce batch_size, set memory_limit |
"No event times found" | No data in specified tables/categories | Check table data and category filters |
"Missing required columns" | cohort_df missing required columns | Ensure 'hospitalization_id', 'start_time', 'end_time' columns exist |
Integration with Existing Workflows¶
Wide dataset creation integrates seamlessly with existing ClifOrchestrator workflows:
# Traditional approach
co = ClifOrchestrator('/path/to/data', 'parquet', 'UTC')
co.initialize(['patient', 'hospitalization', 'adt', 'vitals', 'labs'])
# Enhanced with wide dataset
wide_df = co.create_wide_dataset(
tables_to_load=['vitals', 'labs'],
category_filters={
'vitals': ['heart_rate', 'sbp'],
'labs': ['hemoglobin']
}
)
# Continue with validation and analysis
co.validate_all()
analysis_results = your_analysis_function(wide_df)
Next Steps¶
- Learn about data validation to ensure data quality
- Explore individual table guides for detailed table documentation
- See the orchestrator guide for advanced orchestrator features
- Review timezone handling for multi-site data