Blog

Echte verteilte Bildverarbeitung mit Apache Spark

Kris Geusebroek

Aktualisiert Oktober 16, 2025
5 Minuten

Bildverarbeitung mit Apache Spark

Wie können Sie Bilder in Apache Spark effizient verarbeiten?

Wenn Sie die Databricks-Dokumentation lesen, werden Sie kaum glauben, dass die meisten Vorverarbeitungen außerhalb des Apache Spark-Ökosystems durchgeführt werden müssen.

Zum Beispiel:

  • Bei der Modellinferenz mit keras lernen Sie, die Dateien mit einfachem Python in den Speicher zu lesen, bevor Sie einen Pandas Dataframe erstellen, um die Bilddaten in eine Parquet-Datei zu schreiben.
  • Die Modellinferenz mit pytorch lehrt Sie einen etwas anderen Weg, indem Sie einfaches Python verwenden, um die Dateipfade zu erhalten und diese Pfade in einen Spark-Datenrahmen zu setzen.

Effizient sein

Diese Ansätze sind nicht wirklich verteilt, aber gibt es einen besseren Weg?

In diesem Blog zeige ich Ihnen, wie Sie die eingebaute Bilddatenquelle verwenden können.

Mit Hilfe dieser Datenquelle wird Apache Spark die Bilder in einer wirklich verteilten Weise verarbeiten.[1]

Erste Schritte

Einrichten der Umgebung

python -m venv /path/to/spark-image-processing
source /path/to/spark-image-processing/bin/activate
pip install pyspark pillow pandas pyarrow tensorflow jupyterlab

Vorbereiten einiger Bilddaten

Ich habe mich für die imagenette2.tgz entschieden, die auf AWS fast ai imageclas.

Aus diesen über 13000 Bildern habe ich 75 zufällig ausgewählt:

find ./imagenette2 -maxdepth 4 -type f | 
    sort -R | 
    head -75 | 
    xargs -I{} cp {} ./data/images/mixed

Lesen der Bilddaten in einen Spark-Datenframe

Starten Sie eine Pyspark-Sitzung

pyspark --master "local[2]" --conf spark.executor.memory=4G --conf spark.driver.memory=2G

Importe

from typing import Iterator

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType, BinaryType

from PIL import Image, ImageDraw
from tensorflow.keras.applications.resnet50 import ResNet50

import tensorflow as tf
import numpy as np
import pandas as pd

Lesen Sie den Bildordner

images_dir = "./data/images/mixed/"
image_df = spark.read.format("image").load(images_dir).filter("image.nChannels > 2 AND image.height < 1000")
image_df.select("image.origin", "image.height", "image.width", "image.mode", "image.nChannels").show(5, truncate=False)

Visualisieren Sie eines der Bilder

image_row = 40
spark_single_img = image_df.select("image").collect()[image_row]
(spark_single_img.image.origin, spark_single_img.image.mode, spark_single_img.image.nChannels )

mode = 'RGBA' if (spark_single_img.image.nChannels == 4) else 'RGB' 
Image.frombytes(mode=mode, data=bytes(spark_single_img.image.data), size=[spark_single_img.image.width,spark_single_img.image.height]).show()

Wie Sie sehen können, hat das Bild einen leichten Blaustich, der nicht richtig erscheint.

Funkenbild
Blaues Bild
Originalbild
Original

Warum das zusätzliche Blau im Artefakt? Die Bilddatenquelle verwendet , um die Daten zu lesen. Die Bibliothek erwartet die Eingabe als BGRA (Blau, Grün, Rot, Alpha) anstelle von RGB (Rot, Grün, Blau).

Wie kann man das beheben?

Konvertieren Sie die Bildebenen

def convert_bgr_array_to_rgb_array(img_array):
    B, G, R = img_array.T
    return np.array((R, G, B)).T

img = Image.frombytes(mode=mode, data=bytes(spark_single_img.image.data), size=[spark_single_img.image.width,spark_single_img.image.height])

converted_img_array = convert_bgr_array_to_rgb_array(np.asarray(img))
Image.fromarray(converted_img_array).show()

Besser!

Alle Bilder in Spark konvertieren

schema = StructType(image_df.select("image.*").schema.fields + [
    StructField("data_as_resized_array", ArrayType(IntegerType()), True),
    StructField("data_as_array", ArrayType(IntegerType()), True)
])

def resize_img(img_data, resize=True):
    mode = 'RGBA' if (img_data.nChannels == 4) else 'RGB' 
    img = Image.frombytes(mode=mode, data=img_data.data, size=[img_data.width, img_data.height])
    img = img.convert('RGB') if (mode == 'RGBA') else img
    img = img.resize([224, 224], resample=Image.Resampling.BICUBIC) if (resize) else img
    arr = convert_bgr_array_to_rgb_array(np.asarray(img))
    arr = arr.reshape([224*224*3]) if (resize) else arr.reshape([img_data.width*img_data.height*3])

    return arr

def resize_image_udf(dataframe_batch_iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for dataframe_batch in dataframe_batch_iterator:
        dataframe_batch["data_as_resized_array"] = dataframe_batch.apply(resize_img, args=(True,), axis=1)
        dataframe_batch["data_as_array"] = dataframe_batch.apply(resize_img, args=(False,), axis=1)
        yield dataframe_batch

resized_df = image_df.select("image.*").mapInPandas(resize_image_udf, schema)

Wir können prüfen, ob die Daten ein konvertiertes und ein verkleinertes Bild enthalten:

row = resized_df.collect()[image_row]

Image.frombytes(mode='RGB', data=bytes(row.data_as_array), size=[row.width,row.height]).show()

Image.frombytes(mode='RGB', data=bytes(row.data_as_resized_array), size=[224,224]).show()

Vorhersage mit dem ResNet50 Modell

Jetzt haben wir ein verkleinertes Bild, das sich als Eingabe für das Klassifizierungsmodell resnet50 eignet.

Um ein Vorhersagemodell zu erstellen, können wir verwenden:

def normalize_array(arr):
    return tf.keras.applications.resnet50.preprocess_input(arr.reshape([224,224,3]))

@pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = ResNet50()
    for input_array in iterator:
        normalized_input = np.stack(input_array.map(normalize_array))
        preds = model.predict(normalized_input)
        yield pd.Series(list(preds))

predicted_df = resized_df.withColumn("predictions", predict_batch_udf("data_as_resized_array"))

Um die Vorhersage unseres Referenzbildes zu überprüfen

prediction_row = predicted_df.collect()[image_row]

tf.keras.applications.resnet50.decode_predictions(
    np.array(prediction_row.predictions).reshape(1,1000), top=5
)

Das ergibt die folgende Ausgabe:

[
  [
    (
      "n03417042",
      "garbage_truck",
      0.9919044375419617
    ),
    (
      "n02701002",
      "ambulance",
      0.004018119070678949
    ),
    (
      "n03769881",
      "minibus",
      0.0015384092694148421
    ),
    (
      "n03770679",
      "minivan",
      0.0005491302581503987
    ),
    (
      "n03977966",
      "police_van",
      0.00048319826601073146
    )
  ]
]

Das Modell ist zuversichtlich, dass das Bild einen Müllwagen darstellt - auch wenn es in diesem Beitrag nicht darum geht, ein präzises Modell zu erstellen, sondern vielmehr darum, zu skizzieren, wie man Daten auf verteilte Weise verarbeitet!

Erhalten Sie die Top 5 Vorhersagen für jedes Bild

decoded_predictions_schema = StructType(predicted_df.schema.fields + [
    StructField("pred_id", ArrayType(StringType()), False),
    StructField("label", ArrayType(StringType()), False),
    StructField("score", ArrayType(FloatType()), False)
])

def top5_predictions(preds):
    return tf.keras.applications.resnet50.decode_predictions(
        np.array(preds).reshape(1,1000), top=5
    )

def top5predictions_batch_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for dataframe_batch in iterator:
        yield pd.merge(
            dataframe_batch, 
            right=pd.concat([
                pd.DataFrame(top5, columns=["pred_id", "label", "score"]).aggregate(lambda x: [x.tolist()], axis=0) for [top5] in dataframe_batch.predictions.map(top5_predictions)
                ]).reset_index(drop=True),
            left_index=True,
            right_index=True
        )

top5_predictions_df = predicted_df.mapInPandas(top5predictions_batch_udf, decoded_predictions_schema)

Wir überprüfen die Ergebnisse an unserem Referenzbild

top5_prediction = top5_predictions_df.collect()[image_row]
top5_prediction.label[0]  # gives garbage_truck

Zur Veranschaulichung können wir die Beschriftung zu den Bildern hinzufügen

def show_image_with_label(image, label):
    draw = ImageDraw.Draw(image)
    draw.text((10, 10), label, fill="red")
    image.show()

show_image_with_label(Image.frombytes(mode='RGB', data=bytes(top5_prediction.data_as_array), size=[top5_prediction.width,top5_prediction.height]), top5_prediction.label[0])

Hier ist unser Bild mit der vorhergesagten Beschriftung:

Vorhergesagtes Bild.

Wir können sie alle mit zeigen:

for row in top5_predictions_df.collect():
    show_image_with_label(Image.frombytes(mode='RGB', data=bytes(row.data_as_resized_array), size=[224,224]), row.label[0])

Alle Befehle können in Ihre Spark-Shell kopiert/eingefügt werden. Für einen leichteren Zugang habe ich ein Notizbuch mit demselben Code erstellt, um damit weiter zu experimentieren.


Das war's für heute! Nicht vergessen;

  • Möchten Sie als Ingenieur an der Schnittstelle von Data Science und verteilten Systemen an ähnlichen Problemen arbeiten? Wir stellen ein
  • Wenn Sie auf der Suche nach einem Team sind, das Sie bei Ihren Bemühungen unterstützt, haben wir einige der besten Köpfe auf dem Markt. Melden Sie sich!!!

[1]: In der offiziellen Dokumentation zur Bilddatenquelle wird darauf hingewiesen, dass es einige Einschränkungen bei der Verwendung dieses Datenquellentyps gibt. Seien Sie sich dessen bewusst.

Verfasst von

Kris Geusebroek

Contact

Let’s discuss how we can support your journey.