Apache Airflow Tutorial

This tutorial reflects Apache Airflow stable docs (2025). Always verify the version you install in production for compatibility.

1. What is Apache Airflow?

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. Originally developed at Airbnb, it has become the industry standard for orchestrating complex data and ML pipelines.

Core capabilities:

  • Dynamic Pipeline Generation: define workflows as Python code (DAGs) with full programming flexibility
  • Rich UI: web interface for visualizing pipelines, monitoring progress, and debugging failures
  • Extensible: 1000+ community providers for integrations (AWS, GCP, Azure, Kubernetes, Databricks, etc.)
  • Scheduling: cron-based and interval-based scheduling with backfilling support
  • Task Dependencies: define complex task relationships (sequential, parallel, conditional, branching)
  • Distributed Execution: scale horizontally with executors (Local, Celery, Kubernetes, Dask)
  • Alerting & Monitoring: built-in notifications (email, Slack, PagerDuty) and metrics
  • MLOps Integration: native support for ML frameworks, model training, deployment, and monitoring pipelines

2. Architecture Overview

Airflow Components:

  Web Server (Flask)  ←→  Scheduler  ←→  Metadata DB (Postgres)
          ↓                   ↓
   DAG Files (.py)        Task Queue
                              ↓
                          Executor
                    (Local/Celery/K8s)
                              ↓
                           Workers
                       (Execute Tasks)

Key components:

  • Web Server: UI for visualization, monitoring, and manual triggers
  • Scheduler: Reads DAGs, schedules tasks, submits to executor
  • Executor: Defines how tasks run (local process, Celery workers, K8s pods, etc.)
  • Metadata Database: Stores DAG definitions, task states, execution history (PostgreSQL/MySQL recommended)
  • Workers: Execute task logic (can scale horizontally)
  • DAG Directory: File system location where Python DAG files are stored
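
These components are wired together through airflow.cfg (or the matching AIRFLOW__SECTION__KEY environment variables). A minimal configuration sketch with illustrative values, in the same format as the airflow.cfg excerpts later in this tutorial:

# airflow.cfg (illustrative values)
[core]
executor = LocalExecutor                # which executor runs tasks
dags_folder = /opt/airflow/dags         # DAG directory scanned by the scheduler

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow   # metadata DB

[webserver]
web_server_port = 8080                  # UI port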

3. Installation & Setup

Standalone Installation (Development)

# Create virtual environment
python -m venv airflow_env
source airflow_env/bin/activate

# Set Airflow home (default: ~/airflow)
export AIRFLOW_HOME=~/airflow

# Install Airflow with the constraints file matching your Python version
AIRFLOW_VERSION=2.10.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Install Providers (Optional)

# Common providers for MLOps
pip install apache-airflow-providers-amazon # AWS (S3, SageMaker, EMR)
pip install apache-airflow-providers-google # GCP (BigQuery, GCS, Vertex AI)
pip install apache-airflow-providers-microsoft-azure # Azure ML
pip install apache-airflow-providers-cncf-kubernetes # Kubernetes operators
pip install apache-airflow-providers-docker # Docker operators
pip install apache-airflow-providers-http # HTTP/REST APIs

Initialize Database

# Initialize metadata database (SQLite for dev)
# Note: on Airflow 2.7+, `airflow db migrate` is the preferred command; `db init` is deprecated
airflow db init

# Create admin user
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin

Start Services

# Terminal 1: Start web server
airflow webserver --port 8080

# Terminal 2: Start scheduler
airflow scheduler

Access UI at http://localhost:8080 (login: admin/admin)

4. Core Concepts

DAG (Directed Acyclic Graph)

A DAG is a collection of tasks with dependencies, representing your workflow. Key properties:

  • Directed: Tasks have clear upstream/downstream relationships
  • Acyclic: No circular dependencies (prevents infinite loops)
  • Idempotent: Re-running same DAG with same inputs produces same output (best practice)

Tasks & Operators

  • Task: A unit of work (defined by an operator)
  • Operator: Template for a specific action (e.g., BashOperator, PythonOperator, SQLExecuteQueryOperator)
  • Task Instance: Specific execution of a task for a DAG run

Common operator types:

  • BashOperator: execute bash commands (run shell scripts, CLI tools)
  • PythonOperator: execute a Python function (data processing, model training)
  • EmailOperator: send email notifications (alert on success/failure)
  • HttpSensor: wait for an HTTP endpoint (wait for API availability)
  • S3KeySensor: wait for an S3 object (wait for data arrival)
  • KubernetesPodOperator: run a containerized task (isolated execution environments)
  • DockerOperator: run a Docker container (legacy containerized tasks)

Task Dependencies

# Sequential
task_a >> task_b >> task_c

# Parallel then merge
task_a >> [task_b, task_c] >> task_d

# Fan-out and fan-in
[task_a, task_b] >> task_c >> [task_d, task_e]

5. Quick Start: First DAG

dags/hello_airflow.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Default arguments applied to all tasks
default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# Define DAG
with DAG(
    dag_id='hello_airflow',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule='@daily',                # Run daily at midnight UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,                    # Don't backfill past runs
    tags=['tutorial', 'mlops'],
) as dag:

    # Task 1: Print date using bash
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # Task 2: Python function
    def greet():
        print("Hello from Airflow!")
        return "Greeting complete"

    t2 = PythonOperator(
        task_id='greet',
        python_callable=greet,
    )

    # Task 3: Sleep
    t3 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
    )

    # Set dependencies
    t1 >> [t2, t3]  # t2 and t3 run in parallel after t1

Save this file in $AIRFLOW_HOME/dags/ and it will appear in the UI after the next DAG directory scan (new files are typically picked up within a few minutes, controlled by dag_dir_list_interval).

Trigger manually: Click the play button in UI or run:

airflow dags test hello_airflow 2025-01-01

6. Scheduling & Execution

Schedule Formats

# Cron expression (minute hour day month day_of_week)
schedule='0 2 * * *' # Daily at 2 AM UTC

# Preset aliases
schedule='@daily' # Midnight UTC
schedule='@hourly' # Top of every hour
schedule='@weekly' # Sunday midnight
schedule='@monthly' # First of month
schedule='@once' # Run once then never again

# Timedelta for intervals
schedule=timedelta(hours=6) # Every 6 hours

# No schedule (manual only)
schedule=None

Execution Date vs Run Date

  • Execution Date (logical date): The start of the data interval the DAG processes
  • Actual Run Date: When the scheduler actually triggers the DAG
  • Example: a DAG with schedule='@daily' and logical date 2025-01-01 actually runs at 2025-01-02 00:00:00, once that day's data interval has closed (see the sketch below)
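
A minimal sketch of reading the logical date and data interval inside a task; the DAG and task names are illustrative:

from datetime import datetime
from airflow.decorators import dag, task

@dag(dag_id='interval_demo', schedule='@daily', start_date=datetime(2025, 1, 1), catchup=False)
def interval_demo():

    @task
    def show_interval(ds=None, data_interval_start=None, data_interval_end=None):
        # Airflow injects context variables that are declared in the signature.
        # ds is the logical date as 'YYYY-MM-DD'; the interval bounds are datetimes.
        print(f"Logical date: {ds}")
        print(f"Data interval: {data_interval_start} -> {data_interval_end}")

    show_interval()

interval_demo()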

Catchup & Backfilling

# Disable catchup (recommended for most ML pipelines)
catchup=False

# Enable catchup (process all past intervals)
catchup=True

Manual backfill:

airflow dags backfill hello_airflow \
--start-date 2025-01-01 \
--end-date 2025-01-07

7. Variables, Connections & XComs

Variables (Global Configuration)

from airflow.models import Variable

# Set via UI: Admin → Variables
# Or programmatically:
Variable.set("data_path", "/data/ml_datasets")
Variable.set("model_version", "v2.1")

# Access in DAG
data_path = Variable.get("data_path")
model_version = Variable.get("model_version", default_var="v1.0")

Connections (External Systems)

Store credentials securely:

# CLI
airflow connections add 'aws_default' \
--conn-type 'aws' \
--conn-extra '{"region_name": "us-east-1"}'

# Or via UI: Admin → Connections

Use in tasks:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_to_s3():
    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.load_file('/local/model.pkl', 's3://bucket/model.pkl')

XComs (Cross-Communication)

Share data between tasks:

def extract_data(**context):
    data = {"records": 1000, "status": "success"}
    # Push to XCom
    context['ti'].xcom_push(key='extract_result', value=data)
    return data  # Also pushed to XCom under the default 'return_value' key

def transform_data(**context):
    # Pull from XCom (assumes the upstream task_id is 'extract')
    ti = context['ti']
    result = ti.xcom_pull(task_ids='extract', key='extract_result')
    print(f"Processing {result['records']} records")

Note: XComs are stored in the metadata DB, so keep payloads small (less than 1 MB). For large data, pass a reference to external storage instead.
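
For large artifacts, the usual pattern is to write the data to external storage and pass only a reference through XCom. A minimal sketch; the path and bucket names are illustrative:

def extract_large_dataset(**context):
    # Write the heavy payload to external storage (S3, GCS, a shared volume, ...)
    output_path = f"s3://my-bucket/raw/{context['ds']}/events.parquet"   # illustrative URI
    # ... write the dataframe to output_path here ...
    # Push only the lightweight reference through XCom
    context['ti'].xcom_push(key='dataset_uri', value=output_path)

def train_on_dataset(**context):
    # Downstream task resolves the reference and loads the data itself
    dataset_uri = context['ti'].xcom_pull(task_ids='extract_large_dataset', key='dataset_uri')
    print(f"Loading training data from {dataset_uri}")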

8. ML Pipeline Example: Training & Deployment

dags/ml_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.models import Variable
import pickle
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

default_args = {
    'owner': 'ml-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def extract_data(**context):
    """Simulate data extraction"""
    # In real scenario: fetch from database, API, or data lake
    data = pd.DataFrame({
        'feature1': range(1000),
        'feature2': range(1000, 2000),
        'label': [i % 2 for i in range(1000)]
    })

    # Save to local temp storage
    data.to_csv('/tmp/training_data.csv', index=False)
    context['ti'].xcom_push(key='data_path', value='/tmp/training_data.csv')
    context['ti'].xcom_push(key='num_records', value=len(data))
    return '/tmp/training_data.csv'

def preprocess_data(**context):
    """Data preprocessing"""
    ti = context['ti']
    data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')

    df = pd.read_csv(data_path)
    # Preprocessing steps here (scaling, encoding, etc.)

    X = df[['feature1', 'feature2']]
    y = df['label']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Save splits
    X_train.to_csv('/tmp/X_train.csv', index=False)
    X_test.to_csv('/tmp/X_test.csv', index=False)
    pd.DataFrame(y_train).to_csv('/tmp/y_train.csv', index=False)
    pd.DataFrame(y_test).to_csv('/tmp/y_test.csv', index=False)

    return "Preprocessing complete"

def train_model(**context):
    """Train ML model"""
    # Load data
    X_train = pd.read_csv('/tmp/X_train.csv')
    y_train = pd.read_csv('/tmp/y_train.csv').values.ravel()

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Save model
    model_path = '/tmp/model.pkl'
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)

    context['ti'].xcom_push(key='model_path', value=model_path)
    return model_path

def evaluate_model(**context):
    """Evaluate model performance"""
    ti = context['ti']
    model_path = ti.xcom_pull(task_ids='train_model', key='model_path')

    # Load model and test data
    with open(model_path, 'rb') as f:
        model = pickle.load(f)

    X_test = pd.read_csv('/tmp/X_test.csv')
    y_test = pd.read_csv('/tmp/y_test.csv').values.ravel()

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    print(f"Model Accuracy: {accuracy:.4f}")

    # Store metrics
    context['ti'].xcom_push(key='accuracy', value=accuracy)

    # Validation gate
    threshold = float(Variable.get("min_accuracy", default_var="0.7"))
    if accuracy < threshold:
        raise ValueError(f"Model accuracy {accuracy} below threshold {threshold}")

    return accuracy

def deploy_model(**context):
    """Deploy model (simulated)"""
    ti = context['ti']
    model_path = ti.xcom_pull(task_ids='train_model', key='model_path')
    accuracy = ti.xcom_pull(task_ids='evaluate_model', key='accuracy')

    # In real scenario: upload to model registry, update serving endpoint
    print(f"Deploying model with accuracy {accuracy}")
    print(f"Model path: {model_path}")

    # Simulate deployment
    deployment_id = f"model_v{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    context['ti'].xcom_push(key='deployment_id', value=deployment_id)

    return f"Deployed: {deployment_id}"

# Define DAG
with DAG(
    dag_id='ml_training_pipeline',
    default_args=default_args,
    description='End-to-end ML training and deployment pipeline',
    schedule='@weekly',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'production'],
) as dag:

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
    )

    preprocess = PythonOperator(
        task_id='preprocess_data',
        python_callable=preprocess_data,
    )

    train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )

    evaluate = PythonOperator(
        task_id='evaluate_model',
        python_callable=evaluate_model,
    )

    deploy = PythonOperator(
        task_id='deploy_model',
        python_callable=deploy_model,
    )

    # Define pipeline
    extract >> preprocess >> train >> evaluate >> deploy

9. Sensors & Branching

Sensors (Waiting for Conditions)

from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Wait for file to appear
wait_for_file = FileSensor(
    task_id='wait_for_data',
    filepath='/data/input.csv',
    poke_interval=30,   # Check every 30 seconds
    timeout=3600,       # Time out after 1 hour
    mode='poke',        # 'poke' holds a worker slot; 'reschedule' frees it between checks
)

# Wait for S3 object
wait_for_s3 = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_key='s3://my-bucket/data/{{ ds }}/input.parquet',
    aws_conn_id='aws_default',
    poke_interval=60,
    timeout=7200,
)

Branching (Conditional Logic)

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated in Airflow 2.x

def choose_branch(**context):
    """Decide which branch to take"""
    ti = context['ti']
    accuracy = ti.xcom_pull(task_ids='evaluate_model', key='accuracy')

    if accuracy > 0.95:
        return 'deploy_production'
    elif accuracy > 0.85:
        return 'deploy_staging'
    else:
        return 'alert_team'

branching = BranchPythonOperator(
    task_id='choose_deployment',
    python_callable=choose_branch,
)

deploy_prod = EmptyOperator(task_id='deploy_production')
deploy_stage = EmptyOperator(task_id='deploy_staging')
alert = EmptyOperator(task_id='alert_team')

branching >> [deploy_prod, deploy_stage, alert]

10. Dynamic DAGs & Task Generation

Dynamic Task Generation

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_partition(partition_id):
    print(f"Processing partition {partition_id}")

with DAG(
    dag_id='dynamic_tasks',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:

    # Generate 10 parallel tasks dynamically
    for i in range(10):
        task = PythonOperator(
            task_id=f'process_partition_{i}',
            python_callable=process_partition,
            op_args=[i],
        )

Task Mapping (Airflow 2.3+)

from airflow.decorators import task

@task
def process_item(item):
    return f"Processed {item}"

with DAG('task_mapping', start_date=datetime(2025, 1, 1)) as dag:
    items = ['A', 'B', 'C', 'D', 'E']

    # Automatically creates 5 parallel mapped task instances
    results = process_item.expand(item=items)

11. TaskFlow API (@task Decorator)

Modern Airflow uses the TaskFlow API for cleaner code:

dags/taskflow_example.py
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='taskflow_ml_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
    catchup=False,
    tags=['taskflow', 'ml'],
)
def ml_pipeline():

    @task
    def extract():
        # Return values are automatically pushed to XCom
        return {"data": "sample_data", "size": 1000}

    @task
    def transform(data: dict):
        # Input is automatically pulled from XCom
        print(f"Transforming {data['size']} records")
        return {"processed": True, "size": data['size']}

    @task
    def load(result: dict):
        print(f"Loading processed data: {result}")
        return "Complete"

    # Define dependencies with chaining
    data = extract()
    processed = transform(data)
    load(processed)

# Instantiate the DAG
ml_pipeline()

12. Best Practices for MLOps

1. Idempotency

# BAD: Appending to existing data
def bad_task():
    df = pd.read_csv('data.csv')
    new_row = {'col': 'value'}
    df = df.append(new_row)  # Non-idempotent! (DataFrame.append was also removed in pandas 2.x)
    df.to_csv('data.csv')

# GOOD: Overwrite or partition by execution date
def good_task(**context):
    execution_date = context['ds']  # 'YYYY-MM-DD'
    df = generate_data_for_date(execution_date)
    df.to_csv(f'data_{execution_date}.csv')

2. Data Versioning

from airflow.models import Variable

@task
def train_with_version(**context):
    data_version = context['ds']
    model_version = f"v_{datetime.now().strftime('%Y%m%d_%H%M')}"

    # Track lineage
    Variable.set(f"model_{model_version}_data", data_version)

    # Train with versioned data (train_model defined elsewhere)
    train_model(data_version, model_version)

3. Separate Concerns

# One DAG for data ingestion
# One DAG for training
# One DAG for deployment
# Use sensors to coordinate (see the cross-DAG sketch after this block)

# In training DAG:
wait_for_data = FileSensor(
    task_id='wait_for_daily_data',
    filepath='/data/{{ ds }}/processed.parquet',
)
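
One way to coordinate separate DAGs is ExternalTaskSensor, which waits for a task in another DAG to succeed for the same logical date. A minimal sketch, assuming an upstream DAG named data_ingestion with a final task publish_data (both names are illustrative):

from airflow.sensors.external_task import ExternalTaskSensor

# In the training DAG: block until the ingestion DAG's final task
# has succeeded for the same logical date
wait_for_ingestion = ExternalTaskSensor(
    task_id='wait_for_ingestion',
    external_dag_id='data_ingestion',   # upstream DAG (illustrative name)
    external_task_id='publish_data',    # upstream task (illustrative name)
    mode='reschedule',                  # free the worker slot between checks
    timeout=6 * 60 * 60,                # give up after 6 hours
)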

4. Resource Management

from airflow.operators.python import PythonOperator

heavy_task = PythonOperator(
    task_id='train_large_model',
    python_callable=train,
    pool='gpu_pool',       # Limit concurrent GPU tasks
    priority_weight=10,    # Higher priority in the scheduling queue
    queue='gpu_queue',     # Route to specific workers (CeleryExecutor)
)

5. Error Handling

from airflow.exceptions import AirflowFailException

@task
def validate_data(**context):
    data = load_data()  # load_data defined elsewhere

    if len(data) == 0:
        # Send alert before failing
        send_slack_alert("No data found!")
        raise AirflowFailException("Empty dataset")

    if data.isnull().sum().sum() > 100:
        # Log warning but continue
        context['ti'].log.warning("High null count detected")

13. Testing & Debugging

Unit Testing DAGs

tests/test_dags.py
import pytest
from airflow.models import DagBag

def test_dag_loading():
    """Test that all DAGs load without errors"""
    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0, f"DAG import errors: {dag_bag.import_errors}"

def test_dag_structure():
    """Test specific DAG structure"""
    dag_bag = DagBag(include_examples=False)
    dag = dag_bag.get_dag('ml_training_pipeline')

    assert dag is not None
    assert len(dag.tasks) == 5
    assert 'extract_data' in dag.task_ids

Testing Tasks

# Test specific task
airflow tasks test ml_training_pipeline extract_data 2025-01-01

# Test entire DAG (no state changes)
airflow dags test ml_training_pipeline 2025-01-01

Debugging

# Add logging
import logging
logger = logging.getLogger(__name__)

@task
def debug_task(**context):
    logger.info("Task started")
    logger.debug(f"Context: {context}")

    # Access task instance for detailed info
    ti = context['ti']
    logger.info(f"Run ID: {ti.run_id}")
    logger.info(f"Try number: {ti.try_number}")

View logs in UI: Graph View → Click Task → Logs

14. Executors & Scaling

Local Executor (Development)

# airflow.cfg
[core]
executor = LocalExecutor

Celery Executor (Production - Horizontal Scaling)

# Install Celery
pip install apache-airflow[celery]

# Start Redis (message broker)
docker run -d -p 6379:6379 redis

# Configure
# airflow.cfg:
# executor = CeleryExecutor
# broker_url = redis://localhost:6379/0
# result_backend = db+postgresql://user:pass@localhost/airflow

# Start worker
airflow celery worker

# Monitor with Flower
airflow celery flower

Kubernetes Executor (Cloud-Native)

# Each task runs in isolated K8s pod
pip install apache-airflow[kubernetes]

# airflow.cfg:
# executor = KubernetesExecutor
# namespace = airflow

Example task with custom pod config:

from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

train_task = KubernetesPodOperator(
    task_id='train_on_gpu',
    name='ml-training',
    namespace='ml-pipelines',
    image='my-ml-image:latest',
    cmds=['python', 'train.py'],
    # Recent cncf-kubernetes providers take container_resources (a V1ResourceRequirements)
    # instead of the older resources dict
    container_resources=k8s.V1ResourceRequirements(
        requests={'cpu': '2', 'memory': '8Gi'},
        limits={'nvidia.com/gpu': '1'},
    ),
)

15. Monitoring & Alerting

Email Alerts

default_args = {
    'email': ['team@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    # For success notifications, use on_success_callback (there is no email_on_success argument)
}

Slack Integration

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def send_slack_alert(context):
    slack_msg = f"""
    :red_circle: Task Failed
    *Task*: {context.get('task_instance').task_id}
    *Dag*: {context.get('task_instance').dag_id}
    *Execution Time*: {context.get('execution_date')}
    *Log Url*: {context.get('task_instance').log_url}
    """

    alert = SlackWebhookOperator(
        task_id='slack_alert',
        slack_webhook_conn_id='slack_webhook',  # older Slack provider versions used http_conn_id
        message=slack_msg,
    )
    return alert.execute(context=context)

# Add to DAG
default_args['on_failure_callback'] = send_slack_alert

Metrics & Observability

Airflow emits StatsD metrics, which can be exported to Prometheus (e.g., via a statsd-exporter):

# Enable StatsD
pip install apache-airflow[statsd]

# airflow.cfg:
# [metrics]
# statsd_on = True
# statsd_host = localhost
# statsd_port = 8125

Common metrics:

  • dag_processing.total_parse_time
  • scheduler.tasks.running
  • scheduler.tasks.executable
  • executor.queued_tasks
  • pool.open_slots

16. Security & Access Control

RBAC (Role-Based Access Control)

# Create roles and users via CLI or UI
airflow users create \
--username data_scientist \
--role Viewer \
--email ds@example.com

# Built-in roles: Admin, User, Op, Viewer, Public

Secrets Backend

# Use external secrets (AWS Secrets Manager, Vault, GCP Secret Manager)
pip install apache-airflow[amazon]

# airflow.cfg:
# [secrets]
# backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs = {"connections_prefix": "airflow/connections"}

Access in DAG:

from airflow.hooks.base import BaseHook

# Automatically fetches from secrets backend
conn = BaseHook.get_connection('my_secure_conn')

Encryption

# Encrypt sensitive Variable values
# airflow.cfg:
# [core]
# fernet_key = <generated_key>

# Generate key
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

17. CI/CD Integration

DAG Testing in CI

.github/workflows/test-dags.yml
name: Test Airflow DAGs

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install apache-airflow pytest
          pip install -r requirements.txt

      - name: Initialize Airflow DB
        run: airflow db init

      - name: Test DAG integrity
        run: |
          pytest tests/test_dags.py

      - name: Validate DAGs
        run: |
          python -m py_compile dags/*.py

Deployment Strategy

# Option 1: Git-sync sidecar (Kubernetes)
# Automatically sync DAG files from Git repo

# Option 2: CI/CD pipeline
# Build artifact → Upload to S3/GCS → Workers sync
aws s3 sync dags/ s3://airflow-bucket/dags/

# Option 3: Docker image with embedded DAGs
FROM apache/airflow:2.10.0
COPY dags/ ${AIRFLOW_HOME}/dags/

18. Real-World MLOps Examples

Model Retraining Pipeline

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id='model_retraining_pipeline',
    schedule='0 2 * * 0',  # Weekly on Sunday at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'retraining'],
)
def model_retraining():

    @task
    def check_model_drift(**context):
        """Check if model performance has degraded"""
        from airflow.models import Variable
        import random

        current_accuracy = random.uniform(0.75, 0.95)  # Simulate metric
        baseline = float(Variable.get("baseline_accuracy", default_var="0.9"))

        drift_detected = current_accuracy < baseline * 0.95

        context['ti'].xcom_push(key='drift_detected', value=drift_detected)
        context['ti'].xcom_push(key='current_accuracy', value=current_accuracy)

        return drift_detected

    @task.branch
    def decide_retraining(drift_detected: bool):
        # Return the task_id of the first task on the chosen path
        if drift_detected:
            return 'fetch_latest_data'
        else:
            return 'skip_retraining'

    @task
    def fetch_latest_data():
        """Fetch data from the past week"""
        # Implementation here
        return "/data/latest_week.parquet"

    @task
    def retrain_model(data_path: str):
        """Retrain model with latest data"""
        # Implementation here
        new_model_id = f"model_{datetime.now().strftime('%Y%m%d_%H%M')}"
        return new_model_id

    @task
    def evaluate_new_model(model_id: str):
        """Evaluate on validation set"""
        # Implementation here
        accuracy = 0.92  # Simulated
        return {"model_id": model_id, "accuracy": accuracy}

    @task
    def deploy_model(eval_result: dict):
        """Deploy if better than baseline"""
        if eval_result['accuracy'] > 0.88:
            print(f"Deploying {eval_result['model_id']}")
            # Update serving endpoint
        else:
            raise ValueError("New model doesn't meet quality threshold")

    @task
    def skip_retraining():
        print("No drift detected, skipping retraining")

    # Build pipeline
    drift = check_model_drift()
    branch = decide_retraining(drift)

    # Retraining path
    data = fetch_latest_data()
    model = retrain_model(data)
    evaluation = evaluate_new_model(model)
    deploy = deploy_model(evaluation)

    # Wire dependencies
    branch >> [data, skip_retraining()]
    data >> model >> evaluation >> deploy

model_retraining()

Feature Engineering Pipeline

@dag(
    dag_id='feature_engineering',
    schedule='@hourly',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)
def feature_pipeline():

    @task
    def extract_raw_events(**context):
        """Extract from event stream"""
        # Read from Kafka, Kinesis, etc.
        return {"events": 5000, "timestamp": context['ts']}

    @task
    def compute_aggregations(events: dict):
        """Compute hourly aggregations"""
        # Group by user, calculate stats
        features = {
            "user_count": 100,
            "avg_session_duration": 300,
            "top_features": ["f1", "f2", "f3"]
        }
        return features

    @task
    def write_to_feature_store(features: dict):
        """Write to online/offline feature store"""
        # Write to Feast, Tecton, or custom store
        print(f"Writing {features['user_count']} user features")

    events = extract_raw_events()
    features = compute_aggregations(events)
    write_to_feature_store(features)

feature_pipeline()

19. Performance Optimization

Parallelism Tuning

# airflow.cfg
[core]
parallelism = 32                 # Max task instances running concurrently (per scheduler)
max_active_tasks_per_dag = 16    # Max concurrent tasks per DAG (formerly dag_concurrency)
max_active_runs_per_dag = 3      # Max concurrent DAG runs per DAG

[scheduler]
max_tis_per_query = 512          # Task instances examined per scheduling query

Pools (Resource Limiting)

# Create pool via UI or CLI
airflow pools set gpu_pool 2 "GPU tasks"
airflow pools set db_pool 5 "Database connections"

Use in tasks:

task = PythonOperator(
    task_id='gpu_intensive',
    python_callable=train,
    pool='gpu_pool',
)

Database Optimization

-- Manual cleanup of old metadata (column names vary by Airflow version;
-- newer schemas key task_instance on run_id rather than execution_date)
DELETE FROM task_instance WHERE execution_date < NOW() - INTERVAL '90 days';
DELETE FROM dag_run WHERE execution_date < NOW() - INTERVAL '90 days';
DELETE FROM log WHERE dttm < NOW() - INTERVAL '30 days';

-- Preferred: use the built-in cleanup command
airflow db clean --clean-before-timestamp "2024-01-01" --yes

20. Troubleshooting

  • Tasks stuck in "queued": no available executor slots. Increase parallelism or pool sizes and check worker health.
  • "Broken DAG" error: Python error in the DAG file. Check scheduler logs and validate with python dags/my_dag.py.
  • Slow scheduler: too many DAG files or expensive top-level parsing. Reduce DAG file count, optimize DAG generation, increase dag_dir_list_interval.
  • Zombie tasks: a worker died during execution. Check worker logs and tune scheduler_zombie_task_threshold.
  • Out of memory: large XCom payloads. Use external storage and keep XComs small (less than 1 MB).
  • Task timeout: long-running task without a timeout. Set execution_timeout on the task (see the snippet after this list).
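
A minimal sketch of capping a task's runtime with execution_timeout (the task name and callable are illustrative):

from datetime import timedelta
from airflow.operators.python import PythonOperator

train = PythonOperator(
    task_id='train_model',
    python_callable=train_fn,               # illustrative callable defined elsewhere
    execution_timeout=timedelta(hours=2),   # fail the task if it runs longer than 2 hours
    retries=1,
)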

Debug Checklist

# 1. Check scheduler logs
tail -f $AIRFLOW_HOME/logs/scheduler/latest/*.log

# 2. Validate DAG file
python dags/my_dag.py

# 3. Test specific task
airflow tasks test my_dag my_task 2025-01-01

# 4. Check task logs in UI
# Graph View → Click task → Logs tab

# 5. Inspect metadata
airflow db check

# 6. List DAGs and their states
airflow dags list
airflow dags state my_dag 2025-01-01

21. Migration & Upgrades

Upgrading Airflow

# Backup database first!
pg_dump airflow > airflow_backup.sql

# Upgrade package
pip install --upgrade apache-airflow

# Migrate database schema
airflow db migrate

# Verify
airflow version

Migration from Airflow 1.x to 2.x

Key changes:

  • Operators moved to provider packages (see the import example below)
  • execution_date renamed to logical_date
  • RBAC UI is mandatory
  • TaskFlow API introduced
  • XCom changes (JSON serialization by default)
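
For example, imports that lived under airflow.operators.*_operator (or airflow.contrib) in 1.x now come from core modules or provider packages; a representative before/after, not an exhaustive list:

# Airflow 1.x
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Airflow 2.x (core operators; integrations now live in apache-airflow-providers-* packages)
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator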

22. Frequently Asked Questions

Q: Can I run multiple schedulers?
A: Yes, Airflow 2.0+ supports running multiple schedulers for high availability; it requires a metadata database with row-level locking support (e.g., PostgreSQL 10+ or MySQL 8).

Q: How to handle long-running tasks (days)?
A: Use sensors with mode='reschedule' or deferrable operators, or trigger the downstream work from an external event instead of holding a worker slot for days.

Q: Best practice for DAG versioning?
A: Use semantic versioning in DAG ID or tags. Archive old DAGs by moving to separate folder.

Q: How to share code between DAGs?
A: Create Python package in plugins/ or dags/common/, import as needed.
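
For instance, with a shared module under dags/common/ (the module and function names here are illustrative), DAG files can import it directly because the dags folder is placed on the Python path:

# dags/common/notify.py  (illustrative shared module)
def notify_team(message: str) -> None:
    print(f"[alert] {message}")

# dags/my_pipeline.py
from common.notify import notify_team   # works because the dags folder is on sys.path

notify_team("Pipeline loaded")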

Q: Can I trigger DAG from external system?
A: Yes, use REST API:

curl -X POST http://localhost:8080/api/v1/dags/my_dag/dagRuns \
-H "Content-Type: application/json" \
-u "admin:admin" \
-d '{"conf": {"param": "value"}}'

23. Next Steps & Resources

  1. Master core concepts (DAGs, tasks, dependencies)
  2. Practice with TaskFlow API
  3. Integrate with your ML stack (MLflow, W&B, etc.)
  4. Implement monitoring and alerting
  5. Scale with Celery or Kubernetes executor
  6. Establish CI/CD for DAG deployment

Last validated against Apache Airflow 2.10.0 documentation on 2025-10-18.

Have improvements or organization-specific patterns? Contribute by opening a PR.