我有一个无限的复杂对象流,我想将其加载到BigQuery中。这些对象的结构代表了我在BigQuery中目标表的模式。
问题是,由于POJO中有很多嵌套字段,将其转换为TableSchema
对象是一项极其繁琐的任务,我正在寻找一种快速/自动化的方法来将我的POJO转换为TableSchema
对象,同时写入BigQuery。
我不是很熟悉Apache BeamAPI,任何帮助将不胜感激。
在管道中,我从GCS加载模式列表。我将它们保持为字符串格式,因为TableSchema不可序列化。但是,我将它们加载到TableSchema以验证它们。然后我将它们以字符串格式添加到Option对象中的映射中。
String schema = new String(blob.getContent());
// Decorate list of fields for allowing a correct parsing
String targetSchema = "{\"fields\":" + schema + "}";
try {
//Preload schema to ensure validity, but then use string version
Transport.getJsonFactory().fromString(targetSchema, TableSchema.class);
String tableName = blob.getName().replace(SCHEMA_FILE_PREFIX, "").replace(SCHEMA_FILE_SUFFIX, "");
tableSchemaStringMap.put(tableName, targetSchema);
} catch (IOException e) {
logger.warn("impossible to read schema " + blob.getName() + " in bucket gs://" + options.getSchemaBucket());
}
当我开发这个时,我没有找到另一个解决方案。
在我的公司,我创建了一种ORM(我们称之为OBQM)来做到这一点。我们希望将其发布给公众。代码相当大(特别是因为我创建了注释等),但我可以与您分享一些快速生成模式的片段:
public TableSchema generateTableSchema(@Nonnull final Class cls) {
final TableSchema tableSchema = new TableSchema();
tableSchema.setFields(generateFieldsSchema(cls));
return tableSchema;
}
public List<TableFieldSchema> generateFieldsSchema(@Nonnull final Class cls) {
final List<TableFieldSchema> schemaFields = new ArrayList<>();
final Field[] clsFields = cls.getFields();
for (final Field field : clsFields) {
schemaFields.add(fromFieldToSchemaField(field));
}
return schemaFields;
}
这段代码从POJO类中获取所有字段,并创建一个TableSchema
对象(BigQueryIO在ApacheBeam中使用的对象)。您可以看到我创建的一个名为fromFieldToSchemaField
的方法。此方法标识每个字段类型并设置字段名称、模式、描述和类型。在这种情况下,为了保持简单,我将重点关注类型和名称:
public static TableFieldSchema fromFieldToSchemaField(@Nonnull final Field field) {
return fromFieldToSchemaField(field, 0);
}
public static TableFieldSchema fromFieldToSchemaField(
@Nonnull final Field field,
final int iteration) {
final TableFieldSchema schemaField = new TableFieldSchema();
final Type customType = field.getGenericType().getTypeName()
schemaField.setName(field.getName());
schemaField.setMode("NULLABLE"); // You can add better logic here, we use annotations to override this value
schemaField.setType(getFieldTypeString(field));
schemaField.setDescription("Optional"); // Optional
if (iteration < MAX_RECURSION
&& (isStruct(schemaField.getType())
|| isRecord(schemaField.getType()))) {
final List<TableFieldSchema> schemaFields = new ArrayList<>();
final Field[] fields = getFieldsFromComplexObjectField(field);
for (final Field subField : fields) {
schemaFields.add(
fromFieldToSchemaField(
subField, iteration + 1));
}
schemaField.setFields(schemaFields.isEmpty() ? null : schemaFields);
}
return schemaField;
}
现在是返回BigQuery字段类型的方法。
public static String getFieldTypeString(@Nonnull final Field field) {
// On my side this code is much complex but this is a short version of that
final Class<?> cls = (Class<?>) field.getGenericType()
if (cls.isAssignableFrom(String.class)) {
return "STRING";
} else if (cls.isAssignableFrom(Integer.class) || cls.isAssignableFrom(Short.class)) {
return "INT64";
} else if (cls.isAssignableFrom(Double.class)) {
return "NUMERIC";
} else if (cls.isAssignableFrom(Float.class)) {
return "FLOAT64";
} else if (cls.isAssignableFrom(Boolean.class)) {
return "BOOLEAN";
} else if (cls.isAssignableFrom(Double.class)) {
return "BYTES";
} else if (cls.isAssignableFrom(Date.class)
|| cls.isAssignableFrom(DateTime.class)) {
return "TIMESTAMP";
} else {
return "STRUCT";
}
}
请记住,我不是在展示如何识别原始类型或数组。但这对您的代码来说是一个好的开始:)。如果您需要任何帮助,请告诉我。
如果您使用JSONPubSub中的消息序列化,您可以使用提供的模板之一:
PubSub到BigQuery模板
该模板的代码在这里:
PubSubToBigQuery.java