The Zen of Python and Apache Airflow

17 Feb, 2019

Apache Airflow is a Python framework for programmatically creating workflows as DAGs, e.g. ETL processes, generating reports, and retraining models on a daily basis. This allows for concise and flexible scripts, but it can also be Airflow’s downside: since it’s Python code, there are infinite ways to define your pipelines. The Zen of Python is a list of 19 Python design principles, and in this blog post I point out some of these principles using four Airflow examples. This blog was written with Airflow 1.10.2.

1. The DAG context manager

A DAG object can be instantiated and referenced in tasks in two ways:

Option 1: explicitly pass the DAG reference:

dag = DAG(...)
t1 = DummyOperator(task_id="task1", dag=dag)
t2 = DummyOperator(task_id="task2", dag=dag)

Option 2: use the DAG as a context manager; no need to pass the DAG reference to every task:

with DAG(...) as dag:
    t1 = DummyOperator(task_id="task1")
    t2 = DummyOperator(task_id="task2")

If you check the context manager implementation, you see it’s implemented by setting the DAG on a global variable[1] _CONTEXT_MANAGER_DAG. Operators simply check for the existence of _CONTEXT_MANAGER_DAG and, if it is set, use it as their dag. Therefore, we don’t even have to write as dag (unless you’d like to reference the DAG object somewhere in your code):

with DAG(...):
    t1 = DummyOperator(task_id="task1")
    t2 = DummyOperator(task_id="task2")
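For illustration, here is a minimal sketch of how such a context manager could be implemented. This is a simplification for the sake of the example, not the actual Airflow source:

# Minimal sketch of the context manager mechanism, not the actual Airflow code.
_CONTEXT_MANAGER_DAG = None

class SketchDAG:
    def __enter__(self):
        global _CONTEXT_MANAGER_DAG
        _CONTEXT_MANAGER_DAG = self
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global _CONTEXT_MANAGER_DAG
        _CONTEXT_MANAGER_DAG = None

class SketchOperator:
    def __init__(self, task_id, dag=None):
        # Fall back to the DAG set by the surrounding context manager, if any
        self.dag = dag or _CONTEXT_MANAGER_DAG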

The DAG context manager appears to work "magically" without having to reference it in any task, so given Python’s design principle "Explicit is better than implicit", I prefer to use option #1.

2. Setting dependencies between tasks

The >> and << operators can be used to connect both a single task and a list of tasks. The bit shift operators were introduced in Airflow 1.8 as an alternative to set_downstream() (>>) and set_upstream() (<<). The implementing functions (__rshift__ and __lshift__) simply pass their arguments to set_downstream() and set_upstream() respectively, which in turn both call _set_relatives(), a method that accepts both a single task and a list of tasks (a single task is wrapped in a list).

This allows connecting lists of tasks:

task1 >> [task2, task3]

# Instead of:
task1 >> task2
task1 >> task3

__rlshift__ and __rrshift__ are also implemented to support setting dependencies the other way around. These operations are called on the right-hand-side object when the left-hand side has no implementation for the bit shift operator, which is the case for lists:

[task2, task3] << task1

# Instead of:
task2 << task1
task3 << task1
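To make the mechanics concrete, here is a simplified sketch of how these dunder methods could delegate to set_downstream() and set_upstream(). It is an illustration, not the actual Airflow source:

# Simplified sketch of the bit shift dunder methods, not the actual Airflow source.
class OperatorSketch:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def _set_relatives(self, task_or_tasks, upstream):
        # Accepts both a single task and a list of tasks, like Airflow's _set_relatives()
        tasks = task_or_tasks if isinstance(task_or_tasks, list) else [task_or_tasks]
        if upstream:
            self.upstream += tasks
        else:
            self.downstream += tasks

    def set_downstream(self, other):
        self._set_relatives(other, upstream=False)

    def set_upstream(self, other):
        self._set_relatives(other, upstream=True)

    def __rshift__(self, other):   # task1 >> other
        self.set_downstream(other)
        return other               # returning the right-hand side enables chaining

    def __lshift__(self, other):   # task1 << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [task2, task3] >> task1
        self.__lshift__(other)
        return self

    def __rlshift__(self, other):  # [task2, task3] << task1
        self.__rshift__(other)
        return self

Returning the right-hand side from __rshift__/__lshift__ (and self from the reflected variants) is what makes chaining like task1 >> [task2, task3] >> task4 possible.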

This allows for concise chaining of tasks such as:

task1 >> [task2, task3] >> task4 << [task5, task6]

(Image: dependencies between Airflow task lists)
Given the principle “Readability counts”, I think for clarity’s sake it’s better not to mix directions in a single statement, and to use the downstream direction whenever possible, since reading from left to right is the natural direction for most people:

task1 >> [task2, task3] >> task4
[task5, task6] >> task4

Note it’s not possible to have a list on both sides, since Python lists have no implementation for the bit shift operators:

# This does not work
[task1, task2] >> [task3, task4]
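If you do need to connect every task in one list to every task in another list, a plain Python loop is one possible workaround:

# Connect every task in the first list to every task in the second list
for upstream_task in [task1, task2]:
    upstream_task >> [task3, task4]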

3. Passing context to tasks

A common data pipeline is a daily load of data, writing the data in one partition per day, denoted as e.g. dt=yyyy-mm-dd. Airflow supports this use case by providing access to the task context.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {"owner": "airflow", "start_date": datetime(2018, 10, 1)}
dag = DAG(dag_id="context_demo", default_args=default_args, schedule_interval="@daily")

# The PythonOperator with provide_context=True passes the Airflow context to the given callable
def _print_exec_date(**context):
    print(context["execution_date"])
    # Prints e.g. 2018-10-01T00:00:00+00:00

print_exec_date = PythonOperator(
    task_id="print_exec_date",
    python_callable=_print_exec_date,
    provide_context=True,
    dag=dag,
)

When providing provide_context=True to an operator, we pass the Airflow context variables along to the callable. These context variables include, among others, the start date of the schedule interval in various formats (for example ds="yyyy-mm-dd", ds_nodash="yyyymmdd", and execution_date as a pendulum.Pendulum object). To inspect all available variables, print the context, or view the default-variables section of the Airflow documentation.

import pprint

def _print_exec_date(**context):
    pprint.pprint(context)

# {
# 'END_DATE': '2019-01-01',
# 'conf': <...>,
# 'dag': <...>,
# 'dag_run': None,
# 'ds': '2019-01-01',
# 'ds_nodash': '20190101',
# 'end_date': '2019-01-01',
# 'execution_date': <...>,
# 'inlets': [],
# 'latest_date': '2019-01-01',
# 'macros': <...>,
# 'next_ds': '2019-01-02',
# 'next_ds_nodash': '20190102',
# 'next_execution_date': datetime.datetime(2019, 1, 2, 0, 0, tzinfo=<...>),
# 'outlets': [],
# 'params': {},
# 'prev_ds': '2018-12-31',
# 'prev_ds_nodash': '20181231',
# 'prev_execution_date': datetime.datetime(2018, 12, 31, 0, 0, tzinfo=<...>),
# 'run_id': None,
# 'tables': None,
# 'task': <...>,
# 'task_instance': <...>,
# 'task_instance_key_str': 'context_demo__print_exec_date__20190101',
# 'templates_dict': None,
# 'test_mode': True,
# 'ti': <...>,
# 'tomorrow_ds': '2019-01-02',
# 'tomorrow_ds_nodash': '20190102',
# 'ts': '2019-01-01T00:00:00+00:00',
# 'ts_nodash': '20190101T000000',
# 'ts_nodash_with_tz': '20190101T000000+0000',
# 'var': {'json': None, 'value': None},
# 'yesterday_ds': '2018-12-31',
# 'yesterday_ds_nodash': '20181231'
# }

It is common practice in Python to accept keyword arguments in a function with the argument name kwargs. When writing functions for Airflow, I prefer to name this variable context instead, to indicate its purpose: passing along the Airflow task instance context.

Also note we can implement the _print_exec_date function as follows, because we know the Airflow context contains a variable named execution_date. It is extracted into its own argument, and context now holds all remaining keyword arguments, i.e. all Airflow context variables except execution_date.

def _print_exec_date(execution_date, **context):
    print(execution_date)

To show the intent of not using any argument other than execution_date, it’s even more explicit to discard all other keyword arguments with **_:

def _print_exec_date(execution_date, **_):
    print(execution_date)

In my opinion, extracting only the variables you require from the Airflow context and discarding the rest with **_ tells the reader of your code what the expected input is, and improves explicitness and readability.
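Tying this back to the daily partitioning mentioned at the start of this section, here is a small sketch of a callable that uses only ds to build a partition path (the path and processing logic are made up for illustration):

# Sketch: use only the "ds" context variable to write a daily partition.
# The output path is hypothetical.
def _write_daily_partition(ds, **_):
    output_path = f"/data/my_dataset/dt={ds}/"
    print(f"Writing partition for {ds} to {output_path}")

write_daily_partition = PythonOperator(
    task_id="write_daily_partition",
    python_callable=_write_daily_partition,
    provide_context=True,
    dag=dag,
)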

4. Skipping execution of tasks

Another one in the “explicit over implicit” category. Sometimes you’d like to skip execution of tasks in your DAG. For example, you download a .zip file from an SFTP server every day, and the SFTP server keeps one week of history. This is fine when you download yesterday’s batch of data once a day. However, it will break if you try to backfill more than one week back in time.

For this use case you could set up the first task of your DAG to check whether the execution date is more than one week before today, and skip execution if it is. This can be achieved in multiple ways. The first is the ShortCircuitOperator. It requires a callable that returns True if downstream tasks should proceed and False if they should be skipped. An example:

import datetime

from dateutil.relativedelta import relativedelta
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

def _check_date(execution_date, **context):
    # Proceed only if the execution date falls within the last week
    return execution_date > (datetime.datetime.now() - relativedelta(weeks=1))

check_date = ShortCircuitOperator(
    task_id="check_if_min_date",
    python_callable=_check_date,
    provide_context=True,
    dag=dag,
)

task1 = DummyOperator(task_id="task1", dag=dag)
task2 = DummyOperator(task_id="task2", dag=dag)

check_date >> task1 >> task2

(Image: Airflow ShortCircuitOperator example)
Another option is to use the PythonOperator and raise an AirflowSkipException if a given condition is not met. This skips the task, and as a result all downstream tasks are skipped too.

Note this assumes the default trigger_rule=TriggerRule.ALL_SUCCESS argument on the operators. The TriggerRule defines the condition for running an operator. If an upstream dependency of an operator is in the skipped state and the operator’s trigger_rule is TriggerRule.ALL_SUCCESS, it will be skipped as well (a sketch of overriding this behavior follows after the example below).

# datetime and relativedelta are imported as in the previous example
from airflow.exceptions import AirflowSkipException
from airflow.operators.python_operator import PythonOperator

def _check_date(execution_date, **context):
    min_date = datetime.datetime.now() - relativedelta(weeks=1)
    if execution_date < min_date:
        raise AirflowSkipException(f"No data available on this execution_date ({execution_date}).")

check_date = PythonOperator(
    task_id="check_if_min_date",
    python_callable=_check_date,
    provide_context=True,
    dag=dag,
)

task1 = DummyOperator(task_id="task1", dag=dag)
task2 = DummyOperator(task_id="task2", dag=dag)

check_date >> task1 >> task2

(Image: AirflowSkipException example)
What I like about this solution is that it also sets the state of the task checking the condition to skipped, which in my opinion is visually clearer.
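As noted above, the skipping cascades because of the default trigger_rule=TriggerRule.ALL_SUCCESS. If you do want a downstream task to run regardless of skipped upstream tasks, for example a cleanup step, you can override its trigger rule. A small sketch (the cleanup task is hypothetical):

from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# Hypothetical cleanup task that runs once all upstream tasks are done,
# regardless of whether they succeeded, failed or were skipped.
cleanup = DummyOperator(
    task_id="cleanup",
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag,
)

task2 >> cleanup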

Final words

Most examples boil down to explicitness and readability. They help your future self and others to easily understand your code, which I prefer over saving a few characters. Feel free to contact us about anything Airflow related!

[1] Generally, the usage of a global variable rings alarm bells and indicates bad programming practice. Standard open source answer: feel free to contribute 😉