Training Pipeline Tutorial

Complete tutorial for building and deploying TFX training pipelines with the KFP Operator

ML Pipeline Training Tutorial

This comprehensive tutorial walks you through creating, deploying, and managing a complete TFX training pipeline using the KFP Operator. You’ll learn how to build a penguin species classification pipeline and manage its entire lifecycle through Kubernetes Custom Resources.

What You’ll Learn

By the end of this tutorial, you’ll be able to:

  • Build TFX pipelines
  • Containerize ML workflows with proper dependencies
  • Deploy pipelines using Kubernetes resources
  • Execute and monitor pipeline runs
  • Set up automated scheduling for continuous training
  • Handle events for model deployment automation

Prerequisites

Before starting, ensure you have:

  • KFP Operator installed in your cluster (Installation Guide)
  • Docker for building container images
  • Container registry access (Docker Hub, GCR, ECR, etc.)
  • Basic TFX knowledge (helpful but not required)

Example Code

All code for this tutorial is available on GitHub.

# Clone the repository to follow along
git clone https://github.com/sky-uk/kfp-operator.git
cd kfp-operator/docs-gen/includes/master/quickstart

Step 1: Build the TFX Pipeline

We’ll create a complete TFX pipeline for penguin species classification, based on the TFX penguin example.

Understanding the Pipeline Structure

A full TFX pipeline can include the components below. To keep the example small, the pipeline in this tutorial uses only three of them: ExampleGen (as CsvExampleGen), Trainer, and Pusher.

  1. ExampleGen: Ingests raw data from CSV files
  2. StatisticsGen: Generates statistics for data analysis
  3. SchemaGen: Infers data schema automatically
  4. ExampleValidator: Validates data against schema
  5. Transform: Performs feature engineering
  6. Trainer: Trains the ML model
  7. Evaluator: Evaluates model performance
  8. Pusher: Pushes the trained model to a deployment destination

Create the Pipeline Definition

Create penguin_pipeline/pipeline.py:

import os

from typing import List, Text

from tfx.components import CsvExampleGen, Pusher, Trainer
from tfx.dsl.components.base.base_node import BaseNode
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.orchestration.data_types import RuntimeParameter

### Environmental parameters can be left out when using the operator.
### Also, the return type is now a list of components instead of a pipeline.
#
#def create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
#                    module_file: str, serving_model_dir: str,
#                    metadata_path: str) -> tfx.dsl.Pipeline:
def create_components() -> List[BaseNode]:
    """Creates a penguin pipeline with TFX."""
    # Brings data into the pipeline.
    example_gen = CsvExampleGen(input_base='/data')

    # Uses user-provided Python function that trains a model.
    trainer = Trainer(
        run_fn='penguin_pipeline.trainer.run_fn',
        examples=example_gen.outputs['examples'],
        train_args=trainer_pb2.TrainArgs(num_steps=100),
        eval_args=trainer_pb2.EvalArgs(num_steps=5))

    # Pushes the model to a filesystem destination. The destination is a
    # runtime parameter, so it can be set per run (see the RunConfiguration).
    pusher = Pusher(
        model=trainer.outputs['model'],
        push_destination=RuntimeParameter(name="push_destination", ptype=Text))

    # The following three components will be included in the pipeline.
    components = [
        example_gen,
        trainer,
        pusher
    ]

    ### When using the operator, it creates the pipeline for us, 
    ### so we return the components directly instead.
    #
    #return tfx.dsl.Pipeline(
    #  pipeline_name=pipeline_name,
    #  pipeline_root=pipeline_root,
    #  metadata_connection_config=tfx.orchestration.metadata
    #  .sqlite_metadata_connection_config(metadata_path),
    #  components=components)

    return components
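
Later in this tutorial, the Pipeline resource refers to this function by its dotted path, penguin_pipeline.pipeline.create_components. As a rough illustration of what such a path means, the sketch below resolves a dotted path to a callable with the standard library. The helper name resolve_dotted_path is hypothetical, and this is not the operator's actual loading code; it only shows why the module must be importable inside the pipeline image.

```python
import importlib
from typing import Any, Callable


def resolve_dotted_path(path: str) -> Callable[..., Any]:
    """Import `package.module.function` and return the function object.

    Hypothetical helper for illustration; the operator's own lookup
    mechanism may differ.
    """
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.function', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Demonstrated with a standard-library function so the sketch runs anywhere.
# Inside the pipeline image the path would instead be
# 'penguin_pipeline.pipeline.create_components'.
join = resolve_dotted_path("os.path.join")
print(join("penguin_pipeline", "pipeline.py"))
```

If the path cannot be imported in the image (for example because the wheel was not installed), a lookup like this fails with ModuleNotFoundError, which is a useful first thing to check when a pipeline does not compile.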

Create the Training Module

Create penguin_pipeline/trainer.py:

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features, this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually curated
  # version provided by the pipeline author. A schema can also be derived from
  # the TFT graph if a Transform component is used. When neither is available,
  # `schema_from_feature_spec` can generate a schema from a very simple
  # feature_spec, but the resulting schema is very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
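
The model's final Dense(3) layer has no activation, so it emits raw logits, and the loss is configured with from_logits=True to match. The label is an integer class index (int64 in the feature spec). The following minimal sketch, in plain Python and independent of TensorFlow, shows what that loss computes for a single example. The logit values are made up for illustration.

```python
import math
from typing import List


def softmax(logits: List[float]) -> List[float]:
    """Convert raw logits into probabilities that sum to 1."""
    shift = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - shift) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sparse_categorical_crossentropy(label: int, logits: List[float]) -> float:
    """Negative log-probability of the true class, computed from logits."""
    return -math.log(softmax(logits)[label])


# Hypothetical output of the Dense(3) layer for one penguin.
logits = [2.0, 0.5, -1.0]
probs = softmax(logits)

# The predicted species index is the position of the largest probability.
predicted = max(range(len(probs)), key=probs.__getitem__)
print(predicted, sparse_categorical_crossentropy(0, logits))
```

Because the saved model also returns logits, a serving client that needs probabilities would apply a softmax itself.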

Create the Container Image

Create Dockerfile:

FROM python:3.10.12
ARG WHEEL_VERSION

RUN mkdir /data
RUN wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv -P /data

COPY dist/*-${WHEEL_VERSION}-*.whl /
RUN pip install /*.whl && rm /*.whl
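
The COPY instruction selects the wheel with the glob dist/*-${WHEEL_VERSION}-*.whl. As a quick way to preview which files a given WHEEL_VERSION would select, here is a small sketch using Python's fnmatch module. The file names are hypothetical, and Docker applies its own matcher (Go's filepath.Match rules), so treat the result as an approximation for simple patterns like this one.

```python
from fnmatch import fnmatchcase
from typing import List


def matching_wheels(filenames: List[str], wheel_version: str) -> List[str]:
    """Return the file names matched by '*-<version>-*.whl'."""
    pattern = f"*-{wheel_version}-*.whl"
    return [name for name in filenames if fnmatchcase(name, pattern)]


# Hypothetical contents of the dist/ directory.
candidates = [
    "penguin_pipeline-0.1.0-py3-none-any.whl",
    "penguin_pipeline-0.2.0-py3-none-any.whl",
    "penguin_pipeline-0.1.0.tar.gz",
]

print(matching_wheels(candidates, "0.1.0"))
# → ['penguin_pipeline-0.1.0-py3-none-any.whl']
```

If the pattern matches no file, the image build fails at the COPY step, so an empty result here usually means WHEEL_VERSION does not match the built wheel.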

Build and Push the Container

Build the pipeline container and push to your registry:

# Set your container registry
export CONTAINER_REGISTRY="your-registry.com/your-project"

# Build the container
make docker-build

# Push to registry
make docker-push

Verify the Build

Test your container locally:

# Run container to verify it works
docker run --rm ${CONTAINER_REGISTRY}/penguin-pipeline:v1.0.0 python -c "
import tfx
print('Pipeline build successful!')
print(f'TFX version: {tfx.__version__}')
"

Expected output:

Pipeline build successful!
TFX version: 1.14.0

Step 2: Create a Pipeline Resource

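Now that we have a pipeline image, we can create a pipeline.yaml resource to manage the lifecycle of this pipeline on Kubeflow. Set image to the image you built and pushed in Step 1, and provider to the namespace and name of your provider: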

apiVersion: pipelines.kubeflow.org/v1beta1
kind: Pipeline
metadata:
  name: penguin-pipeline
spec:
  provider: provider-namespace/provider-name
  image: kfp-quickstart:v1
  framework:
    name: tfx
    parameters:
      pipeline: penguin_pipeline.pipeline.create_components
      beamArgs:
      - name: anArg
        value: aValue

Apply the resource:

kubectl apply -f resources/pipeline.yaml

The pipeline is now uploaded to Kubeflow in several steps. After a few seconds to a few minutes, the following command should report the Succeeded synchronization state:

kubectl get mlp

NAME               SYNCHRONIZATIONSTATE   PROVIDERID
penguin-pipeline   Succeeded              provider-namespace/provider-name
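
If you want to check the synchronization state from a script, one option is to parse the table that kubectl prints. The sketch below does this for the sample output shown above. It assumes that no cell is empty and that no value contains spaces. The function name parse_table is hypothetical, and structured output such as kubectl's -o json is generally more robust for real automation.

```python
from typing import Dict, List


def parse_table(text: str) -> List[Dict[str, str]]:
    """Parse whitespace-separated kubectl table output into row dicts."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return []
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


# Sample output copied from this tutorial.
sample = """\
NAME               SYNCHRONIZATIONSTATE   PROVIDERID
penguin-pipeline   Succeeded              provider-namespace/provider-name
"""

rows = parse_table(sample)
synced = all(row["SYNCHRONIZATIONSTATE"] == "Succeeded" for row in rows)
print(synced)
```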

Now visit your Kubeflow Pipelines UI. You should be able to see the newly created pipeline named penguin-pipeline. Note that you will see two versions: ‘penguin-pipeline’ and ‘v1’. This is due to an open issue on Kubeflow where you can’t specify a version when creating a pipeline.

Step 3: Create a Pipeline RunConfiguration Resource

We can now define a recurring run declaratively using the RunConfiguration resource.

Note: remove experimentName if you want to use the Default experiment instead of penguin-experiment

Create runconfiguration.yaml:

apiVersion: pipelines.kubeflow.org/v1beta1
kind: RunConfiguration
metadata:
  name: penguin-pipeline-recurring-run
spec:
  run:
    provider: provider-namespace/provider-name
    pipeline: penguin-pipeline
    experimentName: penguin-experiment
    parameters:
    - name: push_destination
      value: '{"filesystem":{"base_directory":"gs://my-bucket/penguin-pipeline"}}'
  triggers:
    schedules:
    - cronExpression: 0 * * * *
      startTime: 2024-01-01T00:00:00Z
      endTime: 2024-12-31T23:59:59Z

Apply the resource:

kubectl apply -f resources/runconfiguration.yaml
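
The push_destination parameter is passed as a string containing JSON, which makes quoting mistakes easy. The following sketch builds the value with json.dumps so the string stays well-formed. The helper name push_destination_param is hypothetical, and the bucket path is the placeholder used in this tutorial.

```python
import json


def push_destination_param(base_directory: str) -> str:
    """Build the JSON string for the push_destination runtime parameter."""
    payload = {"filesystem": {"base_directory": base_directory}}
    # Compact separators reproduce the single-line form used in the YAML.
    return json.dumps(payload, separators=(",", ":"))


value = push_destination_param("gs://my-bucket/penguin-pipeline")
print(value)
# → {"filesystem":{"base_directory":"gs://my-bucket/penguin-pipeline"}}

# Round-trip check: the string parses back to the expected structure.
assert json.loads(value)["filesystem"]["base_directory"].startswith("gs://")
```

When pasting the result into YAML, wrap it in single quotes as in the resource above so the inner double quotes are preserved.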

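This triggers a run of penguin-pipeline once every hour, within the startTime and endTime window. Note that the cron schedule is documented as using a six-place, space-separated syntax (with a leading seconds field), whereas the expression above has five fields. If your provider requires six fields, the hourly equivalent is 0 0 * * * *.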
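
To make the schedule concrete, the sketch below lists the run times implied by an hourly schedule at minute 0, limited by the startTime and endTime from the resource. It is a simplified model, not a cron parser. It assumes both bounds are inclusive and treats "now" as a made-up example value; how a given provider handles the boundaries may differ.

```python
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Iterator


def hourly_fire_times(start: datetime, end: datetime,
                      now: datetime) -> Iterator[datetime]:
    """Yield top-of-the-hour times t with max(start, now) <= t <= end."""
    earliest = max(start, now)
    first = earliest.replace(minute=0, second=0, microsecond=0)
    if first < earliest:
        first += timedelta(hours=1)  # round up to the next full hour
    t = first
    while t <= end:
        yield t
        t += timedelta(hours=1)


# Window taken from the RunConfiguration in this tutorial.
start = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
now = datetime(2024, 6, 1, 10, 30, tzinfo=timezone.utc)  # hypothetical

for t in islice(hourly_fire_times(start, end, now), 3):
    print(t.isoformat())
```

Once the current time is past endTime, the generator yields nothing, which mirrors a schedule that has expired.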

Step 4: (Optional) Deploy Newly Trained Models

If the operator has been installed with Argo-Events support, we can now define EventSources and Sensors that update arbitrary Kubernetes configuration when a pipeline run has completed successfully. In this example, we update a serving component with the location of the newly trained model.

Create apply-model-location.yaml. This creates an EventSource and a Sensor as well as an EventBus:

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  nats:
    native: {}
---
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: run-completion
spec:
  nats:
    run-completion:
      jsonBody: true
      subject: events
      url: nats://eventbus-kfp-operator-events-stan-svc.kfp-operator.svc:4222
---
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: penguin-pipeline-model-update
spec:
  template:
    serviceAccountName: events-sa
  dependencies:
  - name: run-completion
    eventSourceName: run-completion
    eventName: run-completion
    filters:
      data:
      - path: body.data.status
        type: string
        comparator: "="
        value:
        - "succeeded"
      - path: body.data.pipelineName
        type: string
        comparator: "="
        value:
        - "penguin-pipeline"
  triggers:
  - template:
      name: update
      k8s:
        operation: update
        source:
          resource:
            apiVersion: v1
            kind: ConfigMap
            metadata:
              name: serving-config
            data:
              servingModel: ""
        parameters:
        - src:
            dependencyName: run-completion
            dataKey: body.data.artifacts.0.location
          dest: data.servingModel

Apply the resources:

kubectl apply -f resources/apply-model-location.yaml
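
The Sensor combines two data filters with a parameter that copies body.data.artifacts.0.location into the ConfigMap. To illustrate how these dotted paths address the event, here is a minimal Python sketch that applies the same checks to a payload. The payload is hypothetical and shaped only after the paths used above, and the helper names are made up. Argo Events performs this matching itself, so this is a mental model rather than its implementation.

```python
from typing import Any, Dict, Optional


def get_path(document: Any, path: str) -> Any:
    """Follow a dotted path; numeric parts index into lists."""
    current = document
    for part in path.split("."):
        if isinstance(current, list):
            current = current[int(part)]
        else:
            current = current[part]
    return current


def model_location(event: Dict[str, Any]) -> Optional[str]:
    """Return the first artifact location if both filters match."""
    if get_path(event, "body.data.status") != "succeeded":
        return None
    if get_path(event, "body.data.pipelineName") != "penguin-pipeline":
        return None
    return get_path(event, "body.data.artifacts.0.location")


# Hypothetical run-completion event; field values are illustrative only.
event = {
    "body": {
        "data": {
            "status": "succeeded",
            "pipelineName": "penguin-pipeline",
            "artifacts": [
                {"location": "gs://my-bucket/penguin-pipeline/1"},
            ],
        }
    }
}

print(model_location(event))
```

A run that fails, or that belongs to a different pipeline, yields None here, which corresponds to the Sensor not firing its trigger.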