Pipeline Training
This tutorial walks you through the creation of a simple TFX pipeline on Kubeflow Pipelines and shows you how to manage pipelines via Kubernetes Custom Resources.
The examples for this tutorial can be found on GitHub.
1. Build the Pipeline
We use the same pipeline as the TFX example with a few modifications.
Create pipeline.py.
Note that the pipeline definition itself is simpler because all infrastructure references, like pusher and pipeline root, will be injected by the operator before the pipeline is uploaded to Kubeflow:
import os
from typing import List
from tfx.components import CsvExampleGen, Trainer
from tfx.proto import trainer_pb2
from tfx.dsl.components.base.base_node import BaseNode

### Environmental parameters can be left out when using the operator.
### Also, the return type is now a list of components instead of a pipeline.
#
#def create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
#                    module_file: str, serving_model_dir: str,
#                    metadata_path: str) -> tfx.dsl.Pipeline:
def create_components() -> List[BaseNode]:
    """Creates the components of a penguin pipeline with TFX."""
    # Brings data into the pipeline.
    example_gen = CsvExampleGen(input_base='/data')

    # Uses user-provided Python function that trains a model.
    trainer = Trainer(
        run_fn='trainer.run_fn',
        examples=example_gen.outputs['examples'],
        train_args=trainer_pb2.TrainArgs(num_steps=100),
        eval_args=trainer_pb2.EvalArgs(num_steps=5))

    ### This needs to be omitted when using the operator.
    #
    ## Pushes the model to a filesystem destination.
    #pusher = tfx.components.Pusher(
    #    model=trainer.outputs['model'],
    #    push_destination=tfx.proto.PushDestination(
    #        filesystem=tfx.proto.PushDestination.Filesystem(
    #            base_directory=serving_model_dir)))

    # The following two components will be included in the pipeline.
    components = [
        example_gen,
        trainer,
        #pusher
    ]

    ### When using the operator, it creates the pipeline for us,
    ### so we return the components directly instead.
    #
    #return tfx.dsl.Pipeline(
    #    pipeline_name=pipeline_name,
    #    pipeline_root=pipeline_root,
    #    metadata_connection_config=tfx.orchestration.metadata
    #    .sqlite_metadata_connection_config(metadata_path),
    #    components=components)
    return components
Create trainer.py.
The training code remains unchanged:
from typing import List

from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
    """Generates features and label for training.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor for converting input to RecordBatch.
      schema: schema of the input data.
      batch_size: representing the number of consecutive elements of returned
        dataset to combine in a single batch

    Returns:
      A dataset that contains (features, indices) tuple where features is a
        dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(
            batch_size=batch_size, label_key=_LABEL_KEY),
        schema=schema).repeat()
def _build_keras_model() -> tf.keras.Model:
    """Creates a DNN Keras model for classifying penguin data.

    Returns:
      A Keras Model.
    """
    # The model below is built with Functional API, please refer to
    # https://www.tensorflow.org/guide/keras/overview for all API options.
    inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
    d = keras.layers.concatenate(inputs)
    for _ in range(2):
        d = keras.layers.Dense(8, activation='relu')(d)
    outputs = keras.layers.Dense(3)(d)

    model = keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(1e-2),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()])

    model.summary(print_fn=logging.info)
    return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
      fn_args: Holds args used to train the model as name/value pairs.
    """
    # This schema is usually either an output of SchemaGen or a manually-curated
    # version provided by the pipeline author. A schema can also be derived from
    # the TFT graph if a Transform component is used. When neither is available,
    # `schema_from_feature_spec` can generate a schema from a very simple
    # feature_spec, but the resulting schema will be very primitive.
    schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        schema,
        batch_size=_TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        schema,
        batch_size=_EVAL_BATCH_SIZE)

    model = _build_keras_model()
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    # The result of the training should be saved in `fn_args.serving_model_dir`
    # directory.
    model.save(fn_args.serving_model_dir, save_format='tf')
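The model's last layer, Dense(3), emits raw logits rather than probabilities, which is why the loss is configured with from_logits=True. As a rough illustration of what that means, the following pure-Python sketch computes the same quantity for a single example. It is not part of trainer.py; the helper names softmax, sparse_categorical_crossentropy, and predict_class are hypothetical and only mirror the Keras behaviour conceptually.

```python
import math
from typing import List


def softmax(logits: List[float]) -> List[float]:
    """Converts raw logits into probabilities that sum to 1."""
    # Subtract the maximum before exponentiating for numerical stability.
    largest = max(logits)
    exps = [math.exp(x - largest) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sparse_categorical_crossentropy(logits: List[float], label: int) -> float:
    """Negative log-probability of the true class index, computed from logits."""
    return -math.log(softmax(logits)[label])


def predict_class(logits: List[float]) -> int:
    """Index of the largest logit, i.e. the predicted species index."""
    return max(range(len(logits)), key=lambda i: logits[i])


# Hypothetical logits for one penguin, one value per species.
example_logits = [2.0, 0.5, -1.0]
print(predict_class(example_logits))
print(sparse_categorical_crossentropy(example_logits, label=0))
```

The label is an integer class index (hence "sparse"), matching the int64 species feature in _FEATURE_SPEC. The loss is small when the logit of the true class dominates the others.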
Create Dockerfile:
FROM python:3.10.12 as builder
RUN mkdir /data
RUN wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv -P /data
RUN pip install poetry==1.7.1
COPY pyproject.toml .
COPY poetry.lock .
RUN poetry config virtualenvs.create true
RUN poetry config virtualenvs.in-project true
RUN poetry install --no-root
FROM python:3.10.12-slim as runtime
ENV VIRTUAL_ENV=/.venv \
    PATH="/.venv/bin:$PATH"
WORKDIR /pipeline
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY --from=builder /data /data
COPY penguin_pipeline/*.py ./
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"
Next, build the pipeline as a Docker image and push it:
docker build . -t kfp-quickstart:v1
...
docker push kfp-quickstart:v1
2. Create a Pipeline Resource
Now that we have a pipeline image, we can create a pipeline.yaml resource to manage the lifecycle of this pipeline on Kubeflow:
apiVersion: pipelines.kubeflow.org/v1alpha6
kind: Pipeline
metadata:
  name: penguin-pipeline
spec:
  image: kfp-quickstart:v1
  tfxComponents: pipeline.create_components
kubectl apply -f resources/pipeline.yaml
The pipeline now gets uploaded to Kubeflow in several steps. Within a few seconds to a few minutes, the following command should report the synchronization state Succeeded:
kubectl get pipeline
NAME               SYNCHRONIZATIONSTATE   PROVIDERID
penguin-pipeline   Succeeded              53905abe-0337-48de-875d-67b9285f3cf7
Now visit your Kubeflow Pipelines UI. You should be able to see the newly created pipeline named penguin-pipeline. Note that you will see two versions: ‘penguin-pipeline’ and ‘v1’. This is due to an open issue on Kubeflow where you can’t specify a version when creating a pipeline.
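The tfxComponents field is a dotted reference of the form module.function that points at the create_components function inside the image. The tutorial does not describe how the operator resolves it, so the following is only a minimal sketch of how such a reference could be turned into a callable, assuming standard Python import rules. The helper resolve_component_factory and the stand-in module are hypothetical and are not part of the operator.

```python
import importlib
import sys
import types
from typing import Any, Callable, List


def resolve_component_factory(reference: str) -> Callable[[], List[Any]]:
    """Resolves a 'module.function' string to the function it names."""
    module_name, _, function_name = reference.rpartition(".")
    if not module_name or not function_name:
        raise ValueError(f"expected 'module.function', got {reference!r}")
    module = importlib.import_module(module_name)
    factory = getattr(module, function_name)
    if not callable(factory):
        raise TypeError(f"{reference!r} is not callable")
    return factory


# Stand-in for the real pipeline.py so the sketch runs without TFX installed.
_fake_pipeline = types.ModuleType("pipeline")
_fake_pipeline.create_components = lambda: ["example_gen", "trainer"]
sys.modules["pipeline"] = _fake_pipeline

components = resolve_component_factory("pipeline.create_components")()
print(components)  # ['example_gen', 'trainer']
```

This also shows why the Dockerfile adds /pipeline to PYTHONPATH: the module named in tfxComponents has to be importable inside the image.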
3. Create an Experiment resource
Note: this step is optional. You can continue with the next step if you want to use the Default experiment instead.
Create experiment.yaml:
apiVersion: pipelines.kubeflow.org/v1alpha6
kind: Experiment
metadata:
  name: penguin-experiment
spec:
  description: An experiment for the penguin pipeline
kubectl apply -f resources/experiment.yaml
4. Create a pipeline RunConfiguration resource
We can now define a recurring run declaratively using the RunConfiguration resource.
Note: remove experimentName if you want to use the Default experiment instead of penguin-experiment.
Create runconfiguration.yaml:
apiVersion: pipelines.kubeflow.org/v1alpha6
kind: RunConfiguration
metadata:
  name: penguin-pipeline-recurring-run
spec:
  run:
    pipeline: penguin-pipeline
    experimentName: penguin-experiment
  triggers:
    schedules:
    - cronExpression: 0 * * * *
      startTime: 2024-01-01T00:00:00Z
      endTime: 2024-12-31T23:59:59Z
kubectl apply -f resources/runconfiguration.yaml
This will trigger a run of penguin-pipeline once every hour. Note that the cron schedule uses a 6-place, space-separated syntax as defined here.
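To make the effect of the schedule concrete, the sketch below lists the first few top-of-the-hour instants between startTime and endTime. It is not a cron parser and does not reproduce the operator's or Kubeflow's scheduling logic; the helper hourly_fire_times is hypothetical, and treating both bounds as inclusive is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def hourly_fire_times(start: datetime, end: datetime, limit: int) -> List[datetime]:
    """Lists up to `limit` top-of-the-hour instants within [start, end]."""
    # Round the start up to the next full hour unless it is already on one.
    first = start.replace(minute=0, second=0, microsecond=0)
    if first < start:
        first += timedelta(hours=1)

    times: List[datetime] = []
    current = first
    while current <= end and len(times) < limit:
        times.append(current)
        current += timedelta(hours=1)
    return times


# The window from runconfiguration.yaml, expressed in UTC.
start = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 12, 31, 23, 59, 59, tzinfo=timezone.utc)

for fire_time in hourly_fire_times(start, end, limit=3):
    print(fire_time.isoformat())
```

Under these assumptions the first three instants are 00:00, 01:00, and 02:00 UTC on 2024-01-01, and no run is scheduled after the end of 2024.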
5. (Optional) Deploy newly trained models
If the operator has been installed with Argo-Events support, we can now specify EventSources and Sensors to update arbitrary Kubernetes config when a pipeline run has completed successfully. In this example, we update a serving component with the location of the newly trained model.
Create apply-model-location.yaml. This creates an EventSource and a Sensor as well as an EventBus:
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  nats:
    native: {}
---
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: run-completion
spec:
  nats:
    run-completion:
      jsonBody: true
      subject: events
      url: nats://eventbus-kfp-operator-events-stan-svc.kfp-operator.svc:4222
---
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: penguin-pipeline-model-update
spec:
  template:
    serviceAccountName: events-sa
  dependencies:
    - name: run-completion
      eventSourceName: run-completion
      eventName: run-completion
      filters:
        data:
          - path: body.data.status
            type: string
            comparator: "="
            value:
              - "succeeded"
          - path: body.data.pipelineName
            type: string
            comparator: "="
            value:
              - "penguin-pipeline"
  triggers:
    - template:
        name: update
        k8s:
          operation: update
          source:
            resource:
              apiVersion: v1
              kind: ConfigMap
              metadata:
                name: serving-config
              data:
                servingModel: ""
          parameters:
            - src:
                dependencyName: run-completion
                dataKey: body.data.artifacts.0.location
              dest: data.servingModel
kubectl apply -f resources/apply-model-location.yaml
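The Sensor above does two things: it filters events by status and pipeline name, and it copies the first artifact's location into the ConfigMap. The following sketch mimics that logic on plain dictionaries to show how the dotted paths are read. It is not how Argo Events is implemented; the helpers get_path, matches, and apply_parameter are hypothetical, and the example event (including the artifact name and bucket path) is an assumed payload shape inferred only from the paths used in the Sensor.

```python
from typing import Any, Dict, List


def get_path(document: Any, path: str) -> Any:
    """Follows a dotted path; numeric segments index into lists."""
    current = document
    for segment in path.split("."):
        if isinstance(current, list):
            current = current[int(segment)]
        else:
            current = current[segment]
    return current


def matches(event: Dict[str, Any], filters: Dict[str, List[str]]) -> bool:
    """True when every filtered path holds one of its allowed values."""
    return all(get_path(event, path) in allowed
               for path, allowed in filters.items())


def apply_parameter(event: Dict[str, Any], resource: Dict[str, Any],
                    src_path: str, dest_path: str) -> Dict[str, Any]:
    """Copies the value at src_path in the event to dest_path in the resource."""
    *parents, leaf = dest_path.split(".")
    target = resource
    for key in parents:
        target = target[key]
    target[leaf] = get_path(event, src_path)
    return resource


# Assumed event shape; the artifact name and location are made up.
event = {"body": {"data": {
    "status": "succeeded",
    "pipelineName": "penguin-pipeline",
    "artifacts": [{"name": "model",
                   "location": "gs://example-bucket/penguin/model/1"}],
}}}
filters = {
    "body.data.status": ["succeeded"],
    "body.data.pipelineName": ["penguin-pipeline"],
}
config_map = {
    "apiVersion": "v1",
    "kind": "ConfigMap",
    "metadata": {"name": "serving-config"},
    "data": {"servingModel": ""},
}

if matches(event, filters):
    apply_parameter(event, config_map,
                    "body.data.artifacts.0.location", "data.servingModel")
print(config_map["data"]["servingModel"])
```

An event with a different status or pipeline name leaves the ConfigMap untouched, which corresponds to the Sensor's trigger not firing.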