Java源码示例:org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel
示例1
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
if (msg instanceof DefaultFileRegion) {
final DefaultFileRegion defaultFileRegion = (DefaultFileRegion) msg;
try (final FileOutputStream fileOutputStream = new FileOutputStream(outputFile)) {
fileOutputStream.getChannel();
defaultFileRegion.transferTo(fileOutputStream.getChannel(), 0L);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
return new DefaultChannelPromise(new EmbeddedChannel());
}
示例2
@Test
public void testProducerFailedException() throws Exception {
PartitionRequestQueue queue = new PartitionRequestQueue();
ResultSubpartitionView view = new ReleasedResultSubpartitionView();
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;
EmbeddedChannel ch = new EmbeddedChannel(queue);
CreditBasedSequenceNumberingViewReader seqView = new CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue);
seqView.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
// Add available buffer to trigger enqueue the erroneous view
seqView.notifyDataAvailable();
ch.runPendingTasks();
// Read the enqueued msg
Object msg = ch.readOutbound();
assertEquals(msg.getClass(), NettyMessage.ErrorResponse.class);
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
示例3
private void testBufferWriting(ResultSubpartitionView view) throws IOException {
// setup
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;
final InputChannelID receiverId = new InputChannelID();
final PartitionRequestQueue queue = new PartitionRequestQueue();
final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, queue);
final EmbeddedChannel channel = new EmbeddedChannel(queue);
reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
// notify about buffer availability and encode one buffer
reader.notifyDataAvailable();
channel.runPendingTasks();
Object read = channel.readOutbound();
assertNotNull(read);
if (read instanceof NettyMessage.ErrorResponse) {
((NettyMessage.ErrorResponse) read).cause.printStackTrace();
}
assertThat(read, instanceOf(NettyMessage.BufferResponse.class));
read = channel.readOutbound();
assertNull(read);
}
示例4
/**
* Tests that the channel is closed if an Exception reaches the channel handler.
*/
@Test
public void testCloseChannelOnExceptionCaught() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(handler);
channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
Throwable response = MessageSerializer.deserializeServerFailure(buf);
assertTrue(response.getMessage().contains("Expected test Exception"));
channel.closeFuture().await(READ_TIMEOUT_MILLIS);
assertFalse(channel.isActive());
}
示例5
/**
* Queries the embedded channel for data.
*/
private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
final long sleepMillis = 50L;
long sleptMillis = 0L;
Object msg = null;
while (sleptMillis < READ_TIMEOUT_MILLIS &&
(msg = channel.readOutbound()) == null) {
Thread.sleep(sleepMillis);
sleptMillis += sleepMillis;
}
if (msg == null) {
throw new TimeoutException();
} else {
return msg;
}
}
示例6
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
if (msg instanceof DefaultFileRegion) {
final DefaultFileRegion defaultFileRegion = (DefaultFileRegion) msg;
try (final FileOutputStream fileOutputStream = new FileOutputStream(outputFile)) {
fileOutputStream.getChannel();
defaultFileRegion.transferTo(fileOutputStream.getChannel(), 0L);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
return new DefaultChannelPromise(new EmbeddedChannel());
}
示例7
/**
* Tests that {@link PartitionRequestServerHandler} responds {@link ErrorResponse} with wrapped
* {@link PartitionNotFoundException} after receiving invalid {@link PartitionRequest}.
*/
@Test
public void testResponsePartitionNotFoundException() {
final PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
new PartitionRequestQueue(),
true);
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
final ResultPartitionID partitionId = new ResultPartitionID();
// Write the message of partition request to server
channel.writeInbound(new PartitionRequest(partitionId, 0, new InputChannelID(), 2));
channel.runPendingTasks();
// Read the response message after handling partition request
final Object msg = channel.readOutbound();
assertThat(msg, instanceOf(ErrorResponse.class));
final ErrorResponse err = (ErrorResponse) msg;
assertThat(err.cause, instanceOf(PartitionNotFoundException.class));
final ResultPartitionID actualPartitionId = ((PartitionNotFoundException) err.cause).getPartitionId();
assertThat(partitionId, is(actualPartitionId));
}
示例8
@Test
public void testProducerFailedException() throws Exception {
PartitionRequestQueue queue = new PartitionRequestQueue();
ResultSubpartitionView view = new ReleasedResultSubpartitionView();
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;
EmbeddedChannel ch = new EmbeddedChannel(queue);
CreditBasedSequenceNumberingViewReader seqView = new CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue);
seqView.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
// Add available buffer to trigger enqueue the erroneous view
seqView.notifyDataAvailable();
ch.runPendingTasks();
// Read the enqueued msg
Object msg = ch.readOutbound();
assertEquals(msg.getClass(), NettyMessage.ErrorResponse.class);
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
示例9
private void testBufferWriting(ResultSubpartitionView view) throws IOException {
// setup
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;
final InputChannelID receiverId = new InputChannelID();
final PartitionRequestQueue queue = new PartitionRequestQueue();
final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, queue);
final EmbeddedChannel channel = new EmbeddedChannel(queue);
reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
// notify about buffer availability and encode one buffer
reader.notifyDataAvailable();
channel.runPendingTasks();
Object read = channel.readOutbound();
assertNotNull(read);
if (read instanceof NettyMessage.ErrorResponse) {
((NettyMessage.ErrorResponse) read).cause.printStackTrace();
}
assertThat(read, instanceOf(NettyMessage.BufferResponse.class));
read = channel.readOutbound();
assertNull(read);
}
示例10
/**
* Tests that the channel is closed if an Exception reaches the channel handler.
*/
@Test
public void testCloseChannelOnExceptionCaught() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(handler);
channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
Throwable response = MessageSerializer.deserializeServerFailure(buf);
assertTrue(response.getMessage().contains("Expected test Exception"));
channel.closeFuture().await(READ_TIMEOUT_MILLIS);
assertFalse(channel.isActive());
}
示例11
/**
* Queries the embedded channel for data.
*/
private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
final long sleepMillis = 50L;
long sleptMillis = 0L;
Object msg = null;
while (sleptMillis < READ_TIMEOUT_MILLIS &&
(msg = channel.readOutbound()) == null) {
Thread.sleep(sleepMillis);
sleptMillis += sleepMillis;
}
if (msg == null) {
throw new TimeoutException();
} else {
return msg;
}
}
示例12
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
if (msg instanceof DefaultFileRegion) {
final DefaultFileRegion defaultFileRegion = (DefaultFileRegion) msg;
try (final FileOutputStream fileOutputStream = new FileOutputStream(outputFile)) {
fileOutputStream.getChannel();
defaultFileRegion.transferTo(fileOutputStream.getChannel(), 0L);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
return new DefaultChannelPromise(new EmbeddedChannel());
}
示例13
/**
* Tests that {@link PartitionRequestServerHandler} responds {@link ErrorResponse} with wrapped
* {@link PartitionNotFoundException} after receiving invalid {@link PartitionRequest}.
*/
@Test
public void testResponsePartitionNotFoundException() {
final PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
new PartitionRequestQueue());
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
final ResultPartitionID partitionId = new ResultPartitionID();
// Write the message of partition request to server
channel.writeInbound(new PartitionRequest(partitionId, 0, new InputChannelID(), 2));
channel.runPendingTasks();
// Read the response message after handling partition request
final Object msg = channel.readOutbound();
assertThat(msg, instanceOf(ErrorResponse.class));
final ErrorResponse err = (ErrorResponse) msg;
assertThat(err.cause, instanceOf(PartitionNotFoundException.class));
final ResultPartitionID actualPartitionId = ((PartitionNotFoundException) err.cause).getPartitionId();
assertThat(partitionId, is(actualPartitionId));
}
示例14
@Test
public void testResumeConsumption() {
final InputChannelID inputChannelID = new InputChannelID();
final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
final TestViewReader testViewReader = new TestViewReader(inputChannelID, 2, partitionRequestQueue);
final PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestQueue);
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
partitionRequestQueue.notifyReaderCreated(testViewReader);
// Write the message of resume consumption to server
channel.writeInbound(new ResumeConsumption(inputChannelID));
channel.runPendingTasks();
assertTrue(testViewReader.consumptionResumed);
}
示例15
@Test
public void testProducerFailedException() throws Exception {
PartitionRequestQueue queue = new PartitionRequestQueue();
ResultSubpartitionView view = new ReleasedResultSubpartitionView();
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;
EmbeddedChannel ch = new EmbeddedChannel(queue);
CreditBasedSequenceNumberingViewReader seqView = new CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue);
seqView.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
// Add available buffer to trigger enqueue the erroneous view
seqView.notifyDataAvailable();
ch.runPendingTasks();
// Read the enqueued msg
Object msg = ch.readOutbound();
assertEquals(msg.getClass(), NettyMessage.ErrorResponse.class);
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
示例16
@Before
public void setup() throws IOException, InterruptedException {
networkBufferPool = new NetworkBufferPool(8, BUFFER_SIZE, 8);
inputGate = createSingleInputGate(1, networkBufferPool);
RemoteInputChannel inputChannel = createRemoteInputChannel(
inputGate,
new TestingPartitionRequestClient());
inputChannel.requestSubpartition(0);
inputGate.setInputChannels(inputChannel);
inputGate.assignExclusiveSegments();
CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
handler.addInputChannel(inputChannel);
channel = new EmbeddedChannel(
new NettyMessageEncoder(), // For outbound messages
new NettyMessageClientDecoderDelegate(handler)); // For inbound messages
inputChannelId = inputChannel.getInputChannelId();
}
示例17
@Before
public void setup() throws IOException, InterruptedException {
CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
networkBufferPool = new NetworkBufferPool(
NUMBER_OF_BUFFER_RESPONSES,
BUFFER_SIZE,
NUMBER_OF_BUFFER_RESPONSES);
channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler));
inputGate = createSingleInputGate(1, networkBufferPool);
RemoteInputChannel inputChannel = createRemoteInputChannel(
inputGate,
new TestingPartitionRequestClient());
inputGate.setInputChannels(inputChannel);
inputGate.assignExclusiveSegments();
inputChannel.requestSubpartition(0);
handler.addInputChannel(inputChannel);
inputChannelId = inputChannel.getInputChannelId();
SingleInputGate releasedInputGate = createSingleInputGate(1, networkBufferPool);
RemoteInputChannel releasedInputChannel = new InputChannelBuilder()
.buildRemoteChannel(inputGate);
releasedInputGate.close();
handler.addInputChannel(releasedInputChannel);
releasedInputChannelId = releasedInputChannel.getInputChannelId();
}
示例18
private List<NettyMessage> decodeMessages(EmbeddedChannel channel, List<ByteBuf> inputBuffers) {
for (ByteBuf buffer : inputBuffers) {
channel.writeInbound(buffer);
}
channel.runPendingTasks();
List<NettyMessage> decodedMessages = new ArrayList<>();
Object input;
while ((input = channel.readInbound()) != null) {
assertTrue(input instanceof NettyMessage);
decodedMessages.add((NettyMessage) input);
}
return decodedMessages;
}
示例19
/**
* In case of enqueuing an empty reader and a reader that actually has some buffers when channel is not writable,
* on channelWritability change event should result in reading all of the messages.
*/
@Test
public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
final int buffersToWrite = 5;
PartitionRequestQueue queue = new PartitionRequestQueue();
EmbeddedChannel channel = new EmbeddedChannel(queue);
CreditBasedSequenceNumberingViewReader reader1 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(0, 0), 10, queue);
CreditBasedSequenceNumberingViewReader reader2 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(1, 1), 10, queue);
reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new EmptyAlwaysAvailableResultSubpartitionView(), new ResultPartitionID(), 0);
reader1.notifyDataAvailable();
assertTrue(reader1.isAvailable());
assertFalse(reader1.isRegisteredAsAvailable());
channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false);
assertFalse(channel.isWritable());
reader1.notifyDataAvailable();
channel.runPendingTasks();
reader2.notifyDataAvailable();
reader2.requestSubpartitionView((partitionId, index, availabilityListener) -> new DefaultBufferResultSubpartitionView(buffersToWrite), new ResultPartitionID(), 0);
assertTrue(reader2.isAvailable());
assertFalse(reader2.isRegisteredAsAvailable());
reader2.notifyDataAvailable();
// changing a channel writability should result in draining both reader1 and reader2
channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true);
channel.runPendingTasks();
assertEquals(buffersToWrite, channel.outboundMessages().size());
}
示例20
/**
* Tests {@link PartitionRequestQueue#enqueueAvailableReader(NetworkSequenceViewReader)},
* verifying the reader would be enqueued in the pipeline if the next sending buffer is an event
* even though it has no available credits.
*/
@Test
public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
// setup
final ResultSubpartitionView view = new NextIsEventResultSubpartitionView();
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;
final InputChannelID receiverId = new InputChannelID();
final PartitionRequestQueue queue = new PartitionRequestQueue();
final CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
final EmbeddedChannel channel = new EmbeddedChannel(queue);
reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
// block the channel so that we see an intermediate state in the test
ByteBuf channelBlockingBuffer = blockChannel(channel);
assertNull(channel.readOutbound());
// Notify an available event buffer to trigger enqueue the reader
reader.notifyDataAvailable();
channel.runPendingTasks();
// The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
assertThat(queue.getAvailableReaders(), contains(reader)); // contains only (this) one!
assertEquals(0, reader.getNumCreditsAvailable());
// Flush the buffer to make the channel writable again and see the final results
channel.flush();
assertSame(channelBlockingBuffer, channel.readOutbound());
assertEquals(0, queue.getAvailableReaders().size());
assertEquals(0, reader.getNumCreditsAvailable());
assertNull(channel.readOutbound());
}
示例21
@Test
public void testDoublePartitionRequest() throws Exception {
final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
final PartitionRequestClient client = new PartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
inputGate.setBufferPool(bufferPool);
final int numExclusiveBuffers = 2;
inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
inputChannel.requestSubpartition(0);
// The input channel should only send one partition request
assertTrue(channel.isWritable());
Object readFromOutbound = channel.readOutbound();
assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
assertEquals(inputChannel.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
assertEquals(numExclusiveBuffers, ((PartitionRequest) readFromOutbound).credit);
assertNull(channel.readOutbound());
} finally {
// Release all the buffer resources
inputGate.releaseAllResources();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
}
}
示例22
/**
* Verifies that "Connection reset by peer" Exceptions are special-cased and are reported as
* an instance of {@link RemoteTransportException}.
*/
@Test
public void testConnectionResetByPeer() throws Throwable {
EmbeddedChannel ch = createEmbeddedChannel();
NetworkClientHandler handler = getClientHandler(ch);
RemoteInputChannel rich = addInputChannel(handler);
final Throwable[] error = new Throwable[1];
// Verify the Exception
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Throwable cause = (Throwable) invocation.getArguments()[0];
try {
assertEquals(RemoteTransportException.class, cause.getClass());
assertNotEquals("Connection reset by peer", cause.getMessage());
assertEquals(IOException.class, cause.getCause().getClass());
assertEquals("Connection reset by peer", cause.getCause().getMessage());
}
catch (Throwable t) {
error[0] = t;
}
return null;
}
}).when(rich).onError(any(Throwable.class));
ch.pipeline().fireExceptionCaught(new IOException("Connection reset by peer"));
assertNull(error[0]);
}
示例23
private EmbeddedChannel createEmbeddedChannel() {
NettyProtocol protocol = new NettyProtocol(
mock(ResultPartitionProvider.class),
mock(TaskEventDispatcher.class),
true);
return new EmbeddedChannel(protocol.getClientChannelHandlers());
}
示例24
/**
* Tests the failure response with {@link UnknownKvStateIdException} as cause on
* queries for unregistered KvStateIDs.
*/
@Test
public void testQueryUnknownKvStateID() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
long requestId = Integer.MAX_VALUE + 182828L;
KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
// Write the request and wait for the response
channel.writeInbound(serRequest);
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
assertEquals(requestId, response.getRequestId());
assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateIdException);
assertEquals(1L, stats.getNumRequests());
assertEquals(1L, stats.getNumFailed());
}
示例25
/**
* Tests that incoming buffer instances are recycled.
*/
@Test
public void testIncomingBufferIsRecycled() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
assertEquals(1L, serRequest.refCnt());
// Write regular request
channel.writeInbound(serRequest);
assertEquals("Buffer not recycled", 0L, serRequest.refCnt());
// Write unexpected msg
ByteBuf unexpected = channel.alloc().buffer(8);
unexpected.writeInt(4);
unexpected.writeInt(4);
assertEquals(1L, unexpected.refCnt());
channel.writeInbound(unexpected);
assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
}
示例26
/**
* In case of enqueuing an empty reader and a reader that actually has some buffers when channel is not writable,
* on channelWritability change event should result in reading all of the messages.
*/
@Test
public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
final int buffersToWrite = 5;
PartitionRequestQueue queue = new PartitionRequestQueue();
EmbeddedChannel channel = new EmbeddedChannel(queue);
CreditBasedSequenceNumberingViewReader reader1 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(0, 0), 10, queue);
CreditBasedSequenceNumberingViewReader reader2 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(1, 1), 10, queue);
reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new EmptyAlwaysAvailableResultSubpartitionView(), new ResultPartitionID(), 0);
reader1.notifyDataAvailable();
assertTrue(reader1.isAvailable());
assertFalse(reader1.isRegisteredAsAvailable());
channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false);
assertFalse(channel.isWritable());
reader1.notifyDataAvailable();
channel.runPendingTasks();
reader2.notifyDataAvailable();
reader2.requestSubpartitionView((partitionId, index, availabilityListener) -> new DefaultBufferResultSubpartitionView(buffersToWrite), new ResultPartitionID(), 0);
assertTrue(reader2.isAvailable());
assertFalse(reader2.isRegisteredAsAvailable());
reader2.notifyDataAvailable();
// changing a channel writability should result in draining both reader1 and reader2
channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true);
channel.runPendingTasks();
assertEquals(buffersToWrite, channel.outboundMessages().size());
}
示例27
/**
* Tests {@link PartitionRequestQueue#enqueueAvailableReader(NetworkSequenceViewReader)},
* verifying the reader would be enqueued in the pipeline if the next sending buffer is an event
* even though it has no available credits.
*/
@Test
public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
// setup
final ResultSubpartitionView view = new NextIsEventResultSubpartitionView();
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;
final InputChannelID receiverId = new InputChannelID();
final PartitionRequestQueue queue = new PartitionRequestQueue();
final CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
final EmbeddedChannel channel = new EmbeddedChannel(queue);
reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
// block the channel so that we see an intermediate state in the test
ByteBuf channelBlockingBuffer = blockChannel(channel);
assertNull(channel.readOutbound());
// Notify an available event buffer to trigger enqueue the reader
reader.notifyDataAvailable();
channel.runPendingTasks();
// The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
assertThat(queue.getAvailableReaders(), contains(reader)); // contains only (this) one!
assertEquals(0, reader.getNumCreditsAvailable());
// Flush the buffer to make the channel writable again and see the final results
channel.flush();
assertSame(channelBlockingBuffer, channel.readOutbound());
assertEquals(0, queue.getAvailableReaders().size());
assertEquals(0, reader.getNumCreditsAvailable());
assertNull(channel.readOutbound());
}
示例28
private void testCancelPartitionRequest(boolean isAvailableView) throws Exception {
// setup
final ResultPartitionManager partitionManager = new ResultPartitionManager();
final ResultPartition partition = createFinishedPartitionWithFilledData(partitionManager);
final InputChannelID receiverId = new InputChannelID();
final PartitionRequestQueue queue = new PartitionRequestQueue();
final CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
final EmbeddedChannel channel = new EmbeddedChannel(queue);
reader.requestSubpartitionView(partitionManager, partition.getPartitionId(), 0);
// add this reader into allReaders queue
queue.notifyReaderCreated(reader);
// block the channel so that we see an intermediate state in the test
blockChannel(channel);
// add credit to make this reader available for adding into availableReaders queue
if (isAvailableView) {
queue.addCredit(receiverId, 1);
assertTrue(queue.getAvailableReaders().contains(reader));
}
// cancel this subpartition view
queue.cancel(receiverId);
channel.runPendingTasks();
assertFalse(queue.getAvailableReaders().contains(reader));
// the partition and its reader view should all be released
assertTrue(reader.isReleased());
assertTrue(partition.isReleased());
for (ResultSubpartition subpartition : partition.getAllPartitions()) {
assertTrue(subpartition.isReleased());
}
// cleanup
channel.close();
}
示例29
@Test
public void testDoublePartitionRequest() throws Exception {
final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
final PartitionRequestClient client = new NettyPartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
final int numExclusiveBuffers = 2;
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, numExclusiveBuffers);
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client, networkBufferPool);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
inputGate.setBufferPool(bufferPool);
inputGate.assignExclusiveSegments();
inputChannel.requestSubpartition(0);
// The input channel should only send one partition request
assertTrue(channel.isWritable());
Object readFromOutbound = channel.readOutbound();
assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
assertEquals(inputChannel.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
assertEquals(numExclusiveBuffers, ((PartitionRequest) readFromOutbound).credit);
assertNull(channel.readOutbound());
} finally {
// Release all the buffer resources
inputGate.close();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
}
}
示例30
/**
* Verifies that "Connection reset by peer" Exceptions are special-cased and are reported as
* an instance of {@link RemoteTransportException}.
*/
@Test
public void testConnectionResetByPeer() throws Throwable {
EmbeddedChannel ch = createEmbeddedChannel();
NetworkClientHandler handler = getClientHandler(ch);
RemoteInputChannel rich = addInputChannel(handler);
final Throwable[] error = new Throwable[1];
// Verify the Exception
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Throwable cause = (Throwable) invocation.getArguments()[0];
try {
assertEquals(RemoteTransportException.class, cause.getClass());
assertNotEquals("Connection reset by peer", cause.getMessage());
assertEquals(IOException.class, cause.getCause().getClass());
assertEquals("Connection reset by peer", cause.getCause().getMessage());
}
catch (Throwable t) {
error[0] = t;
}
return null;
}
}).when(rich).onError(any(Throwable.class));
ch.pipeline().fireExceptionCaught(new IOException("Connection reset by peer"));
assertNull(error[0]);
}