Java源码示例:org.apache.beam.sdk.schemas.Schema

示例1
/**
 * Runs a correlated {@code EXISTS} sub-query over CUSTOMER and ORDERS and asserts the single
 * customer row that is expected to come back.
 */
@Test
public void testExistsSubquery() {
  String existsQuery =
      "select * from CUSTOMER "
          + " where exists ( "
          + " select * from ORDERS "
          + " where o_custkey = c_custkey )";
  PCollection<Row> customers = compilePipeline(existsQuery, pipeline);

  // Expected output: one (c_custkey, c_acctbal, c_city) row.
  TestUtils.RowsBuilder expectedCustomers =
      TestUtils.RowsBuilder.of(
              Schema.FieldType.INT32, "c_custkey",
              Schema.FieldType.DOUBLE, "c_acctbal",
              Schema.FieldType.STRING, "c_city")
          .addRows(1, 1.0, "Seattle");
  PAssert.that(customers).containsInAnyOrder(expectedCustomers.getRows());

  pipeline.run().waitUntilFinish();
}
 
示例2
/**
 * Selects the leaf field {@code f3.f2.f1.f0} through two levels of row arrays and checks both the
 * resulting schema (an array of string arrays) and the selected row.
 */
@Test
public void testArrayRowArray() {
  // Schemas, innermost first: string -> array of rows -> row -> array of rows.
  Schema leafSchema = Schema.builder().addStringField("f0").build();
  Schema leafArraySchema =
      Schema.builder().addArrayField("f1", FieldType.row(leafSchema)).build();
  Schema wrapperSchema = Schema.builder().addRowField("f2", leafArraySchema).build();
  Schema topSchema = Schema.builder().addArrayField("f3", FieldType.row(wrapperSchema)).build();

  // Rows mirroring the schemas above; every array holds the same element twice.
  Row leafRow = Row.withSchema(leafSchema).addValue("first").build();
  Row leafArrayRow = Row.withSchema(leafArraySchema).addArray(leafRow, leafRow).build();
  Row wrapperRow = Row.withSchema(wrapperSchema).addValue(leafArrayRow).build();
  Row topRow = Row.withSchema(topSchema).addArray(wrapperRow, wrapperRow).build();

  FieldAccessDescriptor leafAccess =
      FieldAccessDescriptor.withFieldNames("f3.f2.f1.f0").resolve(topSchema);

  Schema outputSchema = SelectHelpers.getOutputSchema(topSchema, leafAccess);
  Schema expectedSchema =
      Schema.builder().addArrayField("f0", FieldType.array(FieldType.STRING)).build();
  assertEquals(expectedSchema, outputSchema);

  Row selected = selectRow(topSchema, leafAccess, topRow);
  Row expectedRow =
      Row.withSchema(outputSchema)
          .addArray(Lists.newArrayList("first", "first"), Lists.newArrayList("first", "first"))
          .build();
  assertEquals(expectedRow, selected);
}
 
示例3
/**
 * Looks up the key {@code 'key11'} in the map column of a two-element input and expects one row
 * holding 11 and one row holding null.
 */
@Test
public void testAccessMapElement() {
  PCollection<Row> twoElements = pCollectionOf2Elements();

  Schema mapElemSchema =
      Schema.builder().addNullableField("f_mapElem", Schema.FieldType.INT32).build();

  SqlTransform mapLookup = SqlTransform.query("SELECT f_intStringMap['key11'] FROM PCOLLECTION");
  PCollection<Row> lookedUp = twoElements.apply("sqlQuery", mapLookup);

  Row presentValue = Row.withSchema(mapElemSchema).addValues(11).build();
  Row missingValue = Row.withSchema(mapElemSchema).addValue(null).build();
  PAssert.that(lookedUp).containsInAnyOrder(presentValue, missingValue);

  pipeline.run();
}
 
示例4
/**
 * Builds a row of {@code outputSchema} from {@code input}, casting each value to the type of the
 * output field that has the same name.
 *
 * @param input the row to cast; may be null
 * @param inputSchema schema used to locate fields of {@code input} by name
 * @param outputSchema schema of the row to produce; drives the field order of the result
 * @return the cast row, or null when {@code input} is null
 */
