Building ML Pipelines
Learn how to build an end-to-end ML pipeline with Kubeflow. This example walks through a realistic scenario: predicting customer churn with a classification model.
Pipeline Architecture
Our pipeline will consist of the following stages:
- Data Ingestion: Load data from a data source
- Data Validation: Check data quality and schema
- Data Preprocessing: Clean and transform data
- Feature Engineering: Create features for the model
- Model Training: Train multiple models with hyperparameter tuning
- Model Evaluation: Evaluate model performance
- Model Validation: Validate against business metrics
- Model Deployment: Deploy the best model to production
- Monitoring Setup: Configure model monitoring
Step 1: Define Pipeline Components
Create reusable components for each pipeline stage:
Data Ingestion Component
# components/data_ingestion.py
from kfp.v2.dsl import component, Output, Dataset
@component(
base_image='python:3.9',
packages_to_install=['pandas', 'google-cloud-storage']
)
def ingest_data(
data_source: str,
output_dataset: Output[Dataset]
):
"""
Ingest data from source and save as a dataset.
Args:
data_source: Path to data source (GCS, S3, etc.)
output_dataset: Output dataset artifact
"""
import pandas as pd
from pathlib import Path
# Load data from source
df = pd.read_csv(data_source)
# Save dataset
output_path = Path(output_dataset.path)
output_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(output_dataset.path, index=False)
# Add metadata
output_dataset.metadata['rows'] = len(df)
output_dataset.metadata['columns'] = len(df.columns)
print(f"Ingested {len(df)} rows with {len(df.columns)} columns")
Data Preprocessing Component
# components/preprocessing.py
from kfp.v2.dsl import component, Input, Output, Dataset, Metrics
@component(
base_image='python:3.9',
packages_to_install=['pandas', 'scikit-learn']
)
def preprocess_data(
input_dataset: Input[Dataset],
output_train_dataset: Output[Dataset],
output_test_dataset: Output[Dataset],
metrics: Output[Metrics],
test_size: float = 0.2
):
"""
Preprocess data and split into train/test sets.
Args:
input_dataset: Input raw dataset
output_train_dataset: Output training dataset
output_test_dataset: Output test dataset
metrics: Output metrics
test_size: Proportion of data for testing
"""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler, LabelEncoder
# Load data
df = pd.read_csv(input_dataset.path)
# Handle missing values
df = df.dropna()
    # Encode categorical feature columns
    label_encoders = {}
    for column in df.select_dtypes(include=['object']).columns:
        if column != 'churn':  # 'churn' is the target, handled below
            le = LabelEncoder()
            df[column] = le.fit_transform(df[column])
            label_encoders[column] = le
    # Encode the target as well if it is categorical (e.g. 'Yes'/'No'),
    # since XGBoost requires integer class labels
    if df['churn'].dtype == 'object':
        df['churn'] = LabelEncoder().fit_transform(df['churn'])
# Separate features and target
X = df.drop('churn', axis=1)
y = df['churn']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42, stratify=y
)
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Save datasets
train_df = pd.DataFrame(X_train_scaled, columns=X.columns)
train_df['churn'] = y_train.values
train_df.to_csv(output_train_dataset.path, index=False)
test_df = pd.DataFrame(X_test_scaled, columns=X.columns)
test_df['churn'] = y_test.values
test_df.to_csv(output_test_dataset.path, index=False)
# Log metrics
metrics.log_metric('train_samples', len(X_train))
metrics.log_metric('test_samples', len(X_test))
metrics.log_metric('num_features', len(X.columns))
print(f"Train set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")
Model Training Component
# components/training.py
from kfp.v2.dsl import component, Input, Output, Dataset, Model, Metrics
@component(
base_image='python:3.9',
packages_to_install=['pandas', 'scikit-learn', 'xgboost']
)
def train_model(
train_dataset: Input[Dataset],
model_output: Output[Model],
metrics: Output[Metrics],
model_type: str = 'xgboost',
max_depth: int = 6,
n_estimators: int = 100,
learning_rate: float = 0.1
):
"""
Train a machine learning model.
Args:
train_dataset: Training dataset
model_output: Output trained model
metrics: Output training metrics
model_type: Type of model to train
max_depth: Maximum tree depth
n_estimators: Number of trees
learning_rate: Learning rate
"""
import pandas as pd
import pickle
from pathlib import Path
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Load training data
df = pd.read_csv(train_dataset.path)
X_train = df.drop('churn', axis=1)
y_train = df['churn']
# Select and train model
if model_type == 'xgboost':
model = XGBClassifier(
max_depth=max_depth,
n_estimators=n_estimators,
learning_rate=learning_rate,
random_state=42
)
elif model_type == 'random_forest':
model = RandomForestClassifier(
max_depth=max_depth,
n_estimators=n_estimators,
random_state=42
)
else:
raise ValueError(f"Unsupported model type: {model_type}")
# Train model
model.fit(X_train, y_train)
# Calculate training metrics
y_pred_train = model.predict(X_train)
train_accuracy = accuracy_score(y_train, y_pred_train)
train_precision = precision_score(y_train, y_pred_train, average='weighted')
train_recall = recall_score(y_train, y_pred_train, average='weighted')
train_f1 = f1_score(y_train, y_pred_train, average='weighted')
# Save model
model_path = Path(model_output.path)
model_path.parent.mkdir(parents=True, exist_ok=True)
with open(model_output.path, 'wb') as f:
pickle.dump(model, f)
# Log metrics
metrics.log_metric('train_accuracy', train_accuracy)
metrics.log_metric('train_precision', train_precision)
metrics.log_metric('train_recall', train_recall)
metrics.log_metric('train_f1', train_f1)
# Add metadata
model_output.metadata['model_type'] = model_type
model_output.metadata['framework'] = 'xgboost' if model_type == 'xgboost' else 'sklearn'
print(f"Model trained successfully")
print(f"Training Accuracy: {train_accuracy:.4f}")
print(f"Training F1 Score: {train_f1:.4f}")
Model Evaluation Component
# components/evaluation.py
from kfp.v2.dsl import component, Input, Output, Dataset, Model, Metrics, ClassificationMetrics
@component(
base_image='python:3.9',
packages_to_install=['pandas', 'scikit-learn', 'xgboost']
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
    classification_metrics: Output[ClassificationMetrics]
) -> float:
    """
    Evaluate trained model on test data.
    Args:
        test_dataset: Test dataset
        model: Trained model
        metrics: Output evaluation metrics
        classification_metrics: Output classification metrics
    Returns:
        Test-set accuracy, used by the pipeline to gate deployment.
    """
import pandas as pd
import pickle
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, confusion_matrix, classification_report
)
# Load test data
df = pd.read_csv(test_dataset.path)
X_test = df.drop('churn', axis=1)
y_test = df['churn']
# Load model
with open(model.path, 'rb') as f:
trained_model = pickle.load(f)
# Make predictions
y_pred = trained_model.predict(X_test)
y_pred_proba = trained_model.predict_proba(X_test)[:, 1]
# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')
roc_auc = roc_auc_score(y_test, y_pred_proba)
# Confusion matrix
cm = confusion_matrix(y_test, y_pred)
# Log metrics
metrics.log_metric('test_accuracy', accuracy)
metrics.log_metric('test_precision', precision)
metrics.log_metric('test_recall', recall)
metrics.log_metric('test_f1', f1)
metrics.log_metric('test_roc_auc', roc_auc)
# Log classification metrics
classification_metrics.log_confusion_matrix(
['no_churn', 'churn'],
cm.tolist()
)
print(f"Model Evaluation Results:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")
print(f"\nConfusion Matrix:\n{cm}")
    # Print classification report
    report = classification_report(y_test, y_pred)
    print(f"\nClassification Report:\n{report}")
    # Return accuracy as the component's function output so the
    # pipeline can compare it against the deployment threshold
    return float(accuracy)
Step 2: Build the Complete Pipeline
Now, let's combine all components into a complete pipeline:
# pipeline.py
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline
# Import the component definitions from the previous steps
from components.data_ingestion import ingest_data
from components.preprocessing import preprocess_data
from components.training import train_model
from components.evaluation import evaluate_model
from components.deployment import deploy_model
@pipeline(
name='customer-churn-prediction-pipeline',
description='End-to-end pipeline for customer churn prediction'
)
def churn_prediction_pipeline(
data_source: str = 'gs://my-bucket/customer_data.csv',
test_size: float = 0.2,
model_type: str = 'xgboost',
max_depth: int = 6,
n_estimators: int = 100,
learning_rate: float = 0.1,
min_accuracy: float = 0.85
):
"""
Complete MLOps pipeline for customer churn prediction.
Args:
data_source: Path to input data
test_size: Test set proportion
model_type: Type of model to train
max_depth: Model max depth
n_estimators: Number of estimators
learning_rate: Learning rate
min_accuracy: Minimum acceptable accuracy
"""
# Step 1: Data Ingestion
data_ingestion_task = ingest_data(data_source=data_source)
# Step 2: Data Preprocessing
preprocessing_task = preprocess_data(
input_dataset=data_ingestion_task.outputs['output_dataset'],
test_size=test_size
)
# Step 3: Model Training
training_task = train_model(
train_dataset=preprocessing_task.outputs['output_train_dataset'],
model_type=model_type,
max_depth=max_depth,
n_estimators=n_estimators,
learning_rate=learning_rate
)
# Step 4: Model Evaluation
evaluation_task = evaluate_model(
test_dataset=preprocessing_task.outputs['output_test_dataset'],
model=training_task.outputs['model_output']
)
    # Step 5: Conditional deployment based on accuracy
    # (evaluate_model's float return value is exposed under the 'Output' key;
    # artifact metadata cannot be compared at pipeline-definition time)
    with dsl.Condition(
        evaluation_task.outputs['Output'] >= min_accuracy,
        name='check-model-accuracy'
    ):
# Step 6: Deploy model if accuracy threshold is met
deploy_task = deploy_model(
model=training_task.outputs['model_output'],
model_name='churn-predictor',
namespace='ml-serving'
)
# Compile the pipeline
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=churn_prediction_pipeline,
package_path='churn_prediction_pipeline.yaml'
)
Step 3: Model Deployment Component
Create a component to deploy the model using KServe:
# components/deployment.py
from kfp.v2.dsl import component, Input, Model
@component(
base_image='python:3.9',
packages_to_install=['kubernetes']
)
def deploy_model(
model: Input[Model],
model_name: str,
namespace: str = 'default'
):
"""
Deploy model using KServe InferenceService.
Args:
model: Trained model artifact
model_name: Name for the deployed model
namespace: Kubernetes namespace
"""
    from kubernetes import client, config
    # Load Kubernetes config: in-cluster when running as a pipeline step,
    # falling back to the local kubeconfig during development
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
# Create InferenceService manifest
inference_service = {
'apiVersion': 'serving.kserve.io/v1beta1',
'kind': 'InferenceService',
'metadata': {
'name': model_name,
'namespace': namespace
},
'spec': {
'predictor': {
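                # NOTE: the predictor key must match the serving runtime
                # ('xgboost' for XGBoost models), and KServe expects
                # storageUri to point at a directory laid out for that runtime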
'sklearn': {
'storageUri': model.uri,
'resources': {
'requests': {
'cpu': '100m',
'memory': '256Mi'
},
'limits': {
'cpu': '1',
'memory': '1Gi'
}
}
}
}
}
}
# Apply the InferenceService
api = client.CustomObjectsApi()
try:
api.create_namespaced_custom_object(
group='serving.kserve.io',
version='v1beta1',
namespace=namespace,
plural='inferenceservices',
body=inference_service
)
print(f"Model deployed successfully as {model_name}")
except client.exceptions.ApiException as e:
if e.status == 409:
# Update existing service
api.patch_namespaced_custom_object(
group='serving.kserve.io',
version='v1beta1',
namespace=namespace,
plural='inferenceservices',
name=model_name,
body=inference_service
)
print(f"Model {model_name} updated successfully")
else:
raise
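Once the InferenceService reports Ready, it can be exercised over KServe's v1 REST protocol. A minimal client sketch; the hostname is a placeholder you would resolve via kubectl get inferenceservice churn-predictor -n ml-serving:
# query_model.py -- call the deployed predictor (KServe v1 protocol)
import requests

# Placeholder host: use the URL reported by the InferenceService
url = 'http://churn-predictor.ml-serving.example.com/v1/models/churn-predictor:predict'
payload = {'instances': [[0.5, -1.2, 0.3, 1.1]]}  # a scaled feature vector
resp = requests.post(url, json=payload, timeout=10)
resp.raise_for_status()
print(resp.json())  # e.g. {'predictions': [0]}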
Step 4: Run the Pipeline
Execute the pipeline using the Kubeflow Pipelines SDK:
# run_pipeline.py
from kfp import Client
from datetime import datetime
# Initialize Kubeflow Pipelines client
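# (assumes the KFP API is reachable locally, e.g. via
#  kubectl port-forward svc/ml-pipeline-ui 8080:80 -n kubeflow)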
client = Client(host='http://localhost:8080')
# Create an experiment
experiment_name = 'customer-churn-prediction'
experiment = client.create_experiment(name=experiment_name)
# Run the pipeline
run_name = f'churn-pipeline-{datetime.now().strftime("%Y%m%d-%H%M%S")}'
run = client.run_pipeline(
experiment_id=experiment.id,
job_name=run_name,
pipeline_package_path='churn_prediction_pipeline.yaml',
params={
'data_source': 'gs://my-bucket/customer_data.csv',
'test_size': 0.2,
'model_type': 'xgboost',
'max_depth': 6,
'n_estimators': 100,
'learning_rate': 0.1,
'min_accuracy': 0.85
}
)
print(f"Pipeline run created: {run.id}")
print(f"View run at: {client._get_url_prefix()}/#/runs/details/{run.id}")
Step 5: Hyperparameter Tuning with Katib
For advanced model optimization, use Katib to automatically tune hyperparameters:
# katib-experiment.yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
name: churn-model-hpo
namespace: kubeflow
spec:
objective:
type: maximize
goal: 0.95
objectiveMetricName: accuracy
algorithm:
algorithmName: bayesianoptimization
parallelTrialCount: 3
maxTrialCount: 20
maxFailedTrialCount: 3
parameters:
- name: max_depth
parameterType: int
feasibleSpace:
min: "3"
max: "10"
- name: n_estimators
parameterType: int
feasibleSpace:
min: "50"
max: "200"
- name: learning_rate
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.3"
trialTemplate:
primaryContainerName: training-container
trialParameters:
- name: maxDepth
description: Maximum depth of trees
reference: max_depth
- name: nEstimators
description: Number of trees
reference: n_estimators
- name: learningRate
description: Learning rate
reference: learning_rate
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: python:3.9
command:
- python
- /app/train.py
- --max-depth=${trialParameters.maxDepth}
- --n-estimators=${trialParameters.nEstimators}
- --learning-rate=${trialParameters.learningRate}
restartPolicy: Never
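Note that python:3.9 is a placeholder image; in practice you would build and push an image that actually contains /app/train.py. With Katib's default stdout metrics collector, the script must print the objective metric as name=value lines. A minimal sketch of such a script, using synthetic data in place of the real churn features:
# /app/train.py (sketch) -- accepts trial parameters, reports the objective
import argparse
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier

parser = argparse.ArgumentParser()
parser.add_argument('--max-depth', type=int, required=True)
parser.add_argument('--n-estimators', type=int, required=True)
parser.add_argument('--learning-rate', type=float, required=True)
args = parser.parse_args()

# Synthetic data stands in for the real churn features here
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = XGBClassifier(
    max_depth=args.max_depth,
    n_estimators=args.n_estimators,
    learning_rate=args.learning_rate,
    random_state=42,
)
model.fit(X_train, y_train)
acc = accuracy_score(y_test, model.predict(X_test))
# Katib's stdout metrics collector parses lines of the form 'metric=value'
print(f"accuracy={acc:.4f}")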
Apply the Katib experiment:
kubectl apply -f katib-experiment.yaml
# Monitor the experiment
kubectl get experiment churn-model-hpo -n kubeflow -w
# View trials
kubectl get trials -n kubeflow
# Get best trial results
kubectl get experiment churn-model-hpo -n kubeflow -o yaml
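The optimal trial's parameter assignments and observed metrics are recorded in the experiment's status, so they can be extracted with a jsonpath query instead of scanning the full YAML:
# Extract the best trial's parameters and objective value
kubectl get experiment churn-model-hpo -n kubeflow \
  -o jsonpath='{.status.currentOptimalTrial.parameterAssignments}'
kubectl get experiment churn-model-hpo -n kubeflow \
  -o jsonpath='{.status.currentOptimalTrial.observation.metrics}'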
Next Steps
Now that you've built your first pipeline, learn about:
- Model Deployment - Deploy models for serving
- Monitoring - Monitor pipeline and model performance
- Best Practices - Follow MLOps best practices