Java Source Code Examples: org.apache.flink.api.common.operators.SingleInputOperator

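SingleInputOperator is the abstract base class in the Flink DataSet API for all operators that consume exactly one input, such as map, group-reduce, and partition operators. The examples below come from the Flink optimizer and its plan-translation tests, and show the class being cast, traversed, and queried for key columns and semantic properties.

As orientation, here is a minimal sketch of the traversal pattern the examples rely on: starting from a data sink of a translated Plan and stepping up through a single-input operator. The variable p is assumed to be a Plan obtained from ExecutionEnvironment#createProgramPlan, as in Examples 6 and 7 below.

GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
Operator<?> sinkInput = sink.getInput();
if (sinkInput instanceof SingleInputOperator) {
	// step one operator further up the input chain
	Operator<?> upstream = ((SingleInputOperator<?, ?, ?>) sinkInput).getInput();
}
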
Example 1
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {

	// Local properties for GroupCombine may only be preserved on key fields.
	SingleInputSemanticProperties origProps =
			((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
	SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
	FieldSet readSet = origProps.getReadFields(0);
	if(readSet != null) {
		filteredProps.addReadFields(readSet);
	}

	// only add forward field information for key fields
	if(this.keys != null) {
		for (int f : this.keys) {
			FieldSet targets = origProps.getForwardingTargetFields(0, f);
			for (int t : targets) {
				filteredProps.addForwardedField(f, t);
			}
		}
	}
	return filteredProps;
}
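
This filter keeps the read-field set but copies forwarded-field information only for the grouping keys, because a GroupCombine function may modify any non-key field. A minimal sketch of the resulting behavior, using hypothetical field indices:

SingleInputSemanticProperties filtered = new SingleInputSemanticProperties();
filtered.addReadFields(new FieldSet(new int[] {0, 2}));
filtered.addForwardedField(0, 0); // key field 0 is forwarded to position 0

// A field that was never added reports an empty (non-null) target set,
// which is why the loop above can iterate without a null check.
FieldSet targets = filtered.getForwardingTargetFields(0, 2);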
 
Example 2
/**
 * Creates a new node with a single input for the optimizer plan.
 * 
 * @param programOperator The PACT that the node represents.
 */
protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) {
	super(programOperator);
	
	int[] k = programOperator.getKeyColumns(0);
	this.keys = k == null || k.length == 0 ? null : new FieldSet(k);
}
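
Note the guard: getKeyColumns(0) may return null or an empty array for a non-keyed operator, and both cases are normalized to keys == null, which the filtering logic in Example 1 checks before iterating.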
 
Example 3
public UnaryOperatorNode(String name, SingleInputOperator<?, ?, ?> operator, boolean onDynamicPath) {
	super(operator);

	this.name = name;
	this.operators = new ArrayList<>();
	this.onDynamicPath = onDynamicPath;
}
 
Example 4
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {

	// Local properties for MapPartition may not be preserved.
	SingleInputSemanticProperties origProps =
			((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
	SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
	FieldSet readSet = origProps.getReadFields(0);
	if(readSet != null) {
		filteredProps.addReadFields(readSet);
	}

	return filteredProps;
}
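
In contrast to Example 1, no forwarded fields are copied here at all: a MapPartition function may reorder or rewrite records arbitrarily, so only the read-field information survives. A hypothetical probe of the filtered properties (any field index would do):

FieldSet targets = filteredProps.getForwardingTargetFields(0, 0);
// targets is empty, so no local property is preserved through this operator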
 
Example 5
private void testQueryGeneric(Plan p, long orderSize, long lineitemSize,
		float orderSelectivity, float joinSelectivity,
		boolean broadcastOkay, boolean partitionedOkay,
		boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) {
	try {
		// set statistics
		OperatorResolver cr = getContractResolver(p);
		GenericDataSourceBase<?, ?> ordersSource = cr.getNode(ORDERS);
		GenericDataSourceBase<?, ?> lineItemSource = cr.getNode(LINEITEM);
		SingleInputOperator<?, ?, ?> mapper = cr.getNode(MAPPER_NAME);
		DualInputOperator<?, ?, ?, ?> joiner = cr.getNode(JOIN_NAME);
		setSourceStatistics(ordersSource, orderSize, 100f);
		setSourceStatistics(lineItemSource, lineitemSize, 140f);
		mapper.getCompilerHints().setAvgOutputRecordSize(16f);
		mapper.getCompilerHints().setFilterFactor(orderSelectivity);
		joiner.getCompilerHints().setFilterFactor(joinSelectivity);

		// compile
		final OptimizedPlan plan = compileWithStats(p);
		final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);

		// get the nodes from the final plan
		final SinkPlanNode sink = or.getNode(SINK);
		final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
		final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
				(SingleInputPlanNode) reducer.getPredecessor() : null;
		final DualInputPlanNode join = or.getNode(JOIN_NAME);
		final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);

		checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);

		// check the possible variants and that the chosen variant is allowed in this specific setting
		if (checkBroadcastShipStrategies(join, reducer, combiner)) {
			Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);

			if (checkHashJoinStrategies(join, reducer, true)) {
				Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
			} else if (checkHashJoinStrategies(join, reducer, false)) {
				Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
			} else if (checkBroadcastMergeJoin(join, reducer)) {
				Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
			} else {
				Assert.fail("Plan has no correct hash join or merge join strategies.");
			}
		}
		else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
			Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);

			if (checkHashJoinStrategies(join, reducer, true)) {
				Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
			} else if (checkHashJoinStrategies(join, reducer, false)) {
				Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
			} else if (checkRepartitionMergeJoin(join, reducer)) {
				Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
			} else {
				Assert.fail("Plan has no correct hash join or merge join strategies.");
			}
		} else {
			Assert.fail("Plan has neither correct BC join or partitioned join configuration.");
		}
	} catch (Exception e) {
		e.printStackTrace();
		Assert.fail(e.getMessage());
	}
}
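
The statistics and compiler hints set at the top are what make the optimizer's choice deterministic: setAvgOutputRecordSize gives the expected bytes per output record, and setFilterFactor the fraction of records that survive the operator. Both feed the cost model that selects among the broadcast/repartition and hash/merge variants checked above.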
 
Example 6
@Test
public void translateUnion2Group() {
	try {
		final int parallelism = 4;
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

		DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3);

		DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2);

		dataset1.union(dataset2)
				.groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
				.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
				.returns(String.class)
				.output(new DiscardingOutputFormat<>());

		Plan p = env.createProgramPlan();

		// The plan should look like the following one.
		//
		// DataSet1(3) - MapOperator(3)-+
		//	                            |- Union(-1) - SingleInputOperator - Sink
		// DataSet2(2) - MapOperator(2)-+

		GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
		Union unionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();

		// The key mappers should be added to both input streams of the union.
		assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
		assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

		// The parallelisms of the key mappers should be equal to those of their inputs.
		assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
		assertEquals(unionOperator.getSecondInput().getParallelism(), 2);

		// The union should always have the default parallelism.
		assertEquals(unionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail("Test caused an error: " + e.getMessage());
	}
}
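
The point of this test: when a union is grouped by a KeySelector, the translator inserts a key-extracting mapper on each union input, and each mapper inherits the parallelism of its input (3 and 2 here), while the Union itself keeps ExecutionConfig.PARALLELISM_DEFAULT (-1) so the optimizer can settle it later.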
 
Example 7
@Test
public void translateUnion3SortedGroup() {
	try {
		final int parallelism = 4;
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

		DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2);

		DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3);

		DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1);

		dataset1.union(dataset2).union(dataset3)
				.groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
				.sortGroup((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
				.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
				.returns(String.class)
				.output(new DiscardingOutputFormat<>());

		Plan p = env.createProgramPlan();

		// The plan should look like the following one.
		//
		// DataSet1(2) - MapOperator(2)-+
		//	                            |- Union(-1) -+
		// DataSet2(3) - MapOperator(3)-+             |- Union(-1) - SingleInputOperator - Sink
		//                                            |
		//             DataSet3(-1) - MapOperator(-1)-+

		GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
		Union secondUnionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();

		// The first input of the second union should be the first union.
		Union firstUnionOperator = (Union) secondUnionOperator.getFirstInput();

		// The key mapper should be added to the second input stream of the second union.
		assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

		// The key mappers should be added to both input streams of the first union.
		assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
		assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

		// The parallelisms of the key mappers should be equal to those of their inputs.
		assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
		assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
		assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);

		// The union should always have the default parallelism.
		assertEquals(secondUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
		assertEquals(firstUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail("Test caused an error: " + e.getMessage());
	}
}
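
With chained unions, the same rule applies level by level: the second union's first input is the first union itself (no mapper in between), and only the newly added third input receives its own key mapper.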
 
Example 8
@Override
public SingleInputOperator<?, ?, ?> getOperator() {
	return (SingleInputOperator<?, ?, ?>) super.getOperator();
}
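
This covariant override narrows the return type, so call sites on this node can use the single-input API directly instead of casting the raw getOperator() result as other examples do.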
 
Example 9
@Override
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
		throws CompilerException
{
	// see if an internal hint dictates the strategy to use
	final Configuration conf = getOperator().getParameters();
	final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
	final ShipStrategyType preSet;
	
	if (shipStrategy != null) {
		if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
			preSet = ShipStrategyType.PARTITION_HASH;
		} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
			preSet = ShipStrategyType.PARTITION_RANGE;
		} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
			preSet = ShipStrategyType.FORWARD;
		} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
			preSet = ShipStrategyType.PARTITION_RANDOM;
		} else {
			throw new CompilerException("Unrecognized ship strategy hint: " + shipStrategy);
		}
	} else {
		preSet = null;
	}
	
	// get the predecessor node
	Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
	
	OptimizerNode pred;
	DagConnection conn;
	if (children == null) {
		throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
	} else {
		pred = contractToNode.get(children);
		conn = new DagConnection(pred, this, defaultExchangeMode);
		if (preSet != null) {
			conn.setShipStrategy(preSet);
		}
	}
	
	// create the connection and add it
	setIncomingConnection(conn);
	pred.addOutgoingConnection(conn);
}
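
Ship-strategy hints travel as plain strings in the operator's parameter Configuration, so the matching setter is the generic Configuration API. A hypothetical sketch of attaching a hint before translation, with `operator` assumed to be the SingleInputOperator in question:

Configuration params = operator.getParameters();
params.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);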
 
Example 10
/**
 * Creates a new MapNode for the given operator.
 * 
 * @param operator The map operator.
 */
public MapNode(SingleInputOperator<?, ?, ?> operator) {
	super(operator);
	
	this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor());
}
 
Example 11
/**
 * Creates a new MapPartitionNode for the given contract.
 * 
 * @param operator The map partition contract object.
 */
public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
	super(operator);
	
	this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
}
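
MapNode and MapPartitionNode differ only in the OperatorDescriptorSingle they register; that single-element list of possible properties is what later tells the optimizer which driver strategy each node supports.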
 