public static Row castRow(Row input, Schema inputSchema, Schema outputSchema) {
  if (input == null) {
    return null;
  }

  Row.Builder casted = Row.withSchema(outputSchema);
  for (Schema.Field targetField : outputSchema.getFields()) {
    // Fields are matched by name, so the source position may differ from the target position.
    int sourceIndex = inputSchema.indexOf(targetField.getName());
    Schema.Field sourceField = inputSchema.getField(sourceIndex);

    Object sourceValue = input.getValue(sourceIndex);
    casted.addValue(castValue(sourceValue, sourceField.getType(), targetField.getType()));
  }

  return casted.build();
}
 
示例5
/**
 * Runs an ORDER BY query with {@code limit 11} and asserts the ten order rows listed below.
 */
@Test
public void testOrderBy_bigFetch() {
  String orderedInsert =
      "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
          + " order_id, site_id, price "
          + "FROM ORDER_DETAILS "
          + "ORDER BY order_id asc, site_id desc limit 11";
  PCollection<Row> orderedRows = compilePipeline(orderedInsert, pipeline);

  // One (order_id, site_id, price) triple per line.
  TestUtils.RowsBuilder expectedOrders =
      TestUtils.RowsBuilder.of(
              Schema.FieldType.INT64, "order_id",
              Schema.FieldType.INT32, "site_id",
              Schema.FieldType.DOUBLE, "price")
          .addRows(
              1L, 2, 1.0,
              1L, 1, 2.0,
              2L, 4, 3.0,
              2L, 1, 4.0,
              5L, 5, 5.0,
              6L, 6, 6.0,
              7L, 7, 7.0,
              8L, 8888, 8.0,
              8L, 999, 9.0,
              10L, 100, 10.0);
  PAssert.that(orderedRows).containsInAnyOrder(expectedOrders.getRows());

  pipeline.run().waitUntilFinish();
}
 
示例6
/**
 * Runs an ORDER BY query with {@code limit 4} and asserts the four order rows listed below.
 */
@Test
public void testOrderBy_basic() {
  String orderedInsert =
      "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
          + " order_id, site_id, price "
          + "FROM ORDER_DETAILS "
          + "ORDER BY order_id asc, site_id desc limit 4";
  PCollection<Row> orderedRows = compilePipeline(orderedInsert, pipeline);

  // One (order_id, site_id, price) triple per line.
  TestUtils.RowsBuilder expectedOrders =
      TestUtils.RowsBuilder.of(
              Schema.FieldType.INT64, "order_id",
              Schema.FieldType.INT32, "site_id",
              Schema.FieldType.DOUBLE, "price")
          .addRows(
              1L, 2, 1.0,
              1L, 1, 2.0,
              2L, 4, 3.0,
              2L, 1, 4.0);
  PAssert.that(orderedRows).containsInAnyOrder(expectedOrders.getRows());

  pipeline.run().waitUntilFinish();
}
 
示例7
/**
 * Parses a query whose WHERE clause mixes two predicates and checks the resulting plan: a calc
 * node over an IO source node whose digest lists one predicate as supported and the other as
 * unsupported, with the projected fields pushed down.
 */
