Blog

Kedro Dynamische Pipelines

Marcin Zabłocki

Aktualisiert Oktober 15, 2025
11 Minuten

"Wie kann ich Kedro-Pipelines dynamisch generieren?" - ist eine der am häufigsten gestellten Fragen auf Kedro Slack. Ich bin Mitglied des technischen Lenkungsausschusses von Kedro und sehe, dass diese Frage dort häufig auftaucht.

Die Anwendungsfälle für dynamische Pipelines in Kedro fallen in der Regel in die folgenden Bereiche:

  • Implementierung von "Kern"-ML-Pipelines, die für verschiedene Geschäftsanwendungen konfiguriert und wiederverwendet werden können.
    Beispielhafte Industrieanwendungen:
    • Einzelhandel: Dies kann den Aufbau eines Umsatzprognosemodells pro Geschäft oder pro Produkt oder die Erstellung personalisierter Empfehlungssysteme für verschiedene Kundensegmente beinhalten.
    • Finanzen: Dies könnte die Entwicklung von Risikobewertungsmodellen für verschiedene Arten von Darlehen oder Kreditprodukten beinhalten.
  • Automatische Durchführung mehrerer Experimente, um zu ermitteln, welche Modellkonfiguration am besten abschneidet. Diese Experimente können sich in Bezug auf die verwendeten Funktionen, Modellparameter oder sogar die Art der verwendeten Modelle unterscheiden.

Auf den ersten Blick scheint das Problem trivial zu sein - da wir uns in der Welt von Python (dem Motor des aktuellen Gen AI-Booms) befinden, sollte alles möglich sein. Das ist es in der Tat, aber wenn Sie sich an die Kedro-Prinzipien für die Erstellung von wartbarem und modularem Data Science Code halten, wird das Problem schwieriger, als es aussieht.

In diesem Beitrag führe ich Sie durch den Prozess der Implementierung dynamischer Pipelines in Kedro, wobei die wichtigsten Konzepte und Prinzipien des Frameworks beibehalten werden.

Warum sind dynamische Pipelines in Kedro schwierig?

Einerseits verfügt Kedro über die Konzepte modularer Pipelines - sie ermöglichen die mehrfache Wiederverwendung der gleichen Pipelinestruktur. Gleichzeitig ermöglichen sie es dem Benutzer, die Eingaben, Ausgaben und Parameter jeder Instanz zu ändern. Dadurch sind sie innerhalb derselben Codebasis wiederverwendbar und können projektübergreifend genutzt werden.

Schauen wir uns ein Beispiel für eine modulare Pipeline an (angepasst an das offizielle Spaceflights-Tutorial):

def create_pipeline(**kwargs) -> Pipeline:
    data_science_pipeline = pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    baseline = pipeline(
        data_science_pipeline,
        parameters={"params:model_options": "params:model_options"},
        inputs={"model_input_table": "model_input_table",},
        namespace="baseline",
    )
    candidate = pipeline(
        data_science_pipeline,
        inputs={"model_input_table": "model_input_table",},
        tags=["candidate"],
        namespace="candidate"
    )
    return baseline + candidate

Die Idee dabei ist, dieselbe Pipeline zweimal mit unterschiedlichen Parametern laufen zu lassen. In der Welt von Kedro ist das völlig in Ordnung - Sie definieren die Parameter zweimal in parameters.yml:

model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating

candidate:
  model_options:
    test_size: 0.2
    random_state: 666
    features:
      - engines
      - passenger_capacity
      - crew
      - d_check_complete
      - moon_clearance_complete

Jetzt sind Sie startklar! Sie auch?

Was wäre, wenn Sie eine modulare Pipeline nicht zweimal, sondern 10x oder vielleicht 100x wiederverwenden wollten? Sie müssten die parameters.yml 10- oder 100-mal erweitern, indem Sie sie entweder irgendwie generieren oder immer wieder einfügen, obwohl Sie nur einen Teil der Parameter ändern wollten - ein alltäglicher Fall, dem Sie bei Ihrer Arbeit als Data Scientist oder ML Engineer begegnen werden.

Die parameters.yml ist die erste Sache, die zweite ist der create_pipeline Code. Zu dem Zeitpunkt, an dem create_pipeline im Lebenszyklus des Kedro-Projekts aufgerufen wird, sind die Parameter noch nicht verfügbar und Sie können sie nicht verwenden, um eine variable Anzahl von modularen Pipelines1 zu erzeugen.

