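Hi folks, I'm using Apache Beam and I'm trying to write some records as CSV files to Google Cloud Storage. I have to use the FileIO writeDynamic method so that each CSV is written under a name taken from a string in the record. My code looks like this: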
orders.get(validationErrorTupleTag)
    .apply("Convert Validation Error To KV", ParDo.of(new DoFn<ValidationError, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            ValidationError validationError = context.element();
            String errorRow = String.format("%s, %s, %s, %s, %s, %s",
                validationError.getValidationType(),
                validationError.getValidationRulesType(),
                validationError.getErrorMessage(),
                validationError.getErrorElement(),
                validationError.getOrderNumber(),
                validationError.getFileName());
            context.output(KV.of(validationError.getFileName(), errorRow));
        }
    }))
    .apply("Window", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10),
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))))
        .withAllowedLateness(Duration.standardSeconds(10))
        .discardingFiredPanes())
    .apply("Write CSV to GCS", FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .withDestinationCoder(StringUtf8Coder.of())
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(path)
        .withNaming(key -> FileIO.Write.defaultNaming("error-" + key, ".csv"))
    );
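The files are written correctly, but their names look like this:
error-helloWorld-2022-03-23T23:14:31.000Z-2022-03-23T23:14:32.000Z-0-00000-of-00001.csv
Is there a way to write the files with FileIO without the window timestamps and shard information in the name?
Thanks in advance.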
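You can provide your own FileIO.Write.FileNaming instance, which can do whatever you want with the window timestamps, shard identifiers, and so on, including leaving them out of the file name entirely.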
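As a rough sketch of what that could look like: the helper below builds the file name from the key alone and only falls back to a shard suffix when there is more than one shard. The class name ErrorFileNames and the method forKey are hypothetical names made up for this illustration, not part of Beam. The comment at the bottom shows how it might be plugged into withNaming, assuming FileNaming's single method takes (window, pane, numShards, shardIndex, compression).

```java
// Hypothetical helper (not part of Beam): builds output names without
// window timestamps or pane indices.
final class ErrorFileNames {

    private ErrorFileNames() {}

    // Returns "error-<key>.csv" when there is a single shard. With several
    // shards the shard index is kept, otherwise all shards would map to the
    // same file name.
    static String forKey(String key, int numShards, int shardIndex) {
        if (numShards <= 1) {
            return "error-" + key + ".csv";
        }
        return String.format("error-%s-%05d-of-%05d.csv", key, shardIndex, numShards);
    }
}

// Possible wiring in the pipeline from the question (requires the Beam SDK,
// so it is shown as a comment here). The inner lambda is intended to be
// treated as a FileIO.Write.FileNaming:
//
//   .withNaming(key -> (window, pane, numShards, shardIndex, compression) ->
//       ErrorFileNames.forKey(key, numShards, shardIndex))
```

One thing to keep in mind: with the windowing and repeated triggering from the question, the same key can produce output for many windows and panes. If the name no longer contains the window or pane, those outputs will likely map to the same file name and may replace one another, so dropping the timestamps is probably only safe when each key is written once, or when some other distinguishing value is added to the name.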