Java源码示例:org.apache.samza.system.IncomingMessageEnvelope
示例1
/**
 * Collects the envelopes currently queued for the given partition.
 *
 * <p>Waits up to {@code timeoutMillis} for a first envelope; if one arrives, everything else
 * already sitting in the queue is drained along with it. One permit per returned envelope is
 * released on {@code available}.
 *
 * @param ssp partition whose queue is read
 * @param timeoutMillis maximum time, in milliseconds, to wait for the first envelope
 * @return the drained envelopes, possibly empty
 * @throws InterruptedException if interrupted while waiting on the queue
 * @throws RuntimeException wrapping {@code lastException} when it is set, checked both before
 *     and after reading the queue
 */
private List<IncomingMessageEnvelope> getNextMessages(
    SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
  // Surface a previously recorded failure before touching the queue.
  if (lastException != null) {
    throw new RuntimeException(lastException);
  }

  final BlockingQueue<IncomingMessageEnvelope> pending = queues.get(ssp);
  final List<IncomingMessageEnvelope> batch = new ArrayList<>();

  final IncomingMessageEnvelope head = pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
  if (head != null) {
    batch.add(head);
    pending.drainTo(batch);
  }

  available.release(batch.size());

  // The failure flag may have been set while we were waiting; check it again.
  if (lastException != null) {
    throw new RuntimeException(lastException);
  }
  return batch;
}
示例2
/**
 * Builds a reader task over the given readers.
 *
 * <p>Stores the supplied collaborators, snapshots the reader set into an immutable list,
 * creates a semaphore with {@code capacity} permits, and allocates one empty
 * {@link LinkedBlockingQueue} per partition found in {@code readerToSsp}.
 *
 * @param readerToSsp mapping from each reader to the partition it feeds
 * @param checkpointMarkCoder coder stored for later use by this task
 * @param capacity initial number of permits of the {@code available} semaphore
 * @param watermarkInterval watermark interval stored for later use by this task
 * @param metricsWrapper metrics wrapper stored for later use by this task
 */
private ReaderTask(
    Map<UnboundedReader, SystemStreamPartition> readerToSsp,
    Coder<CheckpointMarkT> checkpointMarkCoder,
    int capacity,
    long watermarkInterval,
    FnWithMetricsWrapper metricsWrapper) {
  this.readerToSsp = readerToSsp;
  this.readers = ImmutableList.copyOf(readerToSsp.keySet());
  this.checkpointMarkCoder = checkpointMarkCoder;
  this.watermarkInterval = watermarkInterval;
  this.metricsWrapper = metricsWrapper;
  this.available = new Semaphore(capacity);

  // One queue per partition, frozen into an immutable map once populated.
  final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queuesBySsp =
      new HashMap<>();
  for (SystemStreamPartition ssp : readerToSsp.values()) {
    queuesBySsp.put(ssp, new LinkedBlockingQueue<>());
  }
  this.queues = ImmutableMap.copyOf(queuesBySsp);
}
示例3
/**
 * Test double for a consumer poll.
 *
 * <p>Asserts that exactly one partition is requested and that it equals
 * {@code expectedSystemStreamPartition}. The first invocation returns three envelopes for that
 * partition (two {@code SetConfig} messages followed by a {@code Delete}); every later
 * invocation returns an empty map.
 *
 * @param systemStreamPartitions partitions being polled; must contain exactly one element
 * @param timeout ignored
 * @return a map holding the three envelopes on the first call, otherwise an empty map
 */
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
  assertEquals(1, systemStreamPartitions.size());
  SystemStreamPartition onlySsp = systemStreamPartitions.iterator().next();
  assertEquals(expectedSystemStreamPartition, onlySsp);

  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = new LinkedHashMap<>();
  // Only the very first poll yields data; the counter is advanced on every call.
  if (pollCount++ != 0) {
    return result;
  }

  SetConfig jobNameConfig = new SetConfig("test", "job.name", "my-job-name");
  SetConfig jobIdConfig = new SetConfig("test", "job.id", "1234");
  Delete jobNameDelete = new Delete("test", "job.name", SetConfig.TYPE);

  List<IncomingMessageEnvelope> envelopes = new ArrayList<>();
  envelopes.add(new IncomingMessageEnvelope(
      onlySsp, null, serialize(jobNameConfig.getKeyArray()), serialize(jobNameConfig.getMessageMap())));
  envelopes.add(new IncomingMessageEnvelope(
      onlySsp, null, serialize(jobIdConfig.getKeyArray()), serialize(jobIdConfig.getMessageMap())));
  // NOTE(review): unlike the two envelopes above, the delete's message map is passed unserialized.
  envelopes.add(new IncomingMessageEnvelope(
      onlySsp, null, serialize(jobNameDelete.getKeyArray()), jobNameDelete.getMessageMap()));

  result.put(onlySsp, envelopes);
  return result;
}
示例4
/**
 * Polls {@code consumer} for {@code ssp} repeatedly until either the timeout elapses or the most
 * recently accumulated envelope carries a message of type {@code MessageType.WATERMARK}.
 *
 * <p>Each poll is given the time still remaining in the overall budget
 * ({@code timeoutMillis} minus the elapsed time). The previous implementation passed
 * {@code elapsed - timeoutMillis}, which is never positive inside the loop.
 *
 * @param consumer consumer to poll
 * @param ssp partition to poll
 * @param timeoutMillis overall time budget in milliseconds; must be {@code >= 0}
 * @return every envelope received, in arrival order; ends with the watermark envelope if one
 *     was seen before the timeout
 * @throws InterruptedException if a poll is interrupted
 */
