Java源码示例:org.apache.samza.application.descriptors.StreamApplicationDescriptor
示例1
/**
 * Plans one parsed query and translates it onto the given application descriptor.
 * For unit testing only.
 *
 * @param queryInfo parsed query; its select statement is handed to the planner
 * @param appDesc   application descriptor that receives the translation and the task context factory
 * @param queryId   index used to pick the output system stream and to key the translator context
 */
@VisibleForTesting
void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
  QueryPlanner queryPlanner = new QueryPlanner(
      sqlConfig.getRelSchemaProviders(),
      sqlConfig.getInputSystemStreamConfigBySource(),
      sqlConfig.getUdfMetadata());
  final RelRoot plannedRoot = queryPlanner.plan(queryInfo.getSelectQuery());

  TranslatorContext context =
      new TranslatorContext(appDesc, plannedRoot, new SamzaSqlExecutionContext(sqlConfig));
  translate(plannedRoot, sqlConfig.getOutputSystemStreams().get(queryId), context, queryId);

  // The factory below wraps this map, which holds a clone of the translator context for the query.
  Map<Integer, TranslatorContext> contextsByQueryId = new HashMap<>();
  contextsByQueryId.put(queryId, context.clone());

  appDesc.withApplicationTaskContextFactory(new ApplicationTaskContextFactory<SamzaSqlApplicationContext>() {
    @Override
    public SamzaSqlApplicationContext create(ExternalContext externalContext, JobContext jobContext,
        ContainerContext containerContext, TaskContext taskContext,
        ApplicationContainerContext applicationContainerContext) {
      return new SamzaSqlApplicationContext(contextsByQueryId);
    }
  });
}
示例2
/**
 * Joins the "orders" and "shipments" inputs of the "tracking" system and sends each joined
 * record, keyed by its order id, to "fulfilledOrders".
 *
 * @param appDescriptor descriptor on which the inputs, the join and the output are declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<OrderRecord> ordersIn =
      kafka.getInputDescriptor("orders", new JsonSerdeV2<>(OrderRecord.class));
  KafkaInputDescriptor<ShipmentRecord> shipmentsIn =
      kafka.getInputDescriptor("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
  KafkaOutputDescriptor<KV<String, FulfilledOrderRecord>> fulfilledOut =
      kafka.getOutputDescriptor(
          "fulfilledOrders",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));

  // The serdes cover the join key and both record types; the one-minute Duration and the
  // "join" id are passed straight to join().
  appDescriptor
      .getInputStream(ordersIn)
      .join(
          appDescriptor.getInputStream(shipmentsIn),
          new MyJoinFunction(),
          new StringSerde(),
          new JsonSerdeV2<>(OrderRecord.class),
          new JsonSerdeV2<>(ShipmentRecord.class),
          Duration.ofMinutes(1),
          "join")
      .map(joined -> KV.of(joined.orderId, joined))
      .sendTo(appDescriptor.getOutputStream(fulfilledOut));
}
示例3
/**
 * Re-partitions page view events by member id, counts them per key in 5-minute tumbling
 * windows and sends one {@code MyStreamOutput} per window pane to "pageViewEventPerMember".
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> viewsIn =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, MyStreamOutput>> countsOut =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));

  appDescriptor.withDefaultSystem(kafka);

  MessageStream<PageViewEvent> views = appDescriptor.getInputStream(viewsIn);
  OutputStream<KV<String, MyStreamOutput>> perMemberOut = appDescriptor.getOutputStream(countsOut);

  MessageStream<KV<String, PageViewEvent>> viewsByMember =
      views.partitionBy(
          event -> event.getMemberId(),
          event -> event,
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)),
          "partitionBy");

  // Fold starts at 0 and adds 1 per message, so each pane carries a message count.
  viewsByMember
      .window(
          Windows.keyedTumblingWindow(
              KV::getKey, Duration.ofMinutes(5), () -> 0, (msg, count) -> count + 1, null, null),
          "window")
      .map(pane -> KV.of(pane.getKey().getKey(), new MyStreamOutput(pane)))
      .sendTo(perMemberOut);
}
示例4
/**
 * Re-partitions page view events by member id, runs them through {@code MyStatsCounter} and
 * sends the resulting stats, keyed by member id, to "pageViewEventPerMember".
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> viewsIn =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, StatsOutput>> statsOut =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));

  appDescriptor.withDefaultSystem(kafka);

  MessageStream<PageViewEvent> views = appDescriptor.getInputStream(viewsIn);
  OutputStream<KV<String, StatsOutput>> perMemberOut = appDescriptor.getOutputStream(statsOut);

  MessageStream<KV<String, PageViewEvent>> viewsByMember =
      views.partitionBy(
          event -> event.getMemberId(),
          event -> event,
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)),
          "partitionBy");

  // Drop the partition key again before counting; the counter works on the raw events.
  viewsByMember
      .map(KV::getValue)
      .flatMap(new MyStatsCounter())
      .map(memberStats -> KV.of(memberStats.memberId, memberStats))
      .sendTo(perMemberOut);
}
示例5
/**
 * Counts page view events per member id in 10-second keyed tumbling windows and sends a
 * {@code PageViewCount} for every emitted window pane to "pageViewEventPerMember".
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> viewsIn =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, PageViewCount>> countsOut =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

  MessageStream<PageViewEvent> views = appDescriptor.getInputStream(viewsIn);
  OutputStream<KV<String, PageViewCount>> perMemberOut = appDescriptor.getOutputStream(countsOut);

  // Fold: start from zero and add one for each message.
  SupplierFunction<Integer> zero = () -> 0;
  FoldLeftFunction<PageViewEvent, Integer> increment = (msg, count) -> count + 1;

  views
      .window(
          Windows.keyedTumblingWindow(
                  PageViewEvent::getMemberId, Duration.ofSeconds(10), zero, increment, null, null)
              // Additionally fire a repeating early trigger on a count of 5, in DISCARDING mode.
              .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
              .setAccumulationMode(AccumulationMode.DISCARDING),
          "tumblingWindow")
      .map(pane -> KV.of(pane.getKey().getKey(), buildPageViewCount(pane)))
      .sendTo(perMemberOut);
}
示例6
/**
 * Merges the three inputs "pageViewStream1", "pageViewStream2" and "pageViewStream3" of the
 * "tracking" system and sends the merged stream to "mergedStream".
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  // One serde instance is shared by all inputs and the output.
  KVSerde<String, PageViewEvent> kvSerde =
      KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<KV<String, PageViewEvent>> firstIn = kafka.getInputDescriptor("pageViewStream1", kvSerde);
  KafkaInputDescriptor<KV<String, PageViewEvent>> secondIn = kafka.getInputDescriptor("pageViewStream2", kvSerde);
  KafkaInputDescriptor<KV<String, PageViewEvent>> thirdIn = kafka.getInputDescriptor("pageViewStream3", kvSerde);
  KafkaOutputDescriptor<KV<String, PageViewEvent>> mergedOut = kafka.getOutputDescriptor("mergedStream", kvSerde);

  MessageStream<KV<String, PageViewEvent>> first = appDescriptor.getInputStream(firstIn);
  MessageStream<KV<String, PageViewEvent>> second = appDescriptor.getInputStream(secondIn);
  MessageStream<KV<String, PageViewEvent>> third = appDescriptor.getInputStream(thirdIn);

  MessageStream
      .mergeAll(ImmutableList.of(first, second, third))
      .sendTo(appDescriptor.getOutputStream(mergedOut));
}
示例7
/**
 * Counts page view events in 10-minute tumbling windows and sends each count to
 * "pageViewEventPerMember".
 *
 * <p>Early results are emitted as well, when either 30000 messages have been collected or no
 * new message has arrived for one minute. The trigger is therefore registered with
 * {@code setEarlyTrigger}; it was previously registered with {@code setLateTrigger}, which
 * contradicted the documented intent.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
      trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<Integer> outputStreamDescriptor =
      trackingSystem.getOutputDescriptor("pageViewEventPerMember", new IntegerSerde());

  SupplierFunction<Integer> initialValue = () -> 0;
  // Tolerates a null accumulator by treating it as "no messages counted yet".
  FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;

  MessageStream<PageViewEvent> inputStream = appDescriptor.getInputStream(inputStreamDescriptor);
  OutputStream<Integer> outputStream = appDescriptor.getOutputStream(outputStreamDescriptor);

  // create a tumbling window that outputs the number of message collected every 10 minutes.
  // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
  // for 1 minute.
  inputStream
      .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
          .setEarlyTrigger(Triggers.any(Triggers.count(30000),
              Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
      .map(WindowPane::getMessage)
      .sendTo(outputStream);
}
示例8
/**
 * Reads "pageViewEvent" and fans it out: messages with key "key1", "key2" and "key3" go to
 * "outStream1", "outStream2" and "outStream3" respectively.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KVSerde<String, PageViewEvent> kvSerde =
      KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<KV<String, PageViewEvent>> viewsIn = kafka.getInputDescriptor("pageViewEvent", kvSerde);
  KafkaOutputDescriptor<KV<String, PageViewEvent>> firstOut = kafka.getOutputDescriptor("outStream1", kvSerde);
  KafkaOutputDescriptor<KV<String, PageViewEvent>> secondOut = kafka.getOutputDescriptor("outStream2", kvSerde);
  KafkaOutputDescriptor<KV<String, PageViewEvent>> thirdOut = kafka.getOutputDescriptor("outStream3", kvSerde);

  MessageStream<KV<String, PageViewEvent>> views = appDescriptor.getInputStream(viewsIn);

  // The same input stream is consumed three times, once per key filter.
  views
      .filter(kv -> kv.key.equals("key1"))
      .sendTo(appDescriptor.getOutputStream(firstOut));
  views
      .filter(kv -> kv.key.equals("key2"))
      .sendTo(appDescriptor.getOutputStream(secondOut));
  views
      .filter(kv -> kv.key.equals("key3"))
      .sendTo(appDescriptor.getOutputStream(thirdOut));
}
示例9
/**
 * Counts page view events per member id in 10-second keyed tumbling windows, sends a
 * {@code PageViewCount} per emitted pane to "pageViewEventPerMember", and finally registers
 * an empty map of metrics reporter factories on the descriptor.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> viewsIn =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, PageViewCount>> countsOut =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

  appDescriptor
      .getInputStream(viewsIn)
      .window(
          // Explicit type witness: both serde arguments are null, so nothing else fixes the types.
          Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
                  PageViewEvent::getMemberId,
                  Duration.ofSeconds(10),
                  () -> 0,
                  (msg, count) -> count + 1,
                  null,
                  null)
              .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
              .setAccumulationMode(AccumulationMode.DISCARDING),
          "window1")
      .map(pane -> KV.of(pane.getKey().getKey(), buildPageViewCount(pane)))
      .sendTo(appDescriptor.getOutputStream(countsOut));

  appDescriptor.withMetricsReporterFactories(new HashMap<>());
}
示例10
/**
 * Reads "adClickEvent", enriches every event through the asynchronous
 * {@code AsyncApplicationExample::enrichAdClickEvent} and sends the result, keyed by
 * country, to "enrichedAdClickEvent".
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<AdClickEvent> clicksIn =
      kafka.getInputDescriptor("adClickEvent", new JsonSerdeV2<>(AdClickEvent.class));
  KafkaOutputDescriptor<KV<String, EnrichedAdClickEvent>> enrichedOut =
      kafka.getOutputDescriptor(
          "enrichedAdClickEvent",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(EnrichedAdClickEvent.class)));

  MessageStream<AdClickEvent> clicks = appDescriptor.getInputStream(clicksIn);
  OutputStream<KV<String, EnrichedAdClickEvent>> enrichedClicks = appDescriptor.getOutputStream(enrichedOut);

  clicks
      .flatMapAsync(AsyncApplicationExample::enrichAdClickEvent)
      .map(enriched -> KV.of(enriched.getCountry(), enriched))
      .sendTo(enrichedClicks);
}
示例11
/**
 * Drops page views whose user id equals {@code FILTER_KEY}, groups the rest per user id in a
 * keyed session window configured with a 3-second duration, and sends the window key together
 * with the size of the pane's message to the output topic.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KVSerde<String, Integer> countSerde = KVSerde.of(new StringSerde(), new IntegerSerde());

  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<PageView> viewsIn = kafka.getInputDescriptor(INPUT_TOPIC, pageViewSerde);
  KafkaOutputDescriptor<KV<String, Integer>> countsOut = kafka.getOutputDescriptor(OUTPUT_TOPIC, countSerde);

  MessageStream<PageView> views = appDescriptor.getInputStream(viewsIn);
  OutputStream<KV<String, Integer>> counts = appDescriptor.getOutputStream(countsOut);

  views
      .filter(view -> !FILTER_KEY.equals(view.getUserId()))
      .window(
          Windows.keyedSessionWindow(
              PageView::getUserId,
              Duration.ofSeconds(3),
              new StringSerde(),
              new JsonSerdeV2<>(PageView.class)),
          "sessionWindow")
      .map(pane -> KV.of(pane.getKey().getKey(), pane.getMessage().size()))
      .sendTo(counts);
}
示例12
/**
 * Drops page views whose user id equals {@code FILTER_KEY}, groups the rest per user id in
 * 3-second keyed tumbling windows, and sends the window key together with the size of the
 * pane's message to the output topic.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KVSerde<String, Integer> countSerde = KVSerde.of(new StringSerde(), new IntegerSerde());

  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<PageView> viewsIn = kafka.getInputDescriptor(INPUT_TOPIC, pageViewSerde);
  KafkaOutputDescriptor<KV<String, Integer>> countsOut = kafka.getOutputDescriptor(OUTPUT_TOPIC, countSerde);

  MessageStream<PageView> views = appDescriptor.getInputStream(viewsIn);
  OutputStream<KV<String, Integer>> counts = appDescriptor.getOutputStream(countsOut);

  views
      .filter(view -> !FILTER_KEY.equals(view.getUserId()))
      .window(
          Windows.keyedTumblingWindow(
              PageView::getUserId,
              Duration.ofSeconds(3),
              new StringSerde(),
              new JsonSerdeV2<>(PageView.class)),
          "tumblingWindow")
      .map(pane -> KV.of(pane.getKey().getKey(), pane.getMessage().size()))
      .sendTo(counts);
}
示例13
/**
 * Re-partitions page views by user id, counts them per key in a keyed session window
 * configured with a 3-second duration, and sends key and count as strings to the output topic.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KVSerde<String, PageView> pageViewKvSerde =
      KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
  KVSerde<String, String> stringKvSerde = KVSerde.of(new StringSerde(), new StringSerde());

  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<KV<String, PageView>> viewsIn = kafka.getInputDescriptor(INPUT_TOPIC, pageViewKvSerde);
  KafkaOutputDescriptor<KV<String, String>> countsOut = kafka.getOutputDescriptor(OUTPUT_TOPIC, stringKvSerde);

  appDescriptor
      .getInputStream(viewsIn)
      .map(KV::getValue)
      // The input serde is reused for the re-partitioned stream.
      .partitionBy(PageView::getUserId, view -> view, pageViewKvSerde, "p1")
      .window(
          Windows.keyedSessionWindow(
              kv -> kv.getKey(),
              Duration.ofSeconds(3),
              () -> 0,
              (kv, count) -> count + 1,
              new StringSerde("UTF-8"),
              new IntegerSerde()),
          "w1")
      .map(pane -> KV.of(pane.getKey().getKey().toString(), String.valueOf(pane.getMessage())))
      .sendTo(appDescriptor.getOutputStream(countsOut));
}
示例14
/**
 * Reads {@code PAGE_VIEW_STREAM}, filters it asynchronously through
 * {@code filterGuestPageViews} and then through {@code filterLoginPageViews}, and sends the
 * remainder to {@code NON_GUEST_PAGE_VIEW_STREAM}. The failure switches and the process jitter
 * are read from the application config by the serializable lambdas below.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Config appConfig = appDescriptor.getConfig();
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(TEST_SYSTEM);

  KafkaOutputDescriptor<PageView> nonGuestOut =
      kafka.getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
  OutputStream<PageView> nonGuestPageViews = appDescriptor.getOutputStream(nonGuestOut);

  // Intersection casts make the lambdas Serializable in addition to their functional type.
  Predicate<PageView> shouldFailProcess =
      (Predicate<PageView> & Serializable) (ignored) -> appConfig.getBoolean(FAIL_PROCESS, false);
  Predicate<PageView> shouldFailDownstream =
      (Predicate<PageView> & Serializable) (ignored) -> appConfig.getBoolean(FAIL_DOWNSTREAM_OPERATOR, false);
  Supplier<Long> jitter =
      (Supplier<Long> & Serializable) () -> appConfig.getLong(PROCESS_JITTER, 100);

  appDescriptor
      .getInputStream(kafka.getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<PageView>()))
      .flatMapAsync(view -> filterGuestPageViews(view, shouldFailProcess, jitter))
      .filter(view -> filterLoginPageViews(view, shouldFailDownstream))
      .sendTo(nonGuestPageViews);
}
示例15
/**
 * Wraps the given table descriptor in a caching table and obtains that table from the
 * application descriptor.
 *
 * @param actualTableDesc descriptor of the table to put a cache in front of
 * @param defaultCache    {@code true} to use the caching descriptor on its own with 5-minute
 *                        read and write TTLs; {@code false} to back it with a Guava cache table
 *                        that expires entries 5 minutes after access
 * @param appDesc         application descriptor from which the table is obtained
 * @return the table returned by {@code appDesc.getTable} for the caching descriptor
 */
