我试图设置一个谷歌云数据流管道(流模式),读取pubsub主题消息,从发布的消息中提取信息(谷歌云存储中的对象名称),然后启动另一个管道(批处理模式)来处理存储在谷歌云存储中的对象。
是否可以在管道内启动另一个管道???
除此之外,没有技术原因。您需要确保将Pipeline对象分开,有足够的计算引擎配额来启动您需要的所有作业。
我们让它工作了。这样做:
private static class ExecuteUpdateTaskFroNamespace extends DoFn<String, String> {
@Override
public void processElement(ProcessContext c) throws Exception {
String namespace = c.element();
LOG.info("Processing namespace: " + namespace);
BasicOptions options = c.getPipelineOptions().cloneAs(BasicOptions.class);
EntityOptions entityOptions = PipelineOptionsFactory.as(EntityOptions.class); // important to NOT use .create()
entityOptions.setNamespace(namespace);
entityOptions.setProject(options.getProject());
entityOptions.setRunner(DataflowPipelineRunner.class);
entityOptions.setStagingLocation(options.getStagingLocation());
entityOptions.setKind("DocsAsset");
try {
Pipeline p = Pipeline.create(entityOptions);
p.apply("Read from Datastore", BcDatastoreReadFactory.getEntitySource(entityOptions))
.apply("Find Old Site Entities", ParDo.of(new FindEntities()))
.apply("Transform Entities", ParDo.of(new TransformEntities()))
.apply("Save", DatastoreIO.v1().write().withProjectId(entityOptions.getProject()));
p.run();
LOG.info("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
c.output("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
} catch (Exception e) {
LOG.warn("Unable to create pipeline for namespace: " + namespace, e);
}
}
}
问题:如果不达到配额,一次不能生成超过25个,要绕过此问题,您可以将setRunner(DataflowPipelineRunner. class)更改为setRunner(BlockingDataflowPipelineRunner.class)。但是BlockingDataflowPipelineRunner在2.0.0中被删除
EntityOptions和BasicOptions是PipelineOptions的扩展。