Blog
Abrufen von Dateien aus AWS Glue Job Bookmarks

AWS Glue ist ein Service zur Implementierung von ETL-Pipelines, die große Datenmengen verarbeiten können. Glue verfügt über eine Funktion namens Job-Lesezeichen, die festhält, welche Daten bereits verarbeitet wurden und welche Daten noch von einem Job verarbeitet werden müssen. Im Falle eines S3-Buckets als Datenquelle wird beispielsweise eine Liste der verarbeiteten Dateien geführt, um die erneute Verarbeitung unveränderter Daten in einem Bucket zu verhindern. In unserem Fall wollten wir herausfinden, welche Partitionen in S3 mit neuen Dateien aktualisiert wurden, damit wir sicherstellen können, dass wir sie zusammenführen und nicht eine große Menge kleiner S3-Dateien in jeder Partition haben, da dies die Leistung beeinträchtigen würde.
Wie kann man wissen, welche Objekte - nehmen wir das Beispiel S3 - Ihr Glue-Auftrag in einem bestimmten Lauf verarbeitet? Der erste Schritt besteht darin, einen Glue-Auftrag zu erstellen und die Lesezeichenfunktion zu aktivieren. Wenn das Skript job.init aufruft, initialisiert es den Auftrag und ruft den Status der Lesezeichen ab. Es gibt mehrere Elemente, die den "Status" für unseren Glue-Auftrag bilden. Sie sind spezifisch für jede Quelle, Transformation und Senke im Skript.
Beginnen wir mit der Angabe unserer Datenquelle, indem wir einen dynamischen Rahmen mit einem transformation_ctx erstellen,
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
glue_client = boto3.client("glue")
dyf_source_s3 = glueContext.create_dynamic_frame.from_options(
"s3",
{
"paths": ["YOUR_S3_BUCKET_PATH"],
},
format="parquet",
transformation_ctx="datasource", # specify transformation context
)
Die transformation_ctx dient als Schlüssel zum Suchen des Lesezeichenstatus für eine bestimmte Quelle in Ihrem Skript. Indem wir den dynamischen Rahmen erstellen und unseren transformation_ctx einrichten, ermöglichen wir es Glue, den Lesezeichenstatus für diesen speziellen Auftrag zu setzen. Um die Liste der Dateien abrufen zu können, die in diesem speziellen Auftrag verarbeitet werden, benötigen wir einige zusätzliche Informationen.
Zunächst rufen wir unsere Auftragsdefinition anhand des Auftragsnamens ab.
job_name = args['JOB_NAME']
response = glue_client.get_job(JobName=job_name)
Die response enthält detaillierte Informationen über den Glue-Auftrag, darunter auch das temporäre Verzeichnis (TempDir). Das ist es, wonach wir suchen.
temp_dir = response['Job']['DefaultArguments']['--TempDir']
Sie können Ihr Skript auch direkt auf das temporäre Verzeichnis S3 Pfad verweisen lassen, da Sie dieses durch die Übergabe eines Job-Arguments festlegen können. Die Liste der Dateien, nach denen wir suchen, wird unter dem folgenden Pfad gespeichert:
{temp_dir}/partitionlisting/{args['JOB_NAME']}/{job_run_id}
Was uns derzeit noch fehlt, ist der Abruf von job_run_id. Das können wir auf folgende Weise erreichen:
job_run_id = args['JOB_RUN_ID']
Nachdem wir die job_run_id erhalten haben, können wir nun auf die Liste der Dateien zugreifen, die Ihr Auftrag verarbeiten muss:
df = spark.read.json(f"{temp_dir}/partitionlisting/{args['JOB_NAME']}/{job_run_id}/")
Unser kompletter Auftrag wird der folgende sein:
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
glue_client = boto3.client("glue")
dyf_source_s3 = glueContext.create_dynamic_frame.from_options(
"s3",
{
"paths": ["YOUR_S3_BUCKET_PATH"],
},
format="parquet",
transformation_ctx="datasource", # specify transformation context
)
job_name = args["JOB_NAME"]
response = glue_client.get_job(JobName=job_name)
TempDir = response["Job"]["DefaultArguments"]["--TempDir"]
job_run_id = args["JOB_RUN_ID"]
df = spark.read.json(f"{TempDir}/partitionlisting/{args['JOB_NAME']}/{job_run_id}/")
df.show()
job.commit()
Unser Datenrahmen:
+--------------------+--------------------+
| files| path|
+--------------------+--------------------+
|[s3://FILE_PATH1...,| |
|s3://FILE_PATH1....]|s3://S3_BUCKET |
+--------------------+--------------------+
Nachdem Sie diese Informationen erhalten haben, können Sie die Liste der Dateien durchgehen und Ihre Logik anwenden! Viel Spaß beim Lesen!
Verfasst von
Salma Cherif
I am an AWS consulting engineer with a focus on data and modern data architectures on AWS, I help clients with their data platforms from architecting to implementation. I am passionate about helping customers break the technical barriers with their data in an innovative secure manner to scale and modernize their data architecture.
Unsere Ideen
Weitere Blogs
Contact



