Java源码示例:org.apache.flink.api.common.operators.SingleInputOperator
示例1
/**
 * Builds the semantic properties that are used when filtering local properties through this node.
 *
 * <p>The operator's read-field information is copied as-is. Forwarded-field information is copied
 * only for this node's key fields, because local properties for GroupCombine may only be preserved
 * on key fields.
 *
 * @return a new {@link SingleInputSemanticProperties} holding the read fields and the key-field
 *         forwarding information of the wrapped operator
 */
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    final SingleInputSemanticProperties declared =
            ((SingleInputOperator<?, ?, ?>) getOperator()).getSemanticProperties();
    final SingleInputSemanticProperties result = new SingleInputSemanticProperties();

    // Read-field information is kept unchanged whenever the operator declares any.
    final FieldSet readFields = declared.getReadFields(0);
    if (readFields != null) {
        result.addReadFields(readFields);
    }

    // Without keys there is no forwarding information that may be retained.
    if (this.keys == null) {
        return result;
    }
    for (int keyField : this.keys) {
        for (int targetField : declared.getForwardingTargetFields(0, keyField)) {
            result.addForwardedField(keyField, targetField);
        }
    }
    return result;
}
示例2
/**
 * Builds the semantic properties that are used when filtering local properties through this node.
 *
 * <p>The operator's read-field information is copied as-is. Forwarded-field information is copied
 * only for this node's key fields, because local properties for GroupCombine may only be preserved
 * on key fields.
 *
 * @return a new {@link SingleInputSemanticProperties} holding the read fields and the key-field
 *         forwarding information of the wrapped operator
 */
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    final SingleInputSemanticProperties declared =
            ((SingleInputOperator<?, ?, ?>) getOperator()).getSemanticProperties();
    final SingleInputSemanticProperties result = new SingleInputSemanticProperties();

    // Read-field information is kept unchanged whenever the operator declares any.
    final FieldSet readFields = declared.getReadFields(0);
    if (readFields != null) {
        result.addReadFields(readFields);
    }

    // Without keys there is no forwarding information that may be retained.
    if (this.keys == null) {
        return result;
    }
    for (int keyField : this.keys) {
        for (int targetField : declared.getForwardingTargetFields(0, keyField)) {
            result.addForwardedField(keyField, targetField);
        }
    }
    return result;
}
示例3
/**
 * Builds the semantic properties that are used when filtering local properties through this node.
 *
 * <p>The operator's read-field information is copied as-is. Forwarded-field information is copied
 * only for this node's key fields, because local properties for GroupCombine may only be preserved
 * on key fields.
 *
 * @return a new {@link SingleInputSemanticProperties} holding the read fields and the key-field
 *         forwarding information of the wrapped operator
 */
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    final SingleInputSemanticProperties declared =
            ((SingleInputOperator<?, ?, ?>) getOperator()).getSemanticProperties();
    final SingleInputSemanticProperties result = new SingleInputSemanticProperties();

    // Read-field information is kept unchanged whenever the operator declares any.
    final FieldSet readFields = declared.getReadFields(0);
    if (readFields != null) {
        result.addReadFields(readFields);
    }

    // Without keys there is no forwarding information that may be retained.
    if (this.keys == null) {
        return result;
    }
    for (int keyField : this.keys) {
        for (int targetField : declared.getForwardingTargetFields(0, keyField)) {
            result.addForwardedField(keyField, targetField);
        }
    }
    return result;
}
示例4
/**
 * Creates an optimizer node for a program operator that has a single input.
 *
 * <p>The operator's key columns for input 0 become this node's keys. A missing or empty key
 * array is recorded as {@code null}, meaning the node has no keys.
 *
 * @param programOperator The single-input program operator that this node represents.
 */
protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) {
    super(programOperator);
    final int[] keyColumns = programOperator.getKeyColumns(0);
    if (keyColumns != null && keyColumns.length > 0) {
        this.keys = new FieldSet(keyColumns);
    } else {
        this.keys = null;
    }
}
示例5
/**
 * Creates a unary optimizer node around the given single-input operator.
 *
 * @param name          the name stored for this node
 * @param operator      the single-input operator passed to the parent constructor
 * @param onDynamicPath the value stored as this node's dynamic-path flag
 */
