
Data Engineering with Databricks

Learn how to build robust data pipelines, perform ETL operations, and manage data lakes using Databricks and Delta Lake.

Overview

Data engineering in Databricks focuses on:

  • Ingesting data from various sources
  • Transforming and cleaning data
  • Building reliable data pipelines
  • Managing data lakes with Delta Lake
  • Implementing the Medallion Architecture

Delta Lake Fundamentals

Delta Lake is an open-source storage layer that brings ACID transactions to data lakes.

Creating Delta Tables

# Write DataFrame as Delta table
df.write.format("delta").mode("overwrite").save("/delta/table/path")

# Create managed Delta table
df.write.format("delta").mode("overwrite").saveAsTable("my_database.my_table")

# Create table with partitioning
df.write.format("delta") \
.partitionBy("date", "region") \
.mode("overwrite") \
.save("/delta/partitioned_table")

Reading Delta Tables

# Read from path
df = spark.read.format("delta").load("/delta/table/path")

# Read from table name
df = spark.table("my_database.my_table")

# Read specific version (time travel)
df = spark.read.format("delta").option("versionAsOf", 5).load("/delta/table/path")

# Read data as of timestamp
df = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01 00:00:00") \
    .load("/delta/table/path")

ACID Transactions

from delta.tables import DeltaTable

# Upsert (merge) operation
delta_table = DeltaTable.forPath(spark, "/delta/table/path")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "value": "source.value",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "value": "source.value",
    "updated_at": "source.updated_at"
}).execute()
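
The same upsert can be expressed in SQL with MERGE INTO; a brief sketch that registers the updates as a temporary view and targets the path-based table (column names follow the example above):

# SQL equivalent of the merge above
updates_df.createOrReplaceTempView("updates")

spark.sql("""
    MERGE INTO delta.`/delta/table/path` AS target
    USING updates AS source
    ON target.id = source.id
    WHEN MATCHED THEN
        UPDATE SET target.value = source.value, target.updated_at = source.updated_at
    WHEN NOT MATCHED THEN
        INSERT (id, value, updated_at) VALUES (source.id, source.value, source.updated_at)
""")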

Time Travel

# View table history
delta_table = DeltaTable.forPath(spark, "/delta/table/path")
display(delta_table.history())

# Query old versions
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/delta/table/path")
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .load("/delta/table/path")

# Restore table to previous version
delta_table.restoreToVersion(5)

# Restore to timestamp
delta_table.restoreToTimestamp("2024-01-01 00:00:00")
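
Time travel and restore are also available in SQL; a short sketch against the same path-based table:

# SQL equivalents for time travel and restore
spark.sql("SELECT * FROM delta.`/delta/table/path` VERSION AS OF 1")
spark.sql("SELECT * FROM delta.`/delta/table/path` TIMESTAMP AS OF '2024-01-01'")
spark.sql("RESTORE TABLE delta.`/delta/table/path` TO VERSION AS OF 5")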

Table Maintenance

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/delta/table/path")

# Optimize - compact small files
delta_table.optimize().executeCompaction()

# Z-order optimization for specific columns
delta_table.optimize().executeZOrderBy("date", "category")

# Vacuum - remove old files (default 7 days retention)
delta_table.vacuum(168) # 168 hours = 7 days

# Enable auto-optimize
spark.sql("""
ALTER TABLE my_database.my_table
SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")

Medallion Architecture

The Medallion Architecture organizes data into layers: Bronze, Silver, and Gold.
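
End to end, a pipeline simply reads each layer's table and writes the next one; a minimal skeleton (table names are illustrative, and the cleaning and aggregation logic is expanded in the subsections below):

# Bronze -> Silver -> Gold skeleton
bronze_df = spark.table("bronze.raw_events")

# Silver: cleaning and validation go here
silver_df = bronze_df.dropDuplicates(["id"])
silver_df.write.format("delta").mode("overwrite").saveAsTable("silver.clean_events")

# Gold: business-level aggregation goes here
gold_df = spark.table("silver.clean_events").groupBy("date").count()
gold_df.write.format("delta").mode("overwrite").saveAsTable("gold.daily_metrics")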

Bronze Layer (Raw Data)

Ingest raw data with minimal transformation:

# Read from various sources
# CSV files
bronze_df = spark.read.csv("s3://bucket/raw-data/*.csv", header=True)

# JSON files
bronze_df = spark.read.json("s3://bucket/raw-data/*.json")

# Parquet files
bronze_df = spark.read.parquet("s3://bucket/raw-data/*.parquet")

# Streaming data (Kafka)
bronze_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "raw-topic") \
    .load()

# Add metadata columns
from pyspark.sql.functions import current_timestamp, input_file_name

bronze_df = bronze_df \
    .withColumn("ingestion_time", current_timestamp()) \
    .withColumn("source_file", input_file_name())

# Write to Bronze table
bronze_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("bronze.raw_events")

Silver Layer (Cleaned Data)

Clean, validate, and enrich data:

from pyspark.sql.functions import col, when, trim, lower, upper, to_timestamp

# Read from Bronze
bronze_df = spark.table("bronze.raw_events")

# Data cleaning and validation
silver_df = bronze_df \
    .filter(col("id").isNotNull()) \
    .withColumn("email", trim(lower(col("email")))) \
    .withColumn("status", upper(col("status"))) \
    .withColumn("amount", col("amount").cast("decimal(10,2)")) \
    .withColumn("timestamp", to_timestamp(col("timestamp"))) \
    .dropDuplicates(["id", "timestamp"])

# Data quality checks
from pyspark.sql.functions import when, count

quality_report = silver_df.select([
    count(when(col(c).isNull(), c)).alias(c + "_nulls")
    for c in silver_df.columns
])
display(quality_report)

# Write to Silver table
silver_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.clean_events")

Gold Layer (Aggregated Data)

Create business-level aggregations:

from pyspark.sql.functions import col, sum, avg, count, max, min
from pyspark.sql.window import Window

# Read from Silver
silver_df = spark.table("silver.clean_events")

# Business aggregations
gold_df = silver_df.groupBy("date", "category", "region").agg(
    sum("amount").alias("total_amount"),
    avg("amount").alias("avg_amount"),
    count("id").alias("transaction_count"),
    max("amount").alias("max_amount"),
    min("amount").alias("min_amount")
)

# Add calculated metrics
gold_df = gold_df.withColumn(
    "amount_per_transaction",
    col("total_amount") / col("transaction_count")
)

# Write to Gold table
gold_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.daily_metrics")

# Create aggregate views for analytics
spark.sql("""
CREATE OR REPLACE VIEW gold.monthly_summary AS
SELECT
DATE_TRUNC('month', date) as month,
category,
region,
SUM(total_amount) as monthly_revenue,
SUM(transaction_count) as monthly_transactions
FROM gold.daily_metrics
GROUP BY month, category, region
""")

Data Ingestion Patterns

Batch Ingestion

from pyspark.sql.functions import col

# Simple batch load
def ingest_batch_data(source_path, target_table):
    df = spark.read.format("json").load(source_path)

    df.write.format("delta") \
        .mode("append") \
        .saveAsTable(target_table)

    print(f"Ingested {df.count()} records to {target_table}")

# Incremental batch load
def ingest_incremental(source_path, target_table, checkpoint_col):
    # Get the last checkpoint (None if the target table is empty)
    last_checkpoint = spark.sql(f"""
        SELECT MAX({checkpoint_col}) AS max_value
        FROM {target_table}
    """).collect()[0]['max_value']

    # Read the source and keep only rows newer than the checkpoint
    df = spark.read.format("json").load(source_path)
    new_df = df if last_checkpoint is None else df.filter(col(checkpoint_col) > last_checkpoint)

    # Append new data
    new_df.write.format("delta") \
        .mode("append") \
        .saveAsTable(target_table)

    print(f"Ingested {new_df.count()} new records")

# Usage
ingest_incremental("s3://bucket/data/", "bronze.events", "timestamp")

Streaming Ingestion

# Read from a streaming source with Auto Loader
from pyspark.sql.functions import current_timestamp

stream_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/schema/location") \
    .load("s3://bucket/incoming/")

# Process stream
processed_stream = stream_df \
    .withColumn("ingestion_time", current_timestamp()) \
    .select("id", "name", "value", "timestamp", "ingestion_time")

# Write stream to Delta table
query = processed_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoint/location") \
    .outputMode("append") \
    .toTable("bronze.streaming_data")

# Monitor stream
print(query.status)
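
For scheduled, batch-style runs, the same stream can be given a trigger so it processes whatever is available and then stops; a sketch reusing the writer above (availableNow requires Spark 3.3+ / a recent Databricks Runtime):

# Process all available data once, then stop
query = processed_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoint/location") \
    .trigger(availableNow=True) \
    .toTable("bronze.streaming_data")

query.awaitTermination()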

Change Data Capture (CDC)

# Process CDC events for one batch (or micro-batch) DataFrame
def process_cdc(cdc_df, target_table):
    from delta.tables import DeltaTable
    from pyspark.sql.functions import col, lit

    delta_table = DeltaTable.forName(spark, target_table)

    # Separate operations
    inserts = cdc_df.filter(col("operation") == "INSERT")
    updates = cdc_df.filter(col("operation") == "UPDATE")
    deletes = cdc_df.filter(col("operation") == "DELETE")

    # Apply inserts (drop the CDC metadata column first)
    inserts.drop("operation").write.format("delta").mode("append").saveAsTable(target_table)

    # Apply updates row by row (suitable for small batches; assumes a numeric id)
    for row in updates.collect():
        delta_table.update(
            condition=f"id = {row['id']}",
            set={c: lit(row[c]) for c in updates.columns if c != 'operation'}
        )

    # Apply deletes
    for row in deletes.collect():
        delta_table.delete(f"id = {row['id']}")

# Usage with streaming CDC: apply the function to each micro-batch
cdc_stream = spark.readStream.format("delta").table("cdc_source")
cdc_query = cdc_stream.writeStream \
    .foreachBatch(lambda batch_df, batch_id: process_cdc(batch_df, "target_table")) \
    .option("checkpointLocation", "/checkpoint/cdc") \
    .start()
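
For larger batches, the row-by-row updates and deletes above can be collapsed into a single merge per micro-batch; a sketch assuming the same operation column, an id key, and illustrative value/updated_at columns:

from delta.tables import DeltaTable

def apply_cdc_with_merge(batch_df, target_table):
    # One MERGE applies inserts, updates, and deletes in a single transaction
    delta_table = DeltaTable.forName(spark, target_table)
    (delta_table.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")
        .whenMatchedDelete(condition="s.operation = 'DELETE'")
        .whenMatchedUpdate(set={"value": "s.value", "updated_at": "s.updated_at"})
        .whenNotMatchedInsert(
            condition="s.operation != 'DELETE'",
            values={"id": "s.id", "value": "s.value", "updated_at": "s.updated_at"})
        .execute())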

Data Transformation Patterns

Window Functions

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead

# Define window
window_spec = Window.partitionBy("category").orderBy(col("amount").desc())

# Apply window functions
df_with_ranks = df.withColumn("rank", rank().over(window_spec)) \
    .withColumn("row_num", row_number().over(window_spec)) \
    .withColumn("prev_amount", lag("amount", 1).over(window_spec)) \
    .withColumn("next_amount", lead("amount", 1).over(window_spec))

display(df_with_ranks)
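
A common follow-up is keeping only the top N rows per partition, for example the three largest amounts in each category:

# Top 3 amounts per category using the row_num column
top3_df = df_with_ranks.filter(col("row_num") <= 3)
display(top3_df)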

Pivot and Unpivot

# Pivot - transform rows to columns
pivoted_df = df.groupBy("date").pivot("category").sum("amount")
display(pivoted_df)

# Unpivot - transform columns to rows
from pyspark.sql.functions import expr, array, explode, lit

# Create array of column expressions
cols = ["category_A", "category_B", "category_C"]
unpivoted_df = df.select(
    "date",
    explode(
        array([
            expr(f"struct('{c}' as category, {c} as amount)")
            for c in cols
        ])
    ).alias("struct")
).select("date", "struct.category", "struct.amount")

display(unpivoted_df)
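
The same unpivot can be written more compactly with the SQL stack() function; a sketch over the same three columns:

# Unpivot with stack(n, label1, col1, label2, col2, ...)
unpivoted_df = df.select(
    "date",
    expr("""
        stack(3,
            'category_A', category_A,
            'category_B', category_B,
            'category_C', category_C
        ) as (category, amount)
    """)
)
display(unpivoted_df)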

Complex Data Types

from pyspark.sql.functions import col, struct, array, map_from_arrays, explode

# Create struct
df_with_struct = df.withColumn(
    "address",
    struct(col("street"), col("city"), col("state"), col("zip"))
)

# Create array
df_with_array = df.withColumn(
    "tags",
    array(col("tag1"), col("tag2"), col("tag3"))
)

# Explode array
df_exploded = df_with_array.select(
    col("id"),
    explode(col("tags")).alias("tag")
)

# Access struct fields
df_address = df_with_struct.select(
    col("id"),
    col("address.city"),
    col("address.state")
)

Data Quality and Validation

Schema Enforcement

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Define schema
schema = StructType([
    StructField("id", StringType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("email", StringType(), True),
    StructField("created_at", TimestampType(), False)
])

# Read with schema
df = spark.read.schema(schema).json("/path/to/data")

# Schema validation
def validate_schema(df, expected_schema):
    if df.schema != expected_schema:
        raise ValueError("Schema mismatch detected")
    return df

validated_df = validate_schema(df, schema)

Data Quality Checks

from pyspark.sql.functions import col, count, when, isnan

def run_quality_checks(df, table_name):
    checks = []

    # Row count check
    row_count = df.count()
    checks.append(("Row Count", row_count, row_count > 0))

    # Null checks
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        null_pct = (null_count / row_count * 100) if row_count > 0 else 0
        checks.append((f"{column} Nulls", null_count, null_pct < 10))

    # Duplicate check
    duplicate_count = df.count() - df.dropDuplicates().count()
    checks.append(("Duplicates", duplicate_count, duplicate_count == 0))

    # Create results DataFrame
    results_df = spark.createDataFrame(checks, ["Check", "Value", "Passed"])

    # Log results
    print(f"\nQuality checks for {table_name}:")
    display(results_df)

    # Fail if any check failed
    failed_checks = results_df.filter(col("Passed") == False)
    if failed_checks.count() > 0:
        raise ValueError(f"Quality checks failed for {table_name}")

    return True

# Run checks
run_quality_checks(df, "silver.clean_events")

Expectations (Delta Live Tables)

# Using Delta Live Tables for data quality
import dlt
from pyspark.sql.functions import col

@dlt.table(
    name="clean_customers",
    comment="Cleaned customer data with quality checks"
)
@dlt.expect_or_drop("valid_email", "email IS NOT NULL AND email LIKE '%@%'")
@dlt.expect_or_fail("valid_id", "id IS NOT NULL")
@dlt.expect("recent_data", "created_at > '2020-01-01'")
def clean_customers():
    return spark.table("bronze.raw_customers")
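
Several expectations can also be bundled into one decorator; a short sketch using dlt.expect_all_or_drop with illustrative constraints and table names:

@dlt.table(name="validated_orders")
@dlt.expect_all_or_drop({
    "positive_amount": "amount > 0",
    "known_status": "status IN ('OPEN', 'CLOSED')"
})
def validated_orders():
    return spark.table("bronze.raw_orders")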

Orchestration and Workflows

Jobs Configuration

# Create a job using Databricks API
job_config = {
    "name": "Daily ETL Pipeline",
    "tasks": [
        {
            "task_key": "bronze_ingestion",
            "notebook_task": {
                "notebook_path": "/pipelines/01_ingest_bronze",
                "base_parameters": {
                    "date": "{{job.start_time}}"
                }
            },
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "m5.large",
                "num_workers": 2
            }
        },
        {
            "task_key": "silver_transformation",
            "depends_on": [{"task_key": "bronze_ingestion"}],
            "notebook_task": {
                "notebook_path": "/pipelines/02_transform_silver"
            }
        },
        {
            "task_key": "gold_aggregation",
            "depends_on": [{"task_key": "silver_transformation"}],
            "notebook_task": {
                "notebook_path": "/pipelines/03_aggregate_gold"
            }
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",  # Daily at 2 AM
        "timezone_id": "America/New_York"
    },
    "email_notifications": {
        "on_failure": ["data-team@company.com"],
        "on_success": ["data-team@company.com"]
    }
}
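
The configuration can then be submitted to the Jobs API (api/2.1/jobs/create); a minimal sketch assuming a workspace URL and a personal access token, which in practice should come from a secret scope rather than plain variables:

import requests

# Hypothetical placeholders - substitute your workspace URL and token
DATABRICKS_HOST = "https://<workspace-url>"
DATABRICKS_TOKEN = "<personal-access-token>"

# Create the job from the job_config dictionary defined above
response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json=job_config
)
response.raise_for_status()
print(f"Created job {response.json()['job_id']}")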

Error Handling and Retry Logic

import time

from pyspark.sql.functions import col

def robust_etl_step(func, max_retries=3, retry_delay=60):
    """Wrapper for ETL steps with retry logic."""
    for attempt in range(max_retries):
        try:
            print(f"Attempt {attempt + 1} of {max_retries}")
            result = func()
            print("Success!")
            return result
        except Exception as e:
            print(f"Error on attempt {attempt + 1}: {str(e)}")
            if attempt < max_retries - 1:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                print("Max retries reached. Failing.")
                raise

# Usage
def my_etl_step():
    df = spark.table("source_table")
    transformed = df.filter(col("amount") > 0)
    transformed.write.format("delta").mode("overwrite").saveAsTable("target_table")
    return transformed.count()

result = robust_etl_step(my_etl_step)

Performance Optimization

Partitioning Strategy

# Partition by date for time-series data
df.write.format("delta") \
.partitionBy("year", "month", "day") \
.mode("overwrite") \
.save("/delta/partitioned_data")

# Read specific partitions
df_filtered = spark.read.format("delta") \
.load("/delta/partitioned_data") \
.filter("year = 2024 AND month = 1")

Broadcast Joins

from pyspark.sql.functions import broadcast

# Broadcast small dimension tables
large_fact = spark.table("fact_sales")
small_dim = spark.table("dim_product")

# Optimized join
result = large_fact.join(
    broadcast(small_dim),
    "product_id"
)
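
Spark also broadcasts automatically when a table is smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default); the threshold can be raised for larger dimension tables or set to -1 to disable automatic broadcasting:

# Raise the automatic broadcast threshold to 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)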

Adaptive Query Execution

# Enable AQE (enabled by default in recent versions)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Monitor query execution
df.explain(mode="formatted")
