不完全是你要求的,但是我们的解决方法是将并行任务的结果写入S3,然后在后处理任务中简单地收集它们。
with dsl.ParallelFor(preprocessing_task.output) as plant_item:
predict_plant='{}'.format(plant_item)
forecasting_task = forecasting_op(predict_plant, ....).after(preprocessing_task)
postprocessing_task = postprocessing_op(...).after(forecasting_task)
目前可能不支持:
支持具有多个参数的输入#1933