public UnaryOperatorNode(String name, SingleInputOperator<?, ?, ?> operator, boolean onDynamicPath) {
    super(operator);
    this.onDynamicPath = onDynamicPath;
    this.operators = new ArrayList<>();
    this.name = name;
}
示例6
/**
 * Builds the semantic properties that are used when filtering local properties through this node.
 *
 * <p>Local properties for MapPartition may not be preserved, so no forwarded-field information is
 * carried over. Only the operator's read-field information, if any, is retained.
 *
 * @return a new {@link SingleInputSemanticProperties} containing at most the operator's read fields
 */
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    final SingleInputOperator<?, ?, ?> op = (SingleInputOperator<?, ?, ?>) getOperator();
    final SingleInputSemanticProperties declared = op.getSemanticProperties();
    final SingleInputSemanticProperties result = new SingleInputSemanticProperties();

    final FieldSet readFields = declared.getReadFields(0);
    if (readFields == null) {
        return result;
    }
    result.addReadFields(readFields);
    return result;
}
示例7
/**
 * Creates an optimizer node for a program operator that has a single input.
 *
 * <p>The operator's key columns for input 0 become this node's keys. A missing or empty key
 * array is recorded as {@code null}, meaning the node has no keys.
 *
 * @param programOperator The single-input program operator that this node represents.
 */
protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) {
    super(programOperator);
    final int[] keyColumns = programOperator.getKeyColumns(0);
    if (keyColumns != null && keyColumns.length > 0) {
        this.keys = new FieldSet(keyColumns);
    } else {
        this.keys = null;
    }
}
示例8
/**
 * Creates a unary optimizer node around the given single-input operator.
 *
 * @param name          the name stored for this node
 * @param operator      the single-input operator passed to the parent constructor
 * @param onDynamicPath the value stored as this node's dynamic-path flag
 */
public UnaryOperatorNode(String name, SingleInputOperator<?, ?, ?> operator, boolean onDynamicPath) {
    super(operator);
    this.onDynamicPath = onDynamicPath;
    this.operators = new ArrayList<>();
    this.name = name;
}
示例9
/**
 * Builds the semantic properties that are used when filtering local properties through this node.
 *
 * <p>Local properties for MapPartition may not be preserved, so no forwarded-field information is
 * carried over. Only the operator's read-field information, if any, is retained.
 *
 * @return a new {@link SingleInputSemanticProperties} containing at most the operator's read fields
 */
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    final SingleInputOperator<?, ?, ?> op = (SingleInputOperator<?, ?, ?>) getOperator();
    final SingleInputSemanticProperties declared = op.getSemanticProperties();
    final SingleInputSemanticProperties result = new SingleInputSemanticProperties();

    final FieldSet readFields = declared.getReadFields(0);
    if (readFields == null) {
        return result;
    }
    result.addReadFields(readFields);
    return result;
}
示例10
/**
 * Creates an optimizer node for a program operator that has a single input.
 *
 * <p>The operator's key columns for input 0 become this node's keys. A missing or empty key
 * array is recorded as {@code null}, meaning the node has no keys.
 *
 * @param programOperator The single-input program operator that this node represents.
 */
protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) {
    super(programOperator);
    final int[] keyColumns = programOperator.getKeyColumns(0);
    if (keyColumns != null && keyColumns.length > 0) {
        this.keys = new FieldSet(keyColumns);
    } else {
        this.keys = null;
    }
}
示例11
/**
 * Creates a unary optimizer node around the given single-input operator.
 *
 * @param name          the name stored for this node
 * @param operator      the single-input operator passed to the parent constructor
 * @param onDynamicPath the value stored as this node's dynamic-path flag
 */
public UnaryOperatorNode(String name, SingleInputOperator<?, ?, ?> operator, boolean onDynamicPath) {
    super(operator);
    this.onDynamicPath = onDynamicPath;
    this.operators = new ArrayList<>();
    this.name = name;
}
示例12
/**
 * Builds the semantic properties that are used when filtering local properties through this node.
 *
 * <p>Local properties for MapPartition may not be preserved, so no forwarded-field information is
 * carried over. Only the operator's read-field information, if any, is retained.
 *
 * @return a new {@link SingleInputSemanticProperties} containing at most the operator's read fields
 */
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    final SingleInputOperator<?, ?, ?> op = (SingleInputOperator<?, ?, ?>) getOperator();
    final SingleInputSemanticProperties declared = op.getSemanticProperties();
    final SingleInputSemanticProperties result = new SingleInputSemanticProperties();

    final FieldSet readFields = declared.getReadFields(0);
    if (readFields == null) {
        return result;
    }
    result.addReadFields(readFields);
    return result;
}
示例13
/**
 * Compiles the given plan with the supplied statistics and compiler hints, then checks that the
 * join strategy chosen by the optimizer is one that this setting allows.
 *
 * @param p                  the program plan to compile
 * @param orderSize          size value passed to {@code setSourceStatistics} for the orders source
 * @param lineitemSize       size value passed to {@code setSourceStatistics} for the lineitem source
 * @param orderSelectivity   filter factor set as a compiler hint on the mapper
 * @param joinSelectivity    filter factor set as a compiler hint on the join
 * @param broadcastOkay      whether a broadcast join is an acceptable outcome
 * @param partitionedOkay    whether a partitioned (repartitioning) join is an acceptable outcome
 * @param hashJoinFirstOkay  whether a hash join building on the orders side is acceptable
 * @param hashJoinSecondOkay whether a hash join building on the lineitem side is acceptable
 * @param mergeJoinOkay      whether a merge join is acceptable
 */
