提问者:小点点

一段时间后 KafkaIO 分区消耗不均匀


我有一个简单的数据流管道(作业id 2018-05-15_06_17_40-8591349846083543299),有1个最小工人和7个最大工人,可以执行以下操作:

>

  • 使用KafkaIO从4个Kafka主题中消费。每个主题以不同的方式表示,并且是一个单独的PCollection
  • 对每个PCollection执行转换以输出标准表示形式PCollection
  • 使用Flatten.pCollections
  • 合并4 PCollection
  • 每小时窗口,触发如下:

    Repeatedly
    .forever(
      AfterFirst.of(
        AfterPane.elementCountAtLeast(40000),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
      )
    )
    .orFinally(AfterWatermark.pastEndOfWindow()) 
    

    使用14个碎片的AvroIO窗口写入将这些事件写入GCS。

    当管道最初启动时,一切都很好,但几个小时后,AvroIO: GroupIntoShards步骤中的系统延迟急剧增加。

    经过进一步调查,其中一个主题落后了许多小时(与其他3个主题相比,该主题每秒发生的事件最多)。查看日志,我看到关闭S12-0000000000a的空闲读取器,这是可以理解的。然而,主题的36个分区的使用者组偏移量处于这样的状态:对于某些分区,偏移量非常低,但有些分区非常高。日志结束偏移量大致均匀分布,我们所生成的记录大小大致相同。

    问题:

    • 如果系统滞后在某一步很高,这是否会阻止Kafka消费者消费
    • Kafka偏移量分布不均匀的可能原因是什么
    • 合并的PCollection具有不同的流量模式,有些低,有些高。添加AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)触发器会在窗口中首次看到事件的5分钟后有效地开始向每个(窗口、碎片)的GCS写入吗

    使用相同的代码/配置更新流水线使其回到正常状态,其中消耗的速率比产生的速率高得多(由于重启之前的滞后)。


  • 共1个答案

    匿名用户

    回答提出的3个问题(我留下了关于具体工作的评论):

    • 不,系统延迟并不能阻止Kafka消费。<ul>
    • 一般来说,如果下游阶段有大量工作要处理,这可能会延迟上游工作的开始。但这并不是KafkaIO特有的