Es gibt drei Stellen, die "wissen" müssen, wie oft Sie die modulare Pipeline verwenden möchten - die letzte Stelle ist die catalog.yml - da jede modulare Pipeline mit einem Namensraum versehen ist, müssen auch alle Datenkatalogeinträge mit einem Namensraum versehen sein:

preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq

Das müssen Sie haben:

baseline.preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

baseline.preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq

baseline.model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq

candidate.preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/candidate/preprocessed_companies.pq

candidate.preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/candidate/preprocessed_shuttles.pq

candidate.model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/candidate/model_input_table.pq

Je größer die Pipeline ist, desto größer wird die catalog.yml, desto größer ist die Wahrscheinlichkeit, dass eine der filepath(s) durcheinander gerät.
blog-notiz-getindata

Von modularen Pipelines zu dynamischen Pipelines

Da Sie nun den Hintergrund kennen (und wahrscheinlich sind Sie hier, weil Sie "Kedro dynamische Pipelines" nachgeschlagen haben und einfach nur den Code wollen), möchte ich Ihnen zeigen, wie Sie die folgenden Anwendungsfälle für dynamische Pipelines in Kedro lösen können:

  • Anwendungsfall 1: Sie haben eine Pipeline, die Sie auf einem Datensatz ausführen möchten, der sich im Laufe der Zeit weiterentwickelt - z.B. ein Prognosemodell mit monatlichen Datentabellen, bei dem ein Monat die Daten des Vormonats verbraucht usw., wie in diesem Beispiel.
  • Anwendungsfall 2: Sie haben eine Reihe von ähnlichen Modellexperimenten mit ähnlichen Parametern, die Sie parallel durchführen möchten. Die Modellparameter, die verwendeten Features, die Zielspalten oder die Modelltypen können in den verschiedenen Experimenten variieren.
  • Anwendungsfall 3: Sie möchten eine "zentrale" / "wiederverwendbare" Pipeline implementieren, die für mehrere geschäftliche Anwendungsfälle konfiguriert und mehrfach ausgeführt werden kann.

In "unserer Methode" lösen wir die wichtigsten Probleme im Umgang mit dynamischen Pipelines in Kedro, mit sehr wenig eigenem Code und ohne zusätzliche Plugins! Die Methode, die wir hier vorstellen, ist eher ein Vorschlag für einen Projekt-Workflow mit ein paar Ergänzungen, um alles zusammenzufügen.

Das alles wurde erst kürzlich dank der folgenden Funktionen von Kedro möglich (alle verfügbar unter 0.18.13):

  • OmegaConfigLoader mit benutzerdefinierten Auflösern
  • Dataset-Fabriken
  • Modulare Pipelines mit Namespaces
  • Zentralisiert settings.py

Lassen Sie mich Sie Schritt für Schritt durch den Prozess führen.

1. Ändern Sie settings.py

Es gibt ein paar wichtige, projektweite Einstellungen, die Sie aktivieren müssen. Die erste ist die Verwendung von OmegaConfigLoader anstelle des Standard-Config-Loaders (beachten Sie, dass dies ab Kedro 0.19.0 die Standardeinstellung ist).

from kedro.config import OmegaConfigLoader
CONFIG_LOADER_CLASS = OmegaConfigLoader

Als Nächstes erstellen Sie einen benutzerdefinierten OmegaConf-Resolver namens merge, der einen Deep-Merge von zwei Python-Wörterbüchern durchführt - Sie werden gleich sehen, warum.

from copy import deepcopy
import omegaconf

def merge_dicts(dict1, dict2):
    """
    Recursively merge two dictionaries.

    Args:
        dict1 (dict): The first dictionary to merge.
        dict2 (dict): The second dictionary to merge.

    Returns:
        dict: The merged dictionary.
    """
    result = deepcopy(dict1)
    for key, value in dict2.items():
        if (
            key in result
            and isinstance(result[key], omegaconf.dictconfig.DictConfig)
            and isinstance(value, omegaconf.dictconfig.DictConfig)
        ):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

Der letzte und wichtigste Zusatz in settings.py ist die projektweite "Deklaration" der Pipelines, die Sie haben möchten. Sie wird sowohl in create_pipeline als auch bei der Validierung der Parameter verwendet.

DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}

Dieses Wörterbuch deklariert die namespaces (Schlüssel), die dynamisch generierte Pipelines haben - im obigen Beispiel hat die Pipeline im Namensraum = reviews_predictor2 Instanzen und die Pipeline im Namensraum = price_predictor hat 4 Instanzen. Alle 6 Instanzen verwenden die gleiche Kedro-Pipelinestruktur.
wichtige-note-getindata

