Blog

Testen und Debuggen von Apache Airflow

Aktualisiert Oktober 21, 2025
11 Minuten

Testen des Luftstroms ist schwierig

Es gibt einen guten Grund, diesen Blogbeitrag zu schreiben - das Testen von Airflow-Code kann schwierig sein. Das führt oft dazu, dass man einen ganzen Bereitstellungszyklus durchlaufen muss, um die Auslösetaste auf einem Live-System manuell zu drücken. Erst danach können sie ihren Airflow-Code verifizieren. Das ist ein mühsamer und langwieriger Prozess, und wie bei jeder anderen Software würden die Benutzer ihren Airflow-Code gerne lokal schreiben, testen und debuggen.

Die Ausführung einer Airflow-DAG auf Ihrem lokalen Rechner ist aufgrund von Abhängigkeiten von externen Systemen oft nicht möglich. Für den Anfang möchte ich Sie auf diesen hervorragenden Blogbeitrag von ING WBAA über das Testen von Airflow hinweisen. Er behandelt die Einrichtung von DTAP und CI/CD für Airflow. Außerdem wird in dem Blogbeitrag ein DAG-Integritätstest beschrieben, mit dem Sie überprüfen können, ob Ihre DAG-Dateien gültige DAG-Objekte enthalten, was ein guter Ausgangspunkt ist. Außerdem gibt es diesen Meetup-Vortrag über eine lokale Airflow-Testumgebung mit Docker Compose von meinem Kollegen Bas Beelen, die in naher Zukunft als Open Source zur Verfügung gestellt werden wird.

Das Hauptziel dieses Beitrags ist es zu erklären, wie Sie Airflow-Komponenten lokal testen können, ohne ein Produktionssystem zu benötigen. In Airflow gilt das für alles - Hooks, Utility-Funktionen, usw. Der DAG selbst ist lediglich eine Konfiguration, um verschiedene Operationen miteinander zu verbinden. Dieser Beitrag befasst sich nicht mit dem Testen kompletter DAGs (obwohl Sie dies mit den in diesem Blogbeitrag vorgestellten Tools tun könnten), sondern erklärt, wie Sie einzelne Operationen testen können.

Alle Tests wurden mit pytest und Airflow 1.10.2 geschrieben. Der gesamte Code in diesem Blogbeitrag ist auf GitHub verfügbar.

Pytest Airflow Halterungen & Helfer

Airflow-Aufgaben werden immer im Kontext einer DAG ausgeführt. Die Ausführung einer Aufgabe in einer DAG wird über eine Aufgabeninstanz gesteuert, die der Aufgabe den Kontext des aktuellen Laufs zur Verfügung stellt. Das Testen einer Aufgabe kann daher nicht von der Ausführung einer DAG entkoppelt werden. Um also Operatoren zu testen, verwende ich eine Dummy-DAG, die während meiner Tests verwendet wird.

Pytest verfügt über das Konzept der Fixtures, d.h. Objekte, die als Eingabeargumente an Testfunktionen übergeben werden können. Aus diesem Grund ziehe ich Pytest gegenüber Python Unittest vor. Diese Fixtures ermöglichen wiederverwendbaren Code und weniger Code-Duplizierung. Für Airflow habe ich ein test_dag fixture, mit dem ich Operatoren teste, die eine DAG zur Ausführung benötigen.

import datetime

import pytest
from airflow import DAG

@pytest.fixture
def test_dag():
    return DAG(
        “test_dag”,
        default_args={“owner”: “airflow”, “start_date”: datetime.datetime(2018, 1, 1)},
        schedule_interval=datetime.timedelta(days=1),
    )

Definieren Sie diese test_dag Halterung in tests/conftest.py, um sie in jedem Test zu verwenden.

