Java源码示例:org.apache.flink.runtime.taskmanager.TaskActions
示例1
/**
* Helper to create simple {@link ResultPartition} instance for use by a {@link Task} inside
* {@link NetworkEnvironment#registerTask(Task)}.
*
* @param partitionType
* the produced partition type
* @param channels
* the number of output channels
*
* @return instance with minimal data set and some mocks so that it is useful for {@link
* NetworkEnvironment#registerTask(Task)}
*/
private static ResultPartition createResultPartition(
final ResultPartitionType partitionType, final int channels) {
return new ResultPartition(
"TestTask-" + partitionType + ":" + channels,
mock(TaskActions.class),
new JobID(),
new ResultPartitionID(),
partitionType,
channels,
channels,
mock(ResultPartitionManager.class),
new NoOpResultPartitionConsumableNotifier(),
mock(IOManager.class),
false);
}
示例2
private SingleInputGate createInputGate(
int numberOfInputChannels, ResultPartitionType partitionType) {
SingleInputGate inputGate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(),
partitionType,
0,
numberOfInputChannels,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
enableCreditBasedFlowControl);
assertEquals(partitionType, inputGate.getConsumedPartitionType());
return inputGate;
}
示例3
/**
* Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
*
* @param pipelined the result partition type to set up
*/
protected void testAddOnFinishedPartition(final ResultPartitionType pipelined)
throws Exception {
BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
try {
ResultPartition partition = createPartition(notifier, pipelined, true);
partition.finish();
reset(notifier);
// partition.add() should fail
partition.addBufferConsumer(bufferConsumer, 0);
Assert.fail("exception expected");
} catch (IllegalStateException e) {
// expected => ignored
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
Assert.fail("bufferConsumer not recycled");
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
}
示例4
/**
* Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
*
* @param pipelined the result partition type to set up
*/
protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
throws Exception {
BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
try {
ResultPartition partition = createPartition(notifier, pipelined, true);
partition.release();
// partition.add() silently drops the bufferConsumer but recycles it
partition.addBufferConsumer(bufferConsumer, 0);
assertTrue(partition.isReleased());
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
Assert.fail("bufferConsumer not recycled");
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
}
示例5
/**
* Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
*
* @param pipelined the result partition type to set up
*/
protected void testAddOnPartition(final ResultPartitionType pipelined)
throws Exception {
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, pipelined, true);
BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
try {
// partition.add() adds the bufferConsumer without recycling it (if not spilling)
partition.addBufferConsumer(bufferConsumer, 0);
assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
}
// should have been notified for pipelined partitions
if (pipelined.isPipelined()) {
verify(notifier, times(1))
.notifyPartitionConsumable(
eq(partition.getJobId()),
eq(partition.getPartitionId()),
any(TaskActions.class));
}
}
}
示例6
private static ResultPartition createPartition(
ResultPartitionConsumableNotifier notifier,
ResultPartitionType type,
boolean sendScheduleOrUpdateConsumersMessage) {
return new ResultPartition(
"TestTask",
mock(TaskActions.class),
new JobID(),
new ResultPartitionID(),
type,
1,
1,
mock(ResultPartitionManager.class),
notifier,
ioManager,
sendScheduleOrUpdateConsumersMessage);
}
示例7
@SuppressWarnings("unchecked")
public FairnessVerifyingInputGate(
String owningTaskName,
JobID jobId,
IntermediateDataSetID consumedResultId,
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
TaskIOMetricGroup metrics,
boolean isCreditBased) {
super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
consumedSubpartitionIndex,
numberOfInputChannels, taskActions, metrics, isCreditBased);
try {
Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");
f.setAccessible(true);
channelsWithData = (ArrayDeque<InputChannel>) f.get(this);
}
catch (Exception e) {
throw new RuntimeException(e);
}
this.uniquenessChecker = new HashSet<>();
}
示例8
/**
* Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception {
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
partitionType,
taskActions,
jobId,
notifier);
BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
try {
// partition.add() adds the bufferConsumer without recycling it (if not spilling)
consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
}
// should have been notified for pipelined partitions
if (partitionType.isPipelined()) {
verify(notifier, times(1))
.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
}
}
}
示例9
/**
* Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception {
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
partitionType,
taskActions,
jobId,
notifier);
BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
try {
// partition.add() adds the bufferConsumer without recycling it (if not spilling)
consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
}
// should have been notified for pipelined partitions
if (partitionType.isPipelined()) {
verify(notifier, times(1))
.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
}
}
}
示例10
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
acknowledgeFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
if (throwable != null) {
LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
}
},
executor);
}
示例11
public SingleInputGate(
String owningTaskName,
JobID jobId,
IntermediateDataSetID consumedResultId,
final ResultPartitionType consumedPartitionType,
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
TaskIOMetricGroup metrics,
boolean isCreditBased) {
this.owningTaskName = checkNotNull(owningTaskName);
this.jobId = checkNotNull(jobId);
this.consumedResultId = checkNotNull(consumedResultId);
this.consumedPartitionType = checkNotNull(consumedPartitionType);
checkArgument(consumedSubpartitionIndex >= 0);
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
checkArgument(numberOfInputChannels > 0);
this.numberOfInputChannels = numberOfInputChannels;
this.inputChannels = new HashMap<>(numberOfInputChannels);
this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);
this.taskActions = checkNotNull(taskActions);
this.isCreditBased = isCreditBased;
}
示例12
/**
* Creates and returns the single input gate for credit-based testing.
*
* @return The new created single input gate.
*/
static SingleInputGate createSingleInputGate() {
return new SingleInputGate(
"InputGate",
new JobID(),
new IntermediateDataSetID(),
ResultPartitionType.PIPELINED,
0,
1,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
}
示例13
/**
* Helper to create spy of a {@link SingleInputGate} for use by a {@link Task} inside
* {@link NetworkEnvironment#registerTask(Task)}.
*
* @param partitionType
* the consumed partition type
* @param channels
* the number of input channels
*
* @return input gate with some fake settings
*/
private SingleInputGate createSingleInputGate(
final ResultPartitionType partitionType, final int channels) {
return spy(new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(),
partitionType,
0,
channels,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
enableCreditBasedFlowControl));
}
示例14
ResultPartitionWithCountDownLatch(
String owningTaskName,
TaskActions taskActions,
JobID jobId,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
int numTargetKeyGroups,
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
IOManager ioManager,
boolean sendScheduleOrUpdateConsumersMessage,
CountDownLatch blockLatch,
CountDownLatch doneLatch) {
super(
owningTaskName,
taskActions,
jobId,
partitionId,
partitionType,
numberOfSubpartitions,
numTargetKeyGroups,
partitionManager,
partitionConsumableNotifier,
ioManager,
sendScheduleOrUpdateConsumersMessage);
this.blockLatch = Preconditions.checkNotNull(blockLatch);
this.doneLatch = Preconditions.checkNotNull(doneLatch);
}
示例15
private SingleInputGate createSingleInputGate() {
return new SingleInputGate(
"InputGate",
new JobID(),
new IntermediateDataSetID(),
ResultPartitionType.PIPELINED,
0,
1,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
}
示例16
@Test
public void testConsumptionWithLocalChannels() throws Exception {
final int numberOfChannels = 11;
final int buffersPerChannel = 1000;
final ResultPartition resultPartition = mock(ResultPartition.class);
final PipelinedSubpartition[] partitions = new PipelinedSubpartition[numberOfChannels];
final Source[] sources = new Source[numberOfChannels];
final ResultPartitionManager resultPartitionManager = createResultPartitionManager(partitions);
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, numberOfChannels,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
for (int i = 0; i < numberOfChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
partitions[i] = new PipelinedSubpartition(0, resultPartition);
sources[i] = new PipelinedSubpartitionSource(partitions[i]);
}
ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
producer.start();
consumer.start();
// the 'sync()' call checks for exceptions and failed assertions
producer.sync();
consumer.sync();
}
示例17
@Test
public void testConsumptionWithRemoteChannels() throws Exception {
final int numberOfChannels = 11;
final int buffersPerChannel = 1000;
final ConnectionManager connManager = createDummyConnectionManager();
final Source[] sources = new Source[numberOfChannels];
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0,
numberOfChannels,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
for (int i = 0; i < numberOfChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
sources[i] = new RemoteChannelSource(channel);
}
ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
producer.start();
consumer.start();
// the 'sync()' call checks for exceptions and failed assertions
producer.sync();
consumer.sync();
}
示例18
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
acknowledgeFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
if (throwable != null) {
LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
}
},
executor);
}
示例19
/**
* Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception {
BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
partitionType,
taskActions,
jobId,
notifier);
try {
consumableNotifyingPartitionWriter.finish();
reset(notifier);
// partition.add() should fail
consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
Assert.fail("exception expected");
} catch (IllegalStateException e) {
// expected => ignored
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
Assert.fail("bufferConsumer not recycled");
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(
eq(jobId),
eq(consumableNotifyingPartitionWriter.getPartitionId()),
eq(taskActions));
}
}
示例20
/**
* Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception {
BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
new ResultPartitionWriter[] {partition},
taskActions,
jobId,
notifier)[0];
try {
partition.release();
// partition.add() silently drops the bufferConsumer but recycles it
consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
assertTrue(partition.isReleased());
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
Assert.fail("bufferConsumer not recycled");
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
}
}
示例21
private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter(
ResultPartitionType partitionType,
TaskActions taskActions,
JobID jobId,
ResultPartitionConsumableNotifier notifier) {
ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
new ResultPartitionWriter[] {partition},
taskActions,
jobId,
notifier)[0];
}
示例22
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
acknowledgeFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
if (throwable != null) {
LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
}
},
executor);
}
示例23
/**
* Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception {
BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
partitionType,
taskActions,
jobId,
notifier);
try {
consumableNotifyingPartitionWriter.finish();
reset(notifier);
// partition.add() should fail
consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
Assert.fail("exception expected");
} catch (IllegalStateException e) {
// expected => ignored
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
Assert.fail("bufferConsumer not recycled");
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(
eq(jobId),
eq(consumableNotifyingPartitionWriter.getPartitionId()),
eq(taskActions));
}
}
示例24
/**
* Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception {
BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
new ResultPartitionWriter[] {partition},
taskActions,
jobId,
notifier)[0];
try {
partition.release();
// partition.add() silently drops the bufferConsumer but recycles it
consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
assertTrue(partition.isReleased());
} finally {
if (!bufferConsumer.isRecycled()) {
bufferConsumer.close();
Assert.fail("bufferConsumer not recycled");
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
}
}
示例25
private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter(
ResultPartitionType partitionType,
TaskActions taskActions,
JobID jobId,
ResultPartitionConsumableNotifier notifier) {
ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
new ResultPartitionWriter[] {partition},
taskActions,
jobId,
notifier)[0];
}
示例26
/**
* Creates an input gate and all of its input channels.
*/
public static SingleInputGate create(
String owningTaskName,
JobID jobId,
ExecutionAttemptID executionId,
InputGateDeploymentDescriptor igdd,
NetworkEnvironment networkEnvironment,
TaskActions taskActions,
TaskIOMetricGroup metrics) {
final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType());
final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
checkArgument(consumedSubpartitionIndex >= 0);
final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, metrics, networkEnvironment.isCreditBased());
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
int numLocalChannels = 0;
int numRemoteChannels = 0;
int numUnknownChannels = 0;
for (int i = 0; i < inputChannels.length; i++) {
final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
if (partitionLocation.isLocal()) {
inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
networkEnvironment.getResultPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
numLocalChannels++;
}
else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
partitionLocation.getConnectionId(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
numRemoteChannels++;
}
else if (partitionLocation.isUnknown()) {
inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
networkEnvironment.getResultPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
numUnknownChannels++;
}
else {
throw new IllegalStateException("Unexpected partition location.");
}
inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
}
LOG.debug("{}: Created {} input channels (local: {}, remote: {}, unknown: {}).",
owningTaskName,
inputChannels.length,
numLocalChannels,
numRemoteChannels,
numUnknownChannels);
return inputGate;
}
示例27
public ResultPartition(
String owningTaskName,
TaskActions taskActions, // actions on the owning task
JobID jobId,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
int numTargetKeyGroups,
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
IOManager ioManager,
boolean sendScheduleOrUpdateConsumersMessage) {
this.owningTaskName = checkNotNull(owningTaskName);
this.taskActions = checkNotNull(taskActions);
this.jobId = checkNotNull(jobId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
this.numTargetKeyGroups = numTargetKeyGroups;
this.partitionManager = checkNotNull(partitionManager);
this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
// Create the subpartitions.
switch (partitionType) {
case BLOCKING:
for (int i = 0; i < subpartitions.length; i++) {
subpartitions[i] = new SpillableSubpartition(i, this, ioManager);
}
break;
case PIPELINED:
case PIPELINED_BOUNDED:
for (int i = 0; i < subpartitions.length; i++) {
subpartitions[i] = new PipelinedSubpartition(i, this);
}
break;
default:
throw new IllegalArgumentException("Unsupported result partition type.");
}
// Initially, partitions should be consumed once before release.
pin();
LOG.debug("{}: Initialized {}", owningTaskName, this);
}
示例28
public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
checkArgument(numberOfInputChannels >= 1);
SingleInputGate realGate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(),
ResultPartitionType.PIPELINED,
0,
numberOfInputChannels,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
this.inputGate = spy(realGate);
// Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask).
// After merging registerInputOutput and invoke, we have to make sure that the test
// notifications happen at the expected time. In real programs, this is guaranteed by
// the instantiation and request partition life cycle.
try {
Field f = realGate.getClass().getDeclaredField("inputChannelsWithData");
f.setAccessible(true);
final ArrayDeque<InputChannel> notifications = (ArrayDeque<InputChannel>) f.get(realGate);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
invocation.callRealMethod();
synchronized (notifications) {
if (!notifications.isEmpty()) {
InputGateListener listener = (InputGateListener) invocation.getArguments()[0];
listener.notifyInputGateNonEmpty(inputGate);
}
}
return null;
}
}).when(inputGate).registerListener(any(InputGateListener.class));
} catch (Exception e) {
throw new RuntimeException(e);
}
this.inputChannels = new TestInputChannel[numberOfInputChannels];
if (initialize) {
for (int i = 0; i < numberOfInputChannels; i++) {
inputChannels[i] = new TestInputChannel(inputGate, i);
inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i]);
}
}
}
示例29
/**
* Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
* value after receiving all end-of-partition events.
*
* <p>For buffer-or-event instances, it is important to verify that they have been set off to
* the correct logical index.
*/
@Test(timeout = 120 * 1000)
public void testBasicGetNextLogic() throws Exception {
// Setup
final String testTaskName = "Test Task";
final SingleInputGate ig1 = new SingleInputGate(
testTaskName, new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 3,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final SingleInputGate ig2 = new SingleInputGate(
testTaskName, new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 5,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
assertEquals(ig1.getNumberOfInputChannels() + ig2.getNumberOfInputChannels(), union.getNumberOfInputChannels());
final TestInputChannel[][] inputChannels = new TestInputChannel[][]{
TestInputChannel.createInputChannels(ig1, 3),
TestInputChannel.createInputChannels(ig2, 5)
};
inputChannels[0][0].readBuffer(); // 0 => 0
inputChannels[0][0].readEndOfPartitionEvent(); // 0 => 0
inputChannels[1][2].readBuffer(); // 2 => 5
inputChannels[1][2].readEndOfPartitionEvent(); // 2 => 5
inputChannels[1][0].readBuffer(); // 0 => 3
inputChannels[1][1].readBuffer(); // 1 => 4
inputChannels[0][1].readBuffer(); // 1 => 1
inputChannels[1][3].readBuffer(); // 3 => 6
inputChannels[0][1].readEndOfPartitionEvent(); // 1 => 1
inputChannels[1][3].readEndOfPartitionEvent(); // 3 => 6
inputChannels[0][2].readBuffer(); // 1 => 2
inputChannels[0][2].readEndOfPartitionEvent(); // 1 => 2
inputChannels[1][4].readBuffer(); // 4 => 7
inputChannels[1][4].readEndOfPartitionEvent(); // 4 => 7
inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
ig1.notifyChannelNonEmpty(inputChannels[0][0]);
ig1.notifyChannelNonEmpty(inputChannels[0][1]);
ig1.notifyChannelNonEmpty(inputChannels[0][2]);
ig2.notifyChannelNonEmpty(inputChannels[1][0]);
ig2.notifyChannelNonEmpty(inputChannels[1][1]);
ig2.notifyChannelNonEmpty(inputChannels[1][2]);
ig2.notifyChannelNonEmpty(inputChannels[1][3]);
ig2.notifyChannelNonEmpty(inputChannels[1][4]);
verifyBufferOrEvent(union, true, 0, true); // gate 1, channel 0
verifyBufferOrEvent(union, true, 3, true); // gate 2, channel 0
verifyBufferOrEvent(union, true, 1, true); // gate 1, channel 1
verifyBufferOrEvent(union, true, 4, true); // gate 2, channel 1
verifyBufferOrEvent(union, true, 2, true); // gate 1, channel 2
verifyBufferOrEvent(union, true, 5, true); // gate 2, channel 1
verifyBufferOrEvent(union, false, 0, true); // gate 1, channel 0
verifyBufferOrEvent(union, true, 6, true); // gate 2, channel 1
verifyBufferOrEvent(union, false, 1, true); // gate 1, channel 1
verifyBufferOrEvent(union, true, 7, true); // gate 2, channel 1
verifyBufferOrEvent(union, false, 2, true); // gate 1, channel 2
verifyBufferOrEvent(union, false, 3, true); // gate 2, channel 0
verifyBufferOrEvent(union, false, 4, true); // gate 2, channel 1
verifyBufferOrEvent(union, false, 5, true); // gate 2, channel 2
verifyBufferOrEvent(union, false, 6, true); // gate 2, channel 3
verifyBufferOrEvent(union, false, 7, false); // gate 2, channel 4
// Return null when the input gate has received all end-of-partition events
assertTrue(union.isFinished());
assertFalse(union.getNextBufferOrEvent().isPresent());
}
示例30
/**
* Tests request back off configuration is correctly forwarded to the channels.
*/
@Test
public void testRequestBackoffConfiguration() throws Exception {
ResultPartitionID[] partitionIds = new ResultPartitionID[] {
new ResultPartitionID(),
new ResultPartitionID(),
new ResultPartitionID()
};
InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
// Local
new InputChannelDeploymentDescriptor(
partitionIds[0],
ResultPartitionLocation.createLocal()),
// Remote
new InputChannelDeploymentDescriptor(
partitionIds[1],
ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
// Unknown
new InputChannelDeploymentDescriptor(
partitionIds[2],
ResultPartitionLocation.createUnknown())};
InputGateDeploymentDescriptor gateDesc =
new InputGateDeploymentDescriptor(new IntermediateDataSetID(),
ResultPartitionType.PIPELINED, 0, channelDescs);
int initialBackoff = 137;
int maxBackoff = 1001;
final NetworkEnvironment netEnv = new NetworkEnvironment(
100, 32, initialBackoff, maxBackoff, 2, 8, enableCreditBasedFlowControl);
SingleInputGate gate = SingleInputGate.create(
"TestTask",
new JobID(),
new ExecutionAttemptID(),
gateDesc,
netEnv,
mock(TaskActions.class),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
try {
assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
assertEquals(3, channelMap.size());
InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
assertEquals(LocalInputChannel.class, localChannel.getClass());
InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
InputChannel[] channels =
new InputChannel[] {localChannel, remoteChannel, unknownChannel};
for (InputChannel ch : channels) {
assertEquals(0, ch.getCurrentBackoff());
assertTrue(ch.increaseBackoff());
assertEquals(initialBackoff, ch.getCurrentBackoff());
assertTrue(ch.increaseBackoff());
assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
assertTrue(ch.increaseBackoff());
assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
assertTrue(ch.increaseBackoff());
assertEquals(maxBackoff, ch.getCurrentBackoff());
assertFalse(ch.increaseBackoff());
}
} finally {
gate.releaseAllResources();
netEnv.shutdown();
}
}