这是困扰我两天的问题。
我可以从本地读取json数据,但是当我将其写入parquet时会出现错误。
我的代码如下:
public class parquet_save_convert {
private static final Schema SCHEMA = new Schema.Parser().parse(
"{ \n" +
" \"namespace\": \"com.navteq.avro\", \n" +
" \"name\": \"FacebookUser\", \n" +
" \"type\": \"record\",\n" +
" \"fields\": [\n" +
" {\"name\": \"event_level\", \"type\": \"string\"},\n" +
" {\"name\": \"spm_page\", \"type\": \"string\"},\n" +
" {\"name\": \"spm_module\", \"type\": \"string\"} ]\n" +
"}");
public static void main(String[] args) {
Gson gson=new GsonBuilder().create();
String outputPath = "./output/parquet";
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.read().from("./input/event_type.json"))
.apply(ParDo.of(new DoFn<String,GenericRecord>(){
@ProcessElement
public void processElement(ProcessContext c){
HashMap<String,String> map= gson.fromJson(c.element().toString(),HashMap.class);
GenericRecord osRecord = new GenericData.Record(SCHEMA);
map.forEach((k,v)->{
osRecord.put(k,v);
});
c.output(osRecord);
}
}))
.setCoder(AvroCoder.of(GenericRecord.class,SCHEMA))
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(SCHEMA)).to(outputPath)
.withSuffix(".parquet"));
pipeline.run().waitUntilFinish();}
我的输入数据如下:
{"event_level":"item","spm_page":"Activity","spm_module":"click"}
{"event_level":"page","spm_page":"Activity","spm_module":"action"}
{"event_level":"page","spm_page":"Activity","spm_module":"click"}
{"event_level":"item","spm_page":"Activity","spm_module":"action"}
例外情况是:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=parquet_save_convert$1@5d10455d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
我也谷歌了这个问题,但没有得到答案,这让我感到很沮丧。
通过Apache Beam使用ParquetIO读取和写入拼花文件的示例
提前感谢。
我猜你的Gson对象是不可序列化的。解决方案可能是在DoFn的SetUp方法中将其初始化为局部变量。