提问者:小点点

kafka测试容器和Spring引导


所以我设置了一个联调为我的Spring启动应用程序,kafka测试容器使用

    KAFKA_CONTAINER = (KafkaContainer)(new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3"))).withReuse(true);

生成消息时测试工作正常,但是消费者似乎根本不消费。我认为这归因于模式注册表是一个模拟

spring.kafka.properties.schema.registry.url=mock://localhost

所以我决定使用以下内容来使用消息,并使用模拟注册表。使用KafkaUtil。然而,它似乎不起作用

  public static KafkaConsumer<String, CarDTO> createEventConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkatest");
    return new KafkaConsumer<>(props);
  }

和轮询代码如下:

消费者已经订阅了正确的主题,好像我把avro改成了

while (i>0) {
            ConsumerRecords<String, CarDTO> records = storeReplenOrderDTOKafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, StoreReplenOrderDTO> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                i = 0;

            }

我得到的错误如下:

Error deserializing key/value for partition test-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
...
...
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject Not Found; error code: 40401
    at app//io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:202)

有趣的是,我将以下内容更改为,然后我可以获得密钥但不能获得对象

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

我不确定我需要设置什么才能让它与模拟注册表一起工作。


共1个答案

匿名用户

所以我设法让这个工作,而不是使用mock://kafka注册表,而是使用wiremck来存根响应。|对于avro,一旦我这样做了,我的消费者就能够消费/