private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache,
    StreamApplicationDescriptor appDesc) {
  final String tableId = actualTableDesc.getTableId();
  final String cachingTableId = "caching-table-" + tableId;

  final CachingTableDescriptor<K, V> descriptor;
  if (!defaultCache) {
    GuavaCacheTableDescriptor<K, V> guavaCache = new GuavaCacheTableDescriptor<>("guava-table-" + tableId);
    guavaCache.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
    descriptor = new CachingTableDescriptor<>(cachingTableId, actualTableDesc, guavaCache);
  } else {
    descriptor = new CachingTableDescriptor<>(cachingTableId, actualTableDesc);
    descriptor.withReadTtl(Duration.ofMinutes(5));
    descriptor.withWriteTtl(Duration.ofMinutes(5));
  }

  return appDesc.getTable(descriptor);
}
示例16
/**
 * Loads the "Profile" input into the in-memory table "t1" keyed by member id, and joins the
 * "PageView" input, re-partitioned by member id, against that table. Every received page view
 * is recorded in {@code received} and every join result in {@code joined}.
 *
 * @param appDesc descriptor on which the table and both pipelines are declared
 */
@Override
public void describe(StreamApplicationDescriptor appDesc) {
  Table<KV<Integer, Profile>> profileTable = appDesc.getTable(
      new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
  DelegatingSystemDescriptor testSystem = new DelegatingSystemDescriptor("test");

  // Profiles populate the table.
  GenericInputDescriptor<Profile> profilesIn = testSystem.getInputDescriptor("Profile", new NoOpSerde<>());
  appDesc
      .getInputStream(profilesIn)
      .map(profile -> new KV(profile.getMemberId(), profile))
      .sendTo(profileTable);

  // Page views are recorded, re-partitioned and joined with the table.
  GenericInputDescriptor<PageView> pageViewsIn = testSystem.getInputDescriptor("PageView", new NoOpSerde<>());
  appDesc
      .getInputStream(pageViewsIn)
      .map(view -> {
        received.add(view);
        return view;
      })
      .partitionBy(PageView::getMemberId, view -> view, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
      .join(profileTable, new PageViewToProfileJoinFunction())
      .sink((result, collector, coordinator) -> joined.add(result));
}
示例17
/**
 * Runs an application that maps the in-memory input "test-input" through {@code TestFunction}
 * for one second and asserts that {@code timerFired} has been set afterwards.
 */
@Test
public void testImmediateTimer() {
  final InMemorySystemDescriptor inMemorySystem = new InMemorySystemDescriptor("test");
  final InMemoryInputDescriptor<Integer> numbersIn =
      inMemorySystem.getInputDescriptor("test-input", new IntegerSerde());

  StreamApplication timerApp = new StreamApplication() {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      appDescriptor.getInputStream(numbersIn).map(new TestFunction());
    }
  };

  TestRunner.of(timerApp)
      .addInputStream(numbersIn, Arrays.asList(1, 2, 3, 4, 5))
      .run(Duration.ofSeconds(1));

  assertTrue(timerFired.get());
}
示例18
/**
 * Passes the {@code PAGE_VIEWS} input through {@code FlatmapScheduledFn} and asserts that the
 * output contains, in any order, the four expected "-complete" page views.
 *
 * @param appDescriptor descriptor on which the pipeline and the assertion are declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  final JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
  KafkaInputDescriptor<PageView> viewsIn = kafka.getInputDescriptor(PAGE_VIEWS, pageViewSerde);

  final MessageStream<PageView> views = appDescriptor.getInputStream(viewsIn);
  final MessageStream<PageView> completed = views.flatMap(new FlatmapScheduledFn());

  MessageStreamAssert
      .that("Output from scheduling function should container all complete messages", completed, pageViewSerde)
      .containsInAnyOrder(Arrays.asList(
          new PageView("v1-complete", "p1", "u1"),
          new PageView("v2-complete", "p2", "u1"),
          new PageView("v3-complete", "p1", "u2"),
          new PageView("v4-complete", "p3", "u2")));
}
示例19
/**
 * Broadcasts the page view input named by the {@code INPUT_TOPIC_NAME_PROP} config entry and
 * asserts, for each task, that it contains the four expected page views in any order.
 *
 * @param appDescriptor descriptor on which the pipeline and the assertion are declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Config appConfig = appDescriptor.getConfig();
  String topic = appConfig.get(INPUT_TOPIC_NAME_PROP);

  final JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<PageView> viewsIn = kafka.getInputDescriptor(topic, pageViewSerde);

  final MessageStream<PageView> broadcastViews =
      appDescriptor.getInputStream(viewsIn).broadcast(pageViewSerde, "pv");

  // Each task will see all the pageview events.
  MessageStreamAssert
      .that("Each task contains all broadcast PageView events", broadcastViews, pageViewSerde)
      .forEachTask()
      .containsInAnyOrder(Arrays.asList(
          new PageView("v1", "p1", "u1"),
          new PageView("v2", "p2", "u1"),
          new PageView("v3", "p1", "u2"),
          new PageView("v4", "p3", "u2")));
}
示例20
/**
 * Copies page views from {@code INPUT_STREAM_ID} to {@code OUTPUT_STREAM_ID}, dropping those
 * whose user id equals {@code INVALID_USER_ID}. The Kafka system is configured from the
 * {@code KAFKA_*} constants and set as the application's default system.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka =
      new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
          .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
          .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
          .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);

  KVSerde<String, PageView> kvSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
  KafkaInputDescriptor<KV<String, PageView>> viewsIn = kafka.getInputDescriptor(INPUT_STREAM_ID, kvSerde);
  KafkaOutputDescriptor<KV<String, PageView>> validOut = kafka.getOutputDescriptor(OUTPUT_STREAM_ID, kvSerde);

  appDescriptor.withDefaultSystem(kafka);

  MessageStream<KV<String, PageView>> views = appDescriptor.getInputStream(viewsIn);
  OutputStream<KV<String, PageView>> validViews = appDescriptor.getOutputStream(validOut);

  views
      .filter(entry -> !INVALID_USER_ID.equals(entry.value.userId))
      .sendTo(validViews);
}
示例21
/**
 * Creates a translation context that stores the three given collaborators as-is.
 *
 * @param appDescriptor application descriptor kept by this context
 * @param idMap         map from {@code PValue} to string id kept by this context
 * @param options       pipeline options kept by this context
 */
public TranslationContext(
    StreamApplicationDescriptor appDescriptor,
    Map<PValue, String> idMap,
    SamzaPipelineOptions options) {
  // Plain field capture; no copies are made.
  this.options = options;
  this.idMap = idMap;
  this.appDescriptor = appDescriptor;
}
示例22
/**
 * Builds a TranslatorContext around the given application descriptor and execution context.
 *
 * @param streamAppDesc    Samza application descriptor that is populated during the translation
 * @param relRoot          root of the relational graph from Calcite; used to create the expression compiler
 * @param executionContext execution context; also the source of the rel converters and table key converters
 */
public TranslatorContext(StreamApplicationDescriptor streamAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext) {
  this.streamAppDesc = streamAppDesc;
  this.compiler = createExpressionCompiler(relRoot);

  // Execution-related state.
  this.executionContext = executionContext;
  this.dataContext = new DataContextImpl();

  // Converters come from the SQL application config held by the execution context.
  this.relSamzaConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelConverters();
  this.relTableKeyConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelTableKeyConverters();

  // Lookup tables start out empty.
  this.systemDescriptors = new HashMap<>();
  this.relNodes = new HashMap<>();
  this.messageStreams = new HashMap<>();
}
示例23
/**
 * Creates a query translator bound to an application descriptor and a SQL configuration.
 * The maps of system descriptors, input streams and output streams start out empty.
 *
 * @param appDesc   application descriptor kept by this translator
 * @param sqlConfig SQL application configuration kept by this translator
 */
public QueryTranslator(StreamApplicationDescriptor appDesc, SamzaSqlApplicationConfig sqlConfig) {
  this.streamAppDescriptor = appDesc;
  this.sqlConfig = sqlConfig;

  this.inputMsgStreams = new HashMap<>();
  this.outputMsgStreams = new HashMap<>();
  this.systemDescriptors = new HashMap<>();
}
示例24
/**
 * Maps the message stream of the given rel node through an {@code OutputMapFunction} and sends
 * it to the sink: to the sink's table when the sink config carries a table descriptor,
 * otherwise to the sink's output stream.
 *
 * @param queryLogicalId    passed through to the {@code OutputMapFunction}
 * @param logicalOpId       passed through to the {@code OutputMapFunction}
 * @param sinkStream        source name used to look up the sink's IO config
 * @param appDesc           application descriptor that provides the output stream or table
 * @param translatorContext context holding the message stream registered for {@code node}
 * @param node              rel node whose message stream is sent to the sink
 * @param queryId           passed through to the {@code OutputMapFunction}
 * @throws SamzaException if the application descriptor returns no table for the sink's table descriptor
 */
private void sendToOutputStream(String queryLogicalId, String logicalOpId, String sinkStream,
    StreamApplicationDescriptor appDesc, TranslatorContext translatorContext, RelNode node, int queryId) {
  SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream);
  MessageStream<SamzaSqlRelMessage> relStream = translatorContext.getMessageStream(node.getId());
  MessageStream<KV<Object, Object>> mappedStream =
      relStream.map(new OutputMapFunction(queryLogicalId, logicalOpId, sinkStream, queryId));

  Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
  if (tableDescriptor.isPresent()) {
    // Table sink: resolve the table and send to it; nothing else to do.
    Table sinkTable = appDesc.getTable(tableDescriptor.get());
    if (sinkTable == null) {
      String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
      throw new SamzaException(msg);
    }
    mappedStream.sendTo(sinkTable);
    return;
  }

  // Stream sink: system descriptors and output streams are created once and cached.
  KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
  String systemName = sinkConfig.getSystemName();
  DelegatingSystemDescriptor systemDescriptor =
      systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
  GenericOutputDescriptor<KV<Object, Object>> outputDescriptor =
      systemDescriptor.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
  OutputStream sink =
      outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(outputDescriptor));
  mappedStream.sendTo(sink);

  // Process system events only if the output is a stream.
  if (sqlConfig.isProcessSystemEvents()) {
    for (MessageStream<SamzaSqlInputMessage> input : inputMsgStreams.values()) {
      MessageStream<KV<Object, Object>> systemEvents =
          input
              .filter(message -> message.getMetadata().isSystemMessage())
              .map(SamzaSqlInputMessage::getKeyAndMessageKV);
      systemEvents.sendTo(sink);
    }
  }
}
示例25
/**
 * Forwards every message from the input stream named by the "input.stream.name" config entry
 * to "standaloneIntegrationTestKafkaOutputTopic", using no-op serdes on both sides.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  String kafkaSystemName = "testSystemName";
  String sourceStream = appDescriptor.getConfig().get("input.stream.name");
  String targetStream = "standaloneIntegrationTestKafkaOutputTopic";
  LOGGER.info("Publishing message from: {} to: {}.", sourceStream, targetStream);

  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(kafkaSystemName);
  KVSerde<Object, Object> passThroughSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
  KafkaInputDescriptor<KV<Object, Object>> source = kafka.getInputDescriptor(sourceStream, passThroughSerde);
  KafkaOutputDescriptor<KV<Object, Object>> target = kafka.getOutputDescriptor(targetStream, passThroughSerde);

  appDescriptor
      .getInputStream(source)
      .sendTo(appDescriptor.getOutputStream(target));
}
示例26
/**
 * Maps every topic in {@code inputTopics} through a {@code TestMapFunction} and sends all of
 * them to the single output stream for {@code outputTopic}.
 *
 * @param appDescriptor descriptor on which the pipelines are declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(systemName);
  KafkaOutputDescriptor<String> sinkDescriptor = kafka.getOutputDescriptor(outputTopic, new StringSerde());
  // The output stream is obtained once and shared by every input pipeline.
  OutputStream<String> sink = appDescriptor.getOutputStream(sinkDescriptor);

  for (String topic : inputTopics) {
    KafkaInputDescriptor<String> sourceDescriptor = kafka.getInputDescriptor(topic, new NoOpSerde<>());
    MessageStream<String> source = appDescriptor.getInputStream(sourceDescriptor);
    source
        .map(new TestMapFunction(appName, processorName))
        .sendTo(sink);
  }
}
示例27
/**
 * Re-partitions {@code PAGEVIEW_STREAM} by member id, joins it with the table described by
 * {@code getTableDescriptor()} and sends the result to {@code ENRICHED_PAGEVIEW_STREAM}.
 *
 * @param appDescriptor descriptor on which the table and the pipeline are declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Table<KV<Integer, TestTableData.Profile>> profileTable = appDescriptor.getTable(getTableDescriptor());
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");

  appDescriptor
      .getInputStream(kafka.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
      .partitionBy(
          TestTableData.PageView::getMemberId,
          view -> view,
          KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()),
          "partition-page-view")
      .join(profileTable, new PageViewToProfileJoinFunction())
      .sendTo(appDescriptor.getOutputStream(
          kafka.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
}
示例28
/**
 * Loads the "Profile" input into the RocksDB table "profile-view-store" keyed by member id,
 * logging each insert, and joins the "PageView" input (re-partitioned by member id) against
 * that table. Join results go to "EnrichedPageView"; their page keys are additionally sent to
 * the "JoinPageKeys" stream of the "test" system.
 *
 * @param appDescriptor descriptor on which the table and both pipelines are declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Table<KV<Integer, TestTableData.Profile>> profileTable = appDescriptor.getTable(
      new RocksDbTableDescriptor<Integer, TestTableData.Profile>(
          "profile-view-store",
          KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));

  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");
  KafkaInputDescriptor<KV<String, TestTableData.Profile>> profilesIn =
      kafka.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
  KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewsIn =
      kafka.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
  KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedOut =
      kafka.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());

  // Profiles: re-key by member id, write to the table, then log what was written.
  appDescriptor
      .getInputStream(profilesIn)
      .map(entry -> new KV(entry.getValue().getMemberId(), entry.getValue()))
      .sendTo(profileTable)
      .sink((written, collector, coordinator) -> {
        LOG.info("Inserted Profile with Key: {} in profile-view-store", written.getKey());
      });

  // Page views: join with the table, publish, then forward the page key of each result.
  OutputStream<TestTableData.EnrichedPageView> enrichedViews = appDescriptor.getOutputStream(enrichedOut);
  appDescriptor
      .getInputStream(pageViewsIn)
      .partitionBy(
          entry -> entry.getValue().getMemberId(),
          entry -> entry.getValue(),
          KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(TestTableData.PageView.class)),
          "p1")
      .join(profileTable, new PageViewToProfileJoinFunction())
      .sendTo(enrichedViews)
      .map(TestTableData.EnrichedPageView::getPageKey)
      .sink((pageKey, collector, coordinator) -> {
        collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, pageKey));
      });
}
示例29
/**
 * Reads the "PageView" input of the "test" system, unwraps the values and keeps only page
 * views whose page key equals "inbox". The filtered stream is not sent anywhere.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");
  KafkaInputDescriptor<KV<String, PageView>> viewsIn =
      kafka.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));

  MessageStream<KV<String, TestData.PageView>> views = appDescriptor.getInputStream(viewsIn);
  views
      .map(KV::getValue)
      .filter(view -> view.getPageKey().equals("inbox"));
}
示例30
/**
 * Reads the "PageView" input of the "test" system, re-partitions the values by member id and
 * sends every re-partitioned entry to the "Output" stream of the "test" system, using the
 * entry's key as both arguments that precede the message in the envelope.
 *
 * @param appDescriptor descriptor on which the pipeline is declared
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");
  KafkaInputDescriptor<KV<String, PageView>> viewsIn =
      kafka.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));

  MessageStream<KV<String, TestData.PageView>> views = appDescriptor.getInputStream(viewsIn);
  views
      .map(KV::getValue)
      .partitionBy(
          PageView::getMemberId,
          view -> view,
          KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)),
          "p1")
      .sink((entry, collector, coordinator) ->
          collector.send(new OutgoingMessageEnvelope(
              new SystemStream("test", "Output"), entry.getKey(), entry.getKey(), entry)));
}