提问者:小点点

使用AvroParquetInputFormat将镶木地板读入Google DataFlow


尝试将简单的Parquet文件读入我的Google DataFlow Pipeline

使用以下代码

Read.Bounded<KV<Void, GenericData>> results = HadoopFileSource.readFrom("/home/avi/tmp/db_demo/simple.parquet", AvroParquetInputFormat.class, Void.class, GenericData.class);

运行流水线时始终触发以下异常

IllegalStateException:找不到org. apache.avro.Generic.GenericData类的编码器

似乎HadoopFileSource中的这个方法不能像编码器那样处理这种类型的类

  private <T> Coder<T> getDefaultCoder(Class<T> c) {
if (Writable.class.isAssignableFrom(c)) {
  Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
  return (Coder<T>) WritableCoder.of(writableClass);
} else if (Void.class.equals(c)) {
  return (Coder<T>) VoidCoder.of();
}
// TODO: how to use registered coders here?
throw new IllegalStateException("Cannot find coder for " + c);

}

任何帮助将不胜感激

阿维


共1个答案

匿名用户

这是HadoopFileSource设计的一个问题。我建议迁移到apache-bin或(Scio),这是dataflow sdk的apache“版本”(和“未来”)。一旦你在光束上,你可以:

这将是scala(但你可以很容易地翻译成java):

HDFSFileSource.from(
  input,
  classOf[AvroParquetInputFormat[AvroSchemaClass]],
  AvroCoder.of(classOf[AvroSchemaClass]),
  new SerializableFunction[KV[Void, AvroSchemaClass], AvroSchemaClass]() {
    override def apply(e: KV[Void, AvroSchemaClass]): AvroSchemaClass =
      CoderUtils.clone(AvroCoder.of(classOf[AvroSchemaClass]), e.getValue)
  }
)

这是中接受coder的替代版本。