@Test
public void testIOSourceRel_withSupportedAndUnsupportedPredicate() {
  String query = "SELECT name FROM TEST where id+unused1=101 and id=1";

  BeamRelNode planRoot = sqlEnv.parseQuery(query);
  PCollection<Row> names = BeamSqlRelUtils.toPCollection(pipeline, planRoot);

  // Plan shape: Calc on top of the IO source.
  assertThat(planRoot, instanceOf(BeamCalcRel.class));
  assertThat(planRoot.getInput(0), instanceOf(BeamIOSourceRel.class));
  assertEquals(
      "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
      planRoot.getInput(0).getDigest());

  // Make sure project push-down was done
  List<String> sourceFieldNames = planRoot.getInput(0).getRowType().getFieldNames();
  assertThat(sourceFieldNames, containsInAnyOrder("name", "id", "unused1"));

  Schema nameOnlySchema = Schema.builder().addStringField("name").build();
  assertEquals(nameOnlySchema, names.getSchema());
  PAssert.that(names).containsInAnyOrder(row(names.getSchema(), "one"));

  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
 
示例8
/**
 * Picks the summing {@link CombineFn} that matches the type name of {@code fieldType}.
 *
 * @param fieldType the schema type of the values to be summed
 * @return the combine function used to sum values of that type
 * @throws UnsupportedOperationException if the type is not one of the numeric types handled below
 */
static CombineFn createSum(Schema.FieldType fieldType) {
  switch (fieldType.getTypeName()) {
      // Integral types, narrowest first.
    case BYTE:
      return new ByteSum();
    case INT16:
      return new ShortSum();
    case INT32:
      return Sum.ofIntegers();
    case INT64:
      return Sum.ofLongs();
      // Floating-point and arbitrary-precision types.
    case FLOAT:
      return new FloatSum();
    case DOUBLE:
      return Sum.ofDoubles();
    case DECIMAL:
      return new BigDecimalSum();
    default:
      String message = String.format("[%s] is not supported in SUM", fieldType);
      throw new UnsupportedOperationException(message);
  }
}
 
示例9
/**
 * A projection that only renames its single input column must yield the renamed schema and the
 * unchanged value.
 *
 * @see <a href="https://issues.apache.org/jira/browse/BEAM-6810">BEAM-6810</a>
 */
@Test
public void testTrivialProjection() {
  String renameQuery = "SELECT c_int64 as abc FROM PCOLLECTION";
  Schema sourceSchema = Schema.of(Schema.Field.of("c_int64", Schema.FieldType.INT64));
  Schema renamedSchema = Schema.of(Schema.Field.of("abc", Schema.FieldType.INT64));

  Row sourceRow = Row.withSchema(sourceSchema).addValue(42L).build();
  PCollection<Row> source = pipeline.apply(Create.of(sourceRow).withRowSchema(sourceSchema));

  PCollection<Row> projected = source.apply(SqlTransform.query(renameQuery));

  Assert.assertEquals(renamedSchema, projected.getSchema());

  Row expectedRow = Row.withSchema(renamedSchema).addValue(42L).build();
  PAssert.that(projected).containsInAnyOrder(expectedRow);

  pipeline.run();
}
 
示例10
/**
 * Converts the given rows to BigQuery table rows and sends them in one insert-all request to the
 * table referenced by this object, in the project and dataset taken from the pipeline options.
 *
 * @param rowSchema not read by this method
 * @param rows the rows to insert
 * @return the response returned by executing the insert-all request
 * @throws IOException declared for the client creation and request execution calls below
 */
@Experimental(Kind.SCHEMAS)
public TableDataInsertAllResponse insertRows(Schema rowSchema, Row... rows) throws IOException {
  // Wrap every Beam row in a request entry before the client is created.
  ImmutableList.Builder<Rows> requestRows = ImmutableList.builder();
  for (Row row : rows) {
    requestRows.add(new Rows().setJson(BigQueryUtils.toTableRow(row)));
  }
  List<Rows> bqRows = requestRows.build();

  Bigquery client = newBigQueryClient(pipelineOptions);
  return client
      .tabledata()
      .insertAll(
          pipelineOptions.getProject(),
          pipelineOptions.getTargetDataset(),
          table.getTableReference().getTableId(),
          new TableDataInsertAllRequest().setRows(bqRows))
      .execute();
}
 
示例11
/**
 * Resolves the value of a row at {@code rowPosition}, taking field overrides into account.
 *
 * <p>If an override is registered for this exact position, its value is returned. Otherwise, if
 * there are overrides somewhere below this position, a new row is assembled by running every field
 * through {@code matcher}. If neither applies, {@code value} is returned as is.
 *
 * @param rowPosition position of the row being processed
 * @param schema schema of the row at this position
 * @param value the current row value; may be null
 * @param matcher dispatcher used to process each field of the row
 * @return the override value, the rebuilt row, or the original {@code value}
 */
@Override
public Row processRow(
    RowPosition rowPosition, Schema schema, Row value, RowFieldMatcher matcher) {
  FieldOverride directOverride = override(rowPosition);
  if (directOverride != null) {
    return (Row) directOverride.getOverrideValue();
  }
  if (!fieldOverrides.hasOverrideBelow(rowPosition.descriptor)) {
    return value;
  }

  // Something beneath this row is overridden: rebuild the row one field at a time.
  int fieldCount = schema.getFieldCount();
  List<Object> rebuiltValues = Lists.newArrayListWithCapacity(fieldCount);
  for (int fieldIndex = 0; fieldIndex < schema.getFieldCount(); ++fieldIndex) {
    FieldAccessDescriptor childDescriptor =
        FieldAccessDescriptor.withFieldIds(rowPosition.descriptor, fieldIndex)
            .resolve(topSchema);
    Object currentValue = null;
    if (value != null) {
      currentValue = value.getValue(fieldIndex);
    }
    Object processed =
        matcher.match(
            this,
            schema.getField(fieldIndex).getType(),
            new RowPosition(childDescriptor),
            currentValue);
    rebuiltValues.add(processed);
  }
  return new RowWithStorage(schema, rebuiltValues);
}
 
示例12
/**
 * Applies {@code IS_INF} to two float and two double columns and expects {@code true} for all
 * four.
 */
@Test
public void testIsInf() throws Exception {
  Schema fourBooleans =
      Schema.builder()
          .addBooleanField("field_1")
          .addBooleanField("field_2")
          .addBooleanField("field_3")
          .addBooleanField("field_4")
          .build();
  Row allInfinite = Row.withSchema(fourBooleans).addValues(true, true, true, true).build();

  String isInfQuery =
      "SELECT IS_INF(f_float_1), IS_INF(f_double_1), IS_INF(f_float_2), IS_INF(f_double_2) FROM PCOLLECTION";
  PCollection<Row> flags = boundedInputFloatDouble.apply("testUdf", SqlTransform.query(isInfQuery));

  PAssert.that(flags).containsInAnyOrder(allInfinite);
  pipeline.run().waitUntilFinish();
}
 
示例13
/**
 * Compares the SQL {@code MD5} function against {@code DigestUtils.md5} for three UTF-8 encoded
 * inputs: an ASCII word, a single space, and a mixed Latin/Cyrillic string.
 */
@Test
public void testMd5() throws Exception {
  Schema digestSchema = Schema.builder().addByteArrayField("field").build();

  byte[] asciiDigest = DigestUtils.md5("foobar".getBytes(UTF_8));
  Row asciiRow = Row.withSchema(digestSchema).addValues(asciiDigest).build();

  byte[] blankDigest = DigestUtils.md5(" ".getBytes(UTF_8));
  Row blankRow = Row.withSchema(digestSchema).addValues(blankDigest).build();

  byte[] mixedDigest = DigestUtils.md5("abcABCжщфЖЩФ".getBytes(UTF_8));
  Row mixedRow = Row.withSchema(digestSchema).addValues(mixedDigest).build();

  String md5Query = "SELECT MD5(f_bytes) FROM PCOLLECTION WHERE f_func = 'HashingFn'";
  PCollection<Row> digests = boundedInputBytes.apply("testUdf", SqlTransform.query(md5Query));

  PAssert.that(digests).containsInAnyOrder(asciiRow, blankRow, mixedRow);
  pipeline.run().waitUntilFinish();
}
 
示例14
/**
 * Compares two field values for equality according to their schema type.
 *
 * <p>Logical types are compared using their base type, byte arrays by content, and arrays,
 * iterables and maps through the matching collection helpers. Every other type falls back to
 * {@link Objects#equals(Object, Object)}.
 *
 * @param a first value; may be null
 * @param b second value; may be null
 * @param fieldType schema type describing both values
 * @return true if both values are null, or both are non-null and equal under the rules above
 */
public static boolean deepEquals(Object a, Object b, Schema.FieldType fieldType) {
  // With at least one null, the values are equal only if both are null.
  if (a == null || b == null) {
    return a == b;
  }

  Schema.TypeName typeName = fieldType.getTypeName();
  if (typeName == TypeName.LOGICAL_TYPE) {
    return deepEquals(a, b, fieldType.getLogicalType().getBaseType());
  }
  if (typeName == Schema.TypeName.BYTES) {
    return Arrays.equals((byte[]) a, (byte[]) b);
  }
  if (typeName == TypeName.ARRAY) {
    return deepEqualsForCollection(
        (Collection<Object>) a, (Collection<Object>) b, fieldType.getCollectionElementType());
  }
  if (typeName == TypeName.ITERABLE) {
    return deepEqualsForIterable(
        (Iterable<Object>) a, (Iterable<Object>) b, fieldType.getCollectionElementType());
  }
  if (typeName == Schema.TypeName.MAP) {
    return deepEqualsForMap(
        (Map<Object, Object>) a, (Map<Object, Object>) b, fieldType.getMapValueType());
  }
  return Objects.equals(a, b);
}
 
示例15
/**
 * Builds two rows holding distinct but content-equal nested byte arrays and, provided the coder
 * reports itself consistent with equals, runs the coder property check on the pair.
 */
@Test
public void testConsistentWithEqualsArrayOfArrayOfBytes() throws Exception {
  FieldType nestedBytesType = FieldType.array(FieldType.array(FieldType.BYTES));
  Schema nestedBytesSchema = Schema.of(Schema.Field.of("f1", nestedBytesType));
  RowCoder rowCoder = RowCoder.of(nestedBytesSchema);

  // Two separately allocated byte arrays with the same content.
  List<byte[]> firstInner = Collections.singletonList(new byte[] {1, 2, 3, 4});
  List<List<byte[]>> firstOuter = Collections.singletonList(firstInner);
  Row firstRow = Row.withSchema(nestedBytesSchema).addValue(firstOuter).build();

  List<byte[]> secondInner = Collections.singletonList(new byte[] {1, 2, 3, 4});
  List<List<byte[]>> secondOuter = Collections.singletonList(secondInner);
  Row secondRow = Row.withSchema(nestedBytesSchema).addValue(secondOuter).build();

  // Skip rather than fail when the coder does not claim consistency with equals.
  Assume.assumeTrue(rowCoder.consistentWithEquals());

  CoderProperties.coderConsistentWithEquals(rowCoder, firstRow, secondRow);
}
 
示例16
/**
 * Selects every field of the rows stored as map values ({@code map{}.*}) and expects one map per
 * field, each keyed by the original map key.
 */
@Test
public void testSelectMapOfRowSelectAll() {
  FieldAccessDescriptor allMapValueFields =
      FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA);

  Schema actualSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, allMapValueFields);
  Schema expectedSchema =
      Schema.builder()
          .addMapField("field1", FieldType.INT32, FieldType.STRING)
          .addMapField("field2", FieldType.INT32, FieldType.INT32)
          .addMapField("field3", FieldType.INT32, FieldType.DOUBLE)
          .addMapField("field_extra", FieldType.INT32, FieldType.STRING)
          .build();
  assertEquals(expectedSchema, actualSchema);

  Row actualRow = selectRow(MAP_SCHEMA, allMapValueFields, MAP_ROW);
  // Each output field maps key 1 to the corresponding value of FLAT_ROW.
  Row expectedRow =
      Row.withSchema(expectedSchema)
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0)))
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(1)))
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(2)))
          .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(3)))
          .build();
  assertEquals(expectedRow, actualRow);
}
 
