Java源码示例:org.apache.beam.sdk.schemas.Schema
示例1
/**
 * Runs a correlated {@code EXISTS} sub-query of ORDERS against CUSTOMER and expects exactly one
 * matching customer row.
 */
@Test
public void testExistsSubquery() {
  String sql =
      "select * from CUSTOMER "
          + " where exists ( "
          + " select * from ORDERS "
          + " where o_custkey = c_custkey )";
  PCollection<Row> rows = compilePipeline(sql, pipeline);

  // Expected output: a single row with columns (c_custkey, c_acctbal, c_city).
  TestUtils.RowsBuilder expected =
      TestUtils.RowsBuilder.of(
              Schema.FieldType.INT32, "c_custkey",
              Schema.FieldType.DOUBLE, "c_acctbal",
              Schema.FieldType.STRING, "c_city")
          .addRows(1, 1.0, "Seattle");
  PAssert.that(rows).containsInAnyOrder(expected.getRows());

  pipeline.run().waitUntilFinish();
}
示例2
/**
 * Selects {@code f3.f2.f1.f0} through an array-of-rows / row / array-of-rows nesting and checks
 * that the output is a single field {@code f0} holding an array of string arrays.
 */
@Test
public void testArrayRowArray() {
  // Schemas are built inside-out: each level wraps the one declared before it.
  Schema leafSchema = Schema.builder().addStringField("f0").build();
  Schema innerArraySchema =
      Schema.builder().addArrayField("f1", FieldType.row(leafSchema)).build();
  Schema nestedRowSchema = Schema.builder().addRowField("f2", innerArraySchema).build();
  Schema topSchema =
      Schema.builder().addArrayField("f3", FieldType.row(nestedRowSchema)).build();

  // Matching rows; every array level holds two copies of the row one level below.
  Row leafRow = Row.withSchema(leafSchema).addValue("first").build();
  Row innerArrayRow = Row.withSchema(innerArraySchema).addArray(leafRow, leafRow).build();
  Row nestedRow = Row.withSchema(nestedRowSchema).addValue(innerArrayRow).build();
  Row topRow = Row.withSchema(topSchema).addArray(nestedRow, nestedRow).build();

  FieldAccessDescriptor fieldAccessDescriptor =
      FieldAccessDescriptor.withFieldNames("f3.f2.f1.f0").resolve(topSchema);

  // Schema check.
  Schema outputSchema = SelectHelpers.getOutputSchema(topSchema, fieldAccessDescriptor);
  Schema expectedSchema =
      Schema.builder().addArrayField("f0", FieldType.array(FieldType.STRING)).build();
  assertEquals(expectedSchema, outputSchema);

  // Value check: two outer elements, each contributing two "first" strings.
  Row out = selectRow(topSchema, fieldAccessDescriptor, topRow);
  Row expected =
      Row.withSchema(outputSchema)
          .addArray(Lists.newArrayList("first", "first"), Lists.newArrayList("first", "first"))
          .build();
  assertEquals(expected, out);
}
示例3
/**
 * Looks up key {@code 'key11'} in the {@code f_intStringMap} column of a two-element input and
 * expects one row with the value 11 and one row with a null value.
 */
@Test
public void testAccessMapElement() {
  PCollection<Row> input = pCollectionOf2Elements();
  Schema resultType =
      Schema.builder().addNullableField("f_mapElem", Schema.FieldType.INT32).build();

  String query = "SELECT f_intStringMap['key11'] FROM PCOLLECTION";
  PCollection<Row> result = input.apply("sqlQuery", SqlTransform.query(query));

  Row keyPresent = Row.withSchema(resultType).addValues(11).build();
  Row keyAbsent = Row.withSchema(resultType).addValue(null).build();
  PAssert.that(result).containsInAnyOrder(keyPresent, keyAbsent);

  pipeline.run();
}
示例4
/**
 * Builds a row conforming to {@code outputSchema} by looking up each output field by name in
 * {@code inputSchema} and casting the corresponding input value to the output field's type.
 *
 * @param input the row to convert; may be {@code null}
 * @param inputSchema schema used to locate fields of {@code input} by name
 * @param outputSchema schema of the returned row
 * @return the converted row, or {@code null} when {@code input} is {@code null}
 */
