在阅读了StackOverflow上的一些问题后,我一直在使用下面的代码来读取CSV文件。
管道代码:
with beam.Pipeline(options=pipeline_options) as p:
parsed_csv = (p | 'Create from CSV' >> beam.Create([input_file]))
flattened_file = (parsed_csv | 'Flatten the CSV' >> beam.FlatMap(get_csv_reader))
读取csv的方法:get_csv_reader()
def get_csv_reader(readable_file):
# Open a channel to read the file from GCS
gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
# Read file as a CSV
gcs_reader = csv.reader(io.TextIOWrapper(gcs_file))
next(gcs_reader)
return gcs_reader
我使用它而不是ReadFromText,因为当字段值中有换行符时它会失败。
问题:现在,我的问题是这种读取CSV的方式是否有效?在大文件的情况下会失败吗?我问是因为我在我的方法中使用了csv. read。我觉得这会将文件加载到内存中,导致大文件失败。如果我错了,请纠正我的理解。
此外,由于这是一个PTransfer,我的方法会被序列化以在不同的工作节点上运行吗?我对光束如何在幕后运行这段代码感到困惑。
如果这不是有效的,请建议在阿帕奇光束上读取CSV的有效方法。
您可以定义一个生成器来逐行延迟读取文件。
def read_csv_file(readable_file):
with beam.io.filesystems.FileSystems.open(readable_file) as gcs_file:
for row in csv.reader(gcs_file):
yield row
一个类似的问题是如何处理换行符加载CSV到Apache Beam?