Blog
Echte verteilte Bildverarbeitung mit Apache Spark

Bildverarbeitung mit Apache Spark
Wie können Sie Bilder in Apache Spark effizient verarbeiten?
Wenn Sie die Databricks-Dokumentation lesen, werden Sie kaum glauben, dass die meisten Vorverarbeitungen außerhalb des Apache Spark-Ökosystems durchgeführt werden müssen.
Zum Beispiel:
- Bei der Modellinferenz mit keras lernen Sie, die Dateien mit einfachem Python in den Speicher zu lesen, bevor Sie einen Pandas Dataframe erstellen, um die Bilddaten in eine Parquet-Datei zu schreiben.
- Die Modellinferenz mit pytorch lehrt Sie einen etwas anderen Weg, indem Sie einfaches Python verwenden, um die Dateipfade zu erhalten und diese Pfade in einen Spark-Datenrahmen zu setzen.
Effizient sein
Diese Ansätze sind nicht wirklich verteilt, aber gibt es einen besseren Weg?
In diesem Blog zeige ich Ihnen, wie Sie die eingebaute Bilddatenquelle verwenden können.
Mit Hilfe dieser Datenquelle wird Apache Spark die Bilder in einer wirklich verteilten Weise verarbeiten.[1]
Erste Schritte
Einrichten der Umgebung
python -m venv /path/to/spark-image-processing
source /path/to/spark-image-processing/bin/activate
pip install pyspark pillow pandas pyarrow tensorflow jupyterlab
Vorbereiten einiger Bilddaten
Ich habe mich für die imagenette2.tgz entschieden, die auf AWS fast ai imageclas.
Aus diesen über 13000 Bildern habe ich 75 zufällig ausgewählt:
find ./imagenette2 -maxdepth 4 -type f |
sort -R |
head -75 |
xargs -I{} cp {} ./data/images/mixed
Lesen der Bilddaten in einen Spark-Datenframe
Starten Sie eine Pyspark-Sitzung
pyspark --master "local[2]" --conf spark.executor.memory=4G --conf spark.driver.memory=2G
Importe
from typing import Iterator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType, BinaryType
from PIL import Image, ImageDraw
from tensorflow.keras.applications.resnet50 import ResNet50
import tensorflow as tf
import numpy as np
import pandas as pd
Lesen Sie den Bildordner
images_dir = "./data/images/mixed/"
image_df = spark.read.format("image").load(images_dir).filter("image.nChannels > 2 AND image.height < 1000")
image_df.select("image.origin", "image.height", "image.width", "image.mode", "image.nChannels").show(5, truncate=False)
Visualisieren Sie eines der Bilder
image_row = 40
spark_single_img = image_df.select("image").collect()[image_row]
(spark_single_img.image.origin, spark_single_img.image.mode, spark_single_img.image.nChannels )
mode = 'RGBA' if (spark_single_img.image.nChannels == 4) else 'RGB'
Image.frombytes(mode=mode, data=bytes(spark_single_img.image.data), size=[spark_single_img.image.width,spark_single_img.image.height]).show()
Wie Sie sehen können, hat das Bild einen leichten Blaustich, der nicht richtig erscheint.
Warum das zusätzliche Blau im Artefakt? Die Bilddatenquelle verwendet
Wie kann man das beheben?
Konvertieren Sie die Bildebenen
def convert_bgr_array_to_rgb_array(img_array):
B, G, R = img_array.T
return np.array((R, G, B)).T
img = Image.frombytes(mode=mode, data=bytes(spark_single_img.image.data), size=[spark_single_img.image.width,spark_single_img.image.height])
converted_img_array = convert_bgr_array_to_rgb_array(np.asarray(img))
Image.fromarray(converted_img_array).show()
Besser!
Alle Bilder in Spark konvertieren
schema = StructType(image_df.select("image.*").schema.fields + [
StructField("data_as_resized_array", ArrayType(IntegerType()), True),
StructField("data_as_array", ArrayType(IntegerType()), True)
])
def resize_img(img_data, resize=True):
mode = 'RGBA' if (img_data.nChannels == 4) else 'RGB'
img = Image.frombytes(mode=mode, data=img_data.data, size=[img_data.width, img_data.height])
img = img.convert('RGB') if (mode == 'RGBA') else img
img = img.resize([224, 224], resample=Image.Resampling.BICUBIC) if (resize) else img
arr = convert_bgr_array_to_rgb_array(np.asarray(img))
arr = arr.reshape([224*224*3]) if (resize) else arr.reshape([img_data.width*img_data.height*3])
return arr
def resize_image_udf(dataframe_batch_iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for dataframe_batch in dataframe_batch_iterator:
dataframe_batch["data_as_resized_array"] = dataframe_batch.apply(resize_img, args=(True,), axis=1)
dataframe_batch["data_as_array"] = dataframe_batch.apply(resize_img, args=(False,), axis=1)
yield dataframe_batch
resized_df = image_df.select("image.*").mapInPandas(resize_image_udf, schema)
Wir können prüfen, ob die Daten ein konvertiertes und ein verkleinertes Bild enthalten:
row = resized_df.collect()[image_row]
Image.frombytes(mode='RGB', data=bytes(row.data_as_array), size=[row.width,row.height]).show()
Image.frombytes(mode='RGB', data=bytes(row.data_as_resized_array), size=[224,224]).show()
Vorhersage mit dem ResNet50 Modell
Jetzt haben wir ein verkleinertes Bild, das sich als Eingabe für das Klassifizierungsmodell resnet50 eignet.
Um ein Vorhersagemodell zu erstellen, können wir verwenden:
def normalize_array(arr):
return tf.keras.applications.resnet50.preprocess_input(arr.reshape([224,224,3]))
@pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
model = ResNet50()
for input_array in iterator:
normalized_input = np.stack(input_array.map(normalize_array))
preds = model.predict(normalized_input)
yield pd.Series(list(preds))
predicted_df = resized_df.withColumn("predictions", predict_batch_udf("data_as_resized_array"))
Um die Vorhersage unseres Referenzbildes zu überprüfen
prediction_row = predicted_df.collect()[image_row]
tf.keras.applications.resnet50.decode_predictions(
np.array(prediction_row.predictions).reshape(1,1000), top=5
)
Das ergibt die folgende Ausgabe:
[
[
(
"n03417042",
"garbage_truck",
0.9919044375419617
),
(
"n02701002",
"ambulance",
0.004018119070678949
),
(
"n03769881",
"minibus",
0.0015384092694148421
),
(
"n03770679",
"minivan",
0.0005491302581503987
),
(
"n03977966",
"police_van",
0.00048319826601073146
)
]
]
Das Modell ist zuversichtlich, dass das Bild einen Müllwagen darstellt - auch wenn es in diesem Beitrag nicht darum geht, ein präzises Modell zu erstellen, sondern vielmehr darum, zu skizzieren, wie man Daten auf verteilte Weise verarbeitet!
Erhalten Sie die Top 5 Vorhersagen für jedes Bild
decoded_predictions_schema = StructType(predicted_df.schema.fields + [
StructField("pred_id", ArrayType(StringType()), False),
StructField("label", ArrayType(StringType()), False),
StructField("score", ArrayType(FloatType()), False)
])
def top5_predictions(preds):
return tf.keras.applications.resnet50.decode_predictions(
np.array(preds).reshape(1,1000), top=5
)
def top5predictions_batch_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for dataframe_batch in iterator:
yield pd.merge(
dataframe_batch,
right=pd.concat([
pd.DataFrame(top5, columns=["pred_id", "label", "score"]).aggregate(lambda x: [x.tolist()], axis=0) for [top5] in dataframe_batch.predictions.map(top5_predictions)
]).reset_index(drop=True),
left_index=True,
right_index=True
)
top5_predictions_df = predicted_df.mapInPandas(top5predictions_batch_udf, decoded_predictions_schema)
Wir überprüfen die Ergebnisse an unserem Referenzbild
top5_prediction = top5_predictions_df.collect()[image_row]
top5_prediction.label[0] # gives garbage_truck
Zur Veranschaulichung können wir die Beschriftung zu den Bildern hinzufügen
def show_image_with_label(image, label):
draw = ImageDraw.Draw(image)
draw.text((10, 10), label, fill="red")
image.show()
show_image_with_label(Image.frombytes(mode='RGB', data=bytes(top5_prediction.data_as_array), size=[top5_prediction.width,top5_prediction.height]), top5_prediction.label[0])
Hier ist unser Bild mit der vorhergesagten Beschriftung:
.
Wir können sie alle mit zeigen:
for row in top5_predictions_df.collect():
show_image_with_label(Image.frombytes(mode='RGB', data=bytes(row.data_as_resized_array), size=[224,224]), row.label[0])
Alle Befehle können in Ihre Spark-Shell kopiert/eingefügt werden. Für einen leichteren Zugang habe ich ein Notizbuch mit demselben Code erstellt, um damit weiter zu experimentieren.
Das war's für heute! Nicht vergessen;
- Möchten Sie als Ingenieur an der Schnittstelle von Data Science und verteilten Systemen an ähnlichen Problemen arbeiten? Wir stellen ein
- Wenn Sie auf der Suche nach einem Team sind, das Sie bei Ihren Bemühungen unterstützt, haben wir einige der besten Köpfe auf dem Markt. Melden Sie sich!!!
[1]: In der offiziellen Dokumentation zur Bilddatenquelle wird darauf hingewiesen, dass es einige Einschränkungen bei der Verwendung dieses Datenquellentyps gibt. Seien Sie sich dessen bewusst.
Verfasst von
Kris Geusebroek
Unsere Ideen
Weitere Blogs
Contact