private static List<IncomingMessageEnvelope> consumeUntilTimeoutOrWatermark(
    SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis)
    throws InterruptedException {
  assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0);

  final List<IncomingMessageEnvelope> accumulator = new ArrayList<>();
  final long start = System.currentTimeMillis();
  long elapsedMillis = 0;
  // Comparing elapsed time against the budget avoids overflow of "timeoutMillis + start".
  while (elapsedMillis <= timeoutMillis) {
    // Hand the poll whatever is left of the budget (always >= 0 here).
    accumulator.addAll(pollOnce(consumer, ssp, timeoutMillis - elapsedMillis));
    if (!accumulator.isEmpty()
        && MessageType.of(accumulator.get(accumulator.size() - 1).getMessage())
            == MessageType.WATERMARK) {
      break;
    }
    elapsedMillis = System.currentTimeMillis() - start;
  }
  return accumulator;
}
示例5
/**
 * Polls {@code consumer} for {@code ssp} repeatedly until either the timeout elapses or the most
 * recently accumulated envelope reports {@code isEndOfStream()}.
 *
 * <p>Each poll is given the time still remaining in the overall budget
 * ({@code timeoutMillis} minus the elapsed time). The previous implementation passed
 * {@code elapsed - timeoutMillis}, which is never positive inside the loop.
 *
 * @param consumer consumer to poll
 * @param ssp partition to poll
 * @param timeoutMillis overall time budget in milliseconds; must be {@code >= 0}
 * @return every envelope received, in arrival order; ends with the end-of-stream envelope if
 *     one was seen before the timeout
 * @throws InterruptedException if a poll is interrupted
 */
private static List<IncomingMessageEnvelope> consumeUntilTimeoutOrEos(
    SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis)
    throws InterruptedException {
  assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0);

  final List<IncomingMessageEnvelope> accumulator = new ArrayList<>();
  final long start = System.currentTimeMillis();
  long elapsedMillis = 0;
  // Comparing elapsed time against the budget avoids overflow of "timeoutMillis + start".
  while (elapsedMillis <= timeoutMillis) {
    // Hand the poll whatever is left of the budget (always >= 0 here).
    accumulator.addAll(pollOnce(consumer, ssp, timeoutMillis - elapsedMillis));
    if (!accumulator.isEmpty() && accumulator.get(accumulator.size() - 1).isEndOfStream()) {
      break;
    }
    elapsedMillis = System.currentTimeMillis() - start;
  }
  return accumulator;
}
示例6
/**
 * Applies a side input envelope to every store registered for its partition, then records the
 * envelope's offset as the last processed offset of that partition.
 *
 * <p>For each store, the store's {@code SideInputsProcessor} turns the envelope into entries.
 * Entries with a null key are skipped, entries with a null value are deleted from the store,
 * and all others are written with {@code put}.
 *
 * <p>Synchronized in order to be exclusive with flush().
 *
 * @param envelope incoming envelope to be processed
 */
public synchronized void process(IncomingMessageEnvelope envelope) {
  SystemStreamPartition ssp = envelope.getSystemStreamPartition();
  String offset = envelope.getOffset();

  for (String storeName : this.sspToStores.get(ssp)) {
    SideInputsProcessor processor = this.storeToProcessor.get(storeName);
    KeyValueStore store = (KeyValueStore) this.taskSideInputStorageManager.getStore(storeName);
    Collection<Entry<?, ?>> updates = processor.process(envelope, store);

    // TODO: SAMZA-2255: optimize writes to side input stores
    for (Entry update : updates) {
      Object key = update.getKey();
      if (key == null) {
        // Entries without a key are ignored.
        continue;
      }
      Object value = update.getValue();
      if (value == null) {
        // A null value is a delete request for the key.
        store.delete(key);
      } else {
        store.put(key, value);
      }
    }
  }

  this.lastProcessedOffsets.put(ssp, offset);
}
示例7
/**
 * Compares two multi-file style offsets. Such an offset carries both the index of the file and
 * the offset within that file, formatted as {@code "fileIndex:offsetWithinFile"}, for example
 * {@code "2:0"} or {@code "3:127"}. The format of the offset within a file is defined by the
 * implementation of {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
 *
 * <p>{@code IncomingMessageEnvelope.END_OF_STREAM_OFFSET} is treated as greater than every
 * other offset and equal to itself.
 *
 * @param offset1 First offset for comparison.
 * @param offset2 Second offset for comparison.
 * @return a negative value if offset1 &lt; offset2, 0 if offset1 == offset2, a positive value
 *     if offset1 &gt; offset2, or {@code null} if either offset is blank (not comparable)
 */