public static Row castRow(Row input, Schema inputSchema, Schema outputSchema) {
  if (input == null) {
    return null;
  }

  Row.Builder converted = Row.withSchema(outputSchema);
  int outputFieldCount = outputSchema.getFieldCount();
  for (int outIdx = 0; outIdx < outputFieldCount; ++outIdx) {
    Schema.Field target = outputSchema.getField(outIdx);
    // Fields are matched by name, so the input and output field order may differ.
    int sourceIdx = inputSchema.indexOf(target.getName());
    Schema.Field source = inputSchema.getField(sourceIdx);
    converted.addValue(castValue(input.getValue(sourceIdx), source.getType(), target.getType()));
  }
  return converted.build();
}
示例5
/**
 * Orders ORDER_DETAILS by {@code order_id asc, site_id desc} with {@code limit 11} and expects
 * the ten rows listed below.
 */
@Test
public void testOrderBy_bigFetch() {
  String sql =
      "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
          + " order_id, site_id, price "
          + "FROM ORDER_DETAILS "
          + "ORDER BY order_id asc, site_id desc limit 11";
  PCollection<Row> rows = compilePipeline(sql, pipeline);

  // One (order_id, site_id, price) triple per line.
  TestUtils.RowsBuilder expected =
      TestUtils.RowsBuilder.of(
              Schema.FieldType.INT64, "order_id",
              Schema.FieldType.INT32, "site_id",
              Schema.FieldType.DOUBLE, "price")
          .addRows(
              1L, 2, 1.0,
              1L, 1, 2.0,
              2L, 4, 3.0,
              2L, 1, 4.0,
              5L, 5, 5.0,
              6L, 6, 6.0,
              7L, 7, 7.0,
              8L, 8888, 8.0,
              8L, 999, 9.0,
              10L, 100, 10.0);
  PAssert.that(rows).containsInAnyOrder(expected.getRows());

  pipeline.run().waitUntilFinish();
}
示例6
/**
 * Orders ORDER_DETAILS by {@code order_id asc, site_id desc} with {@code limit 4} and expects the
 * four rows listed below.
 */
@Test
public void testOrderBy_basic() {
  String sql =
      "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
          + " order_id, site_id, price "
          + "FROM ORDER_DETAILS "
          + "ORDER BY order_id asc, site_id desc limit 4";
  PCollection<Row> rows = compilePipeline(sql, pipeline);

  // One (order_id, site_id, price) triple per line.
  TestUtils.RowsBuilder expected =
      TestUtils.RowsBuilder.of(
              Schema.FieldType.INT64, "order_id",
              Schema.FieldType.INT32, "site_id",
              Schema.FieldType.DOUBLE, "price")
          .addRows(
              1L, 2, 1.0,
              1L, 1, 2.0,
              2L, 4, 3.0,
              2L, 1, 4.0);
  PAssert.that(rows).containsInAnyOrder(expected.getRows());

  pipeline.run().waitUntilFinish();
}
示例7
/**
 * Parses a query whose WHERE clause mixes a predicate reported as supported ({@code id=1}) with
 * one reported as unsupported ({@code id+unused1=101}), then checks the plan shape, the source
 * digest, the fields read from the source, and the query output.
 */
@Test
public void testIOSourceRel_withSupportedAndUnsupportedPredicate() {
  String selectTableStatement = "SELECT name FROM TEST where id+unused1=101 and id=1";
  String expectedSourceDigest =
      "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])";

  BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  // Plan shape: a Calc node sitting on top of the IO source.
  assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
  assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
  assertEquals(expectedSourceDigest, beamRelNode.getInput(0).getDigest());

  // Make sure project push-down was done
  List<String> sourceFieldNames = beamRelNode.getInput(0).getRowType().getFieldNames();
  assertThat(sourceFieldNames, containsInAnyOrder("name", "id", "unused1"));

  assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
  PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));

  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
示例8
/**
 * Picks the {@link CombineFn} that implements SUM for the given field type.
 *
 * <p>INT32, INT64 and DOUBLE use the factories on {@link Sum}; the remaining numeric types use
 * dedicated combiner classes.
 *
 * @param fieldType the type of the values being summed
 * @return a combiner for {@code fieldType}
 * @throws UnsupportedOperationException if {@code fieldType} is not one of the handled types
 */
static CombineFn createSum(Schema.FieldType fieldType) {
  Schema.TypeName typeName = fieldType.getTypeName();
  switch (typeName) {
    case BYTE:
      return new ByteSum();
    case INT16:
      return new ShortSum();
    case INT32:
      return Sum.ofIntegers();
    case INT64:
      return Sum.ofLongs();
    case FLOAT:
      return new FloatSum();
    case DOUBLE:
      return Sum.ofDoubles();
    case DECIMAL:
      return new BigDecimalSum();
    default:
      throw new UnsupportedOperationException(
          String.format("[%s] is not supported in SUM", fieldType));
  }
}
示例9
/**
 * Checks that a query which only renames its single input column yields the renamed schema and
 * the unchanged value.
 *
 * <p>Trivial programs project precisely their input fields, without dropping or re-ordering them.
 *
 * @see <a href="https://issues.apache.org/jira/browse/BEAM-6810">BEAM-6810</a>
 */
