我的管道的这一部分应该接受一个输入,对其应用适当的元组标签,然后根据它接收到的标签对输入进行进一步处理。
运行下面的代码时,主标签(tag1)中的PCollection正常工作。但是,附加标签(tag2、tag3)会在. application()
上抛出此错误:
线程“main”java. lang.IllegalStateException中的异常:无法为Assign Output.out1[PCollection]返回默认编码器。更正以下根本原因之一:未手动指定编码器;您可以使用.setCoder()这样做。从Coder注册表推断编码器失败:无法为V提供编码器。
为什么这个错误发生在tag2上而不是tag1上?请注意,如果我将tag2作为主要输出,tag1/tag3作为附加输出并适当地重新排序代码,tag2处理是成功的,但tag1/tag3将抛出错误。
主管道:
PCollectionTuple pct = outputPair.apply("Assign Output", ParDo.of( new output())
.withOutputTags(output.tag1, TupleTagList.of(output.tag2).and(output.tag3)));
//Tag1 Output
PCollection<KV<String, outResultPair>> tagPair1 = pct.get(output.tag1)
.apply("Process", ParDo.of( new ABCOutput()))
//Tag2 Output
PCollection<KV<String, outResultPair>> tagPair2 = pct.get(output.tag2)
.apply("Process", ParDo.of( new DEFOutput())) //Error Thrown here
支持类:
//ABCOutput Class
@DefaultCoder(AvroCoder.class)
public class ABCOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
c.output( processInput(e) );
}
}
//XYZOutput Class
@DefaultCoder(AvroCoder.class)
public class XYZOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
c.output( processInput(e) );
}
}
//Output Splitter
@DefaultCoder(AvroCoder.class)
public class output {
private final static Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
final static TupleTag<KV<String,inResultPair>> tag2 = new TupleTag();
final static TupleTag<KV<String,inResultPair>> tag3 = new TupleTag();
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
KV<String, outResultPair> out = process(e);
switch(e.getValue().type){
case 1:
c.output(tag1, out);
break;
case 2:
c.output(tag2, out);
break;
case 3:
c.output(tag3, out);
break;
}
c.output();
}
}
您需要构造TupleTag
,使它们的类型信息将由Java编译器保存,而目前您将它们构造为原始类型,因此Beam的编码器推断不知道输出到此标记中的元素是什么类型。
改变:
final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
到:
final static TupleTag<KV<String,inResultPair>> tag1 =
new TupleTag<KV<String, inResultPair>>() {};
{}
对于保存此处的类型信息至关重要。