# Building ML Pipelines

Learn how to build a complete end-to-end ML pipeline using Kubeflow. This example demonstrates a real-world scenario: predicting customer churn using a classification model.

Pipeline Architecture

Our pipeline will consist of the following stages:

  1. Data Ingestion: Load data from a data source
  2. Data Validation: Check data quality and schema
  3. Data Preprocessing: Clean and transform data
  4. Feature Engineering: Create features for the model
  5. Model Training: Train multiple models with hyperparameter tuning
  6. Model Evaluation: Evaluate model performance
  7. Model Validation: Validate against business metrics
  8. Model Deployment: Deploy the best model to production
  9. Monitoring Setup: Configure model monitoring

Step 1: Define Pipeline Components

Create reusable components for each pipeline stage:

Data Ingestion Component

# components/data_ingestion.py
from kfp.v2.dsl import component, Output, Dataset

@component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'google-cloud-storage']
)
def ingest_data(
    data_source: str,
    output_dataset: Output[Dataset]
):
    """Load a CSV from the given source and persist it as a Dataset artifact.

    Args:
        data_source: Path to data source (GCS, S3, etc.)
        output_dataset: Output dataset artifact written by this component
    """
    import pandas as pd
    from pathlib import Path

    # Read the raw data into memory.
    frame = pd.read_csv(data_source)

    # Ensure the artifact's parent directory exists before writing.
    destination = Path(output_dataset.path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    frame.to_csv(output_dataset.path, index=False)

    # Record basic shape information on the artifact for lineage/debugging.
    output_dataset.metadata['rows'] = len(frame)
    output_dataset.metadata['columns'] = len(frame.columns)

    print(f"Ingested {len(frame)} rows with {len(frame.columns)} columns")

Data Preprocessing Component

# components/preprocessing.py
from kfp.v2.dsl import component, Input, Output, Dataset, Metrics

@component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn']
)
def preprocess_data(
    input_dataset: Input[Dataset],
    output_train_dataset: Output[Dataset],
    output_test_dataset: Output[Dataset],
    metrics: Output[Metrics],
    test_size: float = 0.2,
    target_column: str = 'churn'
):
    """
    Preprocess data and split into train/test sets.

    Drops rows with missing values, label-encodes categorical feature
    columns, performs a stratified train/test split, and standardizes
    features. The scaler is fit on the training split only, so no test
    statistics leak into training.

    Args:
        input_dataset: Input raw dataset
        output_train_dataset: Output training dataset
        output_test_dataset: Output test dataset
        metrics: Output metrics
        test_size: Proportion of data for testing
        target_column: Name of the target column (default 'churn')
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler, LabelEncoder

    # Load data
    df = pd.read_csv(input_dataset.path)

    # Handle missing values by dropping incomplete rows.
    df = df.dropna()

    # Encode categorical feature columns; the target column is left as-is.
    label_encoders = {}
    for column in df.select_dtypes(include=['object']).columns:
        if column != target_column:
            le = LabelEncoder()
            df[column] = le.fit_transform(df[column])
            label_encoders[column] = le

    # Separate features and target
    X = df.drop(target_column, axis=1)
    y = df[target_column]

    # Stratify so both splits preserve the original class balance.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )

    # Fit the scaler on the training split only, then apply it to both.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # NOTE(review): the fitted scaler and label encoders are not persisted,
    # so online serving cannot reproduce this preprocessing — consider
    # exporting them as an additional artifact.

    # Save datasets
    train_df = pd.DataFrame(X_train_scaled, columns=X.columns)
    train_df[target_column] = y_train.values
    train_df.to_csv(output_train_dataset.path, index=False)

    test_df = pd.DataFrame(X_test_scaled, columns=X.columns)
    test_df[target_column] = y_test.values
    test_df.to_csv(output_test_dataset.path, index=False)

    # Log metrics
    metrics.log_metric('train_samples', len(X_train))
    metrics.log_metric('test_samples', len(X_test))
    metrics.log_metric('num_features', len(X.columns))

    print(f"Train set: {len(X_train)} samples")
    print(f"Test set: {len(X_test)} samples")

Model Training Component

# components/training.py
from kfp.v2.dsl import component, Input, Output, Dataset, Model, Metrics

@component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn', 'xgboost']
)
def train_model(
    train_dataset: Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    model_type: str = 'xgboost',
    max_depth: int = 6,
    n_estimators: int = 100,
    learning_rate: float = 0.1,
    target_column: str = 'churn'
):
    """
    Train a machine learning model.

    Args:
        train_dataset: Training dataset
        model_output: Output trained model (pickle file)
        metrics: Output training metrics
        model_type: Type of model to train ('xgboost' or 'random_forest')
        max_depth: Maximum tree depth
        n_estimators: Number of trees
        learning_rate: Learning rate (used by xgboost only)
        target_column: Name of the target column (default 'churn')

    Raises:
        ValueError: If model_type is not supported.
    """
    import pandas as pd
    import pickle
    from pathlib import Path
    from xgboost import XGBClassifier
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    # Fail fast on an unknown model type, before loading any data.
    if model_type == 'xgboost':
        model = XGBClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            random_state=42
        )
    elif model_type == 'random_forest':
        model = RandomForestClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            random_state=42
        )
    else:
        raise ValueError(f"Unsupported model type: {model_type}")

    # Load training data produced by the preprocessing component.
    df = pd.read_csv(train_dataset.path)
    X_train = df.drop(target_column, axis=1)
    y_train = df[target_column]

    # Train model
    model.fit(X_train, y_train)

    # These are TRAINING-set metrics — they indicate fit, not
    # generalization; see the evaluation component for test metrics.
    y_pred_train = model.predict(X_train)
    train_accuracy = accuracy_score(y_train, y_pred_train)
    train_precision = precision_score(y_train, y_pred_train, average='weighted')
    train_recall = recall_score(y_train, y_pred_train, average='weighted')
    train_f1 = f1_score(y_train, y_pred_train, average='weighted')

    # Persist the model artifact.
    model_path = Path(model_output.path)
    model_path.parent.mkdir(parents=True, exist_ok=True)
    with open(model_output.path, 'wb') as f:
        pickle.dump(model, f)

    # Log metrics
    metrics.log_metric('train_accuracy', train_accuracy)
    metrics.log_metric('train_precision', train_precision)
    metrics.log_metric('train_recall', train_recall)
    metrics.log_metric('train_f1', train_f1)

    # Add metadata for downstream consumers (e.g. deployment).
    model_output.metadata['model_type'] = model_type
    model_output.metadata['framework'] = 'xgboost' if model_type == 'xgboost' else 'sklearn'

    print(f"Model trained successfully")
    print(f"Training Accuracy: {train_accuracy:.4f}")
    print(f"Training F1 Score: {train_f1:.4f}")

Model Evaluation Component

# components/evaluation.py
from kfp.v2.dsl import component, Input, Output, Dataset, Model, Metrics, ClassificationMetrics

@component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn', 'xgboost']
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
    classification_metrics: Output[ClassificationMetrics],
    target_column: str = 'churn'
) -> float:
    """
    Evaluate trained model on test data.

    Also returns the test accuracy as a scalar output so downstream
    pipeline steps (e.g. a deployment gate) can branch on it.

    Args:
        test_dataset: Test dataset
        model: Trained model (pickle file)
        metrics: Output evaluation metrics
        classification_metrics: Output classification metrics
        target_column: Name of the target column (default 'churn')

    Returns:
        Test-set accuracy in [0, 1].
    """
    import pandas as pd
    import pickle
    from sklearn.metrics import (
        accuracy_score, precision_score, recall_score, f1_score,
        roc_auc_score, confusion_matrix, classification_report
    )

    # Load test data
    df = pd.read_csv(test_dataset.path)
    X_test = df.drop(target_column, axis=1)
    y_test = df[target_column]

    # NOTE(review): pickle.load is only safe on artifacts produced by this
    # pipeline's own training step — never unpickle untrusted model files.
    with open(model.path, 'rb') as f:
        trained_model = pickle.load(f)

    # Make predictions; positive-class probabilities assume a binary target.
    y_pred = trained_model.predict(X_test)
    y_pred_proba = trained_model.predict_proba(X_test)[:, 1]

    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')
    roc_auc = roc_auc_score(y_test, y_pred_proba)

    # Confusion matrix
    cm = confusion_matrix(y_test, y_pred)

    # Log metrics
    metrics.log_metric('test_accuracy', accuracy)
    metrics.log_metric('test_precision', precision)
    metrics.log_metric('test_recall', recall)
    metrics.log_metric('test_f1', f1)
    metrics.log_metric('test_roc_auc', roc_auc)

    # Log classification metrics
    classification_metrics.log_confusion_matrix(
        ['no_churn', 'churn'],
        cm.tolist()
    )

    print(f"Model Evaluation Results:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"ROC AUC: {roc_auc:.4f}")
    print(f"\nConfusion Matrix:\n{cm}")

    # Print classification report
    report = classification_report(y_test, y_pred)
    print(f"\nClassification Report:\n{report}")

    return float(accuracy)

Step 2: Build the Complete Pipeline

Now, let's combine all components into a complete pipeline:

# pipeline.py
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component

@pipeline(
    name='customer-churn-prediction-pipeline',
    description='End-to-end pipeline for customer churn prediction'
)
def churn_prediction_pipeline(
    data_source: str = 'gs://my-bucket/customer_data.csv',
    test_size: float = 0.2,
    model_type: str = 'xgboost',
    max_depth: int = 6,
    n_estimators: int = 100,
    learning_rate: float = 0.1,
    min_accuracy: float = 0.85
):
    """
    Complete MLOps pipeline for customer churn prediction.

    Wires ingestion, preprocessing, training, and evaluation together
    and conditionally deploys the model. NOTE(review): the component
    functions (ingest_data, preprocess_data, train_model, evaluate_model,
    deploy_model) must be imported into this module before compiling.

    Args:
        data_source: Path to input data
        test_size: Test set proportion
        model_type: Type of model to train
        max_depth: Model max depth
        n_estimators: Number of estimators
        learning_rate: Learning rate
        min_accuracy: Minimum acceptable accuracy for deployment
    """

    # Step 1: Data Ingestion
    data_ingestion_task = ingest_data(data_source=data_source)

    # Step 2: Data Preprocessing
    preprocessing_task = preprocess_data(
        input_dataset=data_ingestion_task.outputs['output_dataset'],
        test_size=test_size
    )

    # Step 3: Model Training
    training_task = train_model(
        train_dataset=preprocessing_task.outputs['output_train_dataset'],
        model_type=model_type,
        max_depth=max_depth,
        n_estimators=n_estimators,
        learning_rate=learning_rate
    )

    # Step 4: Model Evaluation
    evaluation_task = evaluate_model(
        test_dataset=preprocessing_task.outputs['output_test_dataset'],
        model=training_task.outputs['model_output']
    )

    # Step 5: Conditional deployment based on accuracy.
    # NOTE(review): KFP cannot read values out of a Metrics artifact at
    # pipeline-definition time — calling `.get_metric(...)` on a task
    # output placeholder is not supported and will fail to compile. The
    # supported pattern is for evaluate_model to also emit the accuracy
    # as a scalar output (e.g. a float return value) and to condition on
    # that output here. Kept as written pending that interface change.
    with dsl.Condition(
        evaluation_task.outputs['metrics'].get_metric('test_accuracy') >= min_accuracy,
        name='check-model-accuracy'
    ):
        # Step 6: Deploy model if accuracy threshold is met
        deploy_task = deploy_model(
            model=training_task.outputs['model_output'],
            model_name='churn-predictor',
            namespace='ml-serving'
        )

# Compile the pipeline into a deployable YAML package when run as a script.
if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=churn_prediction_pipeline,
        package_path='churn_prediction_pipeline.yaml'
    )

Step 3: Model Deployment Component

Create a component to deploy the model using KServe:

# components/deployment.py
from kfp.v2.dsl import component, Input, Model

@component(
    base_image='python:3.9',
    packages_to_install=['kubernetes']
)
def deploy_model(
    model: Input[Model],
    model_name: str,
    namespace: str = 'default'
):
    """
    Deploy model using KServe InferenceService.

    Creates the InferenceService, or patches it in place if one with the
    same name already exists (HTTP 409 Conflict).

    Args:
        model: Trained model artifact
        model_name: Name for the deployed model
        namespace: Kubernetes namespace

    Raises:
        kubernetes.client.exceptions.ApiException: On any API failure
            other than a 409 conflict (which triggers a patch instead).
    """
    from kubernetes import client, config

    # Prefer in-cluster credentials; fall back to the local kubeconfig so
    # the component also works when executed outside the cluster.
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    # NOTE(review): the 'sklearn' predictor expects storageUri to point at
    # a directory laid out for KServe's sklearn server — confirm model.uri
    # matches that layout, and use the 'xgboost' predictor when the trained
    # model is an XGBoost model.
    inference_service = {
        'apiVersion': 'serving.kserve.io/v1beta1',
        'kind': 'InferenceService',
        'metadata': {
            'name': model_name,
            'namespace': namespace
        },
        'spec': {
            'predictor': {
                'sklearn': {
                    'storageUri': model.uri,
                    'resources': {
                        'requests': {
                            'cpu': '100m',
                            'memory': '256Mi'
                        },
                        'limits': {
                            'cpu': '1',
                            'memory': '1Gi'
                        }
                    }
                }
            }
        }
    }

    # Apply the InferenceService via the custom-objects API.
    api = client.CustomObjectsApi()
    try:
        api.create_namespaced_custom_object(
            group='serving.kserve.io',
            version='v1beta1',
            namespace=namespace,
            plural='inferenceservices',
            body=inference_service
        )
        print(f"Model deployed successfully as {model_name}")
    except client.exceptions.ApiException as e:
        if e.status == 409:
            # Service already exists — update it in place.
            api.patch_namespaced_custom_object(
                group='serving.kserve.io',
                version='v1beta1',
                namespace=namespace,
                plural='inferenceservices',
                name=model_name,
                body=inference_service
            )
            print(f"Model {model_name} updated successfully")
        else:
            raise

Step 4: Run the Pipeline

Execute the pipeline using the Kubeflow Pipelines SDK:

# run_pipeline.py
from kfp import Client
from datetime import datetime

# Connect to the Kubeflow Pipelines API server.
client = Client(host='http://localhost:8080')

# Create (or look up) an experiment to group pipeline runs under.
experiment_name = 'customer-churn-prediction'
experiment = client.create_experiment(name=experiment_name)

# Launch the compiled pipeline with a timestamped run name.
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
run_name = f'churn-pipeline-{timestamp}'
run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name=run_name,
    pipeline_package_path='churn_prediction_pipeline.yaml',
    params={
        'data_source': 'gs://my-bucket/customer_data.csv',
        'test_size': 0.2,
        'model_type': 'xgboost',
        'max_depth': 6,
        'n_estimators': 100,
        'learning_rate': 0.1,
        'min_accuracy': 0.85
    }
)

print(f"Pipeline run created: {run.id}")
print(f"View run at: {client._get_url_prefix()}/#/runs/details/{run.id}")

Step 5: Hyperparameter Tuning with Katib

For advanced model optimization, use Katib to automatically tune hyperparameters:

# katib-experiment.yaml
# Katib Experiment: Bayesian-optimization hyperparameter search over
# three XGBoost hyperparameters, maximizing the trial-reported 'accuracy'.
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: churn-model-hpo
  namespace: kubeflow
spec:
  objective:
    type: maximize
    goal: 0.95                    # stop early once accuracy reaches 0.95
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: bayesianoptimization
  parallelTrialCount: 3           # trials running concurrently
  maxTrialCount: 20               # total trial budget
  maxFailedTrialCount: 3          # abort the experiment after 3 failures
  parameters:
    - name: max_depth
      parameterType: int
      feasibleSpace:
        min: "3"
        max: "10"
    - name: n_estimators
      parameterType: int
      feasibleSpace:
        min: "50"
        max: "200"
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.3"
  trialTemplate:
    primaryContainerName: training-container
    # trialParameters map the search-space names onto the placeholders
    # substituted into trialSpec below.
    trialParameters:
      - name: maxDepth
        description: Maximum depth of trees
        reference: max_depth
      - name: nEstimators
        description: Number of trees
        reference: n_estimators
      - name: learningRate
        description: Learning rate
        reference: learning_rate
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
              - name: training-container
                image: python:3.9
                command:
                  - python
                  - /app/train.py
                  - --max-depth=${trialParameters.maxDepth}
                  - --n-estimators=${trialParameters.nEstimators}
                  - --learning-rate=${trialParameters.learningRate}
            restartPolicy: Never

Apply the Katib experiment:

# Create the Katib experiment in the cluster
kubectl apply -f katib-experiment.yaml

# Monitor the experiment
kubectl get experiment churn-model-hpo -n kubeflow -w

# View trials
kubectl get trials -n kubeflow

# Get best trial results
kubectl get experiment churn-model-hpo -n kubeflow -o yaml

Next Steps

Now that you've built your first pipeline, learn about: