Java源码示例:org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
示例1
/**
 * Groups the services that a task manager runs with. None of the arguments may be null.
 *
 * @param taskManagerLocation location of the task manager
 * @param memoryManager memory manager
 * @param ioManager I/O manager
 * @param networkEnvironment network environment
 * @param broadcastVariableManager broadcast variable manager
 * @param taskSlotTable slot table of the task manager
 * @param jobManagerTable job manager table
 * @param jobLeaderService job leader service
 * @param taskManagerStateStore manager of the task executor's local state stores
 * @throws NullPointerException if any argument is null
 */
TaskManagerServices(
	TaskManagerLocation taskManagerLocation,
	MemoryManager memoryManager,
	IOManager ioManager,
	NetworkEnvironment networkEnvironment,
	BroadcastVariableManager broadcastVariableManager,
	TaskSlotTable taskSlotTable,
	JobManagerTable jobManagerTable,
	JobLeaderService jobLeaderService,
	TaskExecutorLocalStateStoresManager taskManagerStateStore) {

	// where this task manager lives
	this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);

	// slot and job bookkeeping
	this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
	this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable);
	this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);

	// memory, I/O and local state
	this.memoryManager = Preconditions.checkNotNull(memoryManager);
	this.ioManager = Preconditions.checkNotNull(ioManager);
	this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore);

	// network and broadcast variables
	this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
	this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
}
示例2
/**
 * Creates a builder pre-populated with default services: small real instances where they are
 * cheap to build and Mockito mocks for the rest. Each default can be replaced through the
 * builder's setters.
 */
public TaskManagerServicesBuilder() {
	// the job leader service receives the location, so the location is created first
	this.taskManagerLocation = new LocalTaskManagerLocation();
	this.jobLeaderService = new JobLeaderService(
		this.taskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());

	// small real instances
	this.memoryManager = new MemoryManager(
		MemoryManager.MIN_PAGE_SIZE,
		1,
		MemoryManager.MIN_PAGE_SIZE,
		MemoryType.HEAP,
		false);
	this.broadcastVariableManager = new BroadcastVariableManager();
	this.jobManagerTable = new JobManagerTable();

	// Mockito mocks for the remaining services
	this.ioManager = mock(IOManager.class);
	this.networkEnvironment = mock(NetworkEnvironment.class);
	this.taskSlotTable = mock(TaskSlotTable.class);
	this.taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
}
示例3
/**
 * Groups the services that a task manager runs with. None of the arguments may be null.
 *
 * @param taskManagerLocation location of the task manager
 * @param memoryManager memory manager
 * @param ioManager I/O manager
 * @param shuffleEnvironment shuffle environment
 * @param kvStateService KvState service
 * @param broadcastVariableManager broadcast variable manager
 * @param taskSlotTable slot table of the task manager
 * @param jobManagerTable job manager table
 * @param jobLeaderService job leader service
 * @param taskManagerStateStore manager of the task executor's local state stores
 * @param taskEventDispatcher task event dispatcher
 * @throws NullPointerException if any argument is null
 */
TaskManagerServices(
	TaskManagerLocation taskManagerLocation,
	MemoryManager memoryManager,
	IOManager ioManager,
	ShuffleEnvironment<?, ?> shuffleEnvironment,
	KvStateService kvStateService,
	BroadcastVariableManager broadcastVariableManager,
	TaskSlotTable taskSlotTable,
	JobManagerTable jobManagerTable,
	JobLeaderService jobLeaderService,
	TaskExecutorLocalStateStoresManager taskManagerStateStore,
	TaskEventDispatcher taskEventDispatcher) {

	// where this task manager lives
	this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);

	// slot and job bookkeeping
	this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
	this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable);
	this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);

	// memory, I/O and local state
	this.memoryManager = Preconditions.checkNotNull(memoryManager);
	this.ioManager = Preconditions.checkNotNull(ioManager);
	this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore);

	// shuffle, task events, broadcast variables and KvState
	this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
	this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher);
	this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
	this.kvStateService = Preconditions.checkNotNull(kvStateService);
}
示例4
/**
 * Creates a builder pre-populated with default services: small real instances where they are
 * cheap to build and Mockito mocks for the rest. Each default can be replaced through the
 * builder's setters.
 */
public TaskManagerServicesBuilder() {
	// the job leader service receives the location, so the location is created first
	this.taskManagerLocation = new LocalTaskManagerLocation();
	this.jobLeaderService = new JobLeaderService(
		this.taskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());

	// small real instances
	this.memoryManager = new MemoryManager(
		MemoryManager.MIN_PAGE_SIZE,
		1,
		MemoryManager.MIN_PAGE_SIZE,
		MemoryType.HEAP,
		false);
	this.kvStateService = new KvStateService(new KvStateRegistry(), null, null);
	this.broadcastVariableManager = new BroadcastVariableManager();
	this.taskEventDispatcher = new TaskEventDispatcher();
	this.jobManagerTable = new JobManagerTable();

	// Mockito mocks for the remaining services
	this.ioManager = mock(IOManager.class);
	this.shuffleEnvironment = mock(ShuffleEnvironment.class);
	this.taskSlotTable = mock(TaskSlotTable.class);
	this.taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
}
示例5
/**
 * Creates a {@link TestingTaskExecutor} that uses the given slot table, together with a testing
 * job master gateway that accepts every slot offered to it.
 *
 * <p>The job master gateway is registered with the RPC service, and
 * {@code jobManagerLeaderRetriever} is notified about it before the context is returned.
 *
 * @param taskSlotTable slot table the task executor should use
 * @return context holding the job master gateway, the slot table and the task executor
 * @throws IOException propagated from the helper calls (presumably the creation of the local
 *     state stores manager)
 */
