
Machine Learning with Databricks

Learn how to build, train, and deploy machine learning models using the Databricks Runtime for Machine Learning, MLflow, and AutoML.

Overview

Databricks provides a comprehensive platform for the entire ML lifecycle:

  • Data preparation and feature engineering
  • Experiment tracking with MLflow
  • Model training and hyperparameter tuning
  • Model deployment and serving
  • AutoML for automated model development

MLflow Fundamentals

MLflow is an open-source platform for managing the ML lifecycle, deeply integrated with Databricks.

Setting Up MLflow

import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature

# Set experiment
mlflow.set_experiment("/Users/your.email@company.com/my-ml-experiment")

# Or create a new experiment
experiment_id = mlflow.create_experiment(
    name="/Shared/team-experiments/customer-churn",
    tags={"team": "data-science", "project": "churn-prediction"}
)
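
Alternatively, MLflow autologging captures parameters, metrics, and models without explicit logging calls. A minimal setup (autologging supports scikit-learn, TensorFlow, PyTorch Lightning, and several other libraries):

import mlflow

# Enable automatic logging for supported ML libraries
mlflow.autolog()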

Tracking Experiments

import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

# Load and prepare data
df = spark.table("feature_store.customer_features").toPandas()
X = df.drop("churn", axis=1)
y = df["churn"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Start MLflow run
with mlflow.start_run(run_name="random-forest-baseline") as run:
# Log parameters
n_estimators = 100
max_depth = 10
mlflow.log_param("n_estimators", n_estimators)
mlflow.log_param("max_depth", max_depth)
mlflow.log_param("model_type", "RandomForest")

# Train model
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42
)
model.fit(X_train, y_train)

# Make predictions
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]

# Log metrics
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_pred_proba)

mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_score", f1)
mlflow.log_metric("auc_roc", auc)

# Log model
signature = infer_signature(X_train, model.predict(X_train))
mlflow.sklearn.log_model(
model,
"model",
signature=signature,
input_example=X_train.iloc[:5]
)

# Log artifacts
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.savefig("/tmp/confusion_matrix.png")
mlflow.log_artifact("/tmp/confusion_matrix.png")

print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC-ROC: {auc:.4f}")

Hyperparameter Tuning

from hyperopt import fmin, tpe, hp, space_eval, SparkTrials, STATUS_OK
import mlflow

# Define search space
search_space = {
    'n_estimators': hp.choice('n_estimators', [50, 100, 200, 300]),
    'max_depth': hp.choice('max_depth', [5, 10, 15, 20]),
    'min_samples_split': hp.choice('min_samples_split', [2, 5, 10]),
    'min_samples_leaf': hp.choice('min_samples_leaf', [1, 2, 4])
}

# Objective function
def objective(params):
    with mlflow.start_run(nested=True):
        # Log parameters
        mlflow.log_params(params)

        # Train model
        model = RandomForestClassifier(**params, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        auc = roc_auc_score(y_test, y_pred_proba)

        # Log metric
        mlflow.log_metric("auc_roc", auc)

        # Return loss (negative because hyperopt minimizes)
        return {'loss': -auc, 'status': STATUS_OK}

# Run hyperparameter tuning
with mlflow.start_run(run_name="hyperparameter-tuning"):
    spark_trials = SparkTrials(parallelism=4)

    best_indices = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=50,
        trials=spark_trials
    )

    # fmin returns hp.choice indices, not values; map them back with space_eval
    best_params = space_eval(search_space, best_indices)
    mlflow.log_params(best_params)
    print(f"Best parameters: {best_params}")

Feature Engineering

Feature Store

Databricks Feature Store centralizes feature management:

from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

# Create feature table
def compute_customer_features(data):
    from pyspark.sql.functions import datediff, current_date, count, sum, avg, max

    features = data.groupBy("customer_id").agg(
        count("order_id").alias("total_orders"),
        sum("order_amount").alias("total_spent"),
        avg("order_amount").alias("avg_order_value"),
        datediff(current_date(), max("order_date")).alias("days_since_last_order")
    )
    return features

# Write to Feature Store
customer_features = compute_customer_features(spark.table("orders"))

fs.create_table(
    name="feature_store.customer_features",
    primary_keys=["customer_id"],
    df=customer_features,
    description="Customer behavioral features"
)

# Update features
new_features = compute_customer_features(spark.table("orders"))
fs.write_table(
    name="feature_store.customer_features",
    df=new_features,
    mode="merge"
)
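
At training time, features can be joined back to labeled data with a FeatureLookup. A sketch, assuming a hypothetical labels table with customer_id and churn columns:

from databricks.feature_store import FeatureLookup

feature_lookups = [
    FeatureLookup(
        table_name="feature_store.customer_features",
        lookup_key="customer_id"
    )
]

# Join features onto the labels by customer_id and materialize a training DataFrame
training_set = fs.create_training_set(
    df=spark.table("labels.customer_churn"),  # hypothetical labels table
    feature_lookups=feature_lookups,
    label="churn"
)
training_df = training_set.load_df()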

Feature Engineering Patterns

from pyspark.sql.functions import col, when, datediff, current_date, lag, avg
from pyspark.sql.window import Window

# Time-based features
df = df.withColumn(
    "days_since_signup",
    datediff(current_date(), col("signup_date"))
)

# Binning continuous variables
df = df.withColumn(
    "age_group",
    when(col("age") < 25, "young")
    .when((col("age") >= 25) & (col("age") < 40), "middle")
    .when(col("age") >= 40, "senior")
)

# Window functions for lag features
window_spec = Window.partitionBy("customer_id").orderBy("date")
df = df.withColumn(
    "previous_purchase_amount",
    lag("amount", 1).over(window_spec)
)

# Rolling aggregations (last 30 rows; for a true 30-day window, use rangeBetween on a timestamp)
window_30d = Window.partitionBy("customer_id").orderBy("date").rowsBetween(-30, 0)
df = df.withColumn(
    "avg_30d_purchase",
    avg("amount").over(window_30d)
)

# One-hot encoding
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_encoded")
pipeline = Pipeline(stages=[indexer, encoder])

df_encoded = pipeline.fit(df).transform(df)

AutoML

Databricks AutoML automatically trains and tunes models:

from databricks import automl

# Load data
df = spark.table("feature_store.customer_features")

# Run AutoML for classification
summary = automl.classify(
    dataset=df,
    target_col="churn",
    primary_metric="roc_auc",
    timeout_minutes=30,
    max_trials=50
)

# Access results
print(f"Best trial run ID: {summary.best_trial.mlflow_run_id}")
print(f"Best model metric: {summary.best_trial.metrics['roc_auc']}")

# Load best model
import mlflow
best_model = mlflow.sklearn.load_model(f"runs:/{summary.best_trial.mlflow_run_id}/model")

# Make predictions
predictions = best_model.predict(X_test)

AutoML for Different Tasks

# Regression
summary_reg = automl.regress(
    dataset=df,
    target_col="price",
    primary_metric="rmse",
    timeout_minutes=30
)

# Forecasting
summary_forecast = automl.forecast(
    dataset=df,
    target_col="sales",
    time_col="date",
    frequency="D",
    horizon=30,
    timeout_minutes=60
)

Deep Learning

Using TensorFlow/Keras

import mlflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Prepare data
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Build model
def create_model(learning_rate=0.001):
    model = keras.Sequential([
        layers.Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(32, activation='relu'),
        layers.Dense(1, activation='sigmoid')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss='binary_crossentropy',
        # Name the metric so history keys are stable ('auc'/'val_auc')
        metrics=['accuracy', keras.metrics.AUC(name='auc')]
    )
    return model

# Train with MLflow
with mlflow.start_run(run_name="neural-network"):
# Log parameters
mlflow.log_param("architecture", "128-64-32")
mlflow.log_param("learning_rate", 0.001)
mlflow.log_param("dropout", 0.3)

# Create and train model
model = create_model()

# Callbacks
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)

history = model.fit(
X_train_scaled, y_train,
epochs=100,
batch_size=32,
validation_split=0.2,
callbacks=[early_stopping],
verbose=0
)

# Log metrics
test_loss, test_acc, test_auc = model.evaluate(X_test_scaled, y_test)
mlflow.log_metric("test_accuracy", test_acc)
mlflow.log_metric("test_auc", test_auc)

# Log model
mlflow.tensorflow.log_model(model, "model")

# Log training history
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='validation')
plt.title('Model Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['auc'], label='train')
plt.plot(history.history['val_auc'], label='validation')
plt.title('Model AUC')
plt.legend()

plt.savefig("/tmp/training_history.png")
mlflow.log_artifact("/tmp/training_history.png")

Using PyTorch

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import mlflow.pytorch

# Define model
class NeuralNet(nn.Module):
    def __init__(self, input_size):
        super(NeuralNet, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.3)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.sigmoid(self.fc3(x))
        return x

# Prepare data
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.FloatTensor(y_train.values).reshape(-1, 1)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Train model
with mlflow.start_run(run_name="pytorch-model"):
model = NeuralNet(X_train.shape[1])
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 50
for epoch in range(epochs):
model.train()
total_loss = 0
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()

if (epoch + 1) % 10 == 0:
avg_loss = total_loss / len(train_loader)
mlflow.log_metric("train_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

# Log model
mlflow.pytorch.log_model(model, "model")
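
The loop above only logs training loss. A short evaluation pass on the held-out set, sketched here assuming X_test_scaled and y_test from the Keras example, can be placed at the end of the same run to log a comparable test AUC:

from sklearn.metrics import roc_auc_score

# Evaluate inside the same MLflow run, after the training loop
model.eval()
with torch.no_grad():
    X_test_tensor = torch.FloatTensor(X_test_scaled)
    test_probs = model(X_test_tensor).numpy().ravel()

test_auc = roc_auc_score(y_test, test_probs)
mlflow.log_metric("test_auc", test_auc)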

Model Registry and Deployment

Registering Models

import mlflow

# Register model from a run
run_id = "your-run-id"
model_uri = f"runs:/{run_id}/model"

model_details = mlflow.register_model(
    model_uri=model_uri,
    name="customer_churn_model"
)

print(f"Model registered: {model_details.name}")
print(f"Version: {model_details.version}")

Model Versioning and Stages

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition model to staging
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Staging"
)

# Add description
client.update_model_version(
    name="customer_churn_model",
    version=1,
    description="RandomForest model with 95% accuracy"
)

# Transition to production
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Production"
)
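
In workspaces that register models in Unity Catalog, stages are superseded by model aliases. A minimal sketch, assuming MLflow 2.3+ (the alias name "champion" is just an example):

# Point the "champion" alias at version 1
client.set_registered_model_alias(
    name="customer_churn_model",
    alias="champion",
    version=1
)

# Load the aliased version
model = mlflow.pyfunc.load_model("models:/customer_churn_model@champion")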

Batch Inference

import mlflow

# Load the production model as a Spark UDF for distributed scoring
from pyspark.sql.functions import struct

model_name = "customer_churn_model"
model_stage = "Production"
model_uri = f"models:/{model_name}/{model_stage}"

predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

# Load data for inference
df_inference = spark.table("feature_store.customer_features")

# Make predictions (the UDF matches input columns by name via the model signature)
predictions_df = df_inference.withColumn(
    "churn_prediction",
    predict_udf(struct(*df_inference.columns))
)

# Save predictions
predictions_df.write.format("delta").mode("overwrite").saveAsTable("predictions.customer_churn")

Real-time Model Serving

# Enable model serving via Databricks UI
# 1. Navigate to Models in the sidebar
# 2. Select your model
# 3. Click "Use model for inference"
# 4. Choose "Real-time" endpoint
# 5. Configure serving settings

# Query the endpoint
import requests
import json

# Endpoint details (from serving UI)
endpoint_url = "https://your-workspace.databricks.com/model/customer_churn_model/Production/invocations"
token = dbutils.secrets.get(scope="tokens", key="api-token")

# Prepare input
data = {
    "dataframe_records": [
        {
            "total_orders": 15,
            "total_spent": 2500.0,
            "avg_order_value": 166.67,
            "days_since_last_order": 3
        }
    ]
}

# Make request
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

response = requests.post(endpoint_url, json=data, headers=headers)
predictions = response.json()
print(predictions)

Distributed ML with Spark ML

Spark MLlib Pipeline

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Load data
df = spark.table("feature_store.customer_features")

# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Define pipeline stages
feature_cols = ["total_orders", "total_spent", "avg_order_value", "days_since_last_order"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="churn",
    numTrees=100,
    maxDepth=10
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Train model
with mlflow.start_run():
    model = pipeline.fit(train_df)

    # Make predictions
    predictions = model.transform(test_df)

    # Evaluate
    evaluator = BinaryClassificationEvaluator(labelCol="churn")
    auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

    # Log
    mlflow.log_metric("auc_roc", auc)
    mlflow.spark.log_model(model, "model")

    print(f"AUC: {auc:.4f}")

Pandas UDF for Distributed Inference

import pandas as pd
from pyspark.sql.functions import pandas_udf, struct, col
import mlflow

# Load model
model = mlflow.sklearn.load_model("models:/customer_churn_model/Production")

# Define UDF
@pandas_udf("double")
def predict_udf(features: pd.Series) -> pd.Series:
# Convert to DataFrame with proper feature names
X = pd.DataFrame(features.tolist(), columns=feature_cols)
predictions = model.predict_proba(X)[:, 1]
return pd.Series(predictions)

# Apply at scale
df_with_predictions = df.withColumn(
    "churn_probability",
    predict_udf(struct([col(c) for c in feature_cols]))
)

display(df_with_predictions)

MLOps Best Practices

Model Monitoring

def monitor_model_performance(predictions_table, actuals_table):
    """Monitor model performance metrics."""
    from sklearn.metrics import accuracy_score, roc_auc_score

    # Join predictions with actuals
    df = spark.table(predictions_table).join(
        spark.table(actuals_table),
        "customer_id"
    )

    # Calculate metrics (collect the needed columns once)
    pdf = df.select("actual_churn", "predicted_churn", "churn_probability").toPandas()
    y_true = pdf["actual_churn"]
    y_pred = pdf["predicted_churn"]
    y_prob = pdf["churn_probability"]

    accuracy = accuracy_score(y_true, y_pred)
    auc = roc_auc_score(y_true, y_prob)

    # Log metrics
    with mlflow.start_run(run_name="model-monitoring"):
        mlflow.log_metric("production_accuracy", accuracy)
        mlflow.log_metric("production_auc", auc)

        # Alert if performance degrades
        if auc < 0.85:
            print("⚠️ Warning: Model AUC below threshold!")
            # Send alert (email, Slack, etc.)

    return {"accuracy": accuracy, "auc": auc}

# Run monitoring
metrics = monitor_model_performance("predictions.customer_churn", "actuals.customer_churn")
print(metrics)
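
Performance checks only catch degradation after labels arrive; input drift can be detected earlier. A minimal sketch using a two-sample Kolmogorov-Smirnov test, assuming hypothetical baseline_df (training-time feature snapshot) and current_df (recent inputs) pandas DataFrames:

from scipy.stats import ks_2samp

def detect_feature_drift(baseline_df, current_df, features, alpha=0.05):
    """Flag features whose distribution has shifted since training."""
    drifted = []
    for feat in features:
        stat, p_value = ks_2samp(baseline_df[feat], current_df[feat])
        if p_value < alpha:
            drifted.append((feat, stat, p_value))
    return drifted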

A/B Testing

def ab_test_models(model_a, model_b, test_data, test_ratio=0.5):
    """Compare two models with A/B testing."""
    import random
    from pyspark.sql.functions import col, avg

    results = []

    for row in test_data.collect():
        # Randomly assign to A or B
        use_model_a = random.random() < test_ratio

        model = model_a if use_model_a else model_b
        prediction = model.predict([row.features])[0]

        results.append({
            "customer_id": row.customer_id,
            "model": "A" if use_model_a else "B",
            "prediction": prediction,
            "actual": row.label
        })

    # Analyze results
    results_df = spark.createDataFrame(results)

    model_a_acc = results_df.filter(col("model") == "A") \
        .select(avg((col("prediction") == col("actual")).cast("int"))) \
        .collect()[0][0]

    model_b_acc = results_df.filter(col("model") == "B") \
        .select(avg((col("prediction") == col("actual")).cast("int"))) \
        .collect()[0][0]

    print(f"Model A Accuracy: {model_a_acc:.4f}")
    print(f"Model B Accuracy: {model_b_acc:.4f}")

    return results_df
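
Small accuracy deltas can be noise from random assignment. A quick significance check on the returned results, sketched with a chi-squared test from scipy (ab_results stands for the DataFrame returned by ab_test_models):

import pandas as pd
from scipy.stats import chi2_contingency

results_pd = ab_results.toPandas()
results_pd["correct"] = (results_pd["prediction"] == results_pd["actual"]).astype(int)

# Contingency table: model (A/B) vs. correct/incorrect counts
contingency = pd.crosstab(results_pd["model"], results_pd["correct"])
chi2, p_value, _, _ = chi2_contingency(contingency)

print(f"Chi-squared p-value: {p_value:.4f}")
if p_value < 0.05:
    print("The difference between models is statistically significant")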
