提问者:小点点

如何写入Apache Beam中的多个文件?


让我简化我的案例。我使用的是Apache Beam 0.6.0。我最终处理的结果是PCollection

例如,假设结果包括

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

然后我想把vale1vale3vale4写到key1. txt,把vale4写到key2.txt

就我而言:

  • 密钥集是在流水线运行时确定的,而不是在构建流水线时确定的。
  • 键集可能很小,但每个键对应的值的数量可能非常非常大。

有什么想法吗?


共3个答案

匿名用户

前几天,我轻而易举地写了一个这个案例的样本。

此示例是数据流1. x样式

基本上你可以按每个键分组,然后你可以通过连接到云存储的自定义转换来做到这一点。需要注意的是,每个文件的行列表不应该很大(它必须适合单个实例的内存,但是考虑到你可以运行高内存实例,这个限制相当高)。

    ...
    PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
                .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
        readyToWrite.apply(
                new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
    ...

然后完成大部分工作的转换是:

public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

    private static final Logger LOG = Logging.getLogger(PTransformWriteToGCS.class);

    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    private final String bucketName;

    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
        final SerializableFunction<String, String> pathCreator) {
        this.bucketName = bucketName;
        this.pathCreator = pathCreator;
    }

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {

        return input
            .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {

                @Override
                public void processElement(
                    final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
                    throws Exception {
                    final String key = arg0.element().getKey();
                    final List<String> values = arg0.element().getValue();
                    final String toWrite = values.stream().collect(Collectors.joining("\n"));
                    final String path = pathCreator.apply(key);
                    BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                        .setContentType(MimeTypes.TEXT)
                        .build();
                    LOG.info("blob writing to: {}", blobInfo);
                    Blob result = STORAGE.create(blobInfo,
                        toWrite.getBytes(StandardCharsets.UTF_8));
                }
            }));
    }
}

匿名用户

只需在ParDo函数中编写一个循环!更多细节-我今天也有同样的场景,唯一的事情是在我的情况下key=image_label和value=image_tf_record。所以就像你所问的,我正在尝试创建单独的TFRecords文件,每个类一个,每个记录文件包含许多图像。然而,不确定当每个键的值非常高时是否会出现内存问题,就像你的场景一样:(我的代码也在Python)

class WriteToSeparateTFRecordFiles(beam.DoFn):

def __init__(self, outdir):
    self.outdir = outdir

def process(self, element):
    l, image_list = element
    writer = tf.python_io.TFRecordWriter(self.outdir + "/tfr" + str(l) + '.tfrecord')
    for example in image_list:
        writer.write(example.SerializeToString())
    writer.close()

然后在您的管道中,在您获得键值对的阶段之后添加这两行:

   (p
    | 'GroupByLabelId' >> beam.GroupByKey()
    | 'SaveToMultipleFiles' >> beam.ParDo(WriteToSeparateTFRecordFiles(opt, p))
    )

匿名用户

您可以为此使用FileIO. write eDinic()

PCollection<KV<String,String>> readfile= (something you read..);

readfile.apply(FileIO. <String,KV<String,String >> writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("somefolder")
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));

p.run();