我试图添加一个逻辑,当管道因某些错误而终止时,该逻辑将发送松弛通知。我试图用ExitHandler
实现这一点。但是,似乎ExitHandler
不能依赖于任何操作。你有什么好主意吗?
我找到了一个使用ExitHandler
的解决方案。我在下面发布了我的代码,希望它可以帮助其他人。
def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
"""
performs slack notifications
"""
send_slack_op = dsl.ContainerOp(
name=name,
image='wenmin.wu/slack-cli:latest',
is_exit_handler=is_exit_handler,
command=['sh', '-c'],
arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
)
send_slack_op.add_env_variable(V1EnvVar(name = 'SLACK_CLI_TOKEN', value_from=V1EnvVarSource(config_map_key_ref=V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN'))))
return send_slack_op
@dsl.pipeline(
name='forecasting-supply',
description='forecasting supply ...'
)
def ml_pipeline(
param1,
param2,
param3,
):
exit_task = slack_notification(
slack_channel = slack_channel,
name = "supply-forecasting",
status = "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
is_exit_handler = True
)
with dsl.ExitHandler(exit_task):
# put other tasks here