public static Map<String, Set<String>> getAdjacencyMap(StormTopology topology,
boolean removeSystemComponent) {
Map<String, Set<String>> adjacencyMap = new HashMap<>();
for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
String boltName = entry.getKey();
Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs();
for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
String inputComponentId = input.getKey().get_componentId();
Set<String> components = adjacencyMap.containsKey(inputComponentId)
? adjacencyMap.get(inputComponentId) : new HashSet<String>();
components.add(boltName);
components = removeSystemComponent ? removeSystemComponents(components)
: components;
if (!removeSystemComponent || !isSystemComponent(inputComponentId)) {
adjacencyMap.put(inputComponentId, components);
}
}
}
return adjacencyMap;
}
public static Map<String, Set<String>> getAdjacencyMap(StormTopology topology,
boolean removeSystemComponent)
throws Exception {
Map<String, Set<String>> adjacencyMap = new HashMap<>();
for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
String boltName = entry.getKey();
Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs();
for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
String inputComponentId = input.getKey().get_componentId();
Set<String> components = adjacencyMap.containsKey(inputComponentId)
? adjacencyMap.get(inputComponentId) : new HashSet<String>();
components.add(boltName);
components = removeSystemComponent ? removeSystemComponents(components)
: components;
if (!removeSystemComponent || !isSystemComponent(inputComponentId)) {
adjacencyMap.put(inputComponentId, components);
}
}
}
return adjacencyMap;
}
@Test
public void aggregate() throws Exception {
beansXml = "<breeze:topology id='aggregate'>" +
"<breeze:spout id='iterator' beanType='java.util.Iterator' signature='next()' outputFields='x'/>" +
"<breeze:spout id='queue' beanType='java.util.Queue' signature='poll()' outputFields='x'/>" +
"<breeze:bolt id='collector' beanType='org.slf4j.Logger' signature='info(x)'/>" +
"</breeze:topology>";
refresh();
StormTopology topology = getBean("aggregate", StormTopology.class);
Bolt collector = topology.get_bolts().get("collector");
Map<GlobalStreamId, Grouping> inputs = collector.get_common().get_inputs();
assertEquals("input count", 2, inputs.size());
assertNotNull("iterator grouping", inputs.get(new GlobalStreamId("iterator", "default")));
assertNotNull("queue grouping", inputs.get(new GlobalStreamId("queue", "default")));
}
@Test
public void build() throws Exception {
beansXml = "<breeze:topology id='t1'>" +
"<breeze:spout id='s1' beanType='eu.icolumbo.breeze.TestBean' signature='ping()' outputFields='feed'/>" +
"<breeze:bolt id='b1' beanType='eu.icolumbo.breeze.TestBean' signature='echo(feed)' outputFields='replay' scatterOutput='true'/>" +
"<breeze:bolt beanType='eu.icolumbo.breeze.TestBean' signature='drain(replay)' parallelism='2'/>" +
"</breeze:topology>";
refresh();
StormTopology topology = getBean("t1", StormTopology.class);
assertEquals("spout count", 1, topology.get_spouts_size());
assertEquals("bolt count", 2, topology.get_bolts_size());
SpringSpout spout = getBean("s1", SpringSpout.class);
assertEquals("spout ID", "s1", spout.getId());
assertEquals("spout scatter", false, spout.getScatterOutput());
SpringBolt bolt = getBean("b1", SpringBolt.class);
assertEquals("bolt ID", "b1", bolt.getId());
assertEquals("bolt scatter", true, bolt.getScatterOutput());
Map<String, SpoutSpec> topologySpouts = topology.get_spouts();
SpoutSpec spoutSpec = topologySpouts.get("s1");
assertNotNull("s1 spec", spoutSpec);
Map<String, Bolt> topologyBolts = topology.get_bolts();
Bolt boltSpec = topologyBolts.get("b1");
assertNotNull("b1 spec", boltSpec);
String anonymousBoltId = null;
for (String id : topologyBolts.keySet())
if (! "b1".equals(id))
anonymousBoltId = id;
assertNotNull("anonymous ID", anonymousBoltId);
Bolt anonymousBoltSpec = topologyBolts.get(anonymousBoltId);
assertNotNull("anonymous spec", anonymousBoltSpec);
assertEquals("s1 parralelism", 1, spoutSpec.get_common().get_parallelism_hint());
assertEquals("b1 parralelism", 1, boltSpec.get_common().get_parallelism_hint());
assertEquals("second bold parrallelism", 2, anonymousBoltSpec.get_common().get_parallelism_hint());
Map<GlobalStreamId,Grouping> boltInputs = boltSpec.get_common().get_inputs();
assertEquals("input size", 1, boltInputs.size());
GlobalStreamId streamId = boltInputs.keySet().iterator().next();
assertEquals("input component id", "s1", streamId.get_componentId());
assertEquals("input stream id", "default", streamId.get_streamId());
}