提问者:小点点

KSQL查询在简单聚合中返回意外值


我从针对KTable的KSQL查询中得到了意想不到的结果,KTable本身是由Kafka主题定义的。KTABLE是“交易”,它由压缩主题“localhost. dbo.TradeStory”支持。它应该包含由TradeId键控的股票交易的最新信息。主题的键是TradeId。每笔交易都有一个Account tId,我正在尝试构建一个查询来获取按账户分组的交易金额的SUM。

ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');

...

ksql> describe extended Trades;

Name                 : TRADES
Type                 : TABLE
Key field            : TRADEID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : localhost.dbo.TradeHistory (partitions: 1, replication: 1)

Field     | Type
---------------------------------------
ROWTIME   | BIGINT           (system)
ROWKEY    | VARCHAR(STRING)  (system)
TRADEID   | INTEGER
ACCOUNTID | INTEGER
SPN       | INTEGER
AMOUNT    | DOUBLE
---------------------------------------

Local runtime statistics
------------------------
consumer-messages-per-sec:         0 consumer-total-bytes:      3709 consumer-total-messages:        39     last-message: 2019-10-12T20:52:16.552Z

(Statistics of the local KSQL server interaction with the Kafka topic localhost.dbo.TradeHistory)
/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic localhost.dbo.TradeHistory
Topic:localhost.dbo.TradeHistory    PartitionCount:1    ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms=100
    Topic: localhost.dbo.TradeHistory   Partition: 0    Leader: 1   Replicas: 1 Isr: 1

在我的测试中,我使用TradeId 2向localhost. dbo.TradeStory主题添加了消息,这只是更改了交易金额。只有金额被更新;帐户ID保持为1。

/usr/bin/kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic localhost.dbo.TradeHistory --from-beginning

... (earlier values redacted) ...

2   {"TradeHistoryId":47,"TradeId":2,"AccountId":1,"Spn":1,"Amount":106.0,"__table":"TradeHistory"}
2   {"TradeHistoryId":48,"TradeId":2,"AccountId":1,"Spn":1,"Amount":107.0,"__table":"TradeHistory"}

上面主题的转储显示交易额2(在账户1中)从106.0变为107.0。

ksql> select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId;
1 | 1 | 106.0
1 | 0 | 0.0
1 | 1 | 107.0

问题是,为什么每次我发布交易更新时,上面显示的KSQL查询都会返回一个“中间”值。如您所见,Count和Amount字段显示0,0,然后KSQL查询会立即将其“更正”为1,107.0。我对这种行为有点困惑。

有人能解释一下吗?

非常感谢。


共1个答案

匿名用户

谢谢你的问题。我在我们的知识库中添加了一个答案:https://github.com/confluentinc/ksql/pull/3594/files.

当KSQL看到表中现有行的更新时,它会在内部发出一个CDC事件,其中包含旧值和新值。聚合通过在应用新值之前首先撤消旧值来处理此问题。

因此,在上面的示例中,当第二次插入发生时,KSQL首先撤消旧值。这导致COUNT下降1,SUM下降到旧值106.0,即下降到零。然后KSQL应用新的行值,它看到COUNT上升1,SUM上升到新值107.0

默认情况下,KSQL配置为在将结果刷新到Kafka之前缓冲最多2秒或10MB的数据。这就是为什么在本例中插入值时,您可能会看到输出有轻微延迟。如果两个输出行一起缓冲,那么KSQL将抑制第一个结果。这就是为什么您经常看不到中间行被输出的原因。配置commit.interval.mscache. max.bytes.buffer ering,分别设置为2秒和10MB,可用于调整此行为。将这些设置中的任何一个设置设置为零将导致KSQL始终输出所有中间结果。

如果您每次都看到这些中间结果输出,那么很可能您已将这些设置中的一个或两个设置设置为零。

我们有一个Github问题来增强KSQL以利用Kafka Stream的抑制功能,这将允许用户更好地控制结果的实现方式。