提问者:小点点

如何使用Spring Reactive Kafka实现重试和恢复逻辑


我们正在使用https://github.com/reactor/reactor-kafka项目来实现Spring Reactive Kafka。但是我们想利用Kafka重试和响应式Kafka恢复逻辑。有人能提供一些示例代码吗?


共1个答案

匿名用户

由于您使用Spring生态系统进行重试和恢复,您可以使用spring-retry查看那里的留档spring-retry。网络上有足够的参考资料。

类下面的示例是使用来自主题和处理kafka消息。

消耗的方法被标记为可重试,因此如果有异常处理,它将重试,如果重试不成功,则将调用相应的恢复方法。

public class KafkaListener{
  
 
  @KafkaListener(topic="books-topic", id ="group-1")
  @Retryable(maxAttempts = 3, value = Exception.class))
  public void consuming(String message){
   //  To do message processing 
   //  Whenever there is exception thrown from this method
   //   - it will retry 3 times in total
   //   - Even after retry we get exception then it will be handed of to below 
   //     recover method recoverConsuming 
  
   }

   @Recover
   public void recoverConsuming(Exception exception, String message){
     // Recovery logic 
     // you can implement your recovery scenario
    }
  
 }