@Test
public void testTrivialProjection() {
  Schema inputSchema = Schema.of(Schema.Field.of("c_int64", Schema.FieldType.INT64));
  Schema outputSchema = Schema.of(Schema.Field.of("abc", Schema.FieldType.INT64));
  String sql = "SELECT c_int64 as abc FROM PCOLLECTION";

  Row inputRow = Row.withSchema(inputSchema).addValue(42L).build();
  PCollection<Row> input = pipeline.apply(Create.of(inputRow).withRowSchema(inputSchema));

  PCollection<Row> result = input.apply(SqlTransform.query(sql));

  Assert.assertEquals(outputSchema, result.getSchema());
  Row expectedRow = Row.withSchema(outputSchema).addValue(42L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run();
}
示例10
/**
 * Converts each row to a table row and sends all of them in a single insert-all request to the
 * table identified by the pipeline options' project and target dataset and this object's table
 * id.
 *
 * @param rowSchema not referenced by this method
 * @param rows the rows to insert
 * @return the response of the executed insert-all request
 * @throws IOException declared by the signature; presumably propagated from the BigQuery client
 *     call — confirm against the client API
 */
@Experimental(Kind.SCHEMAS)
public TableDataInsertAllResponse insertRows(Schema rowSchema, Row... rows) throws IOException {
  ImmutableList.Builder<Rows> converted = ImmutableList.builder();
  for (Row row : rows) {
    converted.add(new Rows().setJson(BigQueryUtils.toTableRow(row)));
  }
  List<Rows> bqRows = converted.build();

  Bigquery bq = newBigQueryClient(pipelineOptions);
  return bq.tabledata()
      .insertAll(
          pipelineOptions.getProject(),
          pipelineOptions.getTargetDataset(),
          table.getTableReference().getTableId(),
          new TableDataInsertAllRequest().setRows(bqRows))
      .execute();
}
示例11
/**
 * Resolves the value of a row position, applying overrides.
 *
 * <ul>
 *   <li>If an override exists for the position itself, its value is returned.
 *   <li>Otherwise, if an override exists somewhere below the position, a new row is assembled by
 *       running the matcher over every field of {@code schema}.
 *   <li>Otherwise {@code value} is returned untouched.
 * </ul>
 */
@Override
public Row processRow(
    RowPosition rowPosition, Schema schema, Row value, RowFieldMatcher matcher) {
  FieldOverride override = override(rowPosition);
  if (override != null) {
    return (Row) override.getOverrideValue();
  }
  if (!fieldOverrides.hasOverrideBelow(rowPosition.descriptor)) {
    return value;
  }

  int fieldCount = schema.getFieldCount();
  List<Object> rebuiltValues = Lists.newArrayListWithCapacity(fieldCount);
  for (int fieldIdx = 0; fieldIdx < fieldCount; ++fieldIdx) {
    FieldAccessDescriptor nestedDescriptor =
        FieldAccessDescriptor.withFieldIds(rowPosition.descriptor, fieldIdx).resolve(topSchema);
    // A null incoming row contributes null for every field.
    Object fieldValue = (value == null) ? null : value.getValue(fieldIdx);
    Object matched =
        matcher.match(
            this, schema.getField(fieldIdx).getType(), new RowPosition(nestedDescriptor), fieldValue);
    rebuiltValues.add(matched);
  }
  return new RowWithStorage(schema, rebuiltValues);
}
示例12
/**
 * Applies {@code IS_INF} to two float and two double columns of the test input and expects
 * {@code true} for all four.
 */
@Test
public void testIsInf() throws Exception {
  String sql =
      "SELECT IS_INF(f_float_1), IS_INF(f_double_1), IS_INF(f_float_2), IS_INF(f_double_2) FROM PCOLLECTION";

  Schema fourBooleans =
      Schema.builder()
          .addBooleanField("field_1")
          .addBooleanField("field_2")
          .addBooleanField("field_3")
          .addBooleanField("field_4")
          .build();
  Row allTrue = Row.withSchema(fourBooleans).addValues(true, true, true, true).build();

  PCollection<Row> result = boundedInputFloatDouble.apply("testUdf", SqlTransform.query(sql));
  PAssert.that(result).containsInAnyOrder(allTrue);

  pipeline.run().waitUntilFinish();
}
示例13
/**
 * Runs {@code MD5} over the byte inputs tagged {@code 'HashingFn'} and compares the output with
 * digests computed by {@code DigestUtils.md5} for three UTF-8 encoded strings.
 */
@Test
public void testMd5() throws Exception {
  Schema digestSchema = Schema.builder().addByteArrayField("field").build();

  // Expected digests: an ASCII word, a single space, and a mixed Latin/Cyrillic string.
  Row asciiDigest =
      Row.withSchema(digestSchema).addValues(DigestUtils.md5("foobar".getBytes(UTF_8))).build();
  Row spaceDigest =
      Row.withSchema(digestSchema).addValues(DigestUtils.md5(" ".getBytes(UTF_8))).build();
  Row mixedScriptDigest =
      Row.withSchema(digestSchema)
          .addValues(DigestUtils.md5("abcABCжщфЖЩФ".getBytes(UTF_8)))
          .build();

  String sql = "SELECT MD5(f_bytes) FROM PCOLLECTION WHERE f_func = 'HashingFn'";
  PCollection<Row> result = boundedInputBytes.apply("testUdf", SqlTransform.query(sql));

  PAssert.that(result).containsInAnyOrder(asciiDigest, spaceDigest, mixedScriptDigest);
  pipeline.run().waitUntilFinish();
}
示例14
/**
 * Compares two field values for equality according to the field's schema type.
 *
 * <p>Logical types are compared using their base type, byte arrays by content, and arrays,
 * iterables and maps through the dedicated collection helpers. Every other type falls back to
 * {@link Objects#equals(Object, Object)}.
 *
 * @param a first value; may be {@code null}
 * @param b second value; may be {@code null}
 * @param fieldType schema type describing both values
 * @return {@code true} if the values are considered equal; when either value is {@code null} the
 *     result is {@code true} only if both are {@code null}
 */
public static boolean deepEquals(Object a, Object b, Schema.FieldType fieldType) {
  if (a == null || b == null) {
    return a == b;
  }

  TypeName typeName = fieldType.getTypeName();
  if (typeName == TypeName.LOGICAL_TYPE) {
    // Unwrap and compare using the underlying representation.
    return deepEquals(a, b, fieldType.getLogicalType().getBaseType());
  }
  if (typeName == TypeName.BYTES) {
    return Arrays.equals((byte[]) a, (byte[]) b);
  }
  if (typeName == TypeName.ARRAY) {
    return deepEqualsForCollection(
        (Collection<Object>) a, (Collection<Object>) b, fieldType.getCollectionElementType());
  }
  if (typeName == TypeName.ITERABLE) {
    return deepEqualsForIterable(
        (Iterable<Object>) a, (Iterable<Object>) b, fieldType.getCollectionElementType());
  }
  if (typeName == TypeName.MAP) {
    // Only the map's value type is handed to the helper.
    return deepEqualsForMap(
        (Map<Object, Object>) a, (Map<Object, Object>) b, fieldType.getMapValueType());
  }
  return Objects.equals(a, b);
}
示例15
/**
 * Builds two rows holding distinct but content-equal nested byte arrays and, provided the coder
 * reports itself consistent with equals, runs the coder consistency check on them.
 */
@Test
public void testConsistentWithEqualsArrayOfArrayOfBytes() throws Exception {
  FieldType nestedBytesType = FieldType.array(FieldType.array(FieldType.BYTES));
  Schema schema = Schema.of(Schema.Field.of("f1", nestedBytesType));
  RowCoder coder = RowCoder.of(schema);

  // First value: [[ {1, 2, 3, 4} ]]
  List<byte[]> firstInner = Collections.singletonList(new byte[] {1, 2, 3, 4});
  List<List<byte[]>> firstOuter = Collections.singletonList(firstInner);
  Row firstRow = Row.withSchema(schema).addValue(firstOuter).build();

  // Second value: same contents, freshly allocated arrays and lists.
  List<byte[]> secondInner = Collections.singletonList(new byte[] {1, 2, 3, 4});
  List<List<byte[]>> secondOuter = Collections.singletonList(secondInner);
  Row secondRow = Row.withSchema(schema).addValue(secondOuter).build();

  // The check only runs when the coder claims consistency with equals.
  Assume.assumeTrue(coder.consistentWithEquals());
  CoderProperties.coderConsistentWithEquals(coder, firstRow, secondRow);
}
示例16
/**
 * Selects {@code map{}.*} from {@code MAP_SCHEMA} and expects one map-typed output field per
 * field of the map's row values, each keyed by the original map key.
 */
@Test
public void testSelectMapOfRowSelectAll() {
  FieldAccessDescriptor selectAllMapValues =
      FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA);

  // Schema check: every selected field becomes a map from INT32 to that field's type.
  Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, selectAllMapValues);
  Schema expectedSchema =
      Schema.builder()
          .addMapField("field1", FieldType.INT32, FieldType.STRING)
          .addMapField("field2", FieldType.INT32, FieldType.INT32)
          .addMapField("field3", FieldType.INT32, FieldType.DOUBLE)
          .addMapField("field_extra", FieldType.INT32, FieldType.STRING)
          .build();
  assertEquals(expectedSchema, outputSchema);

  // Value check: key 1 maps to the corresponding field of FLAT_ROW.
  Row selected = selectRow(MAP_SCHEMA, selectAllMapValues, MAP_ROW);
  Row expectedRow =
      Row.withSchema(expectedSchema)
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0)))
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(1)))
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(2)))
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(3)))
          .build();
  assertEquals(expectedRow, selected);
}
示例17
/**
 * Builds a table from a definition with a timestamp, an attributes map and an empty payload row,
 * and checks that the provider returns a non-null table exposing the same schema.
 */