private void testQueryGeneric(Plan p, long orderSize, long lineitemSize,
        float orderSelectivity, float joinSelectivity,
        boolean broadcastOkay, boolean partitionedOkay,
        boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) {
    try {
        // set statistics
        OperatorResolver cr = getContractResolver(p);
        GenericDataSourceBase<?, ?> ordersSource = cr.getNode(ORDERS);
        GenericDataSourceBase<?, ?> lineItemSource = cr.getNode(LINEITEM);
        SingleInputOperator<?, ?, ?> mapper = cr.getNode(MAPPER_NAME);
        DualInputOperator<?, ?, ?, ?> joiner = cr.getNode(JOIN_NAME);
        setSourceStatistics(ordersSource, orderSize, 100f);
        setSourceStatistics(lineItemSource, lineitemSize, 140f);
        mapper.getCompilerHints().setAvgOutputRecordSize(16f);
        mapper.getCompilerHints().setFilterFactor(orderSelectivity);
        joiner.getCompilerHints().setFilterFactor(joinSelectivity);

        // compile
        final OptimizedPlan plan = compileWithStats(p);
        final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);

        // get the nodes from the final plan
        final SinkPlanNode sink = or.getNode(SINK);
        final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
        final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode
                ? (SingleInputPlanNode) reducer.getPredecessor()
                : null;
        final DualInputPlanNode join = or.getNode(JOIN_NAME);
        final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);

        checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);

        // check the possible variants and that the variant is allowed in this specific setting
        if (checkBroadcastShipStrategies(join, reducer, combiner)) {
            Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);

            if (checkHashJoinStrategies(join, reducer, true)) {
                Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
            } else if (checkHashJoinStrategies(join, reducer, false)) {
                Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
            } else if (checkBroadcastMergeJoin(join, reducer)) {
                Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
            } else {
                Assert.fail("Plan has no correct hash join or merge join strategies.");
            }
        } else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
            Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);

            if (checkHashJoinStrategies(join, reducer, true)) {
                Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
            } else if (checkHashJoinStrategies(join, reducer, false)) {
                Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
            } else if (checkRepartitionMergeJoin(join, reducer)) {
                Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
            } else {
                Assert.fail("Plan has no correct hash join or merge join strategies.");
            }
        } else {
            Assert.fail("Plan has neither correct BC join or partitioned join configuration.");
        }
    } catch (Exception e) {
        // Attach the original exception as the cause so the test report shows its full stack
        // trace, instead of printing it to stderr and failing with the message alone.
        throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
    }
}
示例14
/**
 * Verifies how a grouped reduce over the union of two inputs is translated.
 *
 * <p>A key-extracting map operator must be inserted on each union input. Each mapper must keep
 * the parallelism of its source, and the union must keep the default parallelism.
 */
@Test
public void translateUnion2Group() {
    try {
        final int parallelism = 4;
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

        DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2);

        dataset1.union(dataset2)
                .groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
                .reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
                .returns(String.class)
                .output(new DiscardingOutputFormat<>());

        Plan p = env.createProgramPlan();

        // The plan should look like the following one.
        //
        // DataSet1(3) - MapOperator(3)-+
        //                              |- Union(-1) - SingleInputOperator - Sink
        // DataSet2(2) - MapOperator(2)-+

        GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
        Union<?> unionOperator = (Union<?>) ((SingleInputOperator<?, ?, ?>) sink.getInput()).getInput();

        // The key mappers should be added to both of the two input streams for union.
        assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
        assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The parallelisms of the key mappers should be equal to those of their inputs.
        // JUnit's assertEquals takes (expected, actual); the original had them reversed.
        assertEquals(3, unionOperator.getFirstInput().getParallelism());
        assertEquals(2, unionOperator.getSecondInput().getParallelism());

        // The union should always have the default parallelism.
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, unionOperator.getParallelism());
    } catch (Exception e) {
        // Keep the original exception as the cause so its stack trace reaches the test report.
        throw new AssertionError("Test caused an error: " + e.getMessage(), e);
    }
}
示例15
/**
 * Verifies how a sorted grouped reduce over the union of three inputs is translated.
 *
 * <p>Key-extracting map operators must be inserted on every union input. Each mapper must keep
 * the parallelism of its source, and both unions must keep the default parallelism.
 */
