Blog

Lokal entwickeln, global skalieren: Dask auf Kubernetes mit Google Cloud

Diederik Greveling

Aktualisiert Oktober 21, 2025
9 Minuten

Wir glauben, dass die Nutzung der Cloud keine Kompromisse bei Ihrer Produktivität eingehen sollte. In diesem Blogbeitrag zeigen wir Ihnen, wie Sie lokal entwickeln können, während Sie Ihre Dask-Arbeitslast in der Cloud ausführen. Sie genießen den Komfort Ihrer eigenen IDE und der lokalen Versionskontrolle, während die Google Kubernetes Engine (GKE) die schwere Arbeit für Sie erledigt. Sie werden es nicht einmal bemerken.

Dask kombiniert eine umfangreiche Pandas-ähnliche API mit der Skalierbarkeit von Spark. Für Datenwissenschaftler bedeutet dies schnelles Prototyping und Data Crunching, ohne sich Sorgen machen zu müssen, dass die Daten nicht in den Speicher Ihres lokalen Rechners passen. Mit Dask können Sie auch Maschinenbibliotheken wie scikit-learn skalieren, um schwere Trainings- und Kreuzvalidierungsaufgaben zu verteilen.

Unser Setup unterscheidet sich von der offiziellen Variante, bei der Sie auf dem Cluster entwickeln müssen. Das bedeutet, dass Sie eine Online-IDE (wie Jupyter{Hub, Lab}) verwenden und Ihren Code auf dem Cluster klonen. Sie müssen Ihren Cluster immer am Leben erhalten, um den Status Ihres Projekts zu speichern. Unser Setup umgeht dieses Problem.

Die Einrichtung

Schritt 0: Vorbereitungen

Für den Blogbeitrag benötigen Sie sowohl gcloud als auch kubectl. Führen Sie gcloud auth list aus, um zu prüfen, ob Sie mit dem richtigen Konto authentifiziert sind, und verwenden Sie kubectl --version, um sicherzustellen, dass das Befehlszeilentool funktioniert. Für diesen Blog-Beitrag haben wir Kubernetes Version 1.19.4 verwendet.

Diese Anleitung funktioniert am besten, wenn Sie mit einem neuen Google Cloud-Projekt mit Standardnetzwerken beginnen. Sobald Sie Ihr Projekt erstellt haben, stellen Sie sicher, dass Sie Ihr Projekt als Standard in Ihrer gcloud config festlegen:

gcloud config set project <<YOUR PROJECT ID>>

Stellen Sie außerdem sicher, dass die GKE-API aktiviert ist. Sie tun dies über die Konsole oder indem Sie:

gcloud services enable container.googleapis.com

Schritt 1: Erstellen eines Kubernetes-Clusters

Erstellen Sie einen Kubernetes-Cluster wie folgt (vergessen Sie nicht, <<YOUR-IPV4-IP-HERE>> durch Ihre IP-Adresse zu ersetzen):

gcloud container clusters create dask-cluster 
  --create-subnetwork name=dask-cluster 
  --num-nodes 3 
  --region europe-west4 
  --machine-type n1-standard-2 
  --node-locations europe-west4-c 
  --enable-master-authorized-networks 
  --master-authorized-networks <<YOUR-IPV4-IP-HERE>>/32 
  --enable-ip-alias 
  --enable-private-nodes 
  --master-ipv4-cidr 172.16.0.0/28

Dadurch wird ein privater Kubernetes-Cluster mit einem öffentlichen Master-Endpunkt erstellt. Damit können Sie sich mit dem Kubernetes-Master verbinden und kubectl verwenden, um mit dem Cluster zu interagieren.

Hinweis: Sie können Ihren Cluster noch sicherer machen, indem Sie den Client-Zugriff auf den öffentlichen Endpunkt deaktivieren. Für das Ziel dieses Blogbeitrags lassen wir diese Schritte jedoch aus.

Nachdem der Cluster erstellt wurde, ziehen Sie die Anmeldedaten des Clusters, damit kubectl eine Verbindung zu ihm herstellen kann:

gcloud container clusters get-credentials dask-cluster 
  --zone europe-west4

