我有一个简单的数据流管道(作业id 2018-05-15_06_17_40-8591349846083543299),有1个最小工人和7个最大工人,可以执行以下操作:
>
Flatten.pCollections
每小时窗口,触发如下:
Repeatedly
.forever(
AfterFirst.of(
AfterPane.elementCountAtLeast(40000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
)
)
.orFinally(AfterWatermark.pastEndOfWindow())
使用14个碎片的AvroIO窗口写入将这些事件写入GCS。
当管道最初启动时,一切都很好,但几个小时后,AvroIO: GroupIntoShards步骤中的系统延迟急剧增加。
经过进一步调查,其中一个主题落后了许多小时(与其他3个主题相比,该主题每秒发生的事件最多)。查看日志,我看到关闭S12-0000000000a
的空闲读取器,这是可以理解的。然而,主题的36个分区的使用者组偏移量处于这样的状态:对于某些分区,偏移量非常低,但有些分区非常高。日志结束偏移量大致均匀分布,我们所生成的记录大小大致相同。
问题:
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)
触发器会在窗口中首次看到事件的5分钟后有效地开始向每个(窗口、碎片)的GCS写入吗使用相同的代码/配置更新流水线使其回到正常状态,其中消耗的速率比产生的速率高得多(由于重启之前的滞后)。
回答提出的3个问题(我留下了关于具体工作的评论):