Java源码示例:org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber
示例1
/**
 * Resolves the shard iterator to start from when the starting position is a sentinel sequence number.
 *
 * @param sentinelSequenceNumber the sentinel value to resolve; compared against the values of
 *                               {@link SentinelSequenceNumber}
 * @return the shard iterator obtained from the Kinesis proxy, or {@code null} when the sentinel is the
 *         shard-ending one, or when "latest" is requested for a shard that is already closed
 * @throws InterruptedException declared by this method; presumably raised by the Kinesis proxy call (confirm)
 * @throws RuntimeException if the value matches none of the sentinels handled here
 */
protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
        // a closed shard will never receive a "latest" next record, so there is nothing to iterate over
        return subscribedShard.isClosed()
            ? null
            : kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
        return kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
        // the shard has already been fully read
        return null;
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
        return kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
    }
    throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
}
示例2
/**
 * Stores the last processed sequence number for one subscribed shard.
 * Invoked by {@link ShardConsumer}s.
 *
 * <p>When the new value is the shard-ending sentinel, the number of active shards is decremented and,
 * once it reaches zero, the source context is marked as temporarily idle.
 *
 * @param shardStateIndex position of the shard's state in subscribedShardsState; expected to be the value
 *                        returned by
 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}
 *                        when the shard state was registered
 * @param lastSequenceNumber the sequence number to record as last processed
 */
protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
    synchronized (checkpointLock) {
        subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

        // the shard-ending sentinel is written by the shard's consumer thread once the shard is fully read;
        // any other value leaves the shard active and needs no further bookkeeping
        boolean shardFullyRead =
            lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
        if (!shardFullyRead) {
            return;
        }

        LOG.info("Subtask {} has reached the end of subscribed shard: {}",
            indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());

        // Idle check. On resharding, if registerNewSubscribedShardState is invoked for newly discovered
        // shards AFTER the old shards reached their end, the subtask is toggled back to active as soon as
        // records are collected from the new shards.
        if (this.numberOfActiveShards.decrementAndGet() == 0) {
            LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
                indexOfThisConsumerSubtask);
            sourceContext.markAsTemporarilyIdle();
        }
    }
}
示例3
/**
 * Resolves the shard iterator to start from when the starting position is a sentinel sequence number.
 *
 * @param sentinelSequenceNumber the sentinel value to resolve; compared against the values of
 *                               {@link SentinelSequenceNumber}
 * @return the shard iterator obtained from the Kinesis proxy, or {@code null} when the sentinel is the
 *         shard-ending one, or when "latest" is requested for a shard that is already closed
 * @throws InterruptedException declared by this method; presumably raised by the Kinesis proxy call (confirm)
 * @throws RuntimeException if the value matches none of the sentinels handled here
 */
protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
        // a closed shard will never receive a "latest" next record, so there is nothing to iterate over
        return subscribedShard.isClosed()
            ? null
            : kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
        return kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
        // the shard has already been fully read
        return null;
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
        return kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
    }
    throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
}
示例4
/**
 * Stores the last processed sequence number for one subscribed shard.
 * Invoked by {@link ShardConsumer}s.
 *
 * <p>When the new value is the shard-ending sentinel, the number of active shards is decremented and,
 * once it reaches zero, the source context is marked as temporarily idle.
 *
 * @param shardStateIndex position of the shard's state in subscribedShardsState; expected to be the value
 *                        returned by
 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}
 *                        when the shard state was registered
 * @param lastSequenceNumber the sequence number to record as last processed
 */
protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
    synchronized (checkpointLock) {
        subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

        // the shard-ending sentinel is written by the shard's consumer thread once the shard is fully read;
        // any other value leaves the shard active and needs no further bookkeeping
        boolean shardFullyRead =
            lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
        if (!shardFullyRead) {
            return;
        }

        LOG.info("Subtask {} has reached the end of subscribed shard: {}",
            indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());

        // Idle check. On resharding, if registerNewSubscribedShardState is invoked for newly discovered
        // shards AFTER the old shards reached their end, the subtask is toggled back to active as soon as
        // records are collected from the new shards.
        if (this.numberOfActiveShards.decrementAndGet() == 0) {
            LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
                indexOfThisConsumerSubtask);
            sourceContext.markAsTemporarilyIdle();
        }
    }
}
示例5
/**
 * Resolves the shard iterator to start from when the starting position is a sentinel sequence number.
 *
 * @param sentinelSequenceNumber the sentinel value to resolve; compared against the values of
 *                               {@link SentinelSequenceNumber}
 * @return the shard iterator obtained from the Kinesis proxy, or {@code null} when the sentinel is the
 *         shard-ending one, or when "latest" is requested for a shard that is already closed
 * @throws InterruptedException declared by this method; presumably raised by the Kinesis proxy call (confirm)
 * @throws RuntimeException if the value matches none of the sentinels handled here
 */
protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
        // a closed shard will never receive a "latest" next record, so there is nothing to iterate over
        return subscribedShard.isClosed()
            ? null
            : kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
        return kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
        // the shard has already been fully read
        return null;
    }
    if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
        return kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
    }
    throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
}
示例6
/**
 * Stores the last processed sequence number for one subscribed shard.
 * Invoked by {@link ShardConsumer}s.
 *
 * <p>When the new value is the shard-ending sentinel, the number of active shards is decremented and,
 * once it reaches zero, the source context is marked as temporarily idle.
 *
 * @param shardStateIndex position of the shard's state in subscribedShardsState; expected to be the value
 *                        returned by
 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}
 *                        when the shard state was registered
 * @param lastSequenceNumber the sequence number to record as last processed
 */
protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
    synchronized (checkpointLock) {
        subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

        // the shard-ending sentinel is written by the shard's consumer thread once the shard is fully read;
        // any other value leaves the shard active and needs no further bookkeeping
        boolean shardFullyRead =
            lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
        if (!shardFullyRead) {
            return;
        }

        LOG.info("Subtask {} has reached the end of subscribed shard: {}",
            indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());

        // Idle check. On resharding, if registerNewSubscribedShardState is invoked for newly discovered
        // shards AFTER the old shards reached their end, the subtask is toggled back to active as soon as
        // records are collected from the new shards.
        if (this.numberOfActiveShards.decrementAndGet() == 0) {
            LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
                indexOfThisConsumerSubtask);
            sourceContext.markAsTemporarilyIdle();
        }
    }
}
示例7
/**
 * Adds a shard state to the shards this fetcher is subscribed to and, if none exists yet for its
 * index, creates the watermark tracking state for it.
 *
 * @param newSubscribedShardState the shard state this fetcher is to be subscribed to
 * @return the index of the newly added state in subscribedShardsState
 */