@Test
public void translateUnion3SortedGroup() {
    try {
        final int parallelism = 4;
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

        DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1);

        dataset1.union(dataset2).union(dataset3)
                .groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
                .sortGroup((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
                .reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
                .returns(String.class)
                .output(new DiscardingOutputFormat<>());

        Plan p = env.createProgramPlan();

        // The plan should look like the following one.
        //
        // DataSet1(2) - MapOperator(2)-+
        //                              |- Union(-1) -+
        // DataSet2(3) - MapOperator(3)-+             |- Union(-1) - SingleInputOperator - Sink
        //                                            |
        // DataSet3(-1) - MapOperator(-1)-------------+

        GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
        Union<?> secondUnionOperator = (Union<?>) ((SingleInputOperator<?, ?, ?>) sink.getInput()).getInput();

        // The first input of the second union should be the first union.
        Union<?> firstUnionOperator = (Union<?>) secondUnionOperator.getFirstInput();

        // The key mapper should be added to the second input stream of the second union.
        assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The key mappers should be added to both of the two input streams for the first union.
        assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
        assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The parallelisms of the key mappers should be equal to those of their inputs.
        // JUnit's assertEquals takes (expected, actual); the original had them reversed.
        assertEquals(2, firstUnionOperator.getFirstInput().getParallelism());
        assertEquals(3, firstUnionOperator.getSecondInput().getParallelism());
        assertEquals(-1, secondUnionOperator.getSecondInput().getParallelism());

        // The union should always have the default parallelism.
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, secondUnionOperator.getParallelism());
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, firstUnionOperator.getParallelism());
    } catch (Exception e) {
        // Keep the original exception as the cause so its stack trace reaches the test report.
        throw new AssertionError("Test caused an error: " + e.getMessage(), e);
    }
}
示例16
/**
 * Returns the program operator wrapped by this node, typed as a single-input operator.
 *
 * @return the operator from {@code super.getOperator()}, downcast to {@link SingleInputOperator}
 */
@Override
public SingleInputOperator<?, ?, ?> getOperator() {
    // NOTE(review): assumes this node only ever wraps a SingleInputOperator; any other operator
    // type makes this cast throw ClassCastException.
    final SingleInputOperator<?, ?, ?> singleInputOperator =
            (SingleInputOperator<?, ?, ?>) super.getOperator();
    return singleInputOperator;
}
示例17
/**
 * Connects this node to the optimizer node of its operator's single input.
 *
 * <p>If the operator's parameters contain a ship-strategy hint under
 * {@code Optimizer.HINT_SHIP_STRATEGY}, the new connection is preset with the matching
 * {@link ShipStrategyType}.
 *
 * @param contractToNode      mapping from program operators to their optimizer nodes
 * @param defaultExchangeMode exchange mode used for the created connection
 * @throws CompilerException if the ship-strategy hint is not recognized, if the operator has no
 *                           input, or if {@code contractToNode} holds no node for that input
 */