private TaskExecutorTestingContext createTaskExecutorTestingContext(final TaskSlotTable<Task> taskSlotTable) throws IOException {
	// Accept every offered slot. The previous version also triggered a OneShotLatch here, but
	// that latch was never awaited or handed out, so it was dead code.
	final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
		.setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers))
		.build();
	rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

	final JobLeaderService jobLeaderService = new DefaultJobLeaderService(
		unresolvedTaskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(new TaskManagerServicesBuilder()
		.setTaskSlotTable(taskSlotTable)
		.setJobLeaderService(jobLeaderService)
		.setTaskStateManager(stateStoresManager)
		.build());

	jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
	return new TaskExecutorTestingContext(jobMasterGateway, taskSlotTable, taskExecutor);
}
示例6
/**
 * Verifies that a started task executor registers at the resource manager once the resource
 * manager leader is announced.
 */
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
	final String rmAddress = "/resource/manager/address/one";
	final CountDownLatch registrationLatch = new CountDownLatch(1);

	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
	rmGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(
		ignored -> {
			// record the registration attempt, then accept it
			registrationLatch.countDown();
			final TaskExecutorRegistrationSuccess success = new TaskExecutorRegistrationSuccess(
				new InstanceID(),
				new ResourceID(rmAddress),
				new ClusterInformation("localhost", 1234));
			return CompletableFuture.completedFuture(success);
		}));
	rpc.registerGateway(rmAddress, rmGateway);

	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();
		resourceManagerLeaderRetriever.notifyListener(rmAddress, UUID.randomUUID());

		final boolean registered = registrationLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
		assertTrue(registered);
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
示例7
/**
 * Checks that the task executor registers at the resource manager right after the resource
 * manager leader is announced to it.
 */
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
	final String address = "/resource/manager/address/one";
	final TestingResourceManagerGateway gateway = new TestingResourceManagerGateway();
	final CountDownLatch registered = new CountDownLatch(1);

	gateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(
		ignored -> {
			registered.countDown();
			final InstanceID instanceId = new InstanceID();
			final ResourceID rmResourceId = new ResourceID(address);
			final ClusterInformation clusterInformation = new ClusterInformation("localhost", 1234);
			return CompletableFuture.completedFuture(
				new TaskExecutorRegistrationSuccess(instanceId, rmResourceId, clusterInformation));
		}));
	rpc.registerGateway(address, gateway);

	final TaskSlotTable slots = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final TaskExecutorLocalStateStoresManager localState = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slots)
		.setTaskStateManager(localState)
		.build();

	final TaskExecutor executor = createTaskExecutor(services);
	try {
		executor.start();

		final UUID leaderSessionId = UUID.randomUUID();
		resourceManagerLeaderRetriever.notifyListener(address, leaderSessionId);

		assertTrue(registered.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
	} finally {
		RpcUtils.terminateRpcEndpoint(executor, timeout);
	}
}
示例8
/**
 * Groups the services that a task manager runs with. Every reference argument must be non-null;
 * {@code managedMemorySize} is stored as given and not validated here.
 *
 * @param unresolvedTaskManagerLocation unresolved location of the task manager
 * @param managedMemorySize managed memory size
 * @param ioManager I/O manager
 * @param shuffleEnvironment shuffle environment
 * @param kvStateService KvState service
 * @param broadcastVariableManager broadcast variable manager
 * @param taskSlotTable slot table of the task manager
 * @param jobTable job table
 * @param jobLeaderService job leader service
 * @param taskManagerStateStore manager of the task executor's local state stores
 * @param taskEventDispatcher task event dispatcher
 * @param ioExecutor executor service for I/O work
 * @param libraryCacheManager library cache manager
 * @throws NullPointerException if any reference argument is null
 */
TaskManagerServices(
	UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
	long managedMemorySize,
	IOManager ioManager,
	ShuffleEnvironment<?, ?> shuffleEnvironment,
	KvStateService kvStateService,
	BroadcastVariableManager broadcastVariableManager,
	TaskSlotTable<Task> taskSlotTable,
	JobTable jobTable,
	JobLeaderService jobLeaderService,
	TaskExecutorLocalStateStoresManager taskManagerStateStore,
	TaskEventDispatcher taskEventDispatcher,
	ExecutorService ioExecutor,
	LibraryCacheManager libraryCacheManager) {

	// where this task manager lives
	this.unresolvedTaskManagerLocation = Preconditions.checkNotNull(unresolvedTaskManagerLocation);

	// slot and job bookkeeping
	this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
	this.jobTable = Preconditions.checkNotNull(jobTable);
	this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);

	// memory, I/O and local state
	this.managedMemorySize = managedMemorySize;
	this.ioManager = Preconditions.checkNotNull(ioManager);
	this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
	this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore);
	this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager);

	// shuffle, task events, broadcast variables and KvState
	this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
	this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher);
	this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
	this.kvStateService = Preconditions.checkNotNull(kvStateService);
}
示例9
/**
 * Creates a builder pre-populated with default services: real or testing implementations where
 * they are cheap to build and Mockito mocks for the rest. Each default can be replaced through
 * the builder's setters.
 */
public TaskManagerServicesBuilder() {
	// the job leader service receives the location, so the location is created first
	this.unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
	this.jobLeaderService = new DefaultJobLeaderService(
		this.unresolvedTaskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());

	// real or testing implementations
	this.kvStateService = new KvStateService(new KvStateRegistry(), null, null);
	this.broadcastVariableManager = new BroadcastVariableManager();
	this.taskEventDispatcher = new TaskEventDispatcher();
	this.jobTable = DefaultJobTable.create();
	// slot table whose closeAsync() returns an already completed future
	this.taskSlotTable = TestingTaskSlotTable.<Task>newBuilder()
		.closeAsyncReturns(CompletableFuture.completedFuture(null))
		.build();
	this.ioExecutor = TestingUtils.defaultExecutor();
	this.libraryCacheManager = TestingLibraryCacheManager.newBuilder().build();

	// Mockito mocks for the remaining services
	this.ioManager = mock(IOManager.class);
	this.shuffleEnvironment = mock(ShuffleEnvironment.class);
	this.taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
}
示例10
/**
 * Checks that terminating the task executor also shuts down the task slot table, the shuffle
 * environment and the KvState service that were handed to it via {@link TaskManagerServices}.
 */
