我试图读取一个JSON文件通过apache光束在python和应用一些数据质量规则。目前我使用beam.io. ReadFromText读取每个json行,并使用一些函数来修改数据。什么是更好的方法来读取JSON数据并修改它们?
(p
| 'Getdata' >> beam.io.ReadFromText(input)
| 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
| 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
| 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
| 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
| 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
| 'write' >> beam.io.WriteToText(output_prefix) )
注意:我对此相当陌生,如果我目前的方法看起来太粗制滥造,我很抱歉。
您从错误的方向接近Apache Beam(数据流)。
您正在尝试读取一行,然后一次将转换应用于该行。
相反,您需要将Beam视为并行处理器。您将读取所有行ReadFromText()
,然后将转换并行应用于每一行。
查看函数beam. ParDo()
。这将允许您创建一个可以处理JSON文件的每一行的类。然后,您的代码将具有主要步骤,例如ReadFromText()
,ParDo(MyJsonProcess())
,WriteToText()
。
请记住,您的JSON需要JSON换行符。http://ndjson.org/
我认为你的管道没问题。它会并行运行,没有任何问题。FYI,如果你使用FlatMap
只是为了过滤元素,你也可以使用Filter
。