提问者:小点点

如何通过python读取apache光束(数据流)中的JSON文件?


我试图读取一个JSON文件通过apache光束在python和应用一些数据质量规则。目前我使用beam.io. ReadFromText读取每个json行,并使用一些函数来修改数据。什么是更好的方法来读取JSON数据并修改它们?

(p
  | 'Getdata' >> beam.io.ReadFromText(input)
  | 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
  | 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
  | 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
  | 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
  | 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
  | 'write' >> beam.io.WriteToText(output_prefix)  )

注意:我对此相当陌生,如果我目前的方法看起来太粗制滥造,我很抱歉。


共2个答案

匿名用户

您从错误的方向接近Apache Beam(数据流)。

您正在尝试读取一行,然后一次将转换应用于该行。

相反,您需要将Beam视为并行处理器。您将读取所有行ReadFromText(),然后将转换并行应用于每一行。

查看函数beam. ParDo()。这将允许您创建一个可以处理JSON文件的每一行的类。然后,您的代码将具有主要步骤,例如ReadFromText()ParDo(MyJsonProcess())WriteToText()

请记住,您的JSON需要JSON换行符。http://ndjson.org/

匿名用户

我认为你的管道没问题。它会并行运行,没有任何问题。FYI,如果你使用FlatMap只是为了过滤元素,你也可以使用Filter