示例17
/**
 * Builds a table from a definition with a timestamp, an attributes map and an empty payload row,
 * and checks that the provider returns a table exposing that same schema.
 */
@Test
public void testCreatesTable() {
  PubsubJsonTableProvider tableProvider = new PubsubJsonTableProvider();

  Schema emptyPayload = Schema.builder().build();
  Schema pubsubSchema =
      Schema.builder()
          .addDateTimeField("event_timestamp")
          .addMapField("attributes", VARCHAR, VARCHAR)
          .addRowField("payload", emptyPayload)
          .build();

  Table definition = tableDefinition().schema(pubsubSchema).build();
  BeamSqlTable builtTable = tableProvider.buildBeamSqlTable(definition);

  assertNotNull(builtTable);
  assertEquals(pubsubSchema, builtTable.getSchema());
}
 
示例18
/**
 * Right-outer-joins ORDER_DETAILS1 against an hourly windowed aggregate of ORDER_DETAILS and
 * compares the stringified result rows, including one row with a null buyer.
 */
@Test
public void testRightOuterJoin() throws Exception {
  String joinQuery =
      "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
          + " ORDER_DETAILS1 o2 "
          + " RIGHT OUTER JOIN "
          + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
          + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
          + " on "
          + " o1.order_id=o2.order_id";
  PCollection<Row> joined = compilePipeline(joinQuery, pipeline);

  // The buyer column is nullable because unmatched right-side rows carry no buyer.
  Schema joinedSchema =
      Schema.builder()
          .addField("order_id", Schema.FieldType.INT32)
          .addField("sum_site_id", Schema.FieldType.INT32)
          .addNullableField("buyer", Schema.FieldType.STRING)
          .build();

  PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
      .containsInAnyOrder(
          TestUtils.RowsBuilder.of(joinedSchema)
              .addRows(
                  1, 3, "james",
                  2, 5, "bond",
                  3, 3, null)
              .getStringRows());
  pipeline.run();
}
 
