Testing Airflow is hard
There’s a good reason for writing this blog post – testing Airflow code can be difficult. It often leads people to go through an entire deployment cycle and manually push the trigger button on a live system; only then can they verify their Airflow code. This is a painfully long process, and as with any other software, people would like to write, test, and debug their Airflow code locally.
Running an Airflow DAG on your local machine is often not possible due to dependencies on external systems. To start, I’d like to point out this excellent blog post by ING WBAA about testing Airflow. It covers setting up DTAP and CI/CD for Airflow. Besides this, the blog post also describes a DAG integrity test for validating if your DAG files contain valid DAG objects, which is a good starting point. Also, there’s this Meetup talk about a local Airflow testing environment with Docker Compose by my colleague Bas Beelen, which will be open sourced in the near future.
The main goal of this post is to explain how to unit test Airflow components locally without the need for a production system. In Airflow this goes for anything – hooks, utility functions, etc. The DAG itself is simply configuration to glue various operations together. This post does not cover testing complete DAGs (although you could do it with the tools shown in this blog post), but explains how to test individual operations.
All tests are written with pytest and Airflow 1.10.2. All code in this blog post is available on GitHub.
Pytest Airflow fixtures & helpers
Airflow jobs always run in the context of a DAG. The execution of a task in a DAG is controlled via a task instance, which provides the context of the current run to the task. Hence, testing an operator cannot be decoupled from running a DAG. So, in order to test operators, I use a dummy DAG throughout my tests.
Pytest has the concept of fixtures: objects which can be passed to test functions as input arguments. This is why I prefer pytest over Python unittest; these fixtures allow for reusable code and less code duplication. For Airflow, I have a test_dag fixture with which I test operators that require a DAG to run. Define this test_dag fixture in tests/conftest.py to use it in any test.
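A minimal sketch of such a fixture; the start_date, owner, and daily schedule are my own choices for illustration:

```python
# tests/conftest.py
import datetime

import pytest
from airflow.models import DAG


@pytest.fixture
def test_dag():
    """Dummy DAG for testing individual operators."""
    return DAG(
        "test_dag",
        default_args={"owner": "airflow", "start_date": datetime.datetime(2019, 1, 1)},
        schedule_interval=datetime.timedelta(days=1),
    )
```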
A nice plugin for pytest is the Pytest Helpers Namespace. It allows you to register any function under the pytest helpers namespace, to use anywhere in your tests. Install it with pip install pytest-helpers-namespace. For testing operators, I need to run a task in a DAG, and therefore I define a run_task helper function:
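With the plugin installed, the helper can be registered in tests/conftest.py roughly like this (a sketch; running a task this way still assumes an initialized Airflow metastore, e.g. the default SQLite database created by airflow initdb):

```python
# tests/conftest.py (continued)
import pytest


@pytest.helpers.register
def run_task(task, dag):
    """Run a single task for the DAG's start_date."""
    dag.clear()
    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
    )
```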
Now the test_dag fixture and run_task helper function can be used to run tasks in a unit test:
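For example, a hypothetical test_dummy test; the bash command and file contents are arbitrary:

```python
import pytest
from airflow.operators.bash_operator import BashOperator


def test_dummy(test_dag, tmpdir):
    # Run a BashOperator that writes a file, then verify the file contents.
    tmpfile = tmpdir.join("hello.txt")

    task = BashOperator(task_id="test", bash_command=f"echo 'hello' > {tmpfile}", dag=test_dag)
    pytest.helpers.run_task(task=task, dag=test_dag)

    assert len(tmpdir.listdir()) == 1
    assert tmpfile.read().strip() == "hello"
```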
The test_dummy test uses two pytest fixtures: the test_dag fixture described above and tmpdir. Tmpdir is one of the fixtures you get for free when using pytest. It provides a temporary directory which you’d normally create with the tempfile builtin. Simply add tmpdir as a test function argument and you can use this tmpdir in your test. In the test above, I run a BashOperator which writes a file, and I verify the content of the file, without having to upload a DAG to an Airflow instance and test manually.
Mocking Airflow
Sometimes you need to fake objects in your tests. For example, when you cannot access the Airflow metastore directly from your laptop and thus cannot read the connections. In these situations, you can mock these objects in your tests. For mocking I use pytest-mock, which installs a mocker fixture, a thin wrapper around the mock package. For example, this shows a test using the SimpleHttpOperator (code):
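A sketch of such a test; the endpoint, coordinates, and the named response_check function (standing in for the lambda described below) are assumptions:

```python
import pytest
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
from airflow.operators.http_operator import SimpleHttpOperator


def test_simple_http_operator(test_dag, mocker):
    # Pretend the metastore contains an HTTP connection pointing at the API.
    mocker.patch.object(
        BaseHook,
        "get_connection",
        return_value=Connection(conn_id="http_default", host="api.sunrise-sunset.org"),
    )

    def print_sun_times(response):
        # Stand-in for the lambda: inspect the API response and print the times.
        results = response.json()["results"]
        print("sunrise:", results["sunrise"], "sunset:", results["sunset"])
        return True

    task = SimpleHttpOperator(
        task_id="sunrise_sunset",
        method="GET",
        endpoint="json",
        data={"lat": "52.37", "lng": "4.90", "formatted": "0"},  # Amsterdam, hypothetical
        response_check=print_sun_times,
        dag=test_dag,
    )
    pytest.helpers.run_task(task=task, dag=test_dag)
```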
This (hypothetical) task fetches sunrise and sunset times from api.sunrise-sunset.org, and the lambda function prints whether it’s currently light or dark. You might save the credentials to such an API in Airflow connections, but you cannot access the metastore from your local machine. So, we patch the BaseHook and mock the return value of get_connection to always return a Connection object with host="api.sunrise-sunset.org" for this test. That way we can work with an Airflow Connection object and test the operator.
Mocking external systems
Mocking works for objects, but what if you want to verify the implementation of your component against a real external system? By spinning up Docker containers with the system you want to test against, you can verify the correct behaviour of your component! There are several ways to do this; for example, Airflow itself starts a set of containers with Docker Compose at the start of its test suite.
Another option that I like is, once again, another pytest package, called pytest_docker_tools. It provides a set of helpers for pytest to manage Docker containers. I like that it keeps the test dependencies within the test scripts, and that you can pass Docker containers as fixtures to your tests.
To demonstrate, I implemented a PostgresToLocalOperator:
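A minimal sketch of what such an operator could look like; the constructor arguments (pg_query, local_path, postgres_conn_id) are assumptions:

```python
# my_package/operators/postgres_to_local_operator.py
import json

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class PostgresToLocalOperator(BaseOperator):
    """Query Postgres and store the result as JSON on the local filesystem."""

    @apply_defaults
    def __init__(self, pg_query, local_path, postgres_conn_id="postgres_default", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._pg_query = pg_query
        self._local_path = local_path
        self._postgres_conn_id = postgres_conn_id

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
        rows = hook.get_records(self._pg_query)  # list of tuples
        with open(self._local_path, "w") as file_:
            json.dump(rows, file_)
```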
The PostgresToLocalOperator queries a Postgres database and stores the JSON-formatted result on local disk. Now I’d like to verify the correct behaviour of my PostgresToLocalOperator, but I cannot access the production Postgres database. So, let’s write a test and spin up a Postgres Docker container to query against:
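A sketch of the fixtures involved, assuming a postgres:11.1-alpine image, made-up credentials, and a postgres-init.sql file next to the test module:

```python
import os
from collections import namedtuple

import pytest
from pytest_docker_tools import container, fetch

PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])


@pytest.fixture
def postgres_credentials():
    """Credentials for the throwaway test database."""
    return PostgresCredentials("testuser", "testpass")


# Fetch the image metadata and construct a container fixture named `postgres`.
postgres_image = fetch(repository="postgres:11.1-alpine")

postgres = container(
    image="{postgres_image.id}",
    environment={
        "POSTGRES_USER": "{postgres_credentials.username}",
        "POSTGRES_PASSWORD": "{postgres_credentials.password}",
    },
    ports={"5432/tcp": None},  # bind 5432 to a random free port on the host
    volumes={
        os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
            "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
        }
    },
)
```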
There are a couple of things going on here.
First, we create a namedtuple fixture holding the Postgres credentials. The way to pass variables to the pytest_docker_tools objects is via fixtures. As an added bonus, the postgres_credentials fixture can now be passed as an argument to all tests.
Next, pytest_docker_tools requires two statements for creating a Docker container fixture: fetch(), which fetches the Docker image metadata, and container(), which constructs the Docker container. It follows the same argument names as Docker itself, e.g. volumes for mounting volumes. I created a postgres-init.sql script which can be placed in /docker-entrypoint-initdb.d/ in a Postgres Docker container, to be executed at boot[^1], so that the dummy Postgres DB contains dummy data.
postgres-init.sql:
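Hypothetical contents; the test below only relies on a table called dummy with a couple of rows:

```sql
CREATE TABLE dummy (id SERIAL PRIMARY KEY, name VARCHAR(50));
INSERT INTO dummy (name) VALUES ('hello'), ('world');
```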
With all this set up, we can now write the test to validate the PostgresToLocalOperator reading from Postgres and writing to the local filesystem:
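A sketch of the complete test; the connection id, file name, and expected rows are assumptions, and the individual steps are walked through below:

```python
import json

import pytest
from airflow.models import Connection

# Import PostgresHook from the operator module so the patch below targets
# the location where the hook is actually used (see the note further down).
from my_package.operators.postgres_to_local_operator import (
    PostgresHook,
    PostgresToLocalOperator,
)


def test_postgres_to_local_operator(test_dag, mocker, tmpdir, postgres, postgres_credentials):
    output_path = str(tmpdir.join("pg_dump.json"))

    mocker.patch.object(
        PostgresHook,
        "get_connection",
        return_value=Connection(
            conn_id="postgres_test",
            conn_type="postgres",
            host="localhost",
            login=postgres_credentials.username,
            password=postgres_credentials.password,
            # The postgres image creates a database named after POSTGRES_USER.
            schema=postgres_credentials.username,
            port=postgres.ports["5432/tcp"][0],
        ),
    )

    task = PostgresToLocalOperator(
        task_id="test_postgres_to_local",
        postgres_conn_id="postgres_test",
        pg_query="SELECT * FROM dummy",
        local_path=output_path,
        dag=test_dag,
    )
    pytest.helpers.run_task(task=task, dag=test_dag)

    assert len(tmpdir.listdir()) == 1
    with open(output_path) as file_:
        # Rows as inserted by the (hypothetical) postgres-init.sql above.
        assert json.load(file_) == [[1, "hello"], [2, "world"]]
```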
The test takes a number of arguments:
- test_dag – DAG fixture
- mocker – pytest mock fixture
- tmpdir – pytest tmpdir fixture
- postgres – dummy Postgres Docker container fixture
- postgres_credentials – Postgres credentials fixture
First we define an output path to write the results to:
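Using the tmpdir fixture again (the file name is arbitrary):

```python
output_path = str(tmpdir.join("pg_dump.json"))
```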
Next, we patch the PostgresHook to return a mocked Connection object when get_connection is called, since we don’t have access to a running Airflow instance locally.
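The mocked Connection points at the Postgres container fixture instead:

```python
mocker.patch.object(
    PostgresHook,
    "get_connection",
    return_value=Connection(
        conn_id="postgres_test",
        conn_type="postgres",
        host="localhost",
        login=postgres_credentials.username,
        password=postgres_credentials.password,
        schema=postgres_credentials.username,
        port=postgres.ports["5432/tcp"][0],  # host port bound by the container fixture
    ),
)
```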
Important! One of, if not the, most common mistakes with Python mocking is to patch the incorrect location. The result is that the patch appears to have no effect. To patch the PostgresHook, you must not import it with from airflow.hooks.postgres_hook import PostgresHook! Instead, import the PostgresHook from the location where you actually use it: from my_package.operators.postgres_to_local_operator import PostgresToLocalOperator, PostgresHook.
Next, we run our operator, simply querying SELECT * FROM dummy:
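```python
task = PostgresToLocalOperator(
    task_id="test_postgres_to_local",
    postgres_conn_id="postgres_test",
    pg_query="SELECT * FROM dummy",
    local_path=output_path,
    dag=test_dag,
)
pytest.helpers.run_task(task=task, dag=test_dag)
```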
The operator has now completed its execution, so we expect one file in output_path, and we expect this file to contain whatever was in the dummy table:
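```python
assert len(tmpdir.listdir()) == 1
with open(output_path) as file_:
    # Rows as inserted by the (hypothetical) postgres-init.sql above.
    assert json.load(file_) == [[1, "hello"], [2, "world"]]
```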
The complete code for the PostgresToLocalOperator can be found here, and the complete code for testing the operator can be found here.
Debugging Airflow
There are various ways to debug a process running in Airflow. If running locally, e.g. a unit test, you can place a breakpoint in your IDE of choice. I left remote debugging with an IDE out of scope for this blog post; instead, I’ll explain a different method which works both locally and remotely.
PDB
Python comes with a builtin debugger called pdb. You can use it by placing this snippet at the location you want to start debugging:
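```python
import pdb; pdb.set_trace()
```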
Or, if you’re on Python 3.7 (currently only supported on Airflow master), you can simply call breakpoint() somewhere in your code. There’s also ipdb, which adds features such as color highlighting and autocompletion; it is, however, not a builtin, so you’ll have to install it with pip install ipdb. Once in a debug session, you can control the debugging with these shortcuts (source):
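A few of the most commonly used commands:

- n – execute the current line and move to the next one
- s – step into a function call
- c – continue execution until the next breakpoint
- l – list the source code around the current line
- p <expression> – print the value of an expression
- q – quit the debugger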
In case you want to place a breakpoint but don’t know where to find the code, simply open up a Python terminal:
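For example, to find the location of the BashOperator source code (any class or module works):

```python
>>> import inspect
>>> from airflow.operators.bash_operator import BashOperator
>>> inspect.getfile(BashOperator)  # prints the path to bash_operator.py on your system
```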
Finally, if you want to debug a "live" Airflow job, you can manually run a task with airflow test [dag_id] [task_id] [yyyy-mm-dd]. This does not create a task instance and does not record the execution anywhere in the metastore; it is, however, useful for debugging. In the example below, I show how to use this approach to debug an incorrect Pendulum.format() statement:
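I can’t reproduce the original Pendulum session here, but the general pattern looks like this: place a breakpoint inside the task’s callable (a hypothetical print_exec_date task in a minimal DAG below), run airflow test, and inspect the offending statement interactively.

```python
# Hypothetical DAG file, used only to illustrate the approach.
import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("debug_example", start_date=datetime.datetime(2019, 1, 1), schedule_interval=None)


def _print_exec_date(execution_date, **context):
    import pdb; pdb.set_trace()  # `airflow test` pauses execution here
    print(execution_date)


print_exec_date = PythonOperator(
    task_id="print_exec_date",
    python_callable=_print_exec_date,
    provide_context=True,
    dag=dag,
)

# Then, on the machine running Airflow:
#   airflow test debug_example print_exec_date 2019-01-01
```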
Final words
With the examples in this post, hopefully you’ll be able to shorten your development time and verify the behaviour of your Airflow code locally. Testing operators locally using unit tests, without an Airflow installation, can feel like quite a breeze! If you have any questions, feel free to contact me on the Airflow Slack as @BasPH.
Interested in Apache Airflow Training?
A quick heads up: we offer Apache Airflow as a public course in our Academy. Join us to learn everything you need to successfully work with Airflow!
[^1]: From the Postgres Docker Hub documentation: after initdb is called, any *.sql and executable *.sh file in /docker-entrypoint-initdb.d/ is run.