Asked by: 小点点

Exception when writing a multipart empty CSV file from Apache Beam to NetApp StorageGRID


Problem statement

We are consuming multiple CSV files into a PCollection, and while writing them out to NetApp StorageGRID we get the exception below:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at ECSOperations.main(ECSOperations.java:53)
Caused by: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
    at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1076)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.createMissingEmptyShards(FileBasedSink.java:759)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalizeDestination(FileBasedSink.java:639)
    at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:1040)
Caused by: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Your proposed upload is smaller than the minimum allowed object size. (Service: Amazon S3; Status Code: 400; Error Code: EntityTooSmall; Request ID: 1643869619144605; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null

Here is the sample code:

ECSOptions options = PipelineOptionsFactory.fromArgs(args).as(ECSOptions.class);
setupConfiguration(options);
Pipeline p = Pipeline.create(options);
// Read the (empty) CSV file into a PCollection of lines.
PCollection<String> pSource = p.apply(TextIO.read().from("src/main/resources/empty.csv"));
// Write it out to StorageGRID as a single, unsharded S3 object.
pSource.apply(TextIO.write().to("s3://bucket-name/empty.csv").withoutSharding());
p.run();

Observations

  • If we write a simple file instead of a multipart one (a plain put-object to StorageGRID), it works fine.
  • It appears to be a known issue with StorageGRID, but we would like to check whether we can handle it from the Beam pipeline.

What I have tried

  • Tried to check the size of the PCollection before writing and to put some placeholder string into the output file, but since the PCollection is empty it never reaches that PTransform at all.
  • Also tried Count.globally(), but that did not help either (see the sketch below).
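
For context, one possible reading of that attempt (a hypothetical sketch, not the original code) shows why it cannot work at pipeline-construction time: Count.globally() produces a deferred PCollection<Long> that only receives a value when the pipeline runs, so there is nothing to branch on while the pipeline is still being built.

// Hypothetical sketch of the attempted size check. Count.globally() yields a
// PCollection<Long>, a deferred value computed at run time, not a Java long
// that could be inspected here to decide whether to skip the write.
PCollection<Long> size = pSource.apply(Count.globally());
// "if (size == 0) { skip the write }" is therefore not expressible at this point;
// the decision has to be made inside the pipeline itself (see the answers below).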

Questions

  • Is there any way to handle this in Beam, for example by checking the size of the PCollection before writing? If the size is zero, i.e. the PCollection is empty, we could skip writing the file and avoid this issue.
  • Has anyone faced a similar issue and managed to resolve it?

2 Answers

Anonymous user

Two more options come to mind:

  1. TextIO.write().withFooter(...) to always write a single empty line (or space, etc.) at the end of the file to ensure it is not empty (a minimal sketch follows the code below).
  2. Flatten your PCollection together with a PCollection containing a single empty line that is used only if the given PCollection is empty. (This is more involved, but more generally applicable.) Specifically:
PCollection<String> pcollToWrite = ...

// This will count the number of elements in pcollToWrite at runtime.
PCollectionView<Long> toWriteSize = pcollToWrite.apply(Count.globally().asSingletonView());

PCollection<String> emptyOrSingletonPCollection = 
    p
      // Creates a PCollection with a single element.
      .apply(Create.of(Collections.singletonList("")))
      // Applies a ParDo that will emit this single element if and only if
      // toWriteSize is zero.
      .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String mainElement, OutputReceiver<String> out, ProcessContext c) {
            if (c.sideInput(toWriteSize) == 0) {
              out.output("");
            }
          }
      }).withSideInputs(toWriteSize));

// We now flatten pcollToWrite and emptyOrSingletonPCollection together.
// The right hand side has exactly one element whenever the left hand side
// is empty, so there will always be at least one element.
PCollectionList.of(pcollToWrite, emptyOrSingletonPCollection)
    .apply(Flatten.pCollections())
    .apply(TextIO.write(...));
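
For option 1, a minimal sketch of what that could look like, reusing the destination from the question (the empty-string footer is just one possible placeholder; any content works):

// Option 1 sketch: withFooter(...) appends the given footer as a trailing line to
// every output file, so even an empty PCollection produces a non-empty object.
pSource.apply(
    TextIO.write()
        .to("s3://bucket-name/empty.csv")   // same destination as in the question
        .withoutSharding()
        .withFooter(""));                   // placeholder footer, written as a final line

The trade-off is that every file, empty or not, ends with this extra footer line, so downstream consumers need to tolerate it.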

Anonymous user

You cannot check whether a PCollection is empty during pipeline construction, because at that point it has not been computed yet. If this filesystem cannot handle empty files, you could try writing to another filesystem and then copying the file over whenever it is non-empty (assuming the files in question are not too large).
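
A rough sketch of that workaround, assuming a hypothetical local staging path and using Beam's FileSystems utilities (org.apache.beam.sdk.io.FileSystems and org.apache.beam.sdk.io.fs.MatchResult; matchSingleFileSpec throws IOException, so the enclosing method must declare or handle it):

// Sketch only: stage the output on a filesystem that tolerates empty files,
// then copy it to StorageGRID only if it turned out to be non-empty.
pSource.apply(TextIO.write().to("/tmp/staging/empty.csv").withoutSharding());
p.run().waitUntilFinish();  // wait so the staged file exists before checking it

MatchResult.Metadata staged = FileSystems.matchSingleFileSpec("/tmp/staging/empty.csv");
if (staged.sizeBytes() > 0) {
  FileSystems.copy(
      Collections.singletonList(staged.resourceId()),
      Collections.singletonList(
          FileSystems.matchNewResource("s3://bucket-name/empty.csv", false /* isDirectory */)));
}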