ETL Pipeline with Python
Learn how to build robust ETL (Extract, Transform, Load) pipelines with Python. This tutorial covers extracting data from files, APIs, and databases, transforming and validating it, and loading it into target destinations such as a data warehouse, CSV, or Parquet files.
Overview
ETL is the process of:
- Extract: Gathering data from various sources
- Transform: Cleaning, validating, and transforming data
- Load: Loading data into a target database or data warehouse
Prerequisites
Install required Python packages:
pip install pandas sqlalchemy psycopg2-binary requests python-dotenv pyarrow schedule pytest
Project Structure
etl_pipeline/
├── config.py # Configuration and environment variables
├── extract.py # Data extraction functions
├── transform.py # Data transformation functions
├── load.py # Data loading functions
├── pipeline.py # Main pipeline orchestration
├── utils.py # Helper functions
└── requirements.txt # Dependencies
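For reference, requirements.txt might look like the list below. It simply mirrors the packages used throughout the tutorial; the entries are illustrative, so pin the exact versions you actually test against.
# requirements.txt (illustrative -- pin the versions you test with)
pandas
SQLAlchemy
psycopg2-binary
requests
python-dotenv
pyarrow      # Parquet support (section 3)
schedule     # lightweight scheduling (section 10)
pytest       # unit tests (section 9)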
1. Extract Phase
Extract from CSV Files
# extract.py
import pandas as pd
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def extract_from_csv(file_path: str) -> pd.DataFrame:
"""
Extract data from a CSV file.
Args:
file_path: Path to the CSV file
Returns:
DataFrame containing the extracted data
"""
try:
logger.info(f"Extracting data from CSV: {file_path}")
df = pd.read_csv(file_path)
logger.info(f"Successfully extracted {len(df)} rows from CSV")
return df
except FileNotFoundError:
logger.error(f"File not found: {file_path}")
raise
except Exception as e:
logger.error(f"Error extracting from CSV: {str(e)}")
raise
Extract from APIs
# extract.py (continued)
import requests
from typing import Dict, List
import time
def extract_from_api(
api_url: str,
headers: Dict = None,
params: Dict = None,
retry_count: int = 3,
retry_delay: int = 5
) -> List[Dict]:
"""
Extract data from a REST API with retry logic.
Args:
api_url: API endpoint URL
headers: Request headers
params: Query parameters
retry_count: Number of retry attempts
retry_delay: Delay between retries in seconds
Returns:
List of records from API
"""
for attempt in range(retry_count):
try:
logger.info(f"Fetching data from API: {api_url} (Attempt {attempt + 1})")
response = requests.get(api_url, headers=headers, params=params, timeout=30)
response.raise_for_status()
data = response.json()
logger.info(f"Successfully extracted {len(data)} records from API")
return data
except requests.exceptions.RequestException as e:
logger.warning(f"API request failed (Attempt {attempt + 1}): {str(e)}")
if attempt < retry_count - 1:
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
logger.error("Max retries reached. API extraction failed.")
raise
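Many APIs return results one page at a time rather than in a single response. The helper below sketches how extract_from_api can be reused to walk page-based pagination; the page and per_page parameter names and the stop conditions are assumptions about a hypothetical API, so adjust them to the service you are calling.
# extract.py (continued)
def extract_paginated_api(
    api_url: str,
    headers: Optional[Dict] = None,
    page_size: int = 100,
    max_pages: int = 1000
) -> List[Dict]:
    """Fetch all pages from a page-based API (parameter names are illustrative)."""
    all_records: List[Dict] = []
    for page in range(1, max_pages + 1):
        # Reuses the retrying helper above; 'page'/'per_page' are assumed parameter names
        records = extract_from_api(
            api_url,
            headers=headers,
            params={'page': page, 'per_page': page_size}
        )
        if not records:
            break  # empty page -> no more data
        all_records.extend(records)
        if len(records) < page_size:
            break  # short page -> last page
    logger.info(f"Extracted {len(all_records)} records across {page} page(s)")
    return all_records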
Extract from Databases
# extract.py (continued)
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
def extract_from_database(
connection_string: str,
query: str
) -> pd.DataFrame:
"""
Extract data from a database using SQL query.
Args:
connection_string: Database connection string
query: SQL query to execute
Returns:
DataFrame containing query results
"""
try:
logger.info("Connecting to database...")
engine = create_engine(connection_string, poolclass=NullPool)
logger.info(f"Executing query: {query[:100]}...")
df = pd.read_sql(query, engine)
logger.info(f"Successfully extracted {len(df)} rows from database")
engine.dispose()
return df
except Exception as e:
logger.error(f"Database extraction failed: {str(e)}")
raise
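For very large tables it can help to stream results instead of pulling the full query into memory at once. pandas' read_sql accepts a chunksize argument that yields DataFrames chunk by chunk; the sketch below builds on that, with a chunk size you should adjust to your memory budget.
# extract.py (continued)
def extract_from_database_chunked(
    connection_string: str,
    query: str,
    chunksize: int = 50000
) -> pd.DataFrame:
    """Stream a large query result in chunks, then combine the pieces."""
    engine = create_engine(connection_string, poolclass=NullPool)
    try:
        chunks = []
        # With chunksize set, read_sql returns an iterator of DataFrames
        for i, chunk in enumerate(pd.read_sql(query, engine, chunksize=chunksize), start=1):
            logger.info(f"Fetched chunk {i} with {len(chunk)} rows")
            chunks.append(chunk)
        df = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
        logger.info(f"Successfully extracted {len(df)} rows from database")
        return df
    finally:
        engine.dispose()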
2. Transform Phase
Data Cleaning
# transform.py
import pandas as pd
import numpy as np
from typing import List, Dict
import logging
logger = logging.getLogger(__name__)
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Clean and prepare data for transformation.
Args:
df: Input DataFrame
Returns:
Cleaned DataFrame
"""
logger.info("Starting data cleaning...")
initial_rows = len(df)
# Remove duplicate rows
df = df.drop_duplicates()
logger.info(f"Removed {initial_rows - len(df)} duplicate rows")
# Remove rows with all null values
df = df.dropna(how='all')
    # Strip whitespace from string values in object columns
    string_columns = df.select_dtypes(include=['object']).columns
    for col in string_columns:
        # apply() leaves non-string values in mixed object columns untouched
        df[col] = df[col].apply(lambda x: x.strip() if isinstance(x, str) else x)
logger.info(f"Data cleaning complete. Rows remaining: {len(df)}")
return df
Data Validation
# transform.py (continued)
from datetime import datetime
def validate_data(df: pd.DataFrame, rules: Dict) -> pd.DataFrame:
"""
Validate data against business rules.
Args:
df: Input DataFrame
rules: Dictionary of validation rules
Returns:
Validated DataFrame with invalid rows removed
"""
logger.info("Starting data validation...")
initial_rows = len(df)
# Example validation rules
if 'required_columns' in rules:
for col in rules['required_columns']:
if col in df.columns:
df = df[df[col].notna()]
logger.info(f"Removed rows with null {col}")
if 'value_ranges' in rules:
for col, (min_val, max_val) in rules['value_ranges'].items():
if col in df.columns:
df = df[(df[col] >= min_val) & (df[col] <= max_val)]
logger.info(f"Applied range validation to {col}: [{min_val}, {max_val}]")
if 'date_columns' in rules:
for col in rules['date_columns']:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
df = df[df[col].notna()]
logger.info(f"Validated date format for {col}")
invalid_rows = initial_rows - len(df)
logger.info(f"Validation complete. Removed {invalid_rows} invalid rows")
return df
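As a usage sketch, the rules dictionary expected by validate_data looks like this (the column names and ranges are illustrative):
# Example: validating a customer dataset
rules = {
    'required_columns': ['customer_id', 'email'],
    'value_ranges': {'age': (0, 120), 'amount': (0, 1000000)},
    'date_columns': ['created_at']
}
df = validate_data(df, rules)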
Data Transformation
# transform.py (continued)
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Apply business logic transformations to data.
Args:
df: Input DataFrame
Returns:
Transformed DataFrame
"""
logger.info("Starting data transformation...")
# Example transformations
# 1. Create derived columns
if 'first_name' in df.columns and 'last_name' in df.columns:
df['full_name'] = df['first_name'] + ' ' + df['last_name']
# 2. Normalize values
if 'email' in df.columns:
df['email'] = df['email'].str.lower()
# 3. Parse dates
if 'created_at' in df.columns:
df['created_at'] = pd.to_datetime(df['created_at'])
df['created_year'] = df['created_at'].dt.year
df['created_month'] = df['created_at'].dt.month
# 4. Handle categorical data
if 'status' in df.columns:
df['status'] = df['status'].str.upper()
df['status_code'] = df['status'].map({
'ACTIVE': 1,
'INACTIVE': 0,
'PENDING': 2
})
# 5. Calculate aggregations
if 'amount' in df.columns:
df['amount_rounded'] = df['amount'].round(2)
logger.info("Data transformation complete")
return df
Data Quality Checks
# transform.py (continued)
def run_quality_checks(df: pd.DataFrame) -> Dict:
"""
Run comprehensive data quality checks.
Args:
df: Input DataFrame
Returns:
Dictionary with quality metrics
"""
logger.info("Running data quality checks...")
quality_report = {
'total_rows': len(df),
'total_columns': len(df.columns),
'duplicate_rows': df.duplicated().sum(),
'null_counts': df.isnull().sum().to_dict(),
'null_percentages': (df.isnull().sum() / len(df) * 100).to_dict(),
'column_types': df.dtypes.astype(str).to_dict(),
'memory_usage': df.memory_usage(deep=True).sum() / 1024**2 # MB
}
# Check for completeness
completeness = (1 - df.isnull().sum() / len(df)) * 100
quality_report['completeness'] = completeness.to_dict()
# Check for uniqueness (for potential key columns)
uniqueness = {}
for col in df.columns:
unique_count = df[col].nunique()
uniqueness[col] = (unique_count / len(df)) * 100
quality_report['uniqueness_percentage'] = uniqueness
logger.info("Quality checks complete")
logger.info(f"Total rows: {quality_report['total_rows']}")
logger.info(f"Duplicate rows: {quality_report['duplicate_rows']}")
return quality_report
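A quality report is most useful when the pipeline acts on it. One option is a simple quality gate that raises when thresholds are breached, so a bad batch never reaches the load phase; the thresholds below are illustrative defaults, not recommendations.
# transform.py (continued)
def enforce_quality_gate(
    quality_report: Dict,
    max_null_percentage: float = 20.0,
    max_duplicate_rows: int = 0
) -> None:
    """Raise ValueError if the quality report breaches the configured thresholds."""
    failures = []
    if quality_report['duplicate_rows'] > max_duplicate_rows:
        failures.append(f"{quality_report['duplicate_rows']} duplicate rows found")
    for col, pct in quality_report['null_percentages'].items():
        if pct > max_null_percentage:
            failures.append(f"column '{col}' is {pct:.1f}% null")
    if failures:
        raise ValueError("Data quality gate failed: " + "; ".join(failures))
    logger.info("Data quality gate passed")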
3. Load Phase
Load to Database
# load.py
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
import logging
logger = logging.getLogger(__name__)
def load_to_database(
df: pd.DataFrame,
connection_string: str,
table_name: str,
if_exists: str = 'append',
chunksize: int = 1000
) -> int:
"""
Load DataFrame to a database table.
Args:
df: DataFrame to load
connection_string: Database connection string
table_name: Target table name
if_exists: How to behave if table exists ('append', 'replace', 'fail')
chunksize: Number of rows to write at a time
Returns:
Number of rows loaded
"""
try:
logger.info(f"Loading {len(df)} rows to table: {table_name}")
engine = create_engine(connection_string, poolclass=NullPool)
df.to_sql(
name=table_name,
con=engine,
if_exists=if_exists,
index=False,
chunksize=chunksize,
method='multi'
)
engine.dispose()
logger.info(f"Successfully loaded {len(df)} rows to {table_name}")
return len(df)
except Exception as e:
logger.error(f"Failed to load data to database: {str(e)}")
raise
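After a load it is worth checking that the target table actually grew as expected. The helper below is a minimal post-load sanity check built with SQLAlchemy's text(); the table name comes from your own pipeline configuration, not user input.
# load.py (continued)
from sqlalchemy import text

def verify_row_count(connection_string: str, table_name: str) -> int:
    """Return the row count of the target table as a post-load sanity check."""
    engine = create_engine(connection_string, poolclass=NullPool)
    try:
        with engine.connect() as conn:
            # table_name is taken from pipeline config, so plain interpolation is acceptable here
            count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar()
        logger.info(f"Table {table_name} now contains {count} rows")
        return count
    finally:
        engine.dispose()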
Load to CSV
# load.py (continued)
from pathlib import Path
def load_to_csv(
df: pd.DataFrame,
output_path: str,
append: bool = False
) -> str:
"""
Load DataFrame to a CSV file.
Args:
df: DataFrame to save
output_path: Output file path
append: Whether to append to existing file
Returns:
Path to the saved file
"""
try:
logger.info(f"Saving {len(df)} rows to CSV: {output_path}")
# Create directory if it doesn't exist
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        mode = 'a' if append else 'w'
        # Write the header unless appending to a file that already exists
        write_header = not (append and Path(output_path).exists())
        df.to_csv(output_path, mode=mode, header=write_header, index=False)
logger.info(f"Successfully saved data to {output_path}")
return output_path
except Exception as e:
logger.error(f"Failed to save CSV: {str(e)}")
raise
Load to Parquet
# load.py (continued)
from typing import List, Optional

def load_to_parquet(
    df: pd.DataFrame,
    output_path: str,
    compression: str = 'snappy',
    partition_cols: Optional[List[str]] = None
) -> str:
"""
Load DataFrame to Parquet format (efficient columnar storage).
Args:
df: DataFrame to save
output_path: Output file path
compression: Compression algorithm ('snappy', 'gzip', 'brotli')
partition_cols: Columns to partition by
Returns:
Path to the saved file
"""
try:
logger.info(f"Saving {len(df)} rows to Parquet: {output_path}")
# Create directory if it doesn't exist
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
if partition_cols:
df.to_parquet(
output_path,
engine='pyarrow',
compression=compression,
partition_cols=partition_cols,
index=False
)
else:
df.to_parquet(
output_path,
engine='pyarrow',
compression=compression,
index=False
)
logger.info(f"Successfully saved data to {output_path}")
return output_path
except Exception as e:
logger.error(f"Failed to save Parquet: {str(e)}")
raise
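One benefit of Parquet is that it can be read back selectively. With the pyarrow engine, pandas can prune columns and push simple filters down to the file format, which is especially useful for partitioned datasets; the path and column names below are illustrative.
# Example: reading a partitioned Parquet dataset back efficiently
df_2024 = pd.read_parquet(
    'data/processed/customers.parquet',                  # a directory when partition_cols was used
    engine='pyarrow',
    columns=['customer_id', 'email', 'created_year'],    # column pruning
    filters=[('created_year', '=', 2024)]                # partition / row-group filtering
)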
4. Complete Pipeline
Main Pipeline Orchestration
# pipeline.py
import logging
import sys
from datetime import datetime
from typing import Dict, Any

import pandas as pd
# Import custom modules
from extract import extract_from_csv, extract_from_api, extract_from_database
from transform import clean_data, validate_data, transform_data, run_quality_checks
from load import load_to_database, load_to_csv, load_to_parquet
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'etl_pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
def run_etl_pipeline(config: Dict[str, Any]) -> None:
"""
Execute the complete ETL pipeline.
Args:
config: Pipeline configuration dictionary
"""
pipeline_start = datetime.now()
logger.info("="*80)
logger.info("ETL PIPELINE STARTED")
logger.info("="*80)
try:
# EXTRACT PHASE
logger.info("\n" + "="*80)
logger.info("PHASE 1: EXTRACT")
logger.info("="*80)
if config['source_type'] == 'csv':
df = extract_from_csv(config['source_path'])
elif config['source_type'] == 'api':
data = extract_from_api(
config['api_url'],
headers=config.get('headers'),
params=config.get('params')
)
df = pd.DataFrame(data)
elif config['source_type'] == 'database':
df = extract_from_database(
config['connection_string'],
config['query']
)
else:
raise ValueError(f"Unsupported source type: {config['source_type']}")
logger.info(f"Extraction complete. Rows extracted: {len(df)}")
# TRANSFORM PHASE
logger.info("\n" + "="*80)
logger.info("PHASE 2: TRANSFORM")
logger.info("="*80)
# Clean data
df = clean_data(df)
# Validate data
if 'validation_rules' in config:
df = validate_data(df, config['validation_rules'])
# Transform data
df = transform_data(df)
# Run quality checks
quality_report = run_quality_checks(df)
logger.info(f"Quality Report: {quality_report}")
logger.info(f"Transformation complete. Rows after transformation: {len(df)}")
# LOAD PHASE
logger.info("\n" + "="*80)
logger.info("PHASE 3: LOAD")
logger.info("="*80)
if config['target_type'] == 'database':
rows_loaded = load_to_database(
df,
config['target_connection'],
config['target_table'],
if_exists=config.get('if_exists', 'append')
)
logger.info(f"Loaded {rows_loaded} rows to database")
elif config['target_type'] == 'csv':
output_path = load_to_csv(
df,
config['target_path'],
append=config.get('append', False)
)
logger.info(f"Data saved to: {output_path}")
elif config['target_type'] == 'parquet':
output_path = load_to_parquet(
df,
config['target_path'],
compression=config.get('compression', 'snappy'),
partition_cols=config.get('partition_cols')
)
logger.info(f"Data saved to: {output_path}")
else:
raise ValueError(f"Unsupported target type: {config['target_type']}")
# PIPELINE COMPLETE
pipeline_end = datetime.now()
duration = (pipeline_end - pipeline_start).total_seconds()
logger.info("\n" + "="*80)
logger.info("ETL PIPELINE COMPLETED SUCCESSFULLY")
logger.info(f"Duration: {duration:.2f} seconds")
logger.info(f"Rows processed: {len(df)}")
logger.info("="*80)
except Exception as e:
logger.error("\n" + "="*80)
logger.error("ETL PIPELINE FAILED")
logger.error(f"Error: {str(e)}")
logger.error("="*80)
raise
# Example usage
if __name__ == "__main__":
# Configuration example
config = {
'source_type': 'csv',
'source_path': 'data/raw/customers.csv',
'validation_rules': {
'required_columns': ['customer_id', 'email'],
'value_ranges': {'age': (0, 120)},
'date_columns': ['created_at']
},
'target_type': 'database',
'target_connection': 'postgresql://user:password@localhost:5432/warehouse',
'target_table': 'customers',
'if_exists': 'append'
}
run_etl_pipeline(config)
5. Configuration Management
Environment Variables
# config.py
from dotenv import load_dotenv
import os
from pathlib import Path
# Load environment variables
load_dotenv()
class Config:
"""Configuration class for ETL pipeline."""
# Source configuration
SOURCE_TYPE = os.getenv('SOURCE_TYPE', 'csv')
SOURCE_PATH = os.getenv('SOURCE_PATH', 'data/raw/')
API_URL = os.getenv('API_URL')
API_KEY = os.getenv('API_KEY')
# Database configuration
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = os.getenv('DB_PORT', '5432')
DB_NAME = os.getenv('DB_NAME', 'warehouse')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# Target configuration
TARGET_TYPE = os.getenv('TARGET_TYPE', 'database')
TARGET_TABLE = os.getenv('TARGET_TABLE', 'processed_data')
OUTPUT_PATH = os.getenv('OUTPUT_PATH', 'data/processed/')
# Pipeline settings
BATCH_SIZE = int(os.getenv('BATCH_SIZE', '1000'))
RETRY_COUNT = int(os.getenv('RETRY_COUNT', '3'))
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
@staticmethod
def get_connection_string():
"""Generate database connection string."""
return (
f"postgresql://{Config.DB_USER}:{Config.DB_PASSWORD}"
f"@{Config.DB_HOST}:{Config.DB_PORT}/{Config.DB_NAME}"
)
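The pipeline from section 4 can then be driven entirely from the environment, for example:
# Example: building the pipeline config from environment-driven settings
from config import Config
from pipeline import run_etl_pipeline

config = {
    'source_type': Config.SOURCE_TYPE,
    'source_path': Config.SOURCE_PATH,
    'target_type': Config.TARGET_TYPE,
    'target_connection': Config.get_connection_string(),
    'target_table': Config.TARGET_TABLE,
    'if_exists': 'append'
}
run_etl_pipeline(config)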
.env File Example
# .env
# Source Configuration
SOURCE_TYPE=csv
SOURCE_PATH=data/raw/customers.csv
API_URL=https://api.example.com/data
API_KEY=your_api_key_here
# Database Configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=warehouse
DB_USER=etl_user
DB_PASSWORD=secure_password
# Target Configuration
TARGET_TYPE=database
TARGET_TABLE=customers
OUTPUT_PATH=data/processed/
# Pipeline Settings
BATCH_SIZE=1000
RETRY_COUNT=3
LOG_LEVEL=INFO
6. Error Handling and Retry Logic
Robust Error Handling
# utils.py
import logging
import time
from functools import wraps
from typing import Callable, Any
logger = logging.getLogger(__name__)
def retry_on_failure(max_retries: int = 3, delay: int = 5):
"""
Decorator to retry a function on failure.
Args:
max_retries: Maximum number of retry attempts
delay: Delay between retries in seconds
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt < max_retries - 1:
logger.warning(
f"{func.__name__} failed (attempt {attempt + 1}/{max_retries}): {str(e)}"
)
logger.info(f"Retrying in {delay} seconds...")
time.sleep(delay)
else:
logger.error(f"{func.__name__} failed after {max_retries} attempts")
raise
return wrapper
return decorator
def log_execution_time(func: Callable) -> Callable:
"""Decorator to log function execution time."""
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
logger.info(f"Starting {func.__name__}...")
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
logger.info(f"{func.__name__} completed in {execution_time:.2f} seconds")
return result
except Exception as e:
execution_time = time.time() - start_time
logger.error(f"{func.__name__} failed after {execution_time:.2f} seconds: {str(e)}")
raise
return wrapper
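Both decorators can be stacked on any pipeline step, for example:
# Example: applying both decorators to an extraction step
from utils import retry_on_failure, log_execution_time
from extract import extract_from_csv

@log_execution_time
@retry_on_failure(max_retries=3, delay=5)
def extract_step(file_path: str):
    return extract_from_csv(file_path)

df = extract_step('data/raw/customers.csv')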
7. Monitoring and Alerting
Pipeline Monitoring
# utils.py (continued)
from datetime import datetime
from typing import Dict

class PipelineMonitor:
"""Monitor ETL pipeline execution."""
def __init__(self):
self.metrics = {
'start_time': None,
'end_time': None,
'rows_extracted': 0,
'rows_transformed': 0,
'rows_loaded': 0,
'errors': [],
'warnings': []
}
def start(self):
"""Mark pipeline start."""
self.metrics['start_time'] = datetime.now()
logger.info("Pipeline monitoring started")
def end(self):
"""Mark pipeline end."""
self.metrics['end_time'] = datetime.now()
duration = (self.metrics['end_time'] - self.metrics['start_time']).total_seconds()
self.metrics['duration'] = duration
logger.info(f"Pipeline monitoring ended. Duration: {duration:.2f}s")
def record_extraction(self, row_count: int):
"""Record extraction metrics."""
self.metrics['rows_extracted'] = row_count
logger.info(f"Recorded extraction: {row_count} rows")
def record_transformation(self, row_count: int):
"""Record transformation metrics."""
self.metrics['rows_transformed'] = row_count
logger.info(f"Recorded transformation: {row_count} rows")
def record_load(self, row_count: int):
"""Record load metrics."""
self.metrics['rows_loaded'] = row_count
logger.info(f"Recorded load: {row_count} rows")
def record_error(self, error: str):
"""Record an error."""
self.metrics['errors'].append({
'timestamp': datetime.now(),
'error': error
})
logger.error(f"Recorded error: {error}")
def record_warning(self, warning: str):
"""Record a warning."""
self.metrics['warnings'].append({
'timestamp': datetime.now(),
'warning': warning
})
logger.warning(f"Recorded warning: {warning}")
def get_summary(self) -> Dict:
"""Get pipeline execution summary."""
return {
'duration': self.metrics.get('duration', 0),
'rows_extracted': self.metrics['rows_extracted'],
'rows_transformed': self.metrics['rows_transformed'],
'rows_loaded': self.metrics['rows_loaded'],
'data_loss': self.metrics['rows_extracted'] - self.metrics['rows_loaded'],
'error_count': len(self.metrics['errors']),
'warning_count': len(self.metrics['warnings'])
}
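Here is a sketch of how the monitor could be wired into the pipeline from section 4 (file paths and table names are illustrative):
# Example: wrapping a pipeline run with PipelineMonitor
monitor = PipelineMonitor()
monitor.start()

df = extract_from_csv('data/raw/customers.csv')
monitor.record_extraction(len(df))

df = transform_data(clean_data(df))
monitor.record_transformation(len(df))

rows_loaded = load_to_database(df, Config.get_connection_string(), 'customers')
monitor.record_load(rows_loaded)

monitor.end()
logger.info(f"Pipeline summary: {monitor.get_summary()}")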
8. Best Practices
1. Use Connection Pooling
from sqlalchemy.pool import QueuePool
engine = create_engine(
connection_string,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True # Verify connections before using
)
2. Implement Incremental Loading
def extract_incremental_data(last_updated: datetime) -> pd.DataFrame:
"""Extract only new or updated records."""
query = f"""
SELECT * FROM source_table
WHERE updated_at > '{last_updated}'
ORDER BY updated_at
"""
return extract_from_database(connection_string, query)
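Incremental extraction only works if the last watermark survives between runs (and when building the SQL, prefer parameterized queries over f-string interpolation for untrusted values). A minimal sketch that persists the watermark in a small JSON file follows; the file location is an assumption, and a control table in the warehouse works just as well.
import json
from pathlib import Path
from datetime import datetime

WATERMARK_FILE = Path('state/last_updated.json')  # illustrative location

def read_watermark(default: str = '1970-01-01T00:00:00') -> datetime:
    """Return the last successful watermark, or a default for the first run."""
    if WATERMARK_FILE.exists():
        return datetime.fromisoformat(json.loads(WATERMARK_FILE.read_text())['last_updated'])
    return datetime.fromisoformat(default)

def write_watermark(value: datetime) -> None:
    """Persist the new watermark after a successful load."""
    WATERMARK_FILE.parent.mkdir(parents=True, exist_ok=True)
    WATERMARK_FILE.write_text(json.dumps({'last_updated': value.isoformat()}))

# Typical flow: read_watermark() -> extract_incremental_data() -> load -> write_watermark()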
3. Use Batch Processing
def process_in_batches(df: pd.DataFrame, batch_size: int = 1000):
"""Process large datasets in batches."""
total_rows = len(df)
for i in range(0, total_rows, batch_size):
batch = df.iloc[i:i + batch_size]
logger.info(f"Processing batch {i//batch_size + 1}")
yield batch
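Each batch can then be handed straight to the loader (connection details taken from config):
# Example: loading a large DataFrame batch by batch
total_loaded = 0
batch_count = 0
for batch in process_in_batches(df, batch_size=5000):
    total_loaded += load_to_database(batch, Config.get_connection_string(), 'customers')
    batch_count += 1
logger.info(f"Loaded {total_loaded} rows in {batch_count} batches")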
4. Implement Data Versioning
def add_metadata_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Add metadata columns for tracking."""
df['etl_timestamp'] = datetime.now()
df['etl_version'] = '1.0.0'
df['source_system'] = 'production_db'
return df
5. Handle Large Files
def extract_large_csv_chunked(file_path: str, chunksize: int = 10000):
"""Read large CSV files in chunks."""
chunks = []
for chunk in pd.read_csv(file_path, chunksize=chunksize):
# Process each chunk
chunk = clean_data(chunk)
chunks.append(chunk)
return pd.concat(chunks, ignore_index=True)
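Note that the function above still concatenates every chunk, so peak memory is unchanged. When the file genuinely does not fit in memory, each chunk can be transformed and loaded immediately instead; the sketch below assumes the clean_data, transform_data, and load_to_database functions from earlier sections.
def etl_large_csv_streaming(
    file_path: str,
    connection_string: str,
    table_name: str,
    chunksize: int = 10000
) -> int:
    """Transform and load each chunk as it is read, keeping memory usage roughly flat."""
    total_rows = 0
    for chunk in pd.read_csv(file_path, chunksize=chunksize):
        chunk = transform_data(clean_data(chunk))
        total_rows += load_to_database(chunk, connection_string, table_name, if_exists='append')
    return total_rows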
9. Testing
Unit Tests
# test_pipeline.py
import pytest
import pandas as pd
from transform import clean_data, validate_data, transform_data
def test_clean_data():
"""Test data cleaning function."""
# Create test data with duplicates and nulls
df = pd.DataFrame({
'id': [1, 2, 2, 3],
'name': ['Alice', 'Bob', 'Bob', 'Charlie'],
'email': ['alice@example.com', 'bob@example.com', 'bob@example.com', None]
})
cleaned = clean_data(df)
# Should remove duplicates
assert len(cleaned) == 3
assert cleaned['id'].duplicated().sum() == 0
def test_validate_data():
"""Test data validation function."""
df = pd.DataFrame({
'id': [1, 2, 3],
'age': [25, 150, 30], # 150 is invalid
'email': ['alice@test.com', 'bob@test.com', 'charlie@test.com']
})
rules = {
'required_columns': ['id', 'email'],
'value_ranges': {'age': (0, 120)}
}
validated = validate_data(df, rules)
# Should remove row with invalid age
assert len(validated) == 2
assert all(validated['age'] <= 120)
def test_transform_data():
"""Test data transformation function."""
df = pd.DataFrame({
'first_name': ['Alice', 'Bob'],
'last_name': ['Smith', 'Jones'],
'email': ['ALICE@TEST.COM', 'BOB@TEST.COM']
})
transformed = transform_data(df)
# Should create full_name column
assert 'full_name' in transformed.columns
assert transformed['full_name'].tolist() == ['Alice Smith', 'Bob Jones']
# Should normalize email
assert all(transformed['email'].str.islower())
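The quality checks can be tested in the same style; the expected counts below follow directly from the test fixture.
# test_pipeline.py (continued)
from transform import run_quality_checks

def test_run_quality_checks():
    """Quality report should reflect row counts, duplicates, and nulls."""
    df = pd.DataFrame({
        'id': [1, 2, 2],
        'email': ['a@test.com', None, None]
    })
    report = run_quality_checks(df)
    assert report['total_rows'] == 3
    assert report['duplicate_rows'] == 1
    assert report['null_counts']['email'] == 2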
10. Scheduling and Automation
Using Schedule Library
# scheduler.py
import logging
import time

import schedule

from pipeline import run_etl_pipeline
from config import Config

logger = logging.getLogger(__name__)

def scheduled_job():
"""Run ETL pipeline as a scheduled job."""
config = {
'source_type': Config.SOURCE_TYPE,
'source_path': Config.SOURCE_PATH,
'target_type': Config.TARGET_TYPE,
'target_connection': Config.get_connection_string(),
'target_table': Config.TARGET_TABLE
}
try:
run_etl_pipeline(config)
except Exception as e:
logger.error(f"Scheduled job failed: {str(e)}")
# Send alert notification here
# Schedule the job
schedule.every().day.at("02:00").do(scheduled_job) # Run daily at 2 AM
# schedule.every().hour.do(scheduled_job) # Run hourly
# schedule.every(30).minutes.do(scheduled_job) # Run every 30 minutes
if __name__ == "__main__":
logger.info("ETL Scheduler started")
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
Summary
This tutorial covered:
- Extract: Reading data from CSV, APIs, and databases
- Transform: Cleaning, validating, and transforming data
- Load: Writing data to databases, CSV, and Parquet formats
- Error Handling: Retry logic and robust exception handling
- Monitoring: Tracking pipeline execution and metrics
- Best Practices: Connection pooling, batch processing, and incremental loading
- Testing: Unit tests for ETL functions
- Scheduling: Automating pipeline execution
Next Steps
- Implement parallel processing for large datasets using multiprocessing or dask
- Add data profiling and anomaly detection
- Integrate with workflow orchestration tools like Apache Airflow
- Implement CDC (Change Data Capture) for real-time data sync
- Add data lineage tracking
- Implement automated data quality monitoring and alerting
Additional Resources
- Pandas Documentation
- SQLAlchemy Documentation
- Apache Airflow for advanced orchestration
- Great Expectations for data validation