Blog

Verwendung von DynamoDB-Streams für zusätzliche Verarbeitung

Jan Vermeir

Jan Vermeir

Aktualisiert Oktober 16, 2025
5 Minuten

DynamoDB bietet eine Funktion namens Streams. Wir haben Streams ausprobiert, um herauszufinden, ob es möglich ist, nach Aktualisierungen oder Einfügungen eine zusätzliche Verarbeitung durchzuführen. DynamoDB-Streams sind eine vielversprechende Lösung für Fälle, in denen eine Verarbeitung aufgrund von Änderungen erforderlich ist, solange das Timing nicht entscheidend ist.

Jede Tabelle kann einen zugehörigen Stream haben. Änderungen werden in dem Stream veröffentlicht. Diese Änderungen können dann verwendet werden, um Prozesse auszuführen, die eine gewisse Zeit in Anspruch nehmen. In unserem Fall wollten wir einen Datensatz analysieren, sobald ein neuer Datensatz hinzugefügt oder ein bestehender Datensatz geändert wurde. Da diese Analyse einige Zeit in Anspruch nehmen kann, wollten wir nicht, dass sie die Antwortzeiten beeinträchtigt. Streams bieten eine einfache Möglichkeit, diese Art der Verarbeitung durchzuführen.

Der Anwendungsfall war, dass wir jedes Mal, wenn ein Kunde ein Spiel zu Ende gespielt hat, herausfinden wollten, ob der Kunde für dieses Spiel ein Abzeichen erhält. Ein Abzeichen kann etwas sein wie 'Sie haben dieses Spiel 10 Tage in Folge gespielt' oder 'Sie haben dieses Spiel zum ersten Mal gespielt'. Unsere API würde also das Ergebnis eines Spiels akzeptieren und dann ein Ereignis senden, das anzeigt, dass das Spiel abgeschlossen wurde. Dieses Ereignis wird von einem Lambda abgefangen. Die Verarbeitung umfasst das Lesen aller früheren Spiele, um herauszufinden, ob ein neues Abzeichen erworben wurde. Wenn dies der Fall ist, werden die neuen Errungenschaften in einer Tabelle gespeichert und wenn ein Spieler das nächste Mal seine Daten abfragt, wird das Abzeichen auf seiner persönlichen Seite angezeigt.

Um die von DynamoDB angebotenen Streams zu testen, haben wir eine einfache Tabelle namens JansTable erstellt. Nach der Erstellung sieht die Tabelle wie folgt aus

Tabellendefinition

Beachten Sie die Registerkarte 'Exporte und Streams'. Unter 'DynamoDB-Stream-Details' finden Sie eine Schaltfläche 'Aktivieren '.Wenn Sie auf diese Schaltfläche klicken, wird eine neue Seite angezeigt, auf der Sie die Art des Streams auswählen müssen. DynamoDB kann Nachrichten mit nur neuen, nur alten oder sowohl neuen als auch alten Daten in den Stream stellen. Ich habe new and old images gewählt und nun ist der Stream aktiviert.

Stream Details

Um tatsächlich etwas mit den Nachrichten im Stream zu tun, benötigen wir eine Lambda-Funktion, die ausgelöst wird, wenn eine neue Nachricht im Stream eintrifft. Klicken Sie also auf 'Auslöser erstellen' und wählen Sie dann 'Neu erstellen', um eine neue Lambda-Funktion zu erstellen. Wählen Sie im Filterfeld 'DynamoDB-process-stream-python' und dann 'Process updates made to a DDB table'. Wählen Sie im nächsten Dialogfeld einen Funktionsnamen und eine Rolle und wählen Sie den Arn für die Tabelle aus der Liste. Es wird ein Stub-Code zur Verarbeitung von Nachrichten erzeugt. Nach der Erstellung kann dieser Code geändert werden.

Es ist hilfreich, den generierten Code leicht zu ändern, um Ereignisse zu drucken, damit wir herausfinden können, ob der Stream funktioniert. Ich habe diesen Code eingegeben

import json

def lambda_handler(event, context):
  print("Received event: " + json.dumps(event))
  for record in event['Records']:
    print("do something useful")
  return 'Successfully processed {} records.'.format(len(event['Records']))

Dies ist im Grunde die Standardeinstellung, wobei die Druckanweisungen bereinigt wurden. Ich habe auch die JSON-Formatierung entfernt, um sicherzustellen, dass die Nachricht in einer einzigen Zeile erscheint. Wenn Sie die Standardeinstellung akzeptieren, kann es schwierig sein, die Protokolle zu lesen, da der Inhalt der vom Lambda empfangenen Nachrichten über mehrere Zeilen verteilt ist.

Eine wesentliche Änderung, die Sie vornehmen müssen, ist die Festlegung der richtigen Rollen für den Betrieb des Lambdas. Die Richtlinienanweisung sollte wie folgt aussehen:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
      ],
      "Resource": [
        "arn:aws:dynamodb:eu-central-1:[yourSubscriptionID]:table/JansTable/stream/2022-10-28T15:51:58.343"
      ],
      "Effect": "Allow"
    }
  ]
}

Ersetzen Sie die Ressource arn durch die arn für Ihren Stream (diese finden Sie im Abschnitt 'DynamoDB Stream Details', in dem der Stream definiert ist).

Jetzt können wir testen, indem wir einen Datensatz in die Tabelle einfügen. Verwenden Sie die Schaltfläche 'Tabellenelemente erkunden', um die Liste der Datensätze aufzurufen und einen neuen Datensatz hinzuzufügen. Der Inhalt ist nicht wichtig, ich habe eingegeben

{
  "JansPartition": {
    "S": "hello world"
  }
}

Die Nachricht sollte in den Protokollen angezeigt werden:

neue und alte Nachricht in den Protokollen

Beachten Sie, dass es sich bei diesem ersten Ereignis um eine Einfügung handelt, d.h. die Nachricht enthält einen Schlüssel 'NewImage' wie diesen:

{
  "NewImage": {
    "JansPartition": {
      "S": "hello world"
    }
  }
}

Aktualisierungen werden mit einem NewImage- und einem OldImage-Schlüssel angezeigt, etwa so:

{
  "NewImage": {
    "JansPartition": {
      "S": "hello world, update"
    },
    "moreData": {
      "S": "hello again"
    }
  },
  "OldImage": {
    "JansPartition": {
      "S": "hello world"
    }
  }
}

Beim Schreiben dieses Blogs ist mir ein kleiner Fehler im Lambda-Code unterlaufen. Dies führte dazu, dass die Verarbeitung mit einer Fehlermeldung fehlschlug, die in den Protokollen angezeigt wird. Die Fehlermeldung erscheint ein paar Mal, in immer kürzeren Abständen. AWS wird 24 Stunden lang versuchen, die Nachricht zuzustellen.

Ich denke, dass Streams eine vielversprechende Lösung für Fälle sind, in denen eine Verarbeitung auf der Grundlage von Änderungen erforderlich ist, solange das Timing nicht entscheidend ist.

Verfasst von

Jan Vermeir

Developing software and infrastructure in teams, doing whatever it takes to get stable, safe and efficient systems in production.

Contact

Let’s discuss how we can support your journey.