Best Practices

This guide covers best practices for deploying production-ready machine learning models with BentoML.

Model Management

Version Control Your Models

Always use semantic versioning for models:

import bentoml

# Save with descriptive tags
bentoml.sklearn.save_model(
    "fraud_detector",
    model,
    labels={
        "version": "v2.1.0",
        "stage": "production",
        "model_type": "random_forest"
    },
    metadata={
        "training_date": "2024-01-15",
        "accuracy": 0.94,
        "dataset_size": 100000,
        "features": ["amount", "merchant_id", "timestamp"]
    }
)
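
Labels and metadata can be read back later when auditing or selecting a model. For example, assuming the model saved above:

import bentoml

# Look up the stored model and inspect its labels and metadata
model_ref = bentoml.models.get("fraud_detector:latest")
print(model_ref.info.labels)      # {"version": "v2.1.0", "stage": "production", ...}
print(model_ref.info.metadata)    # {"training_date": "2024-01-15", "accuracy": 0.94, ...}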

Model Registry Organization

Structure your model registry:

# List models with labels
bentoml models list --label stage=production
bentoml models list --label version=v2.*

# Use descriptive names
customer_churn_predictor:v1.0.0
fraud_detection_model:v2.1.0
recommendation_engine:v3.0.0

Model Archival Strategy

# Keep only necessary versions
# Archive old versions
bentoml.models.delete("old_model:v1.0.0")

# Export for backup
bentoml.models.export_model("model:latest", "/backup/models/")

# Import when needed
bentoml.models.import_model("/backup/models/model.bentomodel")
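
A small retention script can automate this. The sketch below keeps only the three most recent versions of a model; it assumes the entries returned by bentoml.models.list() expose tag and info.creation_time:

import bentoml

KEEP = 3  # number of most recent versions to retain

# List stored versions of the model, newest first (assumed attributes noted above)
versions = sorted(
    bentoml.models.list("fraud_detector"),
    key=lambda m: m.info.creation_time,
    reverse=True,
)

for old_model in versions[KEEP:]:
    bentoml.models.delete(str(old_model.tag))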

Service Design

Input Validation

Always validate inputs with Pydantic:

import bentoml
from pydantic import BaseModel, Field, validator
from typing import List, Optional

class PredictionInput(BaseModel):
    features: List[float] = Field(..., min_items=4, max_items=4)
    user_id: Optional[str] = None

    @validator('features')
    def validate_features(cls, v):
        if any(x < 0 for x in v):
            raise ValueError('Features must be non-negative')
        return v

@bentoml.service
class MyService:
    @bentoml.api
    async def predict(self, input_data: PredictionInput) -> dict:
        # Input is already validated
        return self.model.predict(input_data.features)
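
Because validation happens in the Pydantic model itself, bad payloads are rejected before they reach your prediction code. A quick illustration:

from pydantic import ValidationError

# A valid payload passes
PredictionInput(features=[1.0, 2.0, 3.0, 4.0])

# Negative feature values are rejected by the custom validator
try:
    PredictionInput(features=[-1.0, 2.0, 3.0, 4.0])
except ValidationError as e:
    print(e)  # "Features must be non-negative"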

Error Handling

Implement comprehensive error handling:

import bentoml
from bentoml.exceptions import InvalidArgument, InternalServerError
import logging

logger = logging.getLogger(__name__)

@bentoml.service
class RobustService:

    @bentoml.api
    async def predict(self, input_data: dict) -> dict:
        try:
            # Validate input
            if not input_data:
                raise ValueError("Empty input data")

            # Make prediction
            result = self.model.predict(input_data)

            # Validate output
            if result is None:
                raise RuntimeError("Model returned None")

            return {"prediction": result, "status": "success"}

        except ValueError as e:
            logger.error(f"Validation error: {e}")
            raise InvalidArgument(f"Invalid input: {str(e)}")  # returned as HTTP 400
        except Exception as e:
            logger.error(f"Prediction error: {e}", exc_info=True)
            raise InternalServerError("Internal server error")  # returned as HTTP 500

Resource Management

Configure resources appropriately:

@bentoml.service(
    resources={
        "cpu": "4",              # 4 CPU cores
        "memory": "4Gi",         # 4 GB RAM
        "gpu": 1,                # 1 GPU (if available)
    },
    traffic={
        "timeout": 30,           # 30-second timeout
        "max_concurrency": 100,  # Max concurrent requests
    },
    workers=4,                   # Number of worker processes
)
class OptimizedService:
    pass

Performance Optimization

Adaptive Batching

Enable batching for improved throughput:

import bentoml
import numpy as np

@bentoml.service(
    traffic={
        "timeout": 30,
        "max_concurrency": 200,
    }
)
class BatchedService:

    def __init__(self):
        self.model = bentoml.sklearn.load_model("my_model:latest")

    @bentoml.api(
        batchable=True,
        batch_dim=0,          # Batch along the first dimension
        max_batch_size=32,    # Maximum batch size
        max_latency_ms=100,   # Maximum wait time before dispatching a partial batch
    )
    async def predict(self, inputs: np.ndarray) -> np.ndarray:
        return self.model.predict(inputs)
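
Batching is transparent to clients: each caller still sends a single request, and the server groups whatever arrives within the latency window. A rough way to observe the effect, assuming the service above is running locally on port 3000 and accepts its input under the "inputs" key:

import concurrent.futures
import requests

def call_predict(_):
    # Each client sends one row; concurrent rows are batched server-side
    return requests.post(
        "http://localhost:3000/predict",
        json={"inputs": [[1.0, 2.0, 3.0, 4.0]]},
    ).json()

with concurrent.futures.ThreadPoolExecutor(max_workers=50) as pool:
    results = list(pool.map(call_predict, range(200)))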

Caching

Implement result caching:

from functools import lru_cache
import hashlib
import json

import bentoml

@bentoml.service
class CachedService:

    def __init__(self):
        self.model = bentoml.sklearn.load_model("my_model:latest")
        self._input_cache = {}  # Maps input hashes back to the original payloads

    @lru_cache(maxsize=1000)
    def _cached_predict(self, input_hash: str):
        return self.model.predict(self._input_cache[input_hash])

    @bentoml.api
    async def predict(self, input_data: dict) -> dict:
        # Create a deterministic hash of the input
        input_str = json.dumps(input_data, sort_keys=True)
        input_hash = hashlib.md5(input_str.encode()).hexdigest()

        # Check cache
        try:
            result = self._cached_predict(input_hash)
            return {"prediction": result, "cached": True}
        except KeyError:
            # Compute and cache
            self._input_cache[input_hash] = input_data
            result = self._cached_predict(input_hash)
            return {"prediction": result, "cached": False}

Model Loading

Optimize model loading:

import bentoml

@bentoml.service
class EfficientService:

    def __init__(self):
        # Lazy loading - load the model when it is first needed
        self._model = None
        self.model_ref = bentoml.models.get("my_model:latest")

    @property
    def model(self):
        if self._model is None:
            import torch

            self._model = bentoml.pytorch.load_model(self.model_ref)

            # Move to GPU if available
            if torch.cuda.is_available():
                self._model = self._model.cuda()
            self._model.eval()  # Set to evaluation mode

            # Optional: compile the model for better performance (PyTorch 2.x)
            if hasattr(torch, 'compile'):
                self._model = torch.compile(self._model)

        return self._model

Testing

Unit Tests

Test your service logic:

# test_service.py
import pytest
import numpy as np
from service import MyService

@pytest.fixture
def service():
    return MyService()

def test_prediction(service):
    input_data = np.array([[1, 2, 3, 4]])
    result = service.predict(input_data)
    assert result is not None
    assert "prediction" in result

def test_invalid_input(service):
    with pytest.raises(ValueError):
        service.predict(None)

def test_batch_prediction(service):
    inputs = np.array([[1, 2, 3, 4], [5, 6, 7, 8]])
    results = service.predict_batch(inputs)
    assert len(results) == 2

Integration Tests

Test the deployed service:

# test_integration.py
import requests
import pytest

BASE_URL = "http://localhost:3000"

def test_health_endpoint():
    response = requests.get(f"{BASE_URL}/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_prediction_endpoint():
    data = {"features": [1.0, 2.0, 3.0, 4.0]}
    response = requests.post(f"{BASE_URL}/predict", json=data)
    assert response.status_code == 200
    assert "prediction" in response.json()

def test_error_handling():
    data = {"invalid": "data"}
    response = requests.post(f"{BASE_URL}/predict", json=data)
    assert response.status_code == 400

Load Testing

Test under load:

# load_test.py
import asyncio
import aiohttp
import time

async def make_request(session, url, data):
    async with session.post(url, json=data) as response:
        return await response.json()

async def load_test(num_requests=1000):
    url = "http://localhost:3000/predict"
    data = {"features": [1.0, 2.0, 3.0, 4.0]}

    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [make_request(session, url, data) for _ in range(num_requests)]
        results = await asyncio.gather(*tasks)

    duration = time.time() - start_time
    rps = num_requests / duration

    print(f"Completed {num_requests} requests in {duration:.2f}s")
    print(f"Throughput: {rps:.2f} requests/second")

if __name__ == "__main__":
    asyncio.run(load_test())

Deployment

Environment Configuration

Use environment variables:

# service.py
import os
import bentoml

@bentoml.service(
    resources={
        "cpu": os.getenv("BENTOML_CPU", "2"),
        "memory": os.getenv("BENTOML_MEMORY", "2Gi"),
    }
)
class ConfigurableService:

    def __init__(self):
        self.model_tag = os.getenv("MODEL_TAG", "latest")
        self.model = bentoml.sklearn.load_model(f"my_model:{self.model_tag}")

        self.debug = os.getenv("DEBUG", "false").lower() == "true"
        if self.debug:
            self._setup_debug_logging()

Docker Best Practices

Create an optimized Dockerfile:

# Dockerfile (if customizing)
FROM bentoml/bentoml:1.2.0-python3.10

# Install system dependencies
RUN apt-get update && apt-get install -y \
    libgomp1 \
    && rm -rf /var/lib/apt/lists/*

# Copy and install requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . /bento
WORKDIR /bento

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=30s --retries=3 \
    CMD curl -f http://localhost:3000/health || exit 1

# Run as non-root user
USER bentoml

EXPOSE 3000
CMD ["bentoml", "serve", "service:MyService"]

Kubernetes Deployment

Production-ready Kubernetes setup:

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-service
  labels:
    app: ml-service
    version: v1.0.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: ml-service
  template:
    metadata:
      labels:
        app: ml-service
        version: v1.0.0
    spec:
      containers:
        - name: ml-service
          image: ml-service:v1.0.0
          ports:
            - containerPort: 3000
              name: http
          env:
            - name: BENTOML_CPU
              value: "4"
            - name: BENTOML_MEMORY
              value: "4Gi"
            - name: MODEL_TAG
              value: "v1.0.0"
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 10
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 2
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 10"]
---
apiVersion: v1
kind: Service
metadata:
  name: ml-service
spec:
  type: ClusterIP
  selector:
    app: ml-service
  ports:
    - port: 80
      targetPort: 3000
      protocol: TCP
      name: http
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 30
        - type: Pods
          value: 2
          periodSeconds: 30

Monitoring and Observability

Structured Logging

Implement structured logging:

import logging
import json
from datetime import datetime

import bentoml

class StructuredLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

    def log(self, level, message, **kwargs):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": level,
            "message": message,
            **kwargs
        }
        self.logger.log(getattr(logging, level.upper()), json.dumps(log_data))

@bentoml.service
class MonitoredService:

    def __init__(self):
        self.logger = StructuredLogger(__name__)

    @bentoml.api
    async def predict(self, input_data: dict) -> dict:
        start_time = datetime.utcnow()

        self.logger.log(
            "info",
            "prediction_started",
            input_size=len(input_data)
        )

        try:
            result = self.model.predict(input_data)
            duration = (datetime.utcnow() - start_time).total_seconds()

            self.logger.log(
                "info",
                "prediction_completed",
                duration=duration,
                result_size=len(result)
            )

            return result
        except Exception as e:
            self.logger.log(
                "error",
                "prediction_failed",
                error=str(e),
                error_type=type(e).__name__
            )
            raise

Metrics Collection

Add Prometheus metrics:

from prometheus_client import Counter, Histogram, Gauge
import time

import bentoml

# Define metrics
prediction_counter = Counter(
    'ml_predictions_total',
    'Total number of predictions',
    ['model_version', 'status']
)

prediction_duration = Histogram(
    'ml_prediction_duration_seconds',
    'Prediction duration in seconds',
    ['model_version']
)

model_confidence = Histogram(
    'ml_prediction_confidence',
    'Prediction confidence scores',
    ['model_version'],
    buckets=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1.0]
)

active_requests = Gauge(
    'ml_active_requests',
    'Number of active prediction requests'
)

@bentoml.service
class MetricsService:

    def __init__(self):
        model_ref = bentoml.models.get("my_model:latest")
        self.model_version = str(model_ref.tag.version)
        self.model = bentoml.sklearn.load_model(model_ref)

    @bentoml.api
    async def predict(self, input_data: dict) -> dict:
        active_requests.inc()
        start_time = time.time()

        try:
            result = self.model.predict(input_data)

            # Record metrics
            duration = time.time() - start_time
            prediction_duration.labels(model_version=self.model_version).observe(duration)
            prediction_counter.labels(
                model_version=self.model_version,
                status="success"
            ).inc()

            # Record confidence if available
            if "confidence" in result:
                model_confidence.labels(
                    model_version=self.model_version
                ).observe(result["confidence"])

            return result

        except Exception:
            prediction_counter.labels(
                model_version=self.model_version,
                status="error"
            ).inc()
            raise
        finally:
            active_requests.dec()
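
BentoML serves Prometheus metrics over HTTP alongside the built-in server metrics, so the counters and histograms above can be scraped with the same endpoint. A quick sanity check, assuming the default metrics path /metrics on a locally running service:

import requests

# Fetch the Prometheus exposition text and confirm the custom counter is present
metrics_text = requests.get("http://localhost:3000/metrics").text
assert "ml_predictions_total" in metrics_text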

Security

Input Sanitization

Sanitize all inputs:

import re
from pydantic import BaseModel, validator

class SecureInput(BaseModel):
    text: str
    user_id: str

    @validator('text')
    def sanitize_text(cls, v):
        # Remove control characters
        v = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', v)
        # Limit length
        if len(v) > 10000:
            raise ValueError("Text too long")
        return v

    @validator('user_id')
    def validate_user_id(cls, v):
        if not re.match(r'^[a-zA-Z0-9_-]+$', v):
            raise ValueError("Invalid user_id format")
        return v
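
As with the earlier prediction schema, malformed values never reach the model. For example:

from pydantic import ValidationError

try:
    SecureInput(text="hello", user_id="bad id with spaces!")
except ValidationError as e:
    print(e)  # "Invalid user_id format"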

Authentication

Add API authentication:

from fastapi import Security, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import bentoml

security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials):
    token = credentials.credentials
    # Verify token (implement your logic)
    if not is_valid_token(token):
        raise HTTPException(status_code=401, detail="Invalid token")
    return token

@bentoml.service
class SecureService:

    @bentoml.api
    async def predict(
        self,
        input_data: dict,
        credentials: HTTPAuthorizationCredentials = Security(security)
    ) -> dict:
        # Verify the bearer token before running the prediction
        verify_token(credentials)
        return self.model.predict(input_data)
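
The is_valid_token helper above is left to your own auth logic. A minimal sketch, assuming a single static API token supplied through an environment variable (illustrative only):

import hmac
import os

def is_valid_token(token: str) -> bool:
    # Compare against a static token from the environment; in production,
    # validate a JWT or check the token with your identity provider instead.
    expected = os.getenv("API_TOKEN", "")
    return bool(expected) and hmac.compare_digest(token, expected)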

Rate Limiting

Implement rate limiting:

from collections import defaultdict
from datetime import datetime, timedelta
from http import HTTPStatus

import bentoml
from bentoml.exceptions import BentoMLException

class RateLimitExceeded(BentoMLException):
    # Surfaced to clients as HTTP 429 (Too Many Requests)
    error_code = HTTPStatus.TOO_MANY_REQUESTS

class RateLimiter:
    def __init__(self, max_requests=100, window_seconds=60):
        self.max_requests = max_requests
        self.window = timedelta(seconds=window_seconds)
        self.requests = defaultdict(list)

    def is_allowed(self, client_id: str) -> bool:
        now = datetime.utcnow()
        cutoff = now - self.window

        # Drop requests that have fallen outside the window
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > cutoff
        ]

        # Check limit
        if len(self.requests[client_id]) >= self.max_requests:
            return False

        # Record the current request
        self.requests[client_id].append(now)
        return True

@bentoml.service
class RateLimitedService:

    def __init__(self):
        self.rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

    @bentoml.api
    async def predict(self, input_data: dict, client_id: str) -> dict:
        if not self.rate_limiter.is_allowed(client_id):
            raise RateLimitExceeded("Rate limit exceeded")

        return self.model.predict(input_data)
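
The limiter can be exercised on its own before wiring it into the service:

# Quick sanity check of RateLimiter in isolation
limiter = RateLimiter(max_requests=2, window_seconds=60)

assert limiter.is_allowed("client-a") is True
assert limiter.is_allowed("client-a") is True
assert limiter.is_allowed("client-a") is False   # third request within the window is rejected
assert limiter.is_allowed("client-b") is True    # other clients are tracked separately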

CI/CD Integration

GitHub Actions Example

# .github/workflows/deploy.yml
name: Deploy ML Service

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install bentoml pytest
          pip install -r requirements.txt

      - name: Run tests
        run: pytest tests/

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Build Bento
        run: |
          pip install bentoml
          bentoml build

      - name: Build Docker image
        run: |
          bentoml containerize ml-service:latest

      - name: Push to registry
        run: |
          docker tag ml-service:latest ${{ secrets.REGISTRY }}/ml-service:${{ github.sha }}
          docker push ${{ secrets.REGISTRY }}/ml-service:${{ github.sha }}

  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/ml-service \
            ml-service=${{ secrets.REGISTRY }}/ml-service:${{ github.sha }}

Summary Checklist

Before Production

  • Model versioning implemented
  • Input validation with Pydantic
  • Comprehensive error handling
  • Unit and integration tests
  • Load testing completed
  • Resource limits configured
  • Health check endpoint
  • Structured logging
  • Metrics collection
  • Security measures (auth, rate limiting)
  • Documentation updated
  • CI/CD pipeline configured
  • Monitoring and alerting setup
  • Backup and recovery plan

Performance

  • Adaptive batching enabled
  • Caching implemented where appropriate
  • Model loading optimized
  • GPU utilization maximized (if applicable)
  • Network latency minimized
  • Database queries optimized

Operations

  • Deployment automation
  • Rollback procedure documented
  • Incident response plan
  • On-call rotation established
  • SLA/SLO defined
  • Cost monitoring enabled

Next Steps