Pipeline Training
This tutorial walks you through the creation of a simple TFX pipeline on Kubeflow Pipelines and shows you how to manage pipelines via Kubernetes Custom Resources.
The examples for this tutorial can be found on GitHub.
1. Build the Pipeline
We use the same pipeline as the TFX example with a few modifications.
Create pipeline.py.
import os
from typing import List, Text
from tfx.components import CsvExampleGen, Pusher, Trainer
from tfx.dsl.components.base.base_node import BaseNode
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.orchestration.data_types import RuntimeParameter
### Environmental parameters can be left out when using the operator.
### Also, the return type is now a list of components instead of a pipeline.
#
#def create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
# module_file: str, serving_model_dir: str,
# metadata_path: str) -> tfx.dsl.Pipeline:
def create_components() -> List[BaseNode]:
    """Creates a penguin pipeline with TFX."""
    # Brings data into the pipeline.
    example_gen = CsvExampleGen(input_base='/data')

    # Uses user-provided Python function that trains a model.
    trainer = Trainer(
        run_fn='penguin_pipeline.trainer.run_fn',
        examples=example_gen.outputs['examples'],
        train_args=trainer_pb2.TrainArgs(num_steps=100),
        eval_args=trainer_pb2.EvalArgs(num_steps=5))

    # Pushes the model to a filesystem destination.
    pusher = Pusher(
        model=trainer.outputs['model'],
        push_destination=RuntimeParameter(name="push_destination", ptype=Text))

    # Following three components will be included in the pipeline.
    components = [
        example_gen,
        trainer,
        pusher
    ]

    ### When using the operator, it creates the pipeline for us,
    ### so we return the components directly instead.
    #
    #return tfx.dsl.Pipeline(
    #    pipeline_name=pipeline_name,
    #    pipeline_root=pipeline_root,
    #    metadata_connection_config=tfx.orchestration.metadata
    #    .sqlite_metadata_connection_config(metadata_path),
    #    components=components)
    return components
Create trainer.py.
The training code remains unchanged:
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
    """Generates features and label for training.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor for converting input to RecordBatch.
      schema: schema of the input data.
      batch_size: representing the number of consecutive elements of returned
        dataset to combine in a single batch

    Returns:
      A dataset that contains (features, indices) tuple where features is a
        dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(
            batch_size=batch_size, label_key=_LABEL_KEY),
        schema=schema).repeat()
def _build_keras_model() -> tf.keras.Model:
    """Creates a DNN Keras model for classifying penguin data.

    Returns:
      A Keras Model.
    """
    # The model below is built with Functional API, please refer to
    # https://www.tensorflow.org/guide/keras/overview for all API options.
    inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
    d = keras.layers.concatenate(inputs)
    for _ in range(2):
        d = keras.layers.Dense(8, activation='relu')(d)
    outputs = keras.layers.Dense(3)(d)

    model = keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(1e-2),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()])

    model.summary(print_fn=logging.info)
    return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
      fn_args: Holds args used to train the model as name/value pairs.
    """
    # This schema is usually either an output of SchemaGen or a manually-curated
    # version provided by the pipeline author. A schema can also be derived from
    # the TFT graph if a Transform component is used. When neither is available,
    # `schema_from_feature_spec` can be used to generate a schema from a very
    # simple feature_spec, but the schema returned will be very primitive.
    schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        schema,
        batch_size=_TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        schema,
        batch_size=_EVAL_BATCH_SIZE)

    model = _build_keras_model()
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    # The result of the training should be saved in `fn_args.serving_model_dir`
    # directory.
    model.save(fn_args.serving_model_dir, save_format='tf')
Create Dockerfile.
FROM python:3.10.12
ARG WHEEL_VERSION
RUN mkdir /data
RUN wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv -P /data
COPY dist/*-${WHEEL_VERSION}-*.whl /
RUN pip install /*.whl && rm /*.whl
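The COPY instruction above selects the wheel with a glob that embeds WHEEL_VERSION. As a rough illustration of which files such a pattern picks up, the sketch below mimics it with Python's fnmatch module. Docker's own matching rules may differ in detail, matching_wheels is a hypothetical helper, and the wheel file names and versions are made up for the example.

```python
from fnmatch import fnmatchcase


def matching_wheels(filenames: list[str], wheel_version: str) -> list[str]:
    """Return the file names matched by '*-<wheel_version>-*.whl'.

    This only approximates the Dockerfile's COPY glob; it is not Docker's
    matcher.
    """
    pattern = f"*-{wheel_version}-*.whl"
    return [name for name in filenames if fnmatchcase(name, pattern)]


# Hypothetical contents of dist/ after building the pipeline package.
dist = [
    "penguin_pipeline-0.1.0-py3-none-any.whl",
    "penguin_pipeline-0.2.0-py3-none-any.whl",
]

print(matching_wheels(dist, "0.2.0"))  # only the 0.2.0 wheel
print(matching_wheels(dist, ""))       # an empty version matches nothing
```

Under these assumptions, an empty WHEEL_VERSION selects no wheel at all, which suggests passing the version explicitly when building the image.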
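Next, build the pipeline as a Docker image and push it. The Dockerfile copies a wheel of the pipeline package from dist/ and selects it by the WHEEL_VERSION build argument, so build the wheel first and replace <wheel-version> below with its version:
docker build . --build-arg WHEEL_VERSION=<wheel-version> -t kfp-quickstart:v1
...
docker push kfp-quickstart:v1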
2. Create a Pipeline Resource
Now that we have a pipeline image, we can create a pipeline.yaml resource to manage the lifecycle of this pipeline on Kubeflow:
apiVersion: pipelines.kubeflow.org/v1beta1
kind: Pipeline
metadata:
  name: penguin-pipeline
spec:
  provider: provider-namespace/provider-name
  image: kfp-quickstart:v1
  framework:
    name: tfx
    parameters:
      pipeline: penguin_pipeline.pipeline.create_components
      beamArgs:
        - name: anArg
          value: aValue
kubectl apply -f resources/pipeline.yaml
The operator now uploads the pipeline to Kubeflow in several steps. After a few seconds to a few minutes, the following command should report a SYNCHRONIZATIONSTATE of Succeeded:
kubectl get pipeline
NAME               SYNCHRONIZATIONSTATE   PROVIDERID
penguin-pipeline   Succeeded              53905abe-0337-48de-875d-67b9285f3cf7
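When scripting this check, the table printed by kubectl can be parsed and polled. The following is a minimal sketch that relies only on the column names shown above. The helper names, the timeout and the polling interval are assumptions, and wait_for_sync expects kubectl to be on the PATH.

```python
import subprocess
import time


def parse_table(output: str) -> dict[str, dict[str, str]]:
    """Parse whitespace-separated `kubectl get` output into rows keyed by NAME."""
    lines = [line for line in output.splitlines() if line.strip()]
    if not lines:
        return {}
    header = lines[0].split()
    rows = {}
    for line in lines[1:]:
        row = dict(zip(header, line.split()))
        rows[row["NAME"]] = row
    return rows


def wait_for_sync(name: str, timeout_s: float = 300.0,
                  interval_s: float = 5.0) -> bool:
    """Poll `kubectl get pipeline` until `name` is Succeeded or time runs out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        result = subprocess.run(
            ["kubectl", "get", "pipeline"],
            capture_output=True, text=True, check=False)
        row = parse_table(result.stdout).get(name, {})
        if row.get("SYNCHRONIZATIONSTATE") == "Succeeded":
            return True
        time.sleep(interval_s)
    return False


# Parsing the sample output from this tutorial (no cluster needed).
sample = """\
NAME               SYNCHRONIZATIONSTATE   PROVIDERID
penguin-pipeline   Succeeded              53905abe-0337-48de-875d-67b9285f3cf7
"""
print(parse_table(sample)["penguin-pipeline"]["SYNCHRONIZATIONSTATE"])
```

Calling wait_for_sync("penguin-pipeline") after kubectl apply would then block until the state is Succeeded or the timeout expires.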
Now visit your Kubeflow Pipelines UI. You should be able to see the newly created pipeline named penguin-pipeline. Note that you will see two versions: ‘penguin-pipeline’ and ‘v1’. This is due to an open issue on Kubeflow: a version can’t be specified when a pipeline is created.
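The pipeline parameter in pipeline.yaml is a dotted path to the function that returns the components. How the operator resolves it is not covered in this tutorial. As a hedged illustration only, a dotted path of this shape can be resolved in Python as follows. resolve_dotted_path is a hypothetical helper, and a standard-library function stands in for create_components so that the sketch runs without the pipeline package installed.

```python
import importlib
from typing import Any, Callable


def resolve_dotted_path(path: str) -> Callable[..., Any]:
    """Import 'package.module.function' and return the function object."""
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.function', got {path!r}")
    module = importlib.import_module(module_name)
    target = getattr(module, attr_name)
    if not callable(target):
        raise TypeError(f"{path!r} does not refer to a callable")
    return target


# Stand-in for 'penguin_pipeline.pipeline.create_components'.
dumps = resolve_dotted_path("json.dumps")
print(dumps({"components": 3}))
```

A lookup like this fails with ImportError or AttributeError if the package is not installed in the image or the function name is misspelled, so it is worth checking that the path matches the module layout inside the wheel.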
3. Create an Experiment resource
Note: this step is optional. You can continue with the next step if you want to use the Default experiment instead.
Create experiment.yaml:
apiVersion: pipelines.kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: penguin-experiment
spec:
  description: An experiment for the penguin pipeline
  provider: provider-namespace/provider-name
kubectl apply -f resources/experiment.yaml
4. Create a pipeline RunConfiguration resource
We can now define a recurring run declaratively using the RunConfiguration resource.
Note: remove experimentName if you want to use the Default experiment instead of penguin-experiment.
Create runconfiguration.yaml:
apiVersion: pipelines.kubeflow.org/v1beta1
kind: RunConfiguration
metadata:
  name: penguin-pipeline-recurring-run
spec:
  run:
    provider: provider-namespace/provider-name
    pipeline: penguin-pipeline
    experimentName: penguin-experiment
    parameters:
      - name: push_destination
        value: '{"filesystem":{"base_directory":"gs://my-bucket/penguin-pipeline"}}'
  triggers:
    schedules:
      - cronExpression: 0 * * * *
        startTime: 2024-01-01T00:00:00Z
        endTime: 2024-12-31T23:59:59Z
kubectl apply -f resources/runconfiguration.yaml
This will trigger a run of penguin-pipeline once every hour. Note that the cron schedule uses a 6-place, space-separated syntax as defined here.
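The value of the push_destination parameter in the RunConfiguration is a JSON document passed as a string. When run configurations are generated programmatically, building that string with json.dumps avoids quoting mistakes. This is a minimal sketch: push_destination_value is a hypothetical helper, and only the filesystem/base_directory shape shown in the YAML is assumed.

```python
import json


def push_destination_value(base_directory: str) -> str:
    """Serialise a filesystem push destination as compact JSON."""
    return json.dumps(
        {"filesystem": {"base_directory": base_directory}},
        separators=(",", ":"))


value = push_destination_value("gs://my-bucket/penguin-pipeline")
print(value)
```

The printed string is the same value used for push_destination in runconfiguration.yaml and can be placed there inside single quotes.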
5. (Optional) Deploy newly trained models
If the operator has been installed with Argo-Events support, we can now specify EventSources and Sensors to update arbitrary Kubernetes configuration when a pipeline run has trained a model successfully. In this example, we update a serving component with the location of the newly trained model.
Create apply-model-location.yaml. This creates an EventSource and a Sensor as well as an EventBus:
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  nats:
    native: {}
---
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: run-completion
spec:
  nats:
    run-completion:
      jsonBody: true
      subject: events
      url: nats://eventbus-kfp-operator-events-stan-svc.kfp-operator.svc:4222
---
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: penguin-pipeline-model-update
spec:
  template:
    serviceAccountName: events-sa
  dependencies:
    - name: run-completion
      eventSourceName: run-completion
      eventName: run-completion
      filters:
        data:
          - path: body.data.status
            type: string
            comparator: "="
            value:
              - "succeeded"
          - path: body.data.pipelineName
            type: string
            comparator: "="
            value:
              - "penguin-pipeline"
  triggers:
    - template:
        name: update
        k8s:
          operation: update
          source:
            resource:
              apiVersion: v1
              kind: ConfigMap
              metadata:
                name: serving-config
              data:
                servingModel: ""
          parameters:
            - src:
                dependencyName: run-completion
                dataKey: body.data.artifacts.0.location
              dest: data.servingModel
kubectl apply -f resources/apply-model-location.yaml
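To make the Sensor's behaviour concrete, the sketch below mimics its two data filters and its trigger parameter in plain Python. It is not how Argo Events is implemented. The sample event contains only the fields the Sensor references, and the artifact location in it is a made-up value.

```python
from typing import Any, Optional


def get_path(document: Any, path: str) -> Any:
    """Walk a dotted path; numeric segments index into lists."""
    current = document
    for segment in path.split("."):
        if isinstance(current, list):
            current = current[int(segment)]
        else:
            current = current[segment]
    return current


def serving_model_for(event: dict) -> Optional[str]:
    """Return the new model location, or None if the filters reject the event."""
    try:
        if get_path(event, "body.data.status") != "succeeded":
            return None
        if get_path(event, "body.data.pipelineName") != "penguin-pipeline":
            return None
        return get_path(event, "body.data.artifacts.0.location")
    except (KeyError, IndexError, ValueError, TypeError):
        # A missing or malformed field is treated like a failed filter.
        return None


# Hypothetical run-completion event with only the fields used by the Sensor.
event = {"body": {"data": {
    "status": "succeeded",
    "pipelineName": "penguin-pipeline",
    "artifacts": [{"location": "gs://my-bucket/penguin-pipeline/1"}],
}}}

print(serving_model_for(event))
```

In the Sensor, the value found at body.data.artifacts.0.location is written to data.servingModel of the serving-config ConfigMap. Here it is simply returned.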