
Advanced Examples

Real-world applications and complex use cases demonstrating PostgresML capabilities.

E-Commerce Product Recommendations

Complete Recommendation System

-- Setup: Product catalog with features
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT,
description TEXT,
category TEXT,
price NUMERIC,
brand TEXT,
embedding vector(384)
);

-- User behavior tracking
CREATE TABLE user_events (
id SERIAL PRIMARY KEY,
user_id INT,
product_id INT,
event_type TEXT, -- 'view', 'cart', 'purchase'
created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Generate product embeddings (concat_ws skips NULL fields, so one
-- missing attribute does not null out the whole input)
UPDATE products
SET embedding = pgml.embed(
'sentence-transformers/all-MiniLM-L6-v2',
concat_ws(' ', name, description, category)
)::vector;
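
With embeddings populated, it is worth sanity-checking them with a direct similarity query. The sketch below also adds an optional pgvector ivfflat index for fast approximate search; the query text is illustrative:

-- Optional ANN index for cosine search over the catalog
CREATE INDEX ON products USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- Sanity check: top products for a free-text query
WITH query AS (
SELECT pgml.embed(
'sentence-transformers/all-MiniLM-L6-v2',
'wireless noise-cancelling headphones'
)::vector AS q
)
SELECT p.id, p.name, 1 - (p.embedding <=> query.q) AS similarity
FROM products p, query
WHERE p.embedding IS NOT NULL
ORDER BY p.embedding <=> query.q
LIMIT 5;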

-- Create the recommendation model. pgml.train takes the name of a
-- table or view as its data source, so the training query is wrapped
-- in a view first. (Assumes a users table with age, gender, and
-- location columns.)
CREATE OR REPLACE VIEW product_training_data AS
SELECT
u.age,
u.gender,
u.location,
p.price,
p.category,
COUNT(CASE WHEN e.event_type = 'view' THEN 1 END) AS view_count,
COUNT(CASE WHEN e.event_type = 'purchase' THEN 1 END) > 0 AS purchased
FROM users u
CROSS JOIN products p
LEFT JOIN user_events e ON u.id = e.user_id AND p.id = e.product_id
GROUP BY u.id, u.age, u.gender, u.location, p.id, p.price, p.category;

SELECT pgml.train(
'product_recommender',
'classification',
'product_training_data',
'purchased',
algorithm => 'xgboost'
);
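
pgml.train holds out a test split and records evaluation metrics alongside the stored model. Assuming the default pgml schema, the latest run can be inspected directly (column names can vary across PostgresML versions):

-- Inspect the most recent model's stored evaluation metrics (sketch)
SELECT id, algorithm, metrics
FROM pgml.models
ORDER BY id DESC
LIMIT 1;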

-- Hybrid recommendation function
CREATE OR REPLACE FUNCTION get_recommendations(
target_user_id INT,
limit_count INT DEFAULT 10
)
RETURNS TABLE (
product_id INT,
product_name TEXT,
recommendation_score FLOAT,
reason TEXT
) AS $$
WITH user_profile AS (
-- Centroid of the user's recent interactions
-- (pgvector provides an avg() aggregate for vectors)
SELECT
AVG(p.embedding) AS avg_embedding
FROM user_events ue
JOIN products p ON ue.product_id = p.id
WHERE ue.user_id = target_user_id
AND ue.created_at > NOW() - INTERVAL '30 days'
),
collaborative_scores AS (
-- Collaborative filtering via the trained model. Assumes the users
-- table carries numeric encodings of its categorical columns; the
-- feature array must match the training columns in order and count.
SELECT
p.id,
pgml.predict('product_recommender',
ARRAY[u.age, u.gender_encoded, u.location_encoded, p.price, p.category_encoded]
) AS cf_score
FROM products p, users u
WHERE u.id = target_user_id
),
content_scores AS (
-- Content-based filtering
SELECT
p.id,
1 - (p.embedding <=> up.avg_embedding) AS content_score
FROM products p, user_profile up
WHERE p.embedding IS NOT NULL
),
excluded AS (
SELECT product_id
FROM user_events
WHERE user_id = target_user_id
AND event_type = 'purchase'
)
SELECT
p.id,
p.name,
(COALESCE(cs.content_score, 0) * 0.6 +
COALESCE(cf.cf_score, 0) * 0.4) AS score,
CASE
WHEN cs.content_score > 0.8 THEN 'Similar to your interests'
WHEN cf.cf_score > 0.7 THEN 'Popular with similar users'
ELSE 'Recommended for you'
END AS reason
FROM products p
LEFT JOIN content_scores cs ON p.id = cs.id
LEFT JOIN collaborative_scores cf ON p.id = cf.id
WHERE p.id NOT IN (SELECT product_id FROM excluded)
ORDER BY score DESC
LIMIT limit_count;
$$ LANGUAGE sql;

-- Usage
SELECT * FROM get_recommendations(12345, 10);
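
Calling the function per request recomputes both scoring paths. For latency-sensitive serving, results can instead be precomputed into a cache table on a schedule; a minimal sketch (the recommendations_cache table is illustrative):

CREATE TABLE IF NOT EXISTS recommendations_cache (
user_id INT,
product_id INT,
product_name TEXT,
recommendation_score FLOAT,
reason TEXT,
refreshed_at TIMESTAMPTZ DEFAULT NOW()
);

INSERT INTO recommendations_cache (user_id, product_id, product_name, recommendation_score, reason)
SELECT u.id, r.product_id, r.product_name, r.recommendation_score, r.reason
FROM users u,
LATERAL get_recommendations(u.id, 10) r;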

Real-Time Fraud Detection

Transaction Monitoring System

-- Transaction data
CREATE TABLE transactions (
id SERIAL PRIMARY KEY,
user_id INT,
amount NUMERIC,
merchant_id INT,
location TEXT,
device_id TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
is_fraud BOOLEAN DEFAULT FALSE
);

-- Feature engineering view
CREATE OR REPLACE VIEW transaction_features AS
SELECT
-- Identifier columns are left out so that every non-label column in
-- the view can serve as a model feature
t.amount,
t.merchant_id,
EXTRACT(HOUR FROM t.created_at) AS hour_of_day,
EXTRACT(DOW FROM t.created_at) AS day_of_week,
-- User statistics (last 24 hours)
COUNT(*) OVER (
PARTITION BY t.user_id
ORDER BY t.created_at
RANGE BETWEEN INTERVAL '24 hours' PRECEDING AND CURRENT ROW
) AS txn_count_24h,
AVG(t.amount) OVER (
PARTITION BY t.user_id
ORDER BY t.created_at
RANGE BETWEEN INTERVAL '24 hours' PRECEDING AND CURRENT ROW
) AS avg_amount_24h,
-- Location changes (a user's first transaction counts as unchanged,
-- avoiding NULL features)
COALESCE(LAG(t.location) OVER (
PARTITION BY t.user_id
ORDER BY t.created_at
) != t.location, FALSE) AS location_changed,
-- Device changes
COALESCE(LAG(t.device_id) OVER (
PARTITION BY t.user_id
ORDER BY t.created_at
) != t.device_id, FALSE) AS device_changed,
t.is_fraud
FROM transactions t;

-- Train fraud detection model
SELECT pgml.train(
'fraud_detector',
'classification',
'transaction_features',
'is_fraud',
algorithm => 'xgboost',
hyperparams => '{
"n_estimators": 200,
"max_depth": 8,
"learning_rate": 0.05,
"scale_pos_weight": 50
}'
);

-- Real-time fraud detection function
CREATE OR REPLACE FUNCTION detect_fraud(
txn_user_id INT,
txn_amount NUMERIC,
txn_merchant_id INT,
txn_location TEXT,
txn_device_id TEXT
)
RETURNS JSONB AS $$
DECLARE
fraud_probability FLOAT;
risk_level TEXT;
features FLOAT[];
BEGIN
-- Compute features in the same order as the training view. Element
-- aliases are not allowed inside ARRAY[], and booleans are cast to
-- integers so the array stays numeric.
SELECT ARRAY[
txn_amount,
txn_merchant_id,
EXTRACT(HOUR FROM NOW()),
EXTRACT(DOW FROM NOW()),
COUNT(*),
COALESCE(AVG(amount), 0),
COALESCE((SELECT location FROM transactions WHERE user_id = txn_user_id ORDER BY created_at DESC LIMIT 1) != txn_location, FALSE)::INT,
COALESCE((SELECT device_id FROM transactions WHERE user_id = txn_user_id ORDER BY created_at DESC LIMIT 1) != txn_device_id, FALSE)::INT
] INTO features
FROM transactions
WHERE user_id = txn_user_id
AND created_at > NOW() - INTERVAL '24 hours';

-- Get fraud probability; predict_proba is assumed to return per-class
-- probabilities, indexed here for the positive class (adjust the
-- indexing to your PostgresML version)
fraud_probability := (pgml.predict_proba(
'fraud_detector',
features
))[2];

-- Determine risk level
risk_level := CASE
WHEN fraud_probability > 0.9 THEN 'critical'
WHEN fraud_probability > 0.7 THEN 'high'
WHEN fraud_probability > 0.4 THEN 'medium'
ELSE 'low'
END;

RETURN jsonb_build_object(
'fraud_probability', fraud_probability,
'risk_level', risk_level,
'action', CASE
WHEN fraud_probability > 0.9 THEN 'block'
WHEN fraud_probability > 0.7 THEN 'review'
ELSE 'approve'
END,
'timestamp', NOW()
);
END;
$$ LANGUAGE plpgsql;
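
The trigger below scores each transaction inline and raises alerts, which assumes two extra columns on transactions and an alerts table. A minimal sketch of that supporting DDL:

ALTER TABLE transactions
ADD COLUMN IF NOT EXISTS fraud_score FLOAT,
ADD COLUMN IF NOT EXISTS fraud_check_result JSONB;

CREATE TABLE IF NOT EXISTS fraud_alerts (
id SERIAL PRIMARY KEY,
transaction_id INT,
fraud_check JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);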

-- Trigger for automatic fraud detection
CREATE OR REPLACE FUNCTION check_fraud_on_insert()
RETURNS TRIGGER AS $$
DECLARE
fraud_check JSONB;
BEGIN
fraud_check := detect_fraud(
NEW.user_id,
NEW.amount,
NEW.merchant_id,
NEW.location,
NEW.device_id
);

-- Store fraud score in transaction
NEW.fraud_score := (fraud_check->>'fraud_probability')::FLOAT;
NEW.fraud_check_result := fraud_check;

-- Alert if high risk
IF (fraud_check->>'risk_level') IN ('critical', 'high') THEN
INSERT INTO fraud_alerts (transaction_id, fraud_check, created_at)
VALUES (NEW.id, fraud_check, NOW());
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER fraud_detection_trigger
BEFORE INSERT ON transactions
FOR EACH ROW
EXECUTE FUNCTION check_fraud_on_insert();
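
With the trigger in place, every insert is scored synchronously; a quick test (values are illustrative):

INSERT INTO transactions (user_id, amount, merchant_id, location, device_id)
VALUES (42, 1999.00, 7, 'Berlin', 'device-abc')
RETURNING id, fraud_score, fraud_check_result->>'risk_level' AS risk_level;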

Customer Churn Prediction

Predictive Customer Retention

-- Customer data
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
signup_date DATE,
age INT,
location TEXT,
plan_type TEXT,
monthly_spend NUMERIC
);

-- Usage metrics
CREATE TABLE usage_metrics (
customer_id INT,
metric_date DATE,
logins INT,
feature_usage JSONB,
support_tickets INT,
created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Churn labels
CREATE TABLE churn_labels (
customer_id INT PRIMARY KEY,
churned BOOLEAN,
churn_date DATE
);

-- Feature engineering
CREATE MATERIALIZED VIEW churn_features AS
SELECT
c.id AS customer_id,
(CURRENT_DATE - c.signup_date) AS customer_age_days,
c.age,
c.plan_type,
c.monthly_spend,
-- Recent activity (30 days)
COALESCE(SUM(um.logins), 0) AS logins_30d,
COALESCE(SUM(um.support_tickets), 0) AS support_tickets_30d,
-- Engagement trend: last week vs the prior three weeks (cast to FLOAT
-- to avoid integer division)
COALESCE(
(SUM(um.logins) FILTER (WHERE um.metric_date > CURRENT_DATE - 7))::FLOAT /
NULLIF(SUM(um.logins) FILTER (WHERE um.metric_date BETWEEN CURRENT_DATE - 30 AND CURRENT_DATE - 7), 0),
0
) AS login_trend_ratio,
-- Spending changes need a billing-history table to be meaningful;
-- with one row per customer this LAG is always NULL, so default to 0
COALESCE(c.monthly_spend - LAG(c.monthly_spend) OVER (PARTITION BY c.id ORDER BY c.signup_date), 0) AS spend_change,
COALESCE(cl.churned, FALSE) AS churned
FROM customers c
LEFT JOIN usage_metrics um ON c.id = um.customer_id
AND um.metric_date > CURRENT_DATE - 30
LEFT JOIN churn_labels cl ON c.id = cl.customer_id
GROUP BY c.id, c.signup_date, c.age, c.plan_type, c.monthly_spend, cl.churned;
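
Because churn_features is materialized, it reflects the data as of its last refresh; refresh it before retraining or scoring:

REFRESH MATERIALIZED VIEW churn_features;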

-- Train churn prediction model
SELECT pgml.train(
'churn_predictor',
'classification',
'churn_features',
'churned',
algorithm => 'xgboost',
hyperparams => '{
"n_estimators": 150,
"max_depth": 6,
"learning_rate": 0.1,
"scale_pos_weight": 3
}',
test_size => 0.2
);

-- Proactive retention function
CREATE OR REPLACE FUNCTION identify_at_risk_customers()
RETURNS TABLE (
customer_id INT,
churn_probability FLOAT,
risk_factors JSONB,
recommended_action TEXT
) AS $$
WITH predictions AS (
SELECT
cf.customer_id,
-- Assumes plan_type has a numeric encoding in churn_features; the
-- array must match the training feature order. predict_proba is
-- assumed to return per-class probabilities, indexed here for the
-- positive (churned) class.
(pgml.predict_proba('churn_predictor',
ARRAY[
cf.customer_age_days, cf.age, cf.plan_type_encoded,
cf.monthly_spend, cf.logins_30d, cf.support_tickets_30d,
cf.login_trend_ratio, cf.spend_change
]
))[2] AS churn_prob,
cf.*
FROM churn_features cf
WHERE NOT EXISTS (
SELECT 1 FROM churn_labels cl
WHERE cl.customer_id = cf.customer_id AND cl.churned = TRUE
)
)
SELECT
p.customer_id,
p.churn_prob,
jsonb_build_object(
'low_engagement', p.logins_30d < 5,
'declining_usage', p.login_trend_ratio < 0.5,
'high_support_tickets', p.support_tickets_30d > 3,
'spending_decrease', p.spend_change < -10
) AS risk_factors,
CASE
WHEN p.churn_prob > 0.8 THEN 'Urgent: Personal outreach + retention offer'
WHEN p.churn_prob > 0.6 THEN 'High priority: Email campaign with incentive'
WHEN p.churn_prob > 0.4 THEN 'Monitor: Send engagement content'
ELSE 'Low risk: Continue standard engagement'
END AS recommended_action
FROM predictions p
WHERE p.churn_prob > 0.4
ORDER BY p.churn_prob DESC;
$$ LANGUAGE sql;

-- Daily churn risk report
SELECT * FROM identify_at_risk_customers();
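
To turn this into an actual daily report, the query can be scheduled inside the database; a sketch assuming the pg_cron extension is installed and an (illustrative) churn_risk_reports table exists to receive the output:

SELECT cron.schedule(
'daily-churn-report',
'0 6 * * *',
$$
INSERT INTO churn_risk_reports
(report_date, customer_id, churn_probability, risk_factors, recommended_action)
SELECT CURRENT_DATE, r.*
FROM identify_at_risk_customers() r
$$
);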

Sentiment Analysis Dashboard

Social Media Monitoring

-- Social media posts
CREATE TABLE social_posts (
id SERIAL PRIMARY KEY,
platform TEXT,
author TEXT,
content TEXT,
posted_at TIMESTAMPTZ,
engagement_count INT,
sentiment JSONB,
topics TEXT[],
embedding vector(384)
);

-- Process incoming posts
CREATE OR REPLACE FUNCTION process_social_post(
post_content TEXT,
post_platform TEXT DEFAULT 'twitter'
)
RETURNS JSONB AS $$
DECLARE
result JSONB;
sentiment_result JSONB;
topics_result JSONB;
entities_result JSONB;
embedding_result VECTOR;
BEGIN
-- Sentiment analysis
sentiment_result := pgml.transform(
task => 'text-classification',
inputs => ARRAY[post_content]
)->0;

-- Topic classification
topics_result := pgml.transform(
task => 'zero-shot-classification',
inputs => ARRAY[post_content],
args => '{
"candidate_labels": ["product", "support", "feature_request", "bug", "praise", "complaint"]
}'::JSONB
)->0;

-- Entity extraction
entities_result := pgml.transform(
task => 'token-classification',
inputs => ARRAY[post_content]
)->0;

-- Generate embedding. Note: the JSONB return value cannot carry a
-- vector, so persisting it happens at insert time (see the usage
-- example below)
embedding_result := pgml.embed(
'sentence-transformers/all-MiniLM-L6-v2',
post_content
)::vector;

-- Combine results
result := jsonb_build_object(
'sentiment', sentiment_result,
'topics', topics_result,
'entities', entities_result,
'processed_at', NOW()
);

RETURN result;
END;
$$ LANGUAGE plpgsql;
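
A post can then be analyzed and persisted in a single statement; a minimal sketch with illustrative values:

INSERT INTO social_posts (platform, author, content, posted_at, sentiment, embedding)
VALUES (
'twitter',
'@example_user',
'The new release fixed my biggest complaint. Great work!',
NOW(),
process_social_post('The new release fixed my biggest complaint. Great work!')->'sentiment',
pgml.embed(
'sentence-transformers/all-MiniLM-L6-v2',
'The new release fixed my biggest complaint. Great work!'
)::vector
);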

-- Sentiment trend analysis
CREATE OR REPLACE VIEW sentiment_trends AS
SELECT
DATE_TRUNC('hour', posted_at) AS hour,
platform,
COUNT(*) AS post_count,
AVG(CASE WHEN sentiment->>'label' = 'POSITIVE' THEN 1 ELSE 0 END) AS positive_ratio,
AVG(CASE WHEN sentiment->>'label' = 'NEGATIVE' THEN 1 ELSE 0 END) AS negative_ratio,
AVG((sentiment->>'score')::FLOAT) AS avg_confidence
FROM social_posts
WHERE posted_at > NOW() - INTERVAL '7 days'
GROUP BY DATE_TRUNC('hour', posted_at), platform
ORDER BY hour DESC;

-- Crisis detection
CREATE OR REPLACE FUNCTION detect_sentiment_crisis()
RETURNS TABLE (
alert_level TEXT,
platform TEXT,
negative_spike FLOAT,
sample_posts TEXT[]
) AS $$
WITH recent_sentiment AS (
SELECT
platform,
DATE_TRUNC('hour', posted_at) AS hour,
AVG(CASE WHEN sentiment->>'label' = 'NEGATIVE' THEN 1 ELSE 0 END) AS neg_ratio
FROM social_posts
WHERE posted_at > NOW() - INTERVAL '24 hours'
GROUP BY platform, DATE_TRUNC('hour', posted_at)
),
baseline AS (
SELECT
platform,
AVG(neg_ratio) AS baseline_neg_ratio,
-- STDDEV is NULL with a single sample; treat that as zero variance
COALESCE(STDDEV(neg_ratio), 0) AS stddev_neg_ratio
FROM recent_sentiment
GROUP BY platform
),
current AS (
SELECT
platform,
neg_ratio AS current_neg_ratio
FROM recent_sentiment
WHERE hour = DATE_TRUNC('hour', NOW())
)
SELECT
CASE
WHEN c.current_neg_ratio > b.baseline_neg_ratio + (3 * b.stddev_neg_ratio) THEN 'critical'
ELSE 'high' -- the WHERE clause below already filters to > 2 stddev
END AS alert_level,
c.platform,
c.current_neg_ratio - b.baseline_neg_ratio AS negative_spike,
ARRAY(
SELECT content
FROM social_posts
WHERE platform = c.platform
AND sentiment->>'label' = 'NEGATIVE'
AND posted_at > NOW() - INTERVAL '1 hour'
ORDER BY (sentiment->>'score')::FLOAT DESC
LIMIT 5
) AS sample_posts
FROM current c
JOIN baseline b ON c.platform = b.platform
WHERE c.current_neg_ratio > b.baseline_neg_ratio + (2 * b.stddev_neg_ratio);
$$ LANGUAGE sql;
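
The detector is meant to run on a short interval (for example hourly via a scheduler); invoked directly, it returns one row per platform that is currently spiking:

SELECT * FROM detect_sentiment_crisis();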

Time Series Forecasting

Demand Prediction System

-- Sales data
CREATE TABLE sales (
id SERIAL PRIMARY KEY,
product_id INT,
store_id INT,
date DATE,
quantity INT,
revenue NUMERIC,
promotion BOOLEAN
);

-- Weather data (external factor)
CREATE TABLE weather (
date DATE,
store_id INT,
temperature FLOAT,
precipitation FLOAT,
is_holiday BOOLEAN
);

-- Feature engineering for time series
CREATE MATERIALIZED VIEW sales_features AS
SELECT
s.date,
s.product_id,
s.store_id,
s.quantity,
-- Time features
EXTRACT(DOW FROM s.date) AS day_of_week,
EXTRACT(MONTH FROM s.date) AS month,
EXTRACT(YEAR FROM s.date) AS year,
-- Lag features
LAG(s.quantity, 1) OVER w AS lag_1_day,
LAG(s.quantity, 7) OVER w AS lag_7_days,
LAG(s.quantity, 30) OVER w AS lag_30_days,
-- Rolling statistics
AVG(s.quantity) OVER (w ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS rolling_7d_avg,
AVG(s.quantity) OVER (w ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) AS rolling_30d_avg,
STDDEV(s.quantity) OVER (w ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) AS rolling_30d_std,
-- External factors (COALESCE avoids NULL features on days with no
-- matching weather row)
COALESCE(w.temperature, 0) AS temperature,
COALESCE(w.precipitation, 0) AS precipitation,
COALESCE(w.is_holiday, FALSE) AS is_holiday,
s.promotion
FROM sales s
LEFT JOIN weather w ON s.date = w.date AND s.store_id = w.store_id
WINDOW w AS (PARTITION BY s.product_id, s.store_id ORDER BY s.date)
ORDER BY s.date;
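
As with the churn features, sales_features is materialized; refresh it before each retraining run so the lag and rolling-window columns include the newest sales:

REFRESH MATERIALIZED VIEW sales_features;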

-- Train the forecasting model. As above, the filtered training window
-- is wrapped in a view because pgml.train takes a relation name
CREATE OR REPLACE VIEW demand_training_data AS
SELECT * FROM sales_features
WHERE lag_30_days IS NOT NULL
AND date < CURRENT_DATE - 7;

SELECT pgml.train(
'demand_forecaster',
'regression',
'demand_training_data',
'quantity',
algorithm => 'xgboost',
hyperparams => '{
"n_estimators": 200,
"max_depth": 7,
"learning_rate": 0.05,
"objective": "reg:squarederror"
}'
);
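
Retraining creates a new model version; PostgresML can pin which version serves predictions via pgml.deploy. A sketch (strategies include best_score and most_recent; check your version's docs for the exact arguments):

-- Redeploy whichever trained model scored best on held-out data
SELECT * FROM pgml.deploy(
'demand_forecaster',
strategy => 'best_score'
);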

-- Forecasting function
CREATE OR REPLACE FUNCTION forecast_demand(
target_product_id INT,
target_store_id INT,
forecast_days INT DEFAULT 7
)
RETURNS TABLE (
forecast_date DATE,
predicted_quantity FLOAT,
confidence_lower FLOAT,
confidence_upper FLOAT
) AS $$
-- Simplified sketch: a production version would roll forward day by
-- day, feeding each prediction back in as the next day's lag feature
SELECT
CURRENT_DATE + i AS forecast_date,
pgml.predict('demand_forecaster', features) AS predicted_quantity,
pgml.predict('demand_forecaster', features) * 0.8 AS confidence_lower,
pgml.predict('demand_forecaster', features) * 1.2 AS confidence_upper
FROM generate_series(1, forecast_days) i,
LATERAL (
SELECT ARRAY[
EXTRACT(DOW FROM CURRENT_DATE + i),
EXTRACT(MONTH FROM CURRENT_DATE + i)
-- ...plus the latest lag features and other predictors, elided here
] AS features
) f;
$$ LANGUAGE sql;

-- Usage
SELECT * FROM forecast_demand(101, 5, 14);

Next Steps

Explore additional resources: