提问者:小点点

HintMatchesManyFiles真的能在读取大量文件时提高TextIO的性能吗?


在这个问题中,我们知道

 PCollection<String> lines = p.apply(TextIO.read()
   .from("gs://some-bucket/many/files/*")
   .withHintMatchesManyFiles());

使用此提示会导致转换以优化的方式执行以读取大量文件:在这种情况下可以读取的文件数量实际上是无限的,并且很可能管道将比没有此提示运行得更快、更便宜、更可靠。

但是流水线的步骤卡住了代码如下

   PCollection<String> lines = pipeline.apply("readDataFromGCS",
          TextIO.read().from(sourcePath + "/prefix*")
                       .withHintMatchesManyFiles()
                       .watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));

每分钟大约有10~30MB的新文件上传到GCS。

但是,我们尝试在pub/sub中从GCS读取文件,管道可以很好地工作。

   raw_event = p.apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
           .apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
           .apply("Read file matchall", FileIO.matchAll())
           .apply("Read file match", FileIO.readMatches())
           .apply("Read file", TextIO.readFiles());

我在这里遗漏了什么?或者有没有其他方法可以更有效地从GCS读取大量文件?

我的管道的工作流程是从GCS读取数据,并在数据处理后沉入Pub/Sub。

光束版本:2.16.0


共1个答案

匿名用户

当您尝试通过数据流使用TextIO.read()读取压缩/压缩文件时,压缩文件只能由单个worker和所述worker的单个线程解压。这导致您的管道等待该单个worker解压所有数据,因此,系统会输出一条警告消息,指出您的管道卡住了,但实际上,管道并未卡住,只是试图解压您的数据。此时,流式数据时没有并行解压。