@Test
public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
	final TaskSlotTableImpl<Task> slotTable = TaskSlotUtils.createTaskSlotTable(1);
	final JobLeaderService leaderService = new DefaultJobLeaderService(
		unresolvedTaskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());

	// the local state stores reuse the I/O manager's spilling directories
	final IOManager asyncIoManager = new IOManagerAsync(tmp.newFolder().getAbsolutePath());
	final TaskExecutorLocalStateStoresManager stateStoresManager = new TaskExecutorLocalStateStoresManager(
		false,
		asyncIoManager.getSpillingDirectories(),
		Executors.directExecutor());

	// started here; the assertions at the end check that they were shut down again
	nettyShuffleEnvironment.start();
	final KvStateService kvState = new KvStateService(new KvStateRegistry(), null, null);
	kvState.start();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
		.setIoManager(asyncIoManager)
		.setShuffleEnvironment(nettyShuffleEnvironment)
		.setKvStateService(kvState)
		.setTaskSlotTable(slotTable)
		.setJobLeaderService(leaderService)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();
	} finally {
		// terminating the endpoint is expected (per the test name) to run postStop
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}

	assertThat(slotTable.isClosed(), is(true));
	assertThat(nettyShuffleEnvironment.isClosed(), is(true));
	assertThat(kvState.isShutdown(), is(true));
}
示例11
/**
 * Verifies that a started task executor registers at the resource manager once the resource
 * manager leader is announced.
 */
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
	final String resourceManagerAddress = "/resource/manager/address/one";
	final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
	final CountDownLatch taskManagerRegisteredLatch = new CountDownLatch(1);
	testingResourceManagerGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(
		ignored -> {
			// record the registration attempt, then accept it
			taskManagerRegisteredLatch.countDown();
			return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
				new InstanceID(), new ResourceID(resourceManagerAddress), new ClusterInformation("localhost", 1234)));
		}
	));
	rpc.registerGateway(resourceManagerAddress, testingResourceManagerGateway);

	// typed as TaskSlotTable<Task> rather than the raw type, to avoid unchecked conversions
	final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(localStateStoresManager)
		.build();

	final TaskExecutor taskManager = createTaskExecutor(taskManagerServices);
	try {
		taskManager.start();
		resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, UUID.randomUUID());
		assertTrue(taskManagerRegisteredLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
示例12
/**
 * Returns the manager of the task executor's local state stores held by these services.
 *
 * @return the local state stores manager passed in at construction time
 */
public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
	return this.taskManagerStateStore;
}
示例13
/**
 * Creates and returns the task manager services.
 *
 * <p>The network environment is started first. The task manager location is then built from
 * the data port of the started network stack.
 *
 * @param taskManagerServicesConfiguration task manager configuration
 * @param resourceID resource ID of the task manager
 * @param taskIOExecutor executor for async IO operations; also handed to the local state stores manager
 * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
 * @param maxJvmHeapMemory the maximum JVM heap size
 * @return task manager components
 * @throws Exception if checking the temporary directories, or creating or starting any of the
 *     services, fails
 */
public static TaskManagerServices fromConfiguration(
	TaskManagerServicesConfiguration taskManagerServicesConfiguration,
	ResourceID resourceID,
	Executor taskIOExecutor,
	long freeHeapMemoryWithDefrag,
	long maxJvmHeapMemory) throws Exception {

	// pre-start checks
	checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

	final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
	network.start();

	// the location advertises the data port of the network stack started above
	final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
		resourceID,
		taskManagerServicesConfiguration.getTaskManagerAddress(),
		network.getConnectionManager().getDataPort());

	// this call has to happen strictly after the network stack has been initialized
	final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

	// start the I/O manager, it will create some temp directories.
	final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

	final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

	// every slot is given ResourceProfile.ANY
	final int numberOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
	final List<ResourceProfile> resourceProfiles = new ArrayList<>(numberOfSlots);
	for (int i = 0; i < numberOfSlots; i++) {
		resourceProfiles.add(ResourceProfile.ANY);
	}

	final TimerService<AllocationID> timerService = new TimerService<>(
		new ScheduledThreadPoolExecutor(1),
		taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

	final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

	final JobManagerTable jobManagerTable = new JobManagerTable();

	final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

	// local recovery state goes into the LOCAL_STATE_SUB_DIRECTORY_ROOT sub directory of each configured root
	final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
	final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
	for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
		stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
	}

	final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
		taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
		stateRootDirectoryFiles,
		taskIOExecutor);

	return new TaskManagerServices(
		taskManagerLocation,
		memoryManager,
		ioManager,
		network,
		broadcastVariableManager,
		taskSlotTable,
		jobManagerTable,
		jobLeaderService,
		taskStateManager);
}
示例14
/**
 * Replaces the {@link TaskExecutorLocalStateStoresManager} configured on this builder.
 *
 * @param taskStateManager the local state stores manager to use
 * @return this builder, for call chaining
 */
public TaskManagerServicesBuilder setTaskStateManager(TaskExecutorLocalStateStoresManager taskStateManager) {
	this.taskStateManager = taskStateManager;
	return this;
}
示例15
/**
 * Checks that terminating the task executor shuts down the memory manager, the network
 * environment and the I/O manager that were handed to it via {@link TaskManagerServices}.
 */
