Data Passing Guide
Understanding Data Flow in Collie
Collie uses an event-driven architecture where components communicate through Event objects. Each Event contains a Payload with typed fields and an optional Context for metadata.
User-Friendly Features
Collie Payloads are designed with developer experience in mind:
Type Safety: Standard fields have clear types for IDE autocomplete
Flexibility: extra_data field for custom data without breaking changes
Helper Methods: Convenient methods for accessing extra data:
payload.get_extra("key", default) - Safe access with default
payload.set_extra("key", value) - Fluent setter with chaining
payload.has_extra("key") - Check existence
Method Chaining: Build payloads fluently:
payload = (TransformerPayload(train_data=df)
.set_extra("feature_names", features)
.set_extra("n_classes", 3))
Pydantic Validation: Automatic validation and serialization
Three Ways to Pass Data
1. Standard Payload Fields (Recommended for Common Data)
Use the predefined fields in each Payload type for standard ML pipeline data:
TransformerPayload
TransformerPayload(
train_data=pd.DataFrame, # Training dataset
validation_data=pd.DataFrame, # Validation dataset
test_data=pd.DataFrame # Test dataset
)
TunerPayload
TunerPayload(
hyperparameters=dict, # Best hyperparameters found
train_data=pd.DataFrame, # Pass along training data
validation_data=pd.DataFrame, # Pass along validation data
test_data=pd.DataFrame # Pass along test data
)
TrainerPayload
TrainerPayload(
model=Any # Trained model object
)
# Optional: Use extra_data for framework-specific metrics
TrainerPayload(
model=model,
extra_data={
"train_loss": 0.05, # PyTorch/TensorFlow loss
"val_loss": 0.08,
"epochs": 100
}
)
EvaluatorPayload
EvaluatorPayload(
metrics=List[Dict], # List of metric dictionaries
is_better_than_production=bool # Whether to promote model
)
PusherPayload
PusherPayload(
model_uri=str, # MLflow model URI
status=str, # Deployment status
model_version=str # Model version number
)
2. Extra Data Field (For Custom/Experimental Data)
Every Payload has an extra_data dictionary field for flexible custom data.
Three Ways to Use Extra Data:
# Method 1: Direct dictionary access
payload = TransformerPayload(
train_data=df,
extra_data={"feature_names": ["age", "income"]}
)
# Method 2: Using helper methods (recommended for better readability)
payload = TransformerPayload(train_data=df)
payload.set_extra("feature_names", ["age", "income"])
payload.set_extra("n_classes", 3)
# Method 3: Method chaining
payload = (TransformerPayload(train_data=df)
.set_extra("feature_names", ["age", "income"])
.set_extra("n_classes", 3)
.set_extra("data_source", "database"))
Helper Methods:
# Set a value
payload.set_extra("key", value) # Returns self for chaining
# Get a value with default
value = payload.get_extra("key", default_value)
# Check if key exists
if payload.has_extra("key"):
value = payload.get_extra("key")
Example: Passing Feature Engineering Metadata
class MyTransformer(Transformer):
def handle(self, event: Event) -> Event:
# Process data
train_data, feature_info = preprocess_data()
return Event(
payload=TransformerPayload(
train_data=train_data,
validation_data=None,
test_data=None,
extra_data={
"feature_names": ["age", "income", "score"],
"categorical_features": ["gender", "city"],
"numeric_features": ["age", "income"],
"encoding_mappings": {"city": {"NY": 0, "LA": 1}},
"scaler_params": {"mean": 0.5, "std": 0.2}
}
)
)
Example: Accessing and Extending Extra Data
class MyTrainer(Trainer):
def handle(self, event: Event) -> Event:
# Get data from previous component
train_data = event.payload.train_data
# Access extra data using helper methods (recommended)
feature_names = event.payload.get_extra("feature_names", [])
categorical_features = event.payload.get_extra("categorical_features", [])
# Or check if exists first
if event.payload.has_extra("scaler_params"):
scaler_params = event.payload.get_extra("scaler_params")
# Use the information
self.mlflow.log_params({
"n_features": len(feature_names),
"n_categorical": len(categorical_features)
})
# Train model
model = train_model(train_data, feature_names)
# Build new payload with extra data - three approaches:
# Approach 1: Create with dict (merge previous extra_data)
return Event(
payload=TrainerPayload(
model=model,
extra_data={
**event.payload.extra_data, # Keep previous
"training_time_seconds": 120.5,
"n_epochs": 50,
"train_loss": 0.1, # Optional metrics
"val_loss": 0.15
}
)
)
# Approach 2: Use helper methods (more readable)
payload = TrainerPayload(
model=model,
extra_data=event.payload.extra_data.copy() # Copy previous
)
payload.set_extra("training_time_seconds", 120.5)
payload.set_extra("n_epochs", 50)
payload.set_extra("train_loss", 0.1)
payload.set_extra("val_loss", 0.15)
return Event(payload=payload)
# Approach 3: Method chaining (most concise)
payload = (TrainerPayload(model=model,
extra_data=event.payload.extra_data.copy())
.set_extra("training_time_seconds", 120.5)
.set_extra("n_epochs", 50)
.set_extra("train_loss", 0.1)
.set_extra("val_loss", 0.15)
.set_extra("early_stopping_epoch", 35))
return Event(payload=payload)
Example: Evaluation with Custom Metrics
class MyEvaluator(Evaluator):
def handle(self, event: Event) -> Event:
model = event.payload.model
test_data = event.payload.test_data
# Perform evaluation
metrics = evaluate_model(model, test_data)
# Save detailed reports
report_path = "evaluation_report.html"
generate_report(metrics, report_path)
self.mlflow.log_artifact(report_path)
return Event(
payload=EvaluatorPayload(
metrics=[metrics],
is_better_than_production=metrics["accuracy"] > 0.9,
extra_data={
"report_path": report_path,
"confusion_matrix": metrics["confusion_matrix"].tolist(),
"per_class_metrics": metrics["per_class"],
"roc_auc_scores": metrics["roc_auc"],
"evaluation_time": "2024-01-15T10:30:00"
}
)
)
Common Patterns
Pattern 1: Passing Data Through the Pipeline
# Transformer creates data
TransformerPayload(
train_data=train_df,
extra_data={"feature_names": features}
)
# Tuner passes it along with hyperparameters
TunerPayload(
hyperparameters=best_params,
train_data=event.payload.train_data, # Pass through
extra_data=event.payload.extra_data # Pass through
)
# Trainer uses it
train_data = event.payload.train_data
features = event.payload.extra_data["feature_names"]
Pattern 2: Accumulating Extra Data
# Each component adds to extra_data
return Event(
payload=SomePayload(
...,
extra_data={
**event.payload.extra_data, # Keep previous data
"my_new_field": my_value # Add new data
}
)
)
Pattern 3: Conditional Data Passing
class MyTrainer(Trainer):
def handle(self, event: Event) -> Event:
extra_data = {}
# Only include if debugging
if self.debug_mode:
extra_data["training_curve"] = training_history
extra_data["gradient_norms"] = gradient_norms
return Event(
payload=TrainerPayload(
model=model,
extra_data=extra_data
)
)
Best Practices
Complete Example
Here’s a complete pipeline showing all three data passing methods:
from collie import Event
from collie.core import (
Transformer, Trainer, Evaluator,
TransformerPayload, TrainerPayload, EvaluatorPayload
)
import time
class DataTransformer(Transformer):
def handle(self, event: Event) -> Event:
start = time.time()
# Load and process data
train_data, metadata = load_and_process()
return Event(
payload=TransformerPayload(
# Standard fields
train_data=train_data,
validation_data=None,
test_data=None,
# Extra data: feature engineering info
extra_data={
"feature_names": metadata["features"],
"encoding_maps": metadata["encodings"],
"outliers_removed": metadata["outliers_count"]
}
),
context=event.context
)
class ModelTrainer(Trainer):
def handle(self, event: Event) -> Event:
start = time.time()
# Standard fields
train_data = event.payload.train_data
# Extra data
features = event.payload.extra_data.get("feature_names", [])
self.mlflow.log_param("feature_names", features)
self.mlflow.log_param("data_version", event.context.get("data_version"))
# Train
model, history = train_model(train_data, features)
return Event(
payload=TrainerPayload(
# Standard field
model=model,
# Extra data: optional metrics and training details
extra_data={
**event.payload.extra_data, # Keep previous
"train_loss": history["loss"][-1],
"val_loss": history["val_loss"][-1],
"training_curve": history["loss"],
"best_epoch": history["best_epoch"],
"optimizer": "Adam"
}
),
context=event.context
)
class ModelEvaluator(Evaluator):
def handle(self, event: Event) -> Event:
model = event.payload.model
# Evaluate
metrics = evaluate(model)
return Event(
payload=EvaluatorPayload(
metrics=[metrics],
is_better_than_production=metrics["accuracy"] > 0.9,
extra_data={
"detailed_report": "report.html",
"confusion_matrix": metrics["cm"].tolist()
}
)
)
For more examples, see the Core Concepts and MLflow Integration pages.