Java源码示例:org.apache.samza.application.descriptors.StreamApplicationDescriptor

示例1
/**
 * Plans and translates a single parsed query into the supplied application descriptor.
 * For unit testing only.
 *
 * @param queryInfo parsed query; its select query is handed to the planner
 * @param appDesc application descriptor that receives the translation and the task context factory
 * @param queryId id of the query; also used to look up the output system stream in the SQL config
 */
@VisibleForTesting
void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
  QueryPlanner queryPlanner = new QueryPlanner(
      sqlConfig.getRelSchemaProviders(),
      sqlConfig.getInputSystemStreamConfigBySource(),
      sqlConfig.getUdfMetadata());
  final RelRoot plannedRoot = queryPlanner.plan(queryInfo.getSelectQuery());

  SamzaSqlExecutionContext sqlExecutionContext = new SamzaSqlExecutionContext(sqlConfig);
  TranslatorContext context = new TranslatorContext(appDesc, plannedRoot, sqlExecutionContext);
  translate(plannedRoot, sqlConfig.getOutputSystemStreams().get(queryId), context, queryId);

  // The clone is taken only after translation has run against the original context.
  Map<Integer, TranslatorContext> contextsByQueryId = new HashMap<>();
  contextsByQueryId.put(queryId, context.clone());

  appDesc.withApplicationTaskContextFactory(new ApplicationTaskContextFactory<SamzaSqlApplicationContext>() {
    @Override
    public SamzaSqlApplicationContext create(ExternalContext externalContext, JobContext jobContext,
        ContainerContext containerContext, TaskContext taskContext,
        ApplicationContainerContext applicationContainerContext) {
      // None of the supplied contexts are consulted; only the captured map is passed on.
      return new SamzaSqlApplicationContext(contextsByQueryId);
    }
  });
}
 
示例2
/**
 * Joins the "orders" and "shipments" streams of the "tracking" Kafka system and sends the
 * joined records, keyed by their {@code orderId}, to the "fulfilledOrders" stream.
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  // Two JSON inputs and one String-keyed JSON output.
  KafkaInputDescriptor<OrderRecord> ordersIn =
      kafka.getInputDescriptor("orders", new JsonSerdeV2<>(OrderRecord.class));
  KafkaInputDescriptor<ShipmentRecord> shipmentsIn =
      kafka.getInputDescriptor("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
  KafkaOutputDescriptor<KV<String, FulfilledOrderRecord>> fulfilledOut =
      kafka.getOutputDescriptor(
          "fulfilledOrders",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));

  appDescriptor.getInputStream(ordersIn)
      .join(
          appDescriptor.getInputStream(shipmentsIn),
          new MyJoinFunction(),
          new StringSerde(),
          new JsonSerdeV2<>(OrderRecord.class),
          new JsonSerdeV2<>(ShipmentRecord.class),
          Duration.ofMinutes(1),
          "join")
      .map(joined -> KV.of(joined.orderId, joined))
      .sendTo(appDescriptor.getOutputStream(fulfilledOut));
}
 
示例3
/**
 * Re-partitions page view events by member id, counts them per key in 5-minute tumbling
 * windows, and sends one {@code MyStreamOutput} per window pane to "pageViewEventPerMember".
 *
 * @param appDescriptor descriptor used to register the default system and the streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> pageViewInput =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, MyStreamOutput>> perMemberOutput =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));

  // The default system is registered before any stream is requested from the descriptor.
  appDescriptor.withDefaultSystem(kafka);
  MessageStream<PageViewEvent> events = appDescriptor.getInputStream(pageViewInput);
  OutputStream<KV<String, MyStreamOutput>> perMemberSink = appDescriptor.getOutputStream(perMemberOutput);

  events
      .partitionBy(
          event -> event.getMemberId(),
          event -> event,
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)),
          "partitionBy")
      // Count per key: starts at 0 and adds 1 for every message folded into the window.
      .window(
          Windows.keyedTumblingWindow(
              KV::getKey, Duration.ofMinutes(5), () -> 0, (message, count) -> count + 1, null, null),
          "window")
      .map(pane -> KV.of(pane.getKey().getKey(), new MyStreamOutput(pane)))
      .sendTo(perMemberSink);
}
 
示例4
/**
 * Re-partitions page view events by member id, runs them through {@code MyStatsCounter},
 * and sends the resulting stats, keyed by {@code memberId}, to "pageViewEventPerMember".
 *
 * @param appDescriptor descriptor used to register the default system and the streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> pageViewInput =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, StatsOutput>> statsOutput =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));

  // The default system is registered before any stream is requested from the descriptor.
  appDescriptor.withDefaultSystem(kafka);
  MessageStream<PageViewEvent> events = appDescriptor.getInputStream(pageViewInput);
  OutputStream<KV<String, StatsOutput>> statsSink = appDescriptor.getOutputStream(statsOutput);

  events
      .partitionBy(
          event -> event.getMemberId(),
          event -> event,
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)),
          "partitionBy")
      // Drop the partitioning key again; only the event itself is fed to the counter.
      .map(KV::getValue)
      .flatMap(new MyStatsCounter())
      .map(memberStats -> KV.of(memberStats.memberId, memberStats))
      .sendTo(statsSink);
}
 
示例5
/**
 * Counts page view events per member id in 10-second tumbling windows and sends a
 * {@code PageViewCount} for every emitted pane to "pageViewEventPerMember".
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> pageViewInput =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, PageViewCount>> countOutput =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

  MessageStream<PageViewEvent> events = appDescriptor.getInputStream(pageViewInput);
  OutputStream<KV<String, PageViewCount>> countSink = appDescriptor.getOutputStream(countOutput);

  // Explicitly typed fold pieces: the count starts at zero and grows by one per message.
  SupplierFunction<Integer> zero = () -> 0;
  FoldLeftFunction<PageViewEvent, Integer> increment = (event, count) -> count + 1;

  events
      .window(
          Windows.keyedTumblingWindow(
                  PageViewEvent::getMemberId, Duration.ofSeconds(10), zero, increment, null, null)
              // Early trigger repeats on a count of 5; panes use the DISCARDING accumulation mode.
              .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
              .setAccumulationMode(AccumulationMode.DISCARDING),
          "tumblingWindow")
      .map(pane -> KV.of(pane.getKey().getKey(), buildPageViewCount(pane)))
      .sendTo(countSink);
}
 
示例6
/**
 * Merges the three input streams "pageViewStream1", "pageViewStream2" and "pageViewStream3"
 * of the "tracking" Kafka system into the single output stream "mergedStream".
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  // One serde instance is shared by all three inputs and the output.
  KVSerde<String, PageViewEvent> pageViewSerde =
      KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<KV<String, PageViewEvent>> firstInput =
      kafka.getInputDescriptor("pageViewStream1", pageViewSerde);
  KafkaInputDescriptor<KV<String, PageViewEvent>> secondInput =
      kafka.getInputDescriptor("pageViewStream2", pageViewSerde);
  KafkaInputDescriptor<KV<String, PageViewEvent>> thirdInput =
      kafka.getInputDescriptor("pageViewStream3", pageViewSerde);

  KafkaOutputDescriptor<KV<String, PageViewEvent>> mergedOutput =
      kafka.getOutputDescriptor("mergedStream", pageViewSerde);

  MessageStream
      .mergeAll(
          ImmutableList.of(
              appDescriptor.getInputStream(firstInput),
              appDescriptor.getInputStream(secondInput),
              appDescriptor.getInputStream(thirdInput)))
      .sendTo(appDescriptor.getOutputStream(mergedOutput));
}
 
示例7
/**
 * Counts page view events in 10-minute tumbling windows and sends every emitted count to the
 * "pageViewEventPerMember" stream of the "tracking" Kafka system.
 *
 * <p>In addition to the regular window output, early results are requested whenever 30000
 * messages have been collected or no message has arrived for one minute.
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
      trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<Integer> outputStreamDescriptor =
      trackingSystem.getOutputDescriptor("pageViewEventPerMember", new IntegerSerde());

  // The fold starts from 0; the null check in the counter is purely defensive.
  SupplierFunction<Integer> initialValue = () -> 0;
  FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
  MessageStream<PageViewEvent> inputStream = appDescriptor.getInputStream(inputStreamDescriptor);
  OutputStream<Integer> outputStream = appDescriptor.getOutputStream(outputStreamDescriptor);

  // create a tumbling window that outputs the number of message collected every 10 minutes.
  // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
  // for 1 minute.
  // The triggers are registered as EARLY triggers: the previous setLateTrigger call contradicted
  // the documented intent of emitting results before the window closes.
  inputStream
      .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
          .setEarlyTrigger(Triggers.any(Triggers.count(30000),
              Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
      .map(WindowPane::getMessage)
      .sendTo(outputStream);
}
 
示例8
/**
 * Splits the "pageViewEvent" stream of the "tracking" Kafka system into three output streams:
 * messages with key "key1" go to "outStream1", "key2" to "outStream2" and "key3" to "outStream3".
 * Messages with any other key, including a null key, match none of the filters.
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
  KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
  KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent =
      trackingSystem.getInputDescriptor("pageViewEvent", serde);
  KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream1 =
      trackingSystem.getOutputDescriptor("outStream1", serde);
  KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream2 =
      trackingSystem.getOutputDescriptor("outStream2", serde);
  KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 =
      trackingSystem.getOutputDescriptor("outStream3", serde);

  MessageStream<KV<String, PageViewEvent>> inputStream = appDescriptor.getInputStream(pageViewEvent);
  // Constant-first equals: a message with a null key is simply filtered out instead of
  // throwing a NullPointerException inside the filter function.
  inputStream.filter(m -> "key1".equals(m.key)).sendTo(appDescriptor.getOutputStream(outStream1));
  inputStream.filter(m -> "key2".equals(m.key)).sendTo(appDescriptor.getOutputStream(outStream2));
  inputStream.filter(m -> "key3".equals(m.key)).sendTo(appDescriptor.getOutputStream(outStream3));
}
 
示例9
/**
 * Counts page view events per member id in 10-second tumbling windows, sends a
 * {@code PageViewCount} per emitted pane to "pageViewEventPerMember", and finally registers
 * an empty map of metrics reporter factories on the descriptor.
 *
 * @param appDescriptor descriptor used to obtain the streams and to register the reporters
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<PageViewEvent> pageViewInput =
      kafka.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  KafkaOutputDescriptor<KV<String, PageViewCount>> countOutput =
      kafka.getOutputDescriptor(
          "pageViewEventPerMember",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

  appDescriptor.getInputStream(pageViewInput)
      .window(
          // Explicit type arguments: message type, key type, and window value type.
          Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
                  PageViewEvent::getMemberId,
                  Duration.ofSeconds(10),
                  () -> 0,
                  (event, count) -> count + 1,
                  null,
                  null)
              .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
              .setAccumulationMode(AccumulationMode.DISCARDING),
          "window1")
      .map(pane -> KV.of(pane.getKey().getKey(), buildPageViewCount(pane)))
      .sendTo(appDescriptor.getOutputStream(countOutput));

  appDescriptor.withMetricsReporterFactories(new HashMap<>());
}
 
示例10
/**
 * Enriches ad click events asynchronously via {@code AsyncApplicationExample::enrichAdClickEvent}
 * and sends the enriched events, keyed by country, to "enrichedAdClickEvent".
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("tracking");

  KafkaInputDescriptor<AdClickEvent> adClickInput =
      kafka.getInputDescriptor("adClickEvent", new JsonSerdeV2<>(AdClickEvent.class));
  KafkaOutputDescriptor<KV<String, EnrichedAdClickEvent>> enrichedOutput =
      kafka.getOutputDescriptor(
          "enrichedAdClickEvent",
          KVSerde.of(new StringSerde(), new JsonSerdeV2<>(EnrichedAdClickEvent.class)));

  MessageStream<AdClickEvent> adClicks = appDescriptor.getInputStream(adClickInput);
  OutputStream<KV<String, EnrichedAdClickEvent>> enrichedSink =
      appDescriptor.getOutputStream(enrichedOutput);

  adClicks
      .flatMapAsync(AsyncApplicationExample::enrichAdClickEvent)
      .map(enriched -> KV.of(enriched.getCountry(), enriched))
      .sendTo(enrichedSink);
}
 
示例11
/**
 * Drops page views whose user id equals {@code FILTER_KEY}, groups the rest into keyed
 * session windows by user id, and sends (user id, size of the pane's message) pairs to the
 * output topic.
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KVSerde<String, Integer> countSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(INPUT_TOPIC, pageViewSerde);
  KafkaOutputDescriptor<KV<String, Integer>> countOutput = kafka.getOutputDescriptor(OUTPUT_TOPIC, countSerde);

  MessageStream<PageView> views = appDescriptor.getInputStream(pageViewInput);
  OutputStream<KV<String, Integer>> countSink = appDescriptor.getOutputStream(countOutput);

  views
      .filter(view -> !FILTER_KEY.equals(view.getUserId()))
      .window(
          // A fresh PageView serde instance is passed to the window, separate from the input serde.
          Windows.keyedSessionWindow(
              PageView::getUserId, Duration.ofSeconds(3), new StringSerde(), new JsonSerdeV2<>(PageView.class)),
          "sessionWindow")
      .map(pane -> KV.of(pane.getKey().getKey(), pane.getMessage().size()))
      .sendTo(countSink);
}
 
示例12
/**
 * Drops page views whose user id equals {@code FILTER_KEY}, groups the rest into keyed
 * 3-second tumbling windows by user id, and sends (user id, size of the pane's message)
 * pairs to the output topic.
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KVSerde<String, Integer> countSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(INPUT_TOPIC, pageViewSerde);
  KafkaOutputDescriptor<KV<String, Integer>> countOutput = kafka.getOutputDescriptor(OUTPUT_TOPIC, countSerde);

  MessageStream<PageView> views = appDescriptor.getInputStream(pageViewInput);
  OutputStream<KV<String, Integer>> countSink = appDescriptor.getOutputStream(countOutput);

  views
      .filter(view -> !FILTER_KEY.equals(view.getUserId()))
      .window(
          // A fresh PageView serde instance is passed to the window, separate from the input serde.
          Windows.keyedTumblingWindow(
              PageView::getUserId, Duration.ofSeconds(3), new StringSerde(), new JsonSerdeV2<>(PageView.class)),
          "tumblingWindow")
      .map(pane -> KV.of(pane.getKey().getKey(), pane.getMessage().size()))
      .sendTo(countSink);
}
 
示例13
/**
 * Re-partitions page views by user id, counts them per key in keyed session windows, and
 * sends (key, count) pairs, both rendered as strings, to the output topic.
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KVSerde<String, PageView> pageViewSerde =
      KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
  KVSerde<String, String> textSerde = KVSerde.of(new StringSerde(), new StringSerde());
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<KV<String, PageView>> pageViewInput = kafka.getInputDescriptor(INPUT_TOPIC, pageViewSerde);
  KafkaOutputDescriptor<KV<String, String>> textOutput = kafka.getOutputDescriptor(OUTPUT_TOPIC, textSerde);

  appDescriptor.getInputStream(pageViewInput)
      // Discard the incoming key; the stream is re-keyed by user id right after.
      .map(KV::getValue)
      .partitionBy(PageView::getUserId, view -> view, pageViewSerde, "p1")
      .window(
          Windows.keyedSessionWindow(
              keyed -> keyed.getKey(),
              Duration.ofSeconds(3),
              () -> 0,
              (keyed, count) -> count + 1,
              new StringSerde("UTF-8"),
              new IntegerSerde()),
          "w1")
      .map(pane -> KV.of(pane.getKey().getKey().toString(), String.valueOf(pane.getMessage())))
      .sendTo(appDescriptor.getOutputStream(textOutput));
}
 
示例14
/**
 * Reads page views, passes them through the asynchronous {@code filterGuestPageViews} step
 * and the {@code filterLoginPageViews} filter, and sends the remainder to the non-guest
 * page view stream. Failure injection and jitter are read from the application config.
 *
 * @param appDescriptor descriptor supplying the config and the streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Config appConfig = appDescriptor.getConfig();
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(TEST_SYSTEM);
  KafkaOutputDescriptor<PageView> nonGuestOutput =
      kafka.getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
  OutputStream<PageView> nonGuestSink = appDescriptor.getOutputStream(nonGuestOutput);

  // The intersection casts make the lambdas Serializable. Each one reads the config when it
  // is invoked, not when it is created; defaults are false, false and 100 respectively.
  Predicate<PageView> shouldFailProcess =
      (Predicate<PageView> & Serializable) (unused) -> appConfig.getBoolean(FAIL_PROCESS, false);
  Predicate<PageView> shouldFailDownstream =
      (Predicate<PageView> & Serializable) (unused) -> appConfig.getBoolean(FAIL_DOWNSTREAM_OPERATOR, false);
  Supplier<Long> jitter =
      (Supplier<Long> & Serializable) () -> appConfig.getLong(PROCESS_JITTER, 100);

  appDescriptor.getInputStream(kafka.getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<PageView>()))
      .flatMapAsync(view -> filterGuestPageViews(view, shouldFailProcess, jitter))
      .filter(view -> filterLoginPageViews(view, shouldFailDownstream))
      .sendTo(nonGuestSink);
}
 
示例15
/**
 * Wraps {@code actualTableDesc} in a caching table descriptor and obtains the table from
 * the application descriptor.
 *
 * @param actualTableDesc descriptor of the table to be cached
 * @param defaultCache when true, the caching descriptor is configured with 5-minute read and
 *     write TTLs; when false, a Guava cache expiring 5 minutes after access is supplied
 * @param appDesc application descriptor the caching table is obtained from
 * @return the table returned by {@code appDesc.getTable} for the caching descriptor
 */