@Test
public void testCreatesTable() {
  Schema emptyPayloadSchema = Schema.builder().build();
  Schema messageSchema =
      Schema.builder()
          .addDateTimeField("event_timestamp")
          .addMapField("attributes", VARCHAR, VARCHAR)
          .addRowField("payload", emptyPayloadSchema)
          .build();
  Table tableDefinition = tableDefinition().schema(messageSchema).build();

  PubsubJsonTableProvider provider = new PubsubJsonTableProvider();
  BeamSqlTable pubsubTable = provider.buildBeamSqlTable(tableDefinition);

  assertNotNull(pubsubTable);
  assertEquals(messageSchema, pubsubTable.getSchema());
}
示例18
/**
 * Right-outer-joins ORDER_DETAILS1 with an hourly-windowed aggregation of ORDER_DETAILS and
 * expects a null buyer for the aggregated order that has no match.
 */
@Test
public void testRightOuterJoin() throws Exception {
  String sql =
      "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
          + " ORDER_DETAILS1 o2 "
          + " RIGHT OUTER JOIN "
          + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
          + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
          + " on "
          + " o1.order_id=o2.order_id";
  PCollection<Row> rows = compilePipeline(sql, pipeline);

  // buyer is nullable because the left side of the join may be missing.
  Schema expectedSchema =
      Schema.builder()
          .addField("order_id", Schema.FieldType.INT32)
          .addField("sum_site_id", Schema.FieldType.INT32)
          .addNullableField("buyer", Schema.FieldType.STRING)
          .build();

  // Rows are compared in their string form.
  PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
      .containsInAnyOrder(
          TestUtils.RowsBuilder.of(expectedSchema)
              .addRows(
                  1, 3, "james",
                  2, 5, "bond",
                  3, 3, null)
              .getStringRows());

  pipeline.run();
}
示例19
/**
 * Walks two iterables in lock-step and reports whether they have the same length and every pair
 * of elements at the same position is equivalent according to {@code fieldsEquivalent}.
 *
 * @param expected the reference sequence
 * @param actual the sequence under comparison
 * @param elementType schema type of the elements, passed through to the element comparison
 * @return {@code true} if both arguments are the same instance or are element-wise equivalent
 */
