Run Completion Events
Run completion events are created by a Provider to signal the completion of a pipeline run, and are used to trigger downstream processes.
Within the operator, this can mean updating the status fields of resources, such as their synchronizationState or providerId. Within the ML Ops ecosystem,
the events can also be used to reload a serving instance of a model with the newly trained version.
Event Specification
The specification of the events follows CloudEvents:
{
  "id": "{{ UNIQUE_MESSAGE_ID }}",
  "specversion": "1.0",
  "source": "{{ PROVIDER_NAME }}",
  "type": "org.kubeflow.pipelines.run-completion",
  "datacontenttype": "application/json",
  "data": {
    "provider": "{{ PROVIDER_NAME }}",
    "status": "succeeded|failed",
    "pipelineName": "{{ PIPELINE_NAME }}",
    "servingModelArtifacts": [
      {
        "name": "{{ PIPELINE_NAME }}:{{ WORKFLOW_NAME }}:Pusher:pushed_model:{{ PUSHER_INDEX }}",
        "location": "gs://{{ PIPELINE_ROOT }}/Pusher/pushed_model/{{ MODEL_VERSION }}"
      }
    ],
    "artifacts": [
      {
        "name": "serving-model",
        "location": "gs://{{ ARTIFACT_LOCATION }}"
      }
    ],
    "runStartTime": "{{ START_TIME }}",
    "runEndTime": "{{ END_TIME }}"
  }
}
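A consumer typically needs only a few of these fields. The following is a minimal Python sketch of reading the envelope, assuming the event arrives as a JSON string shaped like the specification above. The `Artifact` and `RunCompletion` classes, the `parse_run_completion` helper, and all sample values are hypothetical and not part of the KFP-Operator.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class Artifact:
    name: str
    location: str


@dataclass
class RunCompletion:
    provider: str
    status: str
    pipeline_name: str
    artifacts: List[Artifact]


def parse_run_completion(raw: str) -> RunCompletion:
    """Parse a CloudEvents JSON envelope shaped like the specification above."""
    envelope = json.loads(raw)
    if envelope.get("type") != "org.kubeflow.pipelines.run-completion":
        raise ValueError(f"unexpected event type: {envelope.get('type')!r}")
    data = envelope["data"]
    return RunCompletion(
        provider=data["provider"],
        status=data["status"],
        pipeline_name=data["pipelineName"],
        # Assumption: a missing "artifacts" field is treated as an empty list.
        artifacts=[Artifact(a["name"], a["location"]) for a in data.get("artifacts", [])],
    )


# Illustrative payload; every value below is made up for this example.
example = json.dumps({
    "id": "example-id",
    "specversion": "1.0",
    "source": "example-provider",
    "type": "org.kubeflow.pipelines.run-completion",
    "datacontenttype": "application/json",
    "data": {
        "provider": "example-provider",
        "status": "succeeded",
        "pipelineName": "penguin-pipeline",
        "artifacts": [{"name": "serving-model", "location": "gs://example-bucket/model"}],
    },
})

event = parse_run_completion(example)
print(event.status, event.artifacts[0].location)
```

Rejecting unexpected event types early is a design choice of this sketch; a consumer that shares a subject with other event types might prefer to skip them silently.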
Using Events
The KFP-Operator Events system provides a NATS event bus in the operator namespace from which events can be consumed. To use it, create an Argo-Events NATS EventSource as follows:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: run-completion
spec:
  nats:
    run-completion:
      jsonBody: true
      subject: events
      url: nats://eventbus-kfp-operator-events-stan-svc.kfp-operator.svc:4222
NOTE: currently, the event includes both servingModelArtifacts and artifacts.
servingModelArtifacts contains a list of all artifacts of type Pushed Model for the pipeline run. This field is deprecated; use artifacts instead, which are resolved according to the Run Artifact Definition.
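While both fields are present, a consumer can prefer `artifacts` and read the deprecated field only as a fallback. The sketch below shows one way to do that in Python. The `resolve_artifacts` helper and the sample locations are hypothetical, and falling back to `servingModelArtifacts` is an assumption of this example rather than documented operator behaviour.

```python
from typing import Any, Dict, List


def resolve_artifacts(data: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return entries from `artifacts`, else from the deprecated `servingModelArtifacts`."""
    artifacts = data.get("artifacts") or []
    if artifacts:
        return list(artifacts)
    # Fallback for events that only carry the deprecated field (assumption).
    return list(data.get("servingModelArtifacts") or [])


# Illustrative `data` payloads; names and locations are made up.
both_fields = {
    "artifacts": [{"name": "serving-model", "location": "gs://example-bucket/new"}],
    "servingModelArtifacts": [{"name": "legacy", "location": "gs://example-bucket/old"}],
}
legacy_only = {
    "servingModelArtifacts": [{"name": "legacy", "location": "gs://example-bucket/old"}],
}

preferred = resolve_artifacts(both_fields)
fallback = resolve_artifacts(legacy_only)
print(preferred[0]["location"], fallback[0]["location"])
```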
A sensor for the pipeline penguin-pipeline could look as follows:
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: penguin-pipeline-model-update
spec:
  dependencies:
    - name: run-completion
      eventSourceName: run-completion
      eventName: run-completion
      filters:
        data:
          - path: body.status
            type: string
            comparator: "="
            value:
              - "succeeded"
          - path: body.pipelineName
            type: string
            comparator: "="
            value:
              - "penguin-pipeline"
  triggers:
    - template:
        name: log
        log: {}
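To make the effect of the two data filters concrete, here is a rough Python emulation of how they select events. It assumes that all filters must match and that a filter matches when the value at its path equals any entry in its value list; check the Argo-Events documentation for the authoritative semantics. The `lookup` and `matches` helpers and the sample payloads are hypothetical, and the payload shape simply mirrors the paths used in the sensor above.

```python
from typing import Any, Dict, List, Optional


def lookup(payload: Dict[str, Any], path: str) -> Optional[Any]:
    """Follow a dotted path such as 'body.status'; return None if any key is missing."""
    current: Any = payload
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def matches(payload: Dict[str, Any], filters: List[Dict[str, Any]]) -> bool:
    """True when every filter's path resolves to one of that filter's allowed values."""
    return all(lookup(payload, f["path"]) in f["value"] for f in filters)


# The same paths and values as the sensor's data filters.
filters = [
    {"path": "body.status", "value": ["succeeded"]},
    {"path": "body.pipelineName", "value": ["penguin-pipeline"]},
]

# Illustrative payloads only.
succeeded_run = {"body": {"status": "succeeded", "pipelineName": "penguin-pipeline"}}
failed_run = {"body": {"status": "failed", "pipelineName": "penguin-pipeline"}}
other_pipeline = {"body": {"status": "succeeded", "pipelineName": "other-pipeline"}}

print(matches(succeeded_run, filters))
print(matches(failed_run, filters))
print(matches(other_pipeline, filters))
```

Under these assumptions, only a successful run of penguin-pipeline reaches the trigger; failed runs and runs of other pipelines are dropped.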
For more information and an in-depth example, see the Quickstart Guide and Argo-Events Documentation.
Make sure to provide an event bus for the EventSource and the Sensor to connect to. You can define a default event bus, which requires no further configuration on either end, as follows:
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  nats:
    native: {}