Java源码示例:org.apache.flink.table.expressions.ItemAt

示例1
/**
 * Builds an equality predicate that compares the {@code int1} field of an element of the
 * {@code list} column with the integer literal {@code 1}.
 *
 * <p>The {@code list} column is declared as an object array of rows with the named fields
 * {@code int1} (INT) and {@code string1} (STRING). Judging by the method name, callers use the
 * result as a predicate that is not expected to be pushed down -- confirm against the call sites.
 *
 * @return the composed {@code EqualTo} expression
 */
private Expression unsupportedPred() {
	// reference to the array-of-rows column "list"
	PlannerResolvedFieldReference listColumn = new PlannerResolvedFieldReference(
		"list",
		ObjectArrayTypeInfo.getInfoFor(
			Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)));

	// element of "list" addressed by the integer literal 1
	ItemAt listElement = new ItemAt(listColumn, new Literal(1, Types.INT));

	// field "int1" of that element
	GetCompositeField elementInt1 = new GetCompositeField(listElement, "int1");

	return new EqualTo(elementInt1, new Literal(1, Types.INT));
}
 
示例2
/**
 * Offers four predicates to {@code OrcTableSource#applyPredicate} and checks that a copy of the
 * source is returned with unchanged table schema and return type, that exactly two predicates
 * (the ones on {@code int1} and {@code string1}) are added to the ORC input format, and that
 * only the returned source reports filter push-down.
 */
@Test
@SuppressWarnings("unchecked")
public void testApplyPredicate() throws Exception {

	OrcTableSource source = OrcTableSource.builder()
		.path(getPath(TEST_FILE_NESTED))
		.forOrcSchema(TEST_SCHEMA_NESTED)
		.build();

	// supported predicate: int1 > 100
	ResolvedFieldReference int1Field = new ResolvedFieldReference("int1", Types.INT);
	Expression int1GreaterThan100 = new GreaterThan(int1Field, new Literal(100, Types.INT));

	// supported predicate: string1 = "hello"
	ResolvedFieldReference string1Field = new ResolvedFieldReference("string1", Types.STRING);
	Expression string1EqualsHello = new EqualTo(string1Field, new Literal("hello", Types.STRING));

	// unsupported predicate: equality on the "int1" field of an element of the array column "list"
	ResolvedFieldReference listField = new ResolvedFieldReference(
		"list",
		ObjectArrayTypeInfo.getInfoFor(
			Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)));
	ItemAt listElement = new ItemAt(listField, new Literal(1, Types.INT));
	GetCompositeField elementInt1 = new GetCompositeField(listElement, "int1");
	Expression arrayElementPredicate = new EqualTo(elementInt1, new Literal(1, Types.INT));

	// invalid predicate: the literal is declared as LONG but carries an invalid,
	// non-serializable value (here an object of this test class)
	ResolvedFieldReference long1Field = new ResolvedFieldReference("long1", Types.LONG);
	Expression invalidLiteralPredicate = new EqualTo(
		long1Field,
		new Literal(new OrcTableSourceTest(), Types.LONG));

	ArrayList<Expression> offered = new ArrayList<>();
	offered.add(int1GreaterThan100);
	offered.add(string1EqualsHello);
	offered.add(arrayElementPredicate);
	offered.add(invalidLiteralPredicate);

	// hand all four predicates to the table source
	OrcTableSource withPredicates = (OrcTableSource) source.applyPredicate(offered);

	// a different instance must come back ...
	assertTrue(source != withPredicates);

	// ... whose table schema equals the original one ...
	assertEquals(source.getTableSchema(), withPredicates.getTableSchema());

	// ... and whose return type is still the full nested row type
	assertEquals(
		Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
		withPredicates.getReturnType());

	// replace the input format built by the source with a mock so that the
	// predicates added to it can be captured
	OrcTableSource spiedSource = spy(withPredicates);
	OrcRowInputFormat inputFormatMock = mock(OrcRowInputFormat.class);
	doReturn(inputFormatMock).when(spiedSource).buildOrcInputFormat();
	ExecutionEnvironment envMock = mock(ExecutionEnvironment.class);
	when(envMock.createInput(any(InputFormat.class))).thenReturn(mock(DataSource.class));
	spiedSource.getDataSet(envMock);

	// exactly two of the four offered predicates must have been added
	ArgumentCaptor<OrcRowInputFormat.Predicate> captor =
		ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class);
	verify(inputFormatMock, times(2)).addPredicate(captor.capture());
	List<OrcRowInputFormat.Predicate> captured = captor.getAllValues();
	List<String> capturedAsText = captured.stream()
		.map(Object::toString)
		.collect(Collectors.toList());

	// int1 > 100 is expected as NOT(int1 <= 100) with a LONG predicate leaf type
	OrcRowInputFormat.Not expectedInt1 = new OrcRowInputFormat.Not(
		new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100));
	assertTrue(capturedAsText.contains(expectedInt1.toString()));

	// string1 = "hello" is expected as a plain equality on a STRING leaf
	OrcRowInputFormat.Equals expectedString1 =
		new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello");
	assertTrue(capturedAsText.contains(expectedString1.toString()));

	// only the source returned by applyPredicate reports push-down
	assertTrue(spiedSource.isFilterPushedDown());
	assertFalse(source.isFilterPushedDown());
}
 
