Java源码示例:org.apache.kafka.streams.TopologyDescription
示例1
@Test void should_doNothing_whenAllDisabled() {
  // Given: trace storage settings with both boolean switches set to false
  List<String> autocompleteKeys = Collections.singletonList("environment");
  Duration traceTtl = Duration.ofMillis(5);
  Duration traceTtlCheckInterval = Duration.ofMinutes(1);
  // When: the topology is built from those settings
  TraceStorageTopology topologySupplier = new TraceStorageTopology(
      spansTopic,
      autocompleteKeys,
      traceTtl,
      traceTtlCheckInterval,
      0,
      false,
      false);
  Topology topology = topologySupplier.get();
  // Then: the description contains no subtopology
  assertThat(topology.describe().subtopologies()).hasSize(0);
  // And: a test driver can still be created and closed on that topology
  new TopologyTestDriver(topology, props).close();
}
示例2
@Test void should_doNothing_whenDisabled() {
  // Given: dependency storage settings with the boolean switch set to false
  Duration dependenciesWindowSize = Duration.ofMillis(100);
  Duration dependenciesRetentionPeriod = Duration.ofMinutes(1);
  // When: the topology is built from those settings
  DependencyStorageTopology topologySupplier = new DependencyStorageTopology(
      dependencyTopic,
      dependenciesRetentionPeriod,
      dependenciesWindowSize,
      false);
  Topology topology = topologySupplier.get();
  // Then: the description contains no subtopology
  assertThat(topology.describe().subtopologies()).hasSize(0);
  // And: a test driver can still be created and closed on that topology
  new TopologyTestDriver(topology, props).close();
}
示例3
@Test void should_doNothing_whenAggregationDisabled() {
  // Given: aggregation settings with the boolean switch set to false
  Duration traceTimeout = Duration.ofSeconds(1);
  // When: the topology is built from those settings
  SpanAggregationTopology topologySupplier = new SpanAggregationTopology(
      spansTopic,
      traceTopic,
      dependencyTopic,
      traceTimeout,
      false);
  Topology topology = topologySupplier.get();
  // Then: the description contains no subtopology
  assertThat(topology.describe().subtopologies()).hasSize(0);
  // And: a test driver can still be created and closed on that topology
  new TopologyTestDriver(topology, props).close();
}
示例4
/**
 * Starts the test environment.
 *
 * <p>Starts the schema registry and publishes its URL into the properties, creates a temporary
 * state directory and registers it as the Kafka Streams state directory, builds the topology via
 * the topology factory, creates the {@link TopologyTestDriver}, and finally rebuilds the sets of
 * input and output topics from the topology description.
 *
 * @throws UncheckedIOException if the temporary state directory cannot be created
 */
