The Airflow experimental API allows you to trigger a DAG over HTTP. This comes in handy if you are integrating with cloud storage such as Azure Blob Storage.
Although Airflow has the concept of Sensors, an external trigger allows you to avoid polling for a file to appear.
In this blog post, I will show how we use Azure Functions to trigger a DAG when a file is uploaded to Azure Blob Storage.
Experimental API
The experimental API allows you to fetch information about DAGs and tasks, but also to trigger and even delete a DAG. In this blog post we will use it to trigger a DAG.
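To make the trigger call concrete, the Python sketch below builds (but does not send) a POST request against the /api/experimental/dags/<dag_id>/dag_runs endpoint that the Azure Function in this post calls. The base URL and DAG id are placeholders, and build_trigger_request is a hypothetical helper. As in the JavaScript function, conf is sent as a JSON-encoded string inside the JSON body.

```python
import json
import urllib.parse
import urllib.request


def build_trigger_request(base_url, dag_id, conf=None):
    """Build (without sending) a POST request that asks the experimental API for a new DAG run.

    base_url (e.g. "https://AIRFLOW_URL") and dag_id are placeholders for your own setup.
    """
    # Quote the DAG id so unusual characters cannot break the URL path.
    path = "/api/experimental/dags/{}/dag_runs".format(urllib.parse.quote(dag_id, safe=""))
    body = {}
    if conf is not None:
        # conf travels as a JSON *string* inside the JSON body,
        # mirroring the JavaScript example in this post.
        body["conf"] = json.dumps(conf)
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_trigger_request("https://AIRFLOW_URL", "DAG_NAME", {"filename": "report.csv"})
print(req.get_method(), req.full_url)
```

Sending it would be a matter of passing req to urllib.request.urlopen once authentication is in place.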
By default the experimental API is unsecured, so before we continue we should define an auth_backend that secures it.
There are multiple options available; in this blog post we use the password_auth backend, which implements HTTP Basic Authentication. Because Basic Authentication only base64-encodes the credentials rather than encrypting them, you should only use it over HTTPS.
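To see why HTTPS matters, the Python sketch below builds the Authorization header that HTTP Basic Authentication sends (RFC 7617) and then decodes it again. The user name and password are placeholders, and basic_auth_header is a hypothetical helper; the point is only that the encoding is trivially reversible.

```python
import base64


def basic_auth_header(user, password):
    """Return the Authorization header value used by HTTP Basic Authentication."""
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8")).decode("ascii")
    return "Basic " + token


header = basic_auth_header("AIRFLOW_USER", "AIRFLOW_PASSWORD")
print(header)

# Anyone who can read the traffic can reverse the encoding, hence HTTPS only.
decoded = base64.b64decode(header.split(" ", 1)[1]).decode("utf-8")
print(decoded)  # AIRFLOW_USER:AIRFLOW_PASSWORD
```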
Enabling the password_auth backend is a small change to your Airflow config file:
```ini
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
```
Next, use the Airflow web interface to create a new user that the Azure Function will use to trigger the DAG.
Azure Functions
Azure Functions lets you define small snippets of code that can be triggered by a whole range of other Azure products, for example by a message arriving on an Event Hub or, as in this case, by a file appearing in Blob Storage.
There are a couple of different languages to choose from, and in this case I was a bit lazy and went for JavaScript in the BlobTrigger wizard.
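You need to link a storage account and define the path you want to monitor. In my case I configured it to monitor my-data/{name}, where my-data is the name of the container within the storage account and {name} captures the name of the uploaded blob, which the function code reads as context.bindingData.name.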
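The {name} placeholder works like a named capture group over the blob path. The toy Python matcher below only illustrates that idea, assuming a placeholder may match any non-empty text; it is not how the Azure Functions runtime actually resolves binding expressions, and match_blob_path is a hypothetical helper.

```python
import re


def match_blob_path(pattern, blob_path):
    """Return the values captured by {placeholders} in pattern, or None if blob_path does not match."""
    regex = ""
    # re.split with a capturing group alternates literal text and placeholder names.
    for i, part in enumerate(re.split(r"\{(\w+)\}", pattern)):
        if i % 2 == 0:
            regex += re.escape(part)  # literal text such as "my-data/"
        else:
            regex += "(?P<{}>.+)".format(part)  # placeholder such as {name}
    match = re.fullmatch(regex, blob_path)
    return match.groupdict() if match else None


print(match_blob_path("my-data/{name}", "my-data/report.csv"))
print(match_blob_path("my-data/{name}", "other-container/report.csv"))
```

A blob uploaded to a different container does not match, which is why only uploads to my-data fire the function.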
Next, you’re presented with a small example. I’ve extended it a bit, which resulted in the following piece of code:
```javascript
var request = require('request');

module.exports = function (context, myBlob) {
    context.log("JavaScript blob trigger function processed blob \n Name:", context.bindingData.name, "\n Blob Size:", myBlob.length, "Bytes");
    var filename = context.bindingData.name;

    context.log("Triggering DAG");
    request({
        url: "https://AIRFLOW_URL/api/experimental/dags/DAG_NAME/dag_runs",
        method: "POST",
        // Airflow expects conf as a JSON string; JSON.stringify escapes the filename safely
        json: {'conf': JSON.stringify({'filename': filename})},
        auth: {
            'user': 'AIRFLOW_USER',
            'pass': 'AIRFLOW_PASSWORD',
            'sendImmediately': true
        }
    }, function (error, response) {
        // Signal completion only after the HTTP call has finished;
        // calling context.done() earlier lets the runtime stop the function mid-request
        if (error) {
            context.log.error(error);
        } else {
            context.log(response.statusCode);
        }
        context.done();
    });
};
```
You need to replace AIRFLOW_URL, DAG_NAME, AIRFLOW_USER, and AIRFLOW_PASSWORD with your own values.
The nice thing here is that I’m actually passing the filename of the new file to Airflow, which I can use later on in the DAG.
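The experimental API's trigger endpoint decodes the conf string as JSON, so how that string is built matters. If it is assembled by plain string concatenation, a blob name containing a double quote produces invalid JSON. The Python sketch below contrasts the two approaches; the filename is made up, naive_conf and safe_conf are hypothetical helpers, and json.dumps plays the role JSON.stringify plays in JavaScript.

```python
import json


def naive_conf(filename):
    # String concatenation: breaks as soon as filename contains a double quote.
    return '{"filename": "' + filename + '"}'


def safe_conf(filename):
    # json.dumps escapes special characters, so the result is always valid JSON.
    return json.dumps({"filename": filename})


filename = 'sales "Q1".csv'
print(json.loads(safe_conf(filename)))

try:
    json.loads(naive_conf(filename))
except json.JSONDecodeError as exc:
    print("naive conf is not valid JSON:", exc)
```

For ordinary names like report.csv both functions yield the same JSON; the difference only shows up with special characters.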
Airflow DAG
I won’t post the full Airflow DAG, but the excerpt below shows how to use the filename in the DAG.
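```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def copy_blob(templates_dict, **kwargs):
    if templates_dict['filename']:
        # do something with the filename
        pass
    else:
        # no filename specified, probably a manual run
        pass


with DAG(....) as dag:
    # Named copy_blob_task so it does not shadow the copy_blob callable
    copy_blob_task = PythonOperator(
        task_id='copy_blob',
        provide_context=True,
        python_callable=copy_blob,
        # Rendered at runtime from the conf sent by the Azure Function
        templates_dict={'filename': "{{ dag_run.conf['filename'] }}"},
    )
```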
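As an alternative to templates_dict, the callable can read the run configuration straight from the task context. The sketch below assumes Airflow 1.x with provide_context=True, where the context, including the dag_run object, is passed as keyword arguments. get_filename is a hypothetical helper, and a SimpleNamespace stands in for a real DagRun so the logic can be tried without Airflow.

```python
from types import SimpleNamespace


def get_filename(**context):
    """Return the filename passed via dag_run.conf, or None for runs without one."""
    dag_run = context.get("dag_run")
    # Runs started without a conf (e.g. scheduled runs) may have conf set to None.
    conf = (dag_run.conf if dag_run is not None else None) or {}
    return conf.get("filename")


print(get_filename(dag_run=SimpleNamespace(conf={"filename": "report.csv"})))
print(get_filename(dag_run=SimpleNamespace(conf=None)))
```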
Concluding
So in this blog post I’ve shown how to use an Azure Function to trigger an Airflow DAG using the experimental API.
I very much like the fact that it removes the need to poll for a file, and that it allows us to integrate Airflow nicely into an otherwise cloud-native setup.