示例3
/**
 * Applies two supported, one unsupported and one invalid predicate to an {@code OrcTableSource}
 * and verifies the result: a new source instance with identical table schema and return type,
 * exactly two predicates added to the ORC input format, and the push-down flag set only on the
 * returned source.
 */
@Test
@SuppressWarnings("unchecked")
public void testApplyPredicate() throws Exception {

	OrcTableSource original = OrcTableSource.builder()
		.path(getPath(TEST_FILE_NESTED))
		.forOrcSchema(TEST_SCHEMA_NESTED)
		.build();

	// supported predicate: int1 > 100
	PlannerResolvedFieldReference int1Ref = new PlannerResolvedFieldReference("int1", Types.INT);
	Expression int1Above100 = new GreaterThan(int1Ref, new Literal(100, Types.INT));

	// supported predicate: string1 = "hello"
	PlannerResolvedFieldReference string1Ref = new PlannerResolvedFieldReference("string1", Types.STRING);
	Expression string1IsHello = new EqualTo(string1Ref, new Literal("hello", Types.STRING));

	// unsupported predicate: equality on the "int1" field of an element of the array column "list"
	PlannerResolvedFieldReference listRef = new PlannerResolvedFieldReference(
		"list",
		ObjectArrayTypeInfo.getInfoFor(
			Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)));
	ItemAt itemOfList = new ItemAt(listRef, new Literal(1, Types.INT));
	GetCompositeField int1OfItem = new GetCompositeField(itemOfList, "int1");
	Expression unsupportedPredicate = new EqualTo(int1OfItem, new Literal(1, Types.INT));

	// invalid predicate: the literal is declared as LONG but carries an invalid,
	// non-serializable value (here an object of this test class)
	PlannerResolvedFieldReference long1Ref = new PlannerResolvedFieldReference("long1", Types.LONG);
	Expression invalidPredicate = new EqualTo(
		long1Ref,
		new Literal(new OrcTableSourceTest(), Types.LONG));

	ArrayList<Expression> candidates = new ArrayList<>();
	candidates.add(int1Above100);
	candidates.add(string1IsHello);
	candidates.add(unsupportedPredicate);
	candidates.add(invalidPredicate);

	// let the table source pick the predicates it can handle
	OrcTableSource filteredSource = (OrcTableSource) original.applyPredicate(candidates);

	// the original source must not be returned itself
	assertTrue(original != filteredSource);

	// the table schema must be unaffected
	assertEquals(original.getTableSchema(), filteredSource.getTableSchema());

	// the return type must be unaffected
	assertEquals(
		Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
		filteredSource.getReturnType());

	// swap in a mocked input format so the predicates passed to it can be recorded
	OrcTableSource sourceSpy = spy(filteredSource);
	OrcRowInputFormat formatMock = mock(OrcRowInputFormat.class);
	doReturn(formatMock).when(sourceSpy).buildOrcInputFormat();
	ExecutionEnvironment mockedEnv = mock(ExecutionEnvironment.class);
	when(mockedEnv.createInput(any(InputFormat.class))).thenReturn(mock(DataSource.class));
	sourceSpy.getDataSet(mockedEnv);

	// two addPredicate calls are expected for the four candidates
	ArgumentCaptor<OrcRowInputFormat.Predicate> predicateCaptor =
		ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class);
	verify(formatMock, times(2)).addPredicate(predicateCaptor.capture());
	List<OrcRowInputFormat.Predicate> recorded = predicateCaptor.getAllValues();
	List<String> recordedText = recorded.stream()
		.map(Object::toString)
		.collect(Collectors.toList());

	// predicates are compared through their string form;
	// int1 > 100 is expected as NOT(int1 <= 100) with a LONG predicate leaf type
	OrcRowInputFormat.Not wantedInt1 = new OrcRowInputFormat.Not(
		new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100));
	assertTrue(recordedText.contains(wantedInt1.toString()));

	// string1 = "hello" is expected as an equality on a STRING leaf
	OrcRowInputFormat.Equals wantedString1 =
		new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello");
	assertTrue(recordedText.contains(wantedString1.toString()));

	// the push-down flag is set on the returned source and stays unset on the original
	assertTrue(sourceSpy.isFilterPushedDown());
	assertFalse(original.isFilterPushedDown());
}
 
