Java source code examples: org.apache.flink.optimizer.testfunctions.IdentityMapper
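The snippets below are optimizer/plan-compilation tests from the Apache Flink code base that use the IdentityMapper test function. For orientation, here is a minimal sketch of what such an identity mapper looks like; the exact base class and details of the real org.apache.flink.optimizer.testfunctions.IdentityMapper may differ (a RichMapFunction is assumed here so the function could also access broadcast variables):

import org.apache.flink.api.common.functions.RichMapFunction;

// Sketch of an identity map function as used throughout the examples below.
// It forwards every element unchanged; the tests only care about the resulting plan shape.
public class IdentityMapper<T> extends RichMapFunction<T, T> {

    private static final long serialVersionUID = 1L;

    @Override
    public T map(T value) {
        return value;
    }
}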
Example 1
@Test
public void testBranchesOnlyInBCVariables1() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);
        DataSet<Long> input = env.generateSequence(1, 10);
        DataSet<Long> bc_input = env.generateSequence(1, 10);
        input
            .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
            .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
            .output(new DiscardingOutputFormat<Long>());
        Plan plan = env.createProgramPlan();
        compileNoStats(plan);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
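Example 1 only registers the broadcast sets under the names "name1" and "name2"; IdentityMapper never reads them, since the test only exercises the plan compiler. For reference, a user function that actually consumes such a broadcast set would look roughly like the following sketch (the class name BroadcastAwareMapper is illustrative; getBroadcastVariable is the standard DataSet API accessor for sets registered via withBroadcastSet):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Illustrative sketch: a mapper that reads the broadcast set registered under "name1".
public class BroadcastAwareMapper extends RichMapFunction<Long, Long> {

    private List<Long> broadcasted;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The name must match the one passed to withBroadcastSet(bc_input, "name1").
        this.broadcasted = getRuntimeContext().getBroadcastVariable("name1");
    }

    @Override
    public Long map(Long value) {
        // Trivial use of the broadcast data: offset each element by the broadcast set's size.
        return value + broadcasted.size();
    }
}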
Example 2
/**
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
 * |--------------------------/                  /
 * |--------------------------------------------/
 *
 * First cross has SameKeyFirst output contract
 */
@Test
public void testTicket158() {
    // construct the plan
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);
    DataSet<Long> set1 = env.generateSequence(0, 1);
    set1.map(new IdentityMapper<Long>()).name("Map1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
        .cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
        .cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
        .output(new DiscardingOutputFormat<Long>()).name("Sink");
    Plan plan = env.createProgramPlan();
    OptimizedPlan oPlan = compileNoStats(plan);
    JobGraphGenerator jobGen = new JobGraphGenerator();
    jobGen.compileJobGraph(oPlan);
}
Example 3
@Test
public void testBranchAfterIteration() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);
    DataSet<Long> sourceA = env.generateSequence(0, 1);
    IterativeDataSet<Long> loopHead = sourceA.iterate(10);
    DataSet<Long> loopTail = loopHead.map(new IdentityMapper<Long>()).name("Mapper");
    DataSet<Long> loopRes = loopHead.closeWith(loopTail);
    loopRes.output(new DiscardingOutputFormat<Long>());
    loopRes.map(new IdentityMapper<Long>())
        .output(new DiscardingOutputFormat<Long>());
    Plan plan = env.createProgramPlan();
    try {
        compileNoStats(plan);
    }
    catch (Exception e) {
        e.printStackTrace();
        Assert.fail(e.getMessage());
    }
}
Example 4
@Test
public void testNoBreakerForIndependentVariable() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source1 = env.fromElements("test");
        DataSet<String> source2 = env.fromElements("test");
        source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name")
            .output(new DiscardingOutputFormat<String>());
        Plan p = env.createProgramPlan();
        OptimizedPlan op = compileNoStats(p);
        SinkPlanNode sink = op.getDataSinks().iterator().next();
        SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
        assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
        assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());
        assertEquals(DataExchangeMode.PIPELINED, mapper.getInput().getDataExchangeMode());
        assertEquals(DataExchangeMode.PIPELINED, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 5
@Test
public void testBreakerForDependentVariable() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source1 = env.fromElements("test");
        source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
            .output(new DiscardingOutputFormat<String>());
        Plan p = env.createProgramPlan();
        OptimizedPlan op = compileNoStats(p);
        SinkPlanNode sink = op.getDataSinks().iterator().next();
        SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
        SingleInputPlanNode beforeMapper = (SingleInputPlanNode) mapper.getInput().getSource();
        assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
        assertEquals(TempMode.NONE, beforeMapper.getInput().getTempMode());
        assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());
        assertEquals(DataExchangeMode.PIPELINED, mapper.getInput().getDataExchangeMode());
        assertEquals(DataExchangeMode.BATCH, beforeMapper.getInput().getDataExchangeMode());
        assertEquals(DataExchangeMode.BATCH, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 6
@Test
public void testDistinctDestroysPartitioningOfNonDistinctFields() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        @SuppressWarnings("unchecked")
        DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
            .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
        data.distinct(1)
            .groupBy(0)
            .sum(1)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
        Plan p = env.createProgramPlan();
        OptimizedPlan op = compileNoStats(p);
        SinkPlanNode sink = op.getDataSinks().iterator().next();
        SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
        SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
        SingleInputPlanNode distinctReducer = (SingleInputPlanNode) combiner.getInput().getSource();
        assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
        // reducer must repartition, because it works on a different field
        assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
        // distinct reducer is partitioned
        assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 7
/**
 * <pre>
 *              +---------Iteration-------+
 *              |                         |
 *     /--map--< >----\                   |
 *    /         |      \         /-------< >---sink
 * src-map      |     join------/         |
 *    \         |      /                  |
 *     \        +-----/-------------------+
 *      \            /
 *       \--reduce--/
 * </pre>
 */
@Test
public void testIterationWithStaticInput() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);
        DataSet<Long> source = env.generateSequence(1, 1000000);
        DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
        DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
        IterativeDataSet<Long> iteration = mapped.iterate(10);
        iteration.closeWith(
                iteration.join(reduced)
                    .where(new IdentityKeyExtractor<Long>())
                    .equalTo(new IdentityKeyExtractor<Long>())
                    .with(new DummyFlatJoinFunction<Long>()))
            .output(new DiscardingOutputFormat<Long>());
        compileNoStats(env.createProgramPlan());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 8
@Test
public void testWorksetIterationPipelineBreakerPlacement() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        // the workset (input two of the delta iteration) is the same as what is consumed by the successive join
        DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
        DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
        // trivial iteration, since we are interested in the inputs to the iteration
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
        DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
        initialWorkset
            .join(result, JoinHint.REPARTITION_HASH_FIRST)
            .where(0).equalTo(0)
            .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
        Plan p = env.createProgramPlan();
        compileNoStats(p);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
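The DuplicateValue mapper used in Example 8 is another small test helper. Given that readCsvFile(...).types(Long.class) produces a DataSet<Tuple1<Long>> and the result is used as a DataSet<Tuple2<Long, Long>>, it presumably just duplicates the single input field; a hedged sketch of such a function:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;

// Presumed shape of the DuplicateValue test function: copy the single input field into both output fields.
public class DuplicateValue implements MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> map(Tuple1<Long> value) {
        return new Tuple2<Long, Long>(value.f0, value.f0);
    }
}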
Example 9
private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
    // construct the plan
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);
    DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
    DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
    DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);
    DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0)
        .with(new IdentityJoiner<Tuple2<Long, Long>>())
        .withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME);
    DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0)
        .with(new IdentityJoiner<Tuple2<Long, Long>>())
        .name(JOIN_WITH_SOLUTION_SET);
    if (joinPreservesSolutionSet) {
        ((JoinOperator<?, ?, ?>) join2).withForwardedFieldsFirst("*");
    }
    DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
        .withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
    if (mapBeforeSolutionDelta) {
        DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>())
            .withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME);
        deltaIt.closeWith(mapper, nextWorkset)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
    }
    else {
        deltaIt.closeWith(join2, nextWorkset)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
    }
    return env.createProgramPlan();
}
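Example 9 is a plan-building helper rather than a test in itself. A hypothetical caller (assuming the surrounding class extends Flink's CompilerTestBase, which provides compileNoStats, and defines the referenced name constants) might exercise it roughly like this:

// Hypothetical usage of the helper above; the test name and flag combination are illustrative.
@Test
public void testDeltaIterationPlanCompiles() {
    // Build the plan with a solution-set-preserving join and a mapper before the solution-set delta.
    Plan plan = getTestPlan(true, true);
    // Compile it and make sure the job graph generator accepts the result.
    OptimizedPlan oPlan = compileNoStats(plan);
    new JobGraphGenerator().compileJobGraph(oPlan);
}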
Example 10
@Test
public void testBulkIterationInClosure() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> data1 = env.generateSequence(1, 100);
        DataSet<Long> data2 = env.generateSequence(1, 100);
        IterativeDataSet<Long> firstIteration = data1.iterate(100);
        DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
        IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
        DataSet<Long> joined = mainIteration.join(firstResult)
            .where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
            .with(new DummyFlatJoinFunction<Long>());
        DataSet<Long> mainResult = mainIteration.closeWith(joined);
        mainResult.output(new DiscardingOutputFormat<Long>());
        Plan p = env.createProgramPlan();
        // optimizer should be able to translate this
        OptimizedPlan op = compileNoStats(p);
        // job graph generator should be able to translate this
        new JobGraphGenerator().compileJobGraph(op);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 11
/**
 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
 *
 * Increases parallelism between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
 * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
 * transit as well.
 */
@Test
public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
    final int p = DEFAULT_PARALLELISM;
    // construct the plan
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(p);
    DataSet<Long> set1 = env.generateSequence(0, 1).setParallelism(p);
    set1.map(new IdentityMapper<Long>())
        .withForwardedFields("*").setParallelism(p).name("Map1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
        .withForwardedFields("*").setParallelism(p).name("Reduce1")
        .map(new IdentityMapper<Long>())
        .withForwardedFields("*").setParallelism(p * 2).name("Map2")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
        .withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
        .output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
    Plan plan = env.createProgramPlan();
    // submit the plan to the compiler
    OptimizedPlan oPlan = compileNoStats(plan);
    // check the optimized plan:
    // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
    // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the
    // same mapper and reducer, respectively
    SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
    SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
    SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
    ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
    ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
    Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
    Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
}
Example 12
@Test
public void testCostComputationWithMultipleDataSinks() {
    final int SINKS = 5;
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);
        DataSet<Long> source = env.generateSequence(1, 10000);
        DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
        DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
        for (int sink = 0; sink < SINKS; sink++) {
            mappedA.output(new DiscardingOutputFormat<Long>());
            mappedC.output(new DiscardingOutputFormat<Long>());
        }
        Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
        OptimizedPlan oPlan = compileNoStats(plan);
        new JobGraphGenerator().compileJobGraph(oPlan);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 13
/**
 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
 *
 * Increases parallelism between 1st reduce and 2nd map, such that more tasks are on one instance.
 * Expected to re-establish partitioning between map and reduce via a local hash.
 */
@Test
public void checkPropertyHandlingWithIncreasingLocalParallelism() {
    final int p = DEFAULT_PARALLELISM * 2;
    // construct the plan
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(p);
    DataSet<Long> set1 = env.generateSequence(0, 1).setParallelism(p);
    set1.map(new IdentityMapper<Long>())
        .withForwardedFields("*").setParallelism(p).name("Map1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
        .withForwardedFields("*").setParallelism(p).name("Reduce1")
        .map(new IdentityMapper<Long>())
        .withForwardedFields("*").setParallelism(p * 2).name("Map2")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
        .withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
        .output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
    Plan plan = env.createProgramPlan();
    // submit the plan to the compiler
    OptimizedPlan oPlan = compileNoStats(plan);
    // check the optimized plan:
    // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
    // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the
    // same mapper and reducer, respectively
    SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
    SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
    SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
    ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
    ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
    Assert.assertTrue("Invalid ship strategy for an operator.",
        (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
        (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
}
Example 14
@Test
public void testMultipleIterations() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(100);
    DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
    DataSet<String> reduced = input
        .map(new IdentityMapper<String>())
        .reduceGroup(new Top1GroupReducer<String>());
    IterativeDataSet<String> iteration1 = input.iterate(100);
    IterativeDataSet<String> iteration2 = input.iterate(20);
    IterativeDataSet<String> iteration3 = input.iterate(17);
    iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1"))
        .output(new DiscardingOutputFormat<String>());
    iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2"))
        .output(new DiscardingOutputFormat<String>());
    iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3"))
        .output(new DiscardingOutputFormat<String>());
    Plan plan = env.createProgramPlan();
    try {
        compileNoStats(plan);
    }
    catch (Exception e) {
        e.printStackTrace();
        Assert.fail(e.getMessage());
    }
}
Example 15
@Test
public void testMultipleIterationsWithClosueBCVars() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(100);
    DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
    IterativeDataSet<String> iteration1 = input.iterate(100);
    IterativeDataSet<String> iteration2 = input.iterate(20);
    IterativeDataSet<String> iteration3 = input.iterate(17);
    iteration1.closeWith(iteration1.map(new IdentityMapper<String>()))
        .output(new DiscardingOutputFormat<String>());
    iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()))
        .output(new DiscardingOutputFormat<String>());
    iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()))
        .output(new DiscardingOutputFormat<String>());
    Plan plan = env.createProgramPlan();
    try {
        compileNoStats(plan);
    }
    catch (Exception e) {
        e.printStackTrace();
        Assert.fail(e.getMessage());
    }
}
Example 16
@Test
public void testBranchesOnlyInBCVariables2() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);
        DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
        DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
        DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");
        DataSet<Tuple2<Long, Long>> joinInput1 =
            input.map(new IdentityMapper<Tuple2<Long,Long>>())
                .withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
                .withBroadcastSet(bc_input2, "bc2");
        DataSet<Tuple2<Long, Long>> joinInput2 =
            input.map(new IdentityMapper<Tuple2<Long,Long>>())
                .withBroadcastSet(bc_input1, "bc1")
                .withBroadcastSet(bc_input2, "bc2");
        DataSet<Tuple2<Long, Long>> joinResult = joinInput1
            .join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
            .with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
        input
            .map(new IdentityMapper<Tuple2<Long,Long>>())
            .withBroadcastSet(bc_input1, "bc1")
            .union(joinResult)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
        Plan plan = env.createProgramPlan();
        compileNoStats(plan);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 17
@Test
public void testPipelineBreakerWithBroadcastVariable() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
        env.setParallelism(64);
        DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
        DataSet<Long> result = source.map(new IdentityMapper<Long>())
            .map(new IdentityMapper<Long>())
            .withBroadcastSet(source, "bc");
        result.output(new DiscardingOutputFormat<Long>());
        Plan p = env.createProgramPlan();
        OptimizedPlan op = compileNoStats(p);
        SinkPlanNode sink = op.getDataSinks().iterator().next();
        SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
        SingleInputPlanNode mapperInput = (SingleInputPlanNode) mapper.getInput().getSource();
        assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
        assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());
        assertEquals(DataExchangeMode.BATCH, mapperInput.getInput().getDataExchangeMode());
        assertEquals(DataExchangeMode.BATCH, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 18
@Test
public void testDistinctPreservesPartitioningOfDistinctFields() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        @SuppressWarnings("unchecked")
        DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
            .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
        data.distinct(0)
            .groupBy(0)
            .sum(1)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
        Plan p = env.createProgramPlan();
        OptimizedPlan op = compileNoStats(p);
        SinkPlanNode sink = op.getDataSinks().iterator().next();
        SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
        SingleInputPlanNode distinctReducer = (SingleInputPlanNode) reducer.getInput().getSource();
        assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
        // reducer can be forward, reuses partitioning from distinct
        assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
        // distinct reducer is partitioned
        assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Example 19
@Test
public void testGeneratingJobGraphWithUnconsumedResultPartition() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<>(1L, 2L))
        .setParallelism(1);
    DataSet<Tuple2<Long, Long>> ds = input.map(new IdentityMapper<>())
        .setParallelism(3);
    AbstractID intermediateDataSetID = new AbstractID();
    // this output branch will be excluded.
    ds.output(BlockingShuffleOutputFormat.createOutputFormat(intermediateDataSetID))
        .setParallelism(1);
    // this is the normal output branch.
    ds.output(new DiscardingOutputFormat<>())
        .setParallelism(1);
    JobGraph jobGraph = compileJob(env);
    Assert.assertEquals(3, jobGraph.getVerticesSortedTopologicallyFromSources().size());
    JobVertex mapVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
    Assert.assertThat(mapVertex, Matchers.instanceOf(JobVertex.class));
    // there are 2 output results, one of which is of type ResultPartitionType.BLOCKING_PERSISTENT
    Assert.assertEquals(2, mapVertex.getProducedDataSets().size());
    Assert.assertTrue(mapVertex.getProducedDataSets().stream()
        .anyMatch(dataSet -> dataSet.getId().equals(new IntermediateDataSetID(intermediateDataSetID)) &&
            dataSet.getResultType() == ResultPartitionType.BLOCKING_PERSISTENT));
}
Example 20
@Test
public void testSolutionSetDeltaDependsOnBroadcastVariable() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, Long>> source =
            env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
        DataSet<Tuple2<Long, Long>> invariantInput =
            env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
        // iteration from here
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = source.iterateDelta(source, 1000, 1);
        DataSet<Tuple2<Long, Long>> result =
            invariantInput
                .map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
                .join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
        iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result)
            .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
        OptimizedPlan p = compileNoStats(env.createProgramPlan());
        // check that the JSON generator accepts this plan
        new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
        // check that the JobGraphGenerator accepts the plan
        new JobGraphGenerator().compileJobGraph(p);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}