提问者:小点点

SpringkafkaVALUE_TYPE_METHOD


TL/DR-有人能提供一个使用JsonDeserializer的简单示例吗?VALUE_TYPE_METHOD

我有一个完全符合留档的情况:

从2.5版开始,您现在可以通过属性配置反序列化器来调用方法来确定目标类型。如果存在,这将覆盖上面讨论的任何其他技术。如果数据是由不使用Spring序列化器的应用程序发布的,并且您需要根据数据或其他标头反序列化为不同的类型,这将很有用。将这些属性设置为方法名称-一个完全限定的类名,后跟方法名称,用句点分隔…该方法必须声明为公共静态,具有三个签名之一(String topic、byte[]data、Headers header ers)、(byte[]data、Headers header ers)或(byte[]data)并返回Jackson JavaType。

我这样设置我的配置:

spring:
  kafka:
    consumer:
      bootstrap-servers:
        - https://blah.blah.blah:443
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "spring.json.trusted.packages": "com.my.base"
        "spring.json.value.type.method": "com.my.base.Application.deserializerDelegator"

我的反序列化委托器是这样的:

private static final JavaType FOO_MESSAGE_TYPE = TypeFactory.defaultInstance()
       .constructType(Foo.class);
private static final JavaType BAR_MESSAGE_TYPE = TypeFactory.defaultInstance()
       .constructType(Bar.class);

public static JavaType deserializerDelegator(byte[] data, Headers headers) {
    Header header = headers.lastHeader("eventSubType");
    return Arrays.equals(header.value(), "100".getBytes())
            ? FOO_MESSAGE_TYPE : BAR_MESSAGE_TYPE;
}

现在我设置了所有这些…我如何设置我的实际消费者????我是否创建了多个消费者….每种类型一个?我尝试了这个,但失败了:

@Slf4j
@Component
public class Consumer {

    @KafkaListener(topics = {"some-topic"}, groupId = "some-group")
    public void consume(Foo message) {
        // handle Foo message
    }

    @KafkaListener(topics = {"some-topic"}, groupId = "some-group")
    public void consume(Bar message) {
        // handle Bar message
    }
}

这导致了这个错误:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.Consumer.consume(com.example.consumer.models.Foo)]
MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [Foo] to [Bar] for GenericMessage [payload=Foo(...omitted...)]

共1个答案

匿名用户

如果具有不同类型的消息在同一个主题中,则必须使用类级别的@KafkaListener@KafkaHandler方法(以及可选的默认值)。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener

当您在类级别使用@KafkaListener时,您必须在方法级别指定@KafkaHandler。当传递消息时,转换后的消息有效负载类型用于确定调用哪个方法。以下示例显示了如何做到这一点:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

从2.1.3版本开始,您可以指定一个@KafkaHandler方法作为默认方法,如果其他方法没有匹配,则调用该方法。最多可以指定一个方法。使用@KafkaHandler方法时,有效负载必须已经转换为域对象(因此可以执行匹配)。

编辑

@SpringBootApplication
public class So73953255Application {

    public static void main(String[] args) {
        SpringApplication.run(So73953255Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so73953255").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so73953255", "{\"baz\":\"qux\"}");
            template.send("so73953255", "{\"baz\":\"fiz\"}");
        };
    }

    private static final JavaType fooType = TypeFactory.defaultInstance().constructType(Foo.class);

    private static final JavaType barType = TypeFactory.defaultInstance().constructType(Bar.class);

    public static JavaType typer(byte[] data, Headers headers) {
        return new String(data).contains("fiz") ? barType : fooType;
    }

    public static class Foo {

        private String baz;

        public String getBaz() {
            return this.baz;
        }

        public void setBaz(String baz) {
            this.baz = baz;
        }

        @Override
        public String toString() {
            return "Foo [baz=" + this.baz + "]";
        }

    }

    public static class Bar {

        private String baz;

        public String getBaz() {
            return this.baz;
        }

        public void setBaz(String baz) {
            this.baz = baz;
        }

        @Override
        public String toString() {
            return "Bar [baz=" + this.baz + "]";
        }

    }

}

@Component
@KafkaListener(id = "so73953255", topics = "so73953255")
class Listener {

    @KafkaHandler
    void fooListener(So73953255Application.Foo foo) {
        System.out.println("Foo Handler: " + foo);
    }

    @KafkaHandler
    void barListener(So73953255Application.Bar bar) {
        System.out.println("Bar Handler: " + bar);
    }

}
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So73953255Application.typer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
Foo Handler: Foo [baz=qux]
Bar Handler: Bar [baz=fiz]