Blog

Das Zen von Python und Apache Airflow

Aktualisiert Oktober 21, 2025
7 Minuten

Apache Airflow ist ein Python-Framework zur programmgesteuerten Erstellung von Workflows in DAGs, z.B. ETL-Prozesse, Erstellung von Berichten und tägliches Neutrainieren von Modellen. Dies ermöglicht prägnante und flexible Skripte, kann aber auch der Nachteil von Airflow sein. Da es sich um Python-Code handelt, gibt es unendlich viele Möglichkeiten, Ihre Pipelines zu definieren. The Zen of Python ist eine Liste von 19 Python-Designprinzipien und in diesem Blogbeitrag zeige ich einige dieser Prinzipien anhand von vier Airflow-Beispielen auf. Dieser Blogbeitrag wurde mit Airflow 1.10.2 geschrieben.

1. Der DAG-Kontextmanager

Ein DAG-Objekt kann auf zwei Arten instanziiert und in Aufgaben referenziert werden:

Option 1: Übergeben Sie explizit eine DAG-Referenz:

dag = DAG(...)
t1 = DummyOperator(aufgabe_id="Aufgabe1", dag=dag)
t2 = DummyOperator(aufgabe_id="Aufgabe2", dag=dag)

Option 2: Verwenden Sie die DAG im Kontextmanager, ohne auf das DAG-Objekt verweisen zu müssen:

mit DAG(...) als dag:
    t1 = DummyOperator(aufgabe_id="Aufgabe1")
    t2 = DummyOperator(aufgabe_id="Aufgabe2")

Wenn Sie die Implementierung des Kontextmanagers überprüfen, sehen Sie, dass die DAG auf eine globale Variable gesetzt wird1 _CONTEXT_MANAGER_DAG. Die Operatoren prüfen einfach, ob _CONTEXT_MANAGER_DAG vorhanden ist, und wenn ja, setzen sie dag=_CONTEXT_MANAGER_DAG. Wir müssen also nicht einmal as dag schreiben (es sei denn, Sie möchten irgendwo in Ihrem Code darauf verweisen):

mit DAG(...):
    t1 = DummyOperator(aufgabe_id="Aufgabe1")
    t2 = DummyOperator(aufgabe_id="Aufgabe2")

Der DAG-Kontextmanager scheint wie von Zauberhand zu funktionieren, ohne dass Sie ihn in einer Aufgabe referenzieren müssen. Angesichts des Python-Designprinzips"Explizit ist besser als implizit" ziehe ich daher die Option 1 vor.

2. Abhängigkeiten zwischen Aufgaben festlegen

Die Operatoren >> und << können verwendet werden, um sowohl eine einzelne Aufgabe als auch eine Liste von Aufgaben zu verbinden. Die Bit-Shift-Operatoren wurden in Airflow 1.8 als Alternative zu set_downstream() (>>) und set_upstream() (<<) eingeführt. Die implementierenden Funktionen (lshift und rshift) übergeben einfach Variablen an set_downstream und set_upstream, die ihrerseits alle _set_relatives() aufrufen, das sowohl eine einzelne Aufgabe als auch eine Liste von Aufgaben akzeptiert (eine einzelne Aufgabe wird in eine Liste umgewandelt, die die einzelne Aufgabe enthält).

Dies ermöglicht das Verbinden von Listen mit Aufgaben:

Aufgabe1 >> [Aufgabe2, Aufgabe3]

# Stattdessen:
Aufgabe1 >> Aufgabe2
Aufgabe1 >> Aufgabe3

rlshift und rrshift sind ebenfalls implementiert, um das Setzen von Abhängigkeiten in umgekehrter Richtung zu unterstützen. Diese Operationen werden auf dem Objekt der rechten Seite aufgerufen, wenn die linke Seite keine Implementierung für den Bit-Shift-Operator hat, was bei Listen der Fall ist:

[Aufgabe2, Aufgabe3]  Aufgabe1

# Stattdessen:
Aufgabe2  Aufgabe1
Aufgabe3  Aufgabe1

Dies ermöglicht eine konsequente Verkettung von Aufgaben wie z.B.:

Aufgabe1 >> [Aufgabe2, Aufgabe3] >> Aufgabe4  [Aufgabe5, Aufgabe6]
Referenzen Airflow Aufgabenlisten
In Anbetracht des Grundsatzes"Die Lesbarkeit zählt" halte ich es der Klarheit halber für besser, die Richtungen in einer einzigen Aussage nicht zu vermischen. Und wenn möglich, sollten Sie immer die absteigende Richtung verwenden, da die meisten Menschen auf natürliche Weise von links nach rechts lesen:
Aufgabe1 >> [Aufgabe2, Aufgabe3] >> Aufgabe4
[Aufgabe5, Aufgabe6] >> Aufgabe4
Beachten Sie, dass es nicht möglich ist, eine Liste auf beiden Seiten zu referenzieren, da eine Liste keine Implementierung für die Bitverschiebungsoperation hat:
# Das funktioniert nicht
[Aufgabe1, Aufgabe2] >> [Aufgabe3, Aufgabe4]