@Override
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
        throws CompilerException {
    // see if an internal hint dictates the strategy to use
    final Configuration conf = getOperator().getParameters();
    final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);

    final ShipStrategyType preSet;
    if (shipStrategy == null) {
        preSet = null;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
        preSet = ShipStrategyType.PARTITION_HASH;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
        preSet = ShipStrategyType.PARTITION_RANGE;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
        preSet = ShipStrategyType.FORWARD;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
        preSet = ShipStrategyType.PARTITION_RANDOM;
    } else {
        throw new CompilerException("Unrecognized ship strategy hint: " + shipStrategy);
    }

    // get the predecessor node
    final Operator<?> input = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
    if (input == null) {
        throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
    }

    final OptimizerNode pred = contractToNode.get(input);
    if (pred == null) {
        // Report the missing mapping clearly instead of failing later with a
        // NullPointerException when the predecessor is dereferenced.
        throw new CompilerException("Error: No optimizer node found for the input of '"
                + getOperator().getName() + "'.");
    }

    // create the connection and add it
    final DagConnection conn = new DagConnection(pred, this, defaultExchangeMode);
    if (preSet != null) {
        conn.setShipStrategy(preSet);
    }
    setIncomingConnection(conn);
    pred.addOutgoingConnection(conn);
}
示例18
/**
* Creates a new MapNode for the given operator.
*
* @param operator The map operator.
*/
public MapNode(SingleInputOperator<?, ?, ?> operator) {
super(operator);
this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor());
}
示例19
/**
* Creates a new MapNode for the given contract.
*
* @param operator The map partition contract object.
*/
public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
super(operator);
this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
}
示例20
/**
 * Compiles the given plan with the supplied statistics and compiler hints, then checks that the
 * join strategy chosen by the optimizer is one that this setting allows.
 *
 * @param p                  the program plan to compile
 * @param orderSize          size value passed to {@code setSourceStatistics} for the orders source
 * @param lineitemSize       size value passed to {@code setSourceStatistics} for the lineitem source
 * @param orderSelectivity   filter factor set as a compiler hint on the mapper
 * @param joinSelectivity    filter factor set as a compiler hint on the join
 * @param broadcastOkay      whether a broadcast join is an acceptable outcome
 * @param partitionedOkay    whether a partitioned (repartitioning) join is an acceptable outcome
 * @param hashJoinFirstOkay  whether a hash join building on the orders side is acceptable
 * @param hashJoinSecondOkay whether a hash join building on the lineitem side is acceptable
 * @param mergeJoinOkay      whether a merge join is acceptable
 */
private void testQueryGeneric(Plan p, long orderSize, long lineitemSize,
        float orderSelectivity, float joinSelectivity,
        boolean broadcastOkay, boolean partitionedOkay,
        boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) {
    try {
        // set statistics
        OperatorResolver cr = getContractResolver(p);
        GenericDataSourceBase<?, ?> ordersSource = cr.getNode(ORDERS);
        GenericDataSourceBase<?, ?> lineItemSource = cr.getNode(LINEITEM);
        SingleInputOperator<?, ?, ?> mapper = cr.getNode(MAPPER_NAME);
        DualInputOperator<?, ?, ?, ?> joiner = cr.getNode(JOIN_NAME);
        setSourceStatistics(ordersSource, orderSize, 100f);
        setSourceStatistics(lineItemSource, lineitemSize, 140f);
        mapper.getCompilerHints().setAvgOutputRecordSize(16f);
        mapper.getCompilerHints().setFilterFactor(orderSelectivity);
        joiner.getCompilerHints().setFilterFactor(joinSelectivity);

        // compile
        final OptimizedPlan plan = compileWithStats(p);
        final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);

        // get the nodes from the final plan
        final SinkPlanNode sink = or.getNode(SINK);
        final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
        final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode
                ? (SingleInputPlanNode) reducer.getPredecessor()
                : null;
        final DualInputPlanNode join = or.getNode(JOIN_NAME);
        final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);

        checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);

        // check the possible variants and that the variant is allowed in this specific setting
        if (checkBroadcastShipStrategies(join, reducer, combiner)) {
            Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);

            if (checkHashJoinStrategies(join, reducer, true)) {
                Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
            } else if (checkHashJoinStrategies(join, reducer, false)) {
                Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
            } else if (checkBroadcastMergeJoin(join, reducer)) {
                Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
            } else {
                Assert.fail("Plan has no correct hash join or merge join strategies.");
            }
        } else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
            Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);

            if (checkHashJoinStrategies(join, reducer, true)) {
                Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
            } else if (checkHashJoinStrategies(join, reducer, false)) {
                Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
            } else if (checkRepartitionMergeJoin(join, reducer)) {
                Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
            } else {
                Assert.fail("Plan has no correct hash join or merge join strategies.");
            }
        } else {
            Assert.fail("Plan has neither correct BC join or partitioned join configuration.");
        }
    } catch (Exception e) {
        // Attach the original exception as the cause so the test report shows its full stack
        // trace, instead of printing it to stderr and failing with the message alone.
        throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
    }
}
示例21
/**
 * Verifies how a grouped reduce over the union of two inputs is translated.
 *
 * <p>A key-extracting map operator must be inserted on each union input. Each mapper must keep
 * the parallelism of its source, and the union must keep the default parallelism.
 */
