
ETL Pipeline with Python

Learn how to build robust ETL (Extract, Transform, Load) pipelines using Python. This tutorial covers extracting data from various sources, transforming it, and loading it into target systems such as databases, files, and data warehouses.

Overview

ETL is the process of:

  1. Extract: Gathering data from various sources
  2. Transform: Cleaning, validating, and transforming data
  3. Load: Loading data into a target database or data warehouse
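As a minimal sketch of those three phases (the file path, column name, and SQLite target below are purely illustrative), an ETL run can be as short as a few pandas calls; the rest of this tutorial builds each phase into reusable, production-ready modules:

# minimal_etl.py (illustrative sketch only)
import pandas as pd
from sqlalchemy import create_engine

df = pd.read_csv("data/raw/customers.csv")                        # Extract
df = df.drop_duplicates()                                         # Transform
df["email"] = df["email"].str.lower()
engine = create_engine("sqlite:///warehouse.db")                  # Load
df.to_sql("customers", engine, if_exists="append", index=False)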

Prerequisites

Install the required Python packages (pyarrow is used by the Parquet examples, schedule by the scheduling section):

pip install pandas sqlalchemy psycopg2-binary requests python-dotenv pyarrow schedule

Project Structure

etl_pipeline/
├── config.py # Configuration and environment variables
├── extract.py # Data extraction functions
├── transform.py # Data transformation functions
├── load.py # Data loading functions
├── pipeline.py # Main pipeline orchestration
├── utils.py # Helper functions
└── requirements.txt # Dependencies
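The requirements.txt file can simply list the packages from the install command above (add version pins to taste):

# requirements.txt
pandas
sqlalchemy
psycopg2-binary
requests
python-dotenv
pyarrow
schedule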

1. Extract Phase

Extract from CSV Files

# extract.py
import pandas as pd
from pathlib import Path
import logging

logger = logging.getLogger(__name__)

def extract_from_csv(file_path: str) -> pd.DataFrame:
    """
    Extract data from a CSV file.

    Args:
        file_path: Path to the CSV file

    Returns:
        DataFrame containing the extracted data
    """
    try:
        logger.info(f"Extracting data from CSV: {file_path}")
        df = pd.read_csv(file_path)
        logger.info(f"Successfully extracted {len(df)} rows from CSV")
        return df
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        raise
    except Exception as e:
        logger.error(f"Error extracting from CSV: {str(e)}")
        raise

Extract from APIs

# extract.py (continued)
import requests
from typing import Dict, List, Optional
import time

def extract_from_api(
    api_url: str,
    headers: Optional[Dict] = None,
    params: Optional[Dict] = None,
    retry_count: int = 3,
    retry_delay: int = 5
) -> List[Dict]:
    """
    Extract data from a REST API with retry logic.

    Args:
        api_url: API endpoint URL
        headers: Request headers
        params: Query parameters
        retry_count: Number of retry attempts
        retry_delay: Delay between retries in seconds

    Returns:
        List of records from API
    """
    for attempt in range(retry_count):
        try:
            logger.info(f"Fetching data from API: {api_url} (Attempt {attempt + 1})")
            response = requests.get(api_url, headers=headers, params=params, timeout=30)
            response.raise_for_status()

            data = response.json()
            logger.info(f"Successfully extracted {len(data)} records from API")
            return data

        except requests.exceptions.RequestException as e:
            logger.warning(f"API request failed (Attempt {attempt + 1}): {str(e)}")
            if attempt < retry_count - 1:
                logger.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                logger.error("Max retries reached. API extraction failed.")
                raise

Extract from Databases

# extract.py (continued)
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

def extract_from_database(
    connection_string: str,
    query: str
) -> pd.DataFrame:
    """
    Extract data from a database using SQL query.

    Args:
        connection_string: Database connection string
        query: SQL query to execute

    Returns:
        DataFrame containing query results
    """
    try:
        logger.info("Connecting to database...")
        engine = create_engine(connection_string, poolclass=NullPool)

        logger.info(f"Executing query: {query[:100]}...")
        df = pd.read_sql(query, engine)

        logger.info(f"Successfully extracted {len(df)} rows from database")
        engine.dispose()
        return df

    except Exception as e:
        logger.error(f"Database extraction failed: {str(e)}")
        raise

2. Transform Phase

Data Cleaning

# transform.py
import pandas as pd
import numpy as np
from typing import List, Dict
import logging

logger = logging.getLogger(__name__)

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean and prepare data for transformation.

    Args:
        df: Input DataFrame

    Returns:
        Cleaned DataFrame
    """
    logger.info("Starting data cleaning...")
    initial_rows = len(df)

    # Remove duplicate rows
    df = df.drop_duplicates()
    logger.info(f"Removed {initial_rows - len(df)} duplicate rows")

    # Remove rows with all null values
    df = df.dropna(how='all')

    # Strip whitespace from string columns
    string_columns = df.select_dtypes(include=['object']).columns
    for col in string_columns:
        df[col] = df[col].str.strip()

    logger.info(f"Data cleaning complete. Rows remaining: {len(df)}")
    return df

Data Validation

# transform.py (continued)
from datetime import datetime

def validate_data(df: pd.DataFrame, rules: Dict) -> pd.DataFrame:
    """
    Validate data against business rules.

    Args:
        df: Input DataFrame
        rules: Dictionary of validation rules

    Returns:
        Validated DataFrame with invalid rows removed
    """
    logger.info("Starting data validation...")
    initial_rows = len(df)

    # Example validation rules
    if 'required_columns' in rules:
        for col in rules['required_columns']:
            if col in df.columns:
                df = df[df[col].notna()]
                logger.info(f"Removed rows with null {col}")

    if 'value_ranges' in rules:
        for col, (min_val, max_val) in rules['value_ranges'].items():
            if col in df.columns:
                df = df[(df[col] >= min_val) & (df[col] <= max_val)]
                logger.info(f"Applied range validation to {col}: [{min_val}, {max_val}]")

    if 'date_columns' in rules:
        for col in rules['date_columns']:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
                df = df[df[col].notna()]
                logger.info(f"Validated date format for {col}")

    invalid_rows = initial_rows - len(df)
    logger.info(f"Validation complete. Removed {invalid_rows} invalid rows")
    return df

Data Transformation

# transform.py (continued)

def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply business logic transformations to data.

    Args:
        df: Input DataFrame

    Returns:
        Transformed DataFrame
    """
    logger.info("Starting data transformation...")

    # Example transformations
    # 1. Create derived columns
    if 'first_name' in df.columns and 'last_name' in df.columns:
        df['full_name'] = df['first_name'] + ' ' + df['last_name']

    # 2. Normalize values
    if 'email' in df.columns:
        df['email'] = df['email'].str.lower()

    # 3. Parse dates
    if 'created_at' in df.columns:
        df['created_at'] = pd.to_datetime(df['created_at'])
        df['created_year'] = df['created_at'].dt.year
        df['created_month'] = df['created_at'].dt.month

    # 4. Handle categorical data
    if 'status' in df.columns:
        df['status'] = df['status'].str.upper()
        df['status_code'] = df['status'].map({
            'ACTIVE': 1,
            'INACTIVE': 0,
            'PENDING': 2
        })

    # 5. Calculate aggregations
    if 'amount' in df.columns:
        df['amount_rounded'] = df['amount'].round(2)

    logger.info("Data transformation complete")
    return df

Data Quality Checks

# transform.py (continued)

def run_quality_checks(df: pd.DataFrame) -> Dict:
    """
    Run comprehensive data quality checks.

    Args:
        df: Input DataFrame

    Returns:
        Dictionary with quality metrics
    """
    logger.info("Running data quality checks...")

    quality_report = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'duplicate_rows': df.duplicated().sum(),
        'null_counts': df.isnull().sum().to_dict(),
        'null_percentages': (df.isnull().sum() / len(df) * 100).to_dict(),
        'column_types': df.dtypes.astype(str).to_dict(),
        'memory_usage': df.memory_usage(deep=True).sum() / 1024**2  # MB
    }

    # Check for completeness
    completeness = (1 - df.isnull().sum() / len(df)) * 100
    quality_report['completeness'] = completeness.to_dict()

    # Check for uniqueness (for potential key columns)
    uniqueness = {}
    for col in df.columns:
        unique_count = df[col].nunique()
        uniqueness[col] = (unique_count / len(df)) * 100
    quality_report['uniqueness_percentage'] = uniqueness

    logger.info("Quality checks complete")
    logger.info(f"Total rows: {quality_report['total_rows']}")
    logger.info(f"Duplicate rows: {quality_report['duplicate_rows']}")

    return quality_report
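A pipeline can also use the report to fail fast rather than load questionable data. Below is a hedged sketch; the threshold values and the enforce_quality_thresholds helper are illustrative, not part of the functions above:

# transform.py (continued)

def enforce_quality_thresholds(df: pd.DataFrame, max_null_pct: float = 20.0) -> None:
    """Raise if any column exceeds the allowed percentage of nulls."""
    report = run_quality_checks(df)

    # Columns whose null percentage is above the configured threshold
    bad_columns = {
        col: pct for col, pct in report['null_percentages'].items()
        if pct > max_null_pct
    }
    if bad_columns:
        raise ValueError(f"Quality check failed, too many nulls: {bad_columns}")

    if report['duplicate_rows'] > 0:
        logger.warning(f"{report['duplicate_rows']} duplicate rows remain after cleaning")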

3. Load Phase

Load to Database

# load.py
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
import logging

logger = logging.getLogger(__name__)

def load_to_database(
    df: pd.DataFrame,
    connection_string: str,
    table_name: str,
    if_exists: str = 'append',
    chunksize: int = 1000
) -> int:
    """
    Load DataFrame to a database table.

    Args:
        df: DataFrame to load
        connection_string: Database connection string
        table_name: Target table name
        if_exists: How to behave if table exists ('append', 'replace', 'fail')
        chunksize: Number of rows to write at a time

    Returns:
        Number of rows loaded
    """
    try:
        logger.info(f"Loading {len(df)} rows to table: {table_name}")
        engine = create_engine(connection_string, poolclass=NullPool)

        df.to_sql(
            name=table_name,
            con=engine,
            if_exists=if_exists,
            index=False,
            chunksize=chunksize,
            method='multi'
        )

        engine.dispose()
        logger.info(f"Successfully loaded {len(df)} rows to {table_name}")
        return len(df)

    except Exception as e:
        logger.error(f"Failed to load data to database: {str(e)}")
        raise

Load to CSV

# load.py (continued)
from pathlib import Path

def load_to_csv(
    df: pd.DataFrame,
    output_path: str,
    append: bool = False
) -> str:
    """
    Load DataFrame to a CSV file.

    Args:
        df: DataFrame to save
        output_path: Output file path
        append: Whether to append to existing file

    Returns:
        Path to the saved file
    """
    try:
        logger.info(f"Saving {len(df)} rows to CSV: {output_path}")

        # Create directory if it doesn't exist
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        mode = 'a' if append else 'w'
        header = not append

        df.to_csv(output_path, mode=mode, header=header, index=False)

        logger.info(f"Successfully saved data to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to save CSV: {str(e)}")
        raise

Load to Parquet

# load.py (continued)
from typing import List, Optional

def load_to_parquet(
    df: pd.DataFrame,
    output_path: str,
    compression: str = 'snappy',
    partition_cols: Optional[List[str]] = None
) -> str:
    """
    Load DataFrame to Parquet format (efficient columnar storage).

    Args:
        df: DataFrame to save
        output_path: Output file path
        compression: Compression algorithm ('snappy', 'gzip', 'brotli')
        partition_cols: Columns to partition by

    Returns:
        Path to the saved file
    """
    try:
        logger.info(f"Saving {len(df)} rows to Parquet: {output_path}")

        # Create directory if it doesn't exist
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        if partition_cols:
            df.to_parquet(
                output_path,
                engine='pyarrow',
                compression=compression,
                partition_cols=partition_cols,
                index=False
            )
        else:
            df.to_parquet(
                output_path,
                engine='pyarrow',
                compression=compression,
                index=False
            )

        logger.info(f"Successfully saved data to {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Failed to save Parquet: {str(e)}")
        raise

4. Complete Pipeline

Main Pipeline Orchestration

# pipeline.py
import logging
from datetime import datetime
from typing import Dict, Any
import sys

import pandas as pd

# Import custom modules
from extract import extract_from_csv, extract_from_api, extract_from_database
from transform import clean_data, validate_data, transform_data, run_quality_checks
from load import load_to_database, load_to_csv, load_to_parquet

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'etl_pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler(sys.stdout)
    ]
)

logger = logging.getLogger(__name__)

def run_etl_pipeline(config: Dict[str, Any]) -> None:
    """
    Execute the complete ETL pipeline.

    Args:
        config: Pipeline configuration dictionary
    """
    pipeline_start = datetime.now()
    logger.info("="*80)
    logger.info("ETL PIPELINE STARTED")
    logger.info("="*80)

    try:
        # EXTRACT PHASE
        logger.info("\n" + "="*80)
        logger.info("PHASE 1: EXTRACT")
        logger.info("="*80)

        if config['source_type'] == 'csv':
            df = extract_from_csv(config['source_path'])
        elif config['source_type'] == 'api':
            data = extract_from_api(
                config['api_url'],
                headers=config.get('headers'),
                params=config.get('params')
            )
            df = pd.DataFrame(data)
        elif config['source_type'] == 'database':
            df = extract_from_database(
                config['connection_string'],
                config['query']
            )
        else:
            raise ValueError(f"Unsupported source type: {config['source_type']}")

        logger.info(f"Extraction complete. Rows extracted: {len(df)}")

        # TRANSFORM PHASE
        logger.info("\n" + "="*80)
        logger.info("PHASE 2: TRANSFORM")
        logger.info("="*80)

        # Clean data
        df = clean_data(df)

        # Validate data
        if 'validation_rules' in config:
            df = validate_data(df, config['validation_rules'])

        # Transform data
        df = transform_data(df)

        # Run quality checks
        quality_report = run_quality_checks(df)
        logger.info(f"Quality Report: {quality_report}")

        logger.info(f"Transformation complete. Rows after transformation: {len(df)}")

        # LOAD PHASE
        logger.info("\n" + "="*80)
        logger.info("PHASE 3: LOAD")
        logger.info("="*80)

        if config['target_type'] == 'database':
            rows_loaded = load_to_database(
                df,
                config['target_connection'],
                config['target_table'],
                if_exists=config.get('if_exists', 'append')
            )
            logger.info(f"Loaded {rows_loaded} rows to database")

        elif config['target_type'] == 'csv':
            output_path = load_to_csv(
                df,
                config['target_path'],
                append=config.get('append', False)
            )
            logger.info(f"Data saved to: {output_path}")

        elif config['target_type'] == 'parquet':
            output_path = load_to_parquet(
                df,
                config['target_path'],
                compression=config.get('compression', 'snappy'),
                partition_cols=config.get('partition_cols')
            )
            logger.info(f"Data saved to: {output_path}")
        else:
            raise ValueError(f"Unsupported target type: {config['target_type']}")

        # PIPELINE COMPLETE
        pipeline_end = datetime.now()
        duration = (pipeline_end - pipeline_start).total_seconds()

        logger.info("\n" + "="*80)
        logger.info("ETL PIPELINE COMPLETED SUCCESSFULLY")
        logger.info(f"Duration: {duration:.2f} seconds")
        logger.info(f"Rows processed: {len(df)}")
        logger.info("="*80)

    except Exception as e:
        logger.error("\n" + "="*80)
        logger.error("ETL PIPELINE FAILED")
        logger.error(f"Error: {str(e)}")
        logger.error("="*80)
        raise

# Example usage
if __name__ == "__main__":
    # Configuration example
    config = {
        'source_type': 'csv',
        'source_path': 'data/raw/customers.csv',
        'validation_rules': {
            'required_columns': ['customer_id', 'email'],
            'value_ranges': {'age': (0, 120)},
            'date_columns': ['created_at']
        },
        'target_type': 'database',
        'target_connection': 'postgresql://user:password@localhost:5432/warehouse',
        'target_table': 'customers',
        'if_exists': 'append'
    }

    run_etl_pipeline(config)

5. Configuration Management

Environment Variables

# config.py
from dotenv import load_dotenv
import os
from pathlib import Path

# Load environment variables
load_dotenv()

class Config:
    """Configuration class for ETL pipeline."""

    # Source configuration
    SOURCE_TYPE = os.getenv('SOURCE_TYPE', 'csv')
    SOURCE_PATH = os.getenv('SOURCE_PATH', 'data/raw/')
    API_URL = os.getenv('API_URL')
    API_KEY = os.getenv('API_KEY')

    # Database configuration
    DB_HOST = os.getenv('DB_HOST', 'localhost')
    DB_PORT = os.getenv('DB_PORT', '5432')
    DB_NAME = os.getenv('DB_NAME', 'warehouse')
    DB_USER = os.getenv('DB_USER')
    DB_PASSWORD = os.getenv('DB_PASSWORD')

    # Target configuration
    TARGET_TYPE = os.getenv('TARGET_TYPE', 'database')
    TARGET_TABLE = os.getenv('TARGET_TABLE', 'processed_data')
    OUTPUT_PATH = os.getenv('OUTPUT_PATH', 'data/processed/')

    # Pipeline settings
    BATCH_SIZE = int(os.getenv('BATCH_SIZE', '1000'))
    RETRY_COUNT = int(os.getenv('RETRY_COUNT', '3'))
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')

    @staticmethod
    def get_connection_string():
        """Generate database connection string."""
        return (
            f"postgresql://{Config.DB_USER}:{Config.DB_PASSWORD}"
            f"@{Config.DB_HOST}:{Config.DB_PORT}/{Config.DB_NAME}"
        )

.env File Example

# .env
# Source Configuration
SOURCE_TYPE=csv
SOURCE_PATH=data/raw/customers.csv
API_URL=https://api.example.com/data
API_KEY=your_api_key_here

# Database Configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=warehouse
DB_USER=etl_user
DB_PASSWORD=secure_password

# Target Configuration
TARGET_TYPE=database
TARGET_TABLE=customers
OUTPUT_PATH=data/processed/

# Pipeline Settings
BATCH_SIZE=1000
RETRY_COUNT=3
LOG_LEVEL=INFO

6. Error Handling and Retry Logic

Robust Error Handling

# utils.py
import logging
import time
from functools import wraps
from typing import Callable, Any

logger = logging.getLogger(__name__)

def retry_on_failure(max_retries: int = 3, delay: int = 5):
    """
    Decorator to retry a function on failure.

    Args:
        max_retries: Maximum number of retry attempts
        delay: Delay between retries in seconds
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt < max_retries - 1:
                        logger.warning(
                            f"{func.__name__} failed (attempt {attempt + 1}/{max_retries}): {str(e)}"
                        )
                        logger.info(f"Retrying in {delay} seconds...")
                        time.sleep(delay)
                    else:
                        logger.error(f"{func.__name__} failed after {max_retries} attempts")
                        raise
        return wrapper
    return decorator

def log_execution_time(func: Callable) -> Callable:
    """Decorator to log function execution time."""
    @wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        start_time = time.time()
        logger.info(f"Starting {func.__name__}...")

        try:
            result = func(*args, **kwargs)
            execution_time = time.time() - start_time
            logger.info(f"{func.__name__} completed in {execution_time:.2f} seconds")
            return result
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"{func.__name__} failed after {execution_time:.2f} seconds: {str(e)}")
            raise

    return wrapper
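These decorators can then be stacked on any extract or load function. A brief sketch (the extract_orders function and its query are hypothetical, reusing extract_from_database from earlier):

# extract.py (continued)
from utils import retry_on_failure, log_execution_time

@retry_on_failure(max_retries=3, delay=10)
@log_execution_time
def extract_orders(connection_string: str) -> pd.DataFrame:
    """Extract the orders table with retry and timing behaviour (illustrative)."""
    return extract_from_database(connection_string, "SELECT * FROM orders")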

7. Monitoring and Alerting

Pipeline Monitoring

# utils.py (continued)
from datetime import datetime
from typing import Dict

class PipelineMonitor:
    """Monitor ETL pipeline execution."""

    def __init__(self):
        self.metrics = {
            'start_time': None,
            'end_time': None,
            'rows_extracted': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'errors': [],
            'warnings': []
        }

    def start(self):
        """Mark pipeline start."""
        self.metrics['start_time'] = datetime.now()
        logger.info("Pipeline monitoring started")

    def end(self):
        """Mark pipeline end."""
        self.metrics['end_time'] = datetime.now()
        duration = (self.metrics['end_time'] - self.metrics['start_time']).total_seconds()
        self.metrics['duration'] = duration
        logger.info(f"Pipeline monitoring ended. Duration: {duration:.2f}s")

    def record_extraction(self, row_count: int):
        """Record extraction metrics."""
        self.metrics['rows_extracted'] = row_count
        logger.info(f"Recorded extraction: {row_count} rows")

    def record_transformation(self, row_count: int):
        """Record transformation metrics."""
        self.metrics['rows_transformed'] = row_count
        logger.info(f"Recorded transformation: {row_count} rows")

    def record_load(self, row_count: int):
        """Record load metrics."""
        self.metrics['rows_loaded'] = row_count
        logger.info(f"Recorded load: {row_count} rows")

    def record_error(self, error: str):
        """Record an error."""
        self.metrics['errors'].append({
            'timestamp': datetime.now(),
            'error': error
        })
        logger.error(f"Recorded error: {error}")

    def record_warning(self, warning: str):
        """Record a warning."""
        self.metrics['warnings'].append({
            'timestamp': datetime.now(),
            'warning': warning
        })
        logger.warning(f"Recorded warning: {warning}")

    def get_summary(self) -> Dict:
        """Get pipeline execution summary."""
        return {
            'duration': self.metrics.get('duration', 0),
            'rows_extracted': self.metrics['rows_extracted'],
            'rows_transformed': self.metrics['rows_transformed'],
            'rows_loaded': self.metrics['rows_loaded'],
            'data_loss': self.metrics['rows_extracted'] - self.metrics['rows_loaded'],
            'error_count': len(self.metrics['errors']),
            'warning_count': len(self.metrics['warnings'])
        }
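One way to wire the monitor into a run (a sketch only; run_etl_pipeline as written above does not use PipelineMonitor, and the path and table name are illustrative):

# pipeline.py (continued)
from utils import PipelineMonitor
from config import Config

monitor = PipelineMonitor()
monitor.start()
try:
    df = extract_from_csv('data/raw/customers.csv')
    monitor.record_extraction(len(df))

    df = clean_data(df)
    df = transform_data(df)
    monitor.record_transformation(len(df))

    rows = load_to_database(df, Config.get_connection_string(), 'customers')
    monitor.record_load(rows)
except Exception as e:
    monitor.record_error(str(e))
    raise
finally:
    monitor.end()
    logger.info(f"Run summary: {monitor.get_summary()}")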

8. Best Practices

1. Use Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    connection_string,
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True  # Verify connections before using
)
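QueuePool keeps a small pool of connections open and reuses them, which pays off when a pipeline issues many queries or loads from the same process; the NullPool used in the earlier examples opens and closes a connection per operation, which is simpler and usually adequate for one-shot batch jobs.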

2. Implement Incremental Loading

def extract_incremental_data(last_updated: datetime) -> pd.DataFrame:
    """Extract only new or updated records."""
    query = f"""
        SELECT * FROM source_table
        WHERE updated_at > '{last_updated}'
        ORDER BY updated_at
    """
    return extract_from_database(connection_string, query)
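The last processed timestamp has to survive between runs. A naive file-based sketch (the state file location and helper names are arbitrary choices, not part of the pipeline above):

# utils.py (continued)
from pathlib import Path

WATERMARK_FILE = Path('state/last_updated.txt')

def read_watermark(default: datetime) -> datetime:
    """Return the stored high-water mark, or a default for the first run."""
    if WATERMARK_FILE.exists():
        return datetime.fromisoformat(WATERMARK_FILE.read_text().strip())
    return default

def write_watermark(value: datetime) -> None:
    """Persist the high-water mark after a successful load."""
    WATERMARK_FILE.parent.mkdir(parents=True, exist_ok=True)
    WATERMARK_FILE.write_text(value.isoformat())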

3. Use Batch Processing

def process_in_batches(df: pd.DataFrame, batch_size: int = 1000):
    """Process large datasets in batches."""
    total_rows = len(df)
    for i in range(0, total_rows, batch_size):
        batch = df.iloc[i:i + batch_size]
        logger.info(f"Processing batch {i//batch_size + 1}")
        yield batch
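Combined with the load function from earlier, the generator keeps memory use bounded. For example (an illustrative snippet, assuming connection_string is already defined):

total_loaded = 0
for batch in process_in_batches(df, batch_size=5000):
    total_loaded += load_to_database(batch, connection_string, 'customers')
logger.info(f"Loaded {total_loaded} rows in batches")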

4. Implement Data Versioning

def add_metadata_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Add metadata columns for tracking."""
    df['etl_timestamp'] = datetime.now()
    df['etl_version'] = '1.0.0'
    df['source_system'] = 'production_db'
    return df

5. Handle Large Files

def extract_large_csv_chunked(file_path: str, chunksize: int = 10000):
    """Read large CSV files in chunks."""
    chunks = []
    for chunk in pd.read_csv(file_path, chunksize=chunksize):
        # Process each chunk
        chunk = clean_data(chunk)
        chunks.append(chunk)
    return pd.concat(chunks, ignore_index=True)
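If even the concatenated result would not fit in memory, each chunk can be transformed and loaded immediately instead. A sketch built from the functions defined earlier (the stream_csv_to_database name is just for illustration):

def stream_csv_to_database(
    file_path: str,
    connection_string: str,
    table_name: str,
    chunksize: int = 10000
) -> int:
    """Clean, transform, and load a large CSV chunk by chunk."""
    total = 0
    for chunk in pd.read_csv(file_path, chunksize=chunksize):
        chunk = clean_data(chunk)
        chunk = transform_data(chunk)
        total += load_to_database(chunk, connection_string, table_name)
    return total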

9. Testing

Unit Tests

# test_pipeline.py
import pytest
import pandas as pd
from transform import clean_data, validate_data, transform_data

def test_clean_data():
    """Test data cleaning function."""
    # Create test data with duplicates and nulls
    df = pd.DataFrame({
        'id': [1, 2, 2, 3],
        'name': ['Alice', 'Bob', 'Bob', 'Charlie'],
        'email': ['alice@example.com', 'bob@example.com', 'bob@example.com', None]
    })

    cleaned = clean_data(df)

    # Should remove duplicates
    assert len(cleaned) == 3
    assert cleaned['id'].duplicated().sum() == 0

def test_validate_data():
    """Test data validation function."""
    df = pd.DataFrame({
        'id': [1, 2, 3],
        'age': [25, 150, 30],  # 150 is invalid
        'email': ['alice@test.com', 'bob@test.com', 'charlie@test.com']
    })

    rules = {
        'required_columns': ['id', 'email'],
        'value_ranges': {'age': (0, 120)}
    }

    validated = validate_data(df, rules)

    # Should remove row with invalid age
    assert len(validated) == 2
    assert all(validated['age'] <= 120)

def test_transform_data():
    """Test data transformation function."""
    df = pd.DataFrame({
        'first_name': ['Alice', 'Bob'],
        'last_name': ['Smith', 'Jones'],
        'email': ['ALICE@TEST.COM', 'BOB@TEST.COM']
    })

    transformed = transform_data(df)

    # Should create full_name column
    assert 'full_name' in transformed.columns
    assert transformed['full_name'].tolist() == ['Alice Smith', 'Bob Jones']

    # Should normalize email
    assert all(transformed['email'].str.islower())

10. Scheduling and Automation

Using Schedule Library

# scheduler.py
import logging
import schedule
import time
from pipeline import run_etl_pipeline
from config import Config

logger = logging.getLogger(__name__)

def scheduled_job():
    """Run ETL pipeline as a scheduled job."""
    config = {
        'source_type': Config.SOURCE_TYPE,
        'source_path': Config.SOURCE_PATH,
        'target_type': Config.TARGET_TYPE,
        'target_connection': Config.get_connection_string(),
        'target_table': Config.TARGET_TABLE
    }

    try:
        run_etl_pipeline(config)
    except Exception as e:
        logger.error(f"Scheduled job failed: {str(e)}")
        # Send alert notification here

# Schedule the job
schedule.every().day.at("02:00").do(scheduled_job)  # Run daily at 2 AM
# schedule.every().hour.do(scheduled_job)           # Run hourly
# schedule.every(30).minutes.do(scheduled_job)      # Run every 30 minutes

if __name__ == "__main__":
    logger.info("ETL Scheduler started")

    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute

Summary

This tutorial covered:

  • Extract: Reading data from CSV, APIs, and databases
  • Transform: Cleaning, validating, and transforming data
  • Load: Writing data to databases, CSV, and Parquet formats
  • Error Handling: Retry logic and robust exception handling
  • Monitoring: Tracking pipeline execution and metrics
  • Best Practices: Connection pooling, batch processing, and incremental loading
  • Testing: Unit tests for ETL functions
  • Scheduling: Automating pipeline execution

Next Steps

  • Implement parallel processing for large datasets using multiprocessing or dask
  • Add data profiling and anomaly detection
  • Integrate with workflow orchestration tools like Apache Airflow
  • Implement CDC (Change Data Capture) for real-time data sync
  • Add data lineage tracking
  • Implement automated data quality monitoring and alerting

Additional Resources