3. Übergabe von Kontext an Aufgaben

Eine übliche Datenpipeline ist ein tägliches Laden von Daten, wobei die Daten in Partitionen pro Tag geschrieben werden, notiert als z.B. dt=yyyy-mm-dd. Airflow unterstützt diesen Anwendungsfall, indem es Zugriff auf den Aufgabenkontext bietet.
von datetime importieren datetime

von Luftstrom importieren DAG
von airflow.operators.python_operator importieren PythonOperator

standard_args = {"Eigentümer": "Luftstrom", "start_date": datetime(2018, 10, 1)}
dag = DAG(dag_id="kontext_demo", standard_args=standard_args, zeitplan_intervall="@daily")

# Der PythonOperator mit provide_context=True übergibt den Airflow-Kontext an die angegebene Callable.
def ausdrucken_exec_date(**Kontext):
    drucken(Kontext["Ausführungsdatum"])
    # Druckt z.B. 2018-10-01T00:00:00+00:00

print_exec_date = PythonOperator(
    aufgabe_id="print_exec_date",
    python_aufrufbar=ausdrucken_exec_date,
    bereitstellen_kontext=True,
    dag=dag,
)
Wenn wir provide_context=True einem Operator zur Verfügung stellen, geben wir die Airflow-Kontextvariablen weiter, die innerhalb des Operators verwendet werden sollen. Diese Kontextvariablen enthalten u.a. das Startdatum des Intervalls in verschiedenen Formaten (z.B. ds="yyyy-mm-dd", ds_nodash="yyyymmdd" und execution_date=pendulum.pendulum.Pendulum). Um alle verfügbaren Variablen einzusehen, drucken Sie den Kontext aus oder sehen Sie sich die Dokumentation unter default-variables an.
def ausdrucken_exec_date(**Kontext):
    pprint.pprint(Kontext)

# {
# 'END_DATE': '2019-01-01',
# 'conf': ,
# 'dag': ,
# 'dag_run': Keine,
# 'ds': '2019-01-01',
# 'ds_nodash': '20190101',
# 'end_date': '2019-01-01',
# 'execution_date': ,
# 'Einlässe': [],
# 'latest_date': '2019-01-01',
# 'Makros': ,
# 'next_ds': '2019-01-02',
# 'next_ds_nodash': '20190102',
# 'next_execution_date': datetime.datetime(2019, 1, 2, 0, 0, tzinfo=),
# 'outlets': [],
# 'params': {},
# 'prev_ds': '2018-12-31',
# 'prev_ds_nodash': '20181231',
# 'prev_execution_date': datetime.datetime(2018, 12, 31, 0, 0, tzinfo=),
# 'run_id': Keine,
# 'Tabellen': Keine,
# 'Aufgabe': ,
# 'task_instance': ,
# 'task_instance_key_str': 'context_demo__print_exec_date__20190101',
# 'templates_dict': Keine,
# 'test_mode': True,
# 'ti': ,
# 'morgen_ds': '2019-01-02',
# 'tomorrow_ds_nodash': '20190102',
# 'ts': '2019-01-01T00:00:00+00:00',
# 'ts_nodash': '20190101T000000',
# 'ts_nodash_with_tz': '20190101T000000+0000',
# 'var': {'json': None, 'value': Keine},
# 'gestern_ds': '2018-12-31',
# 'gestern_ds_nodash': '20181231'
# }
In Python ist es üblich, Schlüsselwortargumente in einer Funktion mit dem Argumentnamen kwargs zu akzeptieren. Wenn ich Funktionen im Kontext von Airflow schreibe, ziehe ich es vor, diese Variable context zu nennen, um ihren Zweck für die Weitergabe des Kontexts der Airflow-Aufgabeninstanz anzugeben. Beachten Sie auch, dass wir die Funktion _print_exec_date wie folgt implementieren können, da wir wissen, dass der Airflow-Kontext eine Variable execution_date enthält. Die Variable context enthält nun alle Airflow-Kontextvariablen mit Ausnahme von execution_date, da diese an das Argument execution_date übergeben wird, und context enthält alle übrigen Schlüsselwortargumente.
def ausdrucken_exec_date(Ausführungsdatum, **Kontext):
    drucken(Ausführungsdatum)
Um die Absicht zu zeigen, kein anderes Argument als executiondate zu verwenden, ist es noch deutlicher, alle anderen Argumente mit ** zu verwerfen:
def ausdrucken_exec_date(Ausführungsdatum, **_):
    drucken(Ausführungsdatum)