private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache,
    StreamApplicationDescriptor appDesc) {
  final String tableId = actualTableDesc.getTableId();
  final String cachingTableId = "caching-table-" + tableId;

  if (!defaultCache) {
    // Explicit cache: a dedicated Guava-backed table descriptor is handed to the caching descriptor.
    GuavaCacheTableDescriptor<K, V> guavaDesc = new GuavaCacheTableDescriptor<>("guava-table-" + tableId);
    guavaDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
    CachingTableDescriptor<K, V> guavaBacked =
        new CachingTableDescriptor<>(cachingTableId, actualTableDesc, guavaDesc);
    return appDesc.getTable(guavaBacked);
  }

  // Default cache: only TTLs are configured on the caching descriptor.
  CachingTableDescriptor<K, V> ttlBacked = new CachingTableDescriptor<>(cachingTableId, actualTableDesc);
  ttlBacked.withReadTtl(Duration.ofMinutes(5));
  ttlBacked.withWriteTtl(Duration.ofMinutes(5));
  return appDesc.getTable(ttlBacked);
}
 
示例16
/**
 * Loads profiles into the in-memory table "t1" keyed by member id, and joins page views
 * (re-partitioned by member id) against that table. Every received page view is recorded in
 * {@code received} and every join result in {@code joined}.
 *
 * @param appDesc descriptor used to obtain the table and the input streams
 */
