Machine Learning with Databricks
Learn how to build, train, and deploy machine learning models using the Databricks Runtime for Machine Learning, MLflow, and AutoML.
Overview
Databricks provides a comprehensive platform for the entire ML lifecycle:
- Data preparation and feature engineering
- Experiment tracking with MLflow
- Model training and hyperparameter tuning
- Model deployment and serving
- AutoML for automated model development
MLflow Fundamentals
MLflow is an open-source platform for managing the ML lifecycle, deeply integrated with Databricks.
Setting Up MLflow
import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature
# Set experiment
mlflow.set_experiment("/Users/your.email@company.com/my-ml-experiment")
# Or create a new experiment
experiment_id = mlflow.create_experiment(
    name="/Shared/team-experiments/customer-churn",
    tags={"team": "data-science", "project": "churn-prediction"}
)
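For supported libraries, autologging captures parameters, metrics, and the model without explicit log calls. A minimal sketch (per-flavor variants such as mlflow.sklearn.autolog exist as well):

import mlflow

# Enable autologging for supported ML libraries; subsequent fit calls
# log parameters, metrics, and artifacts automatically
mlflow.autolog()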
Tracking Experiments
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
# Load and prepare data
df = spark.table("feature_store.customer_features").toPandas()
X = df.drop("churn", axis=1)
y = df["churn"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Start MLflow run
with mlflow.start_run(run_name="random-forest-baseline") as run:
    # Log parameters
    n_estimators = 100
    max_depth = 10
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("model_type", "RandomForest")

    # Train model
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    model.fit(X_train, y_train)

    # Make predictions
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    # Log metrics
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    auc = roc_auc_score(y_test, y_pred_proba)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)
    mlflow.log_metric("auc_roc", auc)

    # Log model with an inferred signature and input example
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(
        model,
        "model",
        signature=signature,
        input_example=X_train.iloc[:5]
    )

    # Log the confusion matrix as an image artifact
    import matplotlib.pyplot as plt
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.savefig("/tmp/confusion_matrix.png")
    mlflow.log_artifact("/tmp/confusion_matrix.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"AUC-ROC: {auc:.4f}")
Hyperparameter Tuning
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK, space_eval
import mlflow

# Define search space
search_space = {
    'n_estimators': hp.choice('n_estimators', [50, 100, 200, 300]),
    'max_depth': hp.choice('max_depth', [5, 10, 15, 20]),
    'min_samples_split': hp.choice('min_samples_split', [2, 5, 10]),
    'min_samples_leaf': hp.choice('min_samples_leaf', [1, 2, 4])
}

# Objective function
def objective(params):
    with mlflow.start_run(nested=True):
        # Log parameters
        mlflow.log_params(params)
        # Train model
        model = RandomForestClassifier(**params, random_state=42)
        model.fit(X_train, y_train)
        # Evaluate
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        auc = roc_auc_score(y_test, y_pred_proba)
        # Log metric
        mlflow.log_metric("auc_roc", auc)
        # Return loss (negative because hyperopt minimizes)
        return {'loss': -auc, 'status': STATUS_OK}

# Run hyperparameter tuning
with mlflow.start_run(run_name="hyperparameter-tuning"):
    spark_trials = SparkTrials(parallelism=4)
    best_indices = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=50,
        trials=spark_trials
    )
    # fmin returns list indices for hp.choice spaces; map them back to values
    best_params = space_eval(search_space, best_indices)
    mlflow.log_params(best_params)
    print(f"Best parameters: {best_params}")
Feature Engineering
Feature Store
Databricks Feature Store centralizes feature management:
from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

# Create feature table
def compute_customer_features(data):
    from pyspark.sql.functions import datediff, current_date, count, sum, avg, max
    features = data.groupBy("customer_id").agg(
        count("order_id").alias("total_orders"),
        sum("order_amount").alias("total_spent"),
        avg("order_amount").alias("avg_order_value"),
        datediff(current_date(), max("order_date")).alias("days_since_last_order")
    )
    return features

# Write to Feature Store
customer_features = compute_customer_features(spark.table("orders"))
fs.create_table(
    name="feature_store.customer_features",
    primary_keys=["customer_id"],
    df=customer_features,
    description="Customer behavioral features"
)

# Update features with a merge (upsert on the primary keys)
new_features = compute_customer_features(spark.table("orders"))
fs.write_table(
    name="feature_store.customer_features",
    df=new_features,
    mode="merge"
)
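Features can also be joined back to labels for training via lookups rather than reading the table directly. A minimal sketch, assuming a hypothetical labels table with customer_id and churn columns:

from databricks.feature_store import FeatureLookup

# Look up features by primary key and attach them to a labels DataFrame
# (sketch; "labels.customer_churn" is a hypothetical table)
feature_lookups = [
    FeatureLookup(
        table_name="feature_store.customer_features",
        lookup_key="customer_id"
    )
]
labels_df = spark.table("labels.customer_churn")
training_set = fs.create_training_set(
    df=labels_df,
    feature_lookups=feature_lookups,
    label="churn"
)
training_df = training_set.load_df()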
Feature Engineering Patterns
from pyspark.sql.functions import col, when, datediff, current_date, lag, avg
from pyspark.sql.window import Window

# Time-based features
df = df.withColumn(
    "days_since_signup",
    datediff(current_date(), col("signup_date"))
)

# Binning continuous variables
df = df.withColumn(
    "age_group",
    when(col("age") < 25, "young")
    .when((col("age") >= 25) & (col("age") < 40), "middle")
    .when(col("age") >= 40, "senior")
)

# Window functions for lag features
window_spec = Window.partitionBy("customer_id").orderBy("date")
df = df.withColumn(
    "previous_purchase_amount",
    lag("amount", 1).over(window_spec)
)

# Rolling aggregations (note: rowsBetween counts rows, not days;
# use rangeBetween over a timestamp for true 30-day windows)
window_30d = Window.partitionBy("customer_id").orderBy("date").rowsBetween(-30, 0)
df = df.withColumn(
    "avg_30d_purchase",
    avg("amount").over(window_30d)
)

# One-hot encoding
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_encoded")
pipeline = Pipeline(stages=[indexer, encoder])
df_encoded = pipeline.fit(df).transform(df)
AutoML
Databricks AutoML automatically trains and tunes models:
from databricks import automl

# Load data
df = spark.table("feature_store.customer_features")

# Run AutoML for classification
summary = automl.classify(
    dataset=df,
    target_col="churn",
    primary_metric="roc_auc",
    timeout_minutes=30,
    max_trials=50
)

# Access results
print(f"Best trial run ID: {summary.best_trial.mlflow_run_id}")
print(f"Best model metric: {summary.best_trial.metrics['roc_auc']}")

# Load best model
import mlflow
best_model = mlflow.sklearn.load_model(f"runs:/{summary.best_trial.mlflow_run_id}/model")

# Make predictions
predictions = best_model.predict(X_test)
AutoML for Different Tasks
# Regression
summary_reg = automl.regress(
    dataset=df,
    target_col="price",
    primary_metric="rmse",
    timeout_minutes=30
)

# Forecasting
summary_forecast = automl.forecast(
    dataset=df,
    target_col="sales",
    time_col="date",
    frequency="D",
    horizon=30,
    timeout_minutes=60
)
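The best AutoML trial is an ordinary MLflow run, so its model can be registered like any other. A short sketch reusing summary from the classification example (the registry name is a placeholder):

import mlflow

# Register the best AutoML trial's model
# ("customer_churn_automl" is a placeholder registry name)
model_uri = f"runs:/{summary.best_trial.mlflow_run_id}/model"
registered = mlflow.register_model(model_uri=model_uri, name="customer_churn_automl")
print(f"Registered {registered.name} version {registered.version}")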
Deep Learning
Using TensorFlow/Keras
import mlflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.preprocessing import StandardScaler

# Prepare data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Build model
def create_model(learning_rate=0.001):
    model = keras.Sequential([
        layers.Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(32, activation='relu'),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy', keras.metrics.AUC(name='auc')]
    )
    return model

# Train with MLflow
with mlflow.start_run(run_name="neural-network"):
    # Log parameters
    mlflow.log_param("architecture", "128-64-32")
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("dropout", 0.3)

    # Create and train model
    model = create_model()

    # Stop early when validation loss plateaus
    early_stopping = keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    )
    history = model.fit(
        X_train_scaled, y_train,
        epochs=100,
        batch_size=32,
        validation_split=0.2,
        callbacks=[early_stopping],
        verbose=0
    )

    # Log metrics
    test_loss, test_acc, test_auc = model.evaluate(X_test_scaled, y_test)
    mlflow.log_metric("test_accuracy", test_acc)
    mlflow.log_metric("test_auc", test_auc)

    # Log model
    mlflow.tensorflow.log_model(model, "model")

    # Log training history
    import matplotlib.pyplot as plt
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='train')
    plt.plot(history.history['val_loss'], label='validation')
    plt.title('Model Loss')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(history.history['auc'], label='train')
    plt.plot(history.history['val_auc'], label='validation')
    plt.title('Model AUC')
    plt.legend()
    plt.savefig("/tmp/training_history.png")
    mlflow.log_artifact("/tmp/training_history.png")
Using PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import mlflow.pytorch

# Define model
class NeuralNet(nn.Module):
    def __init__(self, input_size):
        super(NeuralNet, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.3)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.sigmoid(self.fc3(x))
        return x

# Prepare data
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.FloatTensor(y_train.values).reshape(-1, 1)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Train model
with mlflow.start_run(run_name="pytorch-model") as run:
    model = NeuralNet(X_train.shape[1])
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    epochs = 50
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if (epoch + 1) % 10 == 0:
            avg_loss = total_loss / len(train_loader)
            mlflow.log_metric("train_loss", avg_loss, step=epoch)
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    # Log model
    mlflow.pytorch.log_model(model, "model")
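Once logged, the model can be reloaded for scoring in a fresh context. A minimal sketch, assuming X_test_scaled from the Keras example above:

# Reload the logged model and score the held-out set
loaded = mlflow.pytorch.load_model(f"runs:/{run.info.run_id}/model")
loaded.eval()
with torch.no_grad():
    probs = loaded(torch.FloatTensor(X_test_scaled)).squeeze().numpy()
preds = (probs > 0.5).astype(int)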
Model Registry and Deployment
Registering Models
import mlflow

# Register model from a run
run_id = "your-run-id"
model_uri = f"runs:/{run_id}/model"
model_details = mlflow.register_model(
    model_uri=model_uri,
    name="customer_churn_model"
)
print(f"Model registered: {model_details.name}")
print(f"Version: {model_details.version}")
Model Versioning and Stages
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition model to staging
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Staging"
)

# Add description
client.update_model_version(
    name="customer_churn_model",
    version=1,
    description="RandomForest model with 95% accuracy"
)

# Transition to production
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Production"
)
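transition_model_version_stage also accepts archive_existing_versions=True to demote whatever was previously in Production. To see which version is in which stage, the client can enumerate a model's versions (a short sketch using the same client):

# List all versions of the model and their current stages
for mv in client.search_model_versions("name='customer_churn_model'"):
    print(f"version={mv.version} stage={mv.current_stage} run_id={mv.run_id}")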
Batch Inference
import mlflow
from pyspark.sql.functions import struct, col

# Load the production model as a Spark UDF for distributed scoring
# (a plain pyfunc model's predict cannot be used directly in withColumn)
model_name = "customer_churn_model"
model_stage = "Production"
model_uri = f"models:/{model_name}/{model_stage}"
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

# Load data for inference
df_inference = spark.table("feature_store.customer_features")

# Make predictions; the UDF matches struct fields to the model signature
predictions_df = df_inference.withColumn(
    "churn_prediction",
    predict_udf(struct(*[col(c) for c in df_inference.columns]))
)

# Save predictions
predictions_df.write.format("delta").mode("overwrite").saveAsTable("predictions.customer_churn")
Real-time Model Serving
# Enable model serving via Databricks UI
# 1. Navigate to Models in the sidebar
# 2. Select your model
# 3. Click "Use model for inference"
# 4. Choose "Real-time" endpoint
# 5. Configure serving settings
# Query the endpoint
import requests

# Endpoint details (from serving UI)
endpoint_url = "https://your-workspace.databricks.com/model/customer_churn_model/Production/invocations"
token = dbutils.secrets.get(scope="tokens", key="api-token")

# Prepare input
data = {
    "dataframe_records": [
        {
            "total_orders": 15,
            "total_spent": 2500.0,
            "avg_order_value": 166.67,
            "days_since_last_order": 3
        }
    ]
}

# Make request
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}
response = requests.post(endpoint_url, json=data, headers=headers)
response.raise_for_status()
predictions = response.json()
print(predictions)
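From inside a Databricks notebook, the same endpoint can also be queried without hand-built HTTP calls via the MLflow deployments client. A sketch under the assumption that a serving endpoint named "customer-churn-endpoint" exists (the name, and exact client behavior, depend on your MLflow version and serving setup):

from mlflow.deployments import get_deploy_client

# Query a serving endpoint through the deployments client
# ("customer-churn-endpoint" is a hypothetical endpoint name)
deploy_client = get_deploy_client("databricks")
result = deploy_client.predict(
    endpoint="customer-churn-endpoint",
    inputs={"dataframe_records": [{
        "total_orders": 15,
        "total_spent": 2500.0,
        "avg_order_value": 166.67,
        "days_since_last_order": 3
    }]}
)
print(result)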
Distributed ML with Spark ML
Spark MLlib Pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import mlflow

# Load data
df = spark.table("feature_store.customer_features")

# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Define pipeline stages
feature_cols = ["total_orders", "total_spent", "avg_order_value", "days_since_last_order"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="churn",
    numTrees=100,
    maxDepth=10
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Train model
with mlflow.start_run():
    model = pipeline.fit(train_df)
    # Make predictions
    predictions = model.transform(test_df)
    # Evaluate
    evaluator = BinaryClassificationEvaluator(labelCol="churn")
    auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    # Log
    mlflow.log_metric("auc_roc", auc)
    mlflow.spark.log_model(model, "model")
    print(f"AUC: {auc:.4f}")
Pandas UDF for Distributed Inference
import pandas as pd
from pyspark.sql.functions import pandas_udf, struct, col
import mlflow

# Load model once on the driver; Spark ships it to the executors
model = mlflow.sklearn.load_model("models:/customer_churn_model/Production")

# Define UDF: a struct column arrives as a pandas DataFrame
# (feature_cols as defined in the Spark ML pipeline section above)
@pandas_udf("double")
def predict_udf(features: pd.DataFrame) -> pd.Series:
    predictions = model.predict_proba(features[feature_cols])[:, 1]
    return pd.Series(predictions)

# Apply at scale
df_with_predictions = df.withColumn(
    "churn_probability",
    predict_udf(struct(*[col(c) for c in feature_cols]))
)
display(df_with_predictions)
MLOps Best Practices
Model Monitoring
def monitor_model_performance(predictions_table, actuals_table):
    """Monitor model performance metrics."""
    from sklearn.metrics import accuracy_score, roc_auc_score

    # Join predictions with actuals
    df = spark.table(predictions_table).join(
        spark.table(actuals_table),
        "customer_id"
    )

    # Calculate metrics
    y_true = df.select("actual_churn").toPandas()
    y_pred = df.select("predicted_churn").toPandas()
    y_prob = df.select("churn_probability").toPandas()
    accuracy = accuracy_score(y_true, y_pred)
    auc = roc_auc_score(y_true, y_prob)

    # Log metrics
    with mlflow.start_run(run_name="model-monitoring"):
        mlflow.log_metric("production_accuracy", accuracy)
        mlflow.log_metric("production_auc", auc)

    # Alert if performance degrades
    if auc < 0.85:
        print("⚠️ Warning: Model AUC below threshold!")
        # Send alert (email, Slack, etc.)

    return {"accuracy": accuracy, "auc": auc}

# Run monitoring
metrics = monitor_model_performance("predictions.customer_churn", "actuals.customer_churn")
print(metrics)
A/B Testing
from pyspark.sql.functions import col, avg
import random

def ab_test_models(model_a, model_b, test_data, test_ratio=0.5):
    """Compare two models with A/B testing."""
    results = []
    # Collects to the driver: fine for a sample, avoid for very large test sets
    for row in test_data.collect():
        # Randomly assign to A or B
        use_model_a = random.random() < test_ratio
        model = model_a if use_model_a else model_b
        prediction = model.predict([row.features])[0]
        results.append({
            "customer_id": row.customer_id,
            "model": "A" if use_model_a else "B",
            "prediction": prediction,
            "actual": row.label
        })

    # Analyze results
    results_df = spark.createDataFrame(results)
    model_a_acc = results_df.filter(col("model") == "A") \
        .select(avg((col("prediction") == col("actual")).cast("int"))) \
        .collect()[0][0]
    model_b_acc = results_df.filter(col("model") == "B") \
        .select(avg((col("prediction") == col("actual")).cast("int"))) \
        .collect()[0][0]
    print(f"Model A Accuracy: {model_a_acc:.4f}")
    print(f"Model B Accuracy: {model_b_acc:.4f}")

    return results_df
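An accuracy gap between A and B may be noise; a two-proportion z-test gives a quick significance check. A sketch using statsmodels on the results DataFrame returned above:

from statsmodels.stats.proportion import proportions_ztest

# Count correct predictions per model and test whether the proportions differ
pdf = results_df.toPandas()
pdf["correct"] = (pdf["prediction"] == pdf["actual"]).astype(int)
successes = pdf.groupby("model")["correct"].sum()
trials = pdf.groupby("model")["correct"].count()
stat, p_value = proportions_ztest(
    count=[successes["A"], successes["B"]],
    nobs=[trials["A"], trials["B"]]
)
print(f"z = {stat:.3f}, p = {p_value:.4f}")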
Next Steps
- Review Best Practices for production ML
- Explore Databricks ML Documentation
- Learn about Unity Catalog for ML