static boolean iterablesEquivalent(
    Iterable<Object> expected, Iterable<Object> actual, Schema.FieldType elementType) {
  if (expected == actual) {
    return true;
  }

  Iterator<Object> actualIter = actual.iterator();
  Iterator<Object> expectedIter = expected.iterator();
  while (expectedIter.hasNext()) {
    Object currentExpected = expectedIter.next();
    // actual ran out first: the sequences differ in length.
    if (!actualIter.hasNext()) {
      return false;
    }
    Object currentActual = actualIter.next();
    if (!fieldsEquivalent(currentExpected, currentActual, elementType)) {
      return false;
    }
  }
  // Any leftover element in actual means it is longer than expected.
  return !actualIter.hasNext();
}
示例20
/**
 * Computes the add-fields information for a composite field type by descending into its nested
 * type and then re-wrapping the nested output type in the same kind of container.
 *
 * <p>Rows delegate to the schema-based overload; arrays and iterables descend into their element
 * type; maps descend into their value type and keep their key type. The nullability of the input
 * type is carried over to the output type.
 *
 * @param inputFieldType the composite type that receives the nested fields
 * @param nestedFields the fields to add beneath {@code inputFieldType}
 * @return the nested information with its output field type replaced by the re-wrapped type
 * @throws RuntimeException if {@code inputFieldType} is not a row, array, iterable or map
 */
private static AddFieldsInformation getAddFieldsInformation(
    Schema.FieldType inputFieldType, Collection<NewField> nestedFields) {
  AddFieldsInformation nestedInfo;
  Schema.FieldType rewrappedType;
  switch (inputFieldType.getTypeName()) {
    case ROW:
      nestedInfo = getAddFieldsInformation(inputFieldType.getRowSchema(), nestedFields);
      rewrappedType = nestedInfo.getOutputFieldType();
      break;

    case ARRAY:
      nestedInfo =
          getAddFieldsInformation(inputFieldType.getCollectionElementType(), nestedFields);
      rewrappedType = Schema.FieldType.array(nestedInfo.getOutputFieldType());
      break;

    case ITERABLE:
      nestedInfo =
          getAddFieldsInformation(inputFieldType.getCollectionElementType(), nestedFields);
      rewrappedType = Schema.FieldType.iterable(nestedInfo.getOutputFieldType());
      break;

    case MAP:
      nestedInfo = getAddFieldsInformation(inputFieldType.getMapValueType(), nestedFields);
      rewrappedType =
          Schema.FieldType.map(inputFieldType.getMapKeyType(), nestedInfo.getOutputFieldType());
      break;

    default:
      throw new RuntimeException("Cannot select a subfield of a non-composite type.");
  }

  Schema.FieldType outputType = rewrappedType.withNullable(inputFieldType.getNullable());
  return nestedInfo.toBuilder().setOutputFieldType(outputType).build();
}
示例21
/**
 * Decodes a batch by reading, in this order, the schema, the feature-set reference, and the row
 * (using the delegate coder obtained for the decoded schema) from the stream.
 *
 * @param inStream the stream to read from
 * @return the decoded batch with the decoded feature-set reference attached
 */