@Override
public void describe(StreamApplicationDescriptor appDesc) {
  // Parameterized descriptor instead of a raw type, so the table's key/value types are checked.
  Table<KV<Integer, Profile>> table = appDesc.getTable(
      new InMemoryTableDescriptor<Integer, Profile>("t1",
          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
  DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
  GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
  appDesc.getInputStream(profileISD)
      // KV.of yields a typed KV rather than the raw KV the constructor call produced.
      .map(m -> KV.of(m.getMemberId(), m))
      .sendTo(table);

  GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
  appDesc.getInputStream(pageViewISD)
      .map(pv -> {
        // Side effect only: remember the page view, then pass it through unchanged.
        received.add(pv);
        return pv;
      })
      .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
      .join(table, new PageViewToProfileJoinFunction())
      .sink((m, collector, coordinator) -> joined.add(m));
}
 
示例17
/**
 * Runs an application that maps the in-memory input "test-input" through {@code TestFunction}
 * for one second and then asserts that {@code timerFired} is set.
 */
@Test
public void testImmediateTimer() {
  final InMemorySystemDescriptor inMemorySystem = new InMemorySystemDescriptor("test");
  final InMemoryInputDescriptor<Integer> numbersInput =
      inMemorySystem.getInputDescriptor("test-input", new IntegerSerde());

  StreamApplication timerApp = new StreamApplication() {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      // The mapped stream is not sent anywhere; only the function itself matters here.
      appDescriptor.getInputStream(numbersInput).map(new TestFunction());
    }
  };

  TestRunner
      .of(timerApp)
      .addInputStream(numbersInput, Arrays.asList(1, 2, 3, 4, 5))
      .run(Duration.ofSeconds(1));

  assertTrue(timerFired.get());
}
 
示例18
/**
 * Feeds the page view stream through {@code FlatmapScheduledFn} and asserts that the output
 * contains the four expected "-complete" page views in any order.
 *
 * @param appDescriptor descriptor used to obtain the input stream
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  final JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
  KafkaInputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(PAGE_VIEWS, pageViewSerde);

  final MessageStream<PageView> views = appDescriptor.getInputStream(pageViewInput);
  final MessageStream<PageView> completed = views.flatMap(new FlatmapScheduledFn());

  MessageStreamAssert
      .that("Output from scheduling function should container all complete messages", completed, pageViewSerde)
      .containsInAnyOrder(Arrays.asList(
          new PageView("v1-complete", "p1", "u1"),
          new PageView("v2-complete", "p2", "u1"),
          new PageView("v3-complete", "p1", "u2"),
          new PageView("v4-complete", "p3", "u2")));
}
 
示例19
/**
 * Broadcasts the page view stream read from the configured input topic and asserts, for each
 * task, that it contains the four expected page views in any order.
 *
 * @param appDescriptor descriptor supplying the config and the input stream
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Config appConfig = appDescriptor.getConfig();
  String topic = appConfig.get(INPUT_TOPIC_NAME_PROP);

  final JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(SYSTEM);
  KafkaInputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(topic, pageViewSerde);
  final MessageStream<PageView> broadcastViews =
      appDescriptor.getInputStream(pageViewInput).broadcast(pageViewSerde, "pv");

  // Each task will see all the pageview events.
  MessageStreamAssert
      .that("Each task contains all broadcast PageView events", broadcastViews, pageViewSerde)
      .forEachTask()
      .containsInAnyOrder(Arrays.asList(
          new PageView("v1", "p1", "u1"),
          new PageView("v2", "p2", "u1"),
          new PageView("v3", "p1", "u2"),
          new PageView("v4", "p3", "u2")));
}
 
示例20
/**
 * Copies page views from the input stream to the output stream, dropping every message whose
 * {@code userId} equals {@code INVALID_USER_ID}.
 *
 * @param appDescriptor descriptor used to register the default system and the streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
      .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
      .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
      .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);

  // The same keyed serde serves both the input and the output descriptor.
  KVSerde<String, PageView> pageViewSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
  KafkaInputDescriptor<KV<String, PageView>> pageViewInput =
      kafka.getInputDescriptor(INPUT_STREAM_ID, pageViewSerde);
  KafkaOutputDescriptor<KV<String, PageView>> filteredOutput =
      kafka.getOutputDescriptor(OUTPUT_STREAM_ID, pageViewSerde);

  appDescriptor.withDefaultSystem(kafka);

  MessageStream<KV<String, PageView>> views = appDescriptor.getInputStream(pageViewInput);
  OutputStream<KV<String, PageView>> filteredSink = appDescriptor.getOutputStream(filteredOutput);

  views
      .filter(entry -> !INVALID_USER_ID.equals(entry.value.userId))
      .sendTo(filteredSink);
}
 
示例21
/**
 * Creates a translation context that simply stores the given collaborators.
 *
 * @param appDescriptor the Samza application descriptor
 * @param idMap mapping from {@code PValue} to its string id
 * @param options the Samza pipeline options
 */
public TranslationContext(
    StreamApplicationDescriptor appDescriptor,
    Map<PValue, String> idMap,
    SamzaPipelineOptions options) {
  // Plain field assignments; no validation or copying is performed.
  this.options = options;
  this.idMap = idMap;
  this.appDescriptor = appDescriptor;
}
 
示例22
/**
 * Create the instance of TranslatorContext.
 *
 * <p>Stores the supplied descriptor and execution context, builds the expression compiler
 * from {@code relRoot}, reads the rel converters and table-key converters from the execution
 * context's SQL application config, and starts with empty maps for message streams,
 * rel nodes and system descriptors.
 *
 * @param streamAppDesc Samza's streamAppDesc that is populated during the translation.
 * @param relRoot Root of the relational graph from calcite.
 * @param executionContext the execution context
 */
public TranslatorContext(StreamApplicationDescriptor streamAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext) {
  this.streamAppDesc = streamAppDesc;
  // NOTE(review): createExpressionCompiler runs before the remaining fields are assigned;
  // keep this ordering unless that method is confirmed not to read instance state.
  this.compiler = createExpressionCompiler(relRoot);
  this.executionContext = executionContext;
  this.dataContext = new DataContextImpl();
  // Both converter collections come from the same SQL application config.
  this.relSamzaConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelConverters();
  this.relTableKeyConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelTableKeyConverters();
  // Bookkeeping maps start out empty.
  this.messageStreams = new HashMap<>();
  this.relNodes = new HashMap<>();
  this.systemDescriptors = new HashMap<>();
}
 
示例23
/**
 * Creates a query translator bound to the given application descriptor and SQL config.
 * The system descriptor and input/output message stream maps start out empty.
 *
 * @param appDesc the stream application descriptor to translate into
 * @param sqlConfig the Samza SQL application config
 */
public QueryTranslator(StreamApplicationDescriptor appDesc, SamzaSqlApplicationConfig sqlConfig) {
  this.streamAppDescriptor = appDesc;
  this.sqlConfig = sqlConfig;

  // Empty bookkeeping maps.
  this.inputMsgStreams = new HashMap<>();
  this.outputMsgStreams = new HashMap<>();
  this.systemDescriptors = new HashMap<>();
}
 
示例24
/**
 * Sends the translated messages of {@code node} to the sink configured for {@code sinkStream}.
 *
 * <p>If the sink config carries a table descriptor, the messages go to that table. Otherwise
 * they go to an output stream of the sink's system, and, when the SQL config asks for system
 * events to be processed, system messages from every known input stream are forwarded to the
 * same output stream.
 *
 * @param queryLogicalId logical id of the query, passed to the output map function
 * @param logicalOpId logical operator id, passed to the output map function
 * @param sinkStream source name used to look up the sink config
 * @param appDesc application descriptor used to obtain the output stream or table
 * @param translatorContext context holding the message stream registered for {@code node}
 * @param node relational node whose message stream is sent out
 * @param queryId id of the query, passed to the output map function
 * @throws SamzaException if the application descriptor returns no table for the sink's table descriptor
 */
private void sendToOutputStream(String queryLogicalId, String logicalOpId, String sinkStream,
    StreamApplicationDescriptor appDesc, TranslatorContext translatorContext, RelNode node, int queryId) {
  SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream);
  MessageStream<SamzaSqlRelMessage> relMessages = translatorContext.getMessageStream(node.getId());
  MessageStream<KV<Object, Object>> sinkMessages =
      relMessages.map(new OutputMapFunction(queryLogicalId, logicalOpId, sinkStream, queryId));

  Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
  if (tableDescriptor.isPresent()) {
    // Table sink: no system events are forwarded on this path.
    Table outputTable = appDesc.getTable(tableDescriptor.get());
    if (outputTable == null) {
      throw new SamzaException("Failed to obtain table descriptor of " + sinkConfig.getSource());
    }
    sinkMessages.sendTo(outputTable);
    return;
  }

  // Stream sink: system descriptors and output streams are created once and then reused.
  KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
  DelegatingSystemDescriptor systemDescriptor =
      systemDescriptors.computeIfAbsent(sinkConfig.getSystemName(), DelegatingSystemDescriptor::new);
  GenericOutputDescriptor<KV<Object, Object>> outputDescriptor =
      systemDescriptor.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
  OutputStream sink =
      outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), unused -> appDesc.getOutputStream(outputDescriptor));
  sinkMessages.sendTo(sink);

  // Process system events only if the output is a stream.
  if (!sqlConfig.isProcessSystemEvents()) {
    return;
  }
  for (MessageStream<SamzaSqlInputMessage> input : inputMsgStreams.values()) {
    MessageStream<KV<Object, Object>> systemEvents =
        input.filter(candidate -> candidate.getMetadata().isSystemMessage())
            .map(SamzaSqlInputMessage::getKeyAndMessageKV);
    systemEvents.sendTo(sink);
  }
}
 