Wenn alles gut gegangen ist, können Sie die Knoten in Ihrem Kubernetes-Cluster auflisten:

kubectl get nodes

Das ergibt in etwa folgendes Bild:

NAME                   STATUS   ROLES    AGE     VERSION
gke-dask-cluster-...   Ready    <none>   2m12s   v1.16.15-gke.4300
gke-dask-cluster-...   Ready    <none>   2m15s   v1.16.15-gke.4300
gke-dask-cluster-...   Ready    <none>   28m     v1.16.15-gke.4300    

Schritt 2: Geben Sie Ihrem Cluster Zugang zum Internet

Da Sie einen privaten Kubernetes-Cluster betreiben, sind die Nodes nicht mit dem Internet verbunden. Glücklicherweise ist es in der Google Cloud extrem einfach, ein Cloud NAT einzurichten, um den Internetzugang in unserer privaten Cloud zu ermöglichen. Sie erstellen einen Cloud Router:

gcloud compute routers create nat-router 
  --network default 
  --region europe-west4

und erstellen Sie dann eine Cloud NAT:

gcloud compute routers nats create nat-config 
  --router-region europe-west4 
  --router nat-router 
  --nat-all-subnet-ip-ranges 
  --auto-allocate-nat-external-ips

Schritt 3: Richten Sie Ihre lokale Umgebung ein

Erstellen Sie auf Ihrem lokalen Gerät eine virtuelle Umgebung mit Python 3.8.0 und installieren Sie die folgenden Abhängigkeiten

pip install dask'[complete]'==2020.12.0 
  dask-kubernetes==0.11.0 
  jupyter==1.0.0 
  numpy==1.18.1 
  s3fs==0.5.2

Der Scheduler für Dask läuft per Fernzugriff auf GKE. Um seine Adresse einer lokalen IP-Adresse zuzuordnen, fügen Sie /etc/hosts hinzu, indem Sie den folgenden Befehl ausführen:

echo '127.0.0.1 dask-cluster.default' | sudo tee -a /etc/hosts

Dadurch wird sichergestellt, dass Sie die Adresse des Clusters zu localhost auflösen können (obwohl der Cluster eigentlich auf GKE läuft, werden wir die Verbindung über einen Tunnel verfügbar machen).

Sie sind nun bereit, Dask auf GKE von Ihrem lokalen Gerät aus auszuführen.

Dask ausführen

Nachdem Sie nun einen GKE-Cluster eingerichtet haben, sind Sie bereit für den schwierigeren Teil: die dynamische Bereitstellung eines Dask-Clusters in einer Jupyter Notebook-Umgebung.

Erstellen Sie zunächst eine Vorlage für die Worker-Spezifikation in einer YAML-Datei:

# worker-spec.yml
kind: Pod
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:2020.12.0
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, 2, --no-dashboard, --memory-limit, 7GB, --death-timeout, 300]
    name: dask
    env:
      - name: EXTRA_PIP_PACKAGES
        value: s3fs
    resources:
      limits:
        cpu: 1
        memory: "7G"
      requests:
        cpu: 1
        memory: "4G"

Passen Sie die Worker an Ihre Bedürfnisse an: Nehmen Sie GPUs, TPUs oder Worker mit hohem Arbeitsspeicher, wie Sie es für richtig halten. Stellen Sie sicher, dass die Dask-Version (2020.12.0) mit Ihrer lokalen Dask-Version identisch ist.

Schritt 1: Ausführen des Schedulers

Öffnen Sie ein Jupyter-Notizbuch und führen Sie den folgenden Befehl in einer Zelle aus:

import dask
from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml('./worker-spec.yml', deploy_mode='remote', name='dask-cluster')

Wenn er läuft, öffnen Sie ein neues Terminal und führen den folgenden Befehl aus:

kubectl port-forward service/dask-cluster 8786:8786 --pod-running-timeout=5m0s

Wenn Sie die Meldung "Fehler vom Server (NotFound): Dienste "dask-cluster" nicht gefunden" erhalten, waren Sie zu schnell. Führen Sie den Befehl einfach erneut aus, bis er erfolgreich ist.