@Override
public FeatureRowsBatch decode(InputStream inStream) throws CoderException, IOException {
  // The read order is significant: all three values come from the same stream.
  Schema schema = schemaCoder.decode(inStream);
  String reference = referenceCoder.decode(inStream);
  FeatureRowsBatch batch = FeatureRowsBatch.fromRow(getDelegateCoder(schema).decode(inStream));
  return batch.withFeatureSetReference(reference);
}
示例22
/**
 * Stores the selection parameters and creates the row selector for them.
 *
 * @param fieldAccessDescriptor describes which fields are selected
 * @param inputSchema schema of the incoming rows
 * @param outputSchema schema of the produced rows
 */
public SelectDoFn(
    FieldAccessDescriptor fieldAccessDescriptor, Schema inputSchema, Schema outputSchema) {
  // The selector is built from the constructor arguments only.
  this.rowSelector = new RowSelectorContainer(inputSchema, fieldAccessDescriptor, true);

  this.inputSchema = inputSchema;
  this.outputSchema = outputSchema;
  this.fieldAccessDescriptor = fieldAccessDescriptor;
}
示例23
/**
 * Writes a timestamp to a single-column test table and reads it back over JDBC with a UTC
 * calendar, expecting the same epoch milliseconds.
 */
@Test
public void testTimestampWithZeroTimezone() throws Exception {
  Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
  TestTableProvider tableProvider = new TestTableProvider();
  Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());

  // A table with one TIMESTAMP column
  Schema schema = Schema.builder().addDateTimeField("ts").build();
  connection
      .createStatement()
      .executeUpdate("CREATE EXTERNAL TABLE test (ts TIMESTAMP) TYPE 'test'");

  ReadableInstant july1 =
      ISODateTimeFormat.dateTimeParser().parseDateTime("2018-07-01T01:02:03Z");
  tableProvider.addRows("test", Row.withSchema(schema).addValue(july1).build());

  // The query has no format arguments, so it is passed as a plain literal.
  ResultSet selectResult = connection.createStatement().executeQuery("SELECT ts FROM test");

  // Check that a row is available, so that an empty result is reported as an assertion failure
  // instead of surfacing later from getTimestamp.
  assertThat("Expected the inserted row to be returned", selectResult.next(), equalTo(true));

  Timestamp ts = selectResult.getTimestamp(1, cal);
  assertThat(
      String.format(
          "Wrote %s to a table, but got back %s",
          ISODateTimeFormat.basicDateTime().print(july1),
          ISODateTimeFormat.basicDateTime().print(ts.getTime())),
      ts.getTime(),
      equalTo(july1.getMillis()));
}
示例24
/**
 * Reads the message-typed option attached to {@code field_one} of the translated schema and
 * checks its string, int32 and int64 members.
 */
@Test
public void testOptionsMessageOnField() {
  String optionName = "beam:option:proto:field:proto3_schema_options.field_option_message";

  Schema schema = ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.OptionMessage.class);
  Schema.Options fieldOptions = schema.getField("field_one").getOptions();
  Row optionMessage = fieldOptions.getValue(optionName);

  assertEquals("foobar in field", optionMessage.getString("single_string"));
  assertEquals(Integer.valueOf(56), optionMessage.getInt32("single_int32"));
  assertEquals(Long.valueOf(78L), optionMessage.getInt64("single_int64"));
}
示例25
/**
 * Returns the generated field getters for a class.
 *
 * <p>Subtypes of {@link SpecificRecord} are handled through {@code JavaBeanUtils}; every other
 * class is handled through {@code POJOUtils}. Both paths use the Avro type conversion factory.
 *
 * @param clazz the class to generate getters for
 * @param schema the schema the getters correspond to
 * @return the getters produced by the selected utility
 */
public static <T> List<FieldValueGetter> getGetters(Class<T> clazz, Schema schema) {
  boolean isSpecificRecord =
      TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class));
  if (!isSpecificRecord) {
    return POJOUtils.getGetters(
        clazz, schema, new AvroPojoFieldValueTypeSupplier(), new AvroTypeConversionFactory());
  }
  return JavaBeanUtils.getGetters(
      clazz,
      schema,
      new AvroSpecificRecordFieldValueTypeSupplier(),
      new AvroTypeConversionFactory());
}
示例26
/**
 * Generates a parameterized insert statement for the given table, listing every field name and
 * one {@code ?} placeholder per field.
 *
 * @param tableName name of the target table, inserted into the statement as-is
 * @param fields the fields whose names become the column list
 * @return a statement of the form {@code INSERT INTO table(a, b) VALUES(?, ?)}
 */