@Override
public Integer offsetComparator(String offset1, String offset2) {
  // Blank offsets cannot be ordered.
  if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
    return null;
  }

  // END_OF_STREAM sorts after every regular offset and ties only with itself.
  boolean firstAtEnd = offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET);
  boolean secondAtEnd = offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET);
  if (firstAtEnd || secondAtEnd) {
    if (firstAtEnd && secondAtEnd) {
      return 0;
    }
    return firstAtEnd ? 1 : -1;
  }

  int firstFile = MultiFileHdfsReader.getCurFileIndex(offset1);
  int secondFile = MultiFileHdfsReader.getCurFileIndex(offset2);
  if (firstFile != secondFile) {
    return Integer.compare(firstFile, secondFile);
  }

  // Same file: delegate to the reader-type specific comparison of the in-file offsets.
  return HdfsReaderFactory.offsetComparator(
      readerType,
      MultiFileHdfsReader.getCurSingleFileOffset(offset1),
      MultiFileHdfsReader.getCurSingleFileOffset(offset2));
}
示例8
/**
 * {@inheritDoc}
 *
 * <p>Before delegating to the superclass, inspects the reader future of every requested
 * partition. When a future has completed and retrieving its result throws, the failure is
 * logged, the partition's reader is reconnected, and a fresh {@code ReaderRunnable} is
 * submitted in its place.
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
    Set<SystemStreamPartition> systemStreamPartitions, long timeout)
    throws InterruptedException {
  for (SystemStreamPartition ssp : systemStreamPartitions) {
    Future<?> readerStatus = readerRunnableStatus.get(ssp);
    if (!readerStatus.isDone()) {
      // Reader is still running; nothing to recover.
      continue;
    }
    try {
      // The future is already done, so this only surfaces a stored failure.
      readerStatus.get();
    } catch (ExecutionException | InterruptedException e) {
      MultiFileHdfsReader failedReader = readers.get(ssp);
      String warning =
          String.format("Detect failure in ReaderRunnable for ssp: %s. Try to reconnect now.", ssp);
      LOG.warn(warning, e);
      failedReader.reconnect();
      readerRunnableStatus.put(ssp, executorService.submit(new ReaderRunnable(failedReader)));
    }
  }
  return super.poll(systemStreamPartitions, timeout);
}
示例9
/**
 * Reconnects a {@code MultiFileHdfsReader} as many times as its configured retry limit,
 * checking that each reconnect re-reads the same envelope, and then reconnects once more.
 * The test passes only if a {@code SamzaException} is thrown before {@code Assert.fail()} is
 * reached (presumably by the final reconnect - confirm against the reader implementation).
 */
@Test(expected = SamzaException.class)
public void testReachingMaxReconnect() {
  final int maxRetries = 3;
  SystemStreamPartition partition = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
  MultiFileHdfsReader reader =
      new MultiFileHdfsReader(
          HdfsReaderFactory.ReaderType.AVRO, partition, Arrays.asList(descriptors), "0:0", maxRetries);

  // first read a few events, and then reconnect
  for (int consumed = 0; consumed < NUM_EVENTS / 2; consumed++) {
    reader.readNext();
  }

  // Each reconnect within the limit must yield an envelope equal to the one read before it.
  for (int attempt = 0; attempt < maxRetries; attempt++) {
    IncomingMessageEnvelope beforeReconnect = reader.readNext();
    reader.reconnect();
    IncomingMessageEnvelope afterReconnect = reader.readNext();
    Assert.assertEquals(beforeReconnect, afterReconnect);
  }

  // One reconnect beyond the limit.
  reader.readNext();
  reader.reconnect();
  Assert.fail();
}
示例10
/**
 * Resets the fixture before each test: zeroed completion/failure counters, no recorded
 * throwable, a listener that updates those fields, and a {@code TaskCallbackImpl} wired to
 * that listener with a mocked envelope.
 */