@Test
public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final JobLeaderService leaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());

	// the local state stores reuse the I/O manager's spilling directories
	final IOManager asyncIoManager = new IOManagerAsync(tmp.newFolder().getAbsolutePath());
	final TaskExecutorLocalStateStoresManager stateStoresManager = new TaskExecutorLocalStateStoresManager(
		false,
		asyncIoManager.getSpillingDirectories(),
		Executors.directExecutor());

	final MemoryManager heapMemoryManager = new MemoryManager(4096, 1, 4096, MemoryType.HEAP, false);

	// started here; the assertions at the end check that it was shut down again
	final NetworkEnvironment network = new NetworkEnvironment(1, 1, 0, 0, 2, 8, true);
	network.start();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setMemoryManager(heapMemoryManager)
		.setIoManager(asyncIoManager)
		.setNetworkEnvironment(network)
		.setTaskSlotTable(slotTable)
		.setJobLeaderService(leaderService)
		.setTaskStateManager(stateStoresManager)
		.build();

	final long interval = 1000L;
	final long heartbeatTimeout = 1000L;
	final TaskExecutor taskExecutor = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		new HeartbeatServices(interval, heartbeatTimeout),
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskExecutor.start();
	} finally {
		// terminating the endpoint is expected (per the test name) to run postStop
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}

	assertThat(heapMemoryManager.isShutdown(), is(true));
	assertThat(network.isShutdown(), is(true));
	assertThat(asyncIoManager.isProperlyShutDown(), is(true));
}
示例16
/**
 * Tests that a task executor whose heartbeats with the resource manager time out disconnects
 * from the resource manager and afterwards tries to register again.
 */
@Test
public void testHeartbeatTimeoutWithResourceManager() throws Exception {
	final String rmAddress = "rm";
	final ResourceID rmResourceId = new ResourceID(rmAddress);
	final long heartbeatInterval = 1L;
	final long heartbeatTimeout = 3L;
	final ResourceManagerId rmLeaderId = ResourceManagerId.generate();

	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
		rmLeaderId,
		rmResourceId,
		rmAddress,
		rmAddress);

	final TaskExecutorRegistrationSuccess registrationResponse = new TaskExecutorRegistrationSuccess(
		new InstanceID(),
		rmResourceId,
		new ClusterInformation("localhost", 1234));

	final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
	// two attempts: the initial registration and the reconnect after the heartbeat timeout
	final CountDownLatch registrationAttempts = new CountDownLatch(2);
	rmGateway.setRegisterTaskExecutorFunction(
		registration -> {
			taskExecutorRegistrationFuture.complete(registration.f1);
			registrationAttempts.countDown();
			return CompletableFuture.completedFuture(registrationResponse);
		});

	final CompletableFuture<ResourceID> taskExecutorDisconnectFuture = new CompletableFuture<>();
	rmGateway.setDisconnectTaskExecutorConsumer(
		disconnectInfo -> taskExecutorDisconnectFuture.complete(disconnectInfo.f0));

	rpc.registerGateway(rmAddress, rmGateway);

	final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
	final SlotReport slotReport = new SlotReport();
	when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);

	final HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);

	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(localStateStoresManager)
		.build();

	final TaskExecutor taskManager = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		taskManagerServices,
		heartbeatServices,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskManager.start();

		// define a leader and see that a registration happens
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId.toUUID());

		// register resource manager success will trigger monitoring heartbeat target between tm and rm
		assertThat(taskExecutorRegistrationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

		// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
		assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

		// timeout.toMilliseconds() is a millisecond count, so it must be paired with MILLISECONDS
		// (pairing it with SECONDS made the wait 1000 times longer than the configured timeout)
		assertTrue(
			"The TaskExecutor should try to reconnect to the RM",
			registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
示例17
/**
 * Tests that the correct slot report is sent as part of the heartbeat response.
 *
 * <p>The testing slot table hands out two reports. The first is expected as the initial slot
 * report after registration, the second as the payload of the heartbeat response.
 */
@Test
public void testHeartbeatSlotReporting() throws Exception {
	final String rmAddress = "rm";
	final UUID rmLeaderId = UUID.randomUUID();

	// testing resource manager gateway that records registration, slot report and heartbeat
	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
	final ResourceID rmResourceId = rmGateway.getOwnResourceId();
	final CompletableFuture<RegistrationResponse> registrationResponse = CompletableFuture.completedFuture(
		new TaskExecutorRegistrationSuccess(
			new InstanceID(),
			rmResourceId,
			new ClusterInformation("localhost", 1234)));

	final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
	rmGateway.setRegisterTaskExecutorFunction(registration -> {
		taskExecutorRegistrationFuture.complete(registration.f1);
		return registrationResponse;
	});

	final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
	rmGateway.setSendSlotReportFunction(slotReportMessage -> {
		initialSlotReportFuture.complete(slotReportMessage.f2);
		return CompletableFuture.completedFuture(Acknowledge.get());
	});

	final CompletableFuture<SlotReport> heartbeatSlotReportFuture = new CompletableFuture<>();
	rmGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> heartbeatSlotReportFuture.complete(slotReport));

	rpc.registerGateway(rmAddress, rmGateway);

	// first report: the slot without job/allocation; second report: the same slot with both
	final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);
	final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
	final SlotReport unassignedSlotReport = new SlotReport(new SlotStatus(slotId, resourceProfile));
	final SlotReport assignedSlotReport = new SlotReport(
		new SlotStatus(slotId, resourceProfile, new JobID(), new AllocationID()));
	final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(
		new ArrayDeque<>(Arrays.asList(unassignedSlotReport, assignedSlotReport)));

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskManager = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskManager.start();

		// announce the leader; the task executor should register and send its initial report
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId);
		assertThat(taskExecutorRegistrationFuture.get(), equalTo(taskManagerLocation.getResourceID()));
		assertThat(initialSlotReportFuture.get(), equalTo(unassignedSlotReport));

		// trigger a heartbeat asynchronously; its response must carry the second report
		final TaskExecutorGateway taskExecutorGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
		taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);
		final SlotReport reportedInHeartbeat = heartbeatSlotReportFuture.get();
		assertEquals(assignedSlotReport, reportedInHeartbeat);
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
示例18
/**
 * Checks that the task executor registers at whichever resource manager is announced as leader,
 * first at one resource manager and, after the leadership moves, at the second.
 */
