在这个问题中,我们知道
PCollection<String> lines = p.apply(TextIO.read()
.from("gs://some-bucket/many/files/*")
.withHintMatchesManyFiles());
使用此提示会导致转换以优化的方式执行以读取大量文件:在这种情况下可以读取的文件数量实际上是无限的,并且很可能管道将比没有此提示运行得更快、更便宜、更可靠。
但是流水线的步骤卡住了代码如下
PCollection<String> lines = pipeline.apply("readDataFromGCS",
TextIO.read().from(sourcePath + "/prefix*")
.withHintMatchesManyFiles()
.watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));
每分钟大约有10~30MB的新文件上传到GCS。
但是,我们尝试在pub/sub中从GCS读取文件,管道可以很好地工作。
raw_event = p.apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
.apply("Read file matchall", FileIO.matchAll())
.apply("Read file match", FileIO.readMatches())
.apply("Read file", TextIO.readFiles());
我在这里遗漏了什么?或者有没有其他方法可以更有效地从GCS读取大量文件?
我的管道的工作流程是从GCS读取数据,并在数据处理后沉入Pub/Sub。
光束版本:2.16.0
当您尝试通过数据流使用TextIO.read()读取压缩/压缩文件时,压缩文件只能由单个worker和所述worker的单个线程解压。这导致您的管道等待该单个worker解压所有数据,因此,系统会输出一条警告消息,指出您的管道卡住了,但实际上,管道并未卡住,只是试图解压您的数据。此时,流式数据时没有并行解压。