I have been trying to use Kafka Streams to compute a moving average of the temperatures read by a sensor. I have a producer that takes messages from an MQTT broker and pushes them to Kafka:
String topic = "TEMPERATURE";
Producer<String, Double> producer = new KafkaProducer<>(properties);

mqttClient.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("Connection to broker lost!" + throwable.getMessage());
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        try {
            String time = new Timestamp(System.currentTimeMillis()).toString();
            String content = new String(mqttMessage.getPayload());
            System.out.println("\nReceived a Message!" +
                    "\n\tTime: " + time +
                    "\n\tTopic: " + topic +
                    "\n\tMessage: " + content +
                    "\n\tQoS: " + mqttMessage.getQos() + "\n");
            double temp = Double.valueOf(content.substring(2));
            producer.send(new ProducerRecord<>(topic, time, temp), (recordMetadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.printf("Produced record to topic %s partition [%d] @ offset %d%n",
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                }
            });
            producer.flush();
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
});
mqttClient.subscribe(topic, 0);
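To double-check that the records actually land in Kafka, the stock console consumer can be used; a sketch, assuming a local broker and the topic name from the code above (the values are doubles, so a DoubleDeserializer is needed to print them legibly):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEMPERATURE \
    --from-beginning --value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer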
Kafka receives these messages, which I can verify with a bash check like the one above. Next, I want a windowed Kafka Stream that computes a moving average:
public class PersistenceService {

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("PERSISTENCE SERVICE");
        String topic = "TEMPERATURE";
        String targetTopic = "MOVINGAVG";
        String kafkaURI = "localhost:9092";
        String clientID = "Persistence Service";
        Properties properties = new Properties();
        kafkaPropertiesSet(kafkaURI, properties, clientID);

        MongoClient mongoClient = new MongoClient("localhost", 27017);
        MongoDatabase database = mongoClient.getDatabase("Temperature");
        logger.info("Connected to the database");
        MongoCollection kafkaCollection = database.getCollection("TemperatureReadingsKafka");
        MongoCollection brokerCollection = database.getCollection("TemperatureReadingsMQTT");
        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Document document = new Document();
        document.put(dateFormat.format(new Date()), 18.34);
        kafkaCollection.insertOne(document);

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, Double> kafkaStreams = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.Double()));
        Duration timeDifference = Duration.ofSeconds(30);
        Topology topology = streamsBuilder.build();

        KTable table = kafkaStreams.groupByKey()
                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(timeDifference))
                .aggregate(
                        () -> new Tuple2(0.0, 0.0, 0.0), // initializer
                        (key, value, aggregate) -> tempAggregator(key, String.valueOf(value), aggregate))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .mapValues(new ValueMapper<Tuple2, Object>() {
                    @Override
                    public Double apply(Tuple2 tuple2) {
                        return (Double) tuple2.avg;
                    }
                });
        table.toStream().to(targetTopic);

        KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.start();
    }

    private static void kafkaPropertiesSet(String kafkaURI, Properties properties, String clientID) {
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURI);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientID);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    public static Tuple2 tempAggregator(String key, String value, Tuple2<Double> aggregateTuple) {
        aggregateTuple.count = aggregateTuple.count + 1;
        aggregateTuple.sum = aggregateTuple.sum + Double.valueOf(value);
        aggregateTuple.avg = aggregateTuple.sum / aggregateTuple.count;
        return aggregateTuple;
    }

    static class Tuple2<Double> {
        public Double count;
        public Double sum;
        public Double avg;

        public Tuple2(Double count, Double sum, Double avg) {
            this.count = count;
            this.sum = sum;
            this.avg = avg;
        }
    }
}
However, no messages are ever sent from the stream to Kafka. Logs from the console:
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Adding newly assigned partitions: TEMPERATURE-0
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Setting offset for partition TEMPERATURE-0 to the committed offset FetchPosition{offset=621, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}}
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] task [0_0] Initialized
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] task [0_0] Restored and ready to run
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] Restoration took 80 ms for all tasks [0_0]
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6] State transition from REBALANCING to RUNNING
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Requesting the log end offset for TEMPERATURE-0 in order to compute lag
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] Processed 346 total records, ran 0 punctuators, and committed 2 total tasks since the last update
What am I doing wrong? Any advice is greatly appreciated.
EDIT: Following @OneCricketeer's suggestion, dropping the intermediate Topology and using KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties); got the program moving (in the original code, streamsBuilder.build() was called before the groupByKey/aggregate steps were wired up, so the Topology handed to KafkaStreams presumably never contained the aggregation). I also needed to implement serialization and deserialization classes, as well as a Serde. The Kafka Streams code now looks like this:
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Double> kafkaStreams = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.Double()));
Duration timeDifference = Duration.ofSeconds(5);

KTable table = kafkaStreams.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(timeDifference))
        .aggregate(
                () -> generateTuple(logger), // initializer
                (key, value, aggregate) -> tempAggregator(key, String.valueOf(value), aggregate, logger))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .mapValues((ValueMapper<AggregationClass, Object>) tuple2 -> (Double) tuple2.getAverage());
table.toStream().to(targetTopic);

KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.cleanUp();
streams.start();
}

private static AggregationClass generateTuple(Logger logger) {
    logger.info("Tuple init");
    return new AggregationClass(0.0, 0.0);
}

private static void kafkaPropertiesSet(String kafkaURI, Properties properties, String clientID) {
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURI);
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientID);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AggregationSerde.class.getName());
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

public static AggregationClass tempAggregator(String key, String value, AggregationClass aggregateTuple, Logger logger) {
    aggregateTuple.count = aggregateTuple.count + 1;
    logger.warning("COUNT" + aggregateTuple.count);
    aggregateTuple.sum = aggregateTuple.sum + Double.valueOf(value);
    logger.warning("SUM: " + aggregateTuple.sum);
    return aggregateTuple;
}
AggregationClass is implemented like this:
public class AggregationClass<Double> {
    public double count;
    public double sum;

    public AggregationClass(double count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    public double getAverage() {
        return this.sum / this.count;
    }
}
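The AggregationSerde referenced in kafkaPropertiesSet is not shown above. A minimal sketch of what it might look like (hypothetical: it hand-rolls a binary encoding of the two fields, while a real implementation could just as well use JSON):

import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class AggregationSerde implements Serde<AggregationClass> {

    @Override
    public Serializer<AggregationClass> serializer() {
        // Pack count and sum as two consecutive 8-byte doubles
        return (topic, agg) -> {
            if (agg == null) return null;
            ByteBuffer buffer = ByteBuffer.allocate(2 * Double.BYTES);
            buffer.putDouble(agg.count);
            buffer.putDouble(agg.sum);
            return buffer.array();
        };
    }

    @Override
    public Deserializer<AggregationClass> deserializer() {
        // Read the two doubles back in the order they were written
        return (topic, bytes) -> {
            if (bytes == null) return null;
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            return new AggregationClass(buffer.getDouble(), buffer.getDouble());
        };
    }
}

A no-argument constructor is required (and provided implicitly here) so that Kafka Streams can instantiate the class from the DEFAULT_VALUE_SERDE_CLASS_CONFIG entry.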
What happens now is that the messages are read from the topic, but eventually I get:
ClassCastException while producing data to topic MOVINGAVG. A serializer (key: org.apache.kafka.streams.kstream.TimeWindowedSerializer / value: AggregationSerde) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Double). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)))
Since you have this

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AggregationSerde.class.getName());

all data produced to intermediate topics, for example after groupByKey() or to(), is by default expected to consist of AggregationClass instances. But after mapValues() you have a Double, so you need to provide the serde for that type explicitly:

.to(targetTopic, Produced.valueSerde(Serdes.Double()));
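Putting it together, the sink could look like the sketch below. Note that after the windowed aggregation the key is a Windowed<String>; the error above shows Streams already picked a TimeWindowedSerializer for it, so the value serde is the actual mismatch, but unwrapping the key first is another option (the map() step is my assumption, not part of the answer above):

// Option 1: keep the windowed key and only declare the Double value serde (the fix above)
table.toStream().to(targetTopic, Produced.valueSerde(Serdes.Double()));

// Option 2 (assumption): unwrap the Windowed key to a plain String and
// declare both serdes explicitly
table.toStream()
        .map((windowedKey, avg) -> KeyValue.pair(windowedKey.toString(), avg))
        .to(targetTopic, Produced.with(Serdes.String(), Serdes.Double()));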