提问者:小点点

阅读主题并使用reactor Kafka将消息批量写入RESTendpoint


我正在做一个使用反应式Kafka的项目,该项目使用来自Kafka主题的消息,并将消息分批发布到RESTendpoint。我一直在处理批处理部分,并将该批处理发送到endpoint。我需要从主题中读取N条消息(这里的N是可配置的),然后将这N条消息发送到RESTendpoint。如何使用reactor Kafka读取N条消息?我看过中的例子https://projectreactor.io/docs/kafka/release/reference/#_overview但找不到与我的问题类似的例子。任何关于解决这个问题的建议都会非常有帮助。

以下是到目前为止我要阅读的代码,用于从主题中消费消息

@Slf4j
@Service
public class Service implements CommandLineRunner {
    @Autowired
    @Qualifier("KafkaConsumerTemplate")
    public ReactiveKafkaConsumerTemplate<String, String> KafkaConsumerTemplate;


    public Flux<String> consume() {
        return KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), Metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));

    }


    @Override
    public void run(String... args) throws Exception {
        consume().subscribe();
    }
}

共1个答案

匿名用户

因此,您可以为此目的使用#缓冲区(int)操作。

对于您的具体情况:

int bufferSize = 10;
return KafkaConsumerTemplate.receiveAutoAck()
          .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
              consumerRecord.key(),
              consumerRecord.value(),
              consumerRecord.topic(),
              consumerRecord.offset())
           ).map(ConsumerRecord::value)
            .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), Metric))
            .buffer(bufferSize)
            .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
@Override
public void run(String... args) throws Exception {
   consume().subscribe(it -> {
      //it is a List of batched entities, here you can do whatever you want with your data.
   });
}