提问者:小点点

如何使用JMeter将avro消息发布到kafka主题


我正在尝试使用JMeter将Avro消息发布到Kafka主题。

我收到以下错误消息:

由:javax. script.ScriptException:org.apache.kafka.公共.error.SerializationException引起:检索Avro模式“字符串”时出错

我使用JSR223采样器使用了以下代码。

KAFKA_BROKERSKAFKA_TOPIC

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

String brokers = vars.get("KAFKA_BROKERS");
String topic = vars.get("KAFKA_TOPIC");
String user = String.valueOf(ctx.getThreadNum() + 1);
Object msg = vars.get("MESSAGE");

Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers", brokers);
kafkaProps.put("schema.registry.url","https://\<\>");
kafkaProps.put("auto.register.schemas","false");
kafkaProps.put("basic.auth.credentials.source","USER_INFO");
kafkaProps.put("basic.auth.user.info","\<\>");
kafkaProps.put("security.protocol","SASL_SSL");
kafkaProps.put("sasl.mechanism","PLAIN");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer");
kafkaProps.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='\<\>' password='\<\>';");

Producer\<String, Object\> producer = new KafkaProducer\<\>(kafkaProps);
try
{
producer.send(new ProducerRecord\<String, Object\>(topic, user, msg)).get();
}
finally
{
producer.close();
}

获取以下错误消息:

由:javax. script.ScriptException:org.apache.kafka.公共.error.SerializationException引起:检索Avro模式“字符串”时出错


共1个答案

匿名用户

您需要以与您尝试模仿的上游系统中完全相同的方式配置kafkaProps对象。

作为一种解决方法,您可以启用自动模式注册,例如:

kafkaProps.put("auto.register.schemas","true");

更多信息:如何使用JMeter进行Kafka测试