Blog

Zentralisierte Überwachung für Datenpipelines: Die Kombination von Azure Data Factory Diagnostics mit Databricks Systemtabellen

Rik Adegeest

Aktualisiert Oktober 13, 2025
7 Minuten

Einführung

Die Überwachung von Datenpipelines ist unerlässlich, um die Zuverlässigkeit sicherzustellen, Probleme zu beheben und die Leistung im Laufe der Zeit zu verfolgen. Bei Xebia implementieren wir häufig eine Datenplattformarchitektur, die wie folgt aussieht:

Architektur der Datenplattform

In diesem Setup ist Azure Data Factory (ADF) für die Aufnahme von Daten aus einer Vielzahl von Quellen in ein Speicherkonto zuständig, während Databricks sich um die Umwandlung dieser Daten und ihre Bereitstellung für Endbenutzer kümmert. Durch diese klare Trennung kann jedes Tool seine Stärken ausspielen: ADF zeichnet sich durch Konnektivität und Ingestion aus, während Databricks bei der Transformation und Analyse leistungsstark ist.

Trotz dieser Vorteile ist die Überwachung oft fragmentiert. ADF ermöglicht die Überwachung einzelner Pipeline-Läufe, und Databricks bietet Workflow-Analysen über Systemtabellen, aber es gibt keinen einzigen Ort, an dem Sie eine End-to-End-Ansicht des Datenflusses erhalten. Das macht es schwierig, den gesamten Weg der Daten zu verfolgen und Probleme schnell zu erkennen.

Dieser Artikel zeigt, wie Sie diese Lücke schließen können, indem Sie ADF-Diagnoseeinstellungen mit Databricks-Systemtabellen kombinieren. Das folgende Beispiel zeigt, wie Sie eine zentrale Übersicht erstellen, um die Menge der eingelesenen Daten und die End-to-End-Laufzeit für einen bestimmten Anwendungsfall zu analysieren. Das Setup kann jedoch leicht an Ihren spezifischen Informationsbedarf angepasst werden.

Voraussetzungen

Stellen Sie sicher, dass Sie alles haben, was Sie brauchen:

  • Eine Azure Data Factory-Instanz mit laufenden Pipelines
  • Ein Speicherkonto
  • Ein Databricks-Arbeitsbereich, der mit demselben Speicherkonto verbunden ist
  • Databricks Systemtabellen aktiviert

Azure Data Factory Diagnose-Einstellungen

Die Diagnoseeinstellungen von Azure Data Factory ermöglichen den Export von Pipeline-Laufprotokollen und Metriken in ein Speicherkonto, Log Analytics oder Event Hub.

Der erste Schritt besteht darin, die Diagnoseeinstellungen in ein Speicherkonto zu exportieren, auf das Databricks Zugriff hat. Gehen Sie im Azure Portal zu Ihrer ADF-Instanz, wählen Sie Diagnoseeinstellungen und fügen Sie eine neue Diagnoseeinstellung hinzu. Wählen Sie in den Kategorien die gewünschten Einstellungen aus. Für dieses Beispiel exportieren Sie die Pipeline-Läufe und die Aktivitätsläufe.

Aktivieren der ADF-Diagnoseeinstellungen

Sobald diese Funktion aktiviert ist, schreibt ADF Protokolle in Ihr Speicherkonto. Die Struktur des Protokollordners wird von Azure festgelegt und kann nicht geändert werden. ADF erstellt einen Container pro ausgewählter Kategorie.

Beachten Sie, dass es bis zu 15 Minuten dauern kann, bis die Protokolle erscheinen.

ADF-Protokolle in Databricks einlesen

Um ADF-Protokolle zusammen mit Databricks-Workflow-Daten zu analysieren, nehmen Sie die Protokolle in Databricks auf. Eine Möglichkeit, dies zu tun, ist die Verwendung der Autoloader-Funktion von Databricks. Der Autoloader verfolgt automatisch die unverarbeiteten Protokolle im Speicherkonto und verarbeitet jedes Mal neue Protokolldateien, wenn der Autoloader ausgelöst wird.

Die Schritte, um dies zu erreichen, sind:

  1. Identifizieren Sie den Speicherpfad: Suchen Sie den Container und den Ordner in Ihrem Speicherkonto, in den ADF die Diagnoseprotokolle schreibt.
  2. Konfigurieren Sie Databricks Autoloader: Richten Sie Autoloader ein, um den Speicherpfad für neue Protokolldateien zu überwachen.
  3. Workflow erstellen: Planen Sie den Autoloader in einen Workflow mit der Häufigkeit ein, mit der Sie die Prüftabelle aktualisieren möchten.

Wiederholen Sie diese Schritte für jede Kategorie von Diagnoseeinstellungen, die Sie im vorherigen Schritt ausgewählt haben.

Hier ist ein Beispiel für einen PySpark-Codeausschnitt, um ADF-Protokolle mit Autoloader zu importieren:

# Define path variables
container = "insights-logs-pipelineruns"
storage_account = ""
subscription_id = ""
adf_resource_group = ""
adf_name = ""

# Define path
path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/resourceId=/SUBSCRIPTIONS/{subscription_id}/RESOURCEGROUPS/{adf_resource_group.upper()}/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/{adf_name.upper()}/"
checkpoint_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/_checkpoints/"

# Write logs to etl_audit.adf_pipeline_runs
df = spark.readStream.format("cloudFiles") 
  .option("cloudFiles.format", "json") 
  .option("cloudFiles.schemaLocation", checkpoint_path) 
  .option("cloudFiles.partitionColumns", "") 
  .option("pathGlobFilter", "*.json") 
  .option("cloudFiles.inferSchema", "true") 
  .load(path) 
  .writeStream 
  .option("checkpointLocation", checkpoint_path) 
  .trigger(availableNow=True) 
  .option("mergeSchema", "true") 
  .toTable(f"etl_audit.adf_pipeline_runs")

Nach diesen Schritten fragen Sie die Protokolle von Databricks ab, indem Sie die Daten aus der Tabelle adf_pipeline_runs auswählen.

Kombinieren von ADF Log Kategorien

Da die Protokolle in Databricks verfügbar sind, besteht der nächste Schritt darin, die Informationen aus den Protokollen zu extrahieren. Um zum Beispiel die Menge der pro Anwendungsfall aufgenommenen Daten zu sehen, verwenden Sie die folgende Abfrage.

Beachten Sie, dass diese Abfrage für diesen speziellen Informationsbedarf erstellt wurde, aber Sie können so viele Informationen in die Prüfungstabelle aufnehmen, wie Sie benötigen.

-- Filter ADF pipeline runs for succeeded or failed operations
with adf_pipeline_filtered as (
  select
    runId,
    pipelineName,
    start as startTime,
    end as endTime,
    status,
    get_json_object(properties, '$.Annotations[1]') AS tableName, -- Extract table name from annotations
    get_json_object(properties, '$.Annotations[2]') AS useCase    -- Extract use case from annotations
  from
    etl_audit.adf_pipeline_runs
  where
    operationName ilike '%Succeeded'
    or operationName ilike '%Failed'
)

-- Filter ADF activity runs for succeeded or failed operations
, adf_activity_filtered as (
  select
    pipelineRunId,
    get_json_object(properties, '$.Output.rowsRead') AS rowsRead,     -- Extract rows read
    get_json_object(properties, '$.Output.rowsCopied') AS rowsCopied, -- Extract rows copied
    row_number() over (partition by pipelineRunId order by activityRetryCount desc) as rn -- Only take latest retry of copy activity
  from
    etl_audit.adf_activity_runs
  where
    operationName ilike '%Succeeded'
    or operationName ilike '%Failed'
)

-- Combine pipeline and activity data, joining on run ID and selecting only the latest retry of the an activity per pipeline run
, adf_combined as (
  select
    "ADF" as source,
    pf.useCase,
    pf.pipelineName,
    pf.startTime,
    pf.endTime,
    pf.status,
    af.rowsRead,
    af.rowsCopied,
    pf.tableName
  from
    adf_pipeline_filtered pf
    left join adf_activity_filtered af on pf.runId = af.pipelineRunId
  where
    af.rn = 1
)
Beispiel für ADF-Protokolle

Databricks Systemtabellen hinzufügen

Databricks Systemtabellen liefern detaillierte Informationen über Workflow-Läufe, Job-Status und mehr. Fragen Sie diese Tabellen ab, um die relevanten Details für Ihr Monitoring Dashboard zu extrahieren.

  • Erstellen Sie eine Ansicht zur Auswahl der Schlüsselspalten (z.B. Job-ID, Laufstatus, Start-/Endzeit)
  • Verbinden Sie diese Ansicht mit der Ansicht ADF-Protokolle, um eine zentralisierte, durchgängige Überwachungstabelle zu erstellen
  • Passen Sie die endgültige Ansicht an die Bedürfnisse Ihres Unternehmens an (fügen Sie Spalten, Filter usw. hinzu).