@Test
public void translateUnion2Group() {
    try {
        final int parallelism = 4;
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

        DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2);

        dataset1.union(dataset2)
                .groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
                .reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
                .returns(String.class)
                .output(new DiscardingOutputFormat<>());

        Plan p = env.createProgramPlan();

        // The plan should look like the following one.
        //
        // DataSet1(3) - MapOperator(3)-+
        //                              |- Union(-1) - SingleInputOperator - Sink
        // DataSet2(2) - MapOperator(2)-+

        GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
        Union<?> unionOperator = (Union<?>) ((SingleInputOperator<?, ?, ?>) sink.getInput()).getInput();

        // The key mappers should be added to both of the two input streams for union.
        assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
        assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The parallelisms of the key mappers should be equal to those of their inputs.
        // JUnit's assertEquals takes (expected, actual); the original had them reversed.
        assertEquals(3, unionOperator.getFirstInput().getParallelism());
        assertEquals(2, unionOperator.getSecondInput().getParallelism());

        // The union should always have the default parallelism.
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, unionOperator.getParallelism());
    } catch (Exception e) {
        // Keep the original exception as the cause so its stack trace reaches the test report.
        throw new AssertionError("Test caused an error: " + e.getMessage(), e);
    }
}
示例22
/**
 * Verifies how a sorted grouped reduce over the union of three inputs is translated.
 *
 * <p>Key-extracting map operators must be inserted on every union input. Each mapper must keep
 * the parallelism of its source, and both unions must keep the default parallelism.
 */
@Test
public void translateUnion3SortedGroup() {
    try {
        final int parallelism = 4;
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

        DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1);

        dataset1.union(dataset2).union(dataset3)
                .groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
                .sortGroup((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
                .reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
                .returns(String.class)
                .output(new DiscardingOutputFormat<>());

        Plan p = env.createProgramPlan();

        // The plan should look like the following one.
        //
        // DataSet1(2) - MapOperator(2)-+
        //                              |- Union(-1) -+
        // DataSet2(3) - MapOperator(3)-+             |- Union(-1) - SingleInputOperator - Sink
        //                                            |
        // DataSet3(-1) - MapOperator(-1)-------------+

        GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
        Union<?> secondUnionOperator = (Union<?>) ((SingleInputOperator<?, ?, ?>) sink.getInput()).getInput();

        // The first input of the second union should be the first union.
        Union<?> firstUnionOperator = (Union<?>) secondUnionOperator.getFirstInput();

        // The key mapper should be added to the second input stream of the second union.
        assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The key mappers should be added to both of the two input streams for the first union.
        assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
        assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The parallelisms of the key mappers should be equal to those of their inputs.
        // JUnit's assertEquals takes (expected, actual); the original had them reversed.
        assertEquals(2, firstUnionOperator.getFirstInput().getParallelism());
        assertEquals(3, firstUnionOperator.getSecondInput().getParallelism());
        assertEquals(-1, secondUnionOperator.getSecondInput().getParallelism());

        // The union should always have the default parallelism.
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, secondUnionOperator.getParallelism());
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, firstUnionOperator.getParallelism());
    } catch (Exception e) {
        // Keep the original exception as the cause so its stack trace reaches the test report.
        throw new AssertionError("Test caused an error: " + e.getMessage(), e);
    }
}
示例23
/**
 * Returns the program operator wrapped by this node, typed as a single-input operator.
 *
 * @return the operator from {@code super.getOperator()}, downcast to {@link SingleInputOperator}
 */
@Override
public SingleInputOperator<?, ?, ?> getOperator() {
    // NOTE(review): assumes this node only ever wraps a SingleInputOperator; any other operator
    // type makes this cast throw ClassCastException.
    final SingleInputOperator<?, ?, ?> singleInputOperator =
            (SingleInputOperator<?, ?, ?>) super.getOperator();
    return singleInputOperator;
}
示例24
/**
 * Connects this node to the optimizer node of its operator's single input.
 *
 * <p>If the operator's parameters contain a ship-strategy hint under
 * {@code Optimizer.HINT_SHIP_STRATEGY}, the new connection is preset with the matching
 * {@link ShipStrategyType}.
 *
 * @param contractToNode      mapping from program operators to their optimizer nodes
 * @param defaultExchangeMode exchange mode used for the created connection
 * @throws CompilerException if the ship-strategy hint is not recognized, if the operator has no
 *                           input, or if {@code contractToNode} holds no node for that input
 */