Ein nettes Plugin für pytest ist der Pytest Helpers Namespace. Damit können Sie jede Funktion unter dem pytest Namespace registrieren, um sie überall in Ihren Tests zu verwenden. Installieren Sie es mit pip install pytest-helpers-namespace. Um Operatoren zu testen, muss ich eine Aufgabe mit einer DAG ausführen und daher eine run_task Hilfsfunktion definieren:

import pytest

pytest_plugins = ["helpers_namespace"]

@pytest.helpers.register
def run_task(task, dag):
    dag.clear()
    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
    )

Jetzt können die test_dag fixture und run_task Hilfsfunktion verwendet werden, um Aufgaben in einem Unit-Test auszuführen:

import pytest
from airflow.operators.bash_operator import BashOperator

def test_dummy(test_dag, tmpdir):
    tmpfile = tmpdir.join("hello.txt")

    task = BashOperator(task_id="test", bash_command=f"echo 'hello' > {tmpfile}", dag=test_dag)
    pytest.helpers.run_task(task=task, dag=test_dag)

    assert len(tmpdir.listdir()) == 1
    assert tmpfile.read().replace("n", "") == "hello"

Der Test test_dummy verwendet zwei pytest-Fixtures: test_dag wie oben beschrieben und tmpdir. Tmpdir ist eine der Fixtures, die Sie bei der Verwendung von pytest kostenlos erhalten. Es stellt ein temporäres Verzeichnis bereit, das Sie normalerweise mit der eingebauten Funktion tempfile erstellen würden. Geben Sie einfach tmpdir als Argument für eine Testfunktion ein und Sie können dieses tmpdir in Ihrem Test verwenden. Im obigen Test führe ich einen BashOperator aus, der eine Datei schreibt, und überprüfe den Inhalt der Datei, ohne eine DAG in eine Airflow-Instanz hochladen und manuell testen zu müssen.

Spottender Luftstrom

Manchmal müssen Sie in Ihren Tests Objekte vortäuschen. Zum Beispiel, wenn Sie von Ihrem Laptop aus nicht direkt auf den Airflow-Metaspeicher zugreifen und somit die Verbindungen nicht lesen können. In solchen Situationen können Sie diese Objekte in Ihren Tests nachbilden. Für das Mocking verwende ich pytest-mock, das eine mocker fixture installiert, die eine dünne Hülle um das Mock-Paket darstellt. Hier sehen Sie zum Beispiel einen Test, der den SimpleHttpOperator (Code) verwendet:

from datetime import datetime

import pytest
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
from airflow.operators.http_operator import SimpleHttpOperator

def test_simple_http_operator(test_dag, mocker):
    mocker.patch.object(
        BaseHook,
        "get_connection",
        return_value=Connection(schema="https", host="api.sunrise-sunset.org"),
    )

    def _check_light(sunset_sunrise_response):
        results = sunset_sunrise_response.json()["results"]
        sunrise = datetime.strptime(results["sunrise"][:-6], "%Y-%m-%dT%H:%M:%S")
        sunset = datetime.strptime(results["sunset"][:-6], "%Y-%m-%dT%H:%M:%S")

        if sunrise  datetime.utcnow()  sunset:
            print("It is light!")
        else:
            print("It is dark!")

        return True

    is_it_light = SimpleHttpOperator(
        task_id="is_it_light",
        http_conn_id="random_name",
        endpoint="json",
        method="GET",
        data={"lat": "52.370216", "lng": "4.895168", "formatted": "0"},
        response_check=_check_light,
        dag=test_dag,
    )

    pytest.helpers.run_task(task=is_it_light, dag=test_dag)

Diese (hypothetische) Aufgabe holt die Sonnenaufgangs- und Sonnenuntergangszeiten von api.sunrise-sunset.org und die Lambda-Funktion gibt aus, ob es gerade hell oder dunkel ist. Sie könnten die Anmeldeinformationen für eine solche API in Airflow-Verbindungen speichern, aber Sie können von Ihrem lokalen Rechner aus nicht auf den Metaspeicher zugreifen. Also patchen wir den BaseHook und spiegeln den Rückgabewert von get_connection vor, um für diesen Test immer ein Connection-Objekt mit host="api.sunrise-sunset.org" zurückzugeben.

