我使用Spring云流与kafka。我有一个主题X
,分区Y
和消费者组Z
。Spring Boot启动器父级2.7.2
,Springkafka版本2.8.8
:
@StreamListener("input-channel-name")
public void processMessage(final DomainObject domainObject) {
// some processing
}
它工作正常。
我希望在应用程序中有一个endpoint,它允许我重新读取/重新处理(seek
对吗?)X. Y
中的所有消息(再次)。但不是在重新平衡(消费者SeekAware#onPartionsAssign
)或应用程序重新启动(Kafka消费者属性#resetOffset
)之后,而是按需这样做:
@RestController
@Slf4j
@RequiredArgsConstructor
public class SeekController {
@GetMapping
public void seekToBeginningForDomainObject() {
/**
* seekToBeginning for X, Y, input-channel-name
*/
}
}
我就是无法做到这一点。这可能吗?。我知道我必须在消费者级别上做到这一点,可能是在@StreamListener("input-channel-name")
订阅之后创建的,对吗?但是我不知道如何获得该消费者。我如何执行按需查找以使kafka再次向消费者发送消息?我只想将X. Y.Z
的偏移量重置为0
,以便再次创建应用程序、加载和处理所有消息。
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#rebalance-listener
KafkaBindingRebalanceListener. onParttionsAssign()
提供一个布尔值来指示这是初始赋值还是重新平衡赋值。
Spring Cloud stream目前不支持运行时的任意搜索,尽管底层KafkaMessageDrivenChannelAdapter
确实支持访问消费者SeekCallback
(它允许在轮询之间进行任意搜索)。它需要对绑定器进行增强以允许访问此代码。
但是,可以在事件侦听器中使用空闲容器事件;该事件包含消费者,因此您可以在这些条件下执行任意查找。