Run Completion Events
The KFP-Operator Events system provides a NATS Event bus in the operator namespace to consume events from. To use it, users can create an Argo-Events NATS Eventsource as follows:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: run-completion
spec:
nats:
run-completion:
jsonBody: true
subject: events
url: nats://eventbus-kfp-operator-events-stan-svc.kfp-operator.svc:4222
The specification of the events follows CloudEvents:
{
"id": "{{ UNIQUE_MESSAGE_ID }}",
"specversion": "1.0",
"source": "{{ PROVIDER_NAME }}",
"type": "org.kubeflow.pipelines.run-completion",
"datacontenttype": "application/json",
"data": {
"provider": "{{ PROVIDER_NAME }}",
"status": "succeeded|failed",
"pipelineName":"{{ PIPELINE_NAME }}",
"servingModelArtifacts": [
{
"name":"{{ PIPELINE_NAME }}:{{ WORKFLOW_NAME }}:Pusher:pushed_model:{{ PUSHER_INDEX }}",
"location":"gs://{{ PIPELINE_ROOT }}/Pusher/pushed_model/{{ MODEL_VERSION }}"
}
],
"artifacts": [
{
"name":"serving-model",
"location":"gs://{{ ARTIFACT_LOCATION }}"
}
]
}
}
NOTE: currently, the event includes both
servingModelArtifacts
andartifacts
:
servingModelArtifacts
contain a list of all artifacts of type Pushed Model for the pipeline run. This field is deprecated andartifacts
should be used instead, which are resolved according to Run Artifact Definition
A sensor for the pipeline penguin-pipeline
could look as follows:
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: penguin-pipeline-model-update
spec:
dependencies:
- name: run-completion
eventSourceName: run-completion
eventName: run-completion
filters:
data:
- path: body.status
type: string
comparator: "="
value:
- "succeeded"
- path: body.pipelineName
type: string
comparator: "="
value:
- "penguin-pipeline"
triggers:
- template:
name: log
log: {}
For more information and an in-depth example, see the Quickstart Guide and Argo-Events Documentation.
Please make sure to provide an event bus for the eventsource and the sensor to connect to. You can define a default event bus, which does not require further configuration on either end, as follows:
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
spec:
nats:
native: {}