Pipeline Training

This tutorial walks you through the creation of a simple TFX pipeline on Kubeflow Pipelines and shows you how to manage pipelines via Kubernetes Custom Resources.

The examples for this tutorial can be found on GitHub.

1. Build the Pipeline

We use the same pipeline as the TFX example with a few modifications.

Create pipeline.py.

    import os
    from typing import List, Text

    from tfx.components import CsvExampleGen, Pusher, Trainer
    from tfx.dsl.components.base.base_node import BaseNode
    from tfx.proto import pusher_pb2, trainer_pb2
    from tfx.orchestration.data_types import RuntimeParameter

    ### Environmental parameters can be left out when using the operator.
    ### Also, the return type is now a list of components instead of a pipeline.
    #
    #def create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
    #                    module_file: str, serving_model_dir: str,
    #                    metadata_path: str) -> tfx.dsl.Pipeline:
    def create_components() -> List[BaseNode]:
        """Creates a penguin pipeline with TFX."""
        # Brings data into the pipeline.
        example_gen = CsvExampleGen(input_base='/data')

        # Uses user-provided Python function that trains a model.
        trainer = Trainer(
            run_fn='penguin_pipeline.trainer.run_fn',
            examples=example_gen.outputs['examples'],
            train_args=trainer_pb2.TrainArgs(num_steps=100),
            eval_args=trainer_pb2.EvalArgs(num_steps=5))

        ## Pushes the model to a filesystem destination.
        pusher = Pusher(
            model=trainer.outputs['model'],
            push_destination=RuntimeParameter(name="push_destination", ptype=Text))

        # Following three components will be included in the pipeline.
        components = [
            example_gen,
            trainer,
            pusher
        ]

        ### When using the operator, it creates the pipeline for us,
        ### so we return the components directly instead.
        #
        #return tfx.dsl.Pipeline(
        #    pipeline_name=pipeline_name,
        #    pipeline_root=pipeline_root,
        #    metadata_connection_config=tfx.orchestration.metadata
        #    .sqlite_metadata_connection_config(metadata_path),
        #    components=components)

        return components

Create trainer.py. The training code remains unchanged:

    from typing import List
    from absl import logging
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow_transform.tf_metadata import schema_utils

    from tfx import v1 as tfx
    from tfx_bsl.public import tfxio
    from tensorflow_metadata.proto.v0 import schema_pb2

    _FEATURE_KEYS = [
        'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
    ]
    _LABEL_KEY = 'species'

    _TRAIN_BATCH_SIZE = 20
    _EVAL_BATCH_SIZE = 10

    # Since we're not generating or creating a schema, we will instead create
    # a feature spec. Since there are a fairly small number of features this is
    # manageable for this dataset.
    _FEATURE_SPEC = {
        **{
            feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
            for feature in _FEATURE_KEYS
        },
        _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
    }


    def _input_fn(file_pattern: List[str],
                  data_accessor: tfx.components.DataAccessor,
                  schema: schema_pb2.Schema,
                  batch_size: int = 200) -> tf.data.Dataset:
        """Generates features and label for training.

        Args:
          file_pattern: List of paths or patterns of input tfrecord files.
          data_accessor: DataAccessor for converting input to RecordBatch.
          schema: schema of the input data.
          batch_size: representing the number of consecutive elements of returned
            dataset to combine in a single batch

        Returns:
          A dataset that contains (features, indices) tuple where features is a
            dictionary of Tensors, and indices is a single Tensor of label indices.
        """
        return data_accessor.tf_dataset_factory(
            file_pattern,
            tfxio.TensorFlowDatasetOptions(
                batch_size=batch_size, label_key=_LABEL_KEY),
            schema=schema).repeat()


    def _build_keras_model() -> tf.keras.Model:
        """Creates a DNN Keras model for classifying penguin data.

        Returns:
          A Keras Model.
        """
        # The model below is built with Functional API, please refer to
        # https://www.tensorflow.org/guide/keras/overview for all API options.
        inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
        d = keras.layers.concatenate(inputs)
        for _ in range(2):
            d = keras.layers.Dense(8, activation='relu')(d)
        outputs = keras.layers.Dense(3)(d)

        model = keras.Model(inputs=inputs, outputs=outputs)
        model.compile(
            optimizer=keras.optimizers.Adam(1e-2),
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=[keras.metrics.SparseCategoricalAccuracy()])

        model.summary(print_fn=logging.info)
        return model


    # TFX Trainer will call this function.
    def run_fn(fn_args: tfx.components.FnArgs):
        """Train the model based on given args.

        Args:
          fn_args: Holds args used to train the model as name/value pairs.
        """

        # This schema is usually either an output of SchemaGen or a manually-curated
        # version provided by pipeline author. A schema can also derived from TFT
        # graph if a Transform component is used. In the case when either is missing,
        # `schema_from_feature_spec` could be used to generate schema from very simple
        # feature_spec, but the schema returned would be very primitive.
        schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

        train_dataset = _input_fn(
            fn_args.train_files,
            fn_args.data_accessor,
            schema,
            batch_size=_TRAIN_BATCH_SIZE)
        eval_dataset = _input_fn(
            fn_args.eval_files,
            fn_args.data_accessor,
            schema,
            batch_size=_EVAL_BATCH_SIZE)

        model = _build_keras_model()
        model.fit(
            train_dataset,
            steps_per_epoch=fn_args.train_steps,
            validation_data=eval_dataset,
            validation_steps=fn_args.eval_steps)

        # The result of the training should be saved in `fn_args.serving_model_dir`
        # directory.
        model.save(fn_args.serving_model_dir, save_format='tf')

Create Dockerfile.

    FROM python:3.10.12
    ARG WHEEL_VERSION

    RUN mkdir /data
    RUN wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv -P /data

    COPY dist/*-${WHEEL_VERSION}-*.whl /
    RUN pip install /*.whl && rm /*.whl

Next, build the pipeline as a Docker container and push it:

    docker build . -t kfp-quickstart:v1
    ...
    docker push kfp-quickstart:v1

2. Create a Pipeline Resource

Now that we have a pipeline image, we can create a pipeline.yaml resource to manage the lifecycle of this pipeline on Kubeflow:

    apiVersion: pipelines.kubeflow.org/v1beta1
    kind: Pipeline
    metadata:
      name: penguin-pipeline
    spec:
      provider: provider-namespace/provider-name
      image: kfp-quickstart:v1
      framework:
        name: tfx
        parameters:
          pipeline: penguin_pipeline.pipeline.create_components
          beamArgs:
            - name: anArg
              value: aValue

    kubectl apply -f resources/pipeline.yaml
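
The pipeline parameter in the resource is a dotted Python path to the create_components function inside the image. How the operator loads it is not described in this tutorial; the following is only a minimal sketch of how such a path can be resolved to a callable, useful for checking locally that the path is importable. The helper name resolve_dotted_path is hypothetical, and a standard-library path stands in for penguin_pipeline.pipeline.create_components.

```python
import importlib
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Import 'package.module.attr' and return the named attribute."""
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


if __name__ == "__main__":
    # Stand-in path from the standard library. Inside the pipeline image,
    # the path would be 'penguin_pipeline.pipeline.create_components'.
    dumps = resolve_dotted_path("json.dumps")
    print(dumps({"resolved": True}))
```

If the import fails with the real path, the wheel is probably not installed in the environment where the check runs.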

The operator now uploads the pipeline to Kubeflow in several steps. After a few seconds to a few minutes, the following command should report a Succeeded synchronization state:

    kubectl get pipeline

    NAME               SYNCHRONIZATIONSTATE   PROVIDERID
    penguin-pipeline   Succeeded              53905abe-0337-48de-875d-67b9285f3cf7
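
When scripting this check, the table printed by kubectl can be parsed as text. The sketch below assumes the column layout shown above and that no column is empty (an empty cell would shift the whitespace-based split); the function names are hypothetical.

```python
from typing import Dict, List


def parse_kubectl_table(output: str) -> List[Dict[str, str]]:
    """Turn a whitespace-aligned kubectl table into a list of row dicts."""
    lines = [line for line in output.splitlines() if line.strip()]
    if not lines:
        return []
    header = lines[0].split()
    # Assumes every cell is non-empty and contains no spaces.
    return [dict(zip(header, line.split())) for line in lines[1:]]


def is_synchronized(output: str, name: str) -> bool:
    """Return True if the named resource reports the Succeeded state."""
    for row in parse_kubectl_table(output):
        if row.get("NAME") == name:
            return row.get("SYNCHRONIZATIONSTATE") == "Succeeded"
    return False


SAMPLE_OUTPUT = """\
NAME               SYNCHRONIZATIONSTATE   PROVIDERID
penguin-pipeline   Succeeded              53905abe-0337-48de-875d-67b9285f3cf7
"""

if __name__ == "__main__":
    print(is_synchronized(SAMPLE_OUTPUT, "penguin-pipeline"))
```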

Now visit your Kubeflow Pipelines UI. You should be able to see the newly created pipeline named penguin-pipeline. Note that you will see two versions: ‘penguin-pipeline’ and ‘v1’. This is due to an open issue on Kubeflow where you can’t specify a version when creating a pipeline.

3. Create an Experiment Resource

Note: this step is optional. You can continue with the next step if you want to use the Default experiment instead.

Create experiment.yaml:

    apiVersion: pipelines.kubeflow.org/v1beta1
    kind: Experiment
    metadata:
      name: penguin-experiment
    spec:
      description: An experiment for the penguin pipeline
      provider: provider-namespace/provider-name

    kubectl apply -f resources/experiment.yaml

4. Create a Pipeline RunConfiguration Resource

We can now define a recurring run declaratively using the RunConfiguration resource.

Note: remove experimentName if you want to use the Default experiment instead of penguin-experiment.

Create runconfiguration.yaml:

    apiVersion: pipelines.kubeflow.org/v1beta1
    kind: RunConfiguration
    metadata:
      name: penguin-pipeline-recurring-run
    spec:
      run:
        provider: provider-namespace/provider-name
        pipeline: penguin-pipeline
        experimentName: penguin-experiment
        parameters:
          - name: push_destination
            value: '{"filesystem":{"base_directory":"gs://my-bucket/penguin-pipeline"}}'
      triggers:
        schedules:
          - cronExpression: 0 * * * *
            startTime: 2024-01-01T00:00:00Z
            endTime: 2024-12-31T23:59:59Z

    kubectl apply -f resources/runconfiguration.yaml
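
The push_destination parameter is passed as a JSON string, which is easy to mistype by hand. As a minimal sketch, the value can be generated with the standard library; the helper name push_destination_value is hypothetical, and the bucket path is the one used in the resource above.

```python
import json


def push_destination_value(base_directory: str) -> str:
    """Build the JSON string used as the push_destination run parameter."""
    payload = {"filesystem": {"base_directory": base_directory}}
    # Compact separators reproduce the whitespace-free form used in the YAML.
    return json.dumps(payload, separators=(",", ":"))


if __name__ == "__main__":
    print(push_destination_value("gs://my-bucket/penguin-pipeline"))
```

Wrap the printed string in single quotes when pasting it into runconfiguration.yaml so that YAML keeps the inner double quotes intact.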

This triggers a run of penguin-pipeline once every hour. Note that the cron schedule uses a 6-place, space-separated syntax as defined here.
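
Because the number of space-separated places determines how a cron expression is read, it can help to check it before applying the resource. The sketch below only splits an expression and counts its places; it does not validate the values, and which layout the operator accepts is defined by the referenced syntax, not by this code. The function name cron_fields is hypothetical.

```python
from typing import List


def cron_fields(expression: str) -> List[str]:
    """Split a cron expression into its space-separated places."""
    return expression.split()


if __name__ == "__main__":
    for expr in ("0 * * * *", "0 0 * * * *"):
        fields = cron_fields(expr)
        print(f"{expr!r} has {len(fields)} places: {fields}")
```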

5. (Optional) Deploy Newly Trained Models

If the operator has been installed with Argo-Events support, we can now specify EventSources and Sensors to update arbitrary Kubernetes config when a pipeline run has succeeded. In this example we update a serving component with the location of the newly trained model.

Create apply-model-location.yaml. This creates an EventSource and a Sensor as well as an EventBus:

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native: {}
    ---
    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: run-completion
    spec:
      nats:
        run-completion:
          jsonBody: true
          subject: events
          url: nats://eventbus-kfp-operator-events-stan-svc.kfp-operator.svc:4222
    ---
    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: penguin-pipeline-model-update
    spec:
      template:
        serviceAccountName: events-sa
      dependencies:
        - name: run-completion
          eventSourceName: run-completion
          eventName: run-completion
          filters:
            data:
              - path: body.data.status
                type: string
                comparator: "="
                value:
                  - "succeeded"
              - path: body.data.pipelineName
                type: string
                comparator: "="
                value:
                  - "penguin-pipeline"
      triggers:
        - template:
            name: update
            k8s:
              operation: update
              source:
                resource:
                  apiVersion: v1
                  kind: ConfigMap
                  metadata:
                    name: serving-config
                  data:
                    servingModel: ""
              parameters:
                - src:
                    dependencyName: run-completion
                    dataKey: body.data.artifacts.0.location
                  dest: data.servingModel

    kubectl apply -f resources/apply-model-location.yaml
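
To make the Sensor's behaviour easier to follow, here is a minimal Python sketch of the same logic: both data filters must match, and the value at body.data.artifacts.0.location is then copied into servingModel. This is an illustration only, not how Argo Events is implemented. The sample event is hypothetical and contains only the fields that the Sensor above references; a real run-completion event may carry more.

```python
from typing import Any, Dict, Optional


def model_location_for(event: Dict[str, Any],
                       pipeline_name: str = "penguin-pipeline") -> Optional[str]:
    """Return the first artifact location if the event passes both filters."""
    data = event.get("body", {}).get("data", {})
    # Filter 1: body.data.status must equal "succeeded".
    if data.get("status") != "succeeded":
        return None
    # Filter 2: body.data.pipelineName must equal the expected pipeline.
    if data.get("pipelineName") != pipeline_name:
        return None
    # Parameter source: body.data.artifacts.0.location.
    artifacts = data.get("artifacts") or []
    if not artifacts:
        return None
    return artifacts[0].get("location")


# Hypothetical event payload, for illustration only.
SAMPLE_EVENT = {
    "body": {
        "data": {
            "status": "succeeded",
            "pipelineName": "penguin-pipeline",
            "artifacts": [{"location": "gs://my-bucket/penguin-pipeline/example"}],
        }
    }
}

if __name__ == "__main__":
    # Mirrors the trigger: the location becomes data.servingModel.
    config_map_data = {"servingModel": model_location_for(SAMPLE_EVENT) or ""}
    print(config_map_data)
```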