@Test
public void testTriggerRegistrationOnLeaderChange() throws Exception {
	final String firstAddress = "/resource/manager/address/one";
	final String secondAddress = "/resource/manager/address/two";
	final UUID firstLeaderId = UUID.randomUUID();
	final UUID secondLeaderId = UUID.randomUUID();
	final ResourceID firstRmResourceId = new ResourceID(firstAddress);
	final ResourceID secondRmResourceId = new ResourceID(secondAddress);

	// both mocked resource managers answer every registration with a success
	final ResourceManagerGateway firstRmGateway = mock(ResourceManagerGateway.class);
	final ResourceManagerGateway secondRmGateway = mock(ResourceManagerGateway.class);
	when(firstRmGateway.registerTaskExecutor(
		anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(
			new TaskExecutorRegistrationSuccess(new InstanceID(), firstRmResourceId, new ClusterInformation("localhost", 1234))));
	when(secondRmGateway.registerTaskExecutor(
		anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(
			new TaskExecutorRegistrationSuccess(new InstanceID(), secondRmResourceId, new ClusterInformation("localhost", 1234))));
	rpc.registerGateway(firstAddress, firstRmGateway);
	rpc.registerGateway(secondAddress, secondRmGateway);

	final TaskSlotTable slotTable = mock(TaskSlotTable.class);
	final SlotReport emptySlotReport = new SlotReport();
	when(slotTable.createSlotReport(any(ResourceID.class))).thenReturn(emptySlotReport);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskManager = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskManager.start();
		final String taskManagerAddress = taskManager.getAddress();

		// without a leader there is no connection yet
		assertNull(taskManager.getResourceManagerConnection());

		// first leader: expect a registration at the first resource manager
		resourceManagerLeaderRetriever.notifyListener(firstAddress, firstLeaderId);
		verify(firstRmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
			eq(taskManagerAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskManager.getResourceManagerConnection());

		// revoke the leadership, then announce the second resource manager as leader
		resourceManagerLeaderRetriever.notifyListener(null, null);
		resourceManagerLeaderRetriever.notifyListener(secondAddress, secondLeaderId);
		verify(secondRmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
			eq(taskManagerAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskManager.getResourceManagerConnection());
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
示例19
/**
 * Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
 * the job leader, it will offer all reserved slots to the JobManager.
 */
@Test
public void testJobLeaderDetection() throws Exception {
	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final JobManagerTable jmTable = new JobManagerTable();
	final JobLeaderService leaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());

	// resource manager that signals when the initial slot report arrives
	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
	final CompletableFuture<Void> initialSlotReportFuture = new CompletableFuture<>();
	rmGateway.setSendSlotReportFunction(slotReportMessage -> {
		initialSlotReportFuture.complete(null);
		return CompletableFuture.completedFuture(Acknowledge.get());
	});

	// job master that records the offered slots and accepts all of them
	final CompletableFuture<Collection<SlotOffer>> offeredSlotsFuture = new CompletableFuture<>();
	final TestingJobMasterGateway jmGateway = new TestingJobMasterGatewayBuilder()
		.setOfferSlotsFunction((resourceID, slotOffers) -> {
			offeredSlotsFuture.complete(new ArrayList<>(slotOffers));
			return CompletableFuture.completedFuture(slotOffers);
		})
		.build();

	rpc.registerGateway(rmGateway.getAddress(), rmGateway);
	rpc.registerGateway(jmGateway.getAddress(), jmGateway);

	final AllocationID allocationId = new AllocationID();
	final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setJobManagerTable(jmTable)
		.setJobLeaderService(leaderService)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskManager = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskManager.start();
		final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

		// announce the resource manager leader and wait for the initial slot report
		resourceManagerLeaderRetriever.notifyListener(rmGateway.getAddress(), rmGateway.getFencingToken().toUUID());
		initialSlotReportFuture.get();

		// reserve a slot for the job under the given allocation id
		tmGateway.requestSlot(
			slotId,
			jobId,
			allocationId,
			jmGateway.getAddress(),
			rmGateway.getFencingToken(),
			timeout).get();

		// announcing the job leader should make the task executor offer the reserved slot
		jobManagerLeaderRetriever.notifyListener(jmGateway.getAddress(), jmGateway.getFencingToken().toUUID());
		final Collection<AllocationID> offeredAllocationIds = offeredSlotsFuture.get()
			.stream()
			.map(SlotOffer::getAllocationId)
			.collect(Collectors.toList());
		assertThat(offeredAllocationIds, containsInAnyOrder(allocationId));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
示例20
/**
 * This test makes sure that duplicate JobMaster gained leadership messages are filtered out
 * by the TaskExecutor. See FLINK-7526.
 *
 * <p>The same leadership message is delivered twice to the job leader listener that the
 * TaskExecutor registers with its JobLeaderService; the JobManagerTable must see exactly one
 * {@code put} for the job.
 */
@Test
public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
	// observation window for the JobManagerTable before the number of puts is checked
	final long verificationTimeout = 500L;
	final JobLeaderService jobLeaderService = mock(JobLeaderService.class);
	final HeartbeatServices heartbeatServicesMock = mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS);
	final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
	when(jobMasterGateway.getHostname()).thenReturn("localhost");
	final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate());
	final JobManagerTable jobManagerTableMock = spy(new JobManagerTable());
	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setJobManagerTable(jobManagerTableMock)
		.setJobLeaderService(jobLeaderService)
		.setTaskStateManager(localStateStoresManager)
		.build();
	final TestingTaskExecutor taskExecutor = new TestingTaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		taskManagerServices,
		heartbeatServicesMock,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);
	try {
		taskExecutor.start();
		taskExecutor.waitUntilStarted();
		// capture the listener the TaskExecutor passes to the (mocked) JobLeaderService
		ArgumentCaptor<JobLeaderListener> jobLeaderListenerArgumentCaptor = ArgumentCaptor.forClass(JobLeaderListener.class);
		verify(jobLeaderService).start(anyString(), any(RpcService.class), any(HighAvailabilityServices.class), jobLeaderListenerArgumentCaptor.capture());
		JobLeaderListener taskExecutorListener = jobLeaderListenerArgumentCaptor.getValue();
		taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage);
		// duplicate job manager gained leadership message
		taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage);
		ArgumentCaptor<JobManagerConnection> jobManagerConnectionArgumentCaptor = ArgumentCaptor.forClass(JobManagerConnection.class);
		// after() waits for the whole window instead of returning on the first matching put (as
		// timeout() does), so a second put triggered by the duplicate message is still detected
		// if the messages are processed asynchronously
		verify(jobManagerTableMock, Mockito.after(verificationTimeout).times(1)).put(eq(jobId), jobManagerConnectionArgumentCaptor.capture());
		JobManagerConnection jobManagerConnection = jobManagerConnectionArgumentCaptor.getValue();
		assertEquals(jobMasterGateway, jobManagerConnection.getJobManagerGateway());
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
示例21
/**
 * Verifies that the TaskExecutor stops monitoring the ResourceManager heartbeat as soon as it is
 * told that this ResourceManager lost its leadership.
 *
 * <p>See FLINK-8462
 */
@Test
public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
	final long heartbeatInterval = 1L;
	// far above the poll timeout, so a heartbeat timeout cannot be what unmonitors the RM
	final long heartbeatTimeout = 10000L;
	final long pollTimeout = 1000L;
	final RecordingHeartbeatServices recordingHeartbeatServices =
		new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
	final ResourceID resourceManagerResourceId = ResourceID.generate();
	final TaskSlotTable slotTable =
		new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);

	final String rmAddress = "rm";
	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
		ResourceManagerId.generate(),
		resourceManagerResourceId,
		rmAddress,
		rmAddress);
	rpc.registerGateway(rmAddress, rmGateway);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		recordingHeartbeatServices,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskExecutor.start();

		final BlockingQueue<ResourceID> unmonitored = recordingHeartbeatServices.getUnmonitoredTargets();
		final BlockingQueue<ResourceID> monitored = recordingHeartbeatServices.getMonitoredTargets();

		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());

		// the RM becoming a monitored heartbeat target signals a completed registration
		final ResourceID firstMonitored = monitored.poll(pollTimeout, TimeUnit.MILLISECONDS);
		assertThat(firstMonitored, equalTo(resourceManagerResourceId));

		// revoke the RM leadership; the RM must be unmonitored without any heartbeat timeout
		resourceManagerLeaderRetriever.notifyListener(null, null);
		final ResourceID firstUnmonitored = unmonitored.poll(pollTimeout, TimeUnit.MILLISECONDS);
		assertThat(firstUnmonitored, equalTo(resourceManagerResourceId));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