@Override
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
        throws CompilerException {
    // see if an internal hint dictates the strategy to use
    final Configuration conf = getOperator().getParameters();
    final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);

    final ShipStrategyType preSet;
    if (shipStrategy == null) {
        preSet = null;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
        preSet = ShipStrategyType.PARTITION_HASH;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
        preSet = ShipStrategyType.PARTITION_RANGE;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
        preSet = ShipStrategyType.FORWARD;
    } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
        preSet = ShipStrategyType.PARTITION_RANDOM;
    } else {
        throw new CompilerException("Unrecognized ship strategy hint: " + shipStrategy);
    }

    // get the predecessor node
    final Operator<?> input = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
    if (input == null) {
        throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
    }

    final OptimizerNode pred = contractToNode.get(input);
    if (pred == null) {
        // Report the missing mapping clearly instead of failing later with a
        // NullPointerException when the predecessor is dereferenced.
        throw new CompilerException("Error: No optimizer node found for the input of '"
                + getOperator().getName() + "'.");
    }

    // create the connection and add it
    final DagConnection conn = new DagConnection(pred, this, defaultExchangeMode);
    if (preSet != null) {
        conn.setShipStrategy(preSet);
    }
    setIncomingConnection(conn);
    pred.addOutgoingConnection(conn);
}
示例25
/**
* Creates a new MapNode for the given operator.
*
* @param operator The map operator.
*/
public MapNode(SingleInputOperator<?, ?, ?> operator) {
super(operator);
this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor());
}
示例26
/**
* Creates a new MapNode for the given contract.
*
* @param operator The map partition contract object.
*/
public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
super(operator);
this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
}
示例27
/**
 * Compiles the given plan with the supplied statistics and compiler hints, then checks that the
 * join strategy chosen by the optimizer is one that this setting allows.
 *
 * @param p                  the program plan to compile
 * @param orderSize          size value passed to {@code setSourceStatistics} for the orders source
 * @param lineitemSize       size value passed to {@code setSourceStatistics} for the lineitem source
 * @param orderSelectivity   filter factor set as a compiler hint on the mapper
 * @param joinSelectivity    filter factor set as a compiler hint on the join
 * @param broadcastOkay      whether a broadcast join is an acceptable outcome
 * @param partitionedOkay    whether a partitioned (repartitioning) join is an acceptable outcome
 * @param hashJoinFirstOkay  whether a hash join building on the orders side is acceptable
 * @param hashJoinSecondOkay whether a hash join building on the lineitem side is acceptable
 * @param mergeJoinOkay      whether a merge join is acceptable
 */
private void testQueryGeneric(Plan p, long orderSize, long lineitemSize,
        float orderSelectivity, float joinSelectivity,
        boolean broadcastOkay, boolean partitionedOkay,
        boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) {
    try {
        // set statistics
        OperatorResolver cr = getContractResolver(p);
        GenericDataSourceBase<?, ?> ordersSource = cr.getNode(ORDERS);
        GenericDataSourceBase<?, ?> lineItemSource = cr.getNode(LINEITEM);
        SingleInputOperator<?, ?, ?> mapper = cr.getNode(MAPPER_NAME);
        DualInputOperator<?, ?, ?, ?> joiner = cr.getNode(JOIN_NAME);
        setSourceStatistics(ordersSource, orderSize, 100f);
        setSourceStatistics(lineItemSource, lineitemSize, 140f);
        mapper.getCompilerHints().setAvgOutputRecordSize(16f);
        mapper.getCompilerHints().setFilterFactor(orderSelectivity);
        joiner.getCompilerHints().setFilterFactor(joinSelectivity);

        // compile
        final OptimizedPlan plan = compileWithStats(p);
        final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);

        // get the nodes from the final plan
        final SinkPlanNode sink = or.getNode(SINK);
        final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
        final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode
                ? (SingleInputPlanNode) reducer.getPredecessor()
                : null;
        final DualInputPlanNode join = or.getNode(JOIN_NAME);
        final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);

        checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);

        // check the possible variants and that the variant is allowed in this specific setting
        if (checkBroadcastShipStrategies(join, reducer, combiner)) {
            Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);

            if (checkHashJoinStrategies(join, reducer, true)) {
                Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
            } else if (checkHashJoinStrategies(join, reducer, false)) {
                Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
            } else if (checkBroadcastMergeJoin(join, reducer)) {
                Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
            } else {
                Assert.fail("Plan has no correct hash join or merge join strategies.");
            }
        } else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
            Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);

            if (checkHashJoinStrategies(join, reducer, true)) {
                Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
            } else if (checkHashJoinStrategies(join, reducer, false)) {
                Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
            } else if (checkRepartitionMergeJoin(join, reducer)) {
                Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
            } else {
                Assert.fail("Plan has no correct hash join or merge join strategies.");
            }
        } else {
            Assert.fail("Plan has neither correct BC join or partitioned join configuration.");
        }
    } catch (Exception e) {
        // Attach the original exception as the cause so the test report shows its full stack
        // trace, instead of printing it to stderr and failing with the message alone.
        throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
    }
}
示例28
/**
 * Verifies how a grouped reduce over the union of two inputs is translated.
 *
 * <p>A key-extracting map operator must be inserted on each union input. Each mapper must keep
 * the parallelism of its source, and the union must keep the default parallelism.
 */
