Core Concepts
Understanding Collie’s Architecture
Collie is built on three core principles:
- Modular Components - Each stage of the ML pipeline is a separate component
- Event-Driven - Components communicate through events
- MLflow Integration - Deep integration for tracking and model management
Component Lifecycle
Every Collie component follows this lifecycle:
1. Initialize → 2. Execute → 3. Log to MLflow → 4. Emit Event → 5. Next Component
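The lifecycle above can be sketched in plain Python. This is a minimal illustration, not Collie's implementation: the `Event` and `Component` classes, the `run` method, and the `logged` list (standing in for an MLflow run) are all hypothetical names invented for this sketch.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Event:
    """Stand-in event: carries a payload between components."""
    payload: Dict[str, Any] = field(default_factory=dict)


class Component:
    """Hypothetical base class that walks through the lifecycle steps."""

    def __init__(self, name: str) -> None:
        # 1. Initialize
        self.name = name
        self.logged: List[str] = []  # stands in for an MLflow run

    def handle(self, event: Event) -> Event:
        raise NotImplementedError

    def run(self, event: Event) -> Event:
        result = self.handle(event)              # 2. Execute
        self.logged.append(f"{self.name}: ok")   # 3. Log (MLflow in Collie)
        return result                            # 4. Emit the event


class Doubler(Component):
    def handle(self, event: Event) -> Event:
        return Event(payload={"value": event.payload["value"] * 2})


first, second = Doubler("first"), Doubler("second")
# 5. Next component: the event emitted by `first` is the input of `second`
out = second.run(first.run(Event(payload={"value": 3})))
print(out.payload)
```

The only part a component author writes in this sketch is `handle`, which matches the shape of the examples later in this section.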
The Pipeline Flow
A typical Collie pipeline:
Raw Data → Transformer → Trainer → Tuner → Evaluator → Pusher → Deployed Model
                ↓           ↓        ↓         ↓          ↓
             MLflow      MLflow   MLflow    MLflow     MLflow
           (data log)   (params) (trials)  (metrics) (register)
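As a rough sketch of the diagram, the pipeline can be thought of as an ordered list of stages, each of which records something to a tracking store and hands a payload to the next stage. The `stage` helper, the `tracking_log` list, and the idea that each stage adds one key to a shared dictionary are assumptions made for this illustration; Collie's own payload classes are described below.

```python
from typing import Any, Callable, Dict, List, Tuple

Payload = Dict[str, Any]

# Stand-in for MLflow: (stage name, what was logged)
tracking_log: List[Tuple[str, str]] = []


def stage(name: str, logged: str, output_key: str) -> Callable[[Payload], Payload]:
    """Build a toy stage that logs once and adds one key to the payload."""
    def run(payload: Payload) -> Payload:
        tracking_log.append((name, logged))
        return {**payload, output_key: f"{name} output"}
    return run


# Same order and same logged items as the diagram
pipeline = [
    stage("Transformer", "data log", "train_data"),
    stage("Trainer", "params", "model"),
    stage("Tuner", "trials", "hyperparameters"),
    stage("Evaluator", "metrics", "metrics"),
    stage("Pusher", "register", "model_uri"),
]

payload: Payload = {"raw_data": "data.csv"}
for run_stage in pipeline:
    payload = run_stage(payload)

for name, logged in tracking_log:
    print(f"{name} -> MLflow ({logged})")
```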
Components in Detail
Transformer
Purpose: Data preprocessing and feature engineering
Responsibilities:
- Load and clean data
- Feature engineering
- Data validation
- Train/test split
MLflow Usage:
- Log input datasets
- Log transformation parameters
- Log data statistics
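The validation and train/test split responsibilities are not shown in the example that follows, so here is a standard-library sketch of what they might look like. The `validate` and `train_test_split` helpers and the toy rows are hypothetical; a real Transformer would more likely use pandas and scikit-learn.

```python
import random
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[float]]


def validate(rows: List[Row], required: List[str]) -> List[Row]:
    """Keep only rows that have a non-None value for every required column."""
    return [r for r in rows if all(r.get(c) is not None for c in required)]


def train_test_split(rows: List[Row], test_fraction: float = 0.2,
                     seed: int = 0) -> Tuple[List[Row], List[Row]]:
    """Shuffle a copy of the rows and cut off the first part as the test set."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    n_test = int(len(shuffled) * test_fraction)
    return shuffled[n_test:], shuffled[:n_test]


rows: List[Row] = [
    {"feature1": float(i), "feature2": 2.0, "target": float(i % 2)}
    for i in range(10)
]
rows.append({"feature1": None, "feature2": 1.0, "target": 0.0})  # invalid row

clean = validate(rows, ["feature1", "feature2", "target"])
for r in clean:
    r["new_feature"] = r["feature1"] * r["feature2"]  # feature engineering

train, test = train_test_split(clean, test_fraction=0.2)

# The kind of statistics a Transformer would log as parameters
stats = {"n_samples": len(clean), "n_features": len(clean[0])}
print(stats, len(train), len(test))
```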
Example:
import pandas as pd

from collie import Event, Transformer
from collie.core import TransformerPayload


class MyTransformer(Transformer):
    def handle(self, event: Event) -> Event:
        # Load data
        df = pd.read_csv("data.csv")

        # Log pandas DataFrame
        self.mlflow.log_pd_data(
            data=df,
            context="training",
            source="data.csv"
        )

        # Feature engineering
        df['new_feature'] = df['feature1'] * df['feature2']

        # Log stats
        self.mlflow.log_params({
            "n_samples": len(df),
            "n_features": len(df.columns)
        })

        return Event(
            payload=TransformerPayload(
                train_data=df,
                validation_data=None,
                test_data=None
            )
        )
Trainer
Purpose: Model training
Responsibilities:
- Train machine learning models
- Log hyperparameters
- Log training metrics
- Save model artifacts
MLflow Usage:
- Log hyperparameters
- Log training metrics (loss, accuracy, etc.)
- Log model automatically
Example:
from collie import Event, Trainer
from collie.core import TrainerPayload


class MyTrainer(Trainer):
    def handle(self, event: Event) -> Event:
        train_data = event.payload.train_data
        X = train_data.drop("target", axis=1)
        y = train_data["target"]

        # Log hyperparameters
        self.mlflow.log_params({
            "learning_rate": 0.01,
            "batch_size": 32
        })

        # Training loop; `model` and `train_one_epoch` are placeholders
        # for your own model object and per-epoch training step
        for epoch in range(100):
            loss = train_one_epoch(model, X, y)
            self.mlflow.log_metric("loss", loss, step=epoch)

        # Return model in payload
        return Event(
            payload=TrainerPayload(model=model)
        )
Tuner
Purpose: Hyperparameter optimization
Responsibilities:
- Search hyperparameter space
- Track trials
- Select best parameters
MLflow Usage:
- Create nested runs for each trial
- Log trial parameters and metrics
- Log best parameters
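The search-and-select loop can be sketched without any framework. The example below is an illustrative exhaustive grid search; `search_space` and the toy `evaluate` objective are made up for this sketch, and in a real Tuner each trial would be logged to its own nested MLflow run rather than appended to a list.

```python
from itertools import product
from typing import Any, Dict, List

# Hypothetical search space
search_space: Dict[str, List[Any]] = {
    "learning_rate": [0.01, 0.1],
    "max_depth": [2, 4, 8],
}


def evaluate(params: Dict[str, Any]) -> float:
    """Toy objective standing in for cross-validation; higher is better."""
    return (1.0
            - abs(params["learning_rate"] - 0.1)
            - abs(params["max_depth"] - 4) / 10)


# One trial per combination of values in the search space
trials: List[Dict[str, Any]] = []
for values in product(*search_space.values()):
    params = dict(zip(search_space.keys(), values))
    trials.append({"params": params, "cv_score": evaluate(params)})

# Select the best trial by score
best = max(trials, key=lambda t: t["cv_score"])
print(len(trials), best["params"])
```

Keeping the full `trials` list, rather than only the running best, makes it easy to log the trial history afterwards.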
Example:
from collie import Event, Tuner
from collie.core import TunerPayload


class MyTuner(Tuner):
    def handle(self, event: Event) -> Event:
        # `param_grid` and `evaluate` are placeholders for your own
        # search space and scoring function
        best_score = float("-inf")  # also works when every score is <= 0
        best_params = {}

        for params in param_grid:
            # Each trial gets its own run
            with self.mlflow.start_run(nested=True):
                self.mlflow.log_params(params)
                score = evaluate(params)
                self.mlflow.log_metric("cv_score", score)

            if score > best_score:
                best_score = score
                best_params = params

        self.mlflow.log_dict(best_params, "best_params.json")

        # Need to pass train, validation, test data to the next stage
        return Event(
            payload=TunerPayload(
                hyperparameters=best_params,
                train_data=event.payload.train_data,
                validation_data=event.payload.validation_data,
                test_data=event.payload.test_data
            )
        )
Evaluator
Purpose: Model evaluation and comparison
Responsibilities:
- Evaluate model performance on test/validation data
- Compare experiment model with production model
- Calculate and log evaluation metrics
- Determine if the new model is better (set pass_evaluation flag)
- Generate evaluation reports and visualizations
Does NOT do:
- Register models to MLflow (handled by Pusher)
- Transition model stages (handled by Pusher)
MLflow Usage:
- Log evaluation metrics (accuracy, precision, recall, etc.)
- Log evaluation parameters and decisions
- Log evaluation artifacts (confusion matrix, ROC curves, reports)
- Load production model for comparison
Outputs:
- Returns EvaluatorPayload with metrics and comparison results
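The comparison decision itself is small enough to sketch in isolation. The `accuracy` and `compare` functions and the two toy models below are hypothetical; the sketch only shows the rule that a candidate passes when it beats production, or when there is no production model to compare against.

```python
from typing import Callable, Dict, List, Optional

Predict = Callable[[List[int]], List[int]]


def accuracy(y_true: List[int], y_pred: List[int]) -> float:
    """Fraction of predictions that match the true labels."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def compare(experiment: Predict, production: Optional[Predict],
            X: List[int], y: List[int]) -> Dict[str, object]:
    """Score the experiment model and decide whether it should pass."""
    experiment_accuracy = accuracy(y, experiment(X))
    if production is None:
        # No production model yet, so the experiment model passes by default
        return {"experiment_accuracy": experiment_accuracy,
                "production_accuracy": None,
                "is_better": True}
    production_accuracy = accuracy(y, production(X))
    return {"experiment_accuracy": experiment_accuracy,
            "production_accuracy": production_accuracy,
            "is_better": experiment_accuracy > production_accuracy}


def parity_model(xs: List[int]) -> List[int]:
    return [x % 2 for x in xs]


def constant_model(xs: List[int]) -> List[int]:
    return [0 for _ in xs]


X, y = [0, 1, 2, 3], [0, 1, 0, 1]
with_prod = compare(parity_model, constant_model, X, y)
first_run = compare(parity_model, None, X, y)
print(with_prod)
print(first_run)
```

This sketch reports a missing production score as `None` rather than `0`, so a logged zero is never mistaken for a real measurement. Whether to do the same in a real Evaluator is a design choice.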
Example:
from collie import Evaluator, Event
from collie.core import EvaluatorPayload
from collie.core.enums.ml_models import MLflowModelStage, ModelFlavor
from sklearn.metrics import accuracy_score


class MyEvaluator(Evaluator):
    def __init__(self):
        super().__init__(
            description="Model evaluation",
            tags={"team": "data-science"}
        )

    def handle(self, event: Event) -> Event:
        model = event.payload.model
        test_data = event.payload.test_data
        X_test = test_data.drop("target", axis=1)
        y_test = test_data["target"]

        # Evaluate experiment model
        y_pred = model.predict(X_test)
        experiment_accuracy = accuracy_score(y_test, y_pred)

        # Log metrics
        self.mlflow.log_metric("experiment_accuracy", experiment_accuracy)

        # Compare with production model
        prod_model = self.load_latest_model(
            model_name=self.registered_model_name,
            stage=MLflowModelStage.PRODUCTION,
            flavor=ModelFlavor.SKLEARN
        )

        if prod_model is not None:
            y_pred_prod = prod_model.predict(X_test)
            production_accuracy = accuracy_score(y_test, y_pred_prod)
            is_better = experiment_accuracy > production_accuracy
        else:
            production_accuracy = 0
            is_better = True  # No production model, so experiment is better

        # Log comparison results
        self.mlflow.log_metric("production_accuracy", production_accuracy)
        self.mlflow.log_metric("accuracy_improvement",
                               experiment_accuracy - production_accuracy)

        # Return evaluation results
        # Pusher will check pass_evaluation and register model if True
        return Event(
            payload=EvaluatorPayload(
                metrics=[
                    {
                        "experiment_accuracy": experiment_accuracy,
                        "production_accuracy": production_accuracy,
                        "accuracy_improvement": experiment_accuracy - production_accuracy
                    }
                ],
                is_better_than_production=is_better
            )
        )
Pusher
Purpose: Model registration and deployment
Responsibilities:
- Check evaluation results (pass_evaluation flag from Evaluator)
- Register model to MLflow if evaluation passed
- Transition model to target stage (e.g., Staging, Production)
- Archive old versions at target stage (optional)
- Log deployment metadata
- (Optional) Deploy to external services
Automatic Behavior:
When pass_evaluation is True, Pusher automatically:
- Registers the model to MLflow Model Registry
- Transitions model to the specified target_stage
- Archives existing models at that stage (if archive_existing_versions=True)
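To make the register, transition, and archive sequence concrete, here is a toy in-memory registry. `ModelVersion`, `Registry`, and `push` are hypothetical stand-ins written for this sketch; they imitate the behavior described above and are not Collie or MLflow APIs.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ModelVersion:
    version: int
    stage: str = "None"


@dataclass
class Registry:
    """Toy in-memory stand-in for a model registry."""
    versions: List[ModelVersion] = field(default_factory=list)

    def register(self) -> ModelVersion:
        mv = ModelVersion(version=len(self.versions) + 1)
        self.versions.append(mv)
        return mv

    def transition(self, mv: ModelVersion, stage: str,
                   archive_existing_versions: bool) -> None:
        if archive_existing_versions:
            # Archive every other version currently at the target stage
            for other in self.versions:
                if other is not mv and other.stage == stage:
                    other.stage = "Archived"
        mv.stage = stage


def push(registry: Registry, pass_evaluation: bool,
         target_stage: str = "Production",
         archive_existing_versions: bool = True) -> Optional[ModelVersion]:
    """Register and promote a model only if evaluation passed."""
    if not pass_evaluation:
        return None
    mv = registry.register()
    registry.transition(mv, target_stage, archive_existing_versions)
    return mv


registry = Registry()
push(registry, pass_evaluation=True)    # version 1 goes to Production
push(registry, pass_evaluation=False)   # rejected, nothing is registered
push(registry, pass_evaluation=True)    # version 2 replaces version 1
stages = [(v.version, v.stage) for v in registry.versions]
print(stages)
```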
MLflow Usage:
- register_model() - Register model to Model Registry
- transition_model_version() - Move model to Production/Staging
- Log deployment parameters and status
Configuration:
from collie import Pusher
from collie.core.enums.ml_models import MLflowModelStage
# Configure Pusher with target stage
pusher = Pusher(
    target_stage=MLflowModelStage.PRODUCTION,  # Deploy to Production
    archive_existing_versions=True,            # Archive old Production models
    description="Production deployment",
    tags={"env": "production"}
)
Example: Basic MLflow Deployment (Recommended)
from collie import Pusher, Event
from collie.core import PusherPayload
from collie.core.enums.ml_models import MLflowModelStage


class MyPusher(Pusher):
    def __init__(self):
        super().__init__(
            target_stage=MLflowModelStage.PRODUCTION,
            archive_existing_versions=True,
            description="Deploy to Production"
        )

    def handle(self, event: Event) -> Event:
        # Pusher automatically handles:
        # - Model registration
        # - Stage transition
        # - Old version archival

        # You can add custom logic here if needed
        # e.g., send notifications, update database
        return Event(
            payload=PusherPayload(
                model_uri="Your model URI here"
            )
        )
Event-Based Data Flow
Event and Payload System
Collie uses an event-driven architecture where components communicate through Event objects containing typed Payload data:
Passing Data Between Components:
- Standard Fields: Use predefined payload fields for common data
- Extra Data: Use extra_data for custom/experimental data
from collie import Event, TransformerPayload

# Pass data using standard fields
event = Event(
    payload=TransformerPayload(
        train_data=df,
        validation_data=val_df,
        test_data=test_df
    )
)

# Pass custom data using extra_data
event = Event(
    payload=TransformerPayload(
        train_data=df,
        extra_data={
            "feature_names": ["age", "income"],
            "preprocessing_steps": ["scaling", "encoding"],
            "data_version": "v2.1"
        }
    )
)
Accessing Data in Components
Components receive data through event.payload and can access both standard fields and custom data:
from collie import Event, Trainer, TrainerPayload


class MyTrainer(Trainer):
    def handle(self, event: Event) -> Event:
        # Access standard payload fields
        train_data = event.payload.train_data
        val_data = event.payload.validation_data

        # Access custom data from extra_data
        feature_names = event.payload.get_extra("feature_names", [])
        hyperparams = event.payload.get_extra("best_params", {})

        # Train model (`train_model` is a placeholder for your own training code)
        model = train_model(train_data, hyperparams)

        # Return new payload with results
        return Event(
            payload=TrainerPayload(
                model=model,
                extra_data={
                    "training_time": 120.5,
                    "n_epochs": 50
                }
            )
        )
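For readers wondering how `get_extra` and `has_extra` behave, the sketch below shows one plausible way such helpers could be built on top of an `extra_data` dictionary. `SketchPayload` is a hypothetical class written for this illustration; the helper semantics (a dictionary lookup with a default) are an assumption based on how the examples in this section call them, not a copy of Collie's source.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchPayload:
    """Hypothetical payload: one standard field plus a free-form dict."""
    model: Any = None
    extra_data: Dict[str, Any] = field(default_factory=dict)

    def has_extra(self, key: str) -> bool:
        """True if the key was set in extra_data."""
        return key in self.extra_data

    def get_extra(self, key: str, default: Any = None) -> Any:
        """Return extra_data[key], or the default when the key is missing."""
        return self.extra_data.get(key, default)


payload = SketchPayload(extra_data={"feature_names": ["age", "income"]})

feature_names = payload.get_extra("feature_names", [])
hyperparams = payload.get_extra("best_params", {})  # missing key, so default

print(feature_names, hyperparams, payload.has_extra("best_params"))
```

Passing an explicit default keeps downstream code free of `KeyError` handling when an upstream component did not set a given key.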
Data Passing Between Components
Components pass data through Event payloads:
import pandas as pd
from sklearn.metrics import accuracy_score

from collie import Event, Transformer, Trainer, Evaluator
from collie.core import TransformerPayload, TrainerPayload, EvaluatorPayload


# Transformer output
class DataLoader(Transformer):
    def handle(self, event: Event) -> Event:
        # Load and process data (`load_and_split_data` is a placeholder)
        X_train, X_test, y_train, y_test = load_and_split_data()

        return Event(
            payload=TransformerPayload(
                train_data=pd.concat([X_train, y_train], axis=1),
                validation_data=None,
                test_data=pd.concat([X_test, y_test], axis=1)
            )
        )


# Trainer uses transformer output
class ModelTrainer(Trainer):
    def handle(self, event: Event) -> Event:
        train_data = event.payload.train_data
        X_train = train_data.drop("target", axis=1)
        y_train = train_data["target"]

        # ... train model ...

        return Event(
            payload=TrainerPayload(
                model=trained_model
            )
        )


# Evaluator uses trainer output
class ModelEvaluator(Evaluator):
    def handle(self, event: Event) -> Event:
        model = event.payload.model
        test_data = event.payload.test_data
        X_test = test_data.drop("target", axis=1)
        y_test = test_data["target"]

        # Evaluate model
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        self.mlflow.log_metric("accuracy", accuracy)

        return Event(
            payload=EvaluatorPayload(
                metrics=[{"accuracy": accuracy}],
                is_better_than_production=True
            )
        )
Passing Custom Data
Each Payload has an extra_data field for custom data beyond standard fields:
Standard Fields vs Extra Data:
from collie import Event, Trainer, Transformer
from collie.core import TransformerPayload, TrainerPayload


# Use standard fields for common data
class MyTransformer(Transformer):
    def handle(self, event: Event) -> Event:
        return Event(
            payload=TransformerPayload(
                # Standard fields - strongly typed
                train_data=train_df,
                validation_data=val_df,
                test_data=test_df,
                # Custom data - flexible
                extra_data={
                    "feature_names": ["age", "income", "score"],
                    "data_source": "database",
                    "preprocessing_steps": ["normalization", "encoding"],
                    "data_version": "v2.1"
                }
            )
        )
Accessing Extra Data:
class MyTrainer(Trainer):
    def handle(self, event: Event) -> Event:
        # Access standard fields
        train_data = event.payload.train_data

        # Access extra data using helper methods (recommended)
        feature_names = event.payload.get_extra("feature_names", [])
        data_version = event.payload.get_extra("data_version", "unknown")

        # Or check if exists first
        if event.payload.has_extra("preprocessing_steps"):
            steps = event.payload.get_extra("preprocessing_steps")

        # Log custom info
        self.mlflow.log_params({
            "data_version": data_version,
            "n_features": len(feature_names)
        })

        # Train model...
        # (model, training_time, best_epoch, loss and val_loss
        # come from the training step elided here)

        return Event(
            payload=TrainerPayload(
                model=model,
                extra_data={
                    "training_time": training_time,
                    "early_stopping_epoch": best_epoch,
                    "optimizer": "Adam",
                    "train_loss": loss,  # Optional metrics via extra_data
                    "val_loss": val_loss,
                    **event.payload.extra_data  # Keep previous extra data
                }
            )
        )
Common Use Cases for Extra Data:
- Transformer: Feature engineering metadata, data quality metrics
- Tuner: Trial history, convergence info, search space details
- Trainer: Training curves, checkpoints, optimizer state
- Evaluator: Detailed reports, plot file paths, per-class metrics
- Pusher: Deployment endpoints, container IDs, rollback info
Best Practices:
- Use standard fields for data that all pipelines need (model, train_data, metrics)
- Use extra_data for pipeline-specific or experimental data
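One detail worth knowing when forwarding earlier extra data with `**event.payload.extra_data`, as the trainer example above does: in a Python dictionary literal, later entries override earlier ones with the same key. The dictionaries below are made-up values used only to show the two orderings.

```python
# Hypothetical extra data from an upstream component and from this component
previous = {"data_version": "v2.1", "optimizer": "SGD"}
current = {"optimizer": "Adam", "training_time": 120.5}

# Unpacking `previous` last: upstream values win on key collisions
previous_wins = {**current, **previous}

# Unpacking `previous` first: this component's values win on collisions
current_wins = {**previous, **current}

print(previous_wins["optimizer"], current_wins["optimizer"])
```

Non-colliding keys are kept either way, so the choice only matters when two components write the same key.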
Model Comparison
Compare models using MLflow:
from collie import Event
from collie.core import Evaluator, EvaluatorPayload
from collie.core.enums.ml_models import MLflowModelStage, ModelFlavor


class ModelEvaluator(Evaluator):
    def handle(self, event: Event) -> Event:
        # Load production model
        prod_model = self.load_latest_model(
            model_name=self.registered_model_name,
            stage=MLflowModelStage.PRODUCTION,
            flavor=ModelFlavor.SKLEARN
        )

        # Get new model and test data
        new_model = event.payload.model
        test_data = event.payload.test_data
        X_test = test_data.drop("target", axis=1)
        y_test = test_data["target"]

        new_score = new_model.score(X_test, y_test)

        if prod_model is not None:
            prod_score = prod_model.score(X_test, y_test)
            improvement = new_score - prod_score
            is_better = new_score > prod_score
        else:
            prod_score = 0
            improvement = new_score
            is_better = True

        self.mlflow.log_metrics({
            "new_model_score": new_score,
            "prod_model_score": prod_score,
            "improvement": improvement
        })

        return Event(
            payload=EvaluatorPayload(
                metrics=[
                    {
                        "new_model_score": new_score,
                        "prod_model_score": prod_score,
                        "improvement": improvement
                    }
                ],
                is_better_than_production=is_better
            )
        )
Next Steps
- See MLflow Integration for MLflow usage patterns
- See Data Passing Guide for advanced data passing techniques