Run Completion Events

Model Serving

The KFP-Operator Events system provides a NATS event bus in the operator namespace from which run completion events can be consumed. To use it, create an Argo-Events NATS EventSource as follows:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: run-completion
spec:
  nats:
    run-completion:
      jsonBody: true
      subject: events
      url: nats://eventbus-kfp-operator-events-stan-svc.kfp-operator.svc:4222

The events follow the CloudEvents specification:

{
  "id": "{{ UNIQUE_MESSAGE_ID }}",
  "specversion": "1.0",
  "source": "{{ PROVIDER_NAME }}",
  "type": "org.kubeflow.pipelines.run-completion",
  "datacontenttype": "application/json",
  "data": {
    "provider": "{{ PROVIDER_NAME }}",
    "status": "succeeded|failed",
    "pipelineName":"{{ PIPELINE_NAME }}",
    "servingModelArtifacts": [
      {
        "name":"{{ PIPELINE_NAME }}:{{ WORKFLOW_NAME }}:Pusher:pushed_model:{{ PUSHER_INDEX }}",
        "location":"gs://{{ PIPELINE_ROOT }}/Pusher/pushed_model/{{ MODEL_VERSION }}"
      }
    ],
    "artifacts": [
      {
        "name":"serving-model",
        "location":"gs://{{ ARTIFACT_LOCATION }}"
      }
    ]
  }
}

NOTE: currently, the event includes both servingModelArtifacts and artifacts:

servingModelArtifacts contains a list of all artifacts of type Pushed Model for the pipeline run. This field is deprecated; use artifacts instead, which is resolved according to the Run Artifact Definition.
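
As a minimal sketch of how a consumer might handle both fields while servingModelArtifacts is still emitted, the following Python snippet prefers artifacts and falls back to servingModelArtifacts only when artifacts is missing or empty. The function name select_artifacts and all sample values are hypothetical and not part of the KFP-Operator; the payload shape is taken from the example event above.

```python
import json


def select_artifacts(event: dict) -> list:
    """Return the artifact list from a run completion event.

    Prefers "artifacts"; falls back to the deprecated
    "servingModelArtifacts" only if "artifacts" is missing or empty.
    """
    data = event.get("data", {})
    return data.get("artifacts") or data.get("servingModelArtifacts") or []


# Hypothetical sample event, shaped like the specification above.
sample_event = json.loads("""
{
  "id": "example-id",
  "specversion": "1.0",
  "source": "example-provider",
  "type": "org.kubeflow.pipelines.run-completion",
  "datacontenttype": "application/json",
  "data": {
    "provider": "example-provider",
    "status": "succeeded",
    "pipelineName": "penguin-pipeline",
    "artifacts": [
      {"name": "serving-model", "location": "gs://example-bucket/serving-model"}
    ]
  }
}
""")

for artifact in select_artifacts(sample_event):
    print(artifact["name"], artifact["location"])
```

Keeping the fallback in one place means it can be removed in a single step once the deprecated field is no longer emitted.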

A sensor for the pipeline penguin-pipeline could look as follows:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: penguin-pipeline-model-update
spec:
  dependencies:
    - name: run-completion
      eventSourceName: run-completion
      eventName: run-completion
      filters:
        data:
          - path: body.status
            type: string
            comparator: "="
            value:
              - "succeeded"
          - path: body.pipelineName
            type: string
            comparator: "="
            value:
              - "penguin-pipeline"
  triggers:
    - template:
        name: log
        log: {}
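
To reason about which events this sensor would react to, the two data filters can be mirrored in a few lines of Python. This is only an illustrative sketch: the function matches_sensor_filters is hypothetical, and it assumes that the object the sensor sees under body carries status and pipelineName at the top level, as the filter paths above suggest. Both filters have to match for the dependency to be satisfied.

```python
def matches_sensor_filters(body: dict, pipeline_name: str = "penguin-pipeline") -> bool:
    """Mirror the sensor's data filters for local experimentation.

    Returns True only if the run succeeded AND the event belongs to the
    given pipeline, i.e. both string comparisons hold.
    """
    return (
        body.get("status") == "succeeded"
        and body.get("pipelineName") == pipeline_name
    )


# Hypothetical payloads for illustration.
succeeded = {"status": "succeeded", "pipelineName": "penguin-pipeline"}
failed = {"status": "failed", "pipelineName": "penguin-pipeline"}
other = {"status": "succeeded", "pipelineName": "another-pipeline"}

for body in (succeeded, failed, other):
    print(body["pipelineName"], body["status"], matches_sensor_filters(body))
```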

For more information and an in-depth example, see the Quickstart Guide and Argo-Events Documentation.

Make sure to provide an event bus for the EventSource and the Sensor to connect to. You can define a default event bus, which requires no further configuration on either end, as follows:

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  nats:
    native: {}