提问者:小点点

对于google cloud数据流,是否可以从管道启动另一个管道。


我试图设置一个谷歌云数据流管道(流模式),读取pubsub主题消息,从发布的消息中提取信息(谷歌云存储中的对象名称),然后启动另一个管道(批处理模式)来处理存储在谷歌云存储中的对象。

是否可以在管道内启动另一个管道???


共2个答案

匿名用户

除此之外,没有技术原因。您需要确保将Pipeline对象分开,有足够的计算引擎配额来启动您需要的所有作业。

匿名用户

我们让它工作了。这样做:

private static class ExecuteUpdateTaskFroNamespace extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        String namespace = c.element();
        LOG.info("Processing namespace: " + namespace);

        BasicOptions options = c.getPipelineOptions().cloneAs(BasicOptions.class);

        EntityOptions entityOptions = PipelineOptionsFactory.as(EntityOptions.class); // important to NOT use .create()
        entityOptions.setNamespace(namespace);
        entityOptions.setProject(options.getProject());
        entityOptions.setRunner(DataflowPipelineRunner.class);
        entityOptions.setStagingLocation(options.getStagingLocation());
        entityOptions.setKind("DocsAsset");
        try {
            Pipeline p = Pipeline.create(entityOptions);
            p.apply("Read from Datastore", BcDatastoreReadFactory.getEntitySource(entityOptions))
                    .apply("Find Old Site Entities", ParDo.of(new FindEntities()))
                    .apply("Transform Entities", ParDo.of(new TransformEntities()))
                    .apply("Save", DatastoreIO.v1().write().withProjectId(entityOptions.getProject()));
            p.run();

            LOG.info("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
            c.output("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);

        } catch (Exception e) {
            LOG.warn("Unable to create pipeline for namespace: " + namespace, e);
        }

    }
}

问题:如果不达到配额,一次不能生成超过25个,要绕过此问题,您可以将setRunner(DataflowPipelineRunner. class)更改为setRunner(BlockingDataflowPipelineRunner.class)。但是BlockingDataflowPipelineRunner在2.0.0中被删除

EntityOptions和BasicOptions是PipelineOptions的扩展。

相关问题