示例4
/**
 * Offers four predicates to a nested {@code ParquetTableSource} and checks that
 * {@code applyPredicate} returns a distinct source with unchanged schema and return type,
 * removes the two supported predicates from the offered list, and configures the resulting
 * {@code ParquetRowInputFormat} with the conjunction of those two predicates.
 */
@Test
public void testFieldsFilter() throws Exception {
	ParquetTableSource source = createNestedTestParquetTableSource(testPath);

	// supported predicate: foo > 100
	PlannerResolvedFieldReference fooField = new PlannerResolvedFieldReference("foo", Types.LONG);
	Expression fooGreaterThan100 = new GreaterThan(fooField, new Literal(100L, Types.LONG));

	// supported predicate: 100 = bar.spam (literal on the left-hand side)
	Literal hundred = new Literal(100L, Types.LONG);
	Expression hundredEqualsBarSpam = new EqualTo(
		hundred,
		new PlannerResolvedFieldReference("bar.spam", Types.LONG));

	// unsupported predicate: equality on the "type" field of an element of "nestedArray"
	PlannerResolvedFieldReference nestedArrayField = new PlannerResolvedFieldReference(
		"nestedArray",
		ObjectArrayTypeInfo.getInfoFor(
			Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING)));
	ItemAt arrayElement = new ItemAt(nestedArrayField, new Literal(1, Types.INT));
	GetCompositeField elementType = new GetCompositeField(arrayElement, "type");
	Expression arrayElementPredicate = new EqualTo(elementType, new Literal("test", Types.STRING));

	// invalid predicate: the literal is declared as LONG but carries an invalid,
	// non-serializable value (here an object of this test class)
	PlannerResolvedFieldReference nonField = new PlannerResolvedFieldReference("nonField", Types.LONG);
	Expression invalidLiteralPredicate = new EqualTo(
		nonField,
		new Literal(new ParquetTableSourceTest(), Types.LONG));

	List<Expression> offered = new ArrayList<>();
	offered.add(fooGreaterThan100);
	offered.add(hundredEqualsBarSpam);
	offered.add(arrayElementPredicate);
	offered.add(invalidLiteralPredicate);

	// apply the predicates on the table source
	ParquetTableSource withFilter = (ParquetTableSource) source.applyPredicate(offered);

	// a different instance must come back
	assertNotSame(source, withFilter);

	// table schema and return type must be unaffected
	assertEquals(source.getTableSchema(), withFilter.getTableSchema());
	assertEquals(NESTED_ROW_TYPE, withFilter.getReturnType());

	// the source description must differ once a filter is applied
	assertNotEquals(source.explainSource(), withFilter.explainSource());

	// only the returned source reports push-down
	assertTrue(withFilter.isFilterPushedDown());
	assertFalse(source.isFilterPushedDown());

	// the two supported predicates must be gone from the offered list; the other two remain
	assertEquals(2, offered.size());
	assertTrue(offered.contains(arrayElementPredicate));
	assertTrue(offered.contains(invalidLiteralPredicate));

	// fetch the input format behind the data set produced by the filtered source
	DataSet<Row> dataSet = withFilter.getDataSet(ExecutionEnvironment.createLocalEnvironment());
	InputFormat<Row, ?> format = ((DataSource<Row>) dataSet).getInputFormat();
	assertTrue(format instanceof ParquetRowInputFormat);
	ParquetRowInputFormat parquetFormat = (ParquetRowInputFormat) format;

	// expected filter: foo > 100 AND bar.spam = 100
	FilterPredicate fooFilter = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
	FilterPredicate barSpamFilter = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
	FilterPredicate expectedFilter = FilterApi.and(fooFilter, barSpamFilter);

	// compare with the filter actually configured on the input format
	FilterPredicate actualFilter = parquetFormat.getPredicate();
	assertEquals(expectedFilter, actualFilter);
}
 