@Before
public void setup() {
  throwable = null;
  failureCount = new AtomicInteger(0);
  completeCount = new AtomicInteger(0);

  listener = new TaskCallbackListener() {
    @Override
    public void onComplete(TaskCallback completed) {
      completeCount.incrementAndGet();
    }

    @Override
    public void onFailure(TaskCallback failed, Throwable cause) {
      // Remember the failure cause before bumping the counter.
      throwable = cause;
      failureCount.incrementAndGet();
    }
  };

  callback = new TaskCallbackImpl(listener, null, mock(IncomingMessageEnvelope.class), null, 0L, 0L);
}
示例11
/**
 * Removes the head of the pending envelope queue and hands its envelope to the task.
 *
 * <p>The pending-messages metric is updated with the remaining queue size. The chooser is
 * updated (via {@code consumerMultiplexer.tryUpdate}) for flow control at the SSP level only
 * when {@code markProcessed()} returns true, i.e. the first time the envelope is fetched. This
 * matters for broadcast streams, where the same envelope sits in the pending queue of several
 * tasks: only the first fetch updates the chooser with the next envelope of the partition.
 *
 * <p>The function will be called in the run loop thread so no synchronization.
 *
 * @return the envelope that was at the head of the pending queue
 */
private IncomingMessageEnvelope fetchEnvelope() {
  PendingEnvelope head = pendingEnvelopeQueue.remove();
  IncomingMessageEnvelope envelope = head.envelope;

  int remaining = pendingEnvelopeQueue.size();
  taskMetrics.pendingMessages().set(remaining);

  log.trace("fetch envelope ssp {} offset {} to process.",
      envelope.getSystemStreamPartition(), envelope.getOffset());
  log.debug("Task {} pending envelopes count is {} after fetching.", taskName, remaining);

  if (!head.markProcessed()) {
    // Already fetched by another task; the chooser was updated back then.
    return envelope;
  }

  SystemStreamPartition ssp = envelope.getSystemStreamPartition();
  consumerMultiplexer.tryUpdate(ssp);
  log.debug("Update chooser for {}", ssp);
  return envelope;
}
示例12
/**
 * Polls the consumer once for the given partitions and flattens the per-partition results into
 * a single list.
 *
 * @param consumer consumer to poll
 * @param sspsToPoll partitions to poll
 * @return all envelopes returned by the poll, in the iteration order of the poll result
 * @throws AssertionError with message "Unable to consume messages" if polling or flattening
 *     fails; the original exception is attached as the cause
 */
private List<IncomingMessageEnvelope> consumeRawMessages(SystemConsumer consumer, Set<SystemStreamPartition> sspsToPoll) {
  try {
    // Empty per-partition lists contribute nothing to the flattened stream, so no filter is needed.
    return consumer.poll(sspsToPoll, POLL_TIMEOUT_MS)
        .values()
        .stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interruption visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    // Same failure type and message as fail(...), but with the root cause preserved.
    throw new AssertionError("Unable to consume messages", e);
  }
}
示例13
/**
 * Runs the multi-partition, multithreaded sync task scenario with inputs that are pre-wrapped
 * in {@code IncomingMessageEnvelope}s: every generated message becomes an envelope on partition
 * {@code partitionId} of stream {@code test.input}, with offsets counting up from 0 within the
 * partition.
 */
@Test
public void testSyncTaskWithMultiplePartitionMultithreadedWithCustomIME() throws Exception {
  Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
  Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
  genData(inputPartitionData, expectedOutputPartitionData);

  Map<Integer, List<KV>> inputPartitionIME = new HashMap<>();
  for (Map.Entry<Integer, List<KV>> partitionEntry : inputPartitionData.entrySet()) {
    Integer partitionId = partitionEntry.getKey();
    SystemStreamPartition ssp = new SystemStreamPartition("test", "input", new Partition(partitionId));

    List<KV> wrapped = new ArrayList<>();
    inputPartitionIME.put(partitionId, wrapped);

    int nextOffset = 0;
    for (KV message : partitionEntry.getValue()) {
      // Keep the original key, but replace the value by an envelope carrying key and value.
      IncomingMessageEnvelope ime =
          new IncomingMessageEnvelope(ssp, String.valueOf(nextOffset++), message.key, message.getValue());
      wrapped.add(KV.of(message.key, ime));
    }
  }

  syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionIME, expectedOutputPartitionData);
}
示例14
/**
 * Reads the last checkpoint through a {@code KafkaCheckpointManager} whose serdes throw, and
 * expects the read to complete without an exception. The manager is built with {@code false}
 * as its third argument (presumably the checkpoint validation flag - confirm against the
 * constructor).
 */
