
Apache Airflow for Data Pipeline Orchestration: A Practical Guide

Last updated: May 27, 2026
Published April 14, 2026 · Updated May 27, 2026 · 29 min read

Summary

What this post covers: A production-focused walkthrough of Apache Airflow for data engineers who are replacing cron-based pipelines. The discussion covers DAGs, operators, sensors, executors, the TaskFlow API, and a complete end-to-end ETL example that lands data from Postgres into S3 and Snowflake.

Key insights:

  • The contrast between cron and Airflow is not a matter of additional features. It is the difference between executing isolated commands and orchestrating a directed graph with dependencies, retries, backfills, alerting, and a debuggable web UI.
  • Idempotency is the single most important property of a production pipeline. Every task must produce the same result when re-run for the same logical date, which is what makes retries and backfills safe.
  • The choice of executor (LocalExecutor, CeleryExecutor, or KubernetesExecutor) is the most important scaling decision and should be driven by task isolation needs and infrastructure, rather than by Airflow features.
  • The most damaging anti-pattern is heavy top-level code in DAG files. The scheduler re-parses files approximately every 30 seconds, so a single module-scope HTTP call can degrade throughput across the entire deployment.
  • Production reliability derives from a small set of patterns applied consistently: small atomic tasks, pools for shared-resource limits, SLAs for time budgets, an on_failure_callback wired to Slack or PagerDuty, and DAGs that are treated as code with reviews and tests.

Main topics: why orchestration matters, core concepts (DAGs, tasks, operators), Airflow architecture, writing your first DAG, operators in practice, sensors and trigger rules, scheduling and backfills, branching and short-circuiting, XCom, deployment architectures and executors, best practices for production, common pitfalls, a complete production ETL example, and monitoring and observability.

The Limitations of Cron-Based Pipelines

This post examines the use of Apache Airflow as a workflow orchestration platform for data pipelines and contrasts it with the limitations of cron-based scheduling. The discussion is intended for data engineers who are moving from ad hoc cron jobs to managed orchestration and who require an understanding of the operational considerations involved.

The recurrent failure mode for cron-based pipelines can be summarised as follows. A nightly ETL job fails because of a transient database error and produces a single line in a log such as psql: FATAL: connection refused. The job receives no retry, generates no alert, and emits no visible signal that anything is wrong. Downstream dashboards continue to render stale data for days. The problem is not the failure itself, which is an ordinary operational event, but the absence of orchestration around it.

Cron is well suited to one task: running a command at a specific moment. It has no opinion about whether the command succeeded, whether its upstream dependencies completed, whether it should retry, whether a downstream job now has stale inputs, or whether a human should be notified. For a single script running on a single host, cron is adequate. For a data platform that spans Postgres, S3, Snowflake, Kafka, and a dozen internal services, cron becomes a liability once complexity increases.

This is the problem that Apache Airflow was designed to solve. Airflow is a workflow orchestration platform that allows data pipelines to be defined as Python code, scheduled, monitored, retried, backfilled, and treated as first-class engineering artefacts. It is now the de facto standard for batch orchestration at organisations ranging from Airbnb (where it was developed) to Netflix, Stripe, and Robinhood, as well as many smaller teams that have transitioned away from bash and cron.

The remainder of this post examines what is required to operate Airflow in production. It develops real DAGs using the modern TaskFlow API, sets up sensors and branches, compares executors, and constructs a complete ETL pipeline that extracts from Postgres, transforms with pandas, and loads the result into S3 and Snowflake. By the end, the reader will understand not only how to write Airflow code but also how to design pipelines that are observable, idempotent, and safe to rerun when failures occur.

Key Takeaway: Cron executes commands. Airflow orchestrates workflows. The difference lies in retries, dependencies, backfills, visibility, and a complete web UI that indicates precisely what failed and why.

Why Orchestration Matters

Before turning to practice, it is useful to clarify why orchestration is a distinct discipline. A modern data pipeline is rarely a single script. It is a directed graph of dozens or hundreds of steps that must run in the correct order, survive partial failures, rerun cleanly after bugs are fixed, and emit telemetry that humans and monitoring systems can act upon. Cron treats each step as an isolated unit. Airflow treats the graph itself as the primary object.

Consider a typical nightly workload for a data team: ingest raw events from Kafka, land them in S3, validate schemas, run dbt models against Snowflake, compute marketing attribution, refresh ML features, push dashboards to Looker, and email a summary. The workflow comprises eight stages. Each has its own upstream dependencies, retry semantics, SLAs, and failure modes. Implementing this by hand with cron and shell scripts amounts to building a distributed system from scratch. Airflow provides that distributed system without bespoke implementation.
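When the graph is the primary object, execution order can be derived mechanically rather than encoded in cron timings. The following minimal sketch uses Python's standard-library graphlib, not Airflow, to model the nightly workload above. The stage names and the exact dependency edges are illustrative assumptions. For example, it assumes that attribution and ML features both depend only on the dbt stage.

```python
from graphlib import TopologicalSorter

# Hypothetical stage names and dependencies for the nightly workload described above.
# Each key maps a stage to the set of stages it depends on.
NIGHTLY = {
    "ingest_kafka": set(),
    "land_s3": {"ingest_kafka"},
    "validate_schemas": {"land_s3"},
    "run_dbt": {"validate_schemas"},
    "compute_attribution": {"run_dbt"},
    "refresh_ml_features": {"run_dbt"},
    "push_looker": {"compute_attribution"},
    "email_summary": {"push_looker", "refresh_ml_features"},
}


def execution_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group stages into waves; every stage in a wave has all its upstreams done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


for number, wave in enumerate(execution_waves(NIGHTLY), start=1):
    print(number, wave)
```

Each wave contains only stages whose upstreams have all completed. This is the property that Airflow's default trigger rule enforces before a task runs. The cycle check in prepare() is what the "acyclic" in DAG guarantees.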

Cron and Airflow Compared

| Capability | Cron | Airflow |
|---|---|---|
| Dependency management | None | Native DAGs |
| Automatic retries | DIY | Built-in per task |
| Failure alerts | Silent by default | Email, Slack, PagerDuty |
| Backfill historical runs | Manual scripting | One CLI command |
| Web UI for debugging | Log files only | Full graph + logs + Gantt |
| Parallelism | Single host | Celery, Kubernetes |
| Code as source of truth | crontab files | Python, Git, PRs |
| Secrets management | Env vars or worse | Connections, Secrets backends |


The “code as source of truth” row is the one that becomes most important as teams grow. When pipelines reside in Python and Git, they become reviewable, testable, and versioned. When they reside in a crontab -e buffer on a single person’s machine, they become a liability. Airflow transforms operational automation into a software engineering practice.

Core Concepts: DAGs, Tasks, Operators, and Related Terms

Airflow has a small vocabulary that repays careful study. An understanding of these eight terms allows most of the documentation to be readily understood.

  • DAG (Directed Acyclic Graph): the pipeline itself, namely a collection of tasks with directional dependencies and no cycles. Every DAG has a schedule, a start date, and a set of default arguments.
  • Task: a single unit of work within a DAG. Tasks are instances of operators.
  • Operator: a template for a particular kind of work. BashOperator runs a shell command, PythonOperator calls a Python function, SnowflakeOperator runs SQL, and so on.
  • Sensor: a special operator that waits for a condition to become true, such as a file arriving in S3, a partition appearing in Hive, or a row appearing in a database.
  • XCom (Cross-Communication): a lightweight mechanism for tasks to exchange small pieces of data, such as keys, filenames, and row counts. It is not intended for large payloads.
  • Hook: a reusable client for an external system (Postgres, S3, or Snowflake). Operators use hooks internally. Hooks can also be used directly inside Python callables.
  • Connection: stored credentials and endpoint metadata for an external system, managed in the Airflow UI or through a secrets backend.
  • Variable: a globally accessible key-value pair for non-secret configuration, such as feature flags or environment identifiers.
Tip: Connections should be used for anything that involves a password. Variables should be used for configuration. XCom should be used for small return values. Bulk data should never be stored in XCom; it should be written to S3 or a database, and only the URI should be passed.
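Connections and Variables can also be supplied through environment variables, which Airflow checks before the metadata database. A connection is read as a URI from AIRFLOW_CONN_{CONN_ID}, and a Variable is read from AIRFLOW_VAR_{KEY}, both uppercased. The sketch below builds such values with the standard library, assuming a Postgres connection. The connection ID, host, and credentials are hypothetical. Characters such as @, :, and / in a password must be percent-encoded, or the URI will be parsed incorrectly.

```python
from urllib.parse import quote


def conn_env_var(conn_id: str) -> str:
    """Name of the environment variable checked for a connection."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def var_env_var(key: str) -> str:
    """Name of the environment variable checked for a Variable."""
    return f"AIRFLOW_VAR_{key.upper()}"


def postgres_conn_uri(login: str, password: str, host: str, port: int, schema: str) -> str:
    """Build a connection URI; special characters in each part are percent-encoded."""
    return (
        f"postgres://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )


# Hypothetical values for illustration only; real secrets belong in a secrets backend.
env = {
    conn_env_var("orders_pg"): postgres_conn_uri(
        "etl", "p@ss:w/rd", "orders-db.internal", 5432, "orders"
    ),
    var_env_var("environment"): "prod",
}
for name, value in env.items():
    print(f"{name}={value}")
```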

Airflow Architecture at a Glance

Before writing code, it is helpful to consider how Airflow’s components interact. The scheduler parses the DAG files, determines what should run, and queues work. The executor picks up queued tasks and dispatches them to workers. The metadata database is the single source of truth for state. The web server renders the UI and API on top of the metadata database.

Figure: Apache Airflow architecture. DAG folder (Python files, Git-synced); scheduler (parses DAGs, queues tasks); web server (UI and REST API, Flask + Gunicorn); metadata DB (Postgres or MySQL; state and history); executor (Local, Celery, or Kubernetes); workers 1 to N (run tasks).

The metadata database sits at the centre of the architecture. Every component both reads from and writes to it. Selecting a production-grade database (Postgres is the standard choice) and maintaining backups is therefore not optional. If the metadata database becomes unavailable, Airflow becomes unavailable.
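Every component depends on the metadata database, so its connection string is the first setting to get right. In Airflow 2.3 and later it is configured as sql_alchemy_conn in the [database] section. Any airflow.cfg option can be overridden with an environment variable named AIRFLOW__{SECTION}__{KEY}. The sketch below assumes Postgres with the psycopg2 driver. The host, database name, and credentials are hypothetical.

```python
from urllib.parse import quote


def airflow_config_env(section: str, key: str) -> str:
    """Environment variable that overrides airflow.cfg option [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def metadata_db_url(user: str, password: str, host: str, port: int, db: str) -> str:
    """SQLAlchemy URL for a Postgres metadata database using the psycopg2 driver."""
    return (
        f"postgresql+psycopg2://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{db}"
    )


# Hypothetical host and credentials; in practice, inject them from a secret store.
name = airflow_config_env("database", "sql_alchemy_conn")
value = metadata_db_url("airflow", "s3cret", "airflow-db.internal", 5432, "airflow")
print(f"{name}={value}")
```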

Writing a First DAG

The following example uses the modern TaskFlow API, which was introduced in Airflow 2.0 and substantially reduces boilerplate. The earlier PythonOperator-heavy style still works, but TaskFlow allows tasks to be treated as decorated Python functions and passes XCom values automatically.

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="hello_taskflow",
    description="A minimal TaskFlow DAG that greets the world.",
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "owner": "data-eng",
        "retries": 3,
        "retry_delay": pendulum.duration(minutes=5),
    },
    tags=["tutorial", "taskflow"],
)
def hello_taskflow():

    @task
    def extract() -> dict:
        return {"greeting": "hello", "subject": "world"}

    @task
    def transform(payload: dict) -> str:
        return f"{payload['greeting'].upper()}, {payload['subject'].title()}!"

    @task
    def load(message: str) -> None:
        print(f"Final message: {message}")

    payload = extract()
    message = transform(payload)
    load(message)


hello_taskflow()

Once the file is placed in the dags/ folder, the scheduler picks it up on its next scan for new files. By default, this scan runs every five minutes, as controlled by dag_dir_list_interval. The UI then displays three tasks wired in sequence. Several actions were not required: set_upstream was not called, XCom keys were not declared, and no PythonOperator(python_callable=...) line was written. TaskFlow inferred dependencies from the function call graph and serialised return values through XCom automatically.

Tip: catchup=False should always be set, unless Airflow is genuinely required to run every missed schedule interval from start_date to the present. Omitting this setting will cause the DAG to launch a large number of historical runs the moment it is deployed.
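To see why this matters, consider how many intervals have already closed when a daily DAG is first deployed. The sketch below is a plain-Python model, not Airflow code. It assumes a @daily schedule aligned to midnight UTC and a hypothetical deployment time, and it counts the runs that catchup=True would need to create.

```python
from datetime import datetime, timedelta, timezone


def missed_daily_runs(start_date: datetime, now: datetime) -> int:
    """Count complete daily data intervals between start_date and now.

    Simplified model: assumes a @daily schedule aligned to midnight UTC and a
    start_date at midnight, so each interval [d, d + 1 day) becomes due at its end.
    """
    if now <= start_date:
        return 0
    return (now - start_date) // timedelta(days=1)


start = datetime(2026, 1, 1, tzinfo=timezone.utc)
deployed = datetime(2026, 5, 27, 9, 30, tzinfo=timezone.utc)  # hypothetical deploy time
print(missed_daily_runs(start, deployed))
```

For a start_date of 2026-01-01 and a deployment on 27 May 2026, the count is 146 runs. With catchup=False, the scheduler instead creates a run only for the most recent interval.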

Operators in Common Use

Airflow includes hundreds of operators across dozens of provider packages. In practice, most pipelines are built from a small and stable subset. The operators in regular daily use are described below.

BashOperator

This is a reliable, widely used operator that runs a shell command. It is useful for invoking CLI tools, running dbt run, or executing external programs when Python bindings are not available.

from airflow.operators.bash import BashOperator

run_dbt = BashOperator(
    task_id="run_dbt_models",
    bash_command="cd /opt/dbt/project && dbt run --select tag:daily --profiles-dir .",
    env={"DBT_TARGET": "prod"},
    append_env=True,  # without this, env replaces the worker's environment, including PATH
)

PythonOperator and @task

When a shell command is insufficient, Python should be used. With TaskFlow this is simply @task. With the legacy API the syntax is as follows:

from airflow.operators.python import PythonOperator

def compute_attribution(**context):
    ds = context["ds"]  # logical date as YYYY-MM-DD
    print(f"Computing attribution for {ds}")

compute = PythonOperator(
    task_id="compute_attribution",
    python_callable=compute_attribution,
)

KubernetesPodOperator

For heavy, resource-isolated work, a fresh pod can be created for each task. This is the cleanest method for running untrusted code, GPU workloads, or binaries that conflict with Airflow’s Python environment.

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

train_model = KubernetesPodOperator(
    task_id="train_churn_model",
    name="churn-trainer",
    namespace="ml-jobs",
    image="registry.example.com/ml/churn-trainer:2.4.1",
    cmds=["python", "train.py"],
    arguments=["--date", "{{ ds }}"],
    container_resources=k8s.V1ResourceRequirements(
        requests={"cpu": "2", "memory": "8Gi"},
        limits={"cpu": "4", "memory": "16Gi", "nvidia.com/gpu": "1"},
    ),
    get_logs=True,
    on_finish_action="delete_pod",  # replaces the deprecated is_delete_operator_pod
)

DockerOperator

The DockerOperator follows a similar principle without Kubernetes. If the workers can reach a Docker daemon, each task can run inside a container. Container fundamentals are covered in detail in the Docker containers explained guide and the production-oriented dev-to-production Docker guide.

from airflow.providers.docker.operators.docker import DockerOperator

score_model = DockerOperator(
    task_id="score_leads",
    image="registry.example.com/ml/lead-scorer:1.0.0",
    command="python score.py --date {{ ds }}",
    network_mode="bridge",
    auto_remove="success",  # newer Docker providers take "never", "success", or "force"
    mount_tmp_dir=False,
)

SnowflakeOperator

This operator is used for data warehouse work. It reads credentials from an Airflow Connection, executes SQL against Snowflake, and records the statements it runs in the task logs.

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

refresh_revenue_mart = SnowflakeOperator(
    task_id="refresh_revenue_mart",
    snowflake_conn_id="snowflake_prod",
    sql="""
        MERGE INTO analytics.revenue_daily t
        USING staging.revenue_daily s
        ON t.date_key = s.date_key
        WHEN MATCHED THEN UPDATE SET t.revenue = s.revenue
        WHEN NOT MATCHED THEN INSERT (date_key, revenue) VALUES (s.date_key, s.revenue);
    """,
)

S3Hook

Hooks are the programmatic counterpart to operators. They are used inside Python callables when fine-grained control is required. For broader context on choosing between object stores, columnar warehouses, and time-series engines, see the databases comparison guide.

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def upload_parquet(local_path: str, key: str) -> str:
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename=local_path,
        key=key,
        bucket_name="acme-data-lake",
        replace=True,
    )
    return f"s3://acme-data-lake/{key}"

Sensors and Trigger Rules

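Sensors are the mechanism through which Airflow waits for external conditions. A sensor is an operator with a poke() method that returns True or False. Airflow calls poke() every poke_interval seconds until it returns True or the timeout expires. In the default poke mode, the sensor holds a worker slot for the entire wait. With mode="reschedule", the slot is released between checks. Airflow 2.2 and later also support deferrable sensors, which hand the wait to the triggerer process and release the worker slot entirely. This is particularly important at scale.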
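The poke loop is easiest to understand as a small model. The sketch below is plain Python, not Airflow's BaseSensorOperator. It calls a condition every poke_interval seconds and raises TimeoutError once timeout seconds have elapsed. A hypothetical fake clock stands in for real time so that the example finishes instantly. In poke mode, a loop like this occupies a worker slot for its whole duration.

```python
import time
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call condition() every poke_interval seconds until it returns True.

    Returns the number of pokes made; raises TimeoutError once timeout seconds
    have elapsed without success. Models poke-mode behaviour only.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise TimeoutError(f"condition still false after {pokes} pokes")
        sleep(poke_interval)


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
arrivals = iter([False, False, True])  # e.g. the manifest appears on the third check
print(wait_for(lambda: next(arrivals), 60, 3600, clock.time, clock.sleep), clock.now)
```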

S3KeySensor

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_export = S3KeySensor(
    task_id="wait_for_crm_export",
    bucket_key="s3://acme-data-lake/crm/export/{{ ds }}/manifest.json",
    aws_conn_id="aws_default",
    poke_interval=60,
    timeout=60 * 60 * 6,  # 6 hours
    mode="reschedule",    # free the slot between pokes
)

FileSensor

from airflow.sensors.filesystem import FileSensor

wait_for_trigger = FileSensor(
    task_id="wait_for_trigger_file",
    filepath="/mnt/shared/triggers/{{ ds }}.ready",
    poke_interval=30,
    timeout=60 * 30,
)

ExternalTaskSensor

The ExternalTaskSensor expresses cross-DAG dependencies. It should be used sparingly, because it couples DAGs tightly, but it is valuable when one pipeline genuinely must not run until another has completed.

from airflow.sensors.external_task import ExternalTaskSensor

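# Waits for the raw_ingest run with the same logical date as this run;
# set execution_delta (or execution_date_fn) if the two DAGs' schedules differ.
wait_for_raw = ExternalTaskSensor(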
    task_id="wait_for_raw_ingest",
    external_dag_id="raw_ingest",
    external_task_id="load_done",
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    poke_interval=120,
    timeout=60 * 60 * 3,
    mode="reschedule",
)

Trigger Rules

Every task has a trigger rule that determines whether it runs given the state of its upstream tasks. The default is all_success, but several useful alternatives are available.

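| Trigger rule | Runs when |
|---|---|
| all_success | All upstream tasks succeeded (the default) |
| all_failed | All upstream tasks failed or are upstream_failed (useful for fallback or alerting paths) |
| all_done | All upstream tasks finished, regardless of state (useful for cleanup) |
| one_success | At least one upstream task succeeded |
| none_failed | No upstream task failed; each succeeded or was skipped |
| none_failed_min_one_success | No upstream task failed and at least one succeeded; the typical rule for a join after a branch |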
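The rules in the table can be expressed as a small function over the final states of the upstream tasks. The following is a simplified plain-Python model, not Airflow's dependency-checking code. It ignores upstream tasks that are still running and does not model how Airflow propagates skips.

```python
FAILED = {"failed", "upstream_failed"}


def should_run(rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a task runs, given its upstream tasks' final states."""
    if rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if rule == "all_failed":
        return all(s in FAILED for s in upstream_states)
    if rule == "all_done":
        return True  # every upstream has reached a final state by assumption
    if rule == "one_success":
        return any(s == "success" for s in upstream_states)
    if rule == "none_failed":
        return not any(s in FAILED for s in upstream_states)
    if rule == "none_failed_min_one_success":
        return not any(s in FAILED for s in upstream_states) and "success" in upstream_states
    raise ValueError(f"unknown trigger rule: {rule}")


# After a branch, one path succeeds and the other is skipped.
after_branch = ["success", "skipped"]
for rule in ("all_success", "none_failed", "none_failed_min_one_success"):
    print(rule, should_run(rule, after_branch))
```

Suppose both upstream tasks are skipped, as happens after a short-circuit. In that case none_failed still returns True, while none_failed_min_one_success returns False. This is why the latter is the safer rule for a join that should run only when at least one branch did work.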


Scheduling, Data Intervals, and Backfills

Scheduling is the area in which Airflow beginners encounter the most difficulty. The conceptual model differs from that of cron. Airflow schedules intervals rather than instants. A DAG with schedule="@daily" and a start_date of 2026-01-01 produces its first run at the end of 2026-01-01, which covers the data interval [2026-01-01 00:00, 2026-01-02 00:00). The run’s logical_date is 2026-01-01, but wall-clock execution occurs on 2026-01-02.

This distinction matters because every template variable, including {{ ds }}, {{ data_interval_start }}, and {{ data_interval_end }}, refers to the interval that the run represents, not to the moment at which the run executes. Pipelines should be built to process the interval rather than “today”, which makes backfills straightforward.
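The mapping from a logical date to its data interval can be written out explicitly. The sketch below is a plain-Python model of Airflow 2's @daily behaviour in UTC, not Airflow's timetable code. The partition_key path is a hypothetical example of an output location keyed on ds.

```python
from datetime import date, datetime, timedelta, timezone


def daily_run_for(logical: date) -> dict[str, object]:
    """Model of a @daily run in UTC: its interval, ds, and earliest start time."""
    start = datetime(logical.year, logical.month, logical.day, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    return {
        "ds": start.strftime("%Y-%m-%d"),  # the string {{ ds }} renders to
        "data_interval_start": start,
        "data_interval_end": end,
        "runs_after": end,  # the run is scheduled once its interval has closed
        "partition_key": f"events/dt={start:%Y-%m-%d}/",  # hypothetical output path
    }


run = daily_run_for(date(2026, 1, 1))
print(run["ds"], run["data_interval_start"].isoformat(), run["data_interval_end"].isoformat())
```

A backfill replays this mapping for each logical date in its range. A task that writes to partition_key therefore overwrites the same location on every rerun rather than adding to it.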

Schedule Options

# Cron expression
schedule="0 2 * * *"          # 2 a.m. UTC daily

# Presets
schedule="@hourly"
schedule="@daily"
schedule="@weekly"

# timedelta (relative)
from datetime import timedelta
schedule=timedelta(hours=6)

# Dataset-driven (event-based)
from airflow.datasets import Dataset
raw_events = Dataset("s3://acme-data-lake/raw/events/")
schedule=[raw_events]

# No schedule (manual/triggered only)
schedule=None

Backfill

If January must be reprocessed because of a discovered bug, a single command will suffice:

airflow dags backfill \
  --start-date 2026-01-01 \
  --end-date 2026-01-31 \
  --reset-dagruns \
  daily_revenue_pipeline
Caution: Backfills function correctly only when tasks are idempotent. A task that appends rows will duplicate data on a rerun, whereas a task that uses MERGE or writes to a date-partitioned key will not. This subject is treated in more detail in the best practices section.

Dependencies, Branching, and Short-Circuiting

Real pipelines are not linear. Different downstream paths may be required depending on the day of the week, a branch may need to be skipped entirely if no new data exists, or parallel tasks may need to fan out and then fan in.

BranchPythonOperator

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_path(**context):
    logical_date = context["logical_date"]
    if logical_date.weekday() == 0:  # Monday
        return "run_weekly_rollup"
    return "skip_weekly"

branch = BranchPythonOperator(
    task_id="branch_on_weekday",
    python_callable=choose_path,
)

weekly = EmptyOperator(task_id="run_weekly_rollup")
skip   = EmptyOperator(task_id="skip_weekly")
join   = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")

branch >> [weekly, skip] >> join

ShortCircuitOperator

If a condition is false, all downstream tasks are skipped. This pattern is well suited to “no new data, no work” scenarios.

from airflow.operators.python import ShortCircuitOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def has_new_rows(**context):
    hook = PostgresHook(postgres_conn_id="warehouse")
    count = hook.get_first(
        "SELECT COUNT(*) FROM raw.events WHERE event_date = %s",
        parameters=(context["ds"],),
    )[0]
    return count > 0

gate = ShortCircuitOperator(
    task_id="only_if_new_data",
    python_callable=has_new_rows,
)

Visualising a DAG

A representative ETL DAG is shown below, with a fan-out at ingest, a branch for weekly-only work, and a fan-in for publishing.

Figure: Sample ETL DAG (tasks: wait_for_export, extract_pg, extract_s3, extract_kafka, transform, branch, load_snowflake, load_s3, weekly_rollup, publish_dashboard).

XCom: Passing Data Between Tasks

XCom is Airflow’s built-in mechanism by which tasks can exchange small messages. Internally it is a row in the metadata database that contains a serialised value. This detail is important: XCom is not a data pipe but a message bus. Anything beyond a few kilobytes should be written to S3 or a database, and only the pointer should pass through XCom.

@task
def stage_batch(**context) -> dict:
    # ... write a CSV to S3 ...
    return {
        "s3_key": f"staging/{context['ds']}/batch.csv",
        "row_count": 128_432,
        "checksum": "a3f9...",
    }

@task
def load_batch(manifest: dict):
    print(f"Loading {manifest['row_count']} rows from {manifest['s3_key']}")

manifest = stage_batch()
load_batch(manifest)

For large intermediate artefacts, consider a custom XCom backend that transparently stores values in S3 or GCS and returns only a URI. This approach keeps the metadata database small, while tasks continue to exchange values through ordinary return values.
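The idea behind such a backend is the claim-check pattern: small values travel inline, and large values are written to storage so that only a pointer travels. The sketch below shows the pattern in plain Python. A local directory stands in for S3, and the 48 KB threshold is an assumption. It is not Airflow's BaseXCom interface.

```python
import json
import tempfile
import uuid
from pathlib import Path

MAX_INLINE_BYTES = 48 * 1024  # assumed threshold; tune for the metadata database


def to_xcom(value, store_dir: Path) -> dict:
    """Return the value inline if small, otherwise persist it and return a pointer.

    store_dir stands in for an S3 or GCS prefix.
    """
    payload = json.dumps(value).encode("utf-8")
    if len(payload) <= MAX_INLINE_BYTES:
        return {"inline": value}
    path = store_dir / f"{uuid.uuid4().hex}.json"
    path.write_bytes(payload)
    return {"uri": path.resolve().as_uri()}


def from_xcom(message: dict, store_dir: Path):
    """Resolve a message produced by to_xcom back into the original value."""
    if "inline" in message:
        return message["inline"]
    name = message["uri"].rsplit("/", 1)[-1]
    return json.loads((store_dir / name).read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp)
    small = to_xcom({"s3_key": "staging/2026-01-01/batch.csv", "row_count": 128432}, store)
    big = to_xcom({"rows": list(range(100_000))}, store)
    print(sorted(small), sorted(big))
    print(from_xcom(big, store)["rows"][-1])
```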

Deployment Architectures and Executors

The executor determines how tasks are physically run. The wrong choice results in continual operational friction; the correct choice makes scaling routine.

| Executor | Good for | Avoid when |
|---|---|---|
| SequentialExecutor | Local dev, SQLite backend | Anything production |
| LocalExecutor | Small teams, single VM, <50 concurrent tasks | You need horizontal scale |
| CeleryExecutor | Medium/large deployments with stable workers | Spiky workloads, heterogeneous resources |
| KubernetesExecutor | Cloud-native orgs, isolated tasks, autoscaling | You have no k8s expertise |
| CeleryKubernetesExecutor | Mixed workloads: steady Celery + burst k8s | Ops budget is limited |


For most new installations in 2026, KubernetesExecutor on managed Kubernetes (EKS, GKE, or AKS) is the pragmatic default. Each task receives a fresh pod with its own resources, failure isolation is automatic, and autoscaling is supplied by the cluster itself. The drawback is pod startup overhead, typically 5 to 20 seconds, which is immaterial for multi-minute tasks but problematic for thousands of sub-second tasks.

Best Practices for Production

Airflow offers considerable flexibility, which permits both excellent and poor implementations. The practices below distinguish teams that maintain Airflow deployments over many years from teams that must rebuild their deployments every 18 months.

Make Every Task Idempotent

Running a task twice for the same logical date must produce the same result. This requirement implies the use of MERGE rather than INSERT, the writing of output to partitioned paths keyed on {{ ds }}, and the use of delete-then-insert within a transaction. Idempotency is the single most important property of a production pipeline, because it is what makes retries and backfills safe. The broader principle, namely writing code that others (including the author at a later date) can reason about, is discussed in the clean code principles guide.
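The difference between appending and delete-then-insert can be demonstrated with the standard library's sqlite3 module, which stands in for the warehouse. The table, order IDs, and values below are hypothetical. Three attempts at the same logical date leave six rows with the append loader but only two with the idempotent one.

```python
import sqlite3


def load_append(conn: sqlite3.Connection, ds: str, rows: list[tuple[str, float]]) -> None:
    """Not idempotent: every rerun adds another copy of the day's rows."""
    with conn:  # commits on success, rolls back on exception
        conn.executemany(
            "INSERT INTO fact_revenue (order_date, order_id, revenue) VALUES (?, ?, ?)",
            [(ds, order_id, revenue) for order_id, revenue in rows],
        )


def load_idempotent(conn: sqlite3.Connection, ds: str, rows: list[tuple[str, float]]) -> None:
    """Idempotent: delete the day's partition, then insert, in one transaction."""
    with conn:
        conn.execute("DELETE FROM fact_revenue WHERE order_date = ?", (ds,))
        conn.executemany(
            "INSERT INTO fact_revenue (order_date, order_id, revenue) VALUES (?, ?, ?)",
            [(ds, order_id, revenue) for order_id, revenue in rows],
        )


def row_count(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM fact_revenue").fetchone()[0]


rows = [("o-1", 10.0), ("o-2", 25.5)]  # hypothetical orders for one logical date
for loader in (load_append, load_idempotent):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE fact_revenue (order_date TEXT, order_id TEXT, revenue REAL)")
    for _attempt in range(3):  # e.g. the original run plus two retries
        loader(conn, "2026-01-01", rows)
    print(loader.__name__, row_count(conn))
    conn.close()
```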

Keep Tasks Small and Atomic

A task that performs a single action is one that can be retried, debugged, and reasoned about. A task that performs six actions is one that may fail partway through and require investigation to determine which steps completed.

Use Pools and SLAs

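Pools cap the number of concurrent tasks that hit a shared resource (for example, five slots for an overloaded production Postgres instance). Tasks beyond the cap wait in the scheduled state until a slot becomes free. SLAs allow Airflow to record an SLA miss and send a notification when a task has not succeeded within a set time after its DAG run's scheduled time. SLAs are an Airflow 2.x feature; they were removed in Airflow 3.0.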

import pendulum
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

extract = SnowflakeOperator(
    task_id="extract_large_mart",
    snowflake_conn_id="snowflake_prod",
    sql="...",
    pool="snowflake_heavy",  # defined in UI: 3 slots
    sla=pendulum.duration(minutes=30),
)
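The effect of a pool can be modelled with a bounded semaphore. The sketch below is an analogy in plain Python, not Airflow code. Ten threads compete for three slots, mirroring the hypothetical snowflake_heavy pool above. One difference matters: Airflow does not block a worker while a task waits for a pool slot. The task simply remains in the scheduled state until a slot becomes free.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SLOTS = 3  # mirrors the hypothetical "snowflake_heavy" pool above


class PoolModel:
    """Analogy of an Airflow pool: at most `slots` tasks hold a slot at once."""

    def __init__(self, slots: int) -> None:
        self._slots = threading.BoundedSemaphore(slots)
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def run(self, work_seconds: float) -> None:
        with self._slots:  # blocks until a slot is free
            with self._lock:
                self.active += 1
                self.peak = max(self.peak, self.active)
            time.sleep(work_seconds)  # stand-in for a heavy query
            with self._lock:
                self.active -= 1


pool = PoolModel(POOL_SLOTS)
with ThreadPoolExecutor(max_workers=10) as workers:  # ten tasks competing for slots
    for _ in range(10):
        workers.submit(pool.run, 0.05)
print("peak concurrent tasks:", pool.peak)
```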

Configure Alerts Early

The on_failure_callback and on_retry_callback hooks should be used to post to Slack, open PagerDuty incidents, or file Jira tickets. A silent failure is strictly worse than a visible one.

import pendulum
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def notify_slack(context):
    ti = context["task_instance"]
    message = (
        f":rotating_light: *{ti.dag_id}.{ti.task_id}* failed "
        f"on {context['ds']} (try {ti.try_number})"
    )
    SlackWebhookHook(slack_webhook_conn_id="slack_alerts").send(text=message)

default_args = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": pendulum.duration(minutes=5),
    "on_failure_callback": notify_slack,
}

Treat DAGs as Software

DAGs should go through pull requests and code review, Python callables should have unit tests, and whole DAGs should be exercised with airflow dags test before deployment. For readers who are not familiar with modern Git workflows, the Git and GitHub best practices article provides relevant guidance.
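A callable receives its context as keyword arguments, so it can be unit-tested without a running Airflow instance by passing a hand-built context. The sketch below repeats the logic of the choose_path branching callable shown earlier so that the file is self-contained. In a real project, the callable would be imported from the DAG module. The test names are hypothetical, and the tests are written in pytest style.

```python
from datetime import datetime, timezone


def choose_path(**context):
    """Same logic as the branching callable shown earlier, repeated here."""
    logical_date = context["logical_date"]
    if logical_date.weekday() == 0:  # Monday
        return "run_weekly_rollup"
    return "skip_weekly"


def test_monday_runs_weekly_rollup():
    # 2026-01-05 is a Monday; only the key the callable reads needs to be supplied.
    context = {"logical_date": datetime(2026, 1, 5, tzinfo=timezone.utc)}
    assert choose_path(**context) == "run_weekly_rollup"


def test_other_days_skip_rollup():
    # 2026-01-06 is a Tuesday.
    context = {"logical_date": datetime(2026, 1, 6, tzinfo=timezone.utc)}
    assert choose_path(**context) == "skip_weekly"
```

Running pytest on this file executes both tests in milliseconds. The airflow dags test command then covers the integration path.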

Common Pitfalls to Avoid

The following errors occur repeatedly in practice. An awareness of them helps to prevent many incidents.

Caution, top-level code: Any code at the top level of a DAG file runs every time the scheduler parses the file, which can be every 30 seconds. A requests.get(...) call at module scope will repeatedly call the API and slow the scheduler significantly. Top-level code should be kept minimal, comprising only DAG definitions, imports, and inexpensive literals.
Caution, context dependency: Writing tasks that assume “now” rather than {{ data_interval_start }} makes backfills meaningless. The interval variables should always be used.
Caution, variable overuse: Variable.get() queries the metadata database. Calling it at the top level of a DAG file adds a database query to every parse of that file, which can overload the database at scale. Variable.get(..., default_var=...) should instead be called inside callables, or Jinja templating ({{ var.value.my_key }}) should be used, because templates are resolved only when the task runs.
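The cost of top-level code can be demonstrated without Airflow. The sketch below uses runpy to execute a small file repeatedly, as a stand-in for the scheduler re-parsing a DAG file. It models the effect, not the DAG processor itself, and the file contents are hypothetical. Module-scope code runs on every parse, while code inside a task function never runs during parsing.

```python
import runpy
import tempfile
import textwrap
from pathlib import Path

# A hypothetical DAG-like file: one "expensive" call at module scope, one inside a task.
DAG_FILE = textwrap.dedent(
    """
    CALLS.append("top-level")          # runs on every parse, like requests.get(...)

    def extract():
        CALLS.append("inside task")    # runs only when the task executes
    """
)

calls: list[str] = []
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "my_dag.py"
    path.write_text(DAG_FILE)
    for _parse in range(4):  # e.g. four parse cycles, about two minutes at 30 s each
        runpy.run_path(str(path), init_globals={"CALLS": calls})

print(calls.count("top-level"), calls.count("inside task"))
```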

Other frequent errors include the following:

- not setting catchup=False
- hardcoding credentials rather than using Connections
- writing large XCom payloads
- running all tasks under one executor when a single slow task blocks the rest
- ignoring DAG parsing time, which the airflow dags report command shows for each file

A Complete Production ETL Example

The following example brings the discussion together with a realistic daily ETL that extracts orders from Postgres, transforms them with pandas, writes Parquet files to S3, and merges into Snowflake. This is the type of pipeline that might feed a revenue dashboard. If the workflow also requires streaming ingestion, the Kafka producer guide and the Kafka consumer guide show how Airflow batch jobs complement real-time pipelines.

from __future__ import annotations

import tempfile
from pathlib import Path

import pandas as pd
import pendulum
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.python import ShortCircuitOperator


DEFAULT_ARGS = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": pendulum.duration(minutes=5),
    "sla": pendulum.duration(hours=2),
}


@dag(
    dag_id="daily_revenue_pipeline",
    description="Extract orders from Postgres, transform, land in S3, merge into Snowflake.",
    schedule="0 2 * * *",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    max_active_runs=1,
    default_args=DEFAULT_ARGS,
    tags=["etl", "revenue", "daily"],
)
def daily_revenue_pipeline():

    wait_for_crm = S3KeySensor(
        task_id="wait_for_crm_export",
        bucket_key="s3://acme-data-lake/crm/export/{{ ds }}/manifest.json",
        aws_conn_id="aws_default",
        poke_interval=120,
        timeout=60 * 60 * 4,
        mode="reschedule",
    )

    def _has_orders(**context):
        hook = PostgresHook(postgres_conn_id="orders_pg")
        count = hook.get_first(
            "SELECT COUNT(*) FROM public.orders "
            "WHERE created_at::date = %s",
            parameters=(context["ds"],),
        )[0]
        print(f"Found {count} orders for {context['ds']}")
        return count > 0

    gate = ShortCircuitOperator(
        task_id="skip_if_no_orders",
        python_callable=_has_orders,
    )

    @task
    def extract_orders(**context) -> str:
        """Pull the day's orders into a local Parquet file. Return the path."""
        ds = context["ds"]
        hook = PostgresHook(postgres_conn_id="orders_pg")
        sql = """
            SELECT order_id, customer_id, sku, quantity,
                   unit_price, currency, created_at
            FROM public.orders
            WHERE created_at >= %(start)s::timestamptz
              AND created_at <  %(end)s::timestamptz
        """
        df = hook.get_pandas_df(
            sql,
            parameters={
                "start": f"{ds} 00:00:00+00",
                "end":   f"{ds} 24:00:00+00",
            },
        )
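        # Local paths work only when every task shares a filesystem (for example,
        # LocalExecutor or a shared volume). On Celery or Kubernetes workers, write
        # intermediates to S3 and pass the S3 key through XCom instead.
        tmp = Path(tempfile.mkdtemp()) / f"orders_{ds}.parquet"
        df.to_parquet(tmp, index=False)
        return str(tmp)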

    @task
    def transform(local_path: str, **context) -> str:
        """Compute revenue in USD and enrich with date dimensions."""
        df = pd.read_parquet(local_path)
        fx = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "KRW": 0.00072}
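        # Cast first: if unit_price is a Postgres NUMERIC, it arrives as Decimal,
        # and multiplying Decimal by float raises TypeError.
        df["revenue_usd"] = (
            df["quantity"]
            * df["unit_price"].astype(float)
            * df["currency"].map(fx).fillna(1.0)
        )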
        df["order_date"] = pd.to_datetime(df["created_at"]).dt.date
        df = df.drop(columns=["created_at"])

        out = Path(local_path).with_name(f"transformed_{context['ds']}.parquet")
        df.to_parquet(out, index=False)
        return str(out)

    @task
    def upload_to_s3(local_path: str, **context) -> str:
        ds = context["ds"]
        key = f"warehouse/revenue/dt={ds}/part-000.parquet"
        S3Hook(aws_conn_id="aws_default").load_file(
            filename=local_path,
            key=key,
            bucket_name="acme-data-lake",
            replace=True,
        )
        return f"s3://acme-data-lake/{key}"

    merge_snowflake = SnowflakeOperator(
        task_id="merge_into_revenue_fact",
        snowflake_conn_id="snowflake_prod",
        sql="""
            BEGIN;

            CREATE OR REPLACE TEMPORARY TABLE staging_revenue AS
            SELECT $1:order_id::STRING      AS order_id,
                   $1:customer_id::STRING   AS customer_id,
                   $1:sku::STRING           AS sku,
                   $1:quantity::NUMBER      AS quantity,
                   $1:revenue_usd::FLOAT    AS revenue_usd,
                   $1:order_date::DATE      AS order_date
            FROM @acme_lake/warehouse/revenue/dt={{ ds }}/
                 (FILE_FORMAT => 'parquet_fmt');

            DELETE FROM analytics.fact_revenue
            WHERE order_date = '{{ ds }}';

            INSERT INTO analytics.fact_revenue
            SELECT * FROM staging_revenue;

            COMMIT;
        """,
    )

    @task
    def publish_metrics(**context):
        hook = PostgresHook(postgres_conn_id="metadata_pg")
        hook.run(
            """
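            INSERT INTO ops.pipeline_runs (pipeline, run_date, status, finished_at)
            VALUES (%s, %s, 'success', now())
            -- Upsert so reruns stay idempotent; assumes a unique constraint on (pipeline, run_date).
            ON CONFLICT (pipeline, run_date)
            DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at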
            """,
            parameters=("daily_revenue_pipeline", context["ds"]),
        )

    raw  = extract_orders()
    xfm  = transform(raw)
    uri  = upload_to_s3(xfm)

    wait_for_crm >> gate >> raw
    uri >> merge_snowflake >> publish_metrics()


daily_revenue_pipeline()

Several design choices are worth noting:

- **Idempotent load path.** The Snowflake step deletes the day's rows and reinserts them within a single transaction, the S3 key is deterministic, and the Postgres extract is bounded by the logical date.
- **Short-circuit.** A short-circuit skips the remaining tasks when there is nothing to do.
- **Retries.** The retries absorb transient failures.
- **SLA.** The SLA flags runs that finish late.
- **No overlapping runs.** The max_active_runs=1 setting prevents overlapping runs.
- **Pointers, not data, through XCom.** Only paths and URIs are passed through XCom; the data itself is never passed.

For a more detailed treatment of moving time-series data through a full modern stack, see the InfluxDB to AWS Iceberg pipeline guide. If in-stream complex event processing suits the use case better than batch processing, the Flink CEP guide is a useful companion.

Monitoring and Observability

Airflow’s web UI provides substantial value out of the box. The Graph view displays the DAG, the Gantt chart shows how long each task ran, and Task Duration trends highlight regressions. Production deployments, however, require additional instrumentation.

Task Lifecycle States

An understanding of the task state machine is the foundation of debugging. The following diagram shows the transitions through which every task passes.

[Figure: Task Instance Lifecycle. States: scheduled, queued, running, success, failed, up_for_retry, up_for_reschedule, skipped. Transition labels: retry after delay, exception caught, sensor poke=False, branch not chosen.]
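The lifecycle can be approximated as a transition table, which is a handy mental model when reading a task instance's history in the UI. The sketch below is a simplified, hypothetical model written for illustration, not Airflow's internal state machine: it covers only the states in the diagram, and the exact edges (for example, which states can move to skipped) are assumptions.

```python
# Simplified, hypothetical model of the task-instance lifecycle in the diagram.
TRANSITIONS = {
    "scheduled": {"queued", "skipped"},  # skipped: branch not chosen
    "queued": {"running"},
    # exception caught -> up_for_retry while retries remain, otherwise failed;
    # a sensor whose poke returns False in reschedule mode -> up_for_reschedule
    "running": {"success", "failed", "up_for_retry", "up_for_reschedule", "skipped"},
    "up_for_retry": {"scheduled"},       # retry after delay
    "up_for_reschedule": {"scheduled"},  # poke again at the next interval
    "success": set(),
    "failed": set(),
    "skipped": set(),
}

# States with no outgoing edges are terminal.
TERMINAL = {state for state, nxt in TRANSITIONS.items() if not nxt}


def is_valid_history(states):
    """Return True if each consecutive pair of states is an allowed transition."""
    return all(b in TRANSITIONS.get(a, set()) for a, b in zip(states, states[1:]))


def is_finished(states):
    """Return True if the history ends in a terminal state."""
    return bool(states) and states[-1] in TERMINAL


retry_then_success = [
    "scheduled", "queued", "running", "up_for_retry",
    "scheduled", "queued", "running", "success",
]
print(is_valid_history(retry_then_success), is_finished(retry_then_success))  # True True
print(is_valid_history(["scheduled", "running"]))  # False: must be queued first
```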

Metrics and Logs

Airflow can emit StatsD metrics, including scheduler heartbeat, task duration, DAG parsing time, and pool usage, but only once statsd_on is enabled; it is off by default. These metrics should be scraped with Prometheus via a StatsD exporter, and Grafana dashboards should be built on top of them. For logs, a remote logging backend (S3, GCS, or Elasticsearch) should be configured so that worker pods can be removed without losing their task logs.

```ini
# airflow.cfg
[metrics]
statsd_on = True
statsd_host = statsd-exporter.monitoring.svc
statsd_port = 9125
statsd_prefix = airflow

[logging]
remote_logging = True
remote_base_log_folder = s3://acme-airflow-logs/
remote_log_conn_id = aws_default
```

Key Takeaway: The four principal signals for Airflow monitoring are scheduler heartbeat, DAG parsing time, task queue depth, and SLA misses. Alerts should be configured on all four. Everything else is detail.
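The four signals translate directly into alert rules. In a real deployment those rules would typically live in Prometheus and Alertmanager and read the exported StatsD metrics; the Python sketch below only shows the decision logic. The MetricsSnapshot fields and every threshold are hypothetical values chosen for illustration (the 30-second parse budget echoes the scheduler's roughly 30-second re-parse cycle) and should be tuned to the deployment.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MetricsSnapshot:
    """Hypothetical point-in-time readings of the four key signals."""
    seconds_since_scheduler_heartbeat: float
    dag_parse_seconds: float
    queued_tasks: int
    sla_misses_last_hour: int


@dataclass(frozen=True)
class Thresholds:
    """Illustrative limits only; tune them to your own deployment."""
    max_heartbeat_age: float = 60.0
    max_parse_seconds: float = 30.0
    max_queued_tasks: int = 100
    max_sla_misses: int = 0


def evaluate(snapshot, limits=Thresholds()):
    """Return the names of all signals that breach their limit."""
    alerts = []
    if snapshot.seconds_since_scheduler_heartbeat > limits.max_heartbeat_age:
        alerts.append("scheduler_heartbeat_stale")
    if snapshot.dag_parse_seconds > limits.max_parse_seconds:
        alerts.append("dag_parsing_slow")
    if snapshot.queued_tasks > limits.max_queued_tasks:
        alerts.append("task_queue_backlog")
    if snapshot.sla_misses_last_hour > limits.max_sla_misses:
        alerts.append("sla_missed")
    return alerts


healthy = MetricsSnapshot(5.0, 12.0, 8, 0)
degraded = MetricsSnapshot(240.0, 45.0, 350, 2)
print(evaluate(healthy))   # []
print(evaluate(degraded))  # ['scheduler_heartbeat_stale', 'dag_parsing_slow', 'task_queue_backlog', 'sla_missed']
```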

Frequently Asked Questions

Airflow versus cron: when is it overkill?

If a team has fewer than five scheduled scripts, all of them run on one host, none depend on each other, and silent failure is not a concern, cron is sufficient. As soon as dependencies, retries, alerts, backfills, or cross-team visibility are required, Airflow recovers its cost within a few weeks.

Airflow versus Prefect versus Dagster: which should be selected?

Airflow has the largest ecosystem, the most provider packages, and the most production-proven scaling history. Prefect is more Pythonic and offers an elegant local development experience. Dagster emphasises software-defined assets and data lineage, which is appealing for teams that think in terms of datasets rather than tasks. For most teams in 2026, Airflow remains the safest choice because hiring and community support are unmatched, although Dagster is a strong option for greenfield data platforms that wish to adopt asset-centric semantics from the outset.

How should long-running tasks be handled?

The first question is whether the task should run inside Airflow at all. If it is a 12-hour Spark job, Airflow should trigger it (via SparkSubmitOperator or an EMR or Databricks operator) and wait for completion with a deferrable operator or sensor, rather than doing the work on a worker. A deferred task hands its wait to the triggerer process, which runs it as an asynchronous trigger on an asyncio event loop, and releases its worker slot entirely. A small pool of triggerer processes can therefore supervise thousands of long-running external jobs simultaneously, while the workers stay free for real work.
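Deferral scales because waiting is done cooperatively on an event loop instead of by a blocked worker process. The sketch below is not Airflow's trigger API (real triggers subclass BaseTrigger and yield TriggerEvent objects from an async `run` method); it only illustrates the underlying asyncio mechanism. wait_for_external_job is a hypothetical stand-in for polling a Spark or Databricks job, and the poll counts and intervals are arbitrary.

```python
import asyncio
import time


async def wait_for_external_job(job_id, polls_until_done, poll_interval):
    """Hypothetical poller: checks a remote job every `poll_interval` seconds.

    Each `await asyncio.sleep(...)` yields control to the event loop, so no
    thread or process is blocked while the remote job is still running.
    """
    for _ in range(polls_until_done):
        await asyncio.sleep(poll_interval)  # stand-in for "is the job done yet?"
    return job_id


async def supervise(job_count, polls_until_done=5, poll_interval=0.01):
    """Wait on many jobs concurrently from a single thread."""
    waits = [
        wait_for_external_job(job_id, polls_until_done, poll_interval)
        for job_id in range(job_count)
    ]
    return await asyncio.gather(*waits)  # results come back in submission order


start = time.perf_counter()
finished = asyncio.run(supervise(2_000))
elapsed = time.perf_counter() - start
print(len(finished), f"jobs finished in {elapsed:.2f}s")
```

Waiting on the 2,000 jobs one after another with these settings would take about 100 seconds (2,000 × 5 × 0.01 s); interleaved on one event loop, the total is roughly the length of a single wait plus scheduling overhead.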

What is the recommended approach to deploying Airflow in production?

For most teams, managed Airflow (Astronomer, AWS MWAA, or Google Cloud Composer) is worth the cost, since it removes the operational burden of running the scheduler, metadata database, and executor infrastructure. For self-hosting, the recommended configuration is Kubernetes with the official Helm chart, the KubernetesExecutor, a managed Postgres (RDS or Cloud SQL) for the metadata database, log shipping to S3 or GCS, and Prometheus scraping for metrics. Every provider package version should be pinned, and upgrades should be treated as planned projects rather than incidental work.

How should secrets be handled in Airflow?

Credentials should never be hardcoded in DAG code or stored as plaintext Airflow Variables in the metadata database. Airflow Connections should be used, backed by a secrets manager such as AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager, or Azure Key Vault. The backend (and backend_kwargs) settings in the [secrets] section of airflow.cfg should be configured so that Airflow transparently fetches connections and variables at runtime; Airflow checks the configured secrets backend first, then environment variables, then the metadata database. Secrets then reside in a dedicated, audited system and never need to touch the metadata database.
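The lookup order is easiest to see in a small model. The sketch below mimics it in plain Python: the dictionaries standing in for a secrets manager and the metadata database, and the resolve_connection helper, are hypothetical and are not Airflow's implementation. Only the AIRFLOW_CONN_<CONN_ID> environment-variable naming convention is Airflow's own.

```python
import os
from typing import Callable, Iterable, Optional

# A lookup takes a conn_id and returns a connection URI, or None if unknown.
Lookup = Callable[[str], Optional[str]]


def env_var_lookup(conn_id: str) -> Optional[str]:
    """Mirror Airflow's AIRFLOW_CONN_<CONN_ID> convention (upper-cased id)."""
    return os.environ.get(f"AIRFLOW_CONN_{conn_id.upper()}")


def resolve_connection(conn_id: str, lookups: Iterable[Lookup]) -> str:
    """Return the URI from the first lookup that knows `conn_id`."""
    for lookup in lookups:
        uri = lookup(conn_id)
        if uri is not None:
            return uri
    raise KeyError(f"connection {conn_id!r} not found in any backend")


# Hypothetical stand-ins for an external secrets manager and the metadata DB.
secrets_manager = {"snowflake_prod": "snowflake://svc_user:***@acme/analytics"}
metadata_db = {
    "snowflake_prod": "snowflake://stale-copy",  # ignored: secrets manager is asked first
    "metadata_pg": "postgres://db/airflow",
}

# Same precedence as Airflow: secrets backend, environment variables, metadata DB.
search_order = [secrets_manager.get, env_var_lookup, metadata_db.get]
print(resolve_connection("snowflake_prod", search_order))
```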

Conclusion

Airflow is not a complete solution in itself. It will not fix a poor data model, it will not make SQL execute more rapidly, and it will not compensate for a team that does not practise code review. What it does is convert data pipelines from a fragile collection of scripts into a first-class software system with dependencies, retries, backfills, alerts, and an auditable history. The difference is comparable to that between writing a midnight log file into the void and operating a platform that can be relied upon.

The recommended approach is to start small. One brittle cron job should be migrated to Airflow during the first week. The TaskFlow API, the data interval mental model, and the Graph view should be mastered first. Slack alerts should be configured next, followed by retries and pools. The team can then move to KubernetesExecutor and deferrable sensors as the workload grows. The vocabulary used throughout, namely DAGs, tasks, operators, and sensors, is shared by thousands of data teams worldwide, which means the skills acquired transfer broadly. For complementary deep dives into the broader data ecosystem, see the guides on Python versus Rust (for choosing the right language for a pipeline's hottest paths) and the time-series databases comparison (for choosing an appropriate data sink).
