TL/DR-有人能提供一个使用JsonDeserializer的简单示例吗?VALUE_TYPE_METHOD
?
我有一个完全符合留档的情况:
从2.5版开始,您现在可以通过属性配置反序列化器来调用方法来确定目标类型。如果存在,这将覆盖上面讨论的任何其他技术。如果数据是由不使用Spring序列化器的应用程序发布的,并且您需要根据数据或其他标头反序列化为不同的类型,这将很有用。将这些属性设置为方法名称-一个完全限定的类名,后跟方法名称,用句点分隔…该方法必须声明为公共静态,具有三个签名之一(String topic、byte[]data、Headers header ers)、(byte[]data、Headers header ers)或(byte[]data)并返回Jackson JavaType。
我这样设置我的配置:
spring:
kafka:
consumer:
bootstrap-servers:
- https://blah.blah.blah:443
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"spring.json.trusted.packages": "com.my.base"
"spring.json.value.type.method": "com.my.base.Application.deserializerDelegator"
我的反序列化委托器
是这样的:
private static final JavaType FOO_MESSAGE_TYPE = TypeFactory.defaultInstance()
.constructType(Foo.class);
private static final JavaType BAR_MESSAGE_TYPE = TypeFactory.defaultInstance()
.constructType(Bar.class);
public static JavaType deserializerDelegator(byte[] data, Headers headers) {
Header header = headers.lastHeader("eventSubType");
return Arrays.equals(header.value(), "100".getBytes())
? FOO_MESSAGE_TYPE : BAR_MESSAGE_TYPE;
}
现在我设置了所有这些…我如何设置我的实际消费者????我是否创建了多个消费者….每种类型一个?我尝试了这个,但失败了:
@Slf4j
@Component
public class Consumer {
@KafkaListener(topics = {"some-topic"}, groupId = "some-group")
public void consume(Foo message) {
// handle Foo message
}
@KafkaListener(topics = {"some-topic"}, groupId = "some-group")
public void consume(Bar message) {
// handle Bar message
}
}
这导致了这个错误:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.Consumer.consume(com.example.consumer.models.Foo)]
MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [Foo] to [Bar] for GenericMessage [payload=Foo(...omitted...)]
如果具有不同类型的消息在同一个主题中,则必须使用类级别的@KafkaListener
和@KafkaHandler
方法(以及可选的默认值)。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener
当您在类级别使用@KafkaListener
时,您必须在方法级别指定@KafkaHandler
。当传递消息时,转换后的消息有效负载类型用于确定调用哪个方法。以下示例显示了如何做到这一点:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从2.1.3版本开始,您可以指定一个@KafkaHandler
方法作为默认方法,如果其他方法没有匹配,则调用该方法。最多可以指定一个方法。使用@KafkaHandler
方法时,有效负载必须已经转换为域对象(因此可以执行匹配)。
编辑
@SpringBootApplication
public class So73953255Application {
public static void main(String[] args) {
SpringApplication.run(So73953255Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so73953255").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so73953255", "{\"baz\":\"qux\"}");
template.send("so73953255", "{\"baz\":\"fiz\"}");
};
}
private static final JavaType fooType = TypeFactory.defaultInstance().constructType(Foo.class);
private static final JavaType barType = TypeFactory.defaultInstance().constructType(Bar.class);
public static JavaType typer(byte[] data, Headers headers) {
return new String(data).contains("fiz") ? barType : fooType;
}
public static class Foo {
private String baz;
public String getBaz() {
return this.baz;
}
public void setBaz(String baz) {
this.baz = baz;
}
@Override
public String toString() {
return "Foo [baz=" + this.baz + "]";
}
}
public static class Bar {
private String baz;
public String getBaz() {
return this.baz;
}
public void setBaz(String baz) {
this.baz = baz;
}
@Override
public String toString() {
return "Bar [baz=" + this.baz + "]";
}
}
}
@Component
@KafkaListener(id = "so73953255", topics = "so73953255")
class Listener {
@KafkaHandler
void fooListener(So73953255Application.Foo foo) {
System.out.println("Foo Handler: " + foo);
}
@KafkaHandler
void barListener(So73953255Application.Bar bar) {
System.out.println("Bar Handler: " + bar);
}
}
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So73953255Application.typer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
Foo Handler: Foo [baz=qux]
Bar Handler: Bar [baz=fiz]