提问者:小点点

顶点AI管道(Kubeflow)跳过步骤,输出依赖于后面的步骤


我正在尝试运行一个顶点AI管道作业,如果某个管道参数(在本例中为do_task1)的值为False,则跳过某个管道步骤。但是因为有另一个步骤无条件运行并期望第一个可能跳过的步骤的输出,所以我得到以下错误,do_task1是TrueFalse

AssertionError: component_input_artifact: pipelineparam--task1-output_path not found. All inputs: parameters {
  key: "do_task1"
  value {
    type: STRING
  }
}
parameters {
  key: "task1_name"
  value {
    type: STRING
  }
}

编译器似乎无法从task1中找到输出output_path。所以我想知道是否有任何方法可以为dsl.条件下的那些步骤的输出设置某种占位符,因此它们会被默认值填充,除非实际步骤运行并用非默认值填充它们。下面的代码代表了问题,并且很容易重现。

我正在使用google-blod-aiPlatform==1.14.0kfp==1.8.11

from typing import NamedTuple

from kfp import dsl
from kfp.v2.dsl import Dataset, Input, OutputPath, component
from kfp.v2 import compiler

from google.cloud.aiplatform import pipeline_jobs

@component(
    base_image="python:3.9",
    packages_to_install=["pandas"]
)
def task1(
    # inputs
    task1_name: str,
    # outputs
    output_path: OutputPath("Dataset"),
) -> NamedTuple("Outputs", [("output_1", str), ("output_2", int)]):

    import pandas as pd
    
    output_1 = task1_name + "-processed"
    output_2 = 2

    df_output_1 = pd.DataFrame({"output_1": [output_1]})
    df_output_1.to_csv(output_path, index=False)

    return (output_1, output_2)

@component(
    base_image="python:3.9",
    packages_to_install=["pandas"]
)
def task2(
    # inputs
    task1_output: Input[Dataset],
) -> str:

    import pandas as pd

    task1_input = pd.read_csv(task1_output.path).values[0][0]

    return task1_input

@dsl.pipeline(
    pipeline_root='pipeline_root',
    name='pipelinename',
)
def pipeline(
    do_task1: bool,
    task1_name: str,
):

    with dsl.Condition(do_task1 == True):

        task1_op = (
            task1(
                task1_name=task1_name,
            )
        )

    task2_op = (
        task2(
            task1_output=task1_op.outputs["output_path"],
        )
    )


if __name__ == '__main__':
    
    do_task1 = True # <------------ The variable to modify ---------------

    # compile pipeline
    compiler.Compiler().compile(
        pipeline_func=pipeline, package_path='pipeline.json')

    # create pipeline run
    pipeline_run = pipeline_jobs.PipelineJob(
        display_name='pipeline-display-name',
        pipeline_root='pipelineroot',
        job_id='pipeline-job-id',
        template_path='pipelinename.json',
        parameter_values={
            'do_task1': do_task1, # pipeline compilation fails with either True or False values
            'task1_name': 'Task 1',
        },
        enable_caching=False
    )
    
    # execute pipeline run
    pipeline_run.run()

任何帮助都非常感谢!


共1个答案

匿名用户

这里真正的问题是使用dsl. Conitory():创建了一个子组,其中task1_op是一个内部任务,只能从子组中“可见”。在最新的SDK中,它会抛出一条更明确的错误消息,指出task2不能依赖于任何内部任务。因此,要解决这个问题,您只需要将task2移动到条件中——如果不满足条件,您无论如何都没有有效的输入来输入task2。

    with dsl.Condition(do_task1 == True):

        task1_op = (
            task1(
                task1_name=task1_name,
            )
        )

        task2_op = (
            task2(
                task1_output=task1_op.outputs["output_path"],
            )
        )

相关问题