我一直在阅读一些关于使用Kafka和Kafka Streams(带有状态存储)作为事件存储实现的文章。
实施思路如下:
此工作流的问题是状态存储正在异步更新(步骤2),并且当正在处理新命令时,检索到的实体快照可能已过时(因为它没有使用以前命令的事件更新)。
我的理解是否正确?有没有一个简单的方法来处理kafka的这种情况?
我的理解是否正确?
据我所知,是的——这意味着对于许多事件源域模型来说,它是一个不令人满意的事件存储。
简而言之,在向主题添加事件时不支持“第一作者获胜”,这意味着Kafka不能帮助您确保主题满足其不变量。
已经有了解决这个问题的建议/门票,但是我还没有找到进展的证据。
是的,方法很简单。
为Kafka消息使用键。具有相同键的消息总是*去同一个分区。一个消费者可以从一个或多个部分读取,但是两个消费者不能同时读取两个分区。
工作消费者的最大数量始终是
Something like example:
Assumption.
There is a kafka topic abc with partitions p0,p1.
There is consumer C1 consuming from p0, and consumer C2 consuming from p1. Consumers are working asynchronicity
km(key,command) - kafka message.
#Procedure creating message
km(key1,add) -> p0
km(key2,add) -> p1
km(key1,edit) -> p0
km(key3,add) -> p1
km(key3,edit) -> p1
#consumer C1 will read messages km(key1,add), km(key1,edit) and order will be persist
#consumer c2 will read messages km(key2,add) , km(key3,add) ,km(key3,edit)
如果您将命令写入Kafka,然后在KStreams中具体化视图,则具体化视图将异步更新。这有助于您将写入与读取分开,以便读取路径可以扩展。
如果您希望在命令/事件上保持一致的读写语义学,您最好写入数据库。事件可以使用CDC连接器从数据库中提取到Kafka(写通),也可以写入数据库,然后在事务中写入Kafka(写留)。
另一种选择是对读取实现长轮询(因此,如果您编写trade1. version2,然后想再次读取它,读取将阻塞,直到trade1.version2可用)。这并不适合所有用例,但它可能很有用。
这里的例子:https://github.com/confluentinc/kafka-streams-examples/blob/4eb3aa4cc9481562749984760de159b68c922a8f/src/main/java/io/confluent/examples/streams/microservices/OrdersService.java#L165