提问者:小点点

如何从PubSub主题中读取和解析数据并将其打印到光束管道中


我有一个程序,它在pubSub中创建一个主题,并向主题发布消息。我还有一个自动数据流作业(使用模板),它将这些消息保存到我的BigQuery表中。现在我打算用python管道替换基于模板的作业,我的要求是从PubSub读取数据,应用转换并将数据保存到BigQuery/发布到另一个PubSub主题。我开始用python编写脚本,并做了很多尝试和错误来实现它,但令我沮丧的是,我无法实现它。代码如下所示:

import apache_beam as beam
from apache_beam.io import WriteToText
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

def run():
    o = beam.options.pipeline_options.PipelineOptions()
    p = beam.Pipeline(options=o)

    print("I reached here")
    # # Read from PubSub into a PCollection.
    data = (
        p
        | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    )
    data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    print("Lines: ", data)
run()

如果我能尽早得到一些帮助,我将不胜感激。注意:我在谷歌云上设置了我的项目,并且我的脚本在本地运行。


共2个答案

匿名用户

这里是工作代码。

import apache_beam as beam

TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"


class PrintValue(beam.DoFn):
    def process(self, element):
        print(element)
        return [element]

def run():

    o = beam.options.pipeline_options.PipelineOptions()
    # Replace this by --stream execution param
    standard_options = o.view_as(beam.options.pipeline_options.StandardOptions)
    standard_options.streaming = True
    p = beam.Pipeline(options=o)

    print("I reached here")
    # # Read from PubSub into a PCollection.
    data = p | beam.io.ReadFromPubSub(topic=TOPIC_PATH) | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    # Don't forget to run the pipeline!
    result = p.run()
    result.wait_until_finish()

run()

综上所述

  • 你错过了运行管道。事实上,Beam是一个Graph编程模型。所以,在你之前的代码中,你构建了你的图,但你从未运行过它。在这里,在最后,运行它(不是阻塞调用)并等待结束(阻塞调用)
  • 当您启动管道时,Beam提到PubSub仅在流式模式下工作。因此,您可以使用--流参数启动代码,或者像我的代码中所示的那样以编程方式执行代码

请注意,流式模式意味着在PubSub上进行不确定的监听。如果您在Dataflow上运行此功能,您的管道将始终处于运行状态,直到您停止它。如果您的消息很少,这可能会导致成本高昂。确保这是目标模型

另一种方法是在有限的时间内使用管道(您使用调度程序来启动它,另一个用于停止它)。但是,此时,您必须堆叠消息。这里您使用Topic作为管道的条目。此选项强制Beam创建临时订阅并侦听此订阅上的消息。这意味着在此订阅创建之前发布的消息将不会被接收和处理。

这个想法是创建一个订阅,通过这种方式,消息将被堆叠在其中(默认情况下最多7天)。然后,在管道的条目中使用订阅名称beam.io。ReadFromPubSub(订阅=SUB_PATH)。消息将被解堆叠并由Beam处理(不保证顺序!)

匿名用户

根据Beam编程指南,您只需在管道中添加转换步骤。这里是一个示例或转换:

class PrintValue(beam.DoFn):
  def process(self, element):
    print(element)
    return [element]

将其添加到您的管道中

 data |  beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)

您可以添加所需的转换次数。您可以测试该值并将标记为PCollection(用于具有多个输出)中的元素设置为扇出,或在PCollection中为风扇使用侧输入。