public void start() {
    this.schemaRegistry.start();
    this.properties
            .setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl());
    try {
        this.stateDirectory = Files.createTempDirectory("fluent-kafka-streams");
    } catch (final IOException e) {
        throw new UncheckedIOException("Cannot create temporary state directory", e);
    }
    this.properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString());
    final Topology topology = this.topologyFactory.apply(this.properties);
    this.testDriver = new TopologyTestDriver(topology, this.properties);

    this.inputTopics.clear();
    this.outputTopics.clear();

    // Describe the topology once and reuse the result for both the subtopology and global store scans.
    final TopologyDescription description = topology.describe();
    for (final TopologyDescription.Subtopology subtopology : description.subtopologies()) {
        for (final TopologyDescription.Node node : subtopology.nodes()) {
            if (node instanceof TopologyDescription.Source) {
                final TopologyDescription.Source source = (TopologyDescription.Source) node;
                // topicSet() may be null for a source that subscribes via a topic pattern; such a
                // source has no fixed topic names to register, so it is skipped instead of failing.
                if (source.topicSet() != null) {
                    for (final String topic : source.topicSet()) {
                        addExternalTopics(this.inputTopics, topic);
                    }
                }
            } else if (node instanceof TopologyDescription.Sink) {
                addExternalTopics(this.outputTopics, ((TopologyDescription.Sink) node).topic());
            }
        }
    }

    // Topics feeding global stores are treated as input topics as well.
    for (final TopologyDescription.GlobalStore store : description.globalStores()) {
        store.source().topicSet().forEach(name -> addExternalTopics(this.inputTopics, name));
    }
}
示例5
@Test
public void shouldBuildSourceNode() {
    // Prepare the topic client expectations and build the join plan.
    setupTopicClientExpectations(1, 1);
    buildJoin();

    final TopologyDescription.Source sourceNode =
        (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
    final List<String> successorNames = sourceNode.successors()
        .stream()
        .map(successor -> successor.name())
        .collect(Collectors.toList());

    // A source has no predecessors, feeds the map-values node and reads from "test2".
    assertThat(sourceNode.predecessors(), equalTo(Collections.emptySet()));
    assertThat(successorNames, equalTo(Collections.singletonList(MAPVALUES_NODE)));
    assertThat(sourceNode.topics(), equalTo("[test2]"));
}
示例6
@Test
public void shouldHaveLeftJoin() {
    // Prepare the topic client expectations and build the join plan.
    setupTopicClientExpectations(1, 1);
    buildJoin();

    final Topology builtTopology = builder.build();
    final TopologyDescription.Processor leftJoinNode =
        (TopologyDescription.Processor) getNodeByName(builtTopology, "KSTREAM-LEFTJOIN-0000000014");
    final List<String> predecessorNames = leftJoinNode.predecessors()
        .stream()
        .map(predecessor -> predecessor.name())
        .collect(Collectors.toList());

    // The left-join processor uses the reduce state store and is fed by the second source node.
    assertThat(leftJoinNode.stores(), equalTo(Utils.mkSet("KSTREAM-REDUCE-STATE-STORE-0000000003")));
    assertThat(predecessorNames, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000013")));
}
示例7
@Test
public void shouldBuildSourceNode() throws Exception {
    final TopologyDescription.Source sourceNode =
        (TopologyDescription.Source) getNodeByName(SOURCE_NODE);
    final List<String> successorNames = sourceNode.successors()
        .stream()
        .map(successor -> successor.name())
        .collect(Collectors.toList());

    // A source has no predecessors, feeds the source map-values node and reads from "test1".
    assertThat(sourceNode.predecessors(), equalTo(Collections.emptySet()));
    assertThat(successorNames, equalTo(Collections.singletonList(SOURCE_MAPVALUES_NODE)));
    assertThat(sourceNode.topics(), equalTo("[test1]"));
}
示例8
@Test
public void shouldBuildSourceNode() {
    final TopologyDescription.Source sourceNode =
        (TopologyDescription.Source) getNodeByName(builder.build(), PlanTestUtil.SOURCE_NODE);
    final List<String> successorNames = sourceNode.successors()
        .stream()
        .map(successor -> successor.name())
        .collect(Collectors.toList());

    // A source has no predecessors, feeds the map-values node and reads from "topic".
    assertThat(sourceNode.predecessors(), equalTo(Collections.emptySet()));
    assertThat(successorNames, equalTo(Collections.singletonList(PlanTestUtil.MAPVALUES_NODE)));
    assertThat(sourceNode.topics(), equalTo("[topic]"));
}
示例9
@Test
public void shouldBuildSourceNode() {
    // Build the plan under test before inspecting the topology.
    build();

    final TopologyDescription.Source sourceNode =
        (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
    final List<String> successorNames = sourceNode.successors()
        .stream()
        .map(successor -> successor.name())
        .collect(Collectors.toList());

    // A source has no predecessors, feeds the map-values node and reads from "test1".
    assertThat(sourceNode.predecessors(), equalTo(Collections.emptySet()));
    assertThat(successorNames, equalTo(Collections.singletonList(MAPVALUES_NODE)));
    assertThat(sourceNode.topics(), equalTo("[test1]"));
}
示例10
@Test
public void shouldHaveTwoSubTopologies() {
    // A rekey is always required at the moment, which yields two subtopologies.
    buildRequireRekey();

    final int subtopologyCount = builder.build().describe().subtopologies().size();

    assertThat(subtopologyCount, equalTo(2));
}
示例11
@Test
public void shouldHaveSourceNodeForSecondSubtopolgy() {
    buildRequireRekey();

    final TopologyDescription.Source sourceNode =
        (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010");
    final List<String> successorNames = sourceNode.successors()
        .stream()
        .map(successor -> successor.name())
        .collect(Collectors.toList());

    // The second source has no predecessors and feeds the aggregate processor.
    assertThat(sourceNode.predecessors(), equalTo(Collections.emptySet()));
    assertThat(successorNames, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000007")));
    // Its topic is the repartition topic named after the aggregate state store; only the
    // prefix and suffix of the topic string are checked.
    assertThat(sourceNode.topics(), containsString("[KSTREAM-AGGREGATE-STATE-STORE-0000000006"));
    assertThat(sourceNode.topics(), containsString("-repartition]"));
}
示例12
@Test
public void shouldHaveSinkNodeWithSameTopicAsSecondSource() {
    buildRequireRekey();

    final TopologyDescription.Sink sinkNode =
        (TopologyDescription.Sink) getNodeByName(builder.build(), "KSTREAM-SINK-0000000008");
    final TopologyDescription.Source sourceNode =
        (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010");

    // The sink is a terminal node.
    assertThat(sinkNode.successors(), equalTo(Collections.emptySet()));

    // The source reports its topics in bracketed form, so the sink topic is wrapped to match.
    final String bracketedSinkTopic = "[" + sinkNode.topic() + "]";
    assertThat(bracketedSinkTopic, equalTo(sourceNode.topics()));
}
示例13
@Test
public void shouldBuildSourceNode() {
    final TopologyDescription.Source sourceNode =
        (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
    final List<String> successorNames = sourceNode.successors()
        .stream()
        .map(successor -> successor.name())
        .collect(Collectors.toList());

    // A source has no predecessors, feeds the map-values node and reads from "input".
    assertThat(sourceNode.predecessors(), equalTo(Collections.emptySet()));
    assertThat(successorNames, equalTo(Collections.singletonList(MAPVALUES_NODE)));
    assertThat(sourceNode.topics(), equalTo("[input]"));
}
示例14
@Test
public void shouldBuildOutputNode() {
    final TopologyDescription.Sink outputNode =
        (TopologyDescription.Sink) getNodeByName(builder.build(), OUTPUT_NODE);
    final List<String> predecessorNames = outputNode.predecessors()
        .stream()
        .map(predecessor -> predecessor.name())
        .collect(Collectors.toList());

    // The sink is terminal, is fed by the output map-values node and writes to "output".
    assertThat(outputNode.successors(), equalTo(Collections.emptySet()));
    assertThat(predecessorNames, equalTo(Collections.singletonList(MAPVALUES_OUTPUT_NODE)));
    assertThat(outputNode.topic(), equalTo("output"));
}
示例15
/**
 * Finds a node by name across all subtopologies of the given topology.
 *
 * <p>If several nodes share the name, the first one encountered while iterating the
 * subtopologies and their nodes is returned.
 *
 * @param topology the topology whose description is searched
 * @param nodeName the name of the node to look up
 * @return the first node whose name equals {@code nodeName}
 * @throws IllegalArgumentException if no node with that name exists, so a misspelled node name
 *     fails with a readable message rather than a bare {@code NullPointerException}
 */
static TopologyDescription.Node getNodeByName(final Topology topology, final String nodeName) {
    return topology.describe().subtopologies().stream()
        .flatMap(subtopology -> subtopology.nodes().stream())
        .filter(node -> node.name().equals(nodeName))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException(
            "No node named '" + nodeName + "' found in topology"));
}
示例16
/**
 * Asserts that a processor node is wired to the expected neighbours.
 *
 * @param node the processor node to check
 * @param expectedPredecessors expected names of the node's predecessors, in order
 * @param expectedSuccessors expected names of the node's successors, in order
 */
static void verifyProcessorNode(final TopologyDescription.Processor node,
                                final List<String> expectedPredecessors,
                                final List<String> expectedSuccessors) {
    final List<String> actualSuccessors = node.successors()
        .stream()
        .map(successor -> successor.name())
        .collect(Collectors.toList());
    final List<String> actualPredecessors = node.predecessors()
        .stream()
        .map(predecessor -> predecessor.name())
        .collect(Collectors.toList());

    assertThat(actualPredecessors, equalTo(expectedPredecessors));
    assertThat(actualSuccessors, equalTo(expectedSuccessors));
}
示例17
@Test void should_persistSpans_and_onlyQueryTraces_whenEnabled() {
  // Given: configs
  Duration traceTtl = Duration.ofMillis(5);
  Duration traceTtlCheckInterval = Duration.ofMinutes(1);
  List<String> autocompleteKeys = Collections.singletonList("environment");
  // When: topology provided (first boolean switch on, second off)
  Topology topology = new TraceStorageTopology(
      spansTopic,
      autocompleteKeys,
      traceTtl,
      traceTtlCheckInterval,
      0,
      true,
      false).get();
  TopologyDescription description = topology.describe();
  // Then: a single subtopology is defined
  assertThat(description.subtopologies()).hasSize(1);
  // Given: serde and test driver. try-with-resources closes the driver first and the serde
  // second, even when one of the assertions below fails.
  try (SpansSerde spansSerde = new SpansSerde();
      TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
    // When: a trace is passed
    ConsumerRecordFactory<String, List<Span>> factory =
        new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
    Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
        .timestamp(10000L).duration(11L)
        .putTag("environment", "dev")
        .build();
    Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
        .timestamp(10000L).duration(10L)
        .build();
    Span c = Span.newBuilder().traceId("c").id("c").name("op_a").kind(Span.Kind.CLIENT)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
        .timestamp(10000L).duration(11L)
        .putTag("environment", "dev")
        .build();
    List<Span> spans = Arrays.asList(a, b, c);
    testDriver.pipeInput(factory.create(spansTopic, a.traceId(), spans, 10L));
    // Then: trace stores are filled
    KeyValueStore<String, List<Span>> traces = testDriver.getKeyValueStore(TRACES_STORE_NAME);
    assertThat(traces.get(a.traceId())).containsExactlyElementsOf(spans);
    KeyValueStore<Long, Set<String>> spanIdsByTs =
        testDriver.getKeyValueStore(SPAN_IDS_BY_TS_STORE_NAME);
    // Store iterators hold resources and must be closed once consumed.
    try (KeyValueIterator<Long, Set<String>> ids = spanIdsByTs.all()) {
      assertThat(ids).hasNext();
      assertThat(ids.next().value).containsExactly(a.traceId());
    }
    // Then: the service name, span name and autocomplete tag stores do not exist
    KeyValueStore<String, String> serviceNames =
        testDriver.getKeyValueStore(SERVICE_NAMES_STORE_NAME);
    assertThat(serviceNames).isNull();
    KeyValueStore<String, Set<String>> spanNames =
        testDriver.getKeyValueStore(SPAN_NAMES_STORE_NAME);
    assertThat(spanNames).isNull();
    KeyValueStore<String, Set<String>> autocompleteTags =
        testDriver.getKeyValueStore(AUTOCOMPLETE_TAGS_STORE_NAME);
    assertThat(autocompleteTags).isNull();
  }
}
示例18
@Test void should_persistSpans_and_searchQueryTraces_whenAllEnabled() {
  // Given: configs
  Duration traceTtl = Duration.ofMillis(5);
  Duration traceTtlCheckInterval = Duration.ofMinutes(1);
  List<String> autocompleteKeys = Collections.singletonList("environment");
  // When: topology provided (both boolean switches on)
  Topology topology = new TraceStorageTopology(
      spansTopic,
      autocompleteKeys,
      traceTtl,
      traceTtlCheckInterval,
      0,
      true,
      true).get();
  TopologyDescription description = topology.describe();
  // Then: a single subtopology is defined
  assertThat(description.subtopologies()).hasSize(1);
  // Given: serde and test driver. try-with-resources closes the driver first and the serde
  // second, even when one of the assertions below fails.
  try (SpansSerde spansSerde = new SpansSerde();
      TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
    // When: a trace is passed
    ConsumerRecordFactory<String, List<Span>> factory =
        new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
    Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
        .timestamp(10000L).duration(11L)
        .putTag("environment", "dev")
        .build();
    Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
        .timestamp(10000L).duration(10L)
        .build();
    Span c = Span.newBuilder().traceId("c").id("c").name("op_a").kind(Span.Kind.CLIENT)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
        .timestamp(10000L).duration(11L)
        .putTag("environment", "dev")
        .build();
    List<Span> spans = Arrays.asList(a, b, c);
    testDriver.pipeInput(factory.create(spansTopic, a.traceId(), spans, 10L));
    // Then: trace stores are filled
    KeyValueStore<String, List<Span>> traces = testDriver.getKeyValueStore(TRACES_STORE_NAME);
    assertThat(traces.get(a.traceId())).containsExactlyElementsOf(spans);
    KeyValueStore<Long, Set<String>> spanIdsByTs =
        testDriver.getKeyValueStore(SPAN_IDS_BY_TS_STORE_NAME);
    // Store iterators hold resources and must be closed once consumed.
    try (KeyValueIterator<Long, Set<String>> ids = spanIdsByTs.all()) {
      assertThat(ids).hasNext();
      assertThat(ids.next().value).containsExactly(a.traceId());
    }
    // Then: service name stores are filled
    KeyValueStore<String, String> serviceNames =
        testDriver.getKeyValueStore(SERVICE_NAMES_STORE_NAME);
    List<String> serviceNameList = new ArrayList<>();
    try (KeyValueIterator<String, String> allServiceNames = serviceNames.all()) {
      allServiceNames.forEachRemaining(serviceName -> serviceNameList.add(serviceName.value));
    }
    assertThat(serviceNameList).hasSize(2);
    assertThat(serviceNames.get("svc_a")).isEqualTo("svc_a");
    assertThat(serviceNames.get("svc_b")).isEqualTo("svc_b");
    KeyValueStore<String, Set<String>> spanNames =
        testDriver.getKeyValueStore(SPAN_NAMES_STORE_NAME);
    assertThat(spanNames.get("svc_a")).containsExactly("op_a");
    assertThat(spanNames.get("svc_b")).containsExactly("op_b");
    KeyValueStore<String, Set<String>> autocompleteTags =
        testDriver.getKeyValueStore(AUTOCOMPLETE_TAGS_STORE_NAME);
    assertThat(autocompleteTags.get("environment")).containsExactly("dev");
    // When: a later record arrives, timestamped just past the TTL check interval
    Span d = Span.newBuilder()
        .traceId("d")
        .id("d")
        .timestamp(
            MILLISECONDS.toMicros(traceTtlCheckInterval.toMillis()) + MILLISECONDS.toMicros(20))
        .build();
    testDriver.pipeInput(
        factory.create(spansTopic, d.traceId(), Collections.singletonList(d),
            traceTtlCheckInterval.plusMillis(1).toMillis()));
    // Then: the earlier trace is no longer present in the traces store
    assertThat(traces.get(a.traceId())).isNull();
  }
}
示例19
@Test void should_aggregateSpans_and_mapDependencies() {
  // Given: configuration
  Duration traceTimeout = Duration.ofSeconds(1);
  // When: topology built
  Topology topology = new SpanAggregationTopology(
      spansTopic,
      traceTopic,
      dependencyTopic,
      traceTimeout,
      true).get();
  TopologyDescription description = topology.describe();
  // Then: a single subtopology is defined
  assertThat(description.subtopologies()).hasSize(1);
  // Given: serdes and test driver. try-with-resources closes them in reverse declaration order
  // (driver, spans serde, dependency link serde), even when an assertion below fails.
  try (DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde();
      SpansSerde spansSerde = new SpansSerde();
      TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
    // When: two related spans coming on the same Session window
    ConsumerRecordFactory<String, List<Span>> factory =
        new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
    Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
        .build();
    Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
        .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
        .build();
    testDriver.pipeInput(
        factory.create(spansTopic, a.traceId(), Collections.singletonList(a), 0L));
    testDriver.pipeInput(
        factory.create(spansTopic, b.traceId(), Collections.singletonList(b), 0L));
    // When: a new record arrives, moving the event clock further than the inactivity gap
    Span c = Span.newBuilder().traceId("c").id("c").build();
    testDriver.pipeInput(factory.create(spansTopic, c.traceId(), Collections.singletonList(c),
        traceTimeout.toMillis() + 1));
    // Then: a trace is aggregated
    ProducerRecord<String, List<Span>> trace =
        testDriver.readOutput(traceTopic, new StringDeserializer(), spansSerde.deserializer());
    assertThat(trace).isNotNull();
    OutputVerifier.compareKeyValue(trace, a.traceId(), Arrays.asList(a, b));
    // Then: a dependency link is created
    ProducerRecord<String, DependencyLink> linkRecord =
        testDriver.readOutput(dependencyTopic, new StringDeserializer(),
            dependencyLinkSerde.deserializer());
    assertThat(linkRecord).isNotNull();
    DependencyLink link = DependencyLink.newBuilder()
        .parent("svc_a").child("svc_b").callCount(1).errorCount(0)
        .build();
    OutputVerifier.compareKeyValue(linkRecord, "svc_a:svc_b", link);
  }
}
示例20
@Test void should_storeDependencies() {
  // Given: configs
  Duration dependenciesRetentionPeriod = Duration.ofMinutes(1);
  Duration dependenciesWindowSize = Duration.ofMillis(100);
  // When: topology created
  Topology topology = new DependencyStorageTopology(
      dependencyTopic,
      dependenciesRetentionPeriod,
      dependenciesWindowSize,
      true).get();
  TopologyDescription description = topology.describe();
  // Then: a single subtopology is defined
  assertThat(description.subtopologies()).hasSize(1);
  // Given: test driver. It is closed by try-with-resources and the serde by the finally block
  // (driver first, serde second), even when one of the assertions below fails.
  try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
    // When: a dependency link is passed
    ConsumerRecordFactory<String, DependencyLink> factory =
        new ConsumerRecordFactory<>(dependencyTopic, new StringSerializer(),
            dependencyLinkSerde.serializer());
    DependencyLink dependencyLink = DependencyLink.newBuilder()
        .parent("svc_a").child("svc_b").callCount(1).errorCount(0)
        .build();
    String dependencyLinkId = "svc_a:svc_b";
    testDriver.pipeInput(factory.create(dependencyTopic, dependencyLinkId, dependencyLink, 10L));
    WindowStore<String, DependencyLink> links = testDriver.getWindowStore(DEPENDENCIES_STORE_NAME);
    // Then: dependency link created. Store iterators hold resources and must be closed.
    try (WindowStoreIterator<DependencyLink> firstLink = links.fetch(dependencyLinkId, 0L, 100L)) {
      assertThat(firstLink).hasNext();
      assertThat(firstLink.next().value).isEqualTo(dependencyLink);
    }
    // When: new links appear
    testDriver.pipeInput(factory.create(dependencyTopic, dependencyLinkId, dependencyLink, 90L));
    // Then: dependency link call count increases
    try (WindowStoreIterator<DependencyLink> secondLink = links.fetch(dependencyLinkId, 0L, 100L)) {
      assertThat(secondLink).hasNext();
      assertThat(secondLink.next().value.callCount()).isEqualTo(2);
    }
    // When: time moves forward
    testDriver.advanceWallClockTime(dependenciesRetentionPeriod.toMillis() + 91L);
    testDriver.pipeInput(factory.create(dependencyTopic, dependencyLinkId, dependencyLink));
    // Then: dependency link is removed and restarted
    try (KeyValueIterator<Windowed<String>, DependencyLink> thirdLink = links.all()) {
      assertThat(thirdLink).hasNext();
      assertThat(thirdLink.next().value.callCount()).isEqualTo(1);
    }
  } finally {
    dependencyLinkSerde.close();
  }
}
示例21
@Test
public void shouldBuildMapNode() throws Exception {
    // The source map-values node sits between the source and the transform node.
    final TopologyDescription.Processor mapNode =
        (TopologyDescription.Processor) getNodeByName(SOURCE_MAPVALUES_NODE);

    verifyProcessorNode(
        mapNode,
        Collections.singletonList(SOURCE_NODE),
        Collections.singletonList(TRANSFORM_NODE));
}
示例22
@Test
public void shouldBuildTransformNode() {
    // The transform node sits between the source map-values node and the filter node.
    verifyProcessorNode(
        (TopologyDescription.Processor) getNodeByName(TRANSFORM_NODE),
        Collections.singletonList(SOURCE_MAPVALUES_NODE),
        Collections.singletonList(FILTER_NODE));
}
示例23
@Test
public void shouldBuildFilterNode() {
    // The filter node sits between the transform node and the filter map-values node.
    verifyProcessorNode(
        (TopologyDescription.Processor) getNodeByName(FILTER_NODE),
        Collections.singletonList(TRANSFORM_NODE),
        Collections.singletonList(FILTER_MAPVALUES_NODE));
}
示例24
@Test
public void shouldBuildMapValuesNode() {
    // The filter map-values node sits between the filter node and the for-each node.
    verifyProcessorNode(
        (TopologyDescription.Processor) getNodeByName(FILTER_MAPVALUES_NODE),
        Collections.singletonList(FILTER_NODE),
        Collections.singletonList(FOREACH_NODE));
}
示例25
@Test
public void shouldBuildForEachNode() {
    // The for-each node is fed by the filter map-values node and has no successors.
    verifyProcessorNode(
        (TopologyDescription.Processor) getNodeByName(FOREACH_NODE),
        Collections.singletonList(FILTER_MAPVALUES_NODE),
        Collections.emptyList());
}
示例26
/**
 * Looks up a node by name in the topology produced by {@code builder.build()}, delegating to
 * {@code PlanTestUtil.getNodeByName}.
 *
 * @param nodeName the name of the node to look up
 * @return the node returned by the delegate
 */
private TopologyDescription.Node getNodeByName(String nodeName) {
    final TopologyDescription.Node foundNode =
        PlanTestUtil.getNodeByName(builder.build(), nodeName);
    return foundNode;
}
示例27
@Test
public void shouldBuildMapNode() {
    // The map-values node sits between the source and the transform node.
    final TopologyDescription.Processor mapNode =
        (TopologyDescription.Processor) getNodeByName(builder.build(), PlanTestUtil.MAPVALUES_NODE);

    verifyProcessorNode(
        mapNode,
        Collections.singletonList(PlanTestUtil.SOURCE_NODE),
        Collections.singletonList(PlanTestUtil.TRANSFORM_NODE));
}
示例28
@Test
public void shouldBuildTransformNode() {
    // The transform node is fed by the map-values node and has no successors.
    verifyProcessorNode(
        (TopologyDescription.Processor) getNodeByName(builder.build(), PlanTestUtil.TRANSFORM_NODE),
        Collections.singletonList(PlanTestUtil.MAPVALUES_NODE),
        Collections.emptyList());
}
示例29
@Test
public void shouldHaveOneSubTopologyIfGroupByKey() {
    // Build the plan under test before inspecting the topology.
    build();

    final int subtopologyCount = builder.build().describe().subtopologies().size();

    assertThat(subtopologyCount, equalTo(1));
}
示例30
@Test
public void shouldBuildMapNodePriorToOutput() {
    // The output map-values node sits between the transform node and the output sink.
    final TopologyDescription.Processor outputMapNode =
        (TopologyDescription.Processor) getNodeByName(builder.build(), MAPVALUES_OUTPUT_NODE);

    verifyProcessorNode(
        outputMapNode,
        Collections.singletonList(TRANSFORM_NODE),
        Collections.singletonList(OUTPUT_NODE));
}