我正在使用Apache Beam的PythonSDK,我无法从无界PCollection中按窗口和键执行聚合。数据来自Kafka主题,它被组织为带有键、值和时间戳的字典。我在beam_nuggets包中与Kafka消费者一起阅读它(因为我无法使默认的Kafka消费者工作),应用三分钟长的固定窗口,GroupByKey并计算平均值。我目前对处理后期数据不感兴趣(默认触发器应该可以很好地工作)。似乎所有数据在windows中都被正确划分,但从未调用GroupByKey之后的聚合函数。
这是我使用的代码:
import json
import apache_beam as beam
from apache_beam.transforms import window, trigger
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
beam_options = PipelineOptions(
runner = "DirectRunner",
streaming = True,
)
class AddTimestampDoFn(beam.DoFn):
def process(self, element):
unix_timestamp = element["datetime"]/1000
yield beam.window.TimestampedValue(element, unix_timestamp)
def add_key(x):
print("add key", x["datetime"])
return (x["key"], x)
def process_group(x):
print("process_group")
return sum(x)/len(x)
with beam.Pipeline(options = beam_options) as pipeline:
data = (pipeline | kafkaio.KafkaConsume(consumer_config = {"bootstrap_servers": "localhost:9092",
"topic": "foo",
"group_id": "consumer_group",
"auto_offset_reset": "latest"},
value_decoder = bytes.decode
)
| "ToDict" >> beam.MapTuple(lambda k,v: json.loads(v))
| "Add timestamp" >> beam.ParDo(AddTimestampDoFn())
| "Add key" >> beam.Map(add_key)
| "Window" >> beam.WindowInto(window.FixedWindows(60*3))
)
grouped = (data | f"Group" >> beam.GroupByKey()
| f"ProcessGroup" >> beam.Map(process_group)
)
第一部分似乎工作正常,因为Kafka消费者收到的每条消息都会打印“添加键”调试日志。窗口似乎设置正确,每个数据点都分配给一个窗口。然而,“process_group”日志永远不会打印,就好像管道永远不会到达那个点一样。
我知道StackOverflow上有几个类似的问题(比如这个、这个或这个),但似乎没有一个解决方案有效。
我还尝试了不同的触发功能(如AfterWatermark),但似乎仍然不起作用。
Apache Beam版本是2.41.0
当我阅读带有静态数量记录的测试Kafka主题时,Flink和CoGroupByKey也遇到了类似的问题。一旦我开始每隔几秒钟产生一次新消息,问题就消失了,CoGroupByKey开始按预期运行。
在这里找到解决方案:
https://github.com/apache/beam/issues/22809#issuecomment-1310971785