提问者:小点点

KafkaIO-与groupId一起使用时,enable.auto的不同行为。


我们有一个Apache Beam管道,它从给定的kafka主题读取消息并进行进一步处理。我的管道使用FlinkRunner,我描述了我们尝试过的三个不同案例:

情况1:未指定组ID:

Beam为每次运行创建一个新的消费者,因此从最新的主题偏移量中读取。它读取消费者启动后产生的消息。在这种情况下,在管道停止和重新启动之间的时间间隔内可能会有潜在的数据丢失

情况2:指定了组ID并将enable.auto提交设置为true Beam从管道停止时开始重新处理消息,并开始读取未提交给给定groupid的kafka的消息。

新的group id再次开始监听最新主题偏移的消息,并开始提交消息

.withConsumerConfigUpdates(ImmutableMap.of("enable.auto.commit", true))
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "testGroupId"))

情况3:使用137 OffsetsInFinalize()指定的组ID

理想情况下,我希望这里的行为与案例2相同,但我看到的行为类似于案例1,其中管道的停止和重新启动之间存在潜在的数据丢失。

.withConsumerConfigUpdates(ImmutableMap.of("group.id", "testGroupId"))
.commitOffsetsInFinalize()

从KafkaIO的留档中,我确实看到,当检查点最终确定时,偏移量被提交回kafka:https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1098

我们想了解:

  1. 为什么案例2在停止和重新启动管道时的行为不像案例3?
  2. 在哪些情况下,我们应该将设置为true,而不是将enable.auto设置为true?

共1个答案

匿名用户

这是bug,https://github.com/apache/beam/issues/22631报道,https://github.com/apache/beam/pull/22633修复