Auf diese Weise können wir mit einem Airflow Connection Objekt arbeiten und den Operator testen.

Externe Systeme spiegeln

Mocking funktioniert für Objekte, aber was ist, wenn Sie die Implementierung Ihrer Komponente anhand eines echten externen Systems überprüfen möchten? Indem Sie Docker-Container mit dem System, das Sie testen möchten, aufsetzen, können Sie das korrekte Verhalten Ihrer Komponente überprüfen!

Es gibt natürlich mehrere Möglichkeiten, dies zu tun. Airflow selbst startet zum Beispiel eine Reihe von Containern mit Docker Compose zu Beginn seiner Testsuite.

Eine weitere Option, die mir gefällt, ist wiederum ein anderes pytest-Paket namens pytest_docker_tools. Es bietet eine Reihe von Hilfsprogrammen für pytest zur Verwaltung von Docker-Containern. Mir gefällt, dass es die Testabhängigkeiten innerhalb der Testskripte hält und Sie Docker-Container als Fixtures an Ihre Tests übergeben können.

Zur Demonstration habe ich eine PostgresToLocalOperator implementiert:

import json

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from psycopg2.extras import RealDictCursor

class PostgresToLocalOperator(BaseOperator):
    @apply_defaults
    def __init__(self, pg_query: str, local_path: str, postgres_conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self._pg_query = pg_query
        self._local_path = local_path
        self._postgres_conn_id = postgres_conn_id

    def execute(self, context):
        postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
        conn = postgres_hook.get_conn()
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        cursor.execute(self._pg_query)

        with open(self._local_path, "w") as f:
            json.dump(cursor.fetchall(), f, indent=4)

Die PostgresToLocalOperator fragt eine Postgres-Datenbank ab und speichert das JSON-formatierte Ergebnis auf der lokalen Festplatte. Jetzt möchte ich das korrekte Verhalten meines überprüfen, kann aber nicht auf die produktive Postgres-Datenbank zugreifen. Lassen Sie uns also einen Test schreiben und einen Postgres-Docker-Container für die Abfrage aufsetzen:

from collections import namedtuple
from pytest_docker_tools import container, fetch

@pytest.fixture(scope="module")
def postgres_credentials():
    PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])
    return PostgresCredentials("testuser", "testpass")

postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
    image="{postgres_image.id}",
    environment={
        "POSTGRES_USER": "{postgres_credentials.username}",
        "POSTGRES_PASSWORD": "{postgres_credentials.password}",
    },
    ports={"5432/tcp": None},
    volumes={
        path.join(path.dirname(__file__), "postgres-init.sql"): {
            "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
        }
    },
)

Hier sind einige Dinge im Gange.

Zunächst erstellen wir ein namedtuple fixture, das die Postgres-Anmeldedaten enthält. Der Weg zur Übergabe von Variablen an die pytest_docker_tools Objekte führt über Fixtures. Als zusätzlicher Bonus kann die postgres_credentials fixture nun als Argument an alle Tests übergeben werden.

@pytest.fixture(scope="module")
def postgres_credentials():
    PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])
    return PostgresCredentials("testuser", "testpass")

Als Nächstes benötigt pytest_docker_tools zwei Anweisungen für die Erstellung eines Docker-Container-Fixtures. Die fetch() für das Abrufen von Docker-Image-Metadaten und die container() für die Konstruktion des Docker-Containers. Sie folgt den gleichen Argumenten wie Docker selbst, z.B. volumes für das Einhängen von Volumes. Ich habe ein postgres-init.sql Skript erstellt, das in /docker-entrypoint-initdb.d/ in einem Postgres-Docker-Container platziert werden kann, um beim Booten[^1] ausgeführt zu werden, damit die Dummy-Postgres-DB Dummy-Daten enthält.

