Asked by: 小点点

KafkaStreams does not produce messages to the target topic


I have been trying to use Kafka Streams to compute a moving average of the temperatures read by a sensor. I have a producer that takes messages from an MQTT broker and pushes them to Kafka:

String topic = "TEMPERATURE";
Producer<String, Double> producer = new KafkaProducer<>(properties);

mqttClient.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("Connection to broker lost! " + throwable.getMessage());
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        try {
            String time = new Timestamp(System.currentTimeMillis()).toString();
            String content = new String(mqttMessage.getPayload());

            System.out.println("\nReceived a Message!" +
                    "\n\tTime:    " + time +
                    "\n\tTopic:   " + topic +
                    "\n\tMessage: " + content +
                    "\n\tQoS:     " + mqttMessage.getQos() + "\n");
            double temp = Double.valueOf(content.substring(2));

            producer.send(new ProducerRecord<>(topic, time, temp), (recordMetadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.printf("Produced record to topic %s partition [%d] @ offset %d%n",
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                }
            });
            producer.flush();
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
});

mqttClient.subscribe(topic, 0);

Kafka receives these messages; I can verify that from bash. Next, I want a windowed Kafka Stream that computes the moving average:

public class PersistenceService {


    public static void main(String[] args) {
        Logger logger = Logger.getLogger("PERSISTENCE SERVICE");
        String topic = "TEMPERATURE";
        String targetTopic = "MOVINGAVG";
        String kafkaURI = "localhost:9092";
        String clientID = "Persistence Service";
        Properties properties = new Properties();
        kafkaPropertiesSet(kafkaURI, properties, clientID);
        MongoClient mongoClient = new MongoClient("localhost", 27017);

        MongoDatabase database = mongoClient.getDatabase("Temperature");
        logger.info("Connected to the database");
        MongoCollection kafkaCollection = database.getCollection("TemperatureReadingsKafka");
        MongoCollection brokerCollection = database.getCollection("TemperatureReadingsMQTT");
        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Document document = new Document();
        document.put(dateFormat.format(new Date()), 18.34);
        kafkaCollection.insertOne(document);



        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, Double> kafkaStreams =  streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.Double()));
        Duration timeDifference = Duration.ofSeconds(30);
        Topology topology = streamsBuilder.build();
        KTable table = kafkaStreams.groupByKey()
                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(timeDifference))
                .aggregate(
                        () -> new Tuple2(0.0, 0.0, 0.0), // initializer
                        (key, value, aggregate) -> tempAggregator(key, String.valueOf(value), aggregate))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .mapValues(new ValueMapper<Tuple2, Object>() {
                    @Override
                    public Double apply(Tuple2 tuple2) {
                        return (Double) tuple2.avg;
                    }
                });
        table.toStream().to(targetTopic);
        KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.start();
    }



    private static void kafkaPropertiesSet(String kafkaURI, Properties properties, String clientID) {
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURI);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientID);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    public static Tuple2 tempAggregator(String key, String value, Tuple2<Double> aggregateTuple){
        aggregateTuple.count = aggregateTuple.count + 1;
        aggregateTuple.sum = aggregateTuple.sum + Double.valueOf(value);
        aggregateTuple.avg = aggregateTuple.sum/ aggregateTuple.count;
        return aggregateTuple;
    }
    static class Tuple2<Double> {
        public Double count;
        public Double sum;
        public Double avg;

        public Tuple2(Double count, Double sum, Double avg) {
            this.count = count;
            this.sum = sum;
            this.avg = avg;
        }
    }
}

However, no messages are sent from the stream to Kafka. Log from the console:

[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Adding newly assigned partitions: TEMPERATURE-0
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Setting offset for partition TEMPERATURE-0 to the committed offset FetchPosition{offset=621, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}}
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] task [0_0] Initialized
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] task [0_0] Restored and ready to run
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] Restoration took 80 ms for all tasks [0_0]
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6] State transition from REBALANCING to RUNNING
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Requesting the log end offset for TEMPERATURE-0 in order to compute lag
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] Processed 346 total records, ran 0 punctuators, and committed 2 total tasks since the last update

What am I doing wrong? Any advice is greatly appreciated.

EDIT Following @OneCricketeer's suggestion, I removed the separate Topology and build the program with KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);. I needed to implement serializer and deserializer classes, as well as a Serde. The changed Kafka Streams code now looks like this:

StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Double> kafkaStreams = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.Double()));
Duration timeDifference = Duration.ofSeconds(5);

KTable table = kafkaStreams.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(timeDifference))
        .aggregate(
                () -> generateTuple(logger), // initializer
                (key, value, aggregate) -> tempAggregator(key, String.valueOf(value), aggregate, logger))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .mapValues((ValueMapper<AggregationClass, Object>) tuple2 -> (Double) tuple2.getAverage());
table.toStream().to(targetTopic);
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.cleanUp();
streams.start();

}

    private static AggregationClass generateTuple(Logger logger) {
        logger.info("Tuple init");
        return new AggregationClass(0.0, 0.0);
    }
    private static void kafkaPropertiesSet(String kafkaURI, Properties properties, String clientID) {
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURI);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientID);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AggregationSerde.class.getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    public static AggregationClass tempAggregator(String key, String value, AggregationClass aggregateTuple, Logger logger) {
        aggregateTuple.count = aggregateTuple.count + 1;
        logger.warning("COUNT" + aggregateTuple.count);
        aggregateTuple.sum = aggregateTuple.sum + Double.valueOf(value);
        logger.warning("SUM: " + aggregateTuple.sum);
        return aggregateTuple;
    }
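As an aside, instead of making AggregationSerde the global default value serde, the aggregation's state-store serdes could be scoped to the one operator that needs them via Materialized. A fragment-level sketch, assuming AggregationSerde implements Serde&lt;AggregationClass&gt; as in the code above:

```java
// Fragment: scope the aggregation's serdes to this operator instead of
// relying on DEFAULT_VALUE_SERDE_CLASS_CONFIG.
.aggregate(
        () -> generateTuple(logger),
        (key, value, aggregate) -> tempAggregator(key, String.valueOf(value), aggregate, logger),
        Materialized.with(Serdes.String(), new AggregationSerde()))
```

This keeps the default serdes free for the plain String/Double records flowing through the rest of the topology.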

AggregationClass is implemented like this:

public class AggregationClass<Double> {
    public double count;
    public double sum;


    public AggregationClass(double count, double sum) {
        this.count = count;
        this.sum = sum;

    }

    public double getAverage() {
        return this.sum/this.count;
    }
}
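For reference, the running-average arithmetic that tempAggregator and getAverage implement can be checked in isolation; a plain-Java sketch with no Kafka involved (class names here are hypothetical stand-ins):

```java
// Minimal stand-in for AggregationClass: accumulate count and sum,
// derive the average on demand.
class RunningAverage {
    double count;
    double sum;

    // Mirrors tempAggregator: bump the count, add the reading to the sum.
    void add(double value) {
        count += 1;
        sum += value;
    }

    // Mirrors AggregationClass.getAverage(); NaN until a value is added.
    double average() {
        return sum / count;
    }
}

public class RunningAverageDemo {
    public static void main(String[] args) {
        RunningAverage agg = new RunningAverage();
        for (double temp : new double[] {20.0, 22.0, 24.0}) {
            agg.add(temp);
        }
        System.out.println(agg.average()); // prints 22.0
    }
}
```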

What happens now is that the messages are read from the topic, but eventually I get:

ClassCastException while producing data to topic MOVINGAVG. A serializer (key: org.apache.kafka.streams.kstream.TimeWindowedSerializer / value: AggregationSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Double). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced&lt;K, V&gt; produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).


1 Answer

Anonymous user

Since you have this

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AggregationSerde.class.getName());

then all data that needs to be produced to intermediate topics, e.g. after groupByKey and in to(), by default needs to consist of AggregationClass instances.

But after mapValues you have Double values, so you need to define a serializer for that type:

.to(targetTopic, Produced.valueSerde(Serdes.Double()));
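A note on the key side: after windowedBy the stream key is Windowed&lt;String&gt;, not String, so if the key serde ever needs overriding at the sink as well, a combined sketch might look like the following (assuming Kafka Streams' WindowedSerdes helper, whose time-windowed variant takes the window size in milliseconds):

```java
// Sketch: override key and value serdes together at the sink.
// WindowedSerdes.timeWindowedSerdeFrom wraps the inner String serde so
// Windowed<String> keys can be serialized; Serdes.Double() handles the
// averages emitted by mapValues.
table.toStream().to(targetTopic,
        Produced.with(
                WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference.toMillis()),
                Serdes.Double()));
```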