@Test
public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
  Config config = mock(Config.class);
  when(config.get(JobConfig.SSP_GROUPER_FACTORY)).thenReturn(GROUPER_FACTORY_CLASS);

  KafkaStreamSpec spec =
      new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC, CHECKPOINT_SYSTEM, 1);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition inputSsp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<List<IncomingMessageEnvelope>> pollResponses =
      ImmutableList.of(ImmutableList.of(newCheckpointEnvelope(TASK1, inputSsp, "0")));
  SystemConsumer consumer = newConsumer(pollResponses);
  SystemAdmin admin = newAdmin("0", "1");
  SystemFactory systemFactory = newFactory(mock(SystemProducer.class), consumer, admin);

  // wire up an exception throwing serde with the checkpointmanager
  KafkaCheckpointManager manager =
      new KafkaCheckpointManager(
          spec,
          systemFactory,
          false,
          config,
          mock(MetricsRegistry.class),
          new ExceptionThrowingCheckpointSerde(),
          new ExceptionThrowingCheckpointKeySerde());
  manager.register(TASK1);
  manager.start();

  // expect the read to succeed in spite of the exception from ExceptionThrowingSerde
  manager.readLastCheckpoint(TASK1);
}
示例15
/**
 * Produces the next batch of synthetic envelopes for every requested partition.
 *
 * <p>Each call emits {@code numMessages / 4} envelopes per partition, continuing from the
 * position stored in {@code curMessagesPerSsp}. Positions below {@code numMessages} yield data
 * envelopes built from {@code getKey}/{@code getData}; positions at or beyond it yield
 * end-of-stream envelopes. The stored position is advanced by the batch size afterwards.
 * If {@code sleepBetweenPollsMs} is positive, the call sleeps for that long before returning.
 *
 * @param set partitions to produce envelopes for
 * @param timeout ignored
 * @return the generated envelopes keyed by partition
 * @throws InterruptedException if the optional sleep is interrupted
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long timeout)
    throws InterruptedException {
  final int batchSize = numMessages / 4;
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = new HashMap<>();

  for (SystemStreamPartition ssp : set) {
    int first = curMessagesPerSsp.get(ssp);
    int limit = first + batchSize;

    // We send num Messages and an end of stream message following that.
    List<IncomingMessageEnvelope> batch = new ArrayList<>();
    for (int i = first; i < limit; i++) {
      if (i < numMessages) {
        batch.add(new IncomingMessageEnvelope(ssp, null, getKey(i, ssp), getData(i, ssp)));
      } else {
        batch.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
      }
    }

    envelopeMap.put(ssp, batch);
    curMessagesPerSsp.put(ssp, limit);
  }

  if (sleepBetweenPollsMs > 0) {
    Thread.sleep(sleepBetweenPollsMs);
  }
  return envelopeMap;
}
示例16
/**
 * Pass an invalid IME to processAsync. Any exceptions in processAsync should still get propagated through the
 * task callback.
 *
 * <p>The test blocks until {@code TaskCallback.failure} has been invoked on the mocked callback.
 * The thread pool handed to the task is shut down afterwards so its worker threads do not
 * outlive the test.
 */
@Test
public void testExceptionsInProcessInvokesTaskCallback() throws InterruptedException {
  ExecutorService taskThreadPool = Executors.newFixedThreadPool(2);
  try {
    TaskCallback mockTaskCallback = mock(TaskCallback.class);
    MessageCollector mockMessageCollector = mock(MessageCollector.class);
    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
    StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
    operatorTask.setTaskThreadPool(taskThreadPool);

    // Released as soon as the callback's failure(...) is invoked.
    CountDownLatch failureLatch = new CountDownLatch(1);
    doAnswer(ctx -> {
      failureLatch.countDown();
      return null;
    }).when(mockTaskCallback).failure(anyObject());

    operatorTask.processAsync(mock(IncomingMessageEnvelope.class), mockMessageCollector,
        mockTaskCoordinator, mockTaskCallback);
    failureLatch.await();
  } finally {
    // Graceful shutdown: lets any in-flight task finish, then releases the worker threads.
    taskThreadPool.shutdown();
  }
}
示例17
/**
 * Polls the consumer until at least {@code numEvents} envelopes have been collected across all
 * partitions, merging the results of successive polls per partition.
 *
 * <p>Polling gives up when no new envelope has arrived for 30 seconds. Without that bound the
 * loop could only exit once enough events had been read, which made the "received only"
 * failure below unreachable and let the method spin forever when events never arrived.
 *
 * @param ssps partitions to poll
 * @param consumer consumer to poll; each poll uses a one second timeout
 * @param numEvents minimum total number of envelopes to collect
 * @return the collected envelopes keyed by partition
 * @throws InterruptedException if a poll is interrupted
 * @throws SamzaException if fewer than {@code numEvents} envelopes were received before
 *     polling gave up
 */