static String generateStatement(String tableName, List<Schema.Field> fields) {
  String fieldNames =
      fields.stream().map(Schema.Field::getName).collect(Collectors.joining(", "));
  String valuePlaceholder =
      fields.stream().map(field -> "?").collect(Collectors.joining(", "));
  return String.format("INSERT INTO %s(%s) VALUES(%s)", tableName, fieldNames, valuePlaceholder);
}
示例27
/**
 * Returns a function converting instances of {@code clazz} to rows.
 *
 * <p>For {@code GenericRecord} the Avro schema is converted to a Beam schema and used to build
 * the conversion function; for every other class the function comes from
 * {@code AvroRecordSchema} and {@code schema} is not used.
 *
 * @param clazz the class of the values to convert
 * @param schema the Avro schema; only read when {@code clazz} is {@code GenericRecord}
 * @return the conversion function
 */
@SuppressWarnings("unchecked")
public static <T> SerializableFunction<T, Row> getToRowFunction(
    Class<T> clazz, @Nullable org.apache.avro.Schema schema) {
  if (!GenericRecord.class.equals(clazz)) {
    return new AvroRecordSchema().toRowFunction(TypeDescriptor.of(clazz));
  }
  Schema beamSchema = toBeamSchema(schema);
  return (SerializableFunction<T, Row>) getGenericRecordToRowFunction(beamSchema);
}
示例28
/**
 * Aggregates three long values sharing one group key with {@code bit_or} and expects their
 * bitwise OR.
 */
@Test
public void testBitOrFunction() throws Exception {
  pipeline.enableAbandonedNodeEnforcement(false);

  Schema inputSchema =
      Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build();
  Schema resultType = Schema.builder().addInt64Field("finalAnswer").build();

  // One (f_long, f_int2) pair per line; all rows share the group key 0.
  List<Row> inputValues =
      TestUtils.RowsBuilder.of(inputSchema)
          .addRows(
              0xF001L, 0,
              0x00A1L, 0,
              44L, 0)
          .getRows();

  String sql = "SELECT bit_or(f_long) as bitor " + "FROM PCOLLECTION GROUP BY f_int2";

  // 0xF001 | 0x00A1 | 44 == 61613
  Row expectedRow = Row.withSchema(resultType).addValues(61613L).build();

  PCollection<Row> inputRows =
      pipeline.apply("longVals", Create.of(inputValues).withRowSchema(inputSchema));
  PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));

  PAssert.that(result).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish();
}
示例29
/**
 * Selects an element of an array nested inside a row column and expects a single INT64 result
 * with the value 6.
 */
@Test
public void testRowWithArray() {
  BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
  PCollection<Row> stream =
      BeamSqlRelUtils.toPCollection(
          pipeline,
          sqlEnv.parseQuery(
              "SELECT rowWithArrayTestTable.col.field3[2] FROM rowWithArrayTestTable"));

  Schema expectedSchema = Schema.builder().addInt64Field("int64").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValue(6L).build();
  PAssert.that(stream).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
示例30
/**
 * Wraps a union whose null branch comes after the string branch and checks that it is reported
 * as nullable with the string schema as its type.
 */
@Test
public void testUnwrapNullableSchemaReordered() {
  org.apache.avro.Schema stringBranch = org.apache.avro.Schema.create(Type.STRING);
  org.apache.avro.Schema nullBranch = org.apache.avro.Schema.create(Type.NULL);
  // The null branch is deliberately listed last.
  org.apache.avro.Schema avroSchema =
      org.apache.avro.Schema.createUnion(stringBranch, nullBranch);

  TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema);

  assertTrue(typeWithNullability.nullable);
  assertEquals(org.apache.avro.Schema.create(Type.STRING), typeWithNullability.type);
}