public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
    synchronized (checkpointLock) {
        subscribedShardsState.add(newSubscribedShardState);
        // the index handed back to the caller is the last position after the add
        final int shardStateIndex = subscribedShardsState.size() - 1;

        // A state that already carries the shard-ending sentinel (the consumer had finished reading the
        // shard before a failure and restore) does not count as active; every other state does.
        boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
            .equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
        if (!alreadyFullyRead) {
            this.numberOfActiveShards.incrementAndGet();
        }

        // all discovered shards are tracked for watermark determination
        ShardWatermarkState watermarkState = shardWatermarks.get(shardStateIndex);
        if (watermarkState == null) {
            watermarkState = new ShardWatermarkState();
            try {
                // each shard works on its own copy of the configured assigner
                watermarkState.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
            } catch (Exception cloneFailure) {
                throw new RuntimeException("Failed to instantiate new WatermarkAssigner", cloneFailure);
            }
            watermarkState.emitQueue = recordEmitter.getQueue(shardStateIndex);
            watermarkState.lastUpdated = getCurrentTimeMillis();
            watermarkState.lastRecordTimestamp = Long.MIN_VALUE;
            shardWatermarks.put(shardStateIndex, watermarkState);
        }

        return shardStateIndex;
    }
}
示例8
/**
 * Runs a {@link ShardConsumer} against a fake Kinesis behaviour and checks both the number of collected
 * records and that the shard state ends on the shard-ending sentinel.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
    StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

    // one subscribed shard, starting from an arbitrary sequence number
    LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
    subscribedShardsStateUnderTest.add(
        new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
            fakeToBeConsumedShard,
            new SequenceNumber("fakeStartingState")));

    TestSourceContext<String> sourceContext = new TestSourceContext<>();

    TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
        Collections.singletonList("fakeStream"),
        sourceContext,
        new Properties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        10,
        2,
        new AtomicReference<>(),
        subscribedShardsStateUnderTest,
        KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
        Mockito.mock(KinesisProxyInterface.class));

    KinesisStreamShardState shardState = subscribedShardsStateUnderTest.getFirst();
    int shardIndex = fetcher.registerNewSubscribedShardState(shardState);

    // consume the shard synchronously on the test thread; the fake behaviour, per its name, hands out
    // 1000 records in total across 9 getRecords() calls
    new ShardConsumer<>(
        fetcher,
        shardIndex,
        shardState.getStreamShardHandle(),
        shardState.getLastProcessedSequenceNum(),
        FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
        new ShardMetricsReporter()).run();

    assertEquals(1000, sourceContext.getCollectedOutputs().size());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
        subscribedShardsStateUnderTest.getFirst().getLastProcessedSequenceNum());
}
示例9
/**
 * Adds a shard state to the shards this fetcher is subscribed to and, if none exists yet for its
 * index, creates the watermark tracking state for it.
 *
 * @param newSubscribedShardState the shard state this fetcher is to be subscribed to
 * @return the index of the newly added state in subscribedShardsState
 */
public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
    synchronized (checkpointLock) {
        subscribedShardsState.add(newSubscribedShardState);
        // the index handed back to the caller is the last position after the add
        final int shardStateIndex = subscribedShardsState.size() - 1;

        // A state that already carries the shard-ending sentinel (the consumer had finished reading the
        // shard before a failure and restore) does not count as active; every other state does.
        boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
            .equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
        if (!alreadyFullyRead) {
            this.numberOfActiveShards.incrementAndGet();
        }

        // all discovered shards are tracked for watermark determination
        ShardWatermarkState watermarkState = shardWatermarks.get(shardStateIndex);
        if (watermarkState == null) {
            watermarkState = new ShardWatermarkState();
            try {
                // each shard works on its own copy of the configured assigner
                watermarkState.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
            } catch (Exception cloneFailure) {
                throw new RuntimeException("Failed to instantiate new WatermarkAssigner", cloneFailure);
            }
            watermarkState.emitQueue = recordEmitter.getQueue(shardStateIndex);
            watermarkState.lastUpdated = getCurrentTimeMillis();
            watermarkState.lastRecordTimestamp = Long.MIN_VALUE;
            shardWatermarks.put(shardStateIndex, watermarkState);
        }

        return shardStateIndex;
    }
}
示例10
/**
 * Runs a {@link ShardConsumer} against a fake Kinesis behaviour and checks both the number of collected
 * records and that the shard state ends on the shard-ending sentinel.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
    StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

    // one subscribed shard, starting from an arbitrary sequence number
    LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
    subscribedShardsStateUnderTest.add(
        new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
            fakeToBeConsumedShard,
            new SequenceNumber("fakeStartingState")));

    TestSourceContext<String> sourceContext = new TestSourceContext<>();

    TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
        Collections.singletonList("fakeStream"),
        sourceContext,
        new Properties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        10,
        2,
        new AtomicReference<>(),
        subscribedShardsStateUnderTest,
        KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
        Mockito.mock(KinesisProxyInterface.class));

    KinesisStreamShardState shardState = subscribedShardsStateUnderTest.getFirst();
    int shardIndex = fetcher.registerNewSubscribedShardState(shardState);

    // consume the shard synchronously on the test thread; the fake behaviour, per its name, hands out
    // 1000 records in total across 9 getRecords() calls
    new ShardConsumer<>(
        fetcher,
        shardIndex,
        shardState.getStreamShardHandle(),
        shardState.getLastProcessedSequenceNum(),
        FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
        new ShardMetricsReporter()).run();

    assertEquals(1000, sourceContext.getCollectedOutputs().size());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
        subscribedShardsStateUnderTest.getFirst().getLastProcessedSequenceNum());
}
示例11
/**
 * Adds a shard state to the shards this fetcher is subscribed to and, if none exists yet for its
 * index, creates the watermark tracking state for it.
 *
 * @param newSubscribedShardState the shard state this fetcher is to be subscribed to
 * @return the index of the newly added state in subscribedShardsState
 */
