Blog

Überwachung des Verbraucherrückstands in Azure Event Hub

Ruben Oostinga

Ruben Oostinga

Aktualisiert Oktober 20, 2025
5 Minuten

Warum

Die Verzögerung der Verbraucher ist die wichtigste Metrik, die Sie bei der Arbeit mit Ereignisströmen überwachen müssen. Sie ist jedoch nicht als Standard-Metrik in Azure Insights verfügbar. Möchten Sie diese Metrik als Teil Ihrer Überwachungslösung zur Verfügung haben? Sie können sie mit etwas benutzerdefiniertem Code einrichten. In diesem Blog zeigen wir Ihnen, wie.

Was

Die Verzögerung der Verbraucher bezieht sich auf die Anzahl der Ereignisse, die noch von den Verbrauchern eines Streams verarbeitet werden müssen. Die meiste Zeit über ist die Verzögerung 0, da jedes Ereignis sofort verarbeitet wird. Es gibt jedoch einige Ereignisse, die diese Zahl in die Höhe treiben können. Wenn ein Verbraucher auf Fehler stößt, z. B. ein funktionelles Problem, das durch ein Ereignis verursacht wurde, oder ein technisches Problem wie eine Netzwerkverbindung, wird er die Verarbeitung von Ereignissen stoppen und die Verzögerung der Verbraucher erhöhen.

Die Verzögerung wird auch größer, wenn Ereignisse schneller veröffentlicht werden, als der Verbraucher sie verarbeiten kann. In diesem Fall löst sich das Problem von selbst, wenn Ereignisse mit einer niedrigeren Rate veröffentlicht werden und der Verbraucher wieder aufholt.

Sie können einen Alarm auslösen, wenn der Verbraucherrückstand über einen längeren Zeitraum, z.B. 10 Minuten, 0 überschreitet. Welche Konfiguration des Alarmauslösers für Sie die beste ist, hängt von Ihrer Situation ab. Bevor wir mit der Lösung fortfahren, sollten wir einige Begriffe klären:

Definitionen

  • Verbrauchergruppen ermöglichen es mehreren Verbrauchern, denselben Ereignisstrom zu abonnieren. In der Regel besteht eine Verbrauchergruppe aus mehreren Instanzen derselben Anwendung, die für hohe Verfügbarkeit und horizontale Skalierung genutzt werden kann.
  • Partitionen ermöglichen die parallele Verarbeitung von Ereignissen. Alle Ereignisse innerhalb einer Partition haben eine feste Reihenfolge. Ereignisse in verschiedenen Partitionen können in einer anderen Reihenfolge empfangen werden, da sie parallel verarbeitet werden. Eine konsumierende Anwendung kann mehrere Instanzen haben, die jeweils von mehreren Partitionen lesen können.
  • Ein Namensraum ist eine Sammlung von Ereignis-Hubs/Themen, die gemeinsam verwaltet werden können.
  • Checkpoints zeichnet die Sequenznummer des letzten konsumierten Ereignisses auf. Dieser Wert wird verwendet, um sicherzustellen, dass bei einem Neustart nur die Ereignisse erneut gesendet werden, die noch nicht konsumiert wurden. Normalerweise werden Checkpoints als Datei in BlobStorage gespeichert.

Wie

Das Azure SDK kann die Sequenznummer des letzten in der Warteschlange stehenden Ereignisses einer Partition abrufen. Mit dem CheckpointStore können Sie die Sequenznummer des Checkpoints abrufen. Da beides einfache Zähler sind, können Sie die Differenz berechnen und diese als benutzerdefinierte Metrik in Azure Insights veröffentlichen. Damit es sich um eine überwachbare Metrik handelt, müssen Sie die Metrik in regelmäßigen Abständen, z.B. jede Minute, erfassen.

Es gibt zwei Möglichkeiten, die Verzögerungsmetrik zu erfassen:

  • Wenn Sie die Verbraucheranwendung verwenden, erhalten Sie die Event Hub-Anmeldeinformationen, den Namespace und die Verbrauchergruppe. Wenn jedoch etwas schief geht und die konsumierende Anwendung heruntergefahren wird, sehen Sie nicht mehr, ob die Verzögerung der Verbraucher zunimmt, da diese Informationen nicht mehr gesammelt werden. Verwenden Sie einen separaten Prozess für die Überwachung, um dies zu verhindern.
  • Oder alarmieren Sie die Anwendung, wenn der Gesundheitscheck fehlschlägt oder die Verzögerungsmetrik des Verbrauchers nicht vorhanden ist. Die folgenden Codebeispiele sind der Übersichtlichkeit halber in Typescript geschrieben. Der gleiche Ansatz kann jedoch auch mit den anderen Event Hub SDKs verwendet werden, z. B. für C#, Java, Python und Go.

