Asked by: 小点点

GroupByKey() with Apache Beam


I am trying to stream messages from a Kafka consumer into 30-second windows using Apache Beam. I use beam_nuggets.io to read from the Kafka topic.

You can see my code below:

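import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

with beam.Pipeline(options=PipelineOptions()) as p:
    consumer_message = (p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
                        | 'window' >> beam.WindowInto(window.FixedWindows(30))
                        | 'groupBy' >> beam.GroupByKey()
                        | beam.Map(print))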

GroupByKey still does not produce any output.

A sample element from my consumer_message:

(None, '{"userId": null, "visitorId": "1cb8b48d-6495-44fc-9ba5-ba28d71933a7", "ip": "10.212.134.89", "userAgent": "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1", "referer": "https://test.xxx.com/", "clientName": "xxx.com", "clientTypeId": "0", "sequenceAtSession": "1", "sessionId": "8f098d91-9049-49d0-ae52-63dffda76936", "url": null, "dimension": null, "event": {"category": null, "action": "pageview", "label": null, }, "startDate": "2021-10-18T07:05:46.9244107+00:00", "endDate": "", "pageType": "homePage", "countryCode": "ZZ", "isp": "Private network", "usageType": "reserved", "organization": "Rfc 1918"}')

Could GroupByKey() be behaving this way because the key of all my messages is None? Please correct me if I am wrong. Thanks.


1 answer

Anonymous user

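It looks like the trigger is not firing. Since you are implicitly using the default trigger, it should fire once the watermark passes the end of the window, with late data still accepted up to the allowed lateness.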

This could be the result of the watermark not advancing. Have you tried sending a new event after the window has ended?
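To make the reasoning concrete, here is a rough, Beam-free sketch of how an event-time trigger depends on the watermark. It is only a toy model, not Beam's actual implementation: `ToyGroupByKey`, `window_start` and `advance_watermark` are hypothetical names made up for illustration. Elements are buffered per (key, window), and a group is emitted only after the watermark reaches the end of its 30-second window. Note that a key of None is grouped like any other key in this model.

```python
from collections import defaultdict

WINDOW_SIZE = 30  # seconds, mirroring FixedWindows(30) in the question


def window_start(ts, size=WINDOW_SIZE):
    """Start of the fixed window that contains event timestamp ts."""
    return ts - (ts % size)


class ToyGroupByKey:
    """Toy model: buffer values per (key, window) and release a group
    only once the watermark has reached the end of that window."""

    def __init__(self, size=WINDOW_SIZE):
        self.size = size
        self.buffers = defaultdict(list)

    def add(self, key, value, ts):
        # Assign the element to its fixed window and buffer it.
        self.buffers[(key, window_start(ts, self.size))].append(value)

    def advance_watermark(self, watermark):
        """Return (key, window_start, values) for every window whose end
        is at or before the watermark, ordered by window start."""
        ready = sorted(
            (k for k in self.buffers if k[1] + self.size <= watermark),
            key=lambda k: k[1],
        )
        return [(key, start, self.buffers.pop((key, start))) for key, start in ready]


gbk = ToyGroupByKey()
gbk.add(None, "pageview-1", ts=5)
gbk.add(None, "pageview-2", ts=20)

stalled = gbk.advance_watermark(20)  # watermark still inside window [0, 30)
fired = gbk.advance_watermark(31)    # watermark has passed the window end

print(stalled)  # []
print(fired)    # [(None, 0, ['pageview-1', 'pageview-2'])]
```

In this model, as long as the watermark stays at 20 nothing is emitted, no matter how many elements are buffered. Output appears only after something moves the watermark past 30, which is the situation the question above about sending a new event after the window has ended is probing for.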