尝试将简单的Parquet文件读入我的Google DataFlow Pipeline
使用以下代码
Read.Bounded<KV<Void, GenericData>> results = HadoopFileSource.readFrom("/home/avi/tmp/db_demo/simple.parquet", AvroParquetInputFormat.class, Void.class, GenericData.class);
运行流水线时始终触发以下异常
IllegalStateException:找不到org. apache.avro.Generic.GenericData类的编码器
似乎HadoopFileSource中的这个方法不能像编码器那样处理这种类型的类
private <T> Coder<T> getDefaultCoder(Class<T> c) {
if (Writable.class.isAssignableFrom(c)) {
Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
return (Coder<T>) WritableCoder.of(writableClass);
} else if (Void.class.equals(c)) {
return (Coder<T>) VoidCoder.of();
}
// TODO: how to use registered coders here?
throw new IllegalStateException("Cannot find coder for " + c);
}
任何帮助将不胜感激
阿维
这是HadoopFileSource
设计的一个问题。我建议迁移到apache-bin
或(Scio),这是dataflow sdk的apache“版本”(和“未来”)。一旦你在光束上,你可以:
这将是scala(但你可以很容易地翻译成java):
HDFSFileSource.from(
input,
classOf[AvroParquetInputFormat[AvroSchemaClass]],
AvroCoder.of(classOf[AvroSchemaClass]),
new SerializableFunction[KV[Void, AvroSchemaClass], AvroSchemaClass]() {
override def apply(e: KV[Void, AvroSchemaClass]): AvroSchemaClass =
CoderUtils.clone(AvroCoder.of(classOf[AvroSchemaClass]), e.getValue)
}
)
这是中接受coder
的的替代版本。