我有一个从Kafka主题轮询记录的消费者,我正在执行以下操作:
while (true)
{
ConsumerRecords<GenericRecord, GenericRecord> items=
consumer.poll(Duration.ofMillis(300));
log.info("Polled {} items", items.count());
}
我得到以下日志:
Polled 0 items
Polled 0 items
Polled 0 items
Polled 0 items
.
.
.
Polled 0 items
Polled 3620 items
我只是想了解轮询的行为,以及为什么它多次尝试得到0条消息,然后在稍后的时间点得到记录?(记得我寻找过去的偏移量)。
这是我的消费者配置:
{
schema.registry.url=https://schema-registry.*********,
enable.auto.commit=false,
max.poll.records=65536,
group.id=**************,
fetch.max.wait.ms=5000,
bootstrap.servers=********
fetch.min.bytes=1048576,
fetch.max.bytes=1048576,
auto.offset.reset=earliest
}
您应该尝试打印实际消耗的偏移量,以验证查找是否正常。
你不应该需要在任何地方寻找,除非你每次运行都重复使用group.id
(并在循环中的某个地方提交偏移量),因为你设置了auto. offset.set=最早的
。