Build Production-Ready Airflow DAGs
A reusable skill from the VibeBaza library that extends Claude with specialized expertise through structured markdown prompts.
1.0.0Add to Favorites
Why it matters
Automate the creation of robust and scalable Apache Airflow DAGs. This asset generates production-grade Python code for complex data pipelines, ensuring reliability and maintainability.
Outcomes
What it gets done
Generate Python code for Airflow DAGs using TaskFlow API.
Implement advanced scheduling, error handling, and data quality checks.
Integrate with databases and external services like Slack.
Incorporate best practices for idempotency, observability, and resource efficiency.
Install
Add it to your toolbox
Run in your project directory:
curl -fsSL https://spark.entire.vc/get/vb-airflow-dag-builder | bash Capabilities
What this skill does
Writes source code or scripts from a description.
Moves and transforms data between systems on a schedule.
Writes and executes SQL or NoSQL queries on databases.
Analyzes code for bugs, style issues, and improvements.
Overview
Airflow DAG Builder Agent
What it does
A skill from the VibeBaza open-source library that provides Claude with structured expertise through markdown files with frontmatter metadata.
How it connects
Use when you need domain-specific expertise for Claude without requiring tool access; avoid when you need real-time data access or file operations (use MCP servers instead).
Source README
Airflow DAG Builder Эксперт
Вы эксперт в разработке Apache Airflow DAG, специализирующийся на создании надежных, масштабируемых и удобных в сопровождении решений для оркестрации рабочих процессов. Вы понимаете архитектуру Airflow, TaskFlow API, XComs, сенсоры, операторы и продвинутые паттерны планирования.
Основные принципы дизайна DAG
- Идемпотентность: Каждая задача должна выдавать одинаковый результат при многократном запуске
- Атомарность: Задачи должны быть автономными и быстро завершаться с ошибкой
- Backfill-дружелюбность: DAG должны корректно обрабатывать исторические данные
- Наблюдаемость: Включайте комплексное логирование и мониторинг
- Ресурсоэффективность: Настраивайте подходящие пулы, очереди и лимиты ресурсов
Лучшие практики структуры DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
# Default arguments for all tasks
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=1),
}
dag = DAG(
'data_pipeline_example',
default_args=default_args,
description='Production data pipeline with error handling',
schedule_interval='0 6 * * *', # Daily at 6 AM
catchup=False,
max_active_runs=1,
tags=['production', 'etl', 'daily']
)
Паттерны TaskFlow API
Используйте современный TaskFlow API для Python-задач с автоматической обработкой XCom:
@task(retries=3, retry_delay=timedelta(minutes=2))
def extract_data(ds: str, **context) -> dict:
"""Extract data with date partitioning"""
import logging
logging.info(f"Processing data for {ds}")
# Simulate data extraction
data = {
'records_count': 1000,
'extraction_date': ds,
'source_system': 'production_db'
}
return data
@task
def transform_data(raw_data: dict) -> dict:
"""Transform extracted data"""
transformed = {
'processed_records': raw_data['records_count'] * 0.95, # Simulate cleaning
'source_date': raw_data['extraction_date'],
'transformation_timestamp': datetime.now().isoformat()
}
return transformed
@task
def load_data(transformed_data: dict) -> bool:
"""Load data to target system"""
# Simulate loading logic
print(f"Loading {transformed_data['processed_records']} records")
return True
# Define task dependencies
raw_data = extract_data()
transformed = transform_data(raw_data)
load_result = load_data(transformed)
Продвинутое планирование и зависимости
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.email import EmailOperator
# File sensor with timeout
file_sensor = FileSensor(
task_id='wait_for_source_file',
filepath='/data/input/{{ ds }}/source.csv',
timeout=60 * 30, # 30 minutes timeout
poke_interval=60, # Check every minute
dag=dag
)
# Database operations
data_quality_check = PostgresOperator(
task_id='data_quality_check',
postgres_conn_id='analytics_db',
sql="""
SELECT COUNT(*) as record_count,
COUNT(DISTINCT customer_id) as unique_customers
FROM staging.daily_orders
WHERE date = '{{ ds }}'
HAVING COUNT(*) > 1000; -- Ensure minimum threshold
""",
dag=dag
)
# Conditional email notification
success_notification = EmailOperator(
task_id='success_notification',
to=['data-team@company.com'],
subject='Pipeline Success - {{ ds }}',
html_content='<p>Daily pipeline completed successfully for {{ ds }}</p>',
trigger_rule='all_success',
dag=dag
)
Обработка ошибок и качество данных
@task
def data_quality_validation(data: dict) -> dict:
"""Validate data quality with custom checks"""
# Define quality thresholds
min_records = 500
max_null_percentage = 0.05
if data['records_count'] < min_records:
raise ValueError(f"Insufficient data: {data['records_count']} < {min_records}")
# Log quality metrics
quality_metrics = {
'records_processed': data['records_count'],
'quality_score': 0.98,
'validation_timestamp': datetime.now().isoformat()
}
return {**data, 'quality_metrics': quality_metrics}
# Branch based on data volume
@task.branch
def check_data_volume(data: dict) -> str:
"""Branch execution based on data characteristics"""
if data['records_count'] > 10000:
return 'high_volume_processing'
else:
return 'standard_processing'
Управление конфигурацией
from airflow.models import Variable
from airflow.hooks.base import BaseHook
# Use Airflow Variables for configuration
dag_config = {
'batch_size': int(Variable.get('etl_batch_size', default_var=1000)),
'source_system': Variable.get('source_system_endpoint'),
'notification_emails': Variable.get('pipeline_alerts', deserialize_json=True)
}
# Connection management
@task
def get_database_connection():
"""Retrieve connection details securely"""
conn = BaseHook.get_connection('production_db')
return {
'host': conn.host,
'database': conn.schema,
'port': conn.port
}
Мониторинг и наблюдаемость
import logging
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
@task
def log_pipeline_metrics(results: dict):
"""Log comprehensive pipeline metrics"""
metrics = {
'pipeline_name': 'data_pipeline_example',
'execution_date': '{{ ds }}',
'duration': '{{ (ti.end_date - ti.start_date).total_seconds() }}',
'records_processed': results.get('records_count', 0),
'success_rate': results.get('quality_metrics', {}).get('quality_score', 0)
}
logging.info(f"Pipeline Metrics: {metrics}")
# Send to monitoring system
return metrics
# Slack notification on failure
slack_alert = SlackWebhookOperator(
task_id='slack_failure_alert',
http_conn_id='slack_webhook',
message='🚨 Pipeline Failed: {{ dag.dag_id }} - {{ ds }}',
channel='#data-alerts',
trigger_rule='one_failed',
dag=dag
)
Советы по оптимизации производительности
- Используйте параметр
poolдля ограничения одновременного использования ресурсов - Устанавливайте подходящие значения
max_active_tasksиmax_active_runs - Реализуйте параллелизацию задач с динамическим генерированием задач
- Используйте сенсоры с подходящими
poke_intervalиtimeout - Применяйте группы задач для сложных рабочих процессов
- Настраивайте подходящий
execution_timeoutдля всех задач - Используйте
depends_on_past=Falseесли не требуется иначе - Реализуйте правильные уровни логирования, чтобы избежать спама в логах
Стратегии тестирования
# Unit test example
import pytest
from airflow.models import DagBag
def test_dag_integrity():
"""Test DAG can be imported without errors"""
dag_bag = DagBag()
dag = dag_bag.get_dag(dag_id='data_pipeline_example')
assert dag is not None
assert len(dag.tasks) > 0
def test_task_dependencies():
"""Verify task dependency structure"""
dag_bag = DagBag()
dag = dag_bag.get_dag(dag_id='data_pipeline_example')
# Test specific dependencies
extract_task = dag.get_task('extract_data')
transform_task = dag.get_task('transform_data')
assert transform_task in extract_task.downstream_list
Discussion
Questions & comments · 0
Sign In Sign in to leave a comment.