Um einen vollständigen Überblick über die Laufzeit für einen Anwendungsfall zu erhalten, führen Sie die ADF-Audit-Tabelle mit dieser Abfrage mit der Databricks-Systemtabelle zusammen:

Diese Abfrage knüpft an die vorherige Abfrage an. Sie können sie also einfach anhängen, um eine einzige, einheitliche SQL-Anweisung zu erstellen.

-- Select Databricks jobs
, adb_job as (
  select
    *,
    row_number() over (partition by workspace_id, job_id order by change_time desc) as rn -- As this is an SCD2 table, we only take the most recent version of the job.
  from system.lakeflow.jobs
  qualify rn = 1
)

-- Select Databricks job run details
, adb_job_run as (
  select
    job_id,
    period_start_time as startTime,
    period_end_time as endTime,
    result_state as status -- Job run status (e.g., Succeeded, Failed)
  from system.lakeflow.job_run_timeline
)

-- Combine job and run info, align columns with ADF data
, adb_combined as (
  select
    "DATABRICKS" as source,
    coalesce(job.tags.use_case, null) as useCase,
    job.name as pipelineName,
    run.startTime,
    run.endTime,
    run.status,
    null as rowsRead,    -- Not applicable for Databricks jobs
    null as rowsCopied,  -- Not applicable for Databricks jobs
    null as tableName    -- Not applicable for Databricks jobs
  from adb_job job
  left join adb_job_run run on job.job_id = run.job_id
)

-- Union ADF and Databricks results for a centralized view
, combined as (
  select * from adf_combined
  union all
  select * from adb_combined
)

-- Final selection, ordered by most recent runs
select *
from combined
order by startTime desc

So erhalten Sie ein Ergebnis, in dem Sie die aufgenommenen Daten und die End-to-End-Laufzeit des Anwendungsfalls leicht erkennen können.

Beispiel für ADF-Protokolle

Zusammenfassung

In diesem Artikel wird eine Lösung gezeigt, wie Sie die ADF-Diagnoseeinstellungen mit Databricks Systemtabellen zu einer zentralisierten Audit-Tabelle kombinieren können, mit der Sie Ihre Datenpipelines an einem einzigen Ort überwachen und analysieren können.

Mit dieser Einrichtung können Sie eine Vielzahl von Fragen beantworten und viele Szenarien unterstützen, wie z.B.:

  • Analysieren Sie Veränderungen im Laufe der Zeit in den geladenen Zeilen für eine bestimmte Tabelle oder einen Anwendungsfall
  • Verfolgen Sie die Gesamtlaufzeiten und Leistungstrends ganzer Datenpipelines
  • Identifizieren Sie Engpässe oder Ausfälle in der gesamten Pipeline, von der Aufnahme bis zur Umwandlung
  • Korrelieren Sie die Schritte der Aufnahme und Umwandlung für eine bessere Fehlerbehebung und Ursachenanalyse.
  • Aktivieren Sie die Berichterstattung auf der Grundlage benutzerdefinierter Metriken oder Schwellenwerte.

Beschränkungen & Tipps

  • Systemtabellen: Stellen Sie sicher, dass Systemtabellen in Ihrem Databricks-Arbeitsbereich aktiviert sind (verfügbar in Premium und höher).
  • Protokollvolumen: ADF-Diagnoseprotokolle können schnell anwachsen. Richten Sie für Ihr Speicherkonto Aufbewahrungsrichtlinien oder eine Lebenszyklusverwaltung ein.
  • Schemaänderungen: Wenn sich die Struktur Ihrer Pipelines oder Arbeitsabläufe ändert, aktualisieren Sie Ihre Ansichten entsprechend, um fehlende Daten zu vermeiden.
  • Infrastruktur als Code: Sie können ADF-Diagnoseeinstellungen über Terraform für automatisierte, wiederholbare Implementierungen einrichten.
  • Datenvisualisierung: Wenn Sie die endgültige Prüftabelle in einem Datenvisualisierungstool (oder sogar in einem Databricks-Dashboard) anzeigen, können auch technisch nicht versierte Beteiligte leicht erkennen, wann eine Aktualisierung ihrer Daten zu erwarten ist.

Verfasst von

Rik Adegeest

Rik is a dedicated Data Engineer with a passion for applying data to solve complex problems and create scalable, reliable, and high-performing solutions. With a strong foundation in programming and a commitment to continuous improvement, Rik thrives on challenging projects that offer opportunities for optimization and innovation.

Contact

Let’s discuss how we can support your journey.