如果我有一个主题模式(即Kstream):
{
"type": "record",
"name": "Value",
"namespace": "test1",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
}
],
"connect.name": "test1.Value"
}
其他主题的架构
{
"type": "record",
"name": "Envelope",
"namespace": "test2",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
],
"connect.name": "test2.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
}
],
"connect.name": "test2.Envelope"
}
我想在这两个主题KStream和Ktable之间实现连接。
如何使用test1主题id和test2主题id(在后obj中)来实现,如何从对象中提取id(使用信封模式在obj之后)以实现连接。
KStream
KTable
KStream
join. keySerde(Serdes.String()) /* key/.withValueSerde(Serdes.Long())/左值*/);
非常低级的详细实现在这里https://developer.confluent.io/learn-kafka/kafka-streams/joins/
另请参阅https://mydeveloperplanet.com/2019/10/30/kafka-streams-joins-explored/中的会话2.7