@Test
public void translateUnion2Group() {
    try {
        final int parallelism = 4;
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

        DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2);

        dataset1.union(dataset2)
                .groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
                .reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
                .returns(String.class)
                .output(new DiscardingOutputFormat<>());

        Plan p = env.createProgramPlan();

        // The plan should look like the following one.
        //
        // DataSet1(3) - MapOperator(3)-+
        //                              |- Union(-1) - SingleInputOperator - Sink
        // DataSet2(2) - MapOperator(2)-+

        GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
        Union<?> unionOperator = (Union<?>) ((SingleInputOperator<?, ?, ?>) sink.getInput()).getInput();

        // The key mappers should be added to both of the two input streams for union.
        assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
        assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The parallelisms of the key mappers should be equal to those of their inputs.
        // JUnit's assertEquals takes (expected, actual); the original had them reversed.
        assertEquals(3, unionOperator.getFirstInput().getParallelism());
        assertEquals(2, unionOperator.getSecondInput().getParallelism());

        // The union should always have the default parallelism.
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, unionOperator.getParallelism());
    } catch (Exception e) {
        // Keep the original exception as the cause so its stack trace reaches the test report.
        throw new AssertionError("Test caused an error: " + e.getMessage(), e);
    }
}
示例29
/**
 * Verifies how a sorted grouped reduce over the union of three inputs is translated.
 *
 * <p>Key-extracting map operators must be inserted on every union input. Each mapper must keep
 * the parallelism of its source, and both unions must keep the default parallelism.
 */
@Test
public void translateUnion3SortedGroup() {
    try {
        final int parallelism = 4;
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

        DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3);
        DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1);

        dataset1.union(dataset2).union(dataset3)
                .groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
                .sortGroup((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
                .reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
                .returns(String.class)
                .output(new DiscardingOutputFormat<>());

        Plan p = env.createProgramPlan();

        // The plan should look like the following one.
        //
        // DataSet1(2) - MapOperator(2)-+
        //                              |- Union(-1) -+
        // DataSet2(3) - MapOperator(3)-+             |- Union(-1) - SingleInputOperator - Sink
        //                                            |
        // DataSet3(-1) - MapOperator(-1)-------------+

        GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
        Union<?> secondUnionOperator = (Union<?>) ((SingleInputOperator<?, ?, ?>) sink.getInput()).getInput();

        // The first input of the second union should be the first union.
        Union<?> firstUnionOperator = (Union<?>) secondUnionOperator.getFirstInput();

        // The key mapper should be added to the second input stream of the second union.
        assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The key mappers should be added to both of the two input streams for the first union.
        assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
        assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);

        // The parallelisms of the key mappers should be equal to those of their inputs.
        // JUnit's assertEquals takes (expected, actual); the original had them reversed.
        assertEquals(2, firstUnionOperator.getFirstInput().getParallelism());
        assertEquals(3, firstUnionOperator.getSecondInput().getParallelism());
        assertEquals(-1, secondUnionOperator.getSecondInput().getParallelism());

        // The union should always have the default parallelism.
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, secondUnionOperator.getParallelism());
        assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, firstUnionOperator.getParallelism());
    } catch (Exception e) {
        // Keep the original exception as the cause so its stack trace reaches the test report.
        throw new AssertionError("Test caused an error: " + e.getMessage(), e);
    }
}
示例30
/**
 * Returns the program operator wrapped by this node, typed as a single-input operator.
 *
 * @return the operator from {@code super.getOperator()}, downcast to {@link SingleInputOperator}
 */
@Override
public SingleInputOperator<?, ?, ?> getOperator() {
    // NOTE(review): assumes this node only ever wraps a SingleInputOperator; any other operator
    // type makes this cast throw ClassCastException.
    final SingleInputOperator<?, ?, ?> singleInputOperator =
            (SingleInputOperator<?, ?, ?>) super.getOperator();
    return singleInputOperator;
}