提问者:小点点

Apache光束数据流:从Azure到GCS的文件传输


我尝试将文件从Azure容器传输到GCS存储桶,但最终出现以下问题

  1. 源文件中记录的顺序与目标文件的记录顺序不同,因为管道将进行并行处理
  2. 必须编写大量自定义代码来为GCS目标文件提供自定义名称,因为管道为其提供默认名称。

无论如何,Apache管道是否可以在不处理文件内容的情况下传输文件本身(因此,不会发生上述问题)?因为我需要将多个文件从Azure容器传输到GCS存储桶

下面是我目前用来传输文件的代码

String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY_MM_DD_HH_MM_SS3")).toString();

String connectionString = "<<AZURE_STORAGE_CONNECTION_STRING>>"; 
        
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BlobstoreOptions.class).setAzureConnectionString(connectionString);
        
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("azfs://storageaccountname/containername/CSVSample.csv"))
.apply("",FileIO.<String>write().to("azfs://storageaccountname/containername/"+format+"/").withNumShards(1).withSuffix(".csv")
        .via(TextIO.sink()));
p.run().waitUntilFinish();

共1个答案

匿名用户

您应该能够为此目的使用FileIO转换。

例如(未经测试的伪代码),

FileIO.match().filepattern("azfs://storageaccountname/containername/CSVSample.csv")
.apply(FileIO.readMatches())
.apply(ParDo.of(new MyWriteDoFn()));

MyWriteDoFn()上面是一个DoFn,它从单个文件中读取字节(使用AzureBlobStoreFileSystem)并写入GCS(使用GCSFileSystem)。您可以使用FileSystems类中具有正确前缀的静态方法,而不是直接调用底层FileSystem实现的方法。