提问者:小点点

了解Kafka消费者投票行为


我有一个从Kafka主题轮询记录的消费者,我正在执行以下操作:

  1. 将消费者分配到kafka主题中的特定分区。
  2. 寻找过去的特定偏移量(所以肯定有记录要轮询)。
  3. 我正在执行以下代码:
while (true) 
{
    ConsumerRecords<GenericRecord, GenericRecord> items= 
    consumer.poll(Duration.ofMillis(300));
    log.info("Polled {} items", items.count());
} 

我得到以下日志:

Polled 0 items                                                                                                                      
Polled 0 items                                                                                                                      
Polled 0 items
Polled 0 items            
.
.
.                                                                                                          
Polled 0 items
Polled 3620 items

我只是想了解轮询的行为,以及为什么它多次尝试得到0条消息,然后在稍后的时间点得到记录?(记得我寻找过去的偏移量)。

这是我的消费者配置:

{
  schema.registry.url=https://schema-registry.*********,
  enable.auto.commit=false,
  max.poll.records=65536,
  group.id=**************,
  fetch.max.wait.ms=5000,
  bootstrap.servers=********
  fetch.min.bytes=1048576,
  fetch.max.bytes=1048576,
  auto.offset.reset=earliest
}

共1个答案

匿名用户

您应该尝试打印实际消耗的偏移量,以验证查找是否正常。

你不应该需要在任何地方寻找,除非你每次运行都重复使用group.id(并在循环中的某个地方提交偏移量),因为你设置了auto. offset.set=最早的