public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
    synchronized (checkpointLock) {
        subscribedShardsState.add(newSubscribedShardState);
        // the index handed back to the caller is the last position after the add
        final int shardStateIndex = subscribedShardsState.size() - 1;

        // A state that already carries the shard-ending sentinel (the consumer had finished reading the
        // shard before a failure and restore) does not count as active; every other state does.
        boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
            .equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
        if (!alreadyFullyRead) {
            this.numberOfActiveShards.incrementAndGet();
        }

        // all discovered shards are tracked for watermark determination
        ShardWatermarkState watermarkState = shardWatermarks.get(shardStateIndex);
        if (watermarkState == null) {
            watermarkState = new ShardWatermarkState();
            try {
                // each shard works on its own copy of the configured assigner
                watermarkState.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
            } catch (Exception cloneFailure) {
                throw new RuntimeException("Failed to instantiate new WatermarkAssigner", cloneFailure);
            }
            watermarkState.emitQueue = recordEmitter.getQueue(shardStateIndex);
            watermarkState.lastUpdated = getCurrentTimeMillis();
            watermarkState.lastRecordTimestamp = Long.MIN_VALUE;
            shardWatermarks.put(shardStateIndex, watermarkState);
        }

        return shardStateIndex;
    }
}
示例12
/**
 * Consumer loop for one shard: repeatedly fetches a batch of records with the current shard iterator,
 * deaggregates and hands over every record, reports metrics, and moves on to the next iterator. The loop
 * stops when {@code isRunning()} turns false or when there is no further iterator, in which case the
 * shard's state is set to the shard-ending sentinel. Any {@link Throwable} is passed to the fetcher via
 * {@code stopWithError}.
 */
@SuppressWarnings("unchecked")
@Override
public void run() {
    try {
        String shardIterator = getShardIterator(lastSequenceNum);
        long loopStartNanos = System.nanoTime();

        while (isRunning()) {
            if (shardIterator == null) {
                // no iterator left: record the end of the shard and let this consumer thread finish
                fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
                break;
            }

            shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
            GetRecordsResult fetchResult = getRecords(shardIterator, maxNumberOfRecordsPerFetch);

            List<Record> rawRecords = fetchResult.getRecords();
            shardMetricsReporter.setNumberOfAggregatedRecords(rawRecords.size());

            // Kinesis records may be aggregated, so they are deaggregated before being processed
            List<UserRecord> userRecords = deaggregateRecords(
                rawRecords,
                subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
                subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

            long batchSizeBytes = 0L;
            for (UserRecord userRecord : userRecords) {
                batchSizeBytes += userRecord.getData().remaining();
                deserializeRecordForCollectionAndUpdateState(userRecord);
            }
            shardMetricsReporter.setNumberOfDeaggregatedRecords(userRecords.size());

            shardIterator = fetchResult.getNextShardIterator();

            // the measured loop time includes whatever adjustRunLoopFrequency spends
            long loopEndNanos = adjustRunLoopFrequency(loopStartNanos, System.nanoTime());
            long loopDurationNanos = loopEndNanos - loopStartNanos;
            maxNumberOfRecordsPerFetch = adaptRecordsToRead(loopDurationNanos, userRecords.size(), batchSizeBytes, maxNumberOfRecordsPerFetch);
            shardMetricsReporter.setRunLoopTimeNanos(loopDurationNanos);

            // the next iteration is measured from where this one ended
            loopStartNanos = loopEndNanos;
        }
    } catch (Throwable t) {
        fetcherRef.stopWithError(t);
    }
}
示例13
/**
 * Creates an initial position that carries the given sentinel sequence number.
 *
 * @param sentinelSequenceNumber the sentinel stored in this instance
 */
InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
this.sentinelSequenceNumber = sentinelSequenceNumber;
}
示例14
/**
 * Returns the sentinel sequence number held by this instance.
 *
 * @return the value of the {@code sentinelSequenceNumber} field
 */