private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
    KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
  // Maximum time to keep polling without receiving any new envelope.
  final long maxIdleNanos = Duration.ofSeconds(30).toNanos();

  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
  int totalEventsConsumed = 0;
  long lastProgressNanos = System.nanoTime();

  while (totalEventsConsumed < numEvents && System.nanoTime() - lastProgressNanos < maxIdleNanos) {
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
        consumer.poll(ssps, Duration.ofSeconds(1).toMillis());
    receivedMessages.forEach(
        (key, value) -> messages.computeIfAbsent(key, unused -> new ArrayList<>()).addAll(value));

    int updatedTotal = messages.values().stream().mapToInt(List::size).sum();
    if (updatedTotal > totalEventsConsumed) {
      // Progress was made; restart the idle clock.
      lastProgressNanos = System.nanoTime();
    }
    totalEventsConsumed = updatedTotal;
  }

  if (totalEventsConsumed < numEvents) {
    String msg = String.format("Received only %d of %d events", totalEventsConsumed, numEvents);
    throw new SamzaException(msg);
  }
  return messages;
}
示例18
/**
 * Builds a reader task over the given bounded readers.
 *
 * <p>Stores the supplied collaborators, creates a semaphore with {@code capacity} permits, and
 * allocates one empty {@link LinkedBlockingQueue} per partition found in {@code readerToSsp}.
 *
 * @param readerToSsp mapping from each reader to the partition it feeds
 * @param capacity initial number of permits of the {@code available} semaphore
 * @param metricsWrapper metrics wrapper stored for later use by this task
 */
private ReaderTask(
    Map<BoundedReader<T>, SystemStreamPartition> readerToSsp,
    int capacity,
    FnWithMetricsWrapper metricsWrapper) {
  this.readerToSsp = readerToSsp;
  this.metricsWrapper = metricsWrapper;
  this.available = new Semaphore(capacity);

  // One queue per partition, frozen into an immutable map once populated.
  final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queuesBySsp =
      new HashMap<>();
  for (SystemStreamPartition ssp : readerToSsp.values()) {
    queuesBySsp.put(ssp, new LinkedBlockingQueue<>());
  }
  this.queues = ImmutableMap.copyOf(queuesBySsp);
}
示例19
/**
 * Runs the wrapped task on one envelope and reports the outcome through the callback.
 *
 * <p>{@code callback.complete()} is invoked when the wrapped task returns normally. Anything
 * thrown inside the try block - by the wrapped task or by {@code complete()} itself - is
 * passed to {@code callback.failure(...)} instead of propagating to the caller.
 *
 * @param envelope envelope handed to the wrapped task
 * @param collector collector handed to the wrapped task
 * @param coordinator coordinator handed to the wrapped task
 * @param callback receives the completion or failure notification
 */
private void process(
    IncomingMessageEnvelope envelope,
    MessageCollector collector,
    TaskCoordinator coordinator,
    TaskCallback callback) {
  try {
    wrappedTask.process(envelope, collector, coordinator);
    callback.complete();
  } catch (Throwable error) {
    callback.failure(error);
  }
}
示例20
/**
 * Feeds an envelope with key {@code "key"} and message {@code "msg"} to an
 * {@code InputOperatorImpl} and expects the first result to be the bare message
 * {@code "msg"}.
 */
@Test
public void testWithUnkeyedInput() {
  // NOTE(review): the "false" argument is presumably the "is keyed" flag - confirm against InputOperatorSpec.
  InputOperatorImpl unkeyedOperator =
      new InputOperatorImpl(new InputOperatorSpec("stream-id", null, null, null, false, "input-op-id"));

  IncomingMessageEnvelope envelope =
      new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", "key", "msg");
  Collection<Object> emitted =
      unkeyedOperator.handleMessage(envelope, mock(MessageCollector.class), mock(TaskCoordinator.class));

  Object firstEmitted = emitted.iterator().next();
  assertEquals("msg", firstEmitted);
}
示例21
/**
 * Records a failure and enqueues one empty envelope per partition.
 *
 * <p>The envelopes carry no offset, key or message; they are dummy messages used to force the
 * consumer to wake up immediately and check the {@code lastException} field, which is populated
 * before they are enqueued.
 *
 * @param exception the failure to record in {@code lastException}
 */
private void setError(Exception exception) {
  this.lastException = exception;

  for (SystemStreamPartition ssp : readerToSsp.values()) {
    final IncomingMessageEnvelope wakeUpEnvelope =
        new IncomingMessageEnvelope(ssp, null, null, null);
    enqueueUninterruptibly(wakeUpEnvelope);
  }
}
示例22
/**
 * Submits callbacks with sequence numbers 2, 1 and 0 (in that order) to the callback manager.
 * The first two updates must return nothing; the update for sequence number 0 must return
 * exactly two callbacks, for sequence numbers 0 and 1 in that order, each carrying the
 * envelope it was created with.
 */