postgres-init.sql:

# postgres-init.sql
SET search_path TO public;
CREATE TABLE dummy (
    id integer,
    name character varying(255)
);
INSERT INTO dummy (id,name) VALUES (1, 'dummy1');
INSERT INTO dummy (id,name) VALUES (2, 'dummy2');
INSERT INTO dummy (id,name) VALUES (3, 'dummy3');

Nun können wir den Test schreiben, um den PostgresToLocalOperator zu testen, der von Postgres liest und in das lokale Dateisystem schreibt:

import json
from pathlib import Path

import pytest
from airflow.models import Connection

from my_package.operators.postgres_to_local_operator import PostgresToLocalOperator, PostgresHook

def test_postgres_to_local_operator(test_dag, mocker, tmpdir, postgres, postgres_credentials):
    output_path = str(tmpdir / "pg_dump")

    mocker.patch.object(
        PostgresHook,
        "get_connection",
        return_value=Connection(
            host="localhost",
            conn_type="postgres",
            login=postgres_credentials.username,
            password=postgres_credentials.password,
            port=postgres.ports["5432/tcp"][0],
        ),
    )

    task = PostgresToLocalOperator(
        task_id="test",
        postgres_conn_id="postgres",
        pg_query="SELECT * FROM dummy",
        local_path=output_path,
        dag=test_dag,
    )
    pytest.helpers.run_task(task=task, dag=test_dag)

    # Assert if output file exists
    output_file = Path(output_path)
    assert output_file.is_file()

    # Assert file contents, should be the same as in postgres-init.sql
    expected = [
        {"id": 1, "name": "dummy1"},
        {"id": 2, "name": "dummy2"},
        {"id": 3, "name": "dummy3"},
    ]
    with open(output_file, "r") as f:
        assert json.load(f) == expected

Der Test benötigt eine Reihe von Argumenten:

  1. test_dag - DAG-Halterung
  2. mocker - pytest mock fixture
  3. tmpdir - pytest tmpdir fixture
  4. postgres - Dummy Postgres Docker-Container-Einrichtung
  5. postgres_credentials - Postgres Anmeldeinformationen fixture

Zuerst definieren wir einen Ausgabepfad, in den die Ergebnisse geschrieben werden:

output_path = str(tmpdir / "pg_dump") 

Als nächstes patchen wir den PostgresHook so, dass er beim Aufruf von get_connection ein gespottetes Verbindungsobjekt zurückgibt, da wir lokal keinen Zugriff auf eine laufende Airflow-Instanz haben.

Wichtig! Einer der, wenn nicht sogar der am häufigsten gemachte Fehler beim Python-Mocking ist es, die falsche Stelle zu patchen. Das Ergebnis ist, dass der Patch keine Wirkung zu haben scheint. Um den PostgresHook zu patchen, dürfen Sie nicht from airflow.hooks.postgres_hook import PostgresHook importieren! Importieren Sie stattdessen den PostgresHook von dem Ort, an dem Sie den PostgresHook tatsächlich verwenden: from my_package.operators.postgres_to_local_operator import PostgresToLocalOperator, PostgresHook.

from my_package.operators.postgres_to_local_operator 
import PostgresToLocalOperator, PostgresHook

mocker.patch.object(
    PostgresHook,
    "get_connection",
    return_value=Connection(
        host="localhost",
        conn_type="postgres",
        login=postgres_credentials.username,
        password=postgres_credentials.password,
        port=postgres.ports["5432/tcp"][0],
    ),
)

Als nächstes führen wir unseren Operator aus, indem wir einfach SELECT * FROM dummy abfragen:

task = PostgresToLocalOperator(
    task_id="test",
    postgres_conn_id="postgres",
    pg_query="SELECT * FROM dummy",
    local_path=output_path,
    dag=test_dag,
)
pytest.helpers.run_task(task=task, dag=test_dag)

