提问者:小点点

Spring云流,kafkabinder-按需查找


我使用Spring云流与kafka。我有一个主题X,分区Y和消费者组Z。Spring Boot启动器父级2.7.2,Springkafka版本2.8.8

@StreamListener("input-channel-name")
public void processMessage(final DomainObject domainObject) {
    // some processing
}

它工作正常。

我希望在应用程序中有一个endpoint,它允许我重新读取/重新处理(seek对吗?)X. Y中的所有消息(再次)。但不是在重新平衡(消费者SeekAware#onPartionsAssign)或应用程序重新启动(Kafka消费者属性#resetOffset)之后,而是按需这样做:

@RestController
@Slf4j
@RequiredArgsConstructor
public class SeekController {

    @GetMapping
    public void seekToBeginningForDomainObject() {

        /**
         * seekToBeginning for X, Y, input-channel-name
         */
    }
}

我就是无法做到这一点。这可能吗?。我知道我必须在消费者级别上做到这一点,可能是在@StreamListener("input-channel-name")订阅之后创建的,对吗?但是我不知道如何获得该消费者。我如何执行按需查找以使kafka再次向消费者发送消息?我只想将X. Y.Z的偏移量重置为0,以便再次创建应用程序、加载和处理所有消息。


共1个答案

匿名用户

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#rebalance-listener

KafkaBindingRebalanceListener. onParttionsAssign()提供一个布尔值来指示这是初始赋值还是重新平衡赋值。

Spring Cloud stream目前不支持运行时的任意搜索,尽管底层KafkaMessageDrivenChannelAdapter确实支持访问消费者SeekCallback(它允许在轮询之间进行任意搜索)。它需要对绑定器进行增强以允许访问此代码。

但是,可以在事件侦听器中使用空闲容器事件;该事件包含消费者,因此您可以在这些条件下执行任意查找。