示例22
/**
 * Verifies that the TaskExecutor drops a job from its JobLeaderService once it no longer has
 * any slot assigned to that job.
 *
 * <p>See FLINK-8504
 */
@Test
public void testRemoveJobFromJobLeaderService() throws Exception {
	final TaskSlotTable slotTable =
		new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();
	final TestingTaskExecutor taskExecutor = new TestingTaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
		final CompletableFuture<Void> initialSlotReportReceived = new CompletableFuture<>();
		rmGateway.setSendSlotReportFunction(ignored -> {
			initialSlotReportReceived.complete(null);
			return CompletableFuture.completedFuture(Acknowledge.get());
		});
		final ResourceManagerId rmId = rmGateway.getFencingToken();
		rpc.registerGateway(rmGateway.getAddress(), rmGateway);
		resourceManagerLeaderRetriever.notifyListener(rmGateway.getAddress(), rmId.toUUID());

		// completed when the job master leader retrieval service for jobId is started / stopped
		final CompletableFuture<LeaderRetrievalListener> retrieverStarted = new CompletableFuture<>();
		final CompletableFuture<Void> retrieverStopped = new CompletableFuture<>();
		haServices.setJobMasterLeaderRetriever(
			jobId,
			new StartStopNotifyingLeaderRetrievalService(retrieverStarted, retrieverStopped));

		taskExecutor.start();
		taskExecutor.waitUntilStarted();

		final TaskExecutorGateway gateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
		final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);
		final AllocationID allocationId = new AllocationID();

		assertThat(retrieverStarted.isDone(), is(false));
		final JobLeaderService leaderService = services.getJobLeaderService();
		assertThat(leaderService.containsJob(jobId), is(false));

		// only request the slot after the initial slot report has reached the RM
		initialSlotReportReceived.get();
		gateway.requestSlot(slotId, jobId, allocationId, "foobar", rmId, timeout).get();

		// block until the job leader retrieval service for jobId has been started
		retrieverStarted.get();
		assertThat(leaderService.containsJob(jobId), is(true));

		// freeing the only slot of jobId must remove the job, which stops its retrieval service
		gateway.freeSlot(allocationId, new FlinkException("Test exception"), timeout).get();
		retrieverStopped.get();
		assertThat(leaderService.containsJob(jobId), is(false));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
示例23
/**
 * Builds a local state stores manager for tests: local recovery is disabled, the only root
 * directory is a newly created temporary folder, and Flink's direct executor is used.
 *
 * @return a new local state stores manager
 * @throws IOException if the temporary folder cannot be created
 */
private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() throws IOException {
	final File localStateRoot = tmp.newFolder();
	final boolean localRecoveryEnabled = false;
	return new TaskExecutorLocalStateStoresManager(
		localRecoveryEnabled,
		new File[] {localStateRoot},
		Executors.directExecutor());
}
示例24
/**
 * Returns the manager of the task executor's local state stores.
 *
 * @return the local state stores manager held by this instance
 */
