Data Engineering with Databricks
Learn how to build robust data pipelines, perform ETL operations, and manage data lakes using Databricks and Delta Lake.
Overview
Data engineering in Databricks focuses on:
- Ingesting data from various sources
- Transforming and cleaning data
- Building reliable data pipelines
- Managing data lakes with Delta Lake
- Implementing the Medallion Architecture
Delta Lake Fundamentals
Delta Lake is an open-source storage layer that brings ACID transactions, schema enforcement, and time travel to data lakes.
Creating Delta Tables
# Write DataFrame as Delta table
df.write.format("delta").mode("overwrite").save("/delta/table/path")
# Create managed Delta table
df.write.format("delta").mode("overwrite").saveAsTable("my_database.my_table")
# Create table with partitioning
df.write.format("delta") \
.partitionBy("date", "region") \
.mode("overwrite") \
.save("/delta/partitioned_table")
Reading Delta Tables
# Read from path
df = spark.read.format("delta").load("/delta/table/path")
# Read from table name
df = spark.table("my_database.my_table")
# Read specific version (time travel)
df = spark.read.format("delta").option("versionAsOf", 5).load("/delta/table/path")
# Read data as of timestamp
df = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-01 00:00:00") \
.load("/delta/table/path")
ACID Transactions
from delta.tables import DeltaTable
# Upsert (merge) operation
delta_table = DeltaTable.forPath(spark, "/delta/table/path")
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(set={
"value": "source.value",
"updated_at": "source.updated_at"
}).whenNotMatchedInsert(values={
"id": "source.id",
"value": "source.value",
"updated_at": "source.updated_at"
}).execute()
Time Travel
# View table history
delta_table = DeltaTable.forPath(spark, "/delta/table/path")
display(delta_table.history())
# Query old versions
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/delta/table/path")
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-01") \
.load("/delta/table/path")
# Restore table to previous version
delta_table.restoreToVersion(5)
# Restore to timestamp
delta_table.restoreToTimestamp("2024-01-01 00:00:00")
Table Maintenance
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/delta/table/path")
# Optimize - compact small files
delta_table.optimize().executeCompaction()
# Z-order optimization for specific columns
delta_table.optimize().executeZOrderBy("date", "category")
# Vacuum - remove old files (default 7 days retention)
delta_table.vacuum(168) # 168 hours = 7 days
# Enable auto-optimize
spark.sql("""
ALTER TABLE my_database.my_table
SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
Medallion Architecture
The Medallion Architecture organizes data into progressively refined layers: Bronze (raw), Silver (cleaned and validated), and Gold (business-level aggregates).
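Before populating the layers, it helps to create one schema (database) per layer; a small sketch matching the bronze/silver/gold schema names used in the examples below:
# Create one schema per Medallion layer (names match the examples on this page)
for layer in ["bronze", "silver", "gold"]:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {layer}")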
Bronze Layer (Raw Data)
Ingest raw data with minimal transformation:
# Read from various sources
# CSV files
bronze_df = spark.read.csv("s3://bucket/raw-data/*.csv", header=True)
# JSON files
bronze_df = spark.read.json("s3://bucket/raw-data/*.json")
# Parquet files
bronze_df = spark.read.parquet("s3://bucket/raw-data/*.parquet")
# Streaming data (Kafka)
bronze_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "raw-topic") \
.load()
# Add metadata columns
from pyspark.sql.functions import current_timestamp, input_file_name
bronze_df = bronze_df \
.withColumn("ingestion_time", current_timestamp()) \
.withColumn("source_file", input_file_name())
# Write to Bronze table
bronze_df.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.saveAsTable("bronze.raw_events")
Silver Layer (Cleaned Data)
Clean, validate, and enrich data:
from pyspark.sql.functions import col, when, trim, lower, upper, to_timestamp
# Read from Bronze
bronze_df = spark.table("bronze.raw_events")
# Data cleaning and validation
silver_df = bronze_df \
.filter(col("id").isNotNull()) \
.withColumn("email", trim(lower(col("email")))) \
.withColumn("status", upper(col("status"))) \
.withColumn("amount", col("amount").cast("decimal(10,2)")) \
.withColumn("timestamp", to_timestamp(col("timestamp"))) \
.dropDuplicates(["id", "timestamp"])
# Data quality checks
from pyspark.sql.functions import when, count
quality_report = silver_df.select([
count(when(col(c).isNull(), c)).alias(c + "_nulls")
for c in silver_df.columns
])
display(quality_report)
# Write to Silver table
silver_df.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("silver.clean_events")
Gold Layer (Aggregated Data)
Create business-level aggregations:
from pyspark.sql.functions import col, sum, avg, count, max, min
# Read from Silver
silver_df = spark.table("silver.clean_events")
# Business aggregations
gold_df = silver_df.groupBy("date", "category", "region").agg(
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount"),
count("id").alias("transaction_count"),
max("amount").alias("max_amount"),
min("amount").alias("min_amount")
)
# Add calculated metrics
gold_df = gold_df.withColumn(
"amount_per_transaction",
col("total_amount") / col("transaction_count")
)
# Write to Gold table
gold_df.write.format("delta") \
.mode("overwrite") \
.saveAsTable("gold.daily_metrics")
# Create aggregate views for analytics
spark.sql("""
CREATE OR REPLACE VIEW gold.monthly_summary AS
SELECT
DATE_TRUNC('month', date) as month,
category,
region,
SUM(total_amount) as monthly_revenue,
SUM(transaction_count) as monthly_transactions
FROM gold.daily_metrics
GROUP BY month, category, region
""")
Data Ingestion Patterns
Batch Ingestion
# Simple batch load
def ingest_batch_data(source_path, target_table):
df = spark.read.format("json").load(source_path)
df.write.format("delta") \
.mode("append") \
.saveAsTable(target_table)
print(f"Ingested {df.count()} records to {target_table}")
# Incremental batch load
def ingest_incremental(source_path, target_table, checkpoint_col):
# Get last checkpoint
last_checkpoint = spark.sql(f"""
SELECT MAX({checkpoint_col}) as max_value
FROM {target_table}
""").collect()[0]['max_value']
    # Read only new data (on the first run the target table is empty, so load everything)
    df = spark.read.format("json").load(source_path)
    new_df = df if last_checkpoint is None else df.filter(col(checkpoint_col) > last_checkpoint)
# Append new data
new_df.write.format("delta") \
.mode("append") \
.saveAsTable(target_table)
print(f"Ingested {new_df.count()} new records")
# Usage
ingest_incremental("s3://bucket/data/", "bronze.events", "timestamp")
Streaming Ingestion
# Read from streaming source
stream_df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "/schema/location") \
.load("s3://bucket/incoming/")
# Process stream
processed_stream = stream_df \
.withColumn("ingestion_time", current_timestamp()) \
.select("id", "name", "value", "timestamp", "ingestion_time")
# Write stream to Delta table
query = processed_stream.writeStream \
.format("delta") \
.option("checkpointLocation", "/checkpoint/location") \
.outputMode("append") \
.table("bronze.streaming_data")
# Monitor stream
display(query.status)
Change Data Capture (CDC)
# Process CDC events
def process_cdc(cdc_df, target_table):
from delta.tables import DeltaTable
delta_table = DeltaTable.forName(spark, target_table)
# Separate operations
inserts = cdc_df.filter(col("operation") == "INSERT")
updates = cdc_df.filter(col("operation") == "UPDATE")
deletes = cdc_df.filter(col("operation") == "DELETE")
# Apply inserts
inserts.write.format("delta").mode("append").saveAsTable(target_table)
# Apply updates
for row in updates.collect():
delta_table.update(
condition=f"id = {row['id']}",
set={col: row[col] for col in updates.columns if col != 'operation'}
)
# Apply deletes
for row in deletes.collect():
delta_table.delete(f"id = {row['id']}")
# Usage with streaming CDC: apply the function to each micro-batch via foreachBatch
cdc_stream = spark.readStream.format("delta").table("cdc_source")
cdc_query = cdc_stream.writeStream \
    .foreachBatch(lambda batch_df, batch_id: process_cdc(batch_df, "target_table")) \
    .option("checkpointLocation", "/checkpoint/cdc") \
    .start()
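For larger CDC volumes, row-by-row updates become a bottleneck; a more scalable pattern is to apply each micro-batch with a single merge. The sketch below assumes at most one event per id per batch and reuses the id/value/updated_at columns from the earlier merge example:
# Sketch: apply one CDC micro-batch with a single MERGE (assumes one event per id per batch)
def process_cdc_merge(batch_df, batch_id):
    from delta.tables import DeltaTable
    target = DeltaTable.forName(spark, "target_table")
    target.alias("t").merge(
        batch_df.alias("s"),
        "t.id = s.id"
    ).whenMatchedDelete(condition="s.operation = 'DELETE'") \
     .whenMatchedUpdate(set={"value": "s.value", "updated_at": "s.updated_at"}) \
     .whenNotMatchedInsert(
         condition="s.operation != 'DELETE'",
         values={"id": "s.id", "value": "s.value", "updated_at": "s.updated_at"}
     ).execute()
This function can be plugged into the same writeStream.foreachBatch call shown above.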
Data Transformation Patterns
Window Functions
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead
# Define window
window_spec = Window.partitionBy("category").orderBy(col("amount").desc())
# Apply window functions
df_with_ranks = df.withColumn("rank", rank().over(window_spec)) \
.withColumn("row_num", row_number().over(window_spec)) \
.withColumn("prev_amount", lag("amount", 1).over(window_spec)) \
.withColumn("next_amount", lead("amount", 1).over(window_spec))
display(df_with_ranks)
Pivot and Unpivot
# Pivot - transform rows to columns
pivoted_df = df.groupBy("date").pivot("category").sum("amount")
display(pivoted_df)
# Unpivot - transform columns to rows
from pyspark.sql.functions import expr, array, explode, lit
# Create array of column expressions
cols = ["category_A", "category_B", "category_C"]
unpivoted_df = df.select(
"date",
explode(
array([
expr(f"struct('{col}' as category, {col} as amount)")
for col in cols
])
).alias("struct")
).select("date", "struct.category", "struct.amount")
display(unpivoted_df)
Complex Data Types
from pyspark.sql.functions import struct, array, map_from_arrays, explode
# Create struct
df_with_struct = df.withColumn(
"address",
struct(col("street"), col("city"), col("state"), col("zip"))
)
# Create array
df_with_array = df.withColumn(
"tags",
array(col("tag1"), col("tag2"), col("tag3"))
)
# Explode array
df_exploded = df_with_array.select(
col("id"),
explode(col("tags")).alias("tag")
)
# Access struct fields
df_address = df_with_struct.select(
col("id"),
col("address.city"),
col("address.state")
)
Data Quality and Validation
Schema Enforcement
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# Define schema
schema = StructType([
StructField("id", StringType(), False),
StructField("name", StringType(), False),
StructField("age", IntegerType(), True),
StructField("email", StringType(), True),
StructField("created_at", TimestampType(), False)
])
# Read with schema
df = spark.read.schema(schema).json("/path/to/data")
# Schema validation
def validate_schema(df, expected_schema):
if df.schema != expected_schema:
raise ValueError("Schema mismatch detected")
return df
validated_df = validate_schema(df, schema)
Data Quality Checks
from pyspark.sql.functions import col, count, when, isnan
def run_quality_checks(df, table_name):
checks = []
# Row count check
row_count = df.count()
checks.append(("Row Count", row_count, row_count > 0))
# Null checks
for column in df.columns:
null_count = df.filter(col(column).isNull()).count()
null_pct = (null_count / row_count * 100) if row_count > 0 else 0
checks.append((f"{column} Nulls", null_count, null_pct < 10))
# Duplicate check
duplicate_count = df.count() - df.dropDuplicates().count()
checks.append(("Duplicates", duplicate_count, duplicate_count == 0))
# Create results DataFrame
results_df = spark.createDataFrame(checks, ["Check", "Value", "Passed"])
# Log results
print(f"\nQuality checks for {table_name}:")
display(results_df)
# Fail if any check failed
failed_checks = results_df.filter(col("Passed") == False)
if failed_checks.count() > 0:
raise ValueError(f"Quality checks failed for {table_name}")
return True
# Run checks
run_quality_checks(df, "silver.clean_events")
Expectations (Delta Live Tables)
# Using Delta Live Tables for data quality
import dlt
from pyspark.sql.functions import col
@dlt.table(
name="clean_customers",
comment="Cleaned customer data with quality checks"
)
@dlt.expect_or_drop("valid_email", "email IS NOT NULL AND email LIKE '%@%'")
@dlt.expect_or_fail("valid_id", "id IS NOT NULL")
@dlt.expect("recent_data", "created_at > '2020-01-01'")
def clean_customers():
return spark.table("bronze.raw_customers")
Orchestration and Workflows
Jobs Configuration
# Create a job using Databricks API
job_config = {
"name": "Daily ETL Pipeline",
"tasks": [
{
"task_key": "bronze_ingestion",
"notebook_task": {
"notebook_path": "/pipelines/01_ingest_bronze",
"base_parameters": {
"date": "{{job.start_time}}"
}
},
"new_cluster": {
"spark_version": "13.3.x-scala2.12",
"node_type_id": "m5.large",
"num_workers": 2
}
},
{
"task_key": "silver_transformation",
"depends_on": [{"task_key": "bronze_ingestion"}],
"notebook_task": {
"notebook_path": "/pipelines/02_transform_silver"
}
},
{
"task_key": "gold_aggregation",
"depends_on": [{"task_key": "silver_transformation"}],
"notebook_task": {
"notebook_path": "/pipelines/03_aggregate_gold"
}
}
],
"schedule": {
"quartz_cron_expression": "0 0 2 * * ?", # Daily at 2 AM
"timezone_id": "America/New_York"
},
"email_notifications": {
"on_failure": ["data-team@company.com"],
"on_success": ["data-team@company.com"]
}
}
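The dictionary above only describes the job; one way to actually create it is a POST to the Jobs API (2.1). A sketch, where the DATABRICKS_HOST and DATABRICKS_TOKEN environment variable names are assumptions for illustration:
# Sketch: create the job via the Jobs API 2.1 (env var names are illustrative)
import os
import requests

host = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
token = os.environ["DATABRICKS_TOKEN"]  # personal access token

response = requests.post(
    f"{host}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=job_config,
)
response.raise_for_status()
print("Created job_id:", response.json()["job_id"])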
Error Handling and Retry Logic
from datetime import datetime
import time
def robust_etl_step(func, max_retries=3, retry_delay=60):
"""Wrapper for ETL steps with retry logic."""
for attempt in range(max_retries):
try:
print(f"Attempt {attempt + 1} of {max_retries}")
result = func()
print("Success!")
return result
except Exception as e:
print(f"Error on attempt {attempt + 1}: {str(e)}")
if attempt < max_retries - 1:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
print("Max retries reached. Failing.")
raise
# Usage
def my_etl_step():
df = spark.table("source_table")
transformed = df.filter(col("amount") > 0)
transformed.write.format("delta").mode("overwrite").saveAsTable("target_table")
return transformed.count()
result = robust_etl_step(my_etl_step)
Performance Optimization
Partitioning Strategy
# Partition by date for time-series data
df.write.format("delta") \
.partitionBy("year", "month", "day") \
.mode("overwrite") \
.save("/delta/partitioned_data")
# Read specific partitions
df_filtered = spark.read.format("delta") \
.load("/delta/partitioned_data") \
.filter("year = 2024 AND month = 1")
Broadcast Joins
from pyspark.sql.functions import broadcast
# Broadcast small dimension tables
large_fact = spark.table("fact_sales")
small_dim = spark.table("dim_product")
# Optimized join
result = large_fact.join(
broadcast(small_dim),
"product_id"
)
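Spark also broadcasts a table automatically when its estimated size falls below spark.sql.autoBroadcastJoinThreshold (roughly 10 MB by default); the broadcast() hint above forces that plan regardless of size. A sketch of tuning the threshold for slightly larger dimension tables:
# Raise the automatic broadcast threshold to ~50 MB (value in bytes); -1 disables auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))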
Adaptive Query Execution
# Enable AQE (enabled by default in recent versions)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Monitor query execution
df.explain(mode="formatted")
Next Steps
- Explore Machine Learning workflows
- Review Best Practices for production pipelines
- Learn about Delta Live Tables