2. Anpassen der parameters.yml

Bis zu diesem Punkt hatte parameters.yml also eine Menge Duplikate und war fehleranfällig. Dank der Unterstützung für OmegaConfigLoader und merge resolver (oben definiert) kann die Datei parameters.yml so umstrukturiert werden, dass sie der Vererbung von Klassen aus OOP ähnelt:

# The model_options below are "base" options for all pipelines

model_options:
  test_size: 0.2
  random_state: 3
  target: costs
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
  model: sklearn.linear_model.LinearRegression
  model_params: {}

# -------------------------------
# Pipeline-specific configuration

price_predictor:
  _overrides:
    target: price
  model_options: ${merge:${model_options},${._overrides}}

  base:
    model_options: ${..model_options}
  
  candidate1:
    _overrides:
      features:
      - engines
      - passenger_capacity
      - crew
      - d_check_complete
      - company_rating
    model_options: ${merge:${..model_options},${._overrides}}

  candidate2:
    _overrides:
      model_params:
        gamma: 2.5
    model_options: ${merge:${..model_options},${._overrides}}

  candidate3:
    _overrides:
      model: sklearn.ensemble.RandomForestRegressor
      model_params:
        max_depth: 1.0

    model_options: ${merge:${..model_options},${._overrides}}

Das neue parameters.yml kombiniert auf geschickte Weise die Verwendung der eingebauten OmegaConf-Referenzauflösung (hier: ${..model_options} und ${._overrides}) mit der Merge-Auflösung, die es Ihnen ermöglicht, auf verschiedene Teile der Konfiguration zu verweisen und diese wiederzuverwenden. Indem Sie die Parameterstruktur wie folgt deklarieren:

<namespace>:
    <variant - e.g. model variant / experiment name>:
        _overrides: {} # what to override from root configuration
        
	# any specific parameter, e.g. model_options, used in Kedro pipeline
       model_options: ${merge:${..model_options},${._overrides}}

simulieren Sie effektiv die Vererbung von Parametern und können einige/alle Werte wiederverwenden und/oder außer Kraft setzen. Während der Laufzeit werden die Resolver ausgeführt und die Konfiguration, die den Kedro-Knoten zur Verfügung gestellt wird, enthält die tatsächlichen Werte.

3. Dataset-Fabriken verwenden

Die Funktion der Kedro-Datensatzfabriken ermöglicht es Ihnen, die "generische" Konfiguration zu schreiben und die Redundanz der Katalogeinträge zu minimieren, indem Sie die in den Pipelines Ihres Projekts verwendeten Datensätze mit den Mustern der Datensatzfabriken verknüpfen. Wie funktioniert das? Sie geben die "Platzhalter" an, die während der Ausführung des Kedro-Projekts mit den tatsächlichen Laufzeitwerten gefüllt werden.

In unserem Fall möchten wir potenziell alle trainierten Modelle aus allen unseren Pipelines speichern - also in diesem einfachen Szenario mit 2 Namensräumen und 6 Varianten:

"reviews_predictor": ["base", "test1"],
 "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],

müssten Sie insgesamt 12 Katalogeinträge haben - und das nur für einen einzigen Knoten, der Daten ausgibt!

Dank der Dataset-Fabriken, statt:

price_predictor.base.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/base/regressor.pickle
  versioned: true

price_predictor.candidate1.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/candidate1/regressor.pickle
  versioned: true

price_predictor.candidate2.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/candidate2/regressor.pickle
  versioned: true

# ... and 9 more entries

können Sie nur einen einzigen Eintrag haben :

"{namespace}.{variant}.regressor":
  type: pickle.PickleDataSet
  filepath: data/06_models/{namespace}/{variant}/regressor.pickle
  versioned: true

Zur Laufzeit, wenn Kedro den Knoten erreicht, der die regressornimmt es seinen Namespace (z.B. price_predictor.candidate1), gleichen Sie es mit dem Muster aus dem Datenkatalog ab: "{namespace}.{variant}.regressor" und füllen Sie die {namespace} und {variant} Platzhalter mit price_predictor und candidate1 entsprechend.

Stellen Sie sicher, dass Sie auch den folgenden Eintrag hinzufügen:

"{namespace}.model_input_table":
  type: pandas.ParquetDataSet
  filepath: data/03_primary/{namespace}/model_input_table.pq

