提问者:小点点

如何在apache光束中将json数据和sava用作拼花地板


这是困扰我两天的问题。

我可以从本地读取json数据,但是当我将其写入parquet时会出现错误。

我的代码如下:

public class parquet_save_convert {

private static final Schema SCHEMA = new Schema.Parser().parse(
        "{ \n" +
                " \"namespace\": \"com.navteq.avro\", \n" +
                " \"name\": \"FacebookUser\", \n" +
                " \"type\": \"record\",\n" +
                " \"fields\": [\n" +
                " {\"name\": \"event_level\", \"type\": \"string\"},\n" +
                " {\"name\": \"spm_page\", \"type\": \"string\"},\n" +
                " {\"name\": \"spm_module\", \"type\": \"string\"} ]\n" +
                "}");


public static void main(String[] args) {
    Gson gson=new  GsonBuilder().create();
    String outputPath = "./output/parquet";
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(TextIO.read().from("./input/event_type.json"))
            .apply(ParDo.of(new DoFn<String,GenericRecord>(){
                @ProcessElement
                public void processElement(ProcessContext c){
                    HashMap<String,String> map= gson.fromJson(c.element().toString(),HashMap.class);

                    GenericRecord osRecord = new GenericData.Record(SCHEMA);

                    map.forEach((k,v)->{
                        osRecord.put(k,v);
                    });

                    c.output(osRecord);
                }
            }))
            .setCoder(AvroCoder.of(GenericRecord.class,SCHEMA))
            .apply(FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA)).to(outputPath)
                    .withSuffix(".parquet"));
    pipeline.run().waitUntilFinish();}

我的输入数据如下:

{"event_level":"item","spm_page":"Activity","spm_module":"click"}
{"event_level":"page","spm_page":"Activity","spm_module":"action"}
{"event_level":"page","spm_page":"Activity","spm_module":"click"}
{"event_level":"item","spm_page":"Activity","spm_module":"action"}

例外情况是:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=parquet_save_convert$1@5d10455d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}

我也谷歌了这个问题,但没有得到答案,这让我感到很沮丧。

通过Apache Beam使用ParquetIO读取和写入拼花文件的示例

提前感谢。


共1个答案

匿名用户

我猜你的Gson对象是不可序列化的。解决方案可能是在DoFn的SetUp方法中将其初始化为局部变量。