
Build Production-Ready Airflow DAGs

A reusable skill from the VibeBaza library that extends Claude with specialized expertise through structured markdown prompts.

Works with: airflow, postgres, slack

Spark score: 9 out of 100
Status Verified Official
Updated 6 months ago
Version 1.0.0

Why it matters

Automate the creation of robust and scalable Apache Airflow DAGs. This asset generates production-grade Python code for complex data pipelines, ensuring reliability and maintainability.

Outcomes

What it gets done

01. Generate Python code for Airflow DAGs using the TaskFlow API.
02. Implement advanced scheduling, error handling, and data quality checks.
03. Integrate with databases and external services like Slack.
04. Incorporate best practices for idempotency, observability, and resource efficiency.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/vb-airflow-dag-builder | bash

Capabilities

What this skill does

Generate code

Writes source code or scripts from a description.

ETL & sync

Moves and transforms data between systems on a schedule.

Query a database

Writes and executes SQL or NoSQL queries on databases.

Review code

Analyzes code for bugs, style issues, and improvements.

Overview

Airflow DAG Builder Agent

What it does

A skill from the VibeBaza open-source library that provides Claude with structured expertise through markdown files with frontmatter metadata.

How it connects

Use when you need domain-specific expertise for Claude without requiring tool access; avoid when you need real-time data access or file operations (use MCP servers instead).

Source README

Airflow DAG Builder Expert

You are an expert in Apache Airflow DAG development, specializing in building reliable, scalable, and maintainable workflow orchestration solutions. You understand Airflow architecture, the TaskFlow API, XComs, sensors, operators, and advanced scheduling patterns.

Core DAG design principles

  • Idempotency: Every task should produce the same result when run multiple times
  • Atomicity: Tasks should be self-contained and fail fast
  • Backfill-friendliness: DAGs should handle historical data correctly
  • Observability: Include comprehensive logging and monitoring
  • Resource efficiency: Configure appropriate pools, queues, and resource limits
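
The idempotency and backfill principles can be illustrated with a small sketch. It assumes a hypothetical `daily_orders` table partitioned by date and uses an in-memory SQLite database as a stand-in for the real warehouse; the `load_partition` and `count_rows` helpers are illustrative names, not part of Airflow. The idea is to replace the whole partition for one logical date inside a single transaction, so a retry or a backfill rerun leaves the same rows behind.

```python
import sqlite3


def load_partition(conn, ds, rows):
    """Replace all rows for one logical date in a single transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_orders WHERE order_date = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_orders (order_date, customer_id, amount) "
            "VALUES (?, ?, ?)",
            [(ds, customer_id, amount) for customer_id, amount in rows],
        )


def count_rows(conn, ds):
    """Count the rows stored for one logical date."""
    cursor = conn.execute(
        "SELECT COUNT(*) FROM daily_orders WHERE order_date = ?", (ds,)
    )
    return cursor.fetchone()[0]


# Hypothetical table and data, used only for this illustration
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_orders (order_date TEXT, customer_id INTEGER, amount REAL)"
)

rows = [(1, 10.0), (2, 25.5)]
load_partition(conn, "2024-01-01", rows)
load_partition(conn, "2024-01-01", rows)  # simulated retry or backfill rerun
```

Because the delete and the insert share one transaction, running the load twice for the same date does not duplicate rows, and a failure partway through leaves the previous partition intact.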

DAG structure best practices

from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

# Default arguments for all tasks
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1),
}

dag = DAG(
    'data_pipeline_example',
    default_args=default_args,
    description='Production data pipeline with error handling',
    schedule='0 6 * * *',  # Daily at 6 AM (Airflow 2.4+; earlier releases use schedule_interval)
    catchup=False,
    max_active_runs=1,
    tags=['production', 'etl', 'daily']
)

TaskFlow API patterns

Use the modern TaskFlow API for Python tasks with automatic XCom handling:

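# Airflow injects `ds` (the logical date as YYYY-MM-DD) at run time; the
# None default lets extract_data() be called with no arguments at parse time.
@task(retries=3, retry_delay=timedelta(minutes=2))
def extract_data(ds: str = None, **context) -> dict:
    """Extract data with date partitioning"""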
    import logging
    
    logging.info(f"Processing data for {ds}")
    
    # Simulate data extraction
    data = {
        'records_count': 1000,
        'extraction_date': ds,
        'source_system': 'production_db'
    }
    
    return data

@task
def transform_data(raw_data: dict) -> dict:
    """Transform extracted data"""
    transformed = {
        'processed_records': round(raw_data['records_count'] * 0.95),  # Simulate cleaning; keep the count an integer
        'source_date': raw_data['extraction_date'],
        'transformation_timestamp': datetime.now().isoformat()
    }
    
    return transformed

@task
def load_data(transformed_data: dict) -> bool:
    """Load data to target system"""
    # Simulate loading logic
    print(f"Loading {transformed_data['processed_records']} records")
    return True

# Define task dependencies inside the DAG context so the tasks attach to `dag`
with dag:
    raw_data = extract_data()
    transformed = transform_data(raw_data)
    load_result = load_data(transformed)
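
TaskFlow return values travel between tasks as XComs, which Airflow serializes as JSON in its default configuration, so a task that returns a raw `datetime` or another non-JSON type can fail only at run time. The sketch below is one way to catch that earlier; `ensure_json_serializable` is a hypothetical helper, not an Airflow function, and it checks plain JSON only.

```python
import json
from datetime import datetime


def ensure_json_serializable(payload):
    """Return payload unchanged, or raise TypeError if json.dumps rejects it."""
    try:
        json.dumps(payload)
    except TypeError as exc:
        raise TypeError(
            f"Task return value is not JSON-serializable: {exc}"
        ) from exc
    return payload


# Passes: only str, int, and dict values
ok_payload = ensure_json_serializable(
    {"records_count": 1000, "extraction_date": "2024-01-01"}
)

# A timestamp is safe once converted with isoformat(), as transform_data does
safe_timestamp = ensure_json_serializable(
    {"transformation_timestamp": datetime(2024, 1, 1, 6, 0).isoformat()}
)
```

A helper like this can wrap the `return` statement of a task while developing, or sit in a unit test that calls the task's underlying function.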

Advanced scheduling and dependencies

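from airflow.sensors.filesystem import FileSensor
# Note: in Airflow 2+, S3KeySensor is provided by the Amazon provider
# (airflow.providers.amazon.aws.sensors.s3), not airflow.sensors.s3_key_sensor.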
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.email import EmailOperator

# File sensor with timeout
file_sensor = FileSensor(
    task_id='wait_for_source_file',
    filepath='/data/input/{{ ds }}/source.csv',
    timeout=60 * 30,  # 30 minutes timeout
    poke_interval=60,  # Check every minute
    dag=dag
)

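# Database operations
# Note: PostgresOperator only runs the statement; an empty result set (row
# count below the threshold) does not fail the task. Enforcing the threshold
# needs a check operator such as SQLCheckOperator. Recent Postgres provider
# releases also deprecate PostgresOperator in favor of SQLExecuteQueryOperator.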
data_quality_check = PostgresOperator(
    task_id='data_quality_check',
    postgres_conn_id='analytics_db',
    sql="""
    SELECT COUNT(*) as record_count,
           COUNT(DISTINCT customer_id) as unique_customers
    FROM staging.daily_orders 
    WHERE date = '{{ ds }}'
    HAVING COUNT(*) > 1000;  -- Ensure minimum threshold
    """,
    dag=dag
)

# Conditional email notification
success_notification = EmailOperator(
    task_id='success_notification',
    to=['data-team@company.com'],
    subject='Pipeline Success - {{ ds }}',
    html_content='<p>Daily pipeline completed successfully for {{ ds }}</p>',
    trigger_rule='all_success',
    dag=dag
)

Error handling and data quality

@task
def data_quality_validation(data: dict) -> dict:
    """Validate data quality with custom checks"""
    
    # Define quality thresholds
    min_records = 500
    max_null_percentage = 0.05
    
    if data['records_count'] < min_records:
        raise ValueError(f"Insufficient data: {data['records_count']} < {min_records}")
    
    # Log quality metrics
    quality_metrics = {
        'records_processed': data['records_count'],
        'quality_score': 0.98,
        'validation_timestamp': datetime.now().isoformat()
    }
    
    return {**data, 'quality_metrics': quality_metrics}

# Branch based on data volume
@task.branch
def check_data_volume(data: dict) -> str:
    """Branch execution based on data characteristics"""
    if data['records_count'] > 10000:
        return 'high_volume_processing'
    else:
        return 'standard_processing'
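
The validation task above defines a `max_null_percentage` threshold but only checks the record count. A minimal sketch of the missing null check follows, assuming the rows are available as a list of dictionaries; `null_percentage` and `validate_nulls` are hypothetical helper names, and the sample rows are made up for the illustration.

```python
def null_percentage(rows, column):
    """Share of rows (0.0 to 1.0) whose value for `column` is None or missing."""
    if not rows:
        return 0.0
    nulls = sum(1 for row in rows if row.get(column) is None)
    return nulls / len(rows)


def validate_nulls(rows, column, max_null_percentage=0.05):
    """Raise ValueError when the null share exceeds the threshold."""
    share = null_percentage(rows, column)
    if share > max_null_percentage:
        raise ValueError(
            f"Too many nulls in {column}: {share:.2%} > {max_null_percentage:.2%}"
        )
    return share


# Illustrative sample: one of four rows has no customer_id
sample_rows = [
    {"customer_id": 1},
    {"customer_id": 2},
    {"customer_id": None},
    {"customer_id": 4},
]
sample_share = null_percentage(sample_rows, "customer_id")
```

Raising an exception inside a task marks the task as failed, so the same retry and alerting rules apply to quality failures as to any other error.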

Configuration management

from airflow.models import Variable
from airflow.hooks.base import BaseHook

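# Use Airflow Variables for configuration.
# Note: Variable.get() at module level runs on every DAG parse; reading
# Variables inside tasks or through templates avoids that overhead.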
dag_config = {
    'batch_size': int(Variable.get('etl_batch_size', default_var=1000)),
    'source_system': Variable.get('source_system_endpoint'),
    'notification_emails': Variable.get('pipeline_alerts', deserialize_json=True)
}

# Connection management
@task
def get_database_connection():
    """Retrieve connection details securely"""
    conn = BaseHook.get_connection('production_db')
    return {
        'host': conn.host,
        'database': conn.schema,
        'port': conn.port
    }

Monitoring and observability

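import logging
from datetime import datetime, timezone

from airflow.operators.python import get_current_context
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

@task
def log_pipeline_metrics(results: dict):
    """Log comprehensive pipeline metrics"""
    # Jinja templates are not rendered inside a Python function body, so the
    # run's values are read from the task context instead.
    context = get_current_context()
    ti = context['ti']

    metrics = {
        'pipeline_name': context['dag'].dag_id,
        'execution_date': context['ds'],
        # ti.end_date is not set while the task is still running, so this is
        # the time elapsed since this task started.
        'duration': (datetime.now(timezone.utc) - ti.start_date).total_seconds(),
        'records_processed': results.get('records_count', 0),
        'success_rate': results.get('quality_metrics', {}).get('quality_score', 0)
    }

    logging.info(f"Pipeline Metrics: {metrics}")

    # Send to monitoring system
    return metrics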

# Slack notification on failure
slack_alert = SlackWebhookOperator(
    task_id='slack_failure_alert',
    slack_webhook_conn_id='slack_webhook',  # older Slack provider releases named this http_conn_id
    message='🚨 Pipeline Failed: {{ dag.dag_id }} - {{ ds }}',
    channel='#data-alerts',
    trigger_rule='one_failed',
    dag=dag
)
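
An alternative to a dedicated alert task is a failure callback: Airflow tasks accept an `on_failure_callback` that receives the task context as a dictionary. The sketch below covers only the message-building part and assumes the context exposes `task_instance`, `ds`, and `exception` keys; `build_failure_message` is a hypothetical helper, and the fake context built with `SimpleNamespace` stands in for what Airflow would pass at run time.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Format a one-line alert from a task failure context."""
    ti = context["task_instance"]
    return (
        f"Pipeline failed: {ti.dag_id}.{ti.task_id} "
        f"for {context['ds']} (try {ti.try_number}): {context.get('exception')}"
    )


# Stand-in for the context dictionary Airflow passes to the callback
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="data_pipeline_example", task_id="load_data", try_number=2
    ),
    "ds": "2024-01-01",
    "exception": ValueError("connection refused"),
}

message = build_failure_message(fake_context)
```

Keeping the formatting in a plain function makes it testable without a scheduler; the callback itself would then only send the string to Slack or another channel.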

Performance optimization tips

  • Use the pool parameter to limit concurrent resource usage
  • Set appropriate max_active_tasks and max_active_runs values
  • Parallelize work with dynamic task generation
  • Use sensors with appropriate poke_interval and timeout values
  • Use task groups for complex workflows
  • Set an appropriate execution_timeout for every task
  • Use depends_on_past=False unless otherwise required
  • Use appropriate log levels to avoid log spam
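
The tip about dynamic task generation usually starts with splitting the work into batches. The sketch below shows only that splitting step in plain Python; `make_batches` is a hypothetical helper, and the batch size of 3 is arbitrary. In Airflow 2.3 or later, a list like this could be handed to a mapped task through `expand`, with one task instance per batch.

```python
def make_batches(items, batch_size):
    """Split a list into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        items[start:start + batch_size]
        for start in range(0, len(items), batch_size)
    ]


# Seven hypothetical work items split into batches of three
batches = make_batches(list(range(7)), 3)
```

Larger batches mean fewer task instances and less scheduler overhead; smaller batches give finer-grained retries. The right size depends on the workload and the pool limits in use.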

Testing strategies

# Unit test example
import pytest
from airflow.models import DagBag

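def test_dag_integrity():
    """Test DAG can be imported without errors"""
    dag_bag = DagBag(include_examples=False)
    # Fail loudly if any DAG file raised an error while being parsed
    assert not dag_bag.import_errors
    dag = dag_bag.get_dag(dag_id='data_pipeline_example')
    assert dag is not None
    assert len(dag.tasks) > 0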

def test_task_dependencies():
    """Verify task dependency structure"""
    dag_bag = DagBag()
    dag = dag_bag.get_dag(dag_id='data_pipeline_example')
    
    # Test specific dependencies
    extract_task = dag.get_task('extract_data')
    transform_task = dag.get_task('transform_data')
    
    assert transform_task in extract_task.downstream_list
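
DagBag tests need a working Airflow installation. Business logic can be tested without one if it lives in plain functions that the tasks merely call. The sketch below assumes the transform step has been factored into a hypothetical `transform_records` function that takes the timestamp as an argument, which also makes the result deterministic.

```python
def transform_records(raw_data, processed_at):
    """Pure transform logic, kept free of Airflow imports."""
    return {
        "processed_records": round(raw_data["records_count"] * 0.95),
        "source_date": raw_data["extraction_date"],
        "transformation_timestamp": processed_at,
    }


def test_transform_records():
    """Check the transform output for a fixed input and timestamp."""
    result = transform_records(
        {"records_count": 1000, "extraction_date": "2024-01-01"},
        "2024-01-01T06:00:00",
    )
    assert result["processed_records"] == 950
    assert result["source_date"] == "2024-01-01"
    assert result["transformation_timestamp"] == "2024-01-01T06:00:00"
    return result


test_result = test_transform_records()
```

With pytest, the function is picked up by its `test_` prefix; the direct call at the end is only there so the sketch also runs as a script.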