wenn Sie diesen Blogpost mit dem Spaceflights-Starter mitverfolgen.

4. Ändern Sie Standard-Pipelines in modulare Pipelines

Der letzte Teil besteht darin, die Pipelines tatsächlich zu generieren. Beginnen wir mit der data_processing Pipeline aus dem Spaceflights-Starter:

from <project_name> import settings

def create_pipeline(**kwargs) -> Pipeline:
    data_processing = pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )

    pipes = []
    for namespace in settings.DYNAMIC_PIPELINES_MAPPING.keys():
        pipes.append(
            pipeline(
                data_processing,
                inputs={
                    "companies": "companies",
                    "shuttles": "shuttles",
                    "reviews": "reviews",
                },
                namespace=namespace,
                tags=settings.DYNAMIC_PIPELINES_MAPPING[namespace],
            )
        )
    return sum(pipes)

Und die data_science Pipeline:

from <project name> import settings

def create_pipeline(**kwargs) -> Pipeline:
    data_science_pipeline = pipeline(
        [
            node(
                func=verbose_params,
                inputs=["params:model_options"],
                outputs=None,
                name="debug_node",
            ),
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    pipes = []
    for namespace, variants in settings.DYNAMIC_PIPELINES_MAPPING.items():
        for variant in variants:
            pipes.append(
                pipeline(
                    data_science_pipeline,
                    inputs={"model_input_table": f"{namespace}.model_input_table"},
                    namespace=f"{namespace}.{variant}",
                    tags=[variant, namespace],
                )
            )
    return sum(pipes)

Beachten Sie, wie wir die settings importieren, um die Deklarationen der dynamischen Pipelines nachzuschlagen. Dank der Verwendung von settings von Kedro werden die Pipelines an einem zentralen Ort deklariert und können im gesamten Projekt wiederverwendet werden.

5. Ausführen der dynamischen Pipelines in Kedro

Die Art und Weise, wie namespaces und tags hier konfiguriert sind, ist sehr wichtig. Die data_processing Pipeline generiert wird für price_predictor und reviews_predictor während die data_science Pipeline hat mehrere Varianten (d.h. verschiedene Modellkonfigurationen) für jede von uns erstellte dynamische Instanz: base, candidate1, candidate2 usw.

Und warum?

So können wir die Pipelines auf die folgenden Arten ausführen:

  • Führen Sie alles für einen der Namensräume aus, z.B. kedro run --namespace price_predictor: Dies führt die data_processing Pipeline einmal aus und alle Varianten der data_sciencePipeline: base, candidate1, candidate2 usw.
  • Führen Sie nur die eine Variante aus: kedro run --namespace price_predictor.candidate1 führt nur die data_science Pipeline in der candidate1 Variante aus und überspringt den Rest. Dazu muss die data_processing Pipeline zuerst ausgeführt werden, um Eingabedaten zu verarbeiten.
  • Führen Sie nur die einzelne Variante aus und lassen Sie auch die data_processing Pipeline laufen: kedro run --namespace price_predictor --tags candidate1

Einfach und wartbar

Der Ansatz zur Erstellung dynamischer Pipelines in Kedro steht im Einklang mit der Wartungsfreundlichkeit des Projekts und erfordert kein Hacken oder Abweichen von der "Kedro-Methode". Da die Pipeline-Ausführung dynamisch ist und die Pipeline-Struktur quasi-dynamisch bleibt, sollten alle vorhandenen Tools und Plug-ins, die auf Kedro aufbauen, so funktionieren wie bisher, z.B. können Sie mit Kedro-Viz eine Vorschau der Pipelines und Parameter anzeigen:
Kedro Dynamische Pipelines

Follow-ups für die Geeks

Wenn Sie ein Kedro-Geek sind (wie ich 😀 ), gibt es noch ein paar weitere Themen, in die Sie eintauchen können:

Zusammenfassung

Endlich haben wir den Stand der Funktionen in Kedro erreicht, die es uns ermöglichen, dynamische Pipelines zu erstellen. All die kleinen Dinge, die während des 0.18.x Release-Zyklus hinzugefügt wurden, haben das Framework noch elastischer gemacht, während es weiterhin ein ausgereifter Baustein für MLOps-Plattformen und -Projekte bleibt.

Vielen Dank an Artur Dobrogowski, der einen großen Teil dieses Projekts implementiert und getestet hat.

Verfasst von

Marcin Zabłocki

Contact

Let’s discuss how we can support your journey.