提问者:小点点

在apache光束python中读取CSV的有效方法


在阅读了StackOverflow上的一些问题后,我一直在使用下面的代码来读取CSV文件。

管道代码:

 with beam.Pipeline(options=pipeline_options) as p:

    parsed_csv = (p | 'Create from CSV' >> beam.Create([input_file]))
    flattened_file = (parsed_csv | 'Flatten the CSV' >> beam.FlatMap(get_csv_reader))

读取csv的方法:get_csv_reader()

def get_csv_reader(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read file as a CSV
    gcs_reader = csv.reader(io.TextIOWrapper(gcs_file))

    next(gcs_reader)

    return gcs_reader

我使用它而不是ReadFromText,因为当字段值中有换行符时它会失败。

问题:现在,我的问题是这种读取CSV的方式是否有效?在大文件的情况下会失败吗?我问是因为我在我的方法中使用了csv. read。我觉得这会将文件加载到内存中,导致大文件失败。如果我错了,请纠正我的理解。

此外,由于这是一个PTransfer,我的方法会被序列化以在不同的工作节点上运行吗?我对光束如何在幕后运行这段代码感到困惑。

如果这不是有效的,请建议在阿帕奇光束上读取CSV的有效方法。


共1个答案

匿名用户

您可以定义一个生成器来逐行延迟读取文件。

def read_csv_file(readable_file):
  with beam.io.filesystems.FileSystems.open(readable_file) as gcs_file:
    for row in csv.reader(gcs_file):
      yield row

一个类似的问题是如何处理换行符加载CSV到Apache Beam?