示例19
/**
 * Checks that two iterables yield the same number of elements and that elements at the same
 * position are equivalent for the given element type.
 *
 * @param expected the reference sequence
 * @param actual the sequence under comparison
 * @param elementType schema type used to compare individual elements
 * @return true if both arguments are the same object, or if they are pairwise equivalent and of
 *     equal length
 */
static boolean iterablesEquivalent(
    Iterable<Object> expected, Iterable<Object> actual, Schema.FieldType elementType) {
  if (expected == actual) {
    return true;
  }

  Iterator<Object> actualCursor = actual.iterator();
  Iterator<Object> expectedCursor = expected.iterator();
  while (expectedCursor.hasNext()) {
    Object expectedElement = expectedCursor.next();
    // A shorter actual sequence or a mismatching element both mean "not equivalent".
    boolean matches =
        actualCursor.hasNext()
            && fieldsEquivalent(expectedElement, actualCursor.next(), elementType);
    if (!matches) {
      return false;
    }
  }
  // Any element left over here means actual is longer than expected.
  return !actualCursor.hasNext();
}
 
示例20
/**
 * Computes the add-fields information for a composite field type by recursing into the type it
 * contains and re-wrapping the resulting output type in the same kind of container.
 *
 * <p>Rows recurse into their row schema, arrays and iterables into their element type, and maps
 * into their value type (the key type is kept). The output type inherits the nullability of
 * {@code inputFieldType}.
 *
 * @param inputFieldType the composite type the new fields are nested under
 * @param nestedFields the fields to add below this type
 * @return the nested information with its output field type replaced by the re-wrapped type
 * @throws RuntimeException if {@code inputFieldType} is not a row, array, iterable or map
 */