Der Operator hat nun seine Ausführung abgeschlossen, so dass wir eine Datei in output_path erwarten und wir erwarten, dass diese Datei alles enthält, was in der Dummy-Tabelle war:

output_file = Path(output_path)
assert output_file.is_file()

expected = [
    {"id": 1, "name": "dummy1"},
    {"id": 2, "name": "dummy2"},
    {"id": 3, "name": "dummy3"},
]
with open(output_file, "r") as f:
    assert json.load(f) == expected

Den vollständigen Code für den PostgresToLocalOperator finden Sie hier, und den vollständigen Code zum Testen des Operators finden Sie hier.

Fehlersuche im Luftstrom

Es gibt verschiedene Möglichkeiten, einen in Airflow laufenden Prozess zu debuggen. Wenn er lokal läuft, z.B. ein Unit-Test, können Sie in der IDE Ihrer Wahl einen Haltepunkt setzen. Das Remote-Debugging mit einer IDE habe ich in diesem Blog-Beitrag ausgeklammert und werde eine andere Methode erklären, die sowohl lokal als auch remote funktioniert.

PDB

Python verfügt über einen eingebauten Debugger namens pdb. Sie können ihn verwenden, indem Sie dieses Snippet an der Stelle platzieren, an der Sie das Debugging starten möchten:

import pdb
pdb.set_trace()

Wenn Sie Python 3.7 verwenden (derzeit nur von Airflow Master unterstützt), können Sie auch einfach breakpoint() irgendwo in Ihrem Code aufrufen. Es gibt auch ipdb mit mehr Funktionen wie farblicher Hervorhebung und Autovervollständigung, es ist jedoch kein Built-in, so dass Sie es mit pip install ipdb installieren müssen. Sobald Sie sich in einer Debug-Sitzung befinden, können Sie das Debugging mit diesen Shortcuts steuern(Quelle):

PDB-Spickzettel

Wenn Sie einen Haltepunkt setzen möchten, aber nicht wissen, wo Sie den Code finden, öffnen Sie einfach ein Python-Terminal:

$ python

>>> import airflow.operators.bash_operator    # Import the module
>>> airflow.operators.bash_operator.__file__  # module.__file__
'./incubator-airflow/airflow/operators/bash_operator.py'

Wenn Sie schließlich einen Airflow-Auftrag "live" debuggen möchten, können Sie eine Aufgabe manuell mit airflow test [dag_id] [task_id] [yyyy-mm-dd] ausführen. Dabei wird keine Aufgabeninstanz erstellt und die Ausführung wird nirgendwo im Metaspeicher aufgezeichnet. Es ist jedoch für die Fehlersuche nützlich. Im folgenden Beispiel zeige ich, wie Sie auf diese Weise eine fehlerhafte Pendulum.format() Anweisung debuggen können:

PDB

Letzte Worte

Mit den Beispielen in diesem Beitrag werden Sie hoffentlich in der Lage sein, Ihre Entwicklungszeit zu verkürzen und das Verhalten Ihres Airflow-Codes lokal zu überprüfen. Das lokale Testen von Operatoren mit Unit-Tests ohne eine Airflow-Installation kann sich wie ein Kinderspiel anfühlen! Wenn Sie Fragen haben, können Sie mich gerne auf Airflow Slack unter @BasPH kontaktieren.

Sind Sie an einer Apache Airflow-Schulung interessiert?

Eine kurze Vorwarnung: Wir bieten Apache Airflow als öffentlichen Kurs in unsererAcademy an. Machen Sie mit und lernen Sie alles, was Sie für die erfolgreiche Arbeit mit Airflow brauchen!

[^1]: Aus der Postgres Docker Hub Dokumentation: nachdem initdb aufgerufen wurde, wird jede *.sql und ausführbare *.sh Datei in /docker-entrypoint-initdb.d/ ausgeführt.

Contact

Let’s discuss how we can support your journey.