@Test
public void testUpdateCallbackWithCoordinatorRequests() {
  TaskName taskName = new TaskName("Partition 0");
  SystemStreamPartition ssp = new SystemStreamPartition("kafka", "topic", new Partition(0));

  // simulate out of order: sequence number 2 (with a container-wide shutdown request) comes first
  IncomingMessageEnvelope thirdEnvelope = new IncomingMessageEnvelope(ssp, "2", null, null);
  ReadableCoordinator shutdownCoordinator = new ReadableCoordinator(taskName);
  shutdownCoordinator.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
  TaskCallbackImpl thirdCallback =
      new TaskCallbackImpl(listener, taskName, thirdEnvelope, shutdownCoordinator, 2, 0);
  List<TaskCallbackImpl> ready = callbackManager.updateCallback(thirdCallback);
  assertTrue(ready.isEmpty());

  // sequence number 1 (with a commit request for the current task) comes next
  IncomingMessageEnvelope secondEnvelope = new IncomingMessageEnvelope(ssp, "1", null, null);
  ReadableCoordinator commitCoordinator = new ReadableCoordinator(taskName);
  commitCoordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
  TaskCallbackImpl secondCallback =
      new TaskCallbackImpl(listener, taskName, secondEnvelope, commitCoordinator, 1, 0);
  ready = callbackManager.updateCallback(secondCallback);
  assertTrue(ready.isEmpty());

  // sequence number 0 (no coordinator request) arrives last
  IncomingMessageEnvelope firstEnvelope = new IncomingMessageEnvelope(ssp, "0", null, null);
  ReadableCoordinator plainCoordinator = new ReadableCoordinator(taskName);
  TaskCallbackImpl firstCallback =
      new TaskCallbackImpl(listener, taskName, firstEnvelope, plainCoordinator, 0, 0);
  ready = callbackManager.updateCallback(firstCallback);
  assertEquals(2, ready.size());

  // Check envelope0 and envelope1: position i holds sequence number i and offset "i".
  for (int seqNum = 0; seqNum < 2; seqNum++) {
    TaskCallbackImpl returned = ready.get(seqNum);
    assertTrue(returned.matchSeqNum(seqNum));
    assertEquals(ssp, returned.envelope.getSystemStreamPartition());
    assertEquals(String.valueOf(seqNum), returned.envelope.getOffset());
  }
}
示例23
/**
 * Fetches the next messages of every requested partition from the reader task.
 *
 * <p>Partitions are served one after another, and {@code timeout} is passed unchanged to the
 * reader task for each of them.
 *
 * @param systemStreamPartitions partitions to fetch messages for
 * @param timeout timeout handed to {@code readerTask.getNextMessages} for each partition
 * @return the fetched envelopes keyed by partition; every requested partition has an entry
 * @throws InterruptedException if fetching from the reader task is interrupted
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
    Set<SystemStreamPartition> systemStreamPartitions, long timeout)
    throws InterruptedException {
  assert !readerToSsp.isEmpty(); // start should be called before poll

  final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = new HashMap<>();
  for (SystemStreamPartition partition : systemStreamPartitions) {
    final List<IncomingMessageEnvelope> batch = readerTask.getNextMessages(partition, timeout);
    result.put(partition, batch);
  }
  return result;
}
示例24
/**
 * Marks this task as having processed a message and, when an exception has been configured in
 * {@code e}, throws it.
 *
 * @param envelope ignored
 * @param collector ignored
 * @param coordinator ignored
 * @throws Exception the configured exception {@code e}, if it is non-null
 */
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
  processed = true;
  if (e == null) {
    return;
  }
  throw e;
}
示例25
/**
 * Stores a delivered {@code postMessage} event in the home timeline.
 *
 * <p>The store key is {@code recipient:time:numMessages}, where {@code numMessages} is a
 * running counter that is incremented after every stored message.
 *
 * @param envelope envelope whose message is a map with at least the keys {@code event},
 *     {@code recipient} and {@code time}
 * @param collector unused
 * @param coordinator unused
 * @throws IllegalStateException if the {@code event} entry is missing or is anything other
 *     than {@code "postMessage"}
 */
@Override
@SuppressWarnings("unchecked")
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
  Map<String, Object> message = (Map<String, Object>) envelope.getMessage();

  // Constant-first equals: a missing event is reported as an unexpected type rather than
  // surfacing as a bare NullPointerException.
  Object event = message.get("event");
  if (!"postMessage".equals(event)) {
    throw new IllegalStateException("Unexpected event type on deliveries stream: " + event);
  }

  String recipient = (String) message.get("recipient");
  String time = (String) message.get("time");
  homeTimeline.put(recipient + ":" + time + ":" + numMessages, message);
  numMessages++;
}
示例26
/**
 * Emits the impulse messages exactly once.
 *
 * <p>The first call atomically flips {@code isEnd} from false to true and returns, for every
 * requested partition, the messages built by {@code constructMessages}. Every later call
 * returns an empty map.
 *
 * @param ssps partitions to build messages for
 * @param timeout ignored
 * @return the constructed messages keyed by partition on the first call, otherwise an empty map
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
    Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException {
  // Only the caller that wins the false -> true transition gets the messages.
  if (!isEnd.compareAndSet(false, true)) {
    return Collections.emptyMap();
  }
  return ssps.stream()
      .collect(
          Collectors.toMap(
              Function.identity(), SamzaImpulseSystemConsumer::constructMessages));
}
示例27
/**
 * Wraps a string element into an envelope without a key.
 *
 * <p>The element is turned into a timestamped value in the global window and then into an
 * {@code OpMessage} element, which becomes the envelope's message.
 *
 * @param ssp partition of the envelope
 * @param offset offset of the envelope
 * @param element payload to wrap
 * @param timestamp timestamp attached to the windowed value
 * @return the envelope carrying the wrapped element
 */