Meiner Meinung nach zeigt die Verwendung von **_ und nur der Variablen, die Sie aus dem Airflow-Kontext benötigen, dem Leser Ihres Codes die erwartete Eingabe und verbessert die Eindeutigkeit und Lesbarkeit.

4. Überspringen der Ausführung von Aufgaben

Ein weiteres Beispiel aus der Kategorie "explizit vor implizit". Manchmal möchten Sie die Ausführung von Aufgaben in Ihrer DAG auslassen. Sie laden zum Beispiel jeden Tag eine .zip-Datei von einem SFTP-Server herunter. Der SFTP-Server speichert den Verlauf von einer Woche. Das ist in Ordnung, wenn Sie einmal am Tag den Datenstapel von gestern herunterladen. Es funktioniert jedoch nicht, wenn Sie versuchen, mehr als eine Woche in die Vergangenheit zurückzurechnen. Für diesen Anwendungsfall könnten Sie die erste Aufgabe Ihrer DAG so einrichten, dass sie prüft, ob das Ausführungsdatum vor heute - einer Woche - liegt, und die Ausführung überspringt, wenn diese Bedingung erfüllt ist. Dies kann auf mehrere Arten erreicht werden. Die erste ist die ShortCircuitOperator. Sie erfordert eine Callable, die True zurückgibt, wenn nachgelagerte Aufgaben fortgesetzt werden sollen, und False, wenn nachgelagerte Aufgaben übersprungen werden sollen. Ein Beispiel:
def _check_datum(Ausführungsdatum, **Kontext):
    return Ausführungsdatum > (datetime.datetime.jetzt() - relativedelta(Wochen=1))

check_date = ShortCircuitOperator(
    aufgabe_id="check_if_min_date",
    python_aufrufbar=_check_datum,
    bereitstellen_kontext=True,
    dag=dag,
)

Aufgabe1 = DummyOperator(aufgabe_id="Aufgabe1", dag=dag)
Aufgabe2 = DummyOperator(aufgabe_id="Aufgabe2", dag=dag)

check_date >> Aufgabe1 >> Aufgabe2
Airflow ShortCircuitOperator Beispiel
Eine andere Möglichkeit besteht darin, den PythonOperator zu verwenden und eine AirflowSkipException auszulösen, wenn eine bestimmte Bedingung nicht erfüllt ist. Dadurch wird die Aufgabe übersprungen und damit auch alle nachgelagerten Aufgaben. Beachten Sie, dass dies das Standardargument trigger_rule=TriggerRule.ALL_SUCCESS für die Operatoren voraussetzt. Die TriggerRule definiert die Bedingung für die Ausführung eines Operators. Wenn eine vorgelagerte Abhängigkeit eines Operators den Zustand übersprungen hat und seine trigger_rule TriggerRule.ALL_SUCCESS ist, wird er ebenfalls übersprungen.
def _check_datum(Ausführungsdatum, **Kontext):
    min_datum = datetime.datetime.jetzt() - relativedelta(Wochen=1)
    wenn Ausführungsdatum  min_datum:
        raise AirflowSkipException(f"Keine Daten zu diesem Ausführungsdatum verfügbar ({execution_date}).")

check_date = PythonOperator(
    aufgabe_id="check_if_min_date",
    python_aufrufbar=_check_datum,
    bereitstellen_kontext=True,
    dag=dag,
)

Aufgabe1 = DummyOperator(task_id="aufgabe1", dag=dag)
Aufgabe2 = DummyOperator(task_id="aufgabe2", dag=dag)

check_date >> Aufgabe1 >> Aufgabe2
AirflowSkipException Beispiel
Was mir an dieser Lösung gefällt, ist, dass sie auch den Status der Aufgabe, die die Bedingung überprüft, auf "übersprungen" setzt, was meiner Meinung nach visuell klarer ist.

Letzte Worte

Die meisten Beispiele laufen auf Eindeutigkeit und Lesbarkeit hinaus. Das hilft Ihrem zukünftigen Ich und anderen, Ihren Code leicht zu verstehen, was ich weniger Zeichen vorziehe. Wenden Sie sich an uns, wenn Sie Fragen zu Airflow haben!

Sind Sie an einer Schulung zu Data Science mit Python oder Apache Airflow interessiert?

Nehmen Sie an einem unserer Kurse teil: - Apache Airflow - Python Grundlagen - Data Science mit Python Foundation - Fortgeschrittene Datenwissenschaft mit Python

  1. Im Allgemeinen lässt die Verwendung von global die Alarmglocken schrillen und deutet auf schlechte Programmierpraxis hin. Standard-Open-Source-Antwort: Sie können gerne etwas beitragen ;-)Â â©

Contact

Let’s discuss how we can support your journey.