public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
	return this.taskManagerStateStore;
}
示例25
/**
 * Creates and returns the task manager services.
 *
 * <p>The services are built in dependency order. The shuffle environment is started before the
 * {@link TaskManagerLocation} is created, because its data port becomes part of the location.
 * The memory manager is created only after the network stack has been initialized.
 *
 * @param taskManagerServicesConfiguration task manager configuration
 * @param taskManagerMetricGroup metric group of the task manager
 * @param taskIOExecutor executor for async IO operations, handed to the local state stores manager
 * @return task manager components
 * @throws Exception if a pre-start check fails or a service cannot be created or started
 */
public static TaskManagerServices fromConfiguration(
		TaskManagerServicesConfiguration taskManagerServicesConfiguration,
		MetricGroup taskManagerMetricGroup,
		Executor taskIOExecutor) throws Exception {

	final TaskManagerServicesConfiguration config = taskManagerServicesConfiguration;

	// pre-start checks on the temp directories
	checkTempDirs(config.getTmpDirPaths());

	final TaskEventDispatcher eventDispatcher = new TaskEventDispatcher();

	// the I/O manager creates some temp directories on construction
	final IOManager asyncIoManager = new IOManagerAsync(config.getTmpDirPaths());

	final ShuffleEnvironment<?, ?> shuffle =
		createShuffleEnvironment(config, eventDispatcher, taskManagerMetricGroup);
	final int boundDataPort = shuffle.start();

	final KvStateService kvState = KvStateService.fromConfiguration(config);
	kvState.start();

	final TaskManagerLocation location = new TaskManagerLocation(
		config.getResourceID(),
		config.getTaskManagerAddress(),
		boundDataPort);

	// must happen strictly after the network stack has been initialized
	final MemoryManager memManager = createMemoryManager(config);
	final long managedBytes = memManager.getMemorySize();

	final BroadcastVariableManager broadcastVars = new BroadcastVariableManager();

	// every slot gets the same share of the managed memory
	final int slotCount = config.getNumberOfSlots();
	final ResourceProfile perSlotProfile = computeSlotResourceProfile(slotCount, managedBytes);
	final List<ResourceProfile> slotProfiles = Collections.nCopies(slotCount, perSlotProfile);

	final TimerService<AllocationID> slotTimerService = new TimerService<>(
		new ScheduledThreadPoolExecutor(1),
		config.getTimerServiceShutdownTimeout());
	final TaskSlotTable slotTable = new TaskSlotTable(slotProfiles, slotTimerService);

	final JobManagerTable jmTable = new JobManagerTable();
	final JobLeaderService leaderService =
		new JobLeaderService(location, config.getRetryingRegistrationConfiguration());

	// each configured local recovery root gets the local state sub directory appended
	final String[] localStateRoots = config.getLocalRecoveryStateRootDirectories();
	final File[] localStateDirs = new File[localStateRoots.length];
	int next = 0;
	for (String root : localStateRoots) {
		localStateDirs[next++] = new File(root, LOCAL_STATE_SUB_DIRECTORY_ROOT);
	}

	final TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
		config.isLocalRecoveryEnabled(),
		localStateDirs,
		taskIOExecutor);

	return new TaskManagerServices(
		location,
		memManager,
		asyncIoManager,
		shuffle,
		kvState,
		broadcastVars,
		slotTable,
		jmTable,
		leaderService,
		localStateStoresManager,
		eventDispatcher);
}
示例26
/**
 * Sets the local state stores manager this builder should use.
 *
 * @param taskStateManager the local state stores manager
 * @return this builder, for call chaining
 */
public TaskManagerServicesBuilder setTaskStateManager(TaskExecutorLocalStateStoresManager taskStateManager) {
	this.taskStateManager = taskStateManager;

	return this;
}
示例27
/**
 * Verifies that terminating the TaskExecutor shuts down the memory manager, the shuffle
 * environment and the KvState service that were handed to it.
 */
@Test
public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
	final TaskSlotTable slotTable =
		new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final JobLeaderService leaderService =
		new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());

	final IOManager asyncIoManager = new IOManagerAsync(tmp.newFolder().getAbsolutePath());
	// the local state stores reuse the I/O manager's spilling directories
	final TaskExecutorLocalStateStoresManager stateStoresManager = new TaskExecutorLocalStateStoresManager(
		false,
		asyncIoManager.getSpillingDirectories(),
		Executors.directExecutor());

	final MemoryManager memManager = new MemoryManager(4096, 1, 4096, MemoryType.HEAP, false);

	nettyShuffleEnvironment.start();

	final KvStateService kvState = new KvStateService(new KvStateRegistry(), null, null);
	kvState.start();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setMemoryManager(memManager)
		.setIoManager(asyncIoManager)
		.setShuffleEnvironment(nettyShuffleEnvironment)
		.setKvStateService(kvState)
		.setTaskSlotTable(slotTable)
		.setJobLeaderService(leaderService)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}

	// the services handed over above must all be shut down after termination
	assertThat(memManager.isShutdown(), is(true));
	assertThat(nettyShuffleEnvironment.isClosed(), is(true));
	assertThat(kvState.isShutdown(), is(true));
}
示例28
/**
 * Tests that a heartbeat timeout with the ResourceManager makes the TaskExecutor disconnect
 * from it and then try to register again.
 */
