"Wie kann ich Kedro-Pipelines dynamisch generieren?" - ist eine der am häufigsten gestellten Fragen auf Kedro Slack. Ich bin Mitglied des technischen Lenkungsausschusses von Kedro und sehe, dass diese Frage dort häufig auftaucht.
Die Anwendungsfälle für dynamische Pipelines in Kedro fallen in der Regel in die folgenden Bereiche:
- Implementierung von "Kern"-ML-Pipelines, die für verschiedene Geschäftsanwendungen konfiguriert und wiederverwendet werden können.
Beispielhafte Industrieanwendungen:- Einzelhandel: Dies kann den Aufbau eines Umsatzprognosemodells pro Geschäft oder pro Produkt oder die Erstellung personalisierter Empfehlungssysteme für verschiedene Kundensegmente beinhalten.
- Finanzen: Dies könnte die Entwicklung von Risikobewertungsmodellen für verschiedene Arten von Darlehen oder Kreditprodukten beinhalten.
- Automatische Durchführung mehrerer Experimente, um zu ermitteln, welche Modellkonfiguration am besten abschneidet. Diese Experimente können sich in Bezug auf die verwendeten Funktionen, Modellparameter oder sogar die Art der verwendeten Modelle unterscheiden.
Auf den ersten Blick scheint das Problem trivial zu sein - da wir uns in der Welt von Python (dem Motor des aktuellen Gen AI-Booms) befinden, sollte alles möglich sein. Das ist es in der Tat, aber wenn Sie sich an die Kedro-Prinzipien für die Erstellung von wartbarem und modularem Data Science Code halten, wird das Problem schwieriger, als es aussieht.
In diesem Beitrag führe ich Sie durch den Prozess der Implementierung dynamischer Pipelines in Kedro, wobei die wichtigsten Konzepte und Prinzipien des Frameworks beibehalten werden.
Warum sind dynamische Pipelines in Kedro schwierig?
Einerseits verfügt Kedro über die Konzepte modularer Pipelines - sie ermöglichen die mehrfache Wiederverwendung der gleichen Pipelinestruktur. Gleichzeitig ermöglichen sie es dem Benutzer, die Eingaben, Ausgaben und Parameter jeder Instanz zu ändern. Dadurch sind sie innerhalb derselben Codebasis wiederverwendbar und können projektübergreifend genutzt werden.
Schauen wir uns ein Beispiel für eine modulare Pipeline an (angepasst an das offizielle Spaceflights-Tutorial):
def create_pipeline(**kwargs) -> Pipeline:
data_science_pipeline = pipeline(
[
node(
func=split_data,
inputs=["model_input_table", "params:model_options"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data_node",
),
node(
func=train_model,
inputs=["X_train", "y_train"],
outputs="regressor",
name="train_model_node",
),
node(
func=evaluate_model,
inputs=["regressor", "X_test", "y_test"],
outputs=None,
name="evaluate_model_node",
),
]
)
baseline = pipeline(
data_science_pipeline,
parameters={"params:model_options": "params:model_options"},
inputs={"model_input_table": "model_input_table",},
namespace="baseline",
)
candidate = pipeline(
data_science_pipeline,
inputs={"model_input_table": "model_input_table",},
tags=["candidate"],
namespace="candidate"
)
return baseline + candidate
Die Idee dabei ist, dieselbe Pipeline zweimal mit unterschiedlichen Parametern laufen zu lassen. In der Welt von Kedro ist das völlig in Ordnung - Sie definieren die Parameter zweimal in parameters.yml:
model_options:
test_size: 0.2
random_state: 3
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
- iata_approved
- company_rating
- review_scores_rating
candidate:
model_options:
test_size: 0.2
random_state: 666
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
Jetzt sind Sie startklar! Sie auch?
Was wäre, wenn Sie eine modulare Pipeline nicht zweimal, sondern 10x oder vielleicht 100x wiederverwenden wollten? Sie müssten die parameters.yml 10- oder 100-mal erweitern, indem Sie sie entweder irgendwie generieren oder immer wieder einfügen, obwohl Sie nur einen Teil der Parameter ändern wollten - ein alltäglicher Fall, dem Sie bei Ihrer Arbeit als Data Scientist oder ML Engineer begegnen werden.
Die parameters.yml ist die erste Sache, die zweite ist der create_pipeline Code. Zu dem Zeitpunkt, an dem create_pipeline im Lebenszyklus des Kedro-Projekts aufgerufen wird, sind die Parameter noch nicht verfügbar und Sie können sie nicht verwenden, um eine variable Anzahl von modularen Pipelines1 zu erzeugen.
Es gibt drei Stellen, die "wissen" müssen, wie oft Sie die modulare Pipeline verwenden möchten - die letzte Stelle ist die catalog.yml - da jede modulare Pipeline mit einem Namensraum versehen ist, müssen auch alle Datenkatalogeinträge mit einem Namensraum versehen sein:
preprocessed_companies:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_companies.pq
preprocessed_shuttles:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_shuttles.pq
model_input_table:
type: pandas.ParquetDataSet
filepath: data/03_primary/model_input_table.pq
Das müssen Sie haben:
baseline.preprocessed_companies:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_companies.pq
baseline.preprocessed_shuttles:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_shuttles.pq
baseline.model_input_table:
type: pandas.ParquetDataSet
filepath: data/03_primary/model_input_table.pq
candidate.preprocessed_companies:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/candidate/preprocessed_companies.pq
candidate.preprocessed_shuttles:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/candidate/preprocessed_shuttles.pq
candidate.model_input_table:
type: pandas.ParquetDataSet
filepath: data/03_primary/candidate/model_input_table.pq
Je größer die Pipeline ist, desto größer wird die catalog.yml, desto größer ist die Wahrscheinlichkeit, dass eine der filepath(s) durcheinander gerät.
Von modularen Pipelines zu dynamischen Pipelines
Da Sie nun den Hintergrund kennen (und wahrscheinlich sind Sie hier, weil Sie "Kedro dynamische Pipelines" nachgeschlagen haben und einfach nur den Code wollen), möchte ich Ihnen zeigen, wie Sie die folgenden Anwendungsfälle für dynamische Pipelines in Kedro lösen können:
- Anwendungsfall 1: Sie haben eine Pipeline, die Sie auf einem Datensatz ausführen möchten, der sich im Laufe der Zeit weiterentwickelt - z.B. ein Prognosemodell mit monatlichen Datentabellen, bei dem ein Monat die Daten des Vormonats verbraucht usw., wie in diesem Beispiel.
- Anwendungsfall 2: Sie haben eine Reihe von ähnlichen Modellexperimenten mit ähnlichen Parametern, die Sie parallel durchführen möchten. Die Modellparameter, die verwendeten Features, die Zielspalten oder die Modelltypen können in den verschiedenen Experimenten variieren.
- Anwendungsfall 3: Sie möchten eine "zentrale" / "wiederverwendbare" Pipeline implementieren, die für mehrere geschäftliche Anwendungsfälle konfiguriert und mehrfach ausgeführt werden kann.
In "unserer Methode" lösen wir die wichtigsten Probleme im Umgang mit dynamischen Pipelines in Kedro, mit sehr wenig eigenem Code und ohne zusätzliche Plugins! Die Methode, die wir hier vorstellen, ist eher ein Vorschlag für einen Projekt-Workflow mit ein paar Ergänzungen, um alles zusammenzufügen.
Das alles wurde erst kürzlich dank der folgenden Funktionen von Kedro möglich (alle verfügbar unter 0.18.13):
- OmegaConfigLoader mit benutzerdefinierten Auflösern
- Dataset-Fabriken
- Modulare Pipelines mit Namespaces
- Zentralisiert
settings.py
Lassen Sie mich Sie Schritt für Schritt durch den Prozess führen.
1. Ändern Sie settings.py
Es gibt ein paar wichtige, projektweite Einstellungen, die Sie aktivieren müssen. Die erste ist die Verwendung von OmegaConfigLoader anstelle des Standard-Config-Loaders (beachten Sie, dass dies ab Kedro 0.19.0 die Standardeinstellung ist).
from kedro.config import OmegaConfigLoader
CONFIG_LOADER_CLASS = OmegaConfigLoader
Als Nächstes erstellen Sie einen benutzerdefinierten OmegaConf-Resolver namens merge, der einen Deep-Merge von zwei Python-Wörterbüchern durchführt - Sie werden gleich sehen, warum.
from copy import deepcopy
import omegaconf
def merge_dicts(dict1, dict2):
"""
Recursively merge two dictionaries.
Args:
dict1 (dict): The first dictionary to merge.
dict2 (dict): The second dictionary to merge.
Returns:
dict: The merged dictionary.
"""
result = deepcopy(dict1)
for key, value in dict2.items():
if (
key in result
and isinstance(result[key], omegaconf.dictconfig.DictConfig)
and isinstance(value, omegaconf.dictconfig.DictConfig)
):
result[key] = merge_dicts(result[key], value)
else:
result[key] = value
return result
Der letzte und wichtigste Zusatz in settings.py ist die projektweite "Deklaration" der Pipelines, die Sie haben möchten. Sie wird sowohl in create_pipeline als auch bei der Validierung der Parameter verwendet.
DYNAMIC_PIPELINES_MAPPING = {
"reviews_predictor": ["base", "test1"],
"price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}
Dieses Wörterbuch deklariert die namespaces (Schlüssel), die dynamisch generierte Pipelines haben - im obigen Beispiel hat die Pipeline im Namensraum = reviews_predictor2 Instanzen und die Pipeline im Namensraum = price_predictor hat 4 Instanzen. Alle 6 Instanzen verwenden die gleiche Kedro-Pipelinestruktur. 
2. Anpassen der parameters.yml
Bis zu diesem Punkt hatte parameters.yml also eine Menge Duplikate und war fehleranfällig. Dank der Unterstützung für OmegaConfigLoader und merge resolver (oben definiert) kann die Datei parameters.yml so umstrukturiert werden, dass sie der Vererbung von Klassen aus OOP ähnelt:
# The model_options below are "base" options for all pipelines
model_options:
test_size: 0.2
random_state: 3
target: costs
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
- iata_approved
- company_rating
- review_scores_rating
model: sklearn.linear_model.LinearRegression
model_params: {}
# -------------------------------
# Pipeline-specific configuration
price_predictor:
_overrides:
target: price
model_options: ${merge:${model_options},${._overrides}}
base:
model_options: ${..model_options}
candidate1:
_overrides:
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- company_rating
model_options: ${merge:${..model_options},${._overrides}}
candidate2:
_overrides:
model_params:
gamma: 2.5
model_options: ${merge:${..model_options},${._overrides}}
candidate3:
_overrides:
model: sklearn.ensemble.RandomForestRegressor
model_params:
max_depth: 1.0
model_options: ${merge:${..model_options},${._overrides}}
Das neue parameters.yml kombiniert auf geschickte Weise die Verwendung der eingebauten OmegaConf-Referenzauflösung (hier: ${..model_options} und ${._overrides}) mit der Merge-Auflösung, die es Ihnen ermöglicht, auf verschiedene Teile der Konfiguration zu verweisen und diese wiederzuverwenden. Indem Sie die Parameterstruktur wie folgt deklarieren:
<namespace>:
<variant - e.g. model variant / experiment name>:
_overrides: {} # what to override from root configuration
# any specific parameter, e.g. model_options, used in Kedro pipeline
model_options: ${merge:${..model_options},${._overrides}}
simulieren Sie effektiv die Vererbung von Parametern und können einige/alle Werte wiederverwenden und/oder außer Kraft setzen. Während der Laufzeit werden die Resolver ausgeführt und die Konfiguration, die den Kedro-Knoten zur Verfügung gestellt wird, enthält die tatsächlichen Werte.
3. Dataset-Fabriken verwenden
Die Funktion der Kedro-Datensatzfabriken ermöglicht es Ihnen, die "generische" Konfiguration zu schreiben und die Redundanz der Katalogeinträge zu minimieren, indem Sie die in den Pipelines Ihres Projekts verwendeten Datensätze mit den Mustern der Datensatzfabriken verknüpfen. Wie funktioniert das? Sie geben die "Platzhalter" an, die während der Ausführung des Kedro-Projekts mit den tatsächlichen Laufzeitwerten gefüllt werden.
In unserem Fall möchten wir potenziell alle trainierten Modelle aus allen unseren Pipelines speichern - also in diesem einfachen Szenario mit 2 Namensräumen und 6 Varianten:
"reviews_predictor": ["base", "test1"],
"price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
müssten Sie insgesamt 12 Katalogeinträge haben - und das nur für einen einzigen Knoten, der Daten ausgibt!
Dank der Dataset-Fabriken, statt:
price_predictor.base.regressor:
type: pickle.PickleDataSet
filepath: data/06_models/price_predictor/base/regressor.pickle
versioned: true
price_predictor.candidate1.regressor:
type: pickle.PickleDataSet
filepath: data/06_models/price_predictor/candidate1/regressor.pickle
versioned: true
price_predictor.candidate2.regressor:
type: pickle.PickleDataSet
filepath: data/06_models/price_predictor/candidate2/regressor.pickle
versioned: true
# ... and 9 more entries
können Sie nur einen einzigen Eintrag haben :
"{namespace}.{variant}.regressor":
type: pickle.PickleDataSet
filepath: data/06_models/{namespace}/{variant}/regressor.pickle
versioned: true
Zur Laufzeit, wenn Kedro den Knoten erreicht, der die regressornimmt es seinen Namespace (z.B. price_predictor.candidate1), gleichen Sie es mit dem Muster aus dem Datenkatalog ab: "{namespace}.{variant}.regressor" und füllen Sie die {namespace} und {variant} Platzhalter mit price_predictor und candidate1 entsprechend.
Stellen Sie sicher, dass Sie auch den folgenden Eintrag hinzufügen:
"{namespace}.model_input_table":
type: pandas.ParquetDataSet
filepath: data/03_primary/{namespace}/model_input_table.pq
wenn Sie diesen Blogpost mit dem Spaceflights-Starter mitverfolgen.
4. Ändern Sie Standard-Pipelines in modulare Pipelines
Der letzte Teil besteht darin, die Pipelines tatsächlich zu generieren. Beginnen wir mit der data_processing Pipeline aus dem Spaceflights-Starter:
from <project_name> import settings
def create_pipeline(**kwargs) -> Pipeline:
data_processing = pipeline(
[
node(
func=preprocess_companies,
inputs="companies",
outputs="preprocessed_companies",
name="preprocess_companies_node",
),
node(
func=preprocess_shuttles,
inputs="shuttles",
outputs="preprocessed_shuttles",
name="preprocess_shuttles_node",
),
node(
func=create_model_input_table,
inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
outputs="model_input_table",
name="create_model_input_table_node",
),
]
)
pipes = []
for namespace in settings.DYNAMIC_PIPELINES_MAPPING.keys():
pipes.append(
pipeline(
data_processing,
inputs={
"companies": "companies",
"shuttles": "shuttles",
"reviews": "reviews",
},
namespace=namespace,
tags=settings.DYNAMIC_PIPELINES_MAPPING[namespace],
)
)
return sum(pipes)
Und die data_science Pipeline:
from <project name> import settings
def create_pipeline(**kwargs) -> Pipeline:
data_science_pipeline = pipeline(
[
node(
func=verbose_params,
inputs=["params:model_options"],
outputs=None,
name="debug_node",
),
node(
func=split_data,
inputs=["model_input_table", "params:model_options"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data_node",
),
node(
func=train_model,
inputs=["X_train", "y_train"],
outputs="regressor",
name="train_model_node",
),
node(
func=evaluate_model,
inputs=["regressor", "X_test", "y_test"],
outputs=None,
name="evaluate_model_node",
),
]
)
pipes = []
for namespace, variants in settings.DYNAMIC_PIPELINES_MAPPING.items():
for variant in variants:
pipes.append(
pipeline(
data_science_pipeline,
inputs={"model_input_table": f"{namespace}.model_input_table"},
namespace=f"{namespace}.{variant}",
tags=[variant, namespace],
)
)
return sum(pipes)
Beachten Sie, wie wir die settings importieren, um die Deklarationen der dynamischen Pipelines nachzuschlagen. Dank der Verwendung von settings von Kedro werden die Pipelines an einem zentralen Ort deklariert und können im gesamten Projekt wiederverwendet werden.
5. Ausführen der dynamischen Pipelines in Kedro
Die Art und Weise, wie namespaces und tags hier konfiguriert sind, ist sehr wichtig. Die data_processing Pipeline generiert wird für price_predictor und reviews_predictor während die data_science Pipeline hat mehrere Varianten (d.h. verschiedene Modellkonfigurationen) für jede von uns erstellte dynamische Instanz: base, candidate1, candidate2 usw.
Und warum?
So können wir die Pipelines auf die folgenden Arten ausführen:
- Führen Sie alles für einen der Namensräume aus, z.B.
kedro run --namespace price_predictor: Dies führt diedata_processingPipeline einmal aus und alle Varianten derdata_sciencePipeline: base, candidate1, candidate2 usw. - Führen Sie nur die eine Variante aus:
kedro run --namespace price_predictor.candidate1führt nur diedata_sciencePipeline in dercandidate1Variante aus und überspringt den Rest. Dazu muss diedata_processingPipeline zuerst ausgeführt werden, um Eingabedaten zu verarbeiten. - Führen Sie nur die einzelne Variante aus und lassen Sie auch die
data_processingPipeline laufen:kedro run --namespace price_predictor --tags candidate1
Einfach und wartbar
Der Ansatz zur Erstellung dynamischer Pipelines in Kedro steht im Einklang mit der Wartungsfreundlichkeit des Projekts und erfordert kein Hacken oder Abweichen von der "Kedro-Methode". Da die Pipeline-Ausführung dynamisch ist und die Pipeline-Struktur quasi-dynamisch bleibt, sollten alle vorhandenen Tools und Plug-ins, die auf Kedro aufbauen, so funktionieren wie bisher, z.B. können Sie mit Kedro-Viz eine Vorschau der Pipelines und Parameter anzeigen: 
Follow-ups für die Geeks
Wenn Sie ein Kedro-Geek sind (wie ich 😀 ), gibt es noch ein paar weitere Themen, in die Sie eintauchen können:
- Vorschlag für Pipeline-Filterungshaken, die die Erstellung von Pipelines auf der Grundlage von Parametern/Konfiguration ermöglichen https://github.com/kedro-org/kedro/issues/3000
- Konsistenz der Initialisierung des Config Loaders https://github.com/kedro-org/kedro/issues/3093
- Möglichkeit, mehrere Namespaces gleichzeitig auszuführen https://github.com/kedro-org/kedro/issues/3056
- Hinzufügen einer Validierung - in unseren Projekten, die dynamische Pipelines nutzen, haben wir auch einen Validierungsschritt hinzugefügt, der eine ordnungsgemäße Struktur auf
parameters.ymlin Bezug auf die definierten Namespaces und Varianten gewährleistet. Wenn Sie daran oder an anderen ML / MLOps Best Practices und Implementierungen interessiert sind, sehen Sie sich unsere MLOps-Demo an und melden Sie sich für eine kostenlose Beratung bei mir an.
Zusammenfassung
Endlich haben wir den Stand der Funktionen in Kedro erreicht, die es uns ermöglichen, dynamische Pipelines zu erstellen. All die kleinen Dinge, die während des 0.18.x Release-Zyklus hinzugefügt wurden, haben das Framework noch elastischer gemacht, während es weiterhin ein ausgereifter Baustein für MLOps-Plattformen und -Projekte bleibt.
Vielen Dank an Artur Dobrogowski, der einen großen Teil dieses Projekts implementiert und getestet hat.
Verfasst von
Marcin Zabłocki
Unsere Ideen
Weitere Blogs
Contact