Sammeln des Verbraucherrückstands

// initialize checkpointStore and eventHubClient
const consumerGroup = 'my consumer group';
const checkpointStore = ...
const eventHubClient = ...
// Send the consumer lag every minute
setInterval(async () => {
  try {
    await measureConsumerLag(consumerGroup, eventHubClient, checkpointStore);
  } catch (error) {
    logger.error(error, 'The Event Hub Consumer Lag could not be sent to Application Insights');
  }
}, 60000);
export async function measureConsumerLag(
  consumerGroup: string,
  eventHubClient: EventHubConsumerClient,
  checkpointStore: BlobCheckpointStore
): Promise<void> {
  const partitionIds = await eventHubClient.getPartitionIds();
  // Should return either 0 or 1 checkpoint per partition
  const checkpoints = await checkpointStore.listCheckpoints(
    eventHubClient.fullyQualifiedNamespace,
    eventHubClient.eventHubName,
    consumerGroup
  );
  const checkpointSequenceNumberByPartitionId = Object.fromEntries(
    checkpoints.map(({ partitionId, sequenceNumber }): [string, number] => [partitionId, sequenceNumber])
  );
  await Promise.all(
    partitionIds.map(async partitionId => {
      const lastKnownSequenceNumber = checkpointSequenceNumberByPartitionId[partitionId] ?? 0;
      const { lastEnqueuedSequenceNumber } = await eventHubClient.getPartitionProperties(partitionId);
      const consumerLageMetric = {
        eventHubName: eventHubClient.eventHubName,
        consumerGroup,
        partitionId,
        namespace: eventHubClient.fullyQualifiedNamespace.split('.')[0],
        // The consumerLag calculation
        consumerLag: lastEnqueuedSequenceNumber - lastKnownSequenceNumber,
      };
      await trackEventHubConsumerLag(consumerLageMetric);
    })
  );
}

Senden der benutzerdefinierten Metrik

import { defaultClient as appInsightsClient } from 'applicationinsights';
export interface ConsumerLagMetric {
  eventHubName: string;
  consumerGroup: string;
  partitionId: string;
  namespace: string;
  consumerLag: number;
}
export async function trackEventHubConsumerLag({
  eventHubName,
  namespace,
  consumerGroup,
  partitionId,
  consumerLag,
}: ConsumerLagMetric): Promise<void> {
  await trackMetric({
    name: 'Event Hub Consumer Lag',
    value: consumerLag,
    // Format property keys with a space, for readability in the Application Insights metrics dashboard
    properties: {
      'Event Hub': eventHubName,
      'Partition Id': partitionId,
      'Consumer Group': consumerGroup,
      Namespace: namespace,
    },
  });
}

Anzeigen der benutzerdefinierten Metrik

In der Application Insights-Konsole finden Sie Ihre benutzerdefinierte Metrik. Teilen Sie das Diagramm nach "Consumer Group" auf, die eine Anwendung darstellt. Je nach Zoomstufe zeigt das Diagramm mehrere Messungen pro Datenpunkt an. Verwenden Sie die Aggregation "Max", um die beste Linie zu erhalten.

Dieses Diagramm zeigt 3 Microservices, von denen 1 Service bei der Verarbeitung eines Ereignisses festhängt. Jedes Mal, wenn neue Ereignisse veröffentlicht werden, nimmt die Verzögerung beim Verbraucher zu. Die Ereignisse werden in Schüben veröffentlicht, so dass die Verzögerung der Verbraucher in bestimmten Schritten zunimmt.

Ein Diagramm mit einer ansteigenden Linie

Wenn ein Problem gelöst ist, sinkt der Rückstand der Verbraucher schnell.

Ein Diagramm mit einer Linie, die schnell ein Plateau erreicht und dann abfällt

Fazit

Die Verzögerung der Verbraucher zeigt schnell jedes funktionale oder technische Problem mit Ihrem Ereignisstrom. Wenn Sie die Codebeispiele aus diesem Blogpost verwenden, müssen Sie sich nicht selbst mit den SDKs beschäftigen. Natürlich können Sie die Metriksammlung so anpassen, dass die Metrik an die Protokolle oder an ein anderes Metriksystem wie Prometheus, Datadog oder Open Telemetry gesendet wird.

Nach der Erfassung der Metrik besteht der nächste Schritt darin, metrikbasierte Warnmeldungen zu erstellen, um sicherzustellen, dass Sie die Probleme erkennen, bevor Ihr Kunde sie erkennt!

Verfasst von

Ruben Oostinga

Contact

Let’s discuss how we can support your journey.