示例5
/**
 * Applies two supported, one unsupported and one invalid predicate to a nested
 * {@code ParquetTableSource} and verifies the outcome: a new source instance with identical
 * schema and return type, a changed source description, the supported predicates removed from
 * the offered list, and a {@code ParquetRowInputFormat} configured with their conjunction.
 */
@Test
public void testFieldsFilter() throws Exception {
	ParquetTableSource original = createNestedTestParquetTableSource(testPath);

	// supported predicate: foo > 100
	PlannerResolvedFieldReference fooRef = new PlannerResolvedFieldReference("foo", Types.LONG);
	Expression fooAbove100 = new GreaterThan(fooRef, new Literal(100L, Types.LONG));

	// supported predicate: 100 = bar.spam (literal on the left-hand side)
	Literal literal100 = new Literal(100L, Types.LONG);
	Expression barSpamIs100 = new EqualTo(
		literal100,
		new PlannerResolvedFieldReference("bar.spam", Types.LONG));

	// unsupported predicate: equality on the "type" field of an element of "nestedArray"
	PlannerResolvedFieldReference nestedArrayRef = new PlannerResolvedFieldReference(
		"nestedArray",
		ObjectArrayTypeInfo.getInfoFor(
			Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING)));
	ItemAt itemOfArray = new ItemAt(nestedArrayRef, new Literal(1, Types.INT));
	GetCompositeField typeOfItem = new GetCompositeField(itemOfArray, "type");
	Expression unsupportedPredicate = new EqualTo(typeOfItem, new Literal("test", Types.STRING));

	// invalid predicate: the literal is declared as LONG but carries an invalid,
	// non-serializable value (here an object of this test class)
	PlannerResolvedFieldReference nonFieldRef = new PlannerResolvedFieldReference("nonField", Types.LONG);
	Expression invalidPredicate = new EqualTo(
		nonFieldRef,
		new Literal(new ParquetTableSourceTest(), Types.LONG));

	List<Expression> candidates = new ArrayList<>();
	candidates.add(fooAbove100);
	candidates.add(barSpamIs100);
	candidates.add(unsupportedPredicate);
	candidates.add(invalidPredicate);

	// apply the predicates on the table source
	ParquetTableSource filteredSource = (ParquetTableSource) original.applyPredicate(candidates);

	// the original source must not be returned itself
	assertNotSame(original, filteredSource);

	// the table schema must be unaffected
	assertEquals(original.getTableSchema(), filteredSource.getTableSchema());

	// the return type must be unaffected
	assertEquals(NESTED_ROW_TYPE, filteredSource.getReturnType());

	// the source description must change once a filter is applied
	assertNotEquals(original.explainSource(), filteredSource.explainSource());

	// the push-down flag is set on the returned source and stays unset on the original
	assertTrue(filteredSource.isFilterPushedDown());
	assertFalse(original.isFilterPushedDown());

	// only the unsupported and the invalid predicate may remain in the candidate list
	assertEquals(2, candidates.size());
	assertTrue(candidates.contains(unsupportedPredicate));
	assertTrue(candidates.contains(invalidPredicate));

	// obtain the input format of the data set created by the filtered source
	DataSet<Row> rows = filteredSource.getDataSet(ExecutionEnvironment.createLocalEnvironment());
	InputFormat<Row, ?> rowFormat = ((DataSource<Row>) rows).getInputFormat();
	assertTrue(rowFormat instanceof ParquetRowInputFormat);
	ParquetRowInputFormat parquetRowFormat = (ParquetRowInputFormat) rowFormat;

	// expected filter: foo > 100 AND bar.spam = 100
	FilterPredicate fooPart = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
	FilterPredicate barSpamPart = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
	FilterPredicate wanted = FilterApi.and(fooPart, barSpamPart);

	// the input format must carry exactly that filter
	FilterPredicate configured = parquetRowFormat.getPredicate();
	assertEquals(wanted, configured);
}