我有一个Pub/Sub主题订阅,想要在数据流中使用和聚合订阅中的无界数据。我使用固定窗口并将聚合写入BigQuery。
读写(没有窗口和聚合)工作正常。但是当我将数据通过管道传输到固定窗口(计算每个窗口中的元素)时,窗口永远不会触发。因此聚合不会写入。
这是我的单词发布者(它使用示例中的kinglear. txt作为输入文件):
public static class AddCurrentTimestampFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
}
}
public static class ExtractWordsFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z']+");
for (String word:words){ if(!word.isEmpty()){ c.output(word); }}
}
}
// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
.apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
.apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
.apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();
这是我的窗口单词计数器:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
.withSchema(o.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
Window.Bound<String> w = Window
.<String>into(FixedWindows.of(Duration.standardSeconds(1)));
p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
.apply("FixedWindow", w)
.apply("CountWords", Count.<String>perElement())
.apply("CreateRows", ParDo.of(new WordCountToRowFn()))
.apply("WriteRows", tablePipe);
p.run();
上面的订阅者将不起作用,因为窗口似乎没有使用默认触发器触发。但是,如果我手动定义触发器,则代码有效,计数将写入BigQuery。
Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();
如果可能,我喜欢避免指定自定义触发器。
问题:
你怎么确定扳机永远不会开火?
您的PubSubIO. Write
和PubSubIO.Read
转换都应该使用with TimestampLabel
指定时间戳标签,否则您添加的时间戳将不会写入PubSub,并且将使用发布时间。
无论哪种方式,管道的输入水印都将从PubSub中等待的元素的时间戳中导出。一旦所有输入都被处理完,它将在前进到实时之前停留几分钟(以防发布器有延迟)。
您可能会看到所有元素都发布在同一个~1秒窗口中(因为输入文件非常小)。这些都被相对较快地读取和处理,但是它们被放入的1秒窗口直到输入水印前进后才会触发,这表明该1秒窗口中的所有数据都已被消费。
这在几分钟之前不会发生,这可能会使触发器看起来不起作用。您编写的触发器在流转时长1秒后触发,这会更早触发,但不能保证所有数据都已处理。
从默认触发器获得更好行为的步骤:
with TimestampLabel
。