I am trying to stream messages from a Kafka consumer to Google Cloud Storage using Apache Beam with 30-second windows, reading from the Kafka topic with beam_nuggets.io. However, I cannot get it to write a separate parquet file to GCS for each window. You can see my code below:
import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterAny, AfterCount, AfterProcessingTime, AfterWatermark, Repeatedly
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
import json
from datetime import datetime
import pandas as pd
import config as conf
import apache_beam.transforms.window as window

consumer_config = {"topic": "Uswrite",
                   "bootstrap_servers": "*.*.*.*:9092",
                   "group_id": "notification_consumer_group_33"}

folder_name = datetime.now().strftime('%Y-%m-%d')

def format_result(consume_message):
    data = json.loads(consume_message[1])
    file_name = datetime.now().strftime("%Y_%m_%d-%I_%M_%S")
    df = pd.DataFrame(data).T  # , orient='index'
    df.to_parquet(f'gs://{conf.gcs}/{folder_name}/{file_name}.parquet',
                  storage_options={"token": "gcp.json"}, engine='fastparquet')
    print(consume_message)

with beam.Pipeline(options=PipelineOptions()) as p:
    consumer_message = (
        p
        | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
        | 'Windowing' >> beam.WindowInto(window.FixedWindows(30),
                                         trigger=AfterProcessingTime(30),
                                         allowed_lateness=900,
                                         accumulation_mode=AccumulationMode.ACCUMULATING)
        | 'CombineGlobally' >> beam.Map(format_result))

# window.FixedWindows(30), trigger=beam.transforms.trigger.AfterProcessingTime(30),
# accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
# allowed_lateness=100, CombineGlobally(format_result).without_defaults(), allowed_lateness=30,
With the code above, a new parquet file is generated for every single message. What I want is to group the messages into 30-second windows and generate one parquet file per window. I tried several variations without success, such as beam.CombineGlobally(format_result).without_defaults() instead of beam.Map(format_result), and also beam.ParDo(format_result). In addition, a couple of notes:

As you can see in the commented-out lines of the code block above, I also tried those parameter combinations, but they did not help.

I have searched everywhere but could not find an example that covers this use case.
Here are the changes you should make to your pipeline to get this result:

- Remove the trigger. A trigger is only needed if you want more than one result per window.
- Add a GroupByKey or Combine operation to aggregate the elements. Without such an operation, the windowing has no effect.
- Use parquetio to write the output, so that you get scalable, exactly-once behavior. (See the pydoc from the 2.33.0 release.)
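Putting those three points together, a minimal, untested sketch of the reworked pipeline could look like the following. It makes several assumptions: the messages are flat JSON objects matching a hand-written pyarrow schema, the output path gs://your-bucket/output is a placeholder, and num_shards=1 is set because streaming file sinks generally need a fixed shard count; adjust all of these to your setup.

import json

import apache_beam as beam
import pyarrow
from apache_beam.io.parquetio import WriteToParquet
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

consumer_config = {"topic": "Uswrite",
                   "bootstrap_servers": "*.*.*.*:9092",
                   "group_id": "notification_consumer_group_33"}

# Assumed schema for the incoming JSON; replace with the real fields.
schema = pyarrow.schema([('userId', pyarrow.string()),
                         ('xx', pyarrow.string())])

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (p
     | 'Read from Kafka' >> kafkaio.KafkaConsume(consumer_config=consumer_config)
     | 'Parse JSON value' >> beam.Map(lambda kv: json.loads(kv[1]))
     | '30s fixed windows' >> beam.WindowInto(beam.window.FixedWindows(30))  # no trigger
     | 'Write parquet per window' >> WriteToParquet(
         'gs://your-bucket/output/part',   # placeholder path
         schema,
         file_name_suffix='.parquet',
         num_shards=1))                    # fixed sharding for streaming writes

In this sketch the file sink performs its own grouping of the windowed elements, so no explicit GroupByKey appears before it.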
I had a look at the GroupByKey examples in the Python documentation and tried the two snippets below:
import apache_beam as beam
from beam_nuggets.io import kafkaio

new_list = []

def convert_to_list(consume_message):
    new_list.append(consume_message)
    return new_list

with beam.Pipeline() as pipeline:
    dofn_params = (
        pipeline
        | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
        | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
        | 'consume message added list' >> beam.ParDo(convert_to_list)
        | 'GroupBykey' >> beam.GroupByKey()
        | 'print' >> beam.Map(print))
import apache_beam as beam
from beam_nuggets.io import kafkaio

with beam.Pipeline() as pipeline:
    dofn_params = (
        pipeline
        | 'Created Pipeline' >> beam.Create([(None, '{"userId": "921","xx":"123"}'),
                                             (None, '{"userId": "92111","yy":"123"}')])
        | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
        | 'GroupBykey' >> beam.GroupByKey()
        | 'print' >> beam.Map(print))
I assume the problem with the first approach is that it builds up an external list instead of a PCollection, but I am not sure. Could you guide me on how to proceed?
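For comparison, here is a minimal, self-contained sketch that keeps the grouping inside the pipeline instead of in a module-level list. It reuses the sample messages from the second snippet and an arbitrary constant key, both of which are illustrative assumptions:

import apache_beam as beam

# (key, value) tuples shaped like the output of kafkaio.KafkaConsume.
messages = [(None, '{"userId": "921","xx":"123"}'),
            (None, '{"userId": "92111","yy":"123"}')]

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Create messages' >> beam.Create(messages)
     | 'Constant key' >> beam.Map(lambda kv: ('batch', kv[1]))  # one group per window
     | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
     | 'GroupByKey' >> beam.GroupByKey()  # yields ('batch', [json_str, ...]) per window
     | 'Print' >> beam.Map(print))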
The other thing I tried was the ReadFromKafka function from the apache_beam.io.kafka module. But this time I got the following error:
ERROR:apache_beam.utils.subprocess_server:Starting job service with ['java', '-jar', 'user_directory/.apache_beam/cache/jars\\beam-sdks-java-io-expansion-service-2.33.0.jar', '59627']
Java version 11.0.12 is installed on my machine and is available through the java command.
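For reference, a minimal sketch of what the ReadFromKafka attempt looks like, assuming the same broker and topic as above. ReadFromKafka is a cross-language transform, which is why it spawns the Java expansion service seen in the error:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (p
     | 'Read from Kafka' >> ReadFromKafka(
         consumer_config={'bootstrap.servers': '*.*.*.*:9092',      # assumed broker
                          'group.id': 'notification_consumer_group_33'},
         topics=['Uswrite'])
     | 'Print' >> beam.Map(print))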