Java源码示例:org.apache.flink.graph.gsa.GatherSumApplyIteration
示例1
/**
 * Executes a Gather-Sum-Apply iteration over this graph using the supplied configuration.
 *
 * @param gatherFunction collects information about adjacent vertices and edges
 * @param sumFunction aggregates the information that was gathered
 * @param applyFunction updates the vertex values with the aggregates
 * @param maximumNumberOfIterations upper bound on the number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @param <M> the intermediate type exchanged between gather, sum and apply
 *
 * @return a Graph built from the updated vertices and this graph's edges, produced once the
 *         gather-sum-apply iteration has converged or maximumNumberOfIterations is reached
 */
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
		org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction,
		SumFunction<VV, EV, M> sumFunction,
		ApplyFunction<K, VV, M> applyFunction,
		int maximumNumberOfIterations,
		GSAConfiguration parameters) {

	// Build the iteration over this graph's edges, then apply the caller's settings.
	final GatherSumApplyIteration<K, VV, EV, M> gsaIteration =
		GatherSumApplyIteration.withEdges(
			this.edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
	gsaIteration.configure(parameters);

	// The iteration is run as a custom operation on the vertex data set.
	final DataSet<Vertex<K, VV>> updatedVertices = this.vertices.runOperation(gsaIteration);

	return new Graph<>(updatedVertices, this.edges, this.context);
}
示例2
/**
 * Checks that the name, parallelism and solutionSetUnmanaged parameters set on a
 * {@code GSAConfiguration} are visible through the iteration's configuration after
 * {@code configure} is called, then runs the iteration and compares the collected
 * vertices against the expected result.
 *
 * @throws Exception if creating or executing the iteration fails
 */
@Test
public void testIterationConfiguration() throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
		.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
			new DummySum(), new DummyApply(), 10);

	GSAConfiguration parameters = new GSAConfiguration();
	parameters.setName("gelly iteration");
	parameters.setParallelism(2);
	parameters.setSolutionSetUnmanagedMemory(true);

	iteration.configure(parameters);

	// Each configured value must be readable back from the iteration.
	Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
	Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
	// assertTrue states the boolean expectation directly instead of assertEquals(true, ...).
	Assert.assertTrue(iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());

	DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
	List<Vertex<Long, Long>> result = data.collect();

	// Expected vertices in the textual form consumed by compareResultAsTuples.
	expectedResult = "1,11\n" +
		"2,12\n" +
		"3,13\n" +
		"4,14\n" +
		"5,15";

	compareResultAsTuples(result, expectedResult);
}
示例3
/**
 * Executes a Gather-Sum-Apply iteration over this graph using the supplied configuration.
 *
 * @param gatherFunction collects information about adjacent vertices and edges
 * @param sumFunction aggregates the information that was gathered
 * @param applyFunction updates the vertex values with the aggregates
 * @param maximumNumberOfIterations upper bound on the number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @param <M> the intermediate type exchanged between gather, sum and apply
 *
 * @return a Graph built from the updated vertices and this graph's edges, produced once the
 *         gather-sum-apply iteration has converged or maximumNumberOfIterations is reached
 */
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
		org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction,
		SumFunction<VV, EV, M> sumFunction,
		ApplyFunction<K, VV, M> applyFunction,
		int maximumNumberOfIterations,
		GSAConfiguration parameters) {

	// Build the iteration over this graph's edges, then apply the caller's settings.
	final GatherSumApplyIteration<K, VV, EV, M> gsaIteration =
		GatherSumApplyIteration.withEdges(
			this.edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
	gsaIteration.configure(parameters);

	// The iteration is run as a custom operation on the vertex data set.
	final DataSet<Vertex<K, VV>> updatedVertices = this.vertices.runOperation(gsaIteration);

	return new Graph<>(updatedVertices, this.edges, this.context);
}
示例4
/**
 * Checks that the name, parallelism and solutionSetUnmanaged parameters set on a
 * {@code GSAConfiguration} are visible through the iteration's configuration after
 * {@code configure} is called, then runs the iteration and compares the collected
 * vertices against the expected result.
 *
 * @throws Exception if creating or executing the iteration fails
 */
@Test
public void testIterationConfiguration() throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
		.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
			new DummySum(), new DummyApply(), 10);

	GSAConfiguration parameters = new GSAConfiguration();
	parameters.setName("gelly iteration");
	parameters.setParallelism(2);
	parameters.setSolutionSetUnmanagedMemory(true);

	iteration.configure(parameters);

	// Each configured value must be readable back from the iteration.
	Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
	Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
	// assertTrue states the boolean expectation directly instead of assertEquals(true, ...).
	Assert.assertTrue(iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());

	DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
	List<Vertex<Long, Long>> result = data.collect();

	// Expected vertices in the textual form consumed by compareResultAsTuples.
	expectedResult = "1,11\n" +
		"2,12\n" +
		"3,13\n" +
		"4,14\n" +
		"5,15";

	compareResultAsTuples(result, expectedResult);
}
示例5
/**
 * Executes a Gather-Sum-Apply iteration over this graph using the supplied configuration.
 *
 * @param gatherFunction collects information about adjacent vertices and edges
 * @param sumFunction aggregates the information that was gathered
 * @param applyFunction updates the vertex values with the aggregates
 * @param maximumNumberOfIterations upper bound on the number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @param <M> the intermediate type exchanged between gather, sum and apply
 *
 * @return a Graph built from the updated vertices and this graph's edges, produced once the
 *         gather-sum-apply iteration has converged or maximumNumberOfIterations is reached
 */
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
		org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction,
		SumFunction<VV, EV, M> sumFunction,
		ApplyFunction<K, VV, M> applyFunction,
		int maximumNumberOfIterations,
		GSAConfiguration parameters) {

	// Build the iteration over this graph's edges, then apply the caller's settings.
	final GatherSumApplyIteration<K, VV, EV, M> gsaIteration =
		GatherSumApplyIteration.withEdges(
			this.edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
	gsaIteration.configure(parameters);

	// The iteration is run as a custom operation on the vertex data set.
	final DataSet<Vertex<K, VV>> updatedVertices = this.vertices.runOperation(gsaIteration);

	return new Graph<>(updatedVertices, this.edges, this.context);
}
示例6
/**
 * Checks that the name, parallelism and solutionSetUnmanaged parameters set on a
 * {@code GSAConfiguration} are visible through the iteration's configuration after
 * {@code configure} is called, then runs the iteration and compares the collected
 * vertices against the expected result.
 *
 * @throws Exception if creating or executing the iteration fails
 */
@Test
public void testIterationConfiguration() throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
		.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
			new DummySum(), new DummyApply(), 10);

	GSAConfiguration parameters = new GSAConfiguration();
	parameters.setName("gelly iteration");
	parameters.setParallelism(2);
	parameters.setSolutionSetUnmanagedMemory(true);

	iteration.configure(parameters);

	// Each configured value must be readable back from the iteration.
	Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
	Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
	// assertTrue states the boolean expectation directly instead of assertEquals(true, ...).
	Assert.assertTrue(iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());

	DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
	List<Vertex<Long, Long>> result = data.collect();

	// Expected vertices in the textual form consumed by compareResultAsTuples.
	expectedResult = "1,11\n" +
		"2,12\n" +
		"3,13\n" +
		"4,14\n" +
		"5,15";

	compareResultAsTuples(result, expectedResult);
}