Wenn Sie den Dask-Cluster neu erstellen möchten (weil etwas schief gelaufen ist), stellen Sie sicher, dass Sie zuerst den alten Cluster entfernen, indem Sie den Kubernetes-Dienst (kubectl delete service dask-cluster) und alle Pods (kubectl delete pods --all) entfernen.

Wenn alles gut gegangen ist, sollte die Zelle in Ihrem Jupyter-Notebook beendet werden (was bedeutet, dass [*] in [1] übergeht). Jetzt sehen Sie, dass der Scheduler-Pod in Ihrem Terminal läuft:

kubectl get pods -n default

Sie verbinden sich mit dem Dashboard des Dask-Schedulers, indem Sie ein weiteres Terminal öffnen und einen Tunnel erstellen:

kubectl port-forward service/dask-cluster 8787:8787 --pod-running-timeout=5m0s

In Ihrem Browser können Sie jetzt auf Ihr Dask-Dashboard unter http://localhost:8787 zugreifen :

Was passiert nun eigentlich, wenn Sie den Dask-Cluster erstellen und den Befehl kubectl port-forward ausführen? Wenn Sie eine KubeCluster erstellen, erstellt Dask einen Pod, der den verteilten Dask Scheduler enthält. Außerdem wird ein Kubernetes-Dienst erstellt, der auf Port 8786 (Worker-Kommunikation) und 8787 (Dask-Dashboard) lauscht. Wenn Sie sich mit diesem Dienst verbinden, werden Sie zum Dask Scheduler-Pod weitergeleitet.

Kubernetes bietet die Möglichkeit, über den Master-Endpunkt über http(s) einen Tunnel direkt zu einem Pod oder Dienst aufzubauen. Wir nutzen diese Funktion, um uns von unserer lokalen Umgebung aus mit dem Dask-Scheduler zu verbinden. Dies ist in dem vereinfachten Diagramm unten dargestellt. Wenn Sie kubectl port-forward service/dask-cluster 8786:8786 aufrufen, verbindet sich kubectl mit dem GKE-Master-Endpunkt, der den Datenverkehr an den GKE Dask-Cluster-Dienst weiterleitet. Dieser Dienst wiederum leitet den Datenverkehr an den Dask-Scheduler-Pod weiter. So können Sie sich von Ihrem lokalen Notebook aus über eine sichere Verbindung mit dem Dask-Scheduler verbinden.

Im nächsten Schritt fügen Sie Ihrem Cluster Worker hinzu (unter "Workers" im Dask Dashboard sehen Sie, dass es noch keine gibt).

Schritt 2: Hinzufügen von Arbeitern

Sie können dem Cluster nun Arbeiter hinzufügen, indem Sie die Methode scale verwenden:

cluster.scale(2)

Sie sehen nun zwei Arbeiter, die auf http://localhost:8787/workers laufen :

Schritt 3: Aufträge ausführen

Jetzt, wo wir einen Cluster mit Arbeitern haben, können wir mit der Ausführung von Aufgaben beginnen. Initiieren Sie einen Client:

from dask.distributed import Client
client = Client(cluster)

Sie sind nun bereit, Ihr erstes Stück Dask-Code auszuführen:

import dask.array as da

# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute())  # Should print 1.0

Um den Cluster wirklich auf die Probe zu stellen, laden Sie den berüchtigten New Yorker Taxi-Datensatz für 2015 wie folgt auf Ihren Cluster:

import dask.dataframe as dd
import pandas as pd

taxi_data = dd.read_csv(
    "s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-*.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={'anon': True, 'use_ssl': False}
)

print(f"row count: {taxi_data.shape[0].compute()}")
# 146,112,989 rows

print(f"average trip distance: {taxi_data['trip_distance'].mean().compute()}")
# 13.1 miles

distance_per_day = (
    taxi_data
    .assign(
        day=ddf["tpep_pickup_datetime"].dt.day,
        month=ddf["tpep_pickup_datetime"].dt.month,
        year=ddf["tpep_pickup_datetime"].dt.year,
    )
    .groupby(["day", "month", "year"])["trip_distance"]
    .sum().compute()
)

busiest_day = distance_per_day.idxmax()
distance = distance_per_day.max()

