Blog
Optimierung der OpenSearch-Ingestion: Gewährleistung von Zuverlässigkeit, Effizienz und Kosteneinsparungen

Das Ingesting von Daten in einen OpenSearch-Cluster sieht einfach aus, wenn Sie die Dokumentation lesen. Die Wahrheit ist, dass es einfach ist, aber es hängt alles davon ab, wie sehr Sie sich für die Daten interessieren, die Sie einlesen. Lassen Sie mich einen Schritt zurückgehen. Warum verwenden wir OpenSearch überhaupt? Mit dem Aufkommen der KI benötigen Sie auch eine Wissensdatenbank. Diese Wissensdatenbanken können in OpenSearch gehostet werden. Um die OpenSearch-Datenbank zu nutzen, müssen Sie sie jedoch auch mit Daten füllen.
Vor kurzem wurde ich mit der Aufgabe konfrontiert, eine Datenbank für GenAI-Zwecke zu erstellen. Wir hatten einen Datensatz, der in OpenSearch geladen werden musste. Amazon OpenSearch Ingestion klingt nach einem Service, der Ihnen dabei helfen kann.

Aufbereitung der Daten
Zunächst einmal liegen die Daten nie in dem Format vor, in dem Sie sie haben möchten. Zunächst müssen wir sie in etwas umwandeln, das wir einlesen können. Für diesen Anwendungsfall habe ich StepFunctions verwendet ( vermeiden Sie jedoch kostspielige Schleifen). Jedes eingehende Dokument löst eine Ausführung aus. In meinem Fall wollte ich in der Lage sein, das Dokument mit semantischer Bedeutung zu durchsuchen. Um dies effizient zu tun, müssen wir die Dokumente in kleinere Teile zerlegen. Auf diese Weise können wir für jeden Chunk Einbettungen erzeugen, die für die semantische Suche verwendet werden können. Sie können eine einzige Datei mit allen Chunks erstellen, und jeder Chunk wird denselben Metadatensatz haben; der einzige Unterschied ist der Chunk-Text und die Chunk-Kennung. Sobald wir diese Datei haben, legen wir sie auf S3 ab, S3 sendet einen S3 Event Trigger an eine SQS-Warteschlange und die OpenSearch Ingestion Pipeline nimmt sie in die OpenSearch-Datenbank auf.
So weit so gut
An dem von mir beschriebenen Ansatz ist nichts auszusetzen. Dies ist ein Beispiel wie aus dem Lehrbuch, das aus der AWS-Dokumentation stammt. Aber eine Ingestion-Pipeline wird Sie auch dann Geld kosten, wenn Sie nichts aufnehmen. Aus diesem Grund habe ich eine Lambda-Funktion entwickelt, die die Pipeline stoppt, wenn sich keine Nachrichten in der Warteschlange befinden. Sie startet die Pipeline auch, wenn sich Nachrichten in der Warteschlange befinden. Dies war aus Kostensicht eine gute Optimierung.
Nach einigen Tests erwarteten wir 10k Dokumente im System, aber wir hatten nur etwa 5k. Wo sind die anderen Dokumente geblieben? Die Dead-Letter-Warteschlangen waren leer, und wir hatten keinerlei Protokollspuren von Fehlern... Es stellte sich heraus, dass alle Nachrichten im Puffer verloren gehen, wenn Sie die Ingestion-Pipeline anhalten.
Danksagungen zur Rettung
Sie können viel darüber streiten, dass beim Anhalten einer Pipeline die Dokumente im Puffer verloren gehen. Aber das ist die Realität, mit der ich zurechtkommen musste. Sie können die dauerhafte Pufferung aktivieren, was zumindest den Datenverlust verhindern würde. Aber Sie müssen immer noch die Pipeline starten, um den Auftrag fortzusetzen. Ich habe mich mehr in Richtung SQS umgesehen. Der Zweck von SQS besteht darin, Nachrichten in eine Warteschlange zu stellen und sicherzustellen, dass sie verarbeitet werden. Nach einigen Nachforschungen bin ich auf zwei Optionen gestoßen, die ich Ihnen gerne vorstellen möchte.
version: '2'
s3_pipeline:
source:
s3:
workers: 10
notification_type: sqs
notification_source: s3
codec:
json: {}
compression: none
acknowledgments: true
records_to_accumulate: 100
on_error: retain_messages
sqs:
queue_url: https://sqs.eu-west-1.amazonaws.com/000000000000/MyQueue
maximum_messages: 10
visibility_timeout: 60s
poll_delay: 0s
visibility_duplication_protection: true
aws:
region: eu-west-1
sts_role_arn: arn:aws:iam::000000000000:role/MyRole
sink:
- opensearch:
serverless: false
hosts:
- https://vpcxxxxxx.eu-west-1.es.amazonaws.com
index: documents-${/index}
bulk_size: 5
max_retries: 3
dlq:
s3:
bucket: MyBucketName
key_path_prefix: dlq/
region: eu-west-1
sts_role_arn: arn:aws:iam::000000000000:role/MyRole
aws:
region: eu-west-1
sts_role_arn: arn:aws:iam::000000000000:role/MyRole
document_id: ${/chunk_id}
actions:
- type: delete
when: /operation == "delete"
- type: update
when: /operation == "update"
- type: index
when: /operation == "index"
Die Konfigurationsoptionen sind acknowledgments und visibility_duplication_protection. Erstere sorgt dafür, dass die Nachricht in der Warteschlange verbleibt, bis sie in die Senke eingelesen wurde, und letztere sorgt dafür, dass die Nachricht so lange im Umlauf bleibt, wie sie sich im Puffer befindet. Mit diesen Optionen wird sichergestellt, dass die SQS-Warteschlange wie vorgesehen verwendet wird. Wenn ein Fehler auftritt, wird die Nachricht erneut in die Warteschlange gestellt und erneut versucht. Nach einer Anzahl von x Versuchen wird sie in die Warteschlange für tote Briefe verschoben. Mit diesen Einstellungen wurde sichergestellt, dass alle 10k Dokumente im Stapel wie erwartet eingelesen wurden.
Die Pipeline würde trotzdem gestoppt werden, sobald die SQS-Warteschlange leer ist. Um dies zu verhindern, habe ich auch die ApproximateNumberOfMessagesNotVisible neben der ApproximateNumberOfMessages Metrik eingefügt.
Umgang mit Aktualisierungen und Löschungen
Ich möchte Ihnen auch mitteilen, wie wir mit Aktualisierungen und Löschungen umgehen. Das Einlesen eines neuen Dokuments ist einfach, aber Aktualisierungen und Löschungen sind etwas anderes. Wenn Sie ein Dokument mit 15 Chunks importiert haben und die aktualisierte Version nur noch 12 hat, müssen Sie 3 Chunks löschen und möglicherweise einige Chunks aktualisieren. Dies kann zum Beispiel der Fall sein, wenn Sie einen Absatz in der Mitte eines Textes löschen.
Zunächst müssen Sie wissen, ob das Dokument bereits aufgenommen wurde. Eine Suchaktion in der OpenSearch-Datenbank kann dies leisten. Diese Suchaktion gibt alle Chunks zurück, die bereits in der Datenbank vorhanden sind. Mit Hilfe einer Logik können Sie bestimmen, welcher Chunk aktualisiert und welcher gelöscht werden muss. Wir fügen drei zusätzliche Metadatenfelder in jedem Chunk hinzu: index, chunk_id, und operation.
Das Feld index bestimmt den Index, in den das Dokument eingefügt wird. Das Feld chunck_id wird zur internen Dokument-ID und das Feld operation bestimmt, ob der Chunk indiziert, aktualisiert oder gelöscht werden muss.
Fazit
Das Einlesen von Daten in OpenSearch klingt einfach, aber die eigentliche Herausforderung liegt in der Gewährleistung von Zuverlässigkeit, Effizienz und Kosteneffizienz. Während der Ansatz aus dem Lehrbuch funktioniert, erfordern reale Szenarien oft Optimierungen - wie das dynamische Anhalten der Ingestion-Pipeline, um Kosten zu sparen, oder die Nutzung von SQS-Acknowledgments, um die Verarbeitung von Nachrichten zu garantieren.
Durch die Umsetzung dieser Anpassungen haben wir einen stabileren Ingestion-Prozess erreicht, der sicherstellt, dass alle Dokumente wie erwartet in OpenSearch landen. Die Kombination aus Step Functions für die Dokumentumwandlung, SQS für eine zuverlässige Warteschlange und Metadaten-Tagging für Aktualisierungen und Löschungen ergab eine skalierbare und wartbare Lösung. Letztendlich geht es beim Ingestion nicht nur darum, Daten in das System zu bringen, sondern auch sicherzustellen, dass die richtigen Daten dort ankommen, dort bleiben und bei Bedarf effizient abgefragt werden können.
Foto von Andrea Piacquadio
Verfasst von

Joris Conijn
Joris is the AWS Practise CTO of the Xebia Cloud service line and has been working with the AWS cloud since 2009 and focussing on building event-driven architectures. While working with the cloud from (almost) the start, he has seen most of the services being launched. Joris strongly believes in automation and infrastructure as code and is open to learning new things and experimenting with them because that is the way to learn and grow.
Unsere Ideen
Weitere Blogs
Contact