public SentinelSequenceNumber toSentinelSequenceNumber() {
    return sentinelSequenceNumber;
}
示例15
/**
 * Runs this source: creates a fetcher, seeds it with a starting state for every shard returned by the
 * initial discovery, then runs the fetcher and waits for it to terminate. Returns without starting the
 * fetcher if the source is no longer running once seeding is done.
 *
 * @param sourceContext the context passed to the fetcher; closed after the fetcher has terminated
 * @throws Exception declared by this method; nothing is caught here, so failures from the calls below propagate
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    // Every subtask runs a fetcher, even one that starts without shards to subscribe to: fetchers keep
    // polling for changes in the shard list, so any subtask may receive shards later on.
    KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

    // initial discovery: pick the starting state of each discovered shard
    for (StreamShardHandle shard : fetcher.discoverNewShardsToSubscribe()) {
        StreamShardMetadata.EquivalenceWrapper shardKey =
            new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));

        if (sequenceNumsToRestore == null) {
            // starting fresh: the configured start position is the initial state
            String configuredPosition = configProps.getProperty(
                ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION);
            SentinelSequenceNumber initialSentinel =
                InitialPosition.valueOf(configuredPosition).toSentinelSequenceNumber();

            fetcher.registerNewSubscribedShardState(
                new KinesisStreamShardState(shardKey.getShardMetadata(), shard, initialSentinel.get()));

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                    getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), initialSentinel.get());
            }
        } else if (sequenceNumsToRestore.containsKey(shardKey)) {
            // the shard is part of the restored state: continue from the stored sequence number
            fetcher.registerNewSubscribedShardState(
                new KinesisStreamShardState(shardKey.getShardMetadata(), shard, sequenceNumsToRestore.get(shardKey)));

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
                        " starting state set to the restored sequence number {}",
                    getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(shardKey));
            }
        } else {
            // the shard was not discovered in the previous run, so it is consumed from the beginning
            fetcher.registerNewSubscribedShardState(
                new KinesisStreamShardState(shardKey.getShardMetadata(), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
                        " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                    getRuntimeContext().getIndexOfThisSubtask(), shard.toString());
            }
        }
    }

    // do not start the fetcher if the source has stopped running in the meantime
    if (!running) {
        return;
    }

    // from here on the fetcher is exposed, so state snapshots can be taken from its state holders
    this.fetcher = fetcher;

    // The fetcher loop ends only when cancel() or close() is called, or when a thread created by the
    // fetcher throws an error.
    fetcher.runFetcher();

    // make sure the fetcher has terminated before fully closing
    fetcher.awaitTermination();
    sourceContext.close();
}
示例16
/**
 * Same scenario as the plain collected-records test, but one getRecords() call of the fake Kinesis
 * behaviour hits an unexpectedly expired shard iterator; all records must still be collected and the
 * shard state must end on the shard-ending sentinel.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
    StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

    // one subscribed shard, starting from an arbitrary sequence number
    LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
    subscribedShardsStateUnderTest.add(
        new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
            fakeToBeConsumedShard,
            new SequenceNumber("fakeStartingState")));

    TestSourceContext<String> sourceContext = new TestSourceContext<>();

    TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
        Collections.singletonList("fakeStream"),
        sourceContext,
        new Properties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        10,
        2,
        new AtomicReference<>(),
        subscribedShardsStateUnderTest,
        KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
        Mockito.mock(KinesisProxyInterface.class));

    KinesisStreamShardState shardState = subscribedShardsStateUnderTest.getFirst();
    int shardIndex = fetcher.registerNewSubscribedShardState(shardState);

    new ShardConsumer<>(
        fetcher,
        shardIndex,
        shardState.getStreamShardHandle(),
        shardState.getLastProcessedSequenceNum(),
        // 1000 records in total over 9 getRecords() calls, where the 7th call
        // runs into an unexpected expired shard iterator
        FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
            1000, 9, 7, 500L),
        new ShardMetricsReporter()).run();

    assertEquals(1000, sourceContext.getCollectedOutputs().size());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
        subscribedShardsStateUnderTest.getFirst().getLastProcessedSequenceNum());
}
示例17
/**
 * Runs a {@link ShardConsumer} with the {@code flink.shard.adaptivereads} property enabled and checks the
 * total number of collected records as well as the final shard state.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
    Properties consumerProperties = new Properties();
    consumerProperties.put("flink.shard.adaptivereads", "true");

    StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

    // one subscribed shard, starting from an arbitrary sequence number
    LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
    subscribedShardsStateUnderTest.add(
        new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
            fakeToBeConsumedShard,
            new SequenceNumber("fakeStartingState")));

    TestSourceContext<String> sourceContext = new TestSourceContext<>();

    TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
        Collections.singletonList("fakeStream"),
        sourceContext,
        consumerProperties,
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        10,
        2,
        new AtomicReference<>(),
        subscribedShardsStateUnderTest,
        KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
        Mockito.mock(KinesisProxyInterface.class));

    KinesisStreamShardState shardState = subscribedShardsStateUnderTest.getFirst();
    int shardIndex = fetcher.registerNewSubscribedShardState(shardState);

    new ShardConsumer<>(
        fetcher,
        shardIndex,
        shardState.getStreamShardHandle(),
        shardState.getLastProcessedSequenceNum(),
        // initial number of records to fetch --> 10
        FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
        new ShardMetricsReporter()).run();

    // Expected total:
    //   avg record size for the first batch --> 10 * 10 Kb / 10 = 10 Kb
    //   records fetched in the second batch --> 2 Mb / 10 Kb * 5 = 40
    //   total number of records             --> 10 + 40 = 50
    assertEquals(50, sourceContext.getCollectedOutputs().size());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
        subscribedShardsStateUnderTest.getFirst().getLastProcessedSequenceNum());
}
示例18
/**
 * Restores the consumer from an empty snapshot and checks that the shard found by the initial discovery
 * is still registered with the fetcher, starting from the earliest-sequence-number sentinel.
 */
@Test
public void testRestoreWithEmptyState() throws Exception {
    // the initial discovery returns one open shard (no ending sequence number) per entry of TEST_STATE
    final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(TEST_STATE.size());
    for (StreamShardMetadata shardMetadata : TEST_STATE.keySet()) {
        Shard openShard = new Shard();
        openShard.setShardId(shardMetadata.getShardId());
        openShard.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("1"));
        initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), openShard));
    }

    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        TestUtils.getStandardProperties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        null,
        initialDiscoveryShards);

    final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
        fetcher, new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

    StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction);

    final AbstractStreamOperatorTestHarness<String> testHarness =
        new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

    // restore from the empty snapshot of the Flink version under test
    testHarness.setup();
    testHarness.initializeState(
        OperatorSnapshotUtil.getResourceFilename(
            "kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot"));
    testHarness.open();

    consumerFunction.run(new TestSourceContext<>());

    // nothing must have been restored
    assertTrue(consumerFunction.getRestoredState().isEmpty());

    // Even with empty restored state the fetcher must have been registered with the initially discovered
    // shard; that shard counts as created while the job was not running, so it is consumed from the
    // earliest sequence number.
    KinesisStreamShardState restoredShardState = fetcher.getSubscribedShardsState().get(0);
    StreamShardHandle restoredShardHandle = restoredShardState.getStreamShardHandle();
    assertEquals(TEST_STREAM_NAME, restoredShardHandle.getStreamName());
    assertEquals(TEST_SHARD_ID, restoredShardHandle.getShard().getShardId());
    assertFalse(restoredShardHandle.isClosed());
    assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), restoredShardState.getLastProcessedSequenceNum());

    consumerOperator.close();
    consumerOperator.cancel();
}
示例19
/**
 * Restores the consumer from a snapshot while the initial discovery reports the restored shard as closed
 * plus two new shards parented to it. Checks that the restored shard keeps its restored sequence number
 * and that both new shards start from the earliest-sequence-number sentinel.
 */
