我正在尝试在Kafka(0.11)的聚合函数中使用SessionWindows,但无法理解为什么我会出错。
这是我的代码片段:
// defining some values:
public static final Integer SESSION_TIMEOUT_MS = 6000000;
public static final String INTOPIC = "input";
public static final String HOST = "host";
// setting up serdes:
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
// some more code to build up the streams
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC);
// constructing the initalMessage ObjectNode:
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode();
initialMessage.put("count", 0);
initialMessage.put("endTime", "");
// transforming data to KGroupedStream<String,JsonNode>
KGroupedStream<String, JsonNode> data = dataStream.map((key, value) ->{return new KeyValue<>(value.get(HOST).asText(), value); }).groupByKey(Serdes.String(), jsonSerde);
// finally aggregate the data usind SessionWindows
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
() -> initialMessage,
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
SessionWindows.with(SESSION_TIMEOUT_MS),
jsonSerde,
"aggregated-data");
private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){
// some dataprocessing
}
当我改变
KTable<Windowed<String>,JsonNode>
到
KTable<String, JsonNode>
并删除
SessionWindows.with(SESSION_TIMEOUT_MS)
从聚合函数来看,一切都可以。
如果我不这么做Eclise会告诉我
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate( [...])
KGroupeStream类型的方法聚合(初始化器、聚合器、Windows、Serde、字符串)不适用于参数(()-
和线路
() -> initialMessage
类型不匹配:无法从ObjectNode转换为VR
和:
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
DataWind的类型中的方法count tData(JsonNode, JsonNode)不适用于参数(JsonNode,VR)
我真的不明白,类型在哪里丢失!任何提示都很好!
Thx: D
我真的需要实现合并:
Merger<? super String, JsonNode>tmpMerger = new MergerClass<String, JsonNode>();
并将其添加到聚合函数中:
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
() -> initialMessage,
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
tmpMerger,
SessionWindows.with(SESSION_TIMEOUT_MS),
jsonSerde,
"aggregated-data");