示例25
/**
 * Forwards every message from the input stream named by the "input.stream.name" config entry
 * to the "standaloneIntegrationTestKafkaOutputTopic" stream, using no-op serdes on both sides.
 *
 * @param appDescriptor descriptor supplying the config and the streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  String kafkaSystemName = "testSystemName";
  String sourceStream = appDescriptor.getConfig().get("input.stream.name");
  String targetStream = "standaloneIntegrationTestKafkaOutputTopic";
  LOGGER.info("Publishing message from: {} to: {}.", sourceStream, targetStream);
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(kafkaSystemName);

  KVSerde<Object, Object> passThroughSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
  KafkaInputDescriptor<KV<Object, Object>> input =
      kafka.getInputDescriptor(sourceStream, passThroughSerde);
  KafkaOutputDescriptor<KV<Object, Object>> output =
      kafka.getOutputDescriptor(targetStream, passThroughSerde);

  appDescriptor
      .getInputStream(input)
      .sendTo(appDescriptor.getOutputStream(output));
}
 
示例26
/**
 * Maps every configured input topic through a {@code TestMapFunction} and sends the results
 * of all of them to the single output topic.
 *
 * @param appDescriptor descriptor used to obtain the input and output streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor(systemName);
  KafkaOutputDescriptor<String> output = kafka.getOutputDescriptor(outputTopic, new StringSerde());
  OutputStream<String> sink = appDescriptor.getOutputStream(output);

  // Every topic gets its own input descriptor and map function, but all share one sink.
  for (String topic : inputTopics) {
    KafkaInputDescriptor<String> input = kafka.getInputDescriptor(topic, new NoOpSerde<>());
    MessageStream<String> messages = appDescriptor.getInputStream(input);
    messages
        .map(new TestMapFunction(appName, processorName))
        .sendTo(sink);
  }
}
 
示例27
/**
 * Re-partitions page views by member id, joins them against the profile table, and sends the
 * join results to the enriched page view stream.
 *
 * @param appDescriptor descriptor used to obtain the table and the streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Table<KV<Integer, TestTableData.Profile>> profileTable = appDescriptor.getTable(getTableDescriptor());
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");

  appDescriptor
      .getInputStream(kafka.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
      .partitionBy(
          TestTableData.PageView::getMemberId,
          view -> view,
          KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()),
          "partition-page-view")
      .join(profileTable, new PageViewToProfileJoinFunction())
      .sendTo(appDescriptor.getOutputStream(
          kafka.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
}
 
示例28
/**
 * Loads profiles into the RocksDB table "profile-view-store" keyed by member id, joins page
 * views (re-partitioned by member id) against it, sends the enriched page views to
 * "EnrichedPageView", and additionally emits each enriched page key to the "JoinPageKeys"
 * stream of the "test" system.
 *
 * @param appDescriptor descriptor used to obtain the table and the streams
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(
      new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store",
          KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));

  KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");

  KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD =
      ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));

  KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD =
      ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
  KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
      ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());

  appDescriptor.getInputStream(profileISD)
      // Re-key each profile by its member id; KV.of keeps the pair typed instead of a raw KV.
      .map(m -> KV.of(m.getValue().getMemberId(), m.getValue()))
      .sendTo(table)
      .sink((kv, collector, coordinator) -> {
        LOG.info("Inserted Profile with Key: {} in profile-view-store", kv.getKey());
      });

  OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
  appDescriptor.getInputStream(pageViewISD)
      .partitionBy(pv -> pv.getValue().getMemberId(),  pv -> pv.getValue(), KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(TestTableData.PageView.class)), "p1")
      .join(table, new PageViewToProfileJoinFunction())
      .sendTo(outputStream)
      .map(TestTableData.EnrichedPageView::getPageKey)
      .sink((joinPageKey, collector, coordinator) -> {
        // The page key is sent as the message; the two preceding arguments are null.
        collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
      });

}
 
示例29
/**
 * Reads the "PageView" stream of the "test" Kafka system, extracts the message values, and
 * keeps only the page views whose page key equals "inbox". The filtered stream is not sent
 * anywhere.
 *
 * @param appDescriptor descriptor used to obtain the input stream
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");
  KafkaInputDescriptor<KV<String, PageView>> pageViewInput =
      kafka.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
  MessageStream<KV<String, TestData.PageView>> views = appDescriptor.getInputStream(pageViewInput);

  views
      .map(KV::getValue)
      .filter(view -> view.getPageKey().equals("inbox"));
}
 
示例30
/**
 * Reads the "PageView" stream of the "test" Kafka system, re-partitions the page views by
 * member id, and sends each re-partitioned message to the "Output" stream of the "test"
 * system.
 *
 * @param appDescriptor descriptor used to obtain the input stream
 */
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
  KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");
  KafkaInputDescriptor<KV<String, PageView>> pageViewInput =
      kafka.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
  MessageStream<KV<String, TestData.PageView>> views = appDescriptor.getInputStream(pageViewInput);

  views
      .map(KV::getValue)
      .partitionBy(
          PageView::getMemberId,
          view -> view,
          KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)),
          "p1")
      // The message key is passed twice to the envelope, and the whole keyed message is the payload.
      .sink((keyed, collector, coordinator) ->
          collector.send(
              new OutgoingMessageEnvelope(
                  new SystemStream("test", "Output"), keyed.getKey(), keyed.getKey(), keyed)));
}