print(f"Busiest day of 2015: {'-'.join(map(str, busiest_day))} ({distance:.0f} taxi miles)")
# Busiest day of 2015: 25-11-2015 (200026475 taxi miles)

Wenn Sie das obige Snippet ausführen, sehen Sie im Dashboard, dass die Last des Clusters steigt:

Schritt 4: Aktivieren Sie auto$caling

Wenn Sie auf mehr als zwei Worker skalieren, gehen Ihnen die Ressourcen aus, da der Cluster nur über drei Knoten verfügt (und einer davon für den Scheduler reserviert ist). Sie lösen dieses Problem, indem Sie die automatische Skalierung für Ihren GKE-Knotenpool aktivieren:

gcloud beta container clusters update dask-cluster 
  --region europe-west4 
  --min-nodes 0 
  --max-nodes 6 
  --autoscaling-profile optimize-utilization

Jetzt können Sie auf mehr als zwei Mitarbeiter aufstocken:

cluster.scale(4)

GKE wird feststellen, dass nicht genügend CPUs zur Verfügung stehen und zwei weitere VMs hinzufügen:

kubectl get nodes
gke-dask-cluster-...   Ready    <none>   2m12s   v1.16.15-gke.4300
gke-dask-cluster-...   Ready    <none>   2m15s   v1.16.15-gke.4300
gke-dask-cluster-...   Ready    <none>   28m      v1.16.15-gke.4300
gke-dask-cluster-...   Ready    <none>   28m      v1.16.15-gke.4300
gke-dask-cluster-...   Ready    <none>   28m      v1.16.15-gke.4300

Zum Verkleinern skalieren Sie einfach auf Null (das Verkleinern dauert in der Regel ein paar Minuten):

cluster.scale(0)

Dask unterstützt auch adaptive Cluster, die je nach Arbeitslast nach oben oder unten skalieren. Das bedeutet, dass Sie die Anzahl der Arbeiter nie manuell festlegen müssen. Um den Cluster adaptiv zu machen, legen Sie nur das Minimum und das Maximum der Arbeiter fest:

cluster.adapt(minimum=0, maximum=5)  # scale between 0 and 5 workers

Lassen Sie uns das adaptive Clustering auf die Probe stellen und einen großen Auftrag ausführen:

# Create a large array and calculate the mean
array = da.ones((10000, 10000, 10000))
print(array.mean().compute())  # Should print 1.0

Der Start des Auftrags dauert länger, da GKE zunächst den Node-Pool erhöht, bevor die Worker für Dask verfügbar sind. Dieser Prozess läuft jedoch vollautomatisch ab und nach ein paar Minuten sollte der Auftrag ausgeführt werden.

Nach Beendigung des Auftrags beendet Dask die Worker und GKE entfernt die VMs aus dem Pool.

Schritt 5: Herunterfahren Ihres Clusters

Sie löschen Ihren Dask-Cluster, indem Sie den Befehl shutdown() aufrufen:

client.shutdown()

Dies löscht alle von Dask erstellten Pods und den Kubernetes-Dienst, der speziell für diesen Cluster erstellt wurde. Um zu überprüfen, ob alles beendet wurde, führen Sie und aus. Um den Cluster abzubauen, führen Sie gcloud container clusters delete dask-cluster --region europe-west4 aus.

Wenn Sie Ihren Cluster online halten möchten, aber nur minimale Kosten haben wollen, können Sie die Größe Ihres Nodepools auf 0 setzen. Normalerweise verursacht der Betrieb von GKE ohne Knoten immer noch Kosten durch die Verwaltungsgebühr von 0,10$ pro Stunde, allerdings entfallen die Verwaltungsgebühren für Ihren ersten Cluster. Wenn Sie also Ihren Cluster online halten möchten, aber nur minimale Kosten zahlen wollen, können Sie die Größe Ihres Nodepools auf 0 Maschinen reduzieren, indem Sie ihn ausführen:

gcloud container clusters resize dask-cluster 
    --region europe-west4 
    --num-nodes 0

Wenn Sie einen Scheduler erneut ausführen möchten, stellen Sie sicher, dass Sie mindestens einen Knoten wieder hinzufügen.

Verfasst von

Diederik Greveling

Contact

Let’s discuss how we can support your journey.