static IncomingMessageEnvelope createElementMessage(
    SystemStreamPartition ssp, String offset, String element, Instant timestamp) {
  final WindowedValue<String> windowedElement =
      WindowedValue.timestampedValueInGlobalWindow(element, timestamp);
  return new IncomingMessageEnvelope(ssp, offset, null, OpMessage.ofElement(windowedElement));
}
示例28
/**
 * Points every output stream at the supplied collector and then passes the envelope's message,
 * cast to {@code ContentEvent}, to this task's processor.
 *
 * @param envelope envelope whose message must be a {@code ContentEvent}
 * @param collector collector installed on all output streams before processing
 * @param coordinator unused
 * @throws Exception declared by the overridden method
 */
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
    throws Exception {
  // Every output stream must write through the collector of the current invocation.
  for (SamzaStream outputStream : outputStreams) {
    outputStream.setCollector(collector);
  }

  getProcessor().process((ContentEvent) envelope.getMessage());
}
示例29
/**
 * Consumes {@code totalEvents} events from the configured stream, prints the observed event
 * rate, stops the consumer and exits the JVM with status 0.
 *
 * <p>Every partition in the configured range is registered at its oldest offset. The consumer
 * is then polled (two second timeout per poll) until at least {@code totalEvents} envelopes
 * have been received.
 *
 * @throws IOException declared for the superclass {@code start()}
 * @throws InterruptedException if polling is interrupted
 */
public void start() throws IOException, InterruptedException {
  super.start();
  SystemAdmin systemAdmin = factory.getAdmin(systemName, config);
  SystemStreamMetadata ssm =
      systemAdmin.getSystemStreamMetadata(Collections.singleton(physicalStreamName)).get(physicalStreamName);
  NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
  Set<SystemStreamPartition> ssps = createSSPs(systemName, physicalStreamName, startPartition, endPartition);
  SystemConsumer consumer = factory.getConsumer(systemName, config, metricsRegistry);
  for (SystemStreamPartition ssp : ssps) {
    // Start every partition from the oldest offset it still has.
    consumer.register(ssp, ssm.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getOldestOffset());
  }
  consumer.start();

  System.out.println("starting consumption at " + Instant.now());
  Instant startTime = Instant.now();
  int numEvents = 0;
  while (numEvents < totalEvents) {
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollResult = consumer.poll(ssps, 2000);
    numEvents += pollResult.values().stream().mapToInt(List::size).sum();
  }
  Instant endTime = Instant.now();
  System.out.println("Ending consumption at " + endTime);

  // Clamp to 1 ms so a sub-millisecond run cannot divide by zero, and multiply in long
  // arithmetic so that event counts above ~2.1 million do not overflow.
  long elapsedMillis = Math.max(1L, Duration.between(startTime, endTime).toMillis());
  System.out.println(String.format("Event Rate is %s Messages/Sec ",
      numEvents * 1000L / elapsedMillis));

  consumer.stop();
  System.exit(0);
}
示例30
/**
 * Reads the next record from the file reader and wraps it in an envelope.
 *
 * <p>The envelope's offset is the value of {@code nextOffset()} taken before the record is
 * read, so it is the checkpoint for the returned record itself. After the read, the position
 * bookkeeping is updated: when the reader's {@code previousSync()} differs from the tracked
 * block start, the block start is updated and the in-block record counter resets to 0;
 * otherwise the counter is incremented.
 *
 * @return an envelope with a null key whose message is the record that was read
 */
@Override
public IncomingMessageEnvelope readNext() {
  // get checkpoint for THIS record
  String checkpoint = nextOffset();
  GenericRecord record = fileReader.next();

  long syncAfterRead = fileReader.previousSync();
  if (syncAfterRead == curBlockStart) {
    // Still inside the same block: advance within it.
    curRecordOffset++;
  } else {
    // Crossed into a new block: restart counting from its beginning.
    curBlockStart = syncAfterRead;
    curRecordOffset = 0;
  }

  // avro schema doesn't necessarily have key field
  return new IncomingMessageEnvelope(systemStreamPartition, checkpoint, null, record);
}