
Apache Airflow for Data Pipeline Orchestration: A Practical Guide

Why Your Cron Jobs Keep Lying to You

It was 3:47 a.m. when the CFO’s dashboard went dark. The revenue numbers from yesterday simply did not exist. The engineering team scrambled, logged into the analytics server, and discovered a single line in a cron log: psql: FATAL: connection refused. That was five days ago. The cron job had failed silently every single night. No alerts. No retries. No visibility. Just a nightly ETL pipeline quietly rotting while executive reports continued to render yesterday’s stale data as if nothing were wrong.

If that story makes you wince, you have felt the sharp edge of cron-based data pipelines. Cron is brilliant at one thing: running a command at a specific moment. It has absolutely no opinion about whether the command worked, whether its upstream dependencies finished, whether it should retry, whether a downstream job now has stale inputs, or whether a human should be woken up. For a single script running on a single box, cron is fine. For a data platform that spans Postgres, S3, Snowflake, Kafka, and a dozen internal services, cron is a trap that punishes you the moment complexity arrives.

This is exactly the problem Apache Airflow was built to solve. Airflow is a workflow orchestration platform that lets you define data pipelines as Python code, schedule them, monitor them, retry them, backfill them, and reason about them as first-class engineering artifacts. It is now the de facto standard for batch orchestration at companies ranging from Airbnb (where it was born) to Netflix, Stripe, Robinhood, and countless startups that have graduated from bash and cron.

In this guide, we will walk through everything you need to operate Airflow in production. We will write real DAGs, use the modern TaskFlow API, wire up sensors and branches, compare executors, and build a complete ETL pipeline that pulls from Postgres, transforms with pandas, and lands the result in S3 and Snowflake. By the end, you will understand not just how to write Airflow code, but how to design pipelines that are observable, idempotent, and safe to rerun at 3:47 a.m. when something breaks.

Key Takeaway: Cron executes commands. Airflow orchestrates workflows. The difference is retries, dependencies, backfills, visibility, and a complete web UI that tells you exactly what failed and why.

Why Orchestration Matters

Before we get tactical, let’s clarify why orchestration is a distinct discipline. A modern data pipeline is rarely a single script. It is a directed graph of dozens or hundreds of steps that must run in the right order, survive partial failures, rerun cleanly after bugs are fixed, and emit telemetry that humans and monitoring systems can act on. Cron treats each step as an island. Airflow treats the graph as the primary object.

Consider a typical data team’s nightly workload: ingest raw events from Kafka, land them in S3, validate schemas, run dbt models against Snowflake, compute marketing attribution, refresh ML features, push dashboards to Looker, and email a summary. That is seven-plus stages, each with its own upstream dependencies, retry semantics, SLAs, and failure modes. Hand-rolling this with cron and shell scripts means hand-rolling a distributed system. Airflow gives you that distributed system for free.

Cron vs Airflow: A Direct Comparison

Capability | Cron | Airflow
Dependency management | None | Native DAGs
Automatic retries | DIY | Built-in per task
Failure alerts | Silent by default | Email, Slack, PagerDuty
Backfill historical runs | Manual scripting | One CLI command
Web UI for debugging | Log files only | Full graph + logs + Gantt
Parallelism | Single host | Celery, Kubernetes
Code as source of truth | crontab files | Python, Git, PRs
Secrets management | Env vars or worse | Connections, Secrets backends


The bottom row is the one that matters most as teams grow. When your pipelines live in Python and Git, they become reviewable, testable, and versioned. When they live in a crontab -e buffer on someone’s laptop, they become a liability. Airflow turns operational automation into a software engineering practice.

Core Concepts: DAGs, Tasks, Operators, and More

Airflow has a small vocabulary that repays careful study. Understand these eight words and most of the documentation falls into place.

  • DAG (Directed Acyclic Graph): The pipeline itself. A collection of tasks with directional dependencies and no cycles. Every DAG has a schedule, a start date, and a set of default arguments.
  • Task: A single unit of work within a DAG. Tasks are instances of operators.
  • Operator: A template for a kind of work. BashOperator runs a shell command, PythonOperator calls a Python function, SnowflakeOperator runs SQL, and so on.
  • Sensor: A special operator that waits for a condition to become true — a file landing in S3, a partition appearing in Hive, a row showing up in a database.
  • XCom (Cross-Communication): A lightweight mechanism for tasks to exchange small pieces of data (keys, filenames, row counts). Not for large payloads.
  • Hook: A reusable client for an external system (Postgres, S3, Snowflake). Operators use hooks under the hood. You can also use hooks directly inside Python callables.
  • Connection: Stored credentials and endpoint metadata for an external system, managed in the Airflow UI or via a secrets backend.
  • Variable: A globally accessible key-value pair for non-secret configuration (think feature flags or environment identifiers).

Tip: Use Connections for anything with a password. Use Variables for configuration. Use XCom for small return values. Never store bulk data in XCom — push it to S3 or a database and pass the URI instead.

Airflow Architecture at a Glance

Before we write code, it helps to see how Airflow’s moving parts fit together. The scheduler parses your DAG files, decides what should run, and queues work. The executor picks up queued tasks and sends them to workers. The metadata database is the single source of truth for state. The web server renders the UI and API on top of the metadata DB.

[Diagram: Apache Airflow architecture — the DAG folder (Git-synced Python files) feeds the scheduler, which parses DAGs and queues tasks to the executor (Local / Celery / Kubernetes) and its workers; the web server (Flask + Gunicorn) serves the UI and REST API; every component reads and writes state through the metadata DB (Postgres / MySQL).]

Notice how the metadata database sits at the center. Every component reads from and writes to it. That is why choosing a production-grade database (Postgres is the usual answer) and backing it up is not optional. If the metadata DB goes down, Airflow goes down.

Writing Your First DAG

Let’s write a real DAG using the modern TaskFlow API, which was introduced in Airflow 2.0 and dramatically reduces boilerplate. The old PythonOperator-heavy style still works, but TaskFlow lets you treat tasks as decorated Python functions and it passes XCom values automatically.

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="hello_taskflow",
    description="A minimal TaskFlow DAG that greets the world.",
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "owner": "data-eng",
        "retries": 3,
        "retry_delay": pendulum.duration(minutes=5),
    },
    tags=["tutorial", "taskflow"],
)
def hello_taskflow():

    @task
    def extract() -> dict:
        return {"greeting": "hello", "subject": "world"}

    @task
    def transform(payload: dict) -> str:
        return f"{payload['greeting'].upper()}, {payload['subject'].title()}!"

    @task
    def load(message: str) -> None:
        print(f"Final message: {message}")

    payload = extract()
    message = transform(payload)
    load(message)


hello_taskflow()

Drop that file into your dags/ folder and within a minute the scheduler will pick it up. The UI will show three tasks wired in a line. Notice what we did not have to do: we never called set_upstream, never declared XCom keys, never wrote a PythonOperator(python_callable=...) line. TaskFlow inferred dependencies from the function call graph and serialized return values through XCom automatically.

Tip: Always set catchup=False unless you genuinely want Airflow to run every missed schedule interval from start_date to now. Forgetting this will cause your DAG to unleash a flood of historical runs the moment you deploy it.

Operators You Will Actually Use

Airflow ships with hundreds of operators across dozens of provider packages. In practice most pipelines are built from a small, stable set. Let’s walk through the ones you will reach for daily.

BashOperator

The old reliable. It runs a shell command. Useful for invoking CLI tools, running dbt run, or shelling out when Python bindings are unavailable.

from airflow.operators.bash import BashOperator

run_dbt = BashOperator(
    task_id="run_dbt_models",
    bash_command="cd /opt/dbt/project && dbt run --select tag:daily --profiles-dir .",
    env={"DBT_TARGET": "prod"},
)

PythonOperator and @task

Any time a plain shell command will not do, reach for Python. With TaskFlow this is just @task. With the legacy API it looks like this:

from airflow.operators.python import PythonOperator

def compute_attribution(**context):
    ds = context["ds"]  # logical date as YYYY-MM-DD
    print(f"Computing attribution for {ds}")

compute = PythonOperator(
    task_id="compute_attribution",
    python_callable=compute_attribution,
)

KubernetesPodOperator

For heavy, resource-isolated work, spin up a fresh pod for each task. This is the cleanest way to run untrusted code, GPU workloads, or binaries that conflict with Airflow’s Python environment.

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

train_model = KubernetesPodOperator(
    task_id="train_churn_model",
    name="churn-trainer",
    namespace="ml-jobs",
    image="registry.example.com/ml/churn-trainer:2.4.1",
    cmds=["python", "train.py"],
    arguments=["--date", "{{ ds }}"],
    container_resources=k8s.V1ResourceRequirements(
        requests={"cpu": "2", "memory": "8Gi"},
        limits={"cpu": "4", "memory": "16Gi", "nvidia.com/gpu": "1"},
    ),
    get_logs=True,
    is_delete_operator_pod=True,
)

DockerOperator

Similar idea without Kubernetes. If your workers can reach a Docker daemon, you can run each task inside a container. We cover container fundamentals in detail in our Docker containers explained guide and the production-oriented dev-to-production Docker guide.

from airflow.providers.docker.operators.docker import DockerOperator

score_model = DockerOperator(
    task_id="score_leads",
    image="registry.example.com/ml/lead-scorer:1.0.0",
    command="python score.py --date {{ ds }}",
    network_mode="bridge",
    auto_remove=True,
    mount_tmp_dir=False,
)

SnowflakeOperator

For data warehouse work. It reads credentials from an Airflow Connection, executes SQL against Snowflake, and emits rich logs.

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

refresh_revenue_mart = SnowflakeOperator(
    task_id="refresh_revenue_mart",
    snowflake_conn_id="snowflake_prod",
    sql="""
        MERGE INTO analytics.revenue_daily t
        USING staging.revenue_daily s
        ON t.date_key = s.date_key
        WHEN MATCHED THEN UPDATE SET t.revenue = s.revenue
        WHEN NOT MATCHED THEN INSERT (date_key, revenue) VALUES (s.date_key, s.revenue);
    """,
)

S3Hook

Hooks are the programmatic cousin of operators. Use them inside Python callables when you need fine-grained control. For broader context on choosing between object stores, columnar warehouses, and time-series engines, see our databases comparison guide.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def upload_parquet(local_path: str, key: str) -> str:
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename=local_path,
        key=key,
        bucket_name="acme-data-lake",
        replace=True,
    )
    return f"s3://acme-data-lake/{key}"

Sensors and Trigger Rules

Sensors are how Airflow waits for the world. A sensor is just an operator with a poke() method that returns True or False; the task stays running until poke() returns True or the timeout fires. Modern Airflow supports deferrable sensors that release their worker slot while waiting, which matters enormously at scale.
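The poke loop can be modeled in a few lines of plain Python — a simplified sketch of the semantics, not Airflow's actual implementation:

```python
import time


def run_sensor(poke, poke_interval: float, timeout: float) -> bool:
    """Simplified model of a sensor in 'poke' mode: call poke() until it
    returns True or the timeout elapses. Real Airflow sensors also support
    'reschedule' mode (free the worker slot between pokes) and deferrable
    mode (suspend to the triggerer process entirely)."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if poke():
            return True   # condition met: downstream tasks may proceed
        time.sleep(poke_interval)
    return False          # timed out: the task instance would be failed


# Usage: a condition that becomes true on the third check.
attempts = iter([False, False, True])
print(run_sensor(lambda: next(attempts), poke_interval=0.01, timeout=1.0))  # True
```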

S3KeySensor

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_export = S3KeySensor(
    task_id="wait_for_crm_export",
    bucket_key="s3://acme-data-lake/crm/export/{{ ds }}/manifest.json",
    aws_conn_id="aws_default",
    poke_interval=60,
    timeout=60 * 60 * 6,  # 6 hours
    mode="reschedule",    # free the slot between pokes
)

FileSensor

from airflow.sensors.filesystem import FileSensor

wait_for_trigger = FileSensor(
    task_id="wait_for_trigger_file",
    filepath="/mnt/shared/triggers/{{ ds }}.ready",
    poke_interval=30,
    timeout=60 * 30,
)

ExternalTaskSensor

Cross-DAG dependencies. Use sparingly — they couple DAGs tightly — but they are invaluable when one pipeline genuinely must not run until another has finished.

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_raw = ExternalTaskSensor(
    task_id="wait_for_raw_ingest",
    external_dag_id="raw_ingest",
    external_task_id="load_done",
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    poke_interval=120,
    timeout=60 * 60 * 3,
    mode="reschedule",
)

Trigger Rules

Every task has a trigger rule that decides whether it runs given the state of its upstream tasks. The default is all_success, but there are useful alternatives.

Trigger Rule | Runs When
all_success | All upstream tasks succeeded (default)
all_failed | All upstream tasks failed (useful for cleanup)
all_done | All upstream tasks finished, regardless of state
one_success | At least one upstream task succeeded
none_failed | No upstream task failed (all succeeded or skipped)
none_failed_min_one_success | No upstream failed and at least one succeeded; the typical rule for tasks after a branch
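The semantics above can be modeled as predicates over upstream task states. This is a simplified illustration (real Airflow also accounts for states like upstream_failed and for mapped tasks):

```python
# Each predicate answers: given the upstream states, does the task run?
def all_success(states):
    return all(s == "success" for s in states)

def all_failed(states):
    return all(s == "failed" for s in states)

def all_done(states):
    return all(s in {"success", "failed", "skipped"} for s in states)

def one_success(states):
    return any(s == "success" for s in states)

def none_failed(states):
    return all(s != "failed" for s in states)

def none_failed_min_one_success(states):
    return none_failed(states) and one_success(states)


# After a branch, one path runs and the other is skipped:
upstream = ["success", "skipped"]
print(all_success(upstream))                   # False: default rule would not fire
print(none_failed_min_one_success(upstream))   # True: the join task runs
```

This is why the join task after a branch needs `none_failed_min_one_success`: with the default `all_success`, the skipped branch would block it forever.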


Scheduling, Data Intervals, and Backfills

Scheduling is where Airflow beginners get tripped up most often. The mental model is different from cron. Airflow schedules intervals, not instants. A DAG with schedule="@daily" and a start_date of 2026-01-01 produces its first run at the end of 2026-01-01, covering the data interval [2026-01-01 00:00, 2026-01-02 00:00). The run’s logical_date is 2026-01-01, but wall-clock execution happens on 2026-01-02.

This matters because every template variable — {{ ds }}, {{ data_interval_start }}, {{ data_interval_end }} — refers to the interval the run represents, not the moment it runs. Build your pipelines to process the interval, not “today”, and backfills become trivial.
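Processing the interval rather than "today" can be as simple as parameterizing every query on the interval bounds. A minimal sketch, with a hypothetical orders table:

```python
from datetime import datetime, timedelta, timezone


def orders_query(data_interval_start: datetime, data_interval_end: datetime):
    """Build an interval-bounded query. Rerunning this for the same interval
    always selects the same rows, which is what makes backfills safe."""
    sql = (
        "SELECT order_id, amount FROM public.orders "
        "WHERE created_at >= %(start)s AND created_at < %(end)s"
    )
    return sql, {"start": data_interval_start, "end": data_interval_end}


# The daily run with logical date 2026-01-01 covers [2026-01-01, 2026-01-02):
start = datetime(2026, 1, 1, tzinfo=timezone.utc)
sql, params = orders_query(start, start + timedelta(days=1))
print(params["end"])  # 2026-01-02 00:00:00+00:00
```

In a real task you would feed `{{ data_interval_start }}` and `{{ data_interval_end }}` (or the context variables of the same names) into this function instead of hardcoded datetimes.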

Schedule Options

# Cron expression
schedule="0 2 * * *"          # 2 a.m. UTC daily

# Presets
schedule="@hourly"
schedule="@daily"
schedule="@weekly"

# timedelta (relative)
from datetime import timedelta
schedule=timedelta(hours=6)

# Dataset-driven (event-based)
from airflow.datasets import Dataset
raw_events = Dataset("s3://acme-data-lake/raw/events/")
schedule=[raw_events]

# No schedule (manual/triggered only)
schedule=None

Backfill

Need to reprocess January because you found a bug? One command:

airflow dags backfill \
  --start-date 2026-01-01 \
  --end-date 2026-01-31 \
  --reset-dagruns \
  daily_revenue_pipeline

Caution: Backfills only work correctly if your tasks are idempotent. A task that appends rows will duplicate data on a rerun. A task that uses MERGE or writes to a date-partitioned key will not. We cover this in more depth in the best practices section.

Dependencies, Branching, and Short-Circuiting

Real pipelines are not straight lines. You may want to run different downstream paths depending on the day of week, skip a branch entirely if there is no new data, or fan out into parallel tasks and fan back in.

BranchPythonOperator

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_path(**context):
    execution_date = context["logical_date"]
    if execution_date.weekday() == 0:  # Monday
        return "run_weekly_rollup"
    return "skip_weekly"

branch = BranchPythonOperator(
    task_id="branch_on_weekday",
    python_callable=choose_path,
)

weekly = EmptyOperator(task_id="run_weekly_rollup")
skip   = EmptyOperator(task_id="skip_weekly")
join   = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")

branch >> [weekly, skip] >> join

ShortCircuitOperator

If a condition is false, skip everything downstream. Great for “no new data, no work” patterns.

from airflow.operators.python import ShortCircuitOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def has_new_rows(**context):
    hook = PostgresHook(postgres_conn_id="warehouse")
    count = hook.get_first(
        "SELECT COUNT(*) FROM raw.events WHERE event_date = %s",
        parameters=(context["ds"],),
    )[0]
    return count > 0

gate = ShortCircuitOperator(
    task_id="only_if_new_data",
    python_callable=has_new_rows,
)

Visualizing a DAG

Here is what a representative ETL DAG looks like — fan-out at ingest, a branch for weekend-only work, and a fan-in for publishing.

[Diagram: sample ETL DAG — wait_for_export gates three parallel extracts (extract_pg, extract_s3, extract_kafka) that feed transform; a branch routes to weekly_rollup when needed; load_snowflake and load_s3 fan back in to publish_dashboard.]

XCom: Passing Data Between Tasks

XCom is Airflow’s built-in mechanism for tasks to pass small messages. Under the hood it is a row in the metadata database with a serialized value. That detail is crucial: XCom is not a data pipe. It is a message bus. Anything more than a few kilobytes should go to S3 or a database, and only the pointer goes through XCom.

@task
def stage_batch(**context) -> dict:
    # ... write a CSV to S3 ...
    return {
        "s3_key": f"staging/{context['ds']}/batch.csv",
        "row_count": 128_432,
        "checksum": "a3f9...",
    }

@task
def load_batch(manifest: dict):
    print(f"Loading {manifest['row_count']} rows from {manifest['s3_key']}")

manifest = stage_batch()
load_batch(manifest)

For large intermediate artifacts, consider a custom XCom backend that transparently stores values in S3 or GCS, returning only a URI. This keeps the metadata DB small and your XCom usage consistent.
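The pointer-swap idea behind a custom XCom backend can be sketched in plain Python, with a dict standing in for S3. This is a toy model of the mechanism; a real backend would subclass Airflow's BaseXCom and upload via S3Hook:

```python
import json
import uuid

OBJECT_STORE: dict = {}   # stand-in for an S3/GCS bucket
THRESHOLD = 1024          # bytes; push anything bigger out of the metadata DB


def serialize_value(value) -> bytes:
    """Small values are stored inline; large ones are uploaded to the object
    store and replaced by a pointer record holding only a URI."""
    blob = json.dumps(value).encode()
    if len(blob) <= THRESHOLD:
        return blob
    key = f"xcom/{uuid.uuid4()}.json"
    OBJECT_STORE[key] = blob
    pointer = {"__uri__": f"s3://fake-bucket/{key}", "__key__": key}
    return json.dumps(pointer).encode()


def deserialize_value(blob: bytes):
    """Transparently dereference pointer records on the way out."""
    value = json.loads(blob)
    if isinstance(value, dict) and "__key__" in value:
        return json.loads(OBJECT_STORE[value["__key__"]])
    return value


big = {"rows": list(range(10_000))}
restored = deserialize_value(serialize_value(big))
assert restored == big   # round-trips transparently; only a URI hit the "DB"
```

Tasks keep returning ordinary Python values; the swap happens entirely inside the backend, so no DAG code changes.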

Deployment Architectures and Executors

The executor determines how tasks are physically run. Pick the wrong one and you will fight Airflow forever. Pick the right one and scaling becomes a non-event.

Executor | Good For | Avoid When
SequentialExecutor | Local dev, SQLite backend | Anything production
LocalExecutor | Small teams, single VM, <50 concurrent tasks | You need horizontal scale
CeleryExecutor | Medium/large deployments with stable workers | Spiky workloads, heterogeneous resources
KubernetesExecutor | Cloud-native orgs, isolated tasks, autoscaling | You have no k8s expertise
CeleryKubernetesExecutor | Mixed workloads: steady Celery + burst k8s | Ops budget is limited


For most new installations in 2026, KubernetesExecutor on managed k8s (EKS, GKE, AKS) is the pragmatic default. Each task gets a fresh pod with its own resources, failure isolation is automatic, and autoscaling comes from the cluster itself. The downside is pod startup overhead — usually 5 to 20 seconds — which is irrelevant for multi-minute tasks but brutal for thousands of sub-second tasks.
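The overhead math is worth doing explicitly (illustrative numbers):

```python
def overhead_fraction(startup_s: float, task_s: float) -> float:
    """Fraction of total wall-clock time spent on pod startup."""
    return startup_s / (startup_s + task_s)


# A 10 s pod startup against a 10-minute task is noise...
print(f"{overhead_fraction(10, 600):.1%}")   # 1.6%
# ...but against a 0.5 s task it dominates the runtime.
print(f"{overhead_fraction(10, 0.5):.1%}")   # 95.2%
```

If your DAGs are dominated by thousands of tiny tasks, CeleryExecutor's warm workers (or batching the tiny tasks together) will serve you better than per-task pods.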

Best Practices for Production

Airflow gives you enough rope to build a beautiful garden or hang yourself. These are the practices that separate teams with 10-year-old Airflow deployments from teams that rebuild theirs every 18 months.

Make Every Task Idempotent

Running a task twice for the same logical date must produce the same result. This means using MERGE instead of INSERT, writing to partitioned paths keyed on {{ ds }}, and deleting-then-inserting within a transaction. Idempotency is the single most important property of a production pipeline because it is what makes retries and backfills safe. The broader principle — write code that other people (including future you) can reason about — is covered in our clean code principles guide.
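The delete-then-insert pattern can be demonstrated end to end with sqlite3 from the standard library (the table and partition column here are hypothetical):

```python
import sqlite3


def load_partition(conn, ds: str, rows):
    """Idempotent load: clear the logical date's partition, then insert,
    all inside one transaction. Running it twice yields identical state."""
    with conn:  # commits on success, rolls back on exception
        conn.execute("DELETE FROM revenue_daily WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO revenue_daily (ds, sku, revenue) VALUES (?, ?, ?)",
            [(ds, sku, rev) for sku, rev in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE revenue_daily (ds TEXT, sku TEXT, revenue REAL)")

batch = [("sku-1", 10.0), ("sku-2", 20.0)]
load_partition(conn, "2026-01-01", batch)
load_partition(conn, "2026-01-01", batch)  # a retry or backfill rerun
count = conn.execute("SELECT COUNT(*) FROM revenue_daily").fetchone()[0]
print(count)  # 2, not 4 -- the rerun did not duplicate rows
```

An append-only `INSERT` in the same spot would leave four rows after the rerun, which is exactly the duplication bug that makes backfills dangerous.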

Keep Tasks Small and Atomic

A task that does one thing is a task you can retry, debug, and reason about. A task that does six things is a task that fails halfway through and leaves you guessing which steps completed.

Use Pools and SLAs

Pools cap the number of concurrent tasks hitting a shared resource (e.g., five slots for your overloaded production Postgres). SLAs let Airflow raise an alarm when a task takes longer than expected.

extract = SnowflakeOperator(
    task_id="extract_large_mart",
    snowflake_conn_id="snowflake_prod",
    sql="...",
    pool="snowflake_heavy",  # defined in UI: 3 slots
    sla=pendulum.duration(minutes=30),
)

Wire Up Alerts Early

Use on_failure_callback and on_retry_callback to post to Slack, open PagerDuty incidents, or file Jira tickets. A silent failure is strictly worse than a loud one.

from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

def notify_slack(context):
    ti = context["task_instance"]
    message = (
        f":rotating_light: *{ti.dag_id}.{ti.task_id}* failed "
        f"on {context['ds']} (try {ti.try_number})"
    )
    SlackWebhookHook(slack_webhook_conn_id="slack_alerts").send(text=message)

default_args = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": pendulum.duration(minutes=5),
    "on_failure_callback": notify_slack,
}

Treat DAGs Like Software

Pull requests, code review, unit tests for your Python callables, integration tests with airflow dags test. If you are not familiar with modern Git workflows, our Git and GitHub best practices article will save you weeks of pain.

Common Pitfalls to Avoid

These are the mistakes I see over and over again. Memorize them and you will skip a lot of incidents.

Caution — Top-level code: Any code at the top level of a DAG file runs every time the scheduler parses the file, which can be every 30 seconds. A requests.get(...) at module scope will hammer the API and slow your scheduler to a crawl. Keep top-level code minimal — only DAG definitions, imports, and cheap literals.
Caution — Context dependency: Writing tasks that assume “now” instead of {{ data_interval_start }} makes backfills meaningless. Use the interval variables religiously.
Caution — Variable overuse: Variable.get() hits the metadata DB. Calling it at top level of a DAG file once per parse cycle will melt your database. Use Variable.get(..., default_var=...) inside callables, or use Jinja templating ({{ var.value.my_key }}), which is lazily resolved.

Other frequent mistakes: not setting catchup=False, hardcoding credentials instead of using Connections, writing huge XCom payloads, running all tasks under one executor when one slow task is blocking everything else, and ignoring DAG parsing time (the UI exposes this under Admin → DAG Processor).
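The parse-time trap is easy to demonstrate: code at module scope runs every time the file is parsed, while code inside a callable runs only when the task executes. Here `expensive_call` is a stand-in for a real API request or `Variable.get()`:

```python
calls = {"n": 0}


def expensive_call():
    """Stand-in for requests.get(...) or Variable.get(...)."""
    calls["n"] += 1
    return {"feature_flag": True}


# BAD: at module scope this line would execute on every scheduler parse,
# i.e. potentially every 30 seconds, whether or not any task runs:
# CONFIG = expensive_call()

# GOOD: inside the callable, it executes only when the task actually runs.
def my_task():
    return expensive_call()["feature_flag"]


assert calls["n"] == 0     # merely defining the "DAG file" cost nothing
assert my_task() is True   # the call happens once, at run time
assert calls["n"] == 1
```

The same discipline applies to opening database connections, listing S3 buckets, or reading large config files: keep it all behind a function boundary.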

A Complete Production ETL Example

Let’s tie everything together with a realistic daily ETL that pulls orders from Postgres, transforms them with pandas, writes Parquet to S3, and merges into Snowflake. This is the kind of pipeline you might see feeding a revenue dashboard. If your workflow also needs streaming ingestion, take a look at our Kafka producer guide and Kafka consumer guide to see how Airflow batch jobs complement real-time pipelines.

from __future__ import annotations

import tempfile
from pathlib import Path

import pandas as pd
import pendulum
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.python import ShortCircuitOperator


DEFAULT_ARGS = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": pendulum.duration(minutes=5),
    "sla": pendulum.duration(hours=2),
}


@dag(
    dag_id="daily_revenue_pipeline",
    description="Extract orders from Postgres, transform, land in S3, merge into Snowflake.",
    schedule="0 2 * * *",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    max_active_runs=1,
    default_args=DEFAULT_ARGS,
    tags=["etl", "revenue", "daily"],
)
def daily_revenue_pipeline():

    wait_for_crm = S3KeySensor(
        task_id="wait_for_crm_export",
        bucket_key="s3://acme-data-lake/crm/export/{{ ds }}/manifest.json",
        aws_conn_id="aws_default",
        poke_interval=120,
        timeout=60 * 60 * 4,
        mode="reschedule",
    )

    def _has_orders(**context):
        hook = PostgresHook(postgres_conn_id="orders_pg")
        count = hook.get_first(
            "SELECT COUNT(*) FROM public.orders "
            "WHERE created_at::date = %s",
            parameters=(context["ds"],),
        )[0]
        print(f"Found {count} orders for {context['ds']}")
        return count > 0

    gate = ShortCircuitOperator(
        task_id="skip_if_no_orders",
        python_callable=_has_orders,
    )

    @task
    def extract_orders(**context) -> str:
        """Pull the day's orders into a local CSV. Return the path."""
        ds = context["ds"]
        hook = PostgresHook(postgres_conn_id="orders_pg")
        sql = """
            SELECT order_id, customer_id, sku, quantity,
                   unit_price, currency, created_at
            FROM public.orders
            WHERE created_at >= %(start)s::timestamptz
              AND created_at <  %(end)s::timestamptz
        """
        df = hook.get_pandas_df(
            sql,
            parameters={
                "start": f"{ds} 00:00:00+00",
                "end":   f"{ds} 24:00:00+00",
            },
        )
        tmp = Path(tempfile.mkdtemp()) / f"orders_{ds}.parquet"
        df.to_parquet(tmp, index=False)
        return str(tmp)

    @task
    def transform(local_path: str, **context) -> str:
        """Compute revenue in USD and enrich with date dimensions."""
        df = pd.read_parquet(local_path)
        fx = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "KRW": 0.00072}
        df["revenue_usd"] = (
            df["quantity"] * df["unit_price"] * df["currency"].map(fx).fillna(1.0)
        )
        df["order_date"] = pd.to_datetime(df["created_at"]).dt.date
        df = df.drop(columns=["created_at"])

        out = Path(local_path).with_name(f"transformed_{context['ds']}.parquet")
        df.to_parquet(out, index=False)
        return str(out)

    @task
    def upload_to_s3(local_path: str, **context) -> str:
        ds = context["ds"]
        key = f"warehouse/revenue/dt={ds}/part-000.parquet"
        S3Hook(aws_conn_id="aws_default").load_file(
            filename=local_path,
            key=key,
            bucket_name="acme-data-lake",
            replace=True,
        )
        return f"s3://acme-data-lake/{key}"

    merge_snowflake = SnowflakeOperator(
        task_id="merge_into_revenue_fact",
        snowflake_conn_id="snowflake_prod",
        sql="""
            BEGIN;

            CREATE OR REPLACE TEMPORARY TABLE staging_revenue AS
            SELECT $1:order_id::STRING      AS order_id,
                   $1:customer_id::STRING   AS customer_id,
                   $1:sku::STRING           AS sku,
                   $1:quantity::NUMBER      AS quantity,
                   $1:revenue_usd::FLOAT    AS revenue_usd,
                   $1:order_date::DATE      AS order_date
            FROM @acme_lake/warehouse/revenue/dt={{ ds }}/
                 (FILE_FORMAT => parquet_fmt);

            DELETE FROM analytics.fact_revenue
            WHERE order_date = '{{ ds }}';

            INSERT INTO analytics.fact_revenue
            SELECT * FROM staging_revenue;

            COMMIT;
        """,
    )

    @task
    def publish_metrics(**context):
        hook = PostgresHook(postgres_conn_id="metadata_pg")
        hook.run(
            """
            INSERT INTO ops.pipeline_runs (pipeline, run_date, status, finished_at)
            VALUES (%s, %s, 'success', now())
            """,
            parameters=("daily_revenue_pipeline", context["ds"]),
        )

    raw  = extract_orders()
    xfm  = transform(raw)
    uri  = upload_to_s3(xfm)

    wait_for_crm >> gate >> raw
    uri >> merge_snowflake >> publish_metrics()


daily_revenue_pipeline()

Read that code carefully. Notice that every task is idempotent (the Snowflake task deletes the day's partition before reinserting, the S3 key is deterministic, and the Postgres extract is bounded by an interval). Notice that we short-circuit when there is nothing to do. Notice the SLA, the retries, and max_active_runs=1 to prevent overlapping runs. Notice that we pass only paths and URIs through XCom, never the data itself.

For a deeper look at moving time-series data through a full modern stack, see our InfluxDB to AWS Iceberg pipeline guide. And if you would rather do complex event processing in-stream than in batch, the Flink CEP guide is a strong companion.

Monitoring and Observability

Airflow’s web UI is already a gift — the Graph view shows you the DAG, the Gantt chart shows you how long each task took, and Task Duration trends highlight regressions. But for real production you need more.

Task Lifecycle States

Understanding the task state machine is the foundation of debugging. Here are the transitions every task goes through.

[Diagram: task instance lifecycle — scheduled → queued → running, ending in success, failed, or skipped; running loops back through up_for_retry (retry after a delay when an exception is caught) or up_for_reschedule (a sensor poke returned False); skipped covers branches not chosen.]

Metrics and Logs

Airflow emits StatsD metrics out of the box — scheduler heartbeat, task duration, DAG parsing time, pool usage. Scrape these with Prometheus via a StatsD exporter and build Grafana dashboards. For logs, configure a remote logging backend (S3, GCS, Elasticsearch) so worker pods can die without taking their history with them.

# airflow.cfg
[metrics]
statsd_on = True
statsd_host = statsd-exporter.monitoring.svc
statsd_port = 9125
statsd_prefix = airflow

[logging]
remote_logging = True
remote_base_log_folder = s3://acme-airflow-logs/
remote_log_conn_id = aws_default

Key Takeaway: The four golden signals for Airflow monitoring are scheduler heartbeat, DAG parsing time, task queue depth, and SLA misses. Alert on all four. Everything else is detail.

Frequently Asked Questions

Airflow vs cron — when is it overkill?

If you have fewer than five scheduled scripts, they run on one box, they never depend on each other, and nobody cares if they fail silently, cron is fine. The moment you need dependencies, retries, alerts, backfills, or visibility across a team, Airflow pays for itself within weeks.

Airflow vs Prefect vs Dagster — which should I pick?

Airflow has the biggest ecosystem, the most provider packages, and the most battle-tested scaling story. Prefect is more Pythonic and has an elegant local dev story. Dagster emphasizes software-defined assets and data lineage, which is appealing if you think in datasets rather than tasks. For most teams in 2026, Airflow is still the safest bet because hiring and community support are unmatched, but Dagster is a strong choice for greenfield data platforms that want asset-centric semantics from day one.

How do I handle long-running tasks?

First, ask whether the task actually needs to live inside Airflow. If it is a 12-hour Spark job, Airflow should trigger it (via SparkSubmitOperator or an EMR/Databricks operator) and wait for completion via a deferrable sensor, not run the work itself. Deferrable operators and sensors suspend the task to the triggerer process, freeing the worker slot entirely. That way one Airflow worker can babysit thousands of long-running external jobs at once.

What is the best way to deploy Airflow in production?

For most teams, managed Airflow (Astronomer, AWS MWAA, Google Cloud Composer) is worth the money — it removes the operational burden of running the scheduler, metadata DB, and executor infrastructure. If you self-host, run on Kubernetes with the official Helm chart, use KubernetesExecutor, back the metadata DB with a managed Postgres (RDS or Cloud SQL), ship logs to S3/GCS, and scrape metrics to Prometheus. Pin every provider package version and treat upgrades as real projects, not Tuesday afternoon activities.

How do I handle secrets in Airflow?

Never put credentials directly in DAG code or Airflow Variables. Use Airflow Connections, and back them with a secrets manager: AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager, or Azure Key Vault. Configure the secrets_backend in airflow.cfg so Airflow transparently fetches connections and variables at runtime. That way secrets live in a dedicated, audited system and never touch the metadata DB.

Conclusion

Airflow is not a magic wand. It will not fix a bad data model, it will not make your SQL faster, and it will not paper over a team that does not practice code review. What it will do is turn your data pipelines from a fragile web of scripts into a first-class software system with dependencies, retries, backfills, alerts, and an auditable history. The difference is the difference between writing a midnight log file to the void and running a platform you can actually trust.

Start small. Pick one brittle cron job and move it to Airflow this week. Get comfortable with the TaskFlow API, the data interval mental model, and the Graph view. Wire up Slack alerts before you wire up anything else. Add retries and pools. Then graduate to KubernetesExecutor and deferrable sensors as your workload grows. The language you write along the way — DAGs, tasks, operators, sensors — is the same language used by thousands of data teams worldwide, which means the skills you build transfer everywhere. For complementary deep dives on the broader data ecosystem, check our guides on Python versus Rust for choosing the right language for your pipeline’s hottest paths and the time-series databases comparison for picking the right sink.