private static AddFieldsInformation getAddFieldsInformation(
    Schema.FieldType inputFieldType, Collection<NewField> nestedFields) {
  AddFieldsInformation nestedInformation;
  Schema.FieldType rewrappedType;
  switch (inputFieldType.getTypeName()) {
    case ROW:
      nestedInformation = getAddFieldsInformation(inputFieldType.getRowSchema(), nestedFields);
      rewrappedType = nestedInformation.getOutputFieldType();
      break;

    case ARRAY:
      nestedInformation =
          getAddFieldsInformation(inputFieldType.getCollectionElementType(), nestedFields);
      rewrappedType = Schema.FieldType.array(nestedInformation.getOutputFieldType());
      break;

    case ITERABLE:
      nestedInformation =
          getAddFieldsInformation(inputFieldType.getCollectionElementType(), nestedFields);
      rewrappedType = Schema.FieldType.iterable(nestedInformation.getOutputFieldType());
      break;

    case MAP:
      nestedInformation = getAddFieldsInformation(inputFieldType.getMapValueType(), nestedFields);
      rewrappedType =
          Schema.FieldType.map(
              inputFieldType.getMapKeyType(), nestedInformation.getOutputFieldType());
      break;

    default:
      throw new RuntimeException("Cannot select a subfield of a non-composite type.");
  }

  // Carry the nullability of the input type over to the rebuilt type.
  Schema.FieldType outputType = rewrappedType.withNullable(inputFieldType.getNullable());
  return nestedInformation.toBuilder().setOutputFieldType(outputType).build();
}
 
示例21
/**
 * Reads a batch from the stream in three steps: the schema, the feature set reference, and then
 * the row payload decoded with the delegate coder obtained for that schema.
 *
 * @param inStream stream positioned at the start of an encoded batch
 * @return the decoded batch carrying the decoded feature set reference
 */
@Override
public FeatureRowsBatch decode(InputStream inStream) throws CoderException, IOException {
  // The read order below must match the order in which the parts appear in the stream.
  Schema rowSchema = schemaCoder.decode(inStream);
  String featureSetReference = referenceCoder.decode(inStream);

  FeatureRowsBatch batch =
      FeatureRowsBatch.fromRow(getDelegateCoder(rowSchema).decode(inStream));
  return batch.withFeatureSetReference(featureSetReference);
}
 
示例22
/**
 * Stores the selection parameters and creates the row selector for them.
 *
 * @param fieldAccessDescriptor the fields to select
 * @param inputSchema schema of the incoming rows
 * @param outputSchema schema of the rows to produce
 */
public SelectDoFn(
    FieldAccessDescriptor fieldAccessDescriptor, Schema inputSchema, Schema outputSchema) {
  this.inputSchema = inputSchema;
  this.outputSchema = outputSchema;
  this.fieldAccessDescriptor = fieldAccessDescriptor;
  // The selector is built from the constructor arguments, with its third argument set to true.
  this.rowSelector = new RowSelectorContainer(inputSchema, fieldAccessDescriptor, true);
}
 
示例23
/**
 * Writes one timestamp into a test table and reads it back through JDBC with a UTC calendar,
 * expecting the same epoch-millisecond value.
 */
@Test
public void testTimestampWithZeroTimezone() throws Exception {
  Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
  TestTableProvider tableProvider = new TestTableProvider();
  Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());

  // A table with one TIMESTAMP column
  Schema schema = Schema.builder().addDateTimeField("ts").build();
  connection
      .createStatement()
      .executeUpdate("CREATE EXTERNAL TABLE test (ts TIMESTAMP) TYPE 'test'");

  ReadableInstant july1 =
      ISODateTimeFormat.dateTimeParser().parseDateTime("2018-07-01T01:02:03Z");
  tableProvider.addRows("test", Row.withSchema(schema).addValue(july1).build());

  // The query has no format arguments, so it is passed as a plain literal.
  ResultSet selectResult = connection.createStatement().executeQuery("SELECT ts FROM test");
  // Fail with a clear message if the table is empty, instead of letting getTimestamp throw.
  assertThat("Expected the query to return a row", selectResult.next(), equalTo(true));
  Timestamp ts = selectResult.getTimestamp(1, cal);

  assertThat(
      String.format(
          "Wrote %s to a table, but got back %s",
          ISODateTimeFormat.basicDateTime().print(july1),
          ISODateTimeFormat.basicDateTime().print(ts.getTime())),
      ts.getTime(),
      equalTo(july1.getMillis()));
}
 
示例24
/**
 * Reads the message-typed option attached to {@code field_one} of the translated proto schema and
 * checks its string, int32 and int64 members.
 */
@Test
public void testOptionsMessageOnField() {
  Schema translated = ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.OptionMessage.class);
  Schema.Options fieldOptions = translated.getField("field_one").getOptions();

  Row messageOption =
      fieldOptions.getValue(
          "beam:option:proto:field:proto3_schema_options.field_option_message");

  assertEquals("foobar in field", messageOption.getString("single_string"));
  assertEquals(Integer.valueOf(56), messageOption.getInt32("single_int32"));
  assertEquals(Long.valueOf(78L), messageOption.getInt64("single_int64"));
}
 
示例25
/**
 * Returns the field value getters for {@code clazz}.
 *
 * <p>Subtypes of {@link SpecificRecord} are handled through {@code JavaBeanUtils} with the
 * specific-record type supplier; every other class goes through {@code POJOUtils} with the POJO
 * type supplier. Both paths use an {@code AvroTypeConversionFactory}.
 *
 * @param clazz the class to create getters for
 * @param schema the schema the getters are created against
 * @return the getters for the given class and schema
 */
public static <T> List<FieldValueGetter> getGetters(Class<T> clazz, Schema schema) {
  boolean isSpecificRecord =
      TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class));
  if (!isSpecificRecord) {
    return POJOUtils.getGetters(
        clazz, schema, new AvroPojoFieldValueTypeSupplier(), new AvroTypeConversionFactory());
  }
  return JavaBeanUtils.getGetters(
      clazz,
      schema,
      new AvroSpecificRecordFieldValueTypeSupplier(),
      new AvroTypeConversionFactory());
}
 
示例26
/**
 * Builds a parameterized {@code INSERT} statement for the given table, listing every field name
 * and one {@code ?} placeholder per field.
 *
 * @param tableName name inserted verbatim into the statement
 * @param fields the fields whose names become the column list
 * @return the statement text, e.g. {@code INSERT INTO t(a, b) VALUES(?, ?)}
 */
static String generateStatement(String tableName, List<Schema.Field> fields) {
  String columnList =
      fields.stream().map(field -> field.getName()).collect(Collectors.joining(", "));

  // One positional placeholder for each column.
  String placeholderList = fields.stream().map(field -> "?").collect(Collectors.joining(", "));

  return String.format("INSERT INTO %s(%s) VALUES(%s)", tableName, columnList, placeholderList);
}
 
示例27
/**
 * Returns a function converting instances of {@code clazz} to rows.
 *
 * <p>For {@link GenericRecord} the Avro schema is converted to a Beam schema and the generic
 * record conversion is used; for any other class the function comes from
 * {@code AvroRecordSchema}, and {@code schema} is not read.
 *
 * @param clazz the class of the objects to convert
 * @param schema the Avro schema; only used when {@code clazz} is {@link GenericRecord}
 * @return the conversion function
 */
@SuppressWarnings("unchecked")
public static <T> SerializableFunction<T, Row> getToRowFunction(
    Class<T> clazz, @Nullable org.apache.avro.Schema schema) {
  if (!GenericRecord.class.equals(clazz)) {
    return new AvroRecordSchema().toRowFunction(TypeDescriptor.of(clazz));
  }
  Schema convertedSchema = toBeamSchema(schema);
  return (SerializableFunction<T, Row>) getGenericRecordToRowFunction(convertedSchema);
}
 
示例28
/**
 * Aggregates three long values of one group with {@code bit_or} and expects 61613, which is
 * {@code 0xF001 | 0x00A1 | 44}.
 */
@Test
public void testBitOrFunction() throws Exception {
  pipeline.enableAbandonedNodeEnforcement(false);

  Schema inputSchema = Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build();
  Schema answerSchema = Schema.builder().addInt64Field("finalAnswer").build();

  // All rows share f_int2 = 0, so they end up in a single group.
  List<Row> inputValues =
      TestUtils.RowsBuilder.of(inputSchema)
          .addRows(
              0xF001L, 0,
              0x00A1L, 0,
              44L, 0)
          .getRows();

  String bitOrQuery = "SELECT bit_or(f_long) as bitor " + "FROM PCOLLECTION GROUP BY f_int2";

  Row expectedAnswer = Row.withSchema(answerSchema).addValues(61613L).build();

  PCollection<Row> longValues =
      pipeline.apply("longVals", Create.of(inputValues).withRowSchema(inputSchema));
  PCollection<Row> aggregated = longValues.apply("sql", SqlTransform.query(bitOrQuery));

  PAssert.that(aggregated).containsInAnyOrder(expectedAnswer);

  pipeline.run().waitUntilFinish();
}
 
示例29
/**
 * Queries element 2 of an array field nested inside a row column and expects the value 6.
 */
@Test
public void testRowWithArray() {
  BeamSqlEnv inMemoryEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
  BeamRelNode arrayElementPlan =
      inMemoryEnv.parseQuery(
          "SELECT rowWithArrayTestTable.col.field3[2] FROM rowWithArrayTestTable");
  PCollection<Row> selected = BeamSqlRelUtils.toPCollection(pipeline, arrayElementPlan);

  Schema singleLong = Schema.builder().addInt64Field("int64").build();
  PAssert.that(selected).containsInAnyOrder(Row.withSchema(singleLong).addValue(6L).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
 
示例30
/**
 * A union listing STRING before NULL must still be recognised as a nullable STRING.
 */
@Test
public void testUnwrapNullableSchemaReordered() {
  org.apache.avro.Schema stringSchema = org.apache.avro.Schema.create(Type.STRING);
  org.apache.avro.Schema nullSchema = org.apache.avro.Schema.create(Type.NULL);
  // The non-null branch comes first, the reverse of the more common [null, type] order.
  org.apache.avro.Schema stringThenNull =
      org.apache.avro.Schema.createUnion(stringSchema, nullSchema);

  TypeWithNullability unwrapped = new TypeWithNullability(stringThenNull);

  assertTrue(unwrapped.nullable);
  assertEquals(org.apache.avro.Schema.create(Type.STRING), unwrapped.type);
}