提问者:小点点

如何在apache光束流水线中记录传入消息


我正在编写一个简单的apache光束流管道,从pubsub主题获取输入并将其存储到bigquery中。几个小时以来,我以为我甚至无法阅读消息,因为我只是试图将输入记录到控制台:

events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)

当我将其写入文本时,它可以正常工作!然而,我对记录器的调用从未发生过。

人们如何开发/调试这些流式管道?

我尝试添加以下行:事件|'日志'

使用print()也不会在控制台中产生任何结果。


共1个答案

匿名用户

这是因为event是一个PCollection,所以您需要对其应用PTransform

最简单的方法是将ParDo应用于事件

events | 'Log results' >> beam.ParDo(LogResults())

其定义为:

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Pub/Sub event: %s", element)
    yield element

请注意,如果您想在下游应用进一步的步骤,例如在记录元素后写入接收器,我也会放弃元素。例如,请参阅此处的问题。