提问者:小点点

Spring Cloud Stream中Kafka标头上的条件(基于内容)路由


我正在Spring Boot 2. x应用程序中使用Spring Cloud Stream 3.x来消费来自Kafka主题的消息。

我想有一个侦听器,它有条件地在一些自定义标头值上使用消息,如文档所示:

@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
  LOGGER.info("Received: {}", message);
}

但是侦听器永远不会收到通知,如果条件被删除,我在日志中会看到以下内容:

Received: ... SomeHeader: [B@1055e4af ...

事实证明,自定义标头保留为Kafka字节数组原始格式,使它们没有资格进行条件评估。

是否需要一些额外的配置,或者我遗漏了什么?


共1个答案

匿名用户

经过对源和stackoveflow的一些挖掘,我发现了以下内容:

  • Spring Cloud Stream委托给Spring Kafka消息和标头转换(KafkaMessageChannelBinder~getHeaderMapper)
  • 默认标头转换实现(BinderHeaderMapper)将标头保留为原始格式
  • Spring Cloud Stream允许自定义标头映射,特别是标头从字节数组到字符串的转换(如何在我的Spring Cloud Stream项目中将传入标头映射为String而不是byte[]?)

所以我添加了我的自定义标头映射器bean(bean名称很重要,它允许省略额外的配置属性),它将我的自定义标头映射到String:

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
    headerMapper.setRawMappedHeaders(Map.of(
        "SomeHeader", true
    ));
    return headerMapper;
}

这解决了问题:

Received: ... SomeHeader: SomeHeaderValue ...

附言:这似乎是Spring Cloud Stream的bug:

  1. 它引入了自己的头映射器(BinderHeaderMapper)实现,但后者不尊重条件路由特性。
  2. 标头映射器在KafkaMessageChannelBinder中被子类化,这种添加的行为是不明显的,如果提供自定义标头映射器,它将丢失。