Pipeline Dependencies
Pipeline dependencies allow splitting up larger machine learning pipelines into sub-pipelines. This is particularly useful when:
- The data of an earlier step changes at a lower frequency than the data for subsequent steps
- Outputs of an earlier step could be shared between pipelines to avoid re-processing the same data
- A combined model of two or more pipelines needs to be served
In this example, we break up the penguin example pipeline into two pipelines:
- The Penguin Examples Pipeline has a single pipeline step that imports the CSV example and outputs it as an artifact
- The Penguin Training Pipeline references the previously produced examples and trains the model
This example may not be of practical use in itself, but it demonstrates the approach to managing pipeline dependencies.
The code for this tutorial can be found on GitHub.
1. Build and deploy the Penguin Examples Pipeline
The penguin_examples
pipeline loads the CSV data and makes it available as an output artifact. The code for this pipeline can be found under quickstart-base.
First, create the pipeline code in penguin_examples/pipeline.py
:
from typing import List

from tfx.components import CsvExampleGen
from tfx.dsl.components.base.base_node import BaseNode


def create_components() -> List[BaseNode]:
    """Creates a penguin pipeline with TFX."""
    # Brings data into the pipeline.
    example_gen = CsvExampleGen(input_base='/data')

    return [example_gen]
Next, create the following Dockerfile
, then build and push the pipeline image:
FROM python:3.10.12
ARG WHEEL_VERSION
RUN mkdir /data
RUN wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv -P /data
COPY dist/*-${WHEEL_VERSION}-*.whl /
RUN pip install /*.whl && rm /*.whl
docker build quickstart-base -t kfp-quickstart-base:v1
docker push kfp-quickstart-base:v1
Now create and apply the resources needed to compile and run the penguin examples pipeline:
Create quickstart-base/resources/pipeline.yaml
. The specification has not changed from the original example.
apiVersion: pipelines.kubeflow.org/v1beta1
kind: Pipeline
metadata:
  name: penguin-pipeline-examples
  namespace: base-namespace
spec:
  provider: provider-namespace/kfp
  image: kfp-quickstart-base:v1
  framework:
    name: tfx
    parameters:
      pipeline: penguin_examples.pipeline.create_components
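The `pipeline` parameter is the fully qualified dotted path of the components factory function inside the pipeline image. As a rough illustration of that convention only (the helper name is made up and this is not the operator's actual implementation), such a path could be resolved with Python's `importlib`:

```python
import importlib


def load_components_factory(dotted_path: str):
    """Resolve a 'package.module.function' path to a callable.

    Illustrative sketch only: the operator performs this kind of
    resolution inside the pipeline image when compiling the pipeline.
    """
    module_path, _, function_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, function_name)


# Example with a standard-library function standing in for
# penguin_examples.pipeline.create_components:
join = load_components_factory("os.path.join")
```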
Create quickstart-base/resources/runconfiguration.yaml
, which schedules the pipeline to run at regular intervals and specifies the output artifacts that it exposes.
apiVersion: pipelines.kubeflow.org/v1beta1
kind: RunConfiguration
metadata:
  name: penguin-pipeline-examples-rc
  namespace: base-namespace
spec:
  run:
    provider: provider-namespace/kfp
    pipeline: penguin-pipeline-examples
    artifacts:
      - name: examples
        path: CsvExampleGen:examples
  triggers:
    schedules:
      - cronExpression: '0 * * * *'
        startTime: "2024-01-01T00:00:00Z"
        endTime: "2024-12-31T23:59:59Z"
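The artifact `path` follows a `<component>:<output>` convention, here selecting the `examples` output of the `CsvExampleGen` component. A minimal sketch of how such a reference splits into its two parts (the helper is hypothetical, not part of the operator's API):

```python
def split_artifact_path(path: str):
    """Split an artifact reference of the form '<component>:<output>'."""
    component, separator, output = path.partition(":")
    if not separator or not component or not output:
        raise ValueError(f"expected '<component>:<output>', got {path!r}")
    return component, output


component, output = split_artifact_path("CsvExampleGen:examples")
```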
Finally, apply the resources:
kubectl apply -f quickstart-base/resources/pipeline.yaml
kubectl apply -f quickstart-base/resources/runconfiguration.yaml
2. Build and deploy the Penguin Training Pipeline
The penguin_training
pipeline imports the artifact exposed by the penguin_examples
pipeline and trains a model in the same way as the original pipeline.
The code for this pipeline can be found under quickstart-dependant.
Create penguin_training/pipeline.py
which imports the artifact as referenced by its runtime parameters. Note that the CsvExampleGen
has been replaced by an ImportExampleGen
:
from typing import List, Text

from tfx.components import ImportExampleGen, Trainer
from tfx.proto import trainer_pb2, example_gen_pb2
from tfx.dsl.components.base.base_node import BaseNode
from tfx.v1.dsl.experimental import RuntimeParameter


def create_components() -> List[BaseNode]:
    """Creates a penguin pipeline with TFX."""
    # Defines a pipeline runtime parameter.
    examples_location_param = RuntimeParameter(
        name="examples_location",
        ptype=Text,
    )

    # Imports the artifact referenced by the runtime parameter.
    examples = ImportExampleGen(
        input_base=examples_location_param,
        input_config=example_gen_pb2.Input(
            splits=[
                example_gen_pb2.Input.Split(
                    name="eval",
                    pattern='Split-eval/*'
                ),
                example_gen_pb2.Input.Split(
                    name="train",
                    pattern='Split-train/*'
                ),
            ]
        ),
    )

    trainer = Trainer(
        run_fn='penguin_training.trainer.run_fn',
        examples=examples.outputs['examples'],
        train_args=trainer_pb2.TrainArgs(num_steps=100),
        eval_args=trainer_pb2.EvalArgs(num_steps=5))

    return [
        examples,
        trainer,
    ]
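The `Split-eval/*` and `Split-train/*` patterns are glob expressions that TFX matches against file paths beneath `input_base` to assign files to splits. A standalone sketch of that matching using Python's `fnmatch` (the file names are invented for illustration):

```python
from fnmatch import fnmatch

# Hypothetical layout of the imported examples artifact.
files = [
    "Split-train/data_tfrecord-00000-of-00001.gz",
    "Split-eval/data_tfrecord-00000-of-00001.gz",
]

# Each split keeps only the files matching its glob pattern.
train_files = [f for f in files if fnmatch(f, "Split-train/*")]
eval_files = [f for f in files if fnmatch(f, "Split-eval/*")]
```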
Next create penguin_training/trainer.py
, which has not changed from the original pipeline.
Create the following Dockerfile
. In contrast to the above, this Dockerfile does not include the examples CSV.
FROM python:3.10.12
ARG WHEEL_VERSION
COPY dist/*-${WHEEL_VERSION}-*.whl /
RUN pip install /*.whl && rm /*.whl
Next, build the pipeline as a Docker container and push it:
docker build quickstart-dependant -t kfp-quickstart-dependant:v1
docker push kfp-quickstart-dependant:v1
Now create and apply the resources needed to compile and train the penguin training pipeline.
quickstart-dependant/resources/pipeline.yaml
follows the same structure as the original example:
apiVersion: pipelines.kubeflow.org/v1beta1
kind: Pipeline
metadata:
  name: penguin-pipeline-training
  namespace: dependant-namespace
spec:
  provider: provider-namespace/kfp
  image: kfp-quickstart-dependant:v1
  framework:
    name: tfx
    parameters:
      pipeline: penguin_training.pipeline.create_components
quickstart-dependant/resources/runconfiguration.yaml
has been updated so that a run is triggered as soon as the dependency has completed and produced the referenced artifact. The artifact's location is then provided to the pipeline as a runtime parameter:
apiVersion: pipelines.kubeflow.org/v1beta1
kind: RunConfiguration
metadata:
  name: penguin-pipeline-training-rc
  namespace: dependant-namespace
spec:
  run:
    provider: provider-namespace/kfp
    pipeline: penguin-pipeline-training
    parameters:
      - name: examples_location
        valueFrom:
          runConfigurationRef:
            name: base-namespace/penguin-pipeline-examples-rc
            outputArtifact: examples
  triggers:
    runConfigurations:
      - base-namespace/penguin-pipeline-examples-rc
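Conceptually, the operator resolves `valueFrom` by looking up the most recent `examples` artifact produced by the referenced run configuration and injecting its location as the `examples_location` runtime parameter. The sketch below only models that lookup; the artifact location is invented, and the real resolution happens inside the operator:

```python
# Hypothetical registry mapping (run configuration, output artifact)
# to the location of the most recently produced artifact.
latest_artifacts = {
    ("base-namespace/penguin-pipeline-examples-rc", "examples"):
        "/artifacts/penguin-pipeline-examples/CsvExampleGen/examples",
}


def resolve_value_from(run_configuration: str, output_artifact: str) -> str:
    """Look up the artifact location to inject as a runtime parameter."""
    return latest_artifacts[(run_configuration, output_artifact)]


# Parameters passed to the triggered penguin-pipeline-training run.
runtime_parameters = {
    "examples_location": resolve_value_from(
        "base-namespace/penguin-pipeline-examples-rc", "examples"
    ),
}
```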
Apply the resources as follows:
kubectl apply -f quickstart-dependant/resources/pipeline.yaml
kubectl apply -f quickstart-dependant/resources/runconfiguration.yaml