@Test
public void testHeartbeatTimeoutWithResourceManager() throws Exception {
	final String rmAddress = "rm";
	final ResourceID rmResourceId = new ResourceID(rmAddress);
	final long heartbeatInterval = 1L;
	final long heartbeatTimeout = 3L;
	final ResourceManagerId rmLeaderId = ResourceManagerId.generate();
	TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
		rmLeaderId,
		rmResourceId,
		rmAddress,
		rmAddress);
	final TaskExecutorRegistrationSuccess registrationResponse = new TaskExecutorRegistrationSuccess(
		new InstanceID(),
		rmResourceId,
		new ClusterInformation("localhost", 1234));
	final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
	// the initial registration plus at least one re-registration after the heartbeat timeout
	final CountDownLatch registrationAttempts = new CountDownLatch(2);
	rmGateway.setRegisterTaskExecutorFunction(
		registration -> {
			taskExecutorRegistrationFuture.complete(registration.f1);
			registrationAttempts.countDown();
			return CompletableFuture.completedFuture(registrationResponse);
		});
	final CompletableFuture<ResourceID> taskExecutorDisconnectFuture = new CompletableFuture<>();
	rmGateway.setDisconnectTaskExecutorConsumer(
		disconnectInfo -> taskExecutorDisconnectFuture.complete(disconnectInfo.f0));
	rpc.registerGateway(rmAddress, rmGateway);
	final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
	final SlotReport slotReport = new SlotReport();
	when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
	HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(localStateStoresManager)
		.build();
	final TaskExecutor taskManager = createTaskExecutor(taskManagerServices, heartbeatServices);
	try {
		taskManager.start();
		// define a leader and see that a registration happens
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId.toUUID());
		// register resource manager success will trigger monitoring heartbeat target between tm and rm
		assertThat(taskExecutorRegistrationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
		// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
		assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
		// timeout.toMilliseconds() is a millisecond count, so it has to be paired with
		// MILLISECONDS (pairing it with SECONDS waited 1000x longer than intended)
		assertTrue(
			"The TaskExecutor should try to reconnect to the RM",
			registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
示例29
/**
 * Verifies that the heartbeat response to the ResourceManager carries the next slot report
 * produced by the slot table rather than the one sent at registration time.
 */
@Test
public void testHeartbeatSlotReporting() throws Exception {
	final String rmAddress = "rm";
	final UUID rmLeaderId = UUID.randomUUID();

	// register the testing resource manager gateway
	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
	final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
	final ResourceID rmResourceId = rmGateway.getOwnResourceId();
	final CompletableFuture<RegistrationResponse> registrationResponse = CompletableFuture.completedFuture(
		new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, new ClusterInformation("localhost", 1234)));
	rmGateway.setRegisterTaskExecutorFunction(registration -> {
		taskExecutorRegistrationFuture.complete(registration.f1);
		return registrationResponse;
	});

	final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
	rmGateway.setSendSlotReportFunction(slotReportMessage -> {
		initialSlotReportFuture.complete(slotReportMessage.f2);
		return CompletableFuture.completedFuture(Acknowledge.get());
	});

	final CompletableFuture<SlotReport> heartbeatSlotReportFuture = new CompletableFuture<>();
	rmGateway.setTaskExecutorHeartbeatConsumer(
		(ignoredResourceId, heartbeatSlotReport) -> heartbeatSlotReportFuture.complete(heartbeatSlotReport));
	rpc.registerGateway(rmAddress, rmGateway);

	// the slot table hands out the initial report first, then the second one
	final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);
	final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
	final SlotReport initialReport = new SlotReport(new SlotStatus(slotId, resourceProfile));
	final SlotReport updatedReport = new SlotReport(
		new SlotStatus(slotId, resourceProfile, new JobID(), new AllocationID()));
	final TestingTaskSlotTable slotTable =
		new TestingTaskSlotTable(new ArrayDeque<>(Arrays.asList(initialReport, updatedReport)));

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();

		// announce a leader and wait for the registration and the initial slot report
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId);
		assertThat(taskExecutorRegistrationFuture.get(), equalTo(taskManagerLocation.getResourceID()));
		assertThat(initialSlotReportFuture.get(), equalTo(initialReport));

		// trigger a heartbeat asynchronously and wait for the response it produces
		final TaskExecutorGateway gateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
		gateway.heartbeatFromResourceManager(rmResourceId);
		final SlotReport reportedInHeartbeat = heartbeatSlotReportFuture.get();

		// the heartbeat must carry the second report
		assertEquals(updatedReport, reportedInHeartbeat);
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
示例30
/**
 * Verifies that the TaskExecutor registers at each new ResourceManager leader it is told about,
 * including after the previous leader was revoked.
 */
@Test
public void testTriggerRegistrationOnLeaderChange() throws Exception {
	final String firstAddress = "/resource/manager/address/one";
	final String secondAddress = "/resource/manager/address/two";
	final UUID firstLeaderId = UUID.randomUUID();
	final UUID secondLeaderId = UUID.randomUUID();
	final ResourceID firstRmResourceId = new ResourceID(firstAddress);
	final ResourceID secondRmResourceId = new ResourceID(secondAddress);

	// register the mock resource manager gateways; each one accepts any registration
	final ResourceManagerGateway firstRmGateway = mock(ResourceManagerGateway.class);
	final ResourceManagerGateway secondRmGateway = mock(ResourceManagerGateway.class);
	when(firstRmGateway.registerTaskExecutor(
			anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(
			new TaskExecutorRegistrationSuccess(new InstanceID(), firstRmResourceId, new ClusterInformation("localhost", 1234))));
	when(secondRmGateway.registerTaskExecutor(
			anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(
			new TaskExecutorRegistrationSuccess(new InstanceID(), secondRmResourceId, new ClusterInformation("localhost", 1234))));
	rpc.registerGateway(firstAddress, firstRmGateway);
	rpc.registerGateway(secondAddress, secondRmGateway);

	final TaskSlotTable slotTable = mock(TaskSlotTable.class);
	final SlotReport emptyReport = new SlotReport();
	when(slotTable.createSlotReport(any(ResourceID.class))).thenReturn(emptyReport);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();
		final String taskExecutorAddress = taskExecutor.getAddress();

		// without a leader there is no connection yet
		assertNull(taskExecutor.getResourceManagerConnection());

		// the first leader must receive a registration
		resourceManagerLeaderRetriever.notifyListener(firstAddress, firstLeaderId);
		verify(firstRmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
			eq(taskExecutorAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskExecutor.getResourceManagerConnection());

		// revoke the leadership, then announce a second leader which must also receive a registration
		resourceManagerLeaderRetriever.notifyListener(null, null);
		resourceManagerLeaderRetriever.notifyListener(secondAddress, secondLeaderId);
		verify(secondRmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
			eq(taskExecutorAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskExecutor.getResourceManagerConnection());
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}