提问者:小点点

如何实现KStream-Ktable leftJoin,如何使用Envelope对象获取和设置字段并实现KStream-Ktable的join?


如果我有一个主题模式(即Kstream):

{
    "type": "record",
    "name": "Value",
    "namespace": "test1",
    "fields": [
          {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
          },
          {
            "name": "createdAt",
            "type": [
                "null",
                {
                    "type": "string",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.ZonedTimestamp"
                }
            ],
            "default": null
          }
    ],
    "connect.name": "test1.Value"
}

其他主题的架构

{
  "type": "record",
  "name": "Envelope",
  "namespace": "test2",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            },
           
          ],
          "connect.name": "test2.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
   
  ],
  "connect.name": "test2.Envelope"
}

我想在这两个主题KStream和Ktable之间实现连接。

如何使用test1主题id和test2主题id(在后obj中)来实现,如何从对象中提取id(使用信封模式在obj之后)以实现连接。


共1个答案

匿名用户

  • 它与表一起执行流的LEFT JOIN,有效地进行表查找。
  • 两边的输入数据必须共同分区。
  • 当且仅当流被标记为重新分区时,它会导致流的数据重新分区

KStream

KTable

KStream

join. keySerde(Serdes.String()) /* key/.withValueSerde(Serdes.Long())/左值*/);

  • 连接是基于键的,即使用连接谓词leftRecord. key==rightRecord.key。
  • 每当收到新输入时,都会在下面列出的条件下触发连接。触发时,将调用用户提供的ValueJoiner来生成连接输出记录。
  • 只有左侧(流)的输入记录触发连接。右侧(表)的输入记录仅更新内部右侧连接状态。
  • 带有空值或空值的流的输入记录将被忽略,并且不会触发连接。
  • 空值表的输入记录被解释为对应键的墓碑,表示该键从表中删除。墓碑不会触发连接。
  • 对于左侧的每条输入记录,如果右侧没有任何匹配项,则将使用ValueJoiner#application(leftRecord. value,null)调用ValueJoiner。

非常低级的详细实现在这里https://developer.confluent.io/learn-kafka/kafka-streams/joins/

另请参阅https://mydeveloperplanet.com/2019/10/30/kafka-streams-joins-explored/中的会话2.7