@Test
public void testRestoreWithReshardedStream() throws Exception {
    final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(TEST_STATE.size());
    for (StreamShardMetadata shardMetadata : TEST_STATE.keySet()) {
        // the restored shard, now closed: the ending sequence number is what represents a closed shard
        Shard closedShard = new Shard();
        closedShard.setShardId(shardMetadata.getShardId());
        closedShard.setSequenceNumberRange(
            new SequenceNumberRange()
                .withStartingSequenceNumber("1")
                .withEndingSequenceNumber("1087654321"));
        initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), closedShard));

        // first new shard
        Shard newSplitShard1 = new Shard();
        newSplitShard1.setShardId(KinesisShardIdGenerator.generateFromShardOrder(1));
        newSplitShard1.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("1087654322"));
        newSplitShard1.setParentShardId(TEST_SHARD_ID);
        initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), newSplitShard1));

        // second new shard
        Shard newSplitShard2 = new Shard();
        newSplitShard2.setShardId(KinesisShardIdGenerator.generateFromShardOrder(2));
        newSplitShard2.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("2087654322"));
        newSplitShard2.setParentShardId(TEST_SHARD_ID);
        initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), newSplitShard2));
    }

    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        TestUtils.getStandardProperties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        null,
        initialDiscoveryShards);

    final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
        fetcher, new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

    StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator =
        new StreamSource<>(consumerFunction);

    final AbstractStreamOperatorTestHarness<String> testHarness =
        new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

    // restore from the snapshot of the Flink version under test
    testHarness.setup();
    testHarness.initializeState(
        OperatorSnapshotUtil.getResourceFilename(
            "kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
    testHarness.open();

    consumerFunction.run(new TestSourceContext<>());

    // the restored state must match the expected test state
    assertNotEquals(null, consumerFunction.getRestoredState());
    assertEquals(1, consumerFunction.getRestoredState().size());
    assertEquals(TEST_STATE, removeEquivalenceWrappers(consumerFunction.getRestoredState()));

    // the fetcher must be registered with all shards, including the new ones
    assertEquals(3, fetcher.getSubscribedShardsState().size());

    // index 0: the closed shard, continuing from the restored sequence number
    KinesisStreamShardState restoredClosedShardState = fetcher.getSubscribedShardsState().get(0);
    StreamShardHandle closedShardHandle = restoredClosedShardState.getStreamShardHandle();
    assertEquals(TEST_STREAM_NAME, closedShardHandle.getStreamName());
    assertEquals(TEST_SHARD_ID, closedShardHandle.getShard().getShardId());
    assertTrue(closedShardHandle.isClosed());
    assertEquals(TEST_SEQUENCE_NUMBER, restoredClosedShardState.getLastProcessedSequenceNum());

    // index 1: first new shard, to be consumed from the beginning
    KinesisStreamShardState restoredNewSplitShard1 = fetcher.getSubscribedShardsState().get(1);
    StreamShardHandle newSplitShardHandle1 = restoredNewSplitShard1.getStreamShardHandle();
    assertEquals(TEST_STREAM_NAME, newSplitShardHandle1.getStreamName());
    assertEquals(KinesisShardIdGenerator.generateFromShardOrder(1), newSplitShardHandle1.getShard().getShardId());
    assertFalse(newSplitShardHandle1.isClosed());
    assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), restoredNewSplitShard1.getLastProcessedSequenceNum());

    // index 2: second new shard, to be consumed from the beginning
    KinesisStreamShardState restoredNewSplitShard2 = fetcher.getSubscribedShardsState().get(2);
    StreamShardHandle newSplitShardHandle2 = restoredNewSplitShard2.getStreamShardHandle();
    assertEquals(TEST_STREAM_NAME, newSplitShardHandle2.getStreamName());
    assertEquals(KinesisShardIdGenerator.generateFromShardOrder(2), newSplitShardHandle2.getShard().getShardId());
    assertFalse(newSplitShardHandle2.isClosed());
    assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), restoredNewSplitShard2.getLastProcessedSequenceNum());

    consumerOperator.close();
    consumerOperator.cancel();
}
示例20
/**
 * Checks how the consumer seeds the fetcher when it is restored from state and the initial
 * discovery returns one shard that is not part of that state: restored shards must be registered
 * with their restored sequence numbers, the extra shard with {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {
    // ---- fixture: the state the consumer will be restored with ----
    HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");

    // ---- state backend: hand the restored state back as union list state ----
    TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
    for (Map.Entry<StreamShardHandle, SequenceNumber> restoredEntry : fakeRestoredState.entrySet()) {
        restoredListState.add(
            Tuple2.of(
                KinesisDataFetcher.convertToStreamShardMetadata(restoredEntry.getKey()),
                restoredEntry.getValue()));
    }

    OperatorStateStore stateStore = mock(OperatorStateStore.class);
    when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

    StateInitializationContext initContext = mock(StateInitializationContext.class);
    when(initContext.getOperatorStateStore()).thenReturn(stateStore);
    when(initContext.isRestored()).thenReturn(true);

    // ---- fetcher: discovery returns every restored shard plus one shard unknown to the state ----
    KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
    List<StreamShardHandle> discoveredShards = new ArrayList<>(fakeRestoredState.keySet());
    discoveredShards.add(
        new StreamShardHandle(
            "fakeStream2",
            new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
    when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

    // assume the given config is correct
    PowerMockito.mockStatic(KinesisConfigUtil.class);
    PowerMockito.doNothing().when(KinesisConfigUtil.class);

    // ---- exercise: restore, open and run the consumer so it seeds the fetcher ----
    TestableFlinkKinesisConsumer consumer =
        new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
    consumer.initializeState(initContext);
    consumer.open(new Configuration());
    consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

    // ---- verify: the extra shard is expected with the earliest sentinel, the rest as restored ----
    fakeRestoredState.put(
        new StreamShardHandle(
            "fakeStream2",
            new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
        SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

    for (Map.Entry<StreamShardHandle, SequenceNumber> expected : fakeRestoredState.entrySet()) {
        KinesisStreamShardState expectedState = new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(expected.getKey()),
            expected.getKey(),
            expected.getValue());
        Mockito.verify(mockedFetcher).registerNewSubscribedShardState(expectedState);
    }
}
示例21
/**
 * Consumes the subscribed shard until the consumer is stopped or the shard iterator becomes
 * {@code null}.
 *
 * <p>Each pass fetches a batch with the current shard iterator, deaggregates it, hands every
 * record to {@code deserializeRecordForCollectionAndUpdateState}, reports batch metrics, and then
 * adjusts the loop frequency and the number of records to request next. When the iterator is
 * {@code null} the shard state is set to {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM} and the loop
 * ends. Any {@link Throwable} is passed to {@code fetcherRef.stopWithError}.
 */
@SuppressWarnings("unchecked")
@Override
public void run() {
    try {
        String shardIterator = getShardIterator(lastSequenceNum);
        long loopStartNanos = System.nanoTime();

        while (isRunning()) {
            if (shardIterator == null) {
                // end of the subscribed shard reached: record that in the state and let this thread finish
                fetcherRef.updateState(
                    subscribedShardStateIndex,
                    SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
                break;
            }

            shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
            GetRecordsResult batch = getRecords(shardIterator, maxNumberOfRecordsPerFetch);

            List<Record> rawRecords = batch.getRecords();
            shardMetricsReporter.setNumberOfAggregatedRecords(rawRecords.size());

            // a single Kinesis record may bundle several user records, so unpack before processing
            List<UserRecord> userRecords = deaggregateRecords(
                rawRecords,
                subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
                subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

            long batchSizeBytes = 0L;
            for (UserRecord userRecord : userRecords) {
                batchSizeBytes += userRecord.getData().remaining();
                deserializeRecordForCollectionAndUpdateState(userRecord);
            }

            int userRecordCount = userRecords.size();
            shardMetricsReporter.setNumberOfDeaggregatedRecords(userRecordCount);

            shardIterator = batch.getNextShardIterator();

            long loopEndNanos = adjustRunLoopFrequency(loopStartNanos, System.nanoTime());
            long loopDurationNanos = loopEndNanos - loopStartNanos;
            maxNumberOfRecordsPerFetch =
                adaptRecordsToRead(loopDurationNanos, userRecordCount, batchSizeBytes, maxNumberOfRecordsPerFetch);
            shardMetricsReporter.setRunLoopTimeNanos(loopDurationNanos);

            // the end of this pass is the start of the next one
            loopStartNanos = loopEndNanos;
        }
    } catch (Throwable t) {
        fetcherRef.stopWithError(t);
    }
}
示例22
/**
 * Creates an initial position that carries the given sentinel sequence number.
 *
 * @param sentinelSequenceNumber the sentinel stored in this instance's
 *                               {@code sentinelSequenceNumber} field
 */
InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
this.sentinelSequenceNumber = sentinelSequenceNumber;
}
示例23
/**
 * Returns the sentinel sequence number held by this instance.
 *
 * @return the value of the {@code sentinelSequenceNumber} field, unchanged
 */
public SentinelSequenceNumber toSentinelSequenceNumber() {
return this.sentinelSequenceNumber;
}
示例24
/**
 * Creates the fetcher, seeds it with a starting state for every initially discovered shard, and
 * runs it until it terminates.
 *
 * <p>The starting sequence number of a discovered shard is chosen as follows:
 * <ul>
 *   <li>no restored state at all: the sentinel of the configured initial stream position;</li>
 *   <li>restored state that does not contain the shard: {@code SENTINEL_EARLIEST_SEQUENCE_NUM};</li>
 *   <li>restored state that contains the shard: the restored sequence number.</li>
 * </ul>
 *
 * <p>If the source is no longer running once seeding is done, the method returns without starting
 * the fetcher.
 *
 * @param sourceContext context handed to the fetcher; closed after the fetcher has terminated
 * @throws Exception any exception raised while creating, seeding, or running the fetcher
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    // every subtask runs a fetcher, whether or not it initially has shards to subscribe to;
    // fetchers keep polling for shard list changes, so any subtask may be assigned shards later on
    KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

    // initial discovery
    List<StreamShardHandle> discoveredShards = fetcher.discoverNewShardsToSubscribe();

    for (StreamShardHandle shard : discoveredShards) {
        StreamShardMetadata.EquivalenceWrapper shardKey =
            new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));

        if (sequenceNumsToRestore == null) {
            // starting fresh: the configured start position decides the initial state
            SentinelSequenceNumber startingSeqNum = InitialPosition
                .valueOf(configProps.getProperty(
                    ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                    ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))
                .toSentinelSequenceNumber();

            fetcher.registerNewSubscribedShardState(
                new KinesisStreamShardState(shardKey.getShardMetadata(), shard, startingSeqNum.get()));

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                    getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get());
            }
            continue;
        }

        if (!sequenceNumsToRestore.containsKey(shardKey)) {
            // the previous run never saw this shard, so it has to be read from the beginning
            fetcher.registerNewSubscribedShardState(
                new KinesisStreamShardState(
                    shardKey.getShardMetadata(),
                    shard,
                    SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
                        " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                    getRuntimeContext().getIndexOfThisSubtask(), shard.toString());
            }
            continue;
        }

        // the shard is part of the restored state: continue from the stored sequence number
        fetcher.registerNewSubscribedShardState(
            new KinesisStreamShardState(shardKey.getShardMetadata(), shard, sequenceNumsToRestore.get(shardKey)));

        if (LOG.isInfoEnabled()) {
            LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
                    " starting state set to the restored sequence number {}",
                getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(shardKey));
        }
    }

    // do not start the fetcher if the source was stopped in the meantime
    if (!running) {
        return;
    }

    // publish the fetcher only now, so that state snapshots
    // can be taken from the fetcher's state holders
    this.fetcher = fetcher;

    // run the fetcher loop; it ends only when cancel() or close() is called,
    // or when a thread created by the fetcher throws an error
    fetcher.runFetcher();

    // make sure the fetcher has terminated before fully closing
    fetcher.awaitTermination();
    sourceContext.close();
}
示例25
/**
 * Runs a shard consumer against a fake Kinesis that serves 1000 records over 9 getRecords() calls,
 * with the 7th call hitting an unexpectedly expired shard iterator, and checks that all records
 * are collected and the shard state ends at {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
    final String streamName = "fakeStream";
    StreamShardHandle shardToConsume = getMockStreamShard(streamName, 0);

    // the fetcher's shard state list, pre-populated with the single shard under test
    LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
    subscribedShardsStateUnderTest.add(
        new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(shardToConsume),
            shardToConsume,
            new SequenceNumber("fakeStartingState")));

    TestSourceContext<String> sourceContext = new TestSourceContext<>();

    TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
        Collections.singletonList(streamName),
        sourceContext,
        new Properties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        10,
        2,
        new AtomicReference<>(),
        subscribedShardsStateUnderTest,
        KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(
            Collections.singletonList(streamName)),
        Mockito.mock(KinesisProxyInterface.class));

    KinesisStreamShardState stateUnderTest = subscribedShardsStateUnderTest.get(0);
    int shardIndex = fetcher.registerNewSubscribedShardState(stateUnderTest);

    new ShardConsumer<>(
        fetcher,
        shardIndex,
        stateUnderTest.getStreamShardHandle(),
        stateUnderTest.getLastProcessedSequenceNum(),
        // 1000 records in total over 9 getRecords() calls;
        // the 7th call runs into an unexpectedly expired shard iterator
        FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
            1000, 9, 7, 500L),
        new ShardMetricsReporter()).run();

    assertEquals(1000, sourceContext.getCollectedOutputs().size());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
        subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
示例26
/**
 * Runs a shard consumer with {@code flink.shard.adaptivereads} enabled against a fake Kinesis and
 * checks that 50 records are collected in total and that the shard state ends at
 * {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
    final String streamName = "fakeStream";

    Properties adaptiveReadProps = new Properties();
    adaptiveReadProps.setProperty("flink.shard.adaptivereads", "true");

    StreamShardHandle shardToConsume = getMockStreamShard(streamName, 0);

    // the fetcher's shard state list, pre-populated with the single shard under test
    LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
    subscribedShardsStateUnderTest.add(
        new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(shardToConsume),
            shardToConsume,
            new SequenceNumber("fakeStartingState")));

    TestSourceContext<String> sourceContext = new TestSourceContext<>();

    TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
        Collections.singletonList(streamName),
        sourceContext,
        adaptiveReadProps,
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        10,
        2,
        new AtomicReference<>(),
        subscribedShardsStateUnderTest,
        KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(
            Collections.singletonList(streamName)),
        Mockito.mock(KinesisProxyInterface.class));

    KinesisStreamShardState stateUnderTest = subscribedShardsStateUnderTest.get(0);
    int shardIndex = fetcher.registerNewSubscribedShardState(stateUnderTest);

    new ShardConsumer<>(
        fetcher,
        shardIndex,
        stateUnderTest.getStreamShardHandle(),
        stateUnderTest.getLastProcessedSequenceNum(),
        // Initial number of records to fetch --> 10
        FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
        new ShardMetricsReporter()).run();

    // Avg record size for first batch --> 10 * 10 Kb/10 = 10 Kb
    // Number of records fetched in second batch --> 2 Mb/10Kb * 5 = 40
    // Total number of records = 10 + 40 = 50
    assertEquals(50, sourceContext.getCollectedOutputs().size());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
        subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
示例27
/**
 * Restores the consumer from an empty snapshot and checks that no state is restored while the
 * initially discovered shard is still registered with the fetcher, starting from
 * {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
public void testRestoreWithEmptyState() throws Exception {
    // initial discovery returns one open shard per entry of TEST_STATE
    final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(TEST_STATE.size());
    for (StreamShardMetadata metadata : TEST_STATE.keySet()) {
        Shard openShard = new Shard();
        openShard.setShardId(metadata.getShardId());
        openShard.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("1"));

        initialDiscoveryShards.add(new StreamShardHandle(metadata.getStreamName(), openShard));
    }

    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        TestUtils.getStandardProperties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        null,
        initialDiscoveryShards);

    final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
        fetcher,
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

    StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator =
        new StreamSource<>(consumerFunction);

    final AbstractStreamOperatorTestHarness<String> testHarness =
        new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

    final String snapshotResource =
        "kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot";

    testHarness.setup();
    testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotResource));
    testHarness.open();

    consumerFunction.run(new TestSourceContext<>());

    // nothing must have been restored
    assertTrue(consumerFunction.getRestoredState().isEmpty());

    // even with empty restored state the fetcher must have been given the initially discovered shard;
    // that shard counts as one created while the job was not running,
    // so it has to be consumed from the earliest sequence number
    KinesisStreamShardState seededShardState = fetcher.getSubscribedShardsState().get(0);
    assertEquals(TEST_STREAM_NAME, seededShardState.getStreamShardHandle().getStreamName());
    assertEquals(TEST_SHARD_ID, seededShardState.getStreamShardHandle().getShard().getShardId());
    assertFalse(seededShardState.getStreamShardHandle().isClosed());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(),
        seededShardState.getLastProcessedSequenceNum());

    consumerOperator.close();
    consumerOperator.cancel();
}
示例28
/**
 * Restores the consumer from a snapshot while the initial discovery reports that the restored
 * shard has been closed and split into two new shards. Checks that the restored shard keeps its
 * restored sequence number and that both new shards start from
 * {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
public void testRestoreWithReshardedStream() throws Exception {
    final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(TEST_STATE.size());
    for (StreamShardMetadata metadata : TEST_STATE.keySet()) {
        // the restored shard, now closed: an ending sequence number marks a closed shard
        Shard parentShard = new Shard();
        parentShard.setShardId(metadata.getShardId());
        parentShard.setSequenceNumberRange(
            new SequenceNumberRange()
                .withStartingSequenceNumber("1")
                .withEndingSequenceNumber("1087654321"));
        initialDiscoveryShards.add(new StreamShardHandle(metadata.getStreamName(), parentShard));

        // first shard produced by the split
        Shard firstChildShard = new Shard();
        firstChildShard.setShardId(KinesisShardIdGenerator.generateFromShardOrder(1));
        firstChildShard.setSequenceNumberRange(
            new SequenceNumberRange().withStartingSequenceNumber("1087654322"));
        firstChildShard.setParentShardId(TEST_SHARD_ID);

        // second shard produced by the split
        Shard secondChildShard = new Shard();
        secondChildShard.setShardId(KinesisShardIdGenerator.generateFromShardOrder(2));
        secondChildShard.setSequenceNumberRange(
            new SequenceNumberRange().withStartingSequenceNumber("2087654322"));
        secondChildShard.setParentShardId(TEST_SHARD_ID);

        initialDiscoveryShards.add(new StreamShardHandle(metadata.getStreamName(), firstChildShard));
        initialDiscoveryShards.add(new StreamShardHandle(metadata.getStreamName(), secondChildShard));
    }

    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        TestUtils.getStandardProperties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        null,
        initialDiscoveryShards);

    final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
        fetcher,
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

    StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator =
        new StreamSource<>(consumerFunction);

    final AbstractStreamOperatorTestHarness<String> testHarness =
        new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

    final String snapshotResource =
        "kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot";

    testHarness.setup();
    testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotResource));
    testHarness.open();

    consumerFunction.run(new TestSourceContext<>());

    // the restored state must match the expected test state
    assertNotEquals(null, consumerFunction.getRestoredState());
    assertEquals(1, consumerFunction.getRestoredState().size());
    assertEquals(TEST_STATE, removeEquivalenceWrappers(consumerFunction.getRestoredState()));

    // the fetcher must know about all shards, the new ones included
    assertEquals(3, fetcher.getSubscribedShardsState().size());

    // index 0: the closed parent shard keeps its restored sequence number
    KinesisStreamShardState parentState = fetcher.getSubscribedShardsState().get(0);
    assertEquals(TEST_STREAM_NAME, parentState.getStreamShardHandle().getStreamName());
    assertEquals(TEST_SHARD_ID, parentState.getStreamShardHandle().getShard().getShardId());
    assertTrue(parentState.getStreamShardHandle().isClosed());
    assertEquals(TEST_SEQUENCE_NUMBER, parentState.getLastProcessedSequenceNum());

    // index 1: first new shard, to be consumed from the beginning
    KinesisStreamShardState firstChildState = fetcher.getSubscribedShardsState().get(1);
    assertEquals(TEST_STREAM_NAME, firstChildState.getStreamShardHandle().getStreamName());
    assertEquals(
        KinesisShardIdGenerator.generateFromShardOrder(1),
        firstChildState.getStreamShardHandle().getShard().getShardId());
    assertFalse(firstChildState.getStreamShardHandle().isClosed());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(),
        firstChildState.getLastProcessedSequenceNum());

    // index 2: second new shard, to be consumed from the beginning
    KinesisStreamShardState secondChildState = fetcher.getSubscribedShardsState().get(2);
    assertEquals(TEST_STREAM_NAME, secondChildState.getStreamShardHandle().getStreamName());
    assertEquals(
        KinesisShardIdGenerator.generateFromShardOrder(2),
        secondChildState.getStreamShardHandle().getShard().getShardId());
    assertFalse(secondChildState.getStreamShardHandle().isClosed());
    assertEquals(
        SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(),
        secondChildState.getLastProcessedSequenceNum());

    consumerOperator.close();
    consumerOperator.cancel();
}
示例29
/**
 * Checks how the consumer seeds the fetcher when it is restored from state and the initial
 * discovery returns one shard that is not part of that state: restored shards must be registered
 * with their restored sequence numbers, the extra shard with {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {
    // ---- fixture: the state the consumer will be restored with ----
    HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");

    // ---- state backend: hand the restored state back as union list state ----
    TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
    for (Map.Entry<StreamShardHandle, SequenceNumber> restoredEntry : fakeRestoredState.entrySet()) {
        restoredListState.add(
            Tuple2.of(
                KinesisDataFetcher.convertToStreamShardMetadata(restoredEntry.getKey()),
                restoredEntry.getValue()));
    }

    OperatorStateStore stateStore = mock(OperatorStateStore.class);
    when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

    StateInitializationContext initContext = mock(StateInitializationContext.class);
    when(initContext.getOperatorStateStore()).thenReturn(stateStore);
    when(initContext.isRestored()).thenReturn(true);

    // ---- fetcher: discovery returns every restored shard plus one shard unknown to the state ----
    KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
    List<StreamShardHandle> discoveredShards = new ArrayList<>(fakeRestoredState.keySet());
    discoveredShards.add(
        new StreamShardHandle(
            "fakeStream2",
            new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
    when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

    // assume the given config is correct
    PowerMockito.mockStatic(KinesisConfigUtil.class);
    PowerMockito.doNothing().when(KinesisConfigUtil.class);

    // ---- exercise: restore, open and run the consumer so it seeds the fetcher ----
    TestableFlinkKinesisConsumer consumer =
        new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
    consumer.initializeState(initContext);
    consumer.open(new Configuration());
    consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

    // ---- verify: the extra shard is expected with the earliest sentinel, the rest as restored ----
    fakeRestoredState.put(
        new StreamShardHandle(
            "fakeStream2",
            new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
        SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

    for (Map.Entry<StreamShardHandle, SequenceNumber> expected : fakeRestoredState.entrySet()) {
        KinesisStreamShardState expectedState = new KinesisStreamShardState(
            KinesisDataFetcher.convertToStreamShardMetadata(expected.getKey()),
            expected.getKey(),
            expected.getValue());
        Mockito.verify(mockedFetcher).registerNewSubscribedShardState(expectedState);
    }
}
示例30
/**
 * Consumes the subscribed shard until the consumer is stopped or the shard iterator becomes
 * {@code null}.
 *
 * <p>Each pass fetches a batch with the current shard iterator, deaggregates it, hands every
 * record to {@code deserializeRecordForCollectionAndUpdateState}, reports batch metrics, and then
 * adjusts the loop frequency and the number of records to request next. When the iterator is
 * {@code null} the shard state is set to {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM} and the loop
 * ends. Any {@link Throwable} is passed to {@code fetcherRef.stopWithError}.
 */
@SuppressWarnings("unchecked")
@Override
public void run() {
    try {
        String shardIterator = getShardIterator(lastSequenceNum);
        long loopStartNanos = System.nanoTime();

        while (isRunning()) {
            if (shardIterator == null) {
                // end of the subscribed shard reached: record that in the state and let this thread finish
                fetcherRef.updateState(
                    subscribedShardStateIndex,
                    SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
                break;
            }

            shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
            GetRecordsResult batch = getRecords(shardIterator, maxNumberOfRecordsPerFetch);

            List<Record> rawRecords = batch.getRecords();
            shardMetricsReporter.setNumberOfAggregatedRecords(rawRecords.size());

            // a single Kinesis record may bundle several user records, so unpack before processing
            List<UserRecord> userRecords = deaggregateRecords(
                rawRecords,
                subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
                subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

            long batchSizeBytes = 0L;
            for (UserRecord userRecord : userRecords) {
                batchSizeBytes += userRecord.getData().remaining();
                deserializeRecordForCollectionAndUpdateState(userRecord);
            }

            int userRecordCount = userRecords.size();
            shardMetricsReporter.setNumberOfDeaggregatedRecords(userRecordCount);

            shardIterator = batch.getNextShardIterator();

            long loopEndNanos = adjustRunLoopFrequency(loopStartNanos, System.nanoTime());
            long loopDurationNanos = loopEndNanos - loopStartNanos;
            maxNumberOfRecordsPerFetch =
                adaptRecordsToRead(loopDurationNanos, userRecordCount, batchSizeBytes, maxNumberOfRecordsPerFetch);
            shardMetricsReporter.setRunLoopTimeNanos(loopDurationNanos);

            // the end of this pass is the start of the next one
            loopStartNanos = loopEndNanos;
        }
    } catch (Throwable t) {
        fetcherRef.stopWithError(t);
    }
}