Java源码示例:org.apache.flink.table.expressions.ItemAt
示例1
/**
 * Builds an equality predicate over a nested array access.
 *
 * <p>The left operand reads the field {@code "int1"} of the element selected by the INT
 * literal {@code 1} from the {@code "list"} field; the right operand is the INT literal
 * {@code 1}.
 *
 * <p>NOTE(review): the method name suggests callers treat this as a predicate that the
 * source under test cannot handle; confirm against the call sites.
 *
 * @return the composed {@code EqualTo} expression
 */
private Expression unsupportedPred() {
    // "list" is typed as an object array of rows (int1 INT, string1 STRING).
    PlannerResolvedFieldReference listField = new PlannerResolvedFieldReference(
        "list",
        ObjectArrayTypeInfo.getInfoFor(
            Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)));

    // Select one element of the array, then one field of that element.
    ItemAt listItem = new ItemAt(listField, new Literal(1, Types.INT));
    GetCompositeField int1OfItem = new GetCompositeField(listItem, "int1");

    return new EqualTo(int1OfItem, new Literal(1, Types.INT));
}
示例2
/**
 * Offers four predicates to an {@code OrcTableSource} (two simple comparisons, one on a
 * nested array element, one with a literal that is an instance of this test class) and
 * verifies that the returned copy keeps schema and return type, that exactly two
 * predicates are added to the input format, and that only the copy reports pushdown.
 */
@Test
@SuppressWarnings("unchecked")
public void testApplyPredicate() throws Exception {
    OrcTableSource source = OrcTableSource.builder()
        .path(getPath(TEST_FILE_NESTED))
        .forOrcSchema(TEST_SCHEMA_NESTED)
        .build();

    // supported: int1 > 100
    Expression int1GreaterThan100 = new GreaterThan(
        new ResolvedFieldReference("int1", Types.INT),
        new Literal(100, Types.INT));
    // supported: string1 = "hello"
    Expression string1EqualsHello = new EqualTo(
        new ResolvedFieldReference("string1", Types.STRING),
        new Literal("hello", Types.STRING));

    // unsupported: compares field "int1" of an element of the "list" array with 1
    ResolvedFieldReference listField = new ResolvedFieldReference(
        "list",
        ObjectArrayTypeInfo.getInfoFor(
            Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)));
    ItemAt listItem = new ItemAt(listField, new Literal(1, Types.INT));
    Expression unsupportedPred = new EqualTo(
        new GetCompositeField(listItem, "int1"),
        new Literal(1, Types.INT));

    // invalid: the literal is a non-serializable object (an instance of this test class)
    Expression invalidPred = new EqualTo(
        new ResolvedFieldReference("long1", Types.LONG),
        new Literal(new OrcTableSourceTest(), Types.LONG));

    List<Expression> predicates = new ArrayList<>();
    predicates.add(int1GreaterThan100);
    predicates.add(string1EqualsHello);
    predicates.add(unsupportedPred);
    predicates.add(invalidPred);

    // hand all predicates to the table source
    OrcTableSource filtered = (OrcTableSource) source.applyPredicate(predicates);

    // a new instance must come back, with unchanged schema and return type
    assertTrue(source != filtered);
    assertEquals(source.getTableSchema(), filtered.getTableSchema());
    assertEquals(
        Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
        filtered.getReturnType());

    // intercept the input format so the predicates handed to it can be captured
    OrcTableSource sourceSpy = spy(filtered);
    OrcRowInputFormat inputFormatMock = mock(OrcRowInputFormat.class);
    doReturn(inputFormatMock).when(sourceSpy).buildOrcInputFormat();
    ExecutionEnvironment env = mock(ExecutionEnvironment.class);
    when(env.createInput(any(InputFormat.class))).thenReturn(mock(DataSource.class));
    sourceSpy.getDataSet(env);

    ArgumentCaptor<OrcRowInputFormat.Predicate> captor =
        ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class);
    verify(inputFormatMock, times(2)).addPredicate(captor.capture());
    List<OrcRowInputFormat.Predicate> capturedPredicates = captor.getAllValues();
    List<String> captured = capturedPredicates.stream()
        .map(Object::toString)
        .collect(Collectors.toList());

    // the greater-than is expected in the form NOT(int1 <= 100)
    String expectedInt1 = new OrcRowInputFormat.Not(
        new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100)).toString();
    assertTrue(captured.contains(expectedInt1));
    String expectedString1 =
        new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello").toString();
    assertTrue(captured.contains(expectedString1));

    // only the copy reports a pushed-down filter
    assertTrue(sourceSpy.isFilterPushedDown());
    assertFalse(source.isFilterPushedDown());
}
示例3
/**
 * Applies a mix of predicates to an {@code OrcTableSource} and checks the outcome: the
 * result is a distinct instance with identical table schema and return type, the ORC
 * input format receives exactly two predicates (for {@code int1} and {@code string1}),
 * and the pushdown flag is set on the result but not on the original source.
 */
@Test
@SuppressWarnings("unchecked")
public void testApplyPredicate() throws Exception {
    OrcTableSource original = OrcTableSource.builder()
        .path(getPath(TEST_FILE_NESTED))
        .forOrcSchema(TEST_SCHEMA_NESTED)
        .build();

    // --- predicates expected to be handed to the input format ---
    Expression greaterThanPred = new GreaterThan(
        new PlannerResolvedFieldReference("int1", Types.INT),
        new Literal(100, Types.INT));
    Expression equalsPred = new EqualTo(
        new PlannerResolvedFieldReference("string1", Types.STRING),
        new Literal("hello", Types.STRING));

    // --- unsupported predicate: field access on an array element ---
    PlannerResolvedFieldReference listRef = new PlannerResolvedFieldReference(
        "list",
        ObjectArrayTypeInfo.getInfoFor(
            Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)));
    ItemAt elementOfList = new ItemAt(listRef, new Literal(1, Types.INT));
    GetCompositeField int1OfElement = new GetCompositeField(elementOfList, "int1");
    Expression unsupportedPred = new EqualTo(int1OfElement, new Literal(1, Types.INT));

    // --- invalid predicate: literal value is an object of this test class ---
    Expression invalidPred = new EqualTo(
        new PlannerResolvedFieldReference("long1", Types.LONG),
        new Literal(new OrcTableSourceTest(), Types.LONG));

    List<Expression> offered = new ArrayList<>();
    offered.add(greaterThanPred);
    offered.add(equalsPred);
    offered.add(unsupportedPred);
    offered.add(invalidPred);

    OrcTableSource withPredicates = (OrcTableSource) original.applyPredicate(offered);

    // the source must be copied, not modified in place
    assertTrue(original != withPredicates);
    // schema and return type stay as they were
    assertEquals(original.getTableSchema(), withPredicates.getTableSchema());
    assertEquals(
        Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
        withPredicates.getReturnType());

    // swap in a mocked input format and environment to observe addPredicate calls
    OrcTableSource spiedSource = spy(withPredicates);
    OrcRowInputFormat formatMock = mock(OrcRowInputFormat.class);
    doReturn(formatMock).when(spiedSource).buildOrcInputFormat();
    ExecutionEnvironment envMock = mock(ExecutionEnvironment.class);
    when(envMock.createInput(any(InputFormat.class))).thenReturn(mock(DataSource.class));
    spiedSource.getDataSet(envMock);

    ArgumentCaptor<OrcRowInputFormat.Predicate> predicateCaptor =
        ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class);
    verify(formatMock, times(2)).addPredicate(predicateCaptor.capture());
    List<String> pushed = predicateCaptor.getAllValues()
        .stream()
        .map(Object::toString)
        .collect(Collectors.toList());

    // compare by string form: NOT(int1 <= 100) and string1 = "hello"
    String notLessThanEquals = new OrcRowInputFormat.Not(
        new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100)).toString();
    assertTrue(pushed.contains(notLessThanEquals));
    String equalsHello =
        new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello").toString();
    assertTrue(pushed.contains(equalsHello));

    // pushdown flag: set on the new source, untouched on the original
    assertTrue(spiedSource.isFilterPushedDown());
    assertFalse(original.isFilterPushedDown());
}
示例4
/**
 * Applies four predicates to a nested {@code ParquetTableSource} and verifies that a
 * distinct source with the same schema and return type is returned, that the two
 * supported predicates are removed from the offered list, and that the resulting
 * {@code ParquetRowInputFormat} carries the conjunction of those two predicates.
 */
@Test
public void testFieldsFilter() throws Exception {
    ParquetTableSource source = createNestedTestParquetTableSource(testPath);

    // supported: foo > 100
    Expression fooGreaterThan100 = new GreaterThan(
        new PlannerResolvedFieldReference("foo", Types.LONG),
        new Literal(100L, Types.LONG));
    // supported: 100 = bar.spam (literal on the left-hand side)
    Expression spamEquals100 = new EqualTo(
        new Literal(100L, Types.LONG),
        new PlannerResolvedFieldReference("bar.spam", Types.LONG));

    // unsupported: compares field "type" of an element of "nestedArray" with "test"
    PlannerResolvedFieldReference nestedArrayField = new PlannerResolvedFieldReference(
        "nestedArray",
        ObjectArrayTypeInfo.getInfoFor(
            Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING)));
    ItemAt arrayItem = new ItemAt(nestedArrayField, new Literal(1, Types.INT));
    Expression unsupported = new EqualTo(
        new GetCompositeField(arrayItem, "type"),
        new Literal("test", Types.STRING));

    // invalid: the literal is a non-serializable object (an instance of this test class)
    Expression invalidPred = new EqualTo(
        new PlannerResolvedFieldReference("nonField", Types.LONG),
        new Literal(new ParquetTableSourceTest(), Types.LONG));

    List<Expression> offered = new ArrayList<>();
    offered.add(fooGreaterThan100);
    offered.add(spamEquals100);
    offered.add(unsupported);
    offered.add(invalidPred);

    // apply the predicates on the table source
    ParquetTableSource filtered = (ParquetTableSource) source.applyPredicate(offered);

    // a copy comes back; schema and return type are unchanged
    assertNotSame(source, filtered);
    assertEquals(source.getTableSchema(), filtered.getTableSchema());
    assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
    // the source description must differ once a filter is applied
    assertNotEquals(source.explainSource(), filtered.explainSource());
    // only the copy reports a pushed-down filter
    assertTrue(filtered.isFilterPushedDown());
    assertFalse(source.isFilterPushedDown());

    // the supported predicates were removed from the offered list; the other two remain
    assertEquals(2, offered.size());
    assertTrue(offered.contains(unsupported));
    assertTrue(offered.contains(invalidPred));

    // the input format behind the data set must carry the filter
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    DataSet<Row> data = filtered.getDataSet(env);
    InputFormat<Row, ?> inputFormat = ((DataSource<Row>) data).getInputFormat();
    assertTrue(inputFormat instanceof ParquetRowInputFormat);
    ParquetRowInputFormat parquetFormat = (ParquetRowInputFormat) inputFormat;

    // expected: (foo > 100) AND (bar.spam = 100)
    FilterPredicate expected = FilterApi.and(
        FilterApi.gt(FilterApi.longColumn("foo"), 100L),
        FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L));
    assertEquals(expected, parquetFormat.getPredicate());
}
示例5
/**
 * Checks predicate handling of {@code ParquetTableSource#applyPredicate}: supported
 * predicates on {@code foo} and {@code bar.spam} are taken out of the offered list and
 * end up as a single AND-ed {@code FilterPredicate} on the input format, while the
 * array-element predicate and the predicate with an invalid literal stay in the list.
 */
@Test
public void testFieldsFilter() throws Exception {
    ParquetTableSource unfiltered = createNestedTestParquetTableSource(testPath);

    // --- supported predicates ---
    Expression greaterThanPred = new GreaterThan(
        new PlannerResolvedFieldReference("foo", Types.LONG),
        new Literal(100L, Types.LONG));
    Expression equalsPred = new EqualTo(
        new Literal(100L, Types.LONG),
        new PlannerResolvedFieldReference("bar.spam", Types.LONG));

    // --- unsupported predicate: field access on an array element ---
    PlannerResolvedFieldReference nestedArrayRef = new PlannerResolvedFieldReference(
        "nestedArray",
        ObjectArrayTypeInfo.getInfoFor(
            Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING)));
    ItemAt elementOfArray = new ItemAt(nestedArrayRef, new Literal(1, Types.INT));
    GetCompositeField typeOfElement = new GetCompositeField(elementOfArray, "type");
    Expression unsupported = new EqualTo(typeOfElement, new Literal("test", Types.STRING));

    // --- invalid predicate: literal value is an object of this test class ---
    Expression invalidPred = new EqualTo(
        new PlannerResolvedFieldReference("nonField", Types.LONG),
        new Literal(new ParquetTableSourceTest(), Types.LONG));

    List<Expression> candidates = new ArrayList<>();
    candidates.add(greaterThanPred);
    candidates.add(equalsPred);
    candidates.add(unsupported);
    candidates.add(invalidPred);

    ParquetTableSource filtered =
        (ParquetTableSource) unfiltered.applyPredicate(candidates);

    // the source must be copied, not modified in place
    assertNotSame(unfiltered, filtered);
    // table schema and return type are preserved
    assertEquals(unfiltered.getTableSchema(), filtered.getTableSchema());
    assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
    // explainSource() must differ between the two sources
    assertNotEquals(unfiltered.explainSource(), filtered.explainSource());
    // pushdown is recorded on the copy only
    assertTrue(filtered.isFilterPushedDown());
    assertFalse(unfiltered.isFilterPushedDown());

    // two predicates are left over in the candidate list
    assertEquals(2, candidates.size());
    assertTrue(candidates.contains(unsupported));
    assertTrue(candidates.contains(invalidPred));

    // obtain the input format from a local environment and inspect its predicate
    ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
    DataSet<Row> dataSet = filtered.getDataSet(localEnv);
    InputFormat<Row, ?> format = ((DataSource<Row>) dataSet).getInputFormat();
    assertTrue(format instanceof ParquetRowInputFormat);
    ParquetRowInputFormat parquetInputFormat = (ParquetRowInputFormat) format;

    FilterPredicate fooFilter = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
    FilterPredicate spamFilter = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
    FilterPredicate expectedFilter = FilterApi.and(fooFilter, spamFilter);
    FilterPredicate actualFilter = parquetInputFormat.getPredicate();
    assertEquals(expectedFilter, actualFilter);
}