AI ML Pipeline Dagstart vs Apache Airflow
image generated by meta.ai
Let's comapare Dagster and Apache Airflow, for an AI/ML pipeline, we'll design a simple pipeline that includes data loading, data preprocessing, model training, and model evaluation.
Here's a breakdown of the common steps in an AI/ML pipeline and how they can be implemented in both Dagster and Airflow.
AI/ML Pipeline Steps:
- Data Loading: Load raw data from a source (e.g., CSV, database).
- Data Preprocessing: Clean, transform, and prepare the data for model training (e.g., handle missing values, feature scaling).
- Model Training: Train a machine learning model on the preprocessed data.
- Model Evaluation: Evaluate the trained model's performance.
Let's assume we're working with a simple scikit-learn example, like training a Logistic Regression model on the Iris dataset.
1. Dagster Script
Dagster uses "assets" to represent data and computations. A collection of assets forms a "code location."
File: iris_pipeline.py
from dagster import asset, define_asset_job, ScheduleDefinition, DefaultScheduleStatus
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_iris
# --- Assets ---
@asset
def raw_iris_data() -> pd.DataFrame:
"""
Loads the raw Iris dataset.
"""
iris = load_iris()
df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
df['target'] = iris.target
print("Raw Iris data loaded.")
return df
@asset
def preprocessed_iris_data(raw_iris_data: pd.DataFrame) -> pd.DataFrame:
"""
Preprocesses the raw Iris data (e.g., feature scaling, handling missing values - simple for Iris).
For Iris, this might just involve separating features and target.
"""
# In a real-world scenario, you'd add more complex preprocessing here.
# For now, let's assume raw_iris_data is clean enough for direct use,
# but we'll still pass it through a dedicated preprocessing asset for structure.
print("Iris data preprocessed.")
return raw_iris_data.copy() # Return a copy to avoid modifying the original
@asset
def trained_iris_model(preprocessed_iris_data: pd.DataFrame):
"""
Trains a Logistic Regression model on the preprocessed Iris data.
"""
X = preprocessed_iris_data.drop('target', axis=1)
y = preprocessed_iris_data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = LogisticRegression(max_iter=200) # Increased max_iter for convergence
model.fit(X_train, y_train)
print("Iris model trained.")
# In a real scenario, you might save the model to a file or S3.
# For simplicity, we'll just return it directly for the next asset.
return model, X_test, y_test
@asset
def model_evaluation(trained_iris_model):
"""
Evaluates the trained Iris model.
"""
model, X_test, y_test = trained_iris_model
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy:.4f}")
return {"accuracy": accuracy}
# --- Jobs ---
# A job that materializes all assets in the pipeline
ml_pipeline_job = define_asset_job(
name="iris_ml_pipeline",
selection=[raw_iris_data, preprocessed_iris_data, trained_iris_model, model_evaluation]
)
# --- Schedules (Optional) ---
# Schedule to run the pipeline daily
daily_ml_pipeline_schedule = ScheduleDefinition(
job=ml_pipeline_job,
cron_schedule="0 0 * * *", # run daily at midnight
default_status=DefaultScheduleStatus.RUNNING,
name="daily_iris_ml_pipeline_schedule"
)
# To run this Dagster pipeline:
# 1. Save the code as `iris_pipeline.py`.
# 2. Install Dagster: `pip install dagster dagster-webserver scikit-learn pandas`
# 3. Run the Dagster UI: `dagster dev -f iris_pipeline.py`
# 4. Open your browser to http://localhost:3000 and navigate to the "Deployments" tab,
# then select your "Code Location". You can launch the job or enable the schedule.
Explanation for Dagster:
@assetdecorator: Marks a function as an asset, representing a piece of data or a computation that produces data.- Dependencies: Dagster automatically infers dependencies between assets based on their function arguments (e.g.,
preprocessed_iris_datadepends onraw_iris_data). - Type Hints: Type hints (
-> pd.DataFrame) help Dagster understand the expected output types, which can be used for data validation. define_asset_job: Groups related assets into a job, which is a runnable unit of work.ScheduleDefinition: Allows you to define recurring runs for your jobs based on cron schedules.- Dagster UI (
dagster dev): Provides a rich interface for visualizing pipelines, launching runs, and monitoring their execution.
2. Apache Airflow Script
Apache Airflow uses "DAGs" (Directed Acyclic Graphs) to define workflows. Each node in the DAG is a "Task."
File: iris_ml_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_iris
# Define the default arguments for the DAG
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'retries': 1,
}
# --- Python Functions for Tasks ---
def _load_raw_data(**kwargs):
"""
Loads the raw Iris dataset and pushes it to XCom.
"""
iris = load_iris()
df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
df['target'] = iris.target
print("Raw Iris data loaded.")
# Push DataFrame to XCom (eXchange Communication) for downstream tasks
kwargs['ti'].xcom_push(key='raw_data', value=df.to_json())
def _preprocess_data(**kwargs):
"""
Preprocesses the raw Iris data and pushes it to XCom.
"""
ti = kwargs['ti']
raw_data_json = ti.xcom_pull(task_ids='load_raw_data', key='raw_data')
df = pd.read_json(raw_data_json)
# In a real-world scenario, you'd add more complex preprocessing here.
print("Iris data preprocessed.")
kwargs['ti'].xcom_push(key='preprocessed_data', value=df.to_json())
def _train_model(**kwargs):
"""
Trains a Logistic Regression model and pushes the model, X_test, and y_test to XCom.
"""
ti = kwargs['ti']
preprocessed_data_json = ti.xcom_pull(task_ids='preprocess_data', key='preprocessed_data')
df = pd.read_json(preprocessed_data_json)
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = LogisticRegression(max_iter=200)
model.fit(X_train, y_train)
print("Iris model trained.")
# For simplicity, we'll store X_test and y_test in XCom.
# In a production setting, you'd serialize the model (e.g., with joblib or pickle)
# and save it to a persistent store (e.g., S3, local file system).
kwargs['ti'].xcom_push(key='trained_model_X_test_json', value=X_test.to_json())
kwargs['ti'].xcom_push(key='trained_model_y_test_json', value=pd.Series(y_test).to_json())
# For the model itself, you'd typically save it to a file and pass the path,
# or use a model registry. For this example, we'll skip direct model passing via XCom
# due to serialization complexity and size limits, and assume it's "available" for eval.
def _evaluate_model(**kwargs):
"""
Evaluates the trained Iris model.
"""
ti = kwargs['ti']
X_test_json = ti.xcom_pull(task_ids='train_model', key='trained_model_X_test_json')
y_test_json = ti.xcom_pull(task_ids='train_model', key='trained_model_y_test_json')
X_test = pd.read_json(X_test_json)
y_test = pd.read_json(y_test_json, typ='series') # Use typ='series' for Series
# Re-instantiate a dummy model or load the saved model for evaluation.
# In a real scenario, the model would be loaded from a persistent store.
# For this example, let's assume we re-train a quick model for demo purposes
# or if we had serialized the model in the previous step, we'd load it here.
# To truly evaluate the *trained* model, you'd need to serialize/deserialize it.
# Let's slightly adjust `_train_model` to save and load a dummy model.
# For simplicity of *this specific example*, we'll use the preprocessed data directly
# and re-train a temporary model *for evaluation's purpose*, emphasizing the flow.
# A robust solution would pass the actual trained model.
# Re-doing the training to get a model instance for evaluation for this demo.
# This is NOT how you'd do it in production! You'd serialize the model!
preprocessed_data_json = ti.xcom_pull(task_ids='preprocess_data', key='preprocessed_data')
df = pd.read_json(preprocessed_data_json)
X = df.drop('target', axis=1)
y = df['target']
X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=42)
model_for_eval = LogisticRegression(max_iter=200)
model_for_eval.fit(X_train, y_train)
y_pred = model_for_eval.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy:.4f}")
kwargs['ti'].xcom_push(key='model_accuracy', value=accuracy)
# Define the DAG
with DAG(
dag_id='iris_ml_pipeline_airflow',
default_args=default_args,
description='An ML pipeline for Iris dataset using Airflow',
schedule_interval='@daily', # Run daily
tags=['ml', 'iris'],
catchup=False # Do not run for past missed schedules
) as dag:
load_raw_data_task = PythonOperator(
task_id='load_raw_data',
python_callable=_load_raw_data,
)
preprocess_data_task = PythonOperator(
task_id='preprocess_data',
python_callable=_preprocess_data,
)
train_model_task = PythonOperator(
task_id='train_model',
python_callable=_train_model,
)
evaluate_model_task = PythonOperator(
task_id='evaluate_model',
python_callable=_evaluate_model,
)
# Define task dependencies
load_raw_data_task >> preprocess_data_task >> train_model_task >> evaluate_model_task
# To run this Airflow DAG:
# 1. Save the code as `iris_ml_dag.py` in your Airflow DAGs folder.
# 2. Ensure Airflow is running (Scheduler and Webserver).
# 3. Install necessary packages: `pip install apache-airflow scikit-learn pandas`
# 4. The DAG should appear in the Airflow UI (http://localhost:8080 by default).
# 5. Unpause the DAG and trigger it manually or wait for the schedule.
Explanation for Apache Airflow:
DAGobject: Defines the Directed Acyclic Graph, which is the entire workflow.default_args: Common arguments applied to all tasks in the DAG.PythonOperator: Executes a Python callable function as a task._load_raw_data,_preprocess_data, etc.: These are standard Python functions that define the logic for each step of the pipeline.XCom(Cross-communication): Airflow's mechanism for tasks to exchange small amounts of data. We usexcom_pushto send data from one task andxcom_pullto retrieve it in a downstream task. For larger datasets or models, you'd typically save them to a shared storage (S3, GCS, HDFS) and pass the path via XCom.- Task Dependencies (
>>): Defines the order in which tasks should run.A >> Bmeans task A must complete successfully before task B starts. schedule_interval: Defines how often the DAG should run (e.g.,@daily,@hourly,Nonefor manual trigger).- Airflow UI: Provides a web interface to monitor DAG runs, view logs, and trigger tasks.
Key Differences and Considerations for AI/ML Pipelines:
| Feature | Dagster | Apache Airflow |
| Core Abstraction | Assets (data/computations) | Tasks (units of work) |
| Data Flow | Explicit input/output via function arguments (type-safe) | Implicit via XCom or external storage |
| Data Lineage | Built-in, strong focus on data dependencies | Can be tracked, but less opinionated out-of-the-box |
| Reproducibility | Strong asset versioning and snapshotting | Relies on task code and external versioning |
| UI/Observability | Comprehensive, asset-centric UI | Task-centric UI, good for monitoring runs |
| Testing | Easier to test individual assets | Testing individual tasks can be more isolated |
| Scalability | Good for local and distributed execution | Highly scalable, robust for large organizations |
| ML-Specific | Designed with data/ML in mind (e.g., software-defined assets, ops) | General-purpose workflow orchestrator |
| Complexity for ML | Can be more intuitive for data scientists due to asset model | Requires more explicit data handling between tasks |
Choose between Dagster and Airflow based on your team's familiarity, project complexity, and specific requirements for data lineage, reproducibility, and observability in your ML workflows. Dagster often feels more "data-aware" and can be a strong choice for data-centric ML projects, while Airflow is a very mature and widely adopted general-purpose orchestrator.

Comments