提问者:小点点

Apache Beam FILEIOWriteDynamic删除接收器时间戳


你好,伙计们,我正在使用Apache光束,我试图在Google Cloud Storage上CSV一些记录,我不得不使用FileIO WriteDynamic方法来编写包含在字符串中的名称的CSV。我的代码如下所示:

orders.get(validationErrorTupleTag)
            .apply("Convert Validation Error To KV", ParDo.of(new DoFn<ValidationError, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext context) {
                    ValidationError validationError = context.element();
                    String errorRow = String.format("%s, %s, %s, %s, %s, %s",
                            validationError.getValidationType(),
                            validationError.getValidationRulesType(),
                            validationError.getErrorMessage(),
                            validationError.getErrorElement(),
                            validationError.getOrderNumber(),
                            validationError.getFileName());

                    context.output(KV.of(validationError.getFileName(), errorRow));
                }
            }))
            .apply("Window", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
                    .triggering(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10),
                            AfterProcessingTime
                                    .pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(1)))))
                    .withAllowedLateness(Duration.standardSeconds(10))
                    .discardingFiredPanes())
            .apply("Write CSV to GCS", FileIO.<String, KV<String, String>>writeDynamic()
                    .by(KV::getKey)
                    .withDestinationCoder(StringUtf8Coder.of())
                    .via(Contextful.fn(KV::getValue), TextIO.sink())
                    .to(path)
                    .withNaming(key -> FileIO.Write.defaultNaming("error-" + key, ".csv"))
            );

文件写入正确,但名称如下:

error-helloWorld-2022-03-23T23:14:31.000Z-2022-03-23T23:14:32.000Z-0-00000-of-00001.csv

有没有办法使用FileIO在没有分片时间戳的情况下写入文件?

提前感谢


共1个答案

匿名用户

您可以提供自己的FileIO. Writ.FileNaming实例,它可以对时间戳、分片标识符等执行任何您想要的操作。