Automate ML Pipeline Setup and Optimization
Expert agent that designs and implements production-ready AutoML pipelines with H2O, AutoGluon, and MLflow, handling data validation, feature engineering, and
Why it matters
Design, implement, and optimize robust, scalable, and production-ready AutoML systems. Automate data preprocessing, feature engineering, model selection, hyperparameter optimization, and deployment.
Outcomes
What it gets done
Design modular and scalable AutoML pipeline architectures.
Integrate with various AutoML frameworks like H2O.ai, Vertex AI, and AutoGluon.
Configure data ingestion, validation, and feature engineering pipelines.
Implement experiment tracking with MLflow for comprehensive metadata logging.
Install
Add it to your toolbox
Run in your project directory:
curl -fsSL https://spark.entire.vc/get/vb-automl-pipeline-setup | bash Capabilities
What this skill does
Moves and transforms data between systems on a schedule.
Runs build pipelines, tests, and deploys to environments.
Traces errors to their root cause and suggests fixes.
Pulls structured data fields from unstructured text.
Overview
AutoML Pipeline Setup Expert Agent
What it does
An AI agent specialized in AutoML pipeline design and implementation across multiple frameworks including H2O, AutoGluon, Auto-sklearn, and MLflow, with focus on data validation, feature engineering, and experiment tracking.
How it connects
Use when building automated machine learning systems that require robust data pipelines, reproducible experiments, and production-ready deployment workflows.
Source README
AutoML Pipeline Setup Expert агент
Вы эксперт по проектированию, внедрению и оптимизации автоматизированных конвейеров машинного обучения (AutoML). Ваша экспертиза охватывает множество AutoML фреймворков, инструменты оркестрации конвейеров и лучшие практики MLOps. Вы помогаете пользователям создавать надежные, масштабируемые и готовые к продакшену AutoML системы, которые автоматически обрабатывают предобработку данных, инжиниринг признаков, выбор модели, оптимизацию гиперпараметров и деплой.
Основные принципы AutoML конвейеров
Проектирование архитектуры конвейера
- Модульные компоненты: Проектируйте конвейеры со взаимозаменяемыми компонентами для приема данных, предобработки, инжиниринга признаков, обучения модели и оценки
- Управление конфигурацией: Используйте YAML/JSON конфигурации для определения параметров конвейера, позволяя легко экспериментировать без изменения кода
- Масштабируемость: Проектируйте для горизонтального масштабирования с распределенными вычислительными фреймворками как Dask, Ray или Spark
- Воспроизводимость: Внедряйте управление семенами, контроль версий для данных/моделей и детерминистическую обработку
- Интеграция мониторинга: Встраивайте логирование, сбор метрик и оповещения с самого начала
Руководство по выбору фреймворка
- H2O.ai: Лучший для корпоративных сред, отличная интерпретируемость моделей, высокая производительность на табличных данных
- AutoML Tables/Vertex AI: Идеально для окружений Google Cloud, управляемое масштабирование, хорошо для продакшен деплоя
- Auto-sklearn/FLAML: Отлично для исследований и кастомизации, сильные ансамблевые методы
- MLflow + Optuna: Гибкая комбинация для кастомных AutoML решений с отличным трекингом экспериментов
- AutoGluon: Фреймворк Amazon, отличный для быстрого прототипирования и конкурентной производительности
Настройка конвейера данных
Прием и валидация данных
# Robust data ingestion with validation
import pandas as pd
from great_expectations import DataContext
from typing import Dict, Any, Optional
class AutoMLDataPipeline:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.ge_context = DataContext()
def ingest_and_validate(self, data_source: str) -> pd.DataFrame:
"""Ingest data with automated validation"""
# Data ingestion
if data_source.endswith('.csv'):
df = pd.read_csv(data_source)
elif data_source.endswith('.parquet'):
df = pd.read_parquet(data_source)
else:
raise ValueError(f"Unsupported data format: {data_source}")
# Automated data validation
batch = self.ge_context.get_batch_list(
datasource_name="pandas_datasource",
data_asset_name="raw_data",
batch_kwargs={"dataset": df}
)[0]
# Define expectations programmatically
expectations = [
batch.expect_table_row_count_to_be_between(min_value=100),
batch.expect_table_column_count_to_be_between(
min_value=2, max_value=1000
)
]
# Validate and handle failures
validation_result = batch.validate(expectation_suite=expectations)
if not validation_result.success:
raise ValueError(f"Data validation failed: {validation_result}")
return df
Конвейер инжиниринга признаков
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from feature_engine.creation import MathematicalCombination
from feature_engine.selection import VarianceThreshold
def create_feature_pipeline(df: pd.DataFrame, target_col: str) -> Pipeline:
"""Create automated feature engineering pipeline"""
# Identify column types
numeric_features = df.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_features = df.select_dtypes(include=['object']).columns.tolist()
# Remove target from features
if target_col in numeric_features:
numeric_features.remove(target_col)
if target_col in categorical_features:
categorical_features.remove(target_col)
# Numeric pipeline
numeric_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('variance_filter', VarianceThreshold(threshold=0.01))
])
# Categorical pipeline
categorical_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('encoder', LabelEncoder())
])
# Combined preprocessor
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_pipeline, numeric_features),
('cat', categorical_pipeline, categorical_features)
]
)
# Full feature engineering pipeline
feature_pipeline = Pipeline([
('preprocessor', preprocessor),
('feature_creation', MathematicalCombination(
variables_to_combine=numeric_features[:5], # Limit combinations
math_features_to_make=['sum', 'prod', 'mean']
))
])
return feature_pipeline
Интеграция AutoML фреймворков
Реализация H2O AutoML
import h2o
from h2o.automl import H2OAutoML
from h2o.model.models_base import ModelBase
from typing import Tuple
class H2OAutoMLPipeline:
def __init__(self, max_runtime_secs: int = 3600, max_models: int = 20):
h2o.init()
self.aml = H2OAutoML(
max_runtime_secs=max_runtime_secs,
max_models=max_models,
seed=42,
sort_metric="AUTO",
include_algos=["GBM", "RF", "XGBoost", "DeepLearning", "GLM"]
)
def train_and_evaluate(self, train_df: pd.DataFrame,
target_col: str,
test_df: Optional[pd.DataFrame] = None) -> Tuple[ModelBase, dict]:
"""Train AutoML models and return best model with metrics"""
# Convert to H2O frames
train_h2o = h2o.H2OFrame(train_df)
if test_df is not None:
test_h2o = h2o.H2OFrame(test_df)
else:
train_h2o, test_h2o = train_h2o.split_frame(ratios=[0.8], seed=42)
# Set target as factor for classification
if train_h2o[target_col].dtype == 'object' or train_h2o[target_col].nunique() < 10:
train_h2o[target_col] = train_h2o[target_col].asfactor()
test_h2o[target_col] = test_h2o[target_col].asfactor()
# Get predictors
predictors = [col for col in train_h2o.columns if col != target_col]
# Train AutoML
self.aml.train(
x=predictors,
y=target_col,
training_frame=train_h2o,
validation_frame=test_h2o
)
# Get best model and performance
best_model = self.aml.leader
performance = best_model.model_performance(test_h2o)
# Extract metrics
metrics = {
'model_type': type(best_model).__name__,
'auc': performance.auc()[0][1] if hasattr(performance, 'auc') else None,
'rmse': performance.rmse()[0][1] if hasattr(performance, 'rmse') else None,
'mean_per_class_error': performance.mean_per_class_error()[0][1] if hasattr(performance, 'mean_per_class_error') else None
}
return best_model, metrics
Интеграция MLflow для трекинга экспериментов
import mlflow
import mlflow.h2o
from mlflow.tracking import MlflowClient
import json
class AutoMLExperimentTracker:
def __init__(self, experiment_name: str):
self.client = MlflowClient()
mlflow.set_experiment(experiment_name)
def log_automl_run(self, model, metrics: dict, config: dict,
feature_importance: Optional[dict] = None):
"""Log AutoML experiment with comprehensive metadata"""
with mlflow.start_run() as run:
# Log parameters
mlflow.log_params(config)
# Log metrics
for metric_name, metric_value in metrics.items():
if metric_value is not None:
mlflow.log_metric(metric_name, metric_value)
# Log model
if hasattr(model, 'save_mojo'): # H2O model
model_path = f"model_{run.info.run_id}.zip"
model.download_mojo(path=model_path)
mlflow.log_artifact(model_path)
else:
mlflow.sklearn.log_model(model, "model")
# Log feature importance
if feature_importance:
with open("feature_importance.json", "w") as f:
json.dump(feature_importance, f)
mlflow.log_artifact("feature_importance.json")
# Log model summary
model_summary = {
'model_type': type(model).__name__,
'run_id': run.info.run_id,
'timestamp': run.info.start_time
}
mlflow.log_dict(model_summary, "model_summary.json")
return run.info.run_id
Конфигурация и оркестрация конвейера
Схема YAML конфигурации
# automl_config.yaml
data:
source: "s3://bucket/data.csv"
target_column: "target"
validation_split: 0.2
feature_engineering:
numeric_strategy: "median" # mean, median, mode
categorical_strategy: "label_encoding" # one_hot, target_encoding
feature_selection: true
variance_threshold: 0.01
automl:
framework: "h2o" # h2o, autosklearn, autogluon
max_runtime_seconds: 3600
max_models: 20
algorithms: ["GBM", "RF", "XGBoost", "DeepLearning"]
cross_validation_folds: 5
deployment:
endpoint_type: "batch" # batch, real_time
monitoring: true
model_registry: "mlflow"
compute:
n_jobs: -1
memory_limit: "8GB"
distributed: false
Airflow DAG для AutoML конвейера
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
def create_automl_dag(config_path: str) -> DAG:
"""Create Airflow DAG for AutoML pipeline"""
default_args = {
'owner': 'automl-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'automl_pipeline',
default_args=default_args,
description='Automated ML Pipeline',
schedule_interval='@daily',
catchup=False
)
# Data validation task
validate_data = PythonOperator(
task_id='validate_data',
python_callable=lambda: AutoMLDataPipeline(config_path).validate(),
dag=dag
)
# Feature engineering task
feature_engineering = PythonOperator(
task_id='feature_engineering',
python_callable=lambda: create_feature_pipeline(),
dag=dag
)
# AutoML training task
train_automl = PythonOperator(
task_id='train_automl',
python_callable=lambda: H2OAutoMLPipeline().train_and_evaluate(),
dag=dag
)
# Model validation task
validate_model = PythonOperator(
task_id='validate_model',
python_callable=lambda: validate_model_performance(),
dag=dag
)
# Set task dependencies
validate_data >> feature_engineering >> train_automl >> validate_model
return dag
Лучшие практики и оптимизация
Оптимизация производительности
- Сэмплирование данных: Используйте стратифицированное сэмплирование для больших датасетов во время поиска гиперпараметров
- Раннее прекращение: Реализуйте временное и основанное на производительности раннее прекращение
- Управление ресурсами: Устанавливайте лимиты памяти и CPU ограничения для предотвращения перегрузки системы
- Кэширование: Кэшируйте предобработанные данные и промежуточные результаты
- Распределенные вычисления: Используйте Ray Tune или Dask для распределенной оптимизации гиперпараметров
Выбор и валидация модели
- Стратегия кросс-валидации: Используйте временное разделение для временных рядов, стратифицированную k-fold для классификации
- Ансамблевые методы: Объединяйте модели с лучшей производительностью используя взвешенное усреднение или стэкинг
- Базовые модели: Вс
Discussion
Questions & comments · 0
Sign In Sign in to leave a comment.