我尝试将文件从Azure容器传输到GCS存储桶,但最终出现以下问题
无论如何,Apache管道是否可以在不处理文件内容的情况下传输文件本身(因此,不会发生上述问题)?因为我需要将多个文件从Azure容器传输到GCS存储桶
下面是我目前用来传输文件的代码
String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY_MM_DD_HH_MM_SS3")).toString();
String connectionString = "<<AZURE_STORAGE_CONNECTION_STRING>>";
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BlobstoreOptions.class).setAzureConnectionString(connectionString);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("azfs://storageaccountname/containername/CSVSample.csv"))
.apply("",FileIO.<String>write().to("azfs://storageaccountname/containername/"+format+"/").withNumShards(1).withSuffix(".csv")
.via(TextIO.sink()));
p.run().waitUntilFinish();
您应该能够为此目的使用FileIO转换。
例如(未经测试的伪代码),
FileIO.match().filepattern("azfs://storageaccountname/containername/CSVSample.csv")
.apply(FileIO.readMatches())
.apply(ParDo.of(new MyWriteDoFn()));
在MyWriteDoFn()
上面是一个DoFn
,它从单个文件中读取字节(使用AzureBlobStoreFileSystem)并写入GCS(使用GCSFileSystem)。您可以使用FileSystems类中具有正确前缀的静态方法,而不是直接调用底层FileSystem实现的方法。