Databricks Spark structured streaming metrics in DataDog

In this guide we are going to collect Spark Structured Streaming metrics and send them to Datadog within a Databricks Workspace.

When the structured streaming job is stopped, no more streaming metrics are available. Therefore this solution does not work for trigger-once streaming jobs at the moment (working on it).

1. Databricks secrets: datadog-api-key

The metrics are collected using the Datadog Agent. The agent requires an API key (not an Application key) to send its metrics to Datadog. Datadog admins can create keys on the Datadog account settings page. We place this key in Databricks Secrets, so it's not visible as plain text. For more information, see Databricks secrets or how to set up the Databricks CLI.

Secrets cannot be created inside notebooks, so we have to use the command line:

databricks secrets create-scope --scope application
databricks secrets put --scope application --key datadog-api-key --string-value '**4c653'

The result is that the secret is available within the Databricks Workspace. Normally we access the secrets using dbutils.secrets.get('scope', 'name'), but we are going to use the secret in the init-script. Stay tuned.
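
Still, you can quickly sanity-check from a notebook that the secret is reachable; the snippet below is just such a check (Databricks redacts the actual value in notebook output).

# List the keys in the 'application' scope; 'datadog-api-key' should be there.
print([s.key for s in dbutils.secrets.list("application")])

# The value itself is redacted when displayed, but its length confirms it is set.
print(len(dbutils.secrets.get("application", "datadog-api-key")) > 0)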

2. The init script

Most of the magic is inside the init script, which I have shared in this GitHub gist. I have based the script on the outdated one from Databricks: https://docs.databricks.com/_static/notebooks/datadog-init-script.html, but with various improvements:

  • Compatible with the latest Datadog Spark integration, using 'spark_url' in the config (instead of 'resourcemanager_uri')
  • Uses Databricks Secrets to store and retrieve the datadog-api-key
  • Works with SingleNode clusters
  • Updates the datadog-spark integration (when needed), to support Structured Streaming metrics!
  • Improved logging of these steps.

Uploading the init script:

  • You can use the Databricks CLI: dbfs cp datadog-install-agent.sh dbfs:/init-scripts/datadog-install-agent.sh
  • Or you can use a Databricks notebook (a verification snippet follows this list)
    dbutils.fs.put("/init-scripts/datadog-install-agent.sh", '''
    
    # copy & paste the gist init script
    
    ''', overwrite=True)
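
Either way, it is worth checking that the script actually landed on DBFS before attaching it to a cluster, for example:

# List the init-scripts folder and peek at the first bytes of the uploaded script.
display(dbutils.fs.ls("dbfs:/init-scripts/"))
print(dbutils.fs.head("dbfs:/init-scripts/datadog-install-agent.sh", 500))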

Summary of the steps:

  • Step 0: Check the presence of the DATADOG_API_KEY variable, because it's required!
  • Step 1: Install the Datadog agent using the official remote install script. This requires internet access.
  • Step 2: Check the cluster type (Normal or SingleNode) and collect the Spark port (see the illustration after this list). Note: /tmp/master-params is not present on a SingleNode cluster.
  • Step 3: Write the Spark config: /etc/datadog-agent/conf.d/spark.yaml
  • Step 4: Update the datadog-spark plugin when its version is below 1.19.1
  • Step 5: Restart the Datadog agent
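
To make steps 0 and 2 a bit more concrete, here is a small Python illustration of those two checks. The actual gist does this in bash, so treat it purely as a sketch of the logic, not as the script itself.

import os

# Step 0: the API key must arrive via the cluster's environment variables.
assert os.environ.get("DATADOG_API_KEY"), "DATADOG_API_KEY is required"

# Step 2: /tmp/master-params is only present on Normal clusters, so its absence
# tells us we are on a SingleNode cluster.
cluster_type = "Normal" if os.path.exists("/tmp/master-params") else "SingleNode"
print(cluster_type)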

3. Configure cluster

The secret and init script should be configured on the cluster that executes the structured streaming job.

Spark Config:

spark.app.name MyStreamingJob
spark.sql.streaming.metricsEnabled true

Environment Variables:

DD_AGENT_MAJOR_VERSION=7
DATADOG_API_KEY={{secrets/application/datadog-api-key}}

Here we defined agent version 7, which is the same as 6 but based on Python 3. The DATADOG_API_KEY variable will be filled at runtime with the secret datadog-api-key from the scope application. This secret-reference trick is in public preview at the moment. In the Databricks UI it looks like:

[Screenshot: cluster advanced options, Spark config and environment variables]

And most important: the init script. Configure the init script by pointing to the correct DBFS destination: dbfs:/init-scripts/datadog-install-agent.sh. Don't forget to click the Add button here.

[Screenshot: cluster advanced options, init scripts]
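
If you prefer to define the cluster as code instead of clicking through the UI, the same settings map onto a cluster definition for the Databricks Clusters API. The sketch below only shows the fields discussed here; required fields such as spark_version and node_type_id are omitted, and the cluster name is made up.

# Sketch of the relevant part of a Clusters API cluster definition.
cluster_spec = {
    "cluster_name": "streaming-datadog-demo",  # hypothetical name
    "spark_conf": {
        "spark.app.name": "MyStreamingJob",
        "spark.sql.streaming.metricsEnabled": "true",
    },
    "spark_env_vars": {
        "DD_AGENT_MAJOR_VERSION": "7",
        "DATADOG_API_KEY": "{{secrets/application/datadog-api-key}}",
    },
    "init_scripts": [
        {"dbfs": {"destination": "dbfs:/init-scripts/datadog-install-agent.sh"}}
    ],
}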

4. Start Structured Streaming job

Here we are using a 'rate' stream, which is nice for demonstration purposes.

from pyspark.sql import functions as F

rate_df = spark.readStream.format("rate").load()

query1 = (
  rate_df
  # Average value, per minute
  .groupBy(
    F.window(F.col('timestamp'), "1 minute", "30 seconds"),
  ).avg('value')
  # Write to (driver) console
  .writeStream
  .queryName("dummy_agg")
  .outputMode("complete")
  .format("console")
  .start()
);
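
While the query is running you can peek at its progress directly from the notebook; this is a quick sanity check before looking for the metrics in Datadog.

# StreamingQuery exposes its current state and the most recent progress report.
print(query1.status)        # e.g. message, isDataAvailable, isTriggerActive
print(query1.lastProgress)  # dict with inputRowsPerSecond, processedRowsPerSecond, ...

# Stop the demo stream when you are done:
# query1.stop()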

Optional: add a short sleep for job clusters, so the agent can send the last metrics before shutdown.

import subprocess, time

# Block until the streaming query terminates (e.g. on error or query1.stop()).
query1.awaitTermination()

# If the Datadog agent is active, give it time to flush the last metrics
# before the job cluster shuts down.
if subprocess.run(["systemctl", "is-active", "datadog-agent"]).returncode == 0:
  print("Datadog agent running, waiting 15 seconds to propagate metrics...")
  time.sleep(15)

Metrics Explorer in DataDog:

The metrics are sent every 15 seconds by default, so they should show up in Datadog very soon. The result:

[Screenshot: Datadog Metrics Explorer showing the Spark structured streaming metrics]

5. Troubleshooting

  • Is the startup script used? %sh ls /tmp/: list the local file system and check for start_datadog.sh
  • Is the Datadog agent installed correctly?
    %sh cat /tmp/start_datadog.log: view the installation logs. Not only the driver should have the agent installed and running.
    %sh cat /etc/datadog-agent/conf.d/spark.yaml: the Datadog Spark configuration file should be filled.
    %sh systemctl status datadog-agent: status of the agent and the last lines of /var/log/datadog/agent.log.
    You can change the config and restart the agent with %sh systemctl restart datadog-agent:
        dbutils.fs.put("file:/etc/datadog-agent/conf.d/spark.yaml", """
        init_config:
        instances:
            - spark_url: http://SOME-IP:SOME-PORT
              spark_cluster_mode: spark_driver_mode
              cluster_name: test-init-script
        """, overwrite=True)
  • Is datadog-spark collecting metrics? %sh datadog-agent check spark: view the collection process and plugin details. Since version 1.19.0 the Structured Streaming metrics are implemented.
  • Which metrics are collected? We are using the datadog-spark plugin; have a look at its code. At the moment: input_rate, processing_rate, rows_count and used_bytes (a query sketch follows this list).
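
To double-check from outside Databricks that the metrics arrive, you can query them with the official datadog Python client. This is only a sketch: querying requires an Application key in addition to the API key, and the metric names are assumed to live under the spark.structured_streaming prefix, so check the integration's metadata for the exact names.

import time
from datadog import initialize, api

# Unlike the agent, the query API needs both an API key and an Application key.
initialize(api_key="<API_KEY>", app_key="<APP_KEY>")

now = int(time.time())
result = api.Metric.query(
    start=now - 3600,
    end=now,
    # Assumed metric name and tag; adjust to what the Metrics Explorer shows.
    query="avg:spark.structured_streaming.processing_rate{cluster_name:test-init-script}",
)
print(result.get("status"), len(result.get("series", [])))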
