我正在尝试运行一个顶点AI管道作业,如果某个管道参数(在本例中为do_task1
)的值为False
,则跳过某个管道步骤。但是因为有另一个步骤无条件运行并期望第一个可能跳过的步骤的输出,所以我得到以下错误,do_task1是True
或False
:
AssertionError: component_input_artifact: pipelineparam--task1-output_path not found. All inputs: parameters {
key: "do_task1"
value {
type: STRING
}
}
parameters {
key: "task1_name"
value {
type: STRING
}
}
编译器似乎无法从task1
中找到输出output_path
。所以我想知道是否有任何方法可以为dsl.条件
下的那些步骤的输出设置某种占位符,因此它们会被默认值填充,除非实际步骤运行并用非默认值填充它们。下面的代码代表了问题,并且很容易重现。
我正在使用google-blod-aiPlatform==1.14.0
和kfp==1.8.11
from typing import NamedTuple
from kfp import dsl
from kfp.v2.dsl import Dataset, Input, OutputPath, component
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs
@component(
base_image="python:3.9",
packages_to_install=["pandas"]
)
def task1(
# inputs
task1_name: str,
# outputs
output_path: OutputPath("Dataset"),
) -> NamedTuple("Outputs", [("output_1", str), ("output_2", int)]):
import pandas as pd
output_1 = task1_name + "-processed"
output_2 = 2
df_output_1 = pd.DataFrame({"output_1": [output_1]})
df_output_1.to_csv(output_path, index=False)
return (output_1, output_2)
@component(
base_image="python:3.9",
packages_to_install=["pandas"]
)
def task2(
# inputs
task1_output: Input[Dataset],
) -> str:
import pandas as pd
task1_input = pd.read_csv(task1_output.path).values[0][0]
return task1_input
@dsl.pipeline(
pipeline_root='pipeline_root',
name='pipelinename',
)
def pipeline(
do_task1: bool,
task1_name: str,
):
with dsl.Condition(do_task1 == True):
task1_op = (
task1(
task1_name=task1_name,
)
)
task2_op = (
task2(
task1_output=task1_op.outputs["output_path"],
)
)
if __name__ == '__main__':
do_task1 = True # <------------ The variable to modify ---------------
# compile pipeline
compiler.Compiler().compile(
pipeline_func=pipeline, package_path='pipeline.json')
# create pipeline run
pipeline_run = pipeline_jobs.PipelineJob(
display_name='pipeline-display-name',
pipeline_root='pipelineroot',
job_id='pipeline-job-id',
template_path='pipelinename.json',
parameter_values={
'do_task1': do_task1, # pipeline compilation fails with either True or False values
'task1_name': 'Task 1',
},
enable_caching=False
)
# execute pipeline run
pipeline_run.run()
任何帮助都非常感谢!
这里真正的问题是使用dsl. Conitory():
创建了一个子组,其中task1_op
是一个内部任务,只能从子组中“可见”。在最新的SDK中,它会抛出一条更明确的错误消息,指出task2
不能依赖于任何内部任务。因此,要解决这个问题,您只需要将task2
移动到条件中——如果不满足条件,您无论如何都没有有效的输入来输入task2。
with dsl.Condition(do_task1 == True):
task1_op = (
task1(
task1_name=task1_name,
)
)
task2_op = (
task2(
task1_output=task1_op.outputs["output_path"],
)
)