Java源码示例:org.apache.helix.task.TaskResult
示例1
@Override
public TaskResult run() {
try {
logger.info(String
.format("Waiting for a single task process to finish. job name: %s. job id: %s",
this.jobName, this.jobId));
int exitCode = this.taskProcess.waitFor();
if (exitCode == 0) {
logger.info("Task process finished. job name: {}. job id: {}", this.jobName, this.jobId);
return new TaskResult(TaskResult.Status.COMPLETED, "");
} else {
logger.warn("Task process failed with exitcode ({}). job name: {}. job id: {}", exitCode,
this.jobName, this.jobId);
return new TaskResult(TaskResult.Status.FATAL_FAILED, "Exit code: " + exitCode);
}
} catch (final Throwable t) {
logger.error("SingleHelixTask failed due to " + t.getMessage(), t);
return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t));
}
}
示例2
@Override
public TaskResult run() {
if (_quotaType != null) {
synchronized (_quotaTypeExecutionCount) {
_quotaTypeExecutionCount.put(_quotaType, _quotaTypeExecutionCount.get(_quotaType) + 1);
}
}
// Only take long if finishTask is false
while (!_finishTask) {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return new TaskResult(TaskResult.Status.COMPLETED,
generateInfoMessageForDebugging(_instanceName, _quotaType));
}
示例3
@Override
public TaskResult run() {
this.taskMetrics.helixTaskTotalRunning.incrementAndGet();
long startTime = System.currentTimeMillis();
log.info("Actual task {} started. [{} {}]", this.taskId, this.applicationName, this.instanceName);
try (Closer closer = Closer.create()) {
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName));
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey));
this.task.run();
log.info("Actual task {} completed.", this.taskId);
this.taskMetrics.helixTaskTotalCompleted.incrementAndGet();
return new TaskResult(TaskResult.Status.COMPLETED, "");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Actual task {} interrupted.", this.taskId);
this.taskMetrics.helixTaskTotalFailed.incrementAndGet();
return new TaskResult(TaskResult.Status.CANCELED, "");
} catch (TaskCreationException te) {
eventBus.post(createTaskCreationEvent("Task Execution"));
log.error("Actual task {} failed in creation due to {}, will request new container to schedule it",
this.taskId, te.getMessage());
this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(te));
} catch (Throwable t) {
log.error("Actual task {} failed due to {}", this.taskId, t.getMessage());
this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t));
} finally {
this.taskMetrics.helixTaskTotalRunning.decrementAndGet();
this.taskMetrics.updateTimeForTaskExecution(startTime);
}
}
示例4
@Test(dependsOnMethods = "testPrepareTask")
public void testRun() throws IOException {
TaskResult taskResult = this.gobblinHelixTask.run();
System.out.println(taskResult.getInfo());
Assert.assertEquals(taskResult.getStatus(), TaskResult.Status.COMPLETED);
File outputAvroFile = new File(this.taskOutputDir.toString(),
TestHelper.REL_WRITER_FILE_PATH + File.separator + TestHelper.WRITER_FILE_NAME);
Assert.assertTrue(outputAvroFile.exists());
Schema schema = new Schema.Parser().parse(TestHelper.SOURCE_SCHEMA);
TestHelper.assertGenericRecords(outputAvroFile, schema);
}
示例5
@Test
public void successTaskProcessShouldResultInCompletedStatus()
throws IOException, InterruptedException {
when(this.mockProcess.waitFor()).thenReturn(0);
final TaskResult result = createAndRunTask();
assertThat(result.getStatus()).isEqualTo(TaskResult.Status.COMPLETED);
final Path expectedPath = Paths.get(WORK_UNIT_FILE_PATH);
verify(this.mockLauncher).launch(JOB_ID, expectedPath);
verify(this.mockProcess).waitFor();
}
示例6
@Test
public void failedTaskProcessShouldResultInFailedStatus()
throws IOException, InterruptedException {
when(this.mockProcess.waitFor()).thenReturn(1);
final TaskResult result = createAndRunTask();
assertThat(result.getStatus()).isEqualTo(TaskResult.Status.FATAL_FAILED);
}
示例7
@Test
public void NonInterruptedExceptionShouldResultInFailedStatus()
throws IOException, InterruptedException {
when(this.mockProcess.waitFor()).thenThrow(new RuntimeException());
final TaskResult result = createAndRunTask();
assertThat(result.getStatus()).isEqualTo(TaskResult.Status.FAILED);
}
示例8
private TaskResult createAndRunTask()
throws IOException {
when(this.mockLauncher.launch(any(), any())).thenReturn(this.mockProcess);
final ImmutableMap<String, String> configMap = ImmutableMap
.of("job.name", "testJob", "job.id", JOB_ID, "gobblin.cluster.work.unit.file.path",
WORK_UNIT_FILE_PATH);
this.task = new SingleHelixTask(this.mockLauncher, configMap);
return this.task.run();
}
示例9
@Override
public TaskResult run() {
_invokedClasses.add(getClass().getName());
_runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
if (failTask) {
return new TaskResult(TaskResult.Status.FAILED, "");
}
return new TaskResult(TaskResult.Status.COMPLETED, "");
}
示例10
/**
* Number of instance is equal to number of failure threshold, thus job failure mechanism needs to
* consider given up
* tasks that no longer exist on the instance, not only given up tasks currently reported on
* CurrentState.
*/
@Test
public void testHighThreshold() throws InterruptedException {
final String WORKFLOW_NAME = "testWorkflow";
final String JOB_NAME = "testJob";
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setWorkflow(WORKFLOW_NAME).setTargetResource(DB_NAME)
.setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
.setCommand(MockTask.TASK_COMMAND)
.setJobCommandConfigMap(
ImmutableMap.of(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name()))
.setFailureThreshold(1);
Workflow.Builder workflowBuilder =
new Workflow.Builder(WORKFLOW_NAME).addJob(JOB_NAME, jobBuilder);
_driver.start(workflowBuilder.build());
_driver.pollForJobState(WORKFLOW_NAME, TaskUtil.getNamespacedJobName(WORKFLOW_NAME, JOB_NAME),
TaskState.FAILED);
_driver.pollForWorkflowState(WORKFLOW_NAME, TaskState.FAILED);
JobContext jobContext =
_driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW_NAME, JOB_NAME));
int countAborted = 0;
int countNoState = 0;
for (int pId : jobContext.getPartitionSet()) {
TaskPartitionState state = jobContext.getPartitionState(pId);
if (state == TaskPartitionState.TASK_ABORTED) {
countAborted++;
} else if (state == null) {
countNoState++;
} else {
throw new HelixException(String.format("State %s is not expected.", state));
}
}
Assert.assertEquals(countAborted, 2); // Failure threshold is 1, so 2 tasks aborted.
Assert.assertEquals(countNoState, 3); // Other 3 tasks are not scheduled at all.
}
示例11
@Override
public synchronized TaskResult run() {
_invokedClasses.add(getClass().getName());
_runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
// Fail the task if it should fail
if (_shouldFail) {
return new TaskResult(Status.ERROR, null);
}
return super.run();
}
示例12
@Override
public synchronized TaskResult run() {
if (!hasFailed) {
hasFailed = true;
return new TaskResult(Status.ERROR, null);
}
return new TaskResult(Status.COMPLETED, null);
}
示例13
@Override
public TaskResult run() {
if (_quotaType != null) {
synchronized (_quotaTypeExecutionCount) {
_quotaTypeExecutionCount.put(_quotaType, _quotaTypeExecutionCount.get(_quotaType) + 1);
}
}
return new TaskResult(TaskResult.Status.COMPLETED,
generateInfoMessageForDebugging(_instanceName, _quotaType));
}
示例14
@Override
public TaskResult run() {
if (_quotaType != null) {
synchronized (_quotaTypeExecutionCount) {
_quotaTypeExecutionCount.put(_quotaType, _quotaTypeExecutionCount.get(_quotaType) + 1);
}
}
return new TaskResult(TaskResult.Status.FAILED,
generateInfoMessageForDebugging(_instanceName, _quotaType));
}
示例15
public MockTask(TaskCallbackContext context) {
Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap();
if (cfg == null) {
cfg = new HashMap<>();
}
TaskConfig taskConfig = context.getTaskConfig();
Map<String, String> taskConfigMap = taskConfig.getConfigMap();
if (taskConfigMap != null) {
cfg.putAll(taskConfigMap);
}
_delay = cfg.containsKey(JOB_DELAY) ? Long.parseLong(cfg.get(JOB_DELAY)) : 100L;
_notAllowToCancel = cfg.containsKey(NOT_ALLOW_TO_CANCEL)
? Boolean.parseBoolean(cfg.get(NOT_ALLOW_TO_CANCEL))
: false;
_taskResultStatus = cfg.containsKey(TASK_RESULT_STATUS) ?
TaskResult.Status.valueOf(cfg.get(TASK_RESULT_STATUS)) :
TaskResult.Status.COMPLETED;
_throwException = cfg.containsKey(THROW_EXCEPTION) ?
Boolean.valueOf(cfg.containsKey(THROW_EXCEPTION)) :
false;
_numOfFailBeforeSuccess =
cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt(
cfg.get(FAILURE_COUNT_BEFORE_SUCCESS)) : 0;
_numOfSuccessBeforeFail = cfg.containsKey(SUCCESS_COUNT_BEFORE_FAIL) ? Integer
.parseInt(cfg.get(SUCCESS_COUNT_BEFORE_FAIL)) : Integer.MAX_VALUE;
_errorMsg = cfg.containsKey(ERROR_MESSAGE) ? cfg.get(ERROR_MESSAGE) : null;
setTargetPartitionsConfigs(cfg, taskConfig.getTargetPartition());
}
示例16
private void setTargetPartitionsConfigs(Map<String, String> cfg, String targetPartition) {
if (cfg.containsKey(TARGET_PARTITION_CONFIG)) {
Map<String, Map<String, String>> targetPartitionConfigs =
deserializeTargetPartitionConfig(cfg.get(TARGET_PARTITION_CONFIG));
if (targetPartitionConfigs.containsKey(targetPartition)) {
Map<String, String> targetPartitionConfig = targetPartitionConfigs.get(targetPartition);
if (targetPartitionConfig.containsKey(JOB_DELAY)) {
_delay = Long.parseLong(targetPartitionConfig.get(JOB_DELAY));
}
if (targetPartitionConfig.containsKey(TASK_RESULT_STATUS)) {
_taskResultStatus = TaskResult.Status.valueOf(targetPartitionConfig.get(TASK_RESULT_STATUS));
}
}
}
}
示例17
@Override
public TaskResult run() {
long expiry = System.currentTimeMillis() + _delay;
long timeLeft;
while (System.currentTimeMillis() < expiry) {
if (_canceled && !_notAllowToCancel) {
timeLeft = expiry - System.currentTimeMillis();
return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
: timeLeft));
}
if (_signalFail) {
return new TaskResult(TaskResult.Status.FAILED, "Signaled to fail.");
}
sleep(10);
}
timeLeft = expiry - System.currentTimeMillis();
if (_throwException) {
_numOfFailBeforeSuccess--;
if (_errorMsg == null) {
_errorMsg = "Test failed";
}
throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
}
if (getUserContent(SUCCESS_COUNT_BEFORE_FAIL, Scope.WORKFLOW) != null) {
_numOfSuccessBeforeFail =
Integer.parseInt(getUserContent(SUCCESS_COUNT_BEFORE_FAIL, Scope.WORKFLOW));
}
putUserContent(SUCCESS_COUNT_BEFORE_FAIL, "" + --_numOfSuccessBeforeFail, Scope.WORKFLOW);
if (_numOfFailBeforeSuccess > 0 || _numOfSuccessBeforeFail < 0){
_numOfFailBeforeSuccess--;
throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
}
return new TaskResult(_taskResultStatus,
_errorMsg != null ? _errorMsg : String.valueOf(timeLeft < 0 ? 0 : timeLeft));
}
示例18
@Override public TaskResult run() {
putUserContent("ContentTest", "Value1", Scope.JOB);
putUserContent("ContentTest", "Value2", Scope.WORKFLOW);
putUserContent("ContentTest", "Value3", Scope.TASK);
if (!getUserContent("ContentTest", Scope.JOB).equals("Value1") || !getUserContent(
"ContentTest", Scope.WORKFLOW).equals("Value2") || !getUserContent("ContentTest",
Scope.TASK).equals("Value3")) {
return new TaskResult(TaskResult.Status.FAILED, null);
}
return new TaskResult(TaskResult.Status.COMPLETED, null);
}
示例19
@Override public TaskResult run() {
if (!getUserContent("RaceTest", Scope.WORKFLOW).equals("RaceValue")) {
return new TaskResult(TaskResult.Status.FAILED, null);
}
return new TaskResult(TaskResult.Status.COMPLETED, null);
}
示例20
@Override
public TaskResult run() {
try {
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
List<String> instanceNames = manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
Map<String, String> statsWrappersJSON = new HashMap<>();
for (String instanceName : instanceNames) {
PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
HelixProperty record = helixDataAccessor.getProperty(keyBuilder.healthReport(instanceName, healthReportName));
if (record != null && record.getRecord() != null) {
statsWrappersJSON.put(instanceName, record.getRecord().getSimpleField(statsFieldName));
}
}
Pair<String, String> results = clusterAggregator.doWork(statsWrappersJSON, statsReportType);
String resultId = String.format("Aggregated_%s", healthReportName);
ZNRecord znRecord = new ZNRecord(resultId);
znRecord.setSimpleField(RAW_VALID_SIZE_FIELD_NAME, results.getFirst());
znRecord.setSimpleField(VALID_SIZE_FIELD_NAME, results.getSecond());
znRecord.setSimpleField(TIMESTAMP_FIELD_NAME, String.valueOf(SystemTime.getInstance().milliseconds()));
znRecord.setListField(ERROR_OCCURRED_INSTANCES_FIELD_NAME,
clusterAggregator.getExceptionOccurredInstances(statsReportType));
String path = String.format("/%s", resultId);
manager.getHelixPropertyStore().set(path, znRecord, AccessOption.PERSISTENT);
return new TaskResult(TaskResult.Status.COMPLETED, "Aggregation success");
} catch (Exception e) {
logger.error("Exception thrown while aggregating stats from health reports across all nodes ", e);
return new TaskResult(TaskResult.Status.FAILED, "Exception thrown");
}
}
示例21
@Test
public void testTaskNotStarted() throws InterruptedException {
setupUnbalancedDB();
final String BLOCK_WORKFLOW_NAME = "blockWorkflow";
final String FAIL_WORKFLOW_NAME = "failWorkflow";
final String FAIL_JOB_NAME = "failJob";
ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
final int numTask =
configAccessor.getClusterConfig(CLUSTER_NAME).getMaxConcurrentTaskPerInstance();
// Tasks targeting the unbalanced DB, the instance is setup to stuck on INIT->RUNNING, so it
// takes all threads
// on that instance.
JobConfig.Builder blockJobBuilder = new JobConfig.Builder().setWorkflow(BLOCK_WORKFLOW_NAME)
.setTargetResource(UNBALANCED_DB_NAME)
.setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
.setCommand(MockTask.TASK_COMMAND).setNumConcurrentTasksPerInstance(numTask);
Workflow.Builder blockWorkflowBuilder =
new Workflow.Builder(BLOCK_WORKFLOW_NAME).addJob("blockJob", blockJobBuilder);
_driver.start(blockWorkflowBuilder.build());
Assert.assertTrue(TaskTestUtil.pollForAllTasksBlock(_manager.getHelixDataAccessor(),
_blockedParticipant.getInstanceName(), numTask, 10000));
// Now, all HelixTask threads are stuck at INIT->RUNNING for task state transition(user task
// can't be submitted)
// New tasks assigned to the instance won't start INIT->RUNNING transition at all.
// A to-be-failed job, 2 tasks, 1 stuck and 1 fail, making the job fail.
JobConfig.Builder failJobBuilder =
new JobConfig.Builder().setWorkflow(FAIL_WORKFLOW_NAME).setTargetResource(DB_NAME)
.setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
.setCommand(MockTask.TASK_COMMAND).setJobCommandConfigMap(
ImmutableMap.of(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name()));
Workflow.Builder failWorkflowBuilder =
new Workflow.Builder(FAIL_WORKFLOW_NAME).addJob(FAIL_JOB_NAME, failJobBuilder);
_driver.start(failWorkflowBuilder.build());
_driver.pollForJobState(FAIL_WORKFLOW_NAME,
TaskUtil.getNamespacedJobName(FAIL_WORKFLOW_NAME, FAIL_JOB_NAME), TaskState.FAILED);
_driver.pollForWorkflowState(FAIL_WORKFLOW_NAME, TaskState.FAILED);
JobContext jobContext =
_driver.getJobContext(TaskUtil.getNamespacedJobName(FAIL_WORKFLOW_NAME, FAIL_JOB_NAME));
for (int pId : jobContext.getPartitionSet()) {
String assignedParticipant = jobContext.getAssignedParticipant(pId);
if (assignedParticipant == null) {
continue; // May not have been assigned at all due to quota limitations
}
if (jobContext.getAssignedParticipant(pId).equals(_blockedParticipant.getInstanceName())) {
Assert.assertEquals(jobContext.getPartitionState(pId), TaskPartitionState.TASK_ABORTED);
} else if (assignedParticipant.equals(_normalParticipant.getInstanceName())) {
Assert.assertEquals(jobContext.getPartitionState(pId), TaskPartitionState.TASK_ERROR);
} else {
throw new HelixException("There should be only 2 instances, 1 blocked, 1 normal.");
}
}
}
示例22
@Test
public void test() throws Exception {
int taskRetryCount = 1;
int num_tasks = 5;
String jobResource = TestHelper.getTestMethodName();
JobConfig.Builder jobBuilder = new JobConfig.Builder();
jobBuilder.setCommand(MockTask.TASK_COMMAND).setTimeoutPerTask(10000)
.setMaxAttemptsPerTask(taskRetryCount).setFailureThreshold(Integer.MAX_VALUE);
// create each task configs.
final int abortedTask = 1;
final int failedTask = 2;
final int exceptionTask = 3;
final String abortedMsg = "This task aborted, some terrible things must happened.";
final String failedMsg = "This task failed, something may be wrong.";
final String exceptionMsg = "This task throws exception.";
final String successMsg = "Yes, we did it!";
List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
for (int j = 0; j < num_tasks; j++) {
TaskConfig.Builder configBuilder = new TaskConfig.Builder().setTaskId("task_" + j);
switch (j) {
case abortedTask:
configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name())
.addConfig(MockTask.ERROR_MESSAGE, abortedMsg);
break;
case failedTask:
configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name())
.addConfig(MockTask.ERROR_MESSAGE, failedMsg);
break;
case exceptionTask:
configBuilder.addConfig(MockTask.THROW_EXCEPTION, Boolean.TRUE.toString())
.addConfig(MockTask.ERROR_MESSAGE, exceptionMsg);
break;
default:
configBuilder.addConfig(MockTask.ERROR_MESSAGE, successMsg);
break;
}
configBuilder.setTargetPartition(String.valueOf(j));
taskConfigs.add(configBuilder.build());
}
jobBuilder.addTaskConfigs(taskConfigs);
Workflow flow =
WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// Wait until the job completes.
_driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
for (int i = 0; i < num_tasks; i++) {
TaskPartitionState state = ctx.getPartitionState(i);
String taskId = ctx.getTaskIdForPartition(i);
String errMsg = ctx.getPartitionInfo(i);
if (taskId.equals("task_" + abortedTask)) {
Assert.assertEquals(state, TaskPartitionState.TASK_ABORTED);
Assert.assertEquals(errMsg, abortedMsg);
} else if (taskId.equals("task_" + failedTask)) {
Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
Assert.assertEquals(errMsg, failedMsg);
} else if (taskId.equals("task_" + exceptionTask)) {
Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
Assert.assertTrue(errMsg.contains(exceptionMsg));
} else {
Assert.assertEquals(state, TaskPartitionState.COMPLETED);
Assert.assertEquals(errMsg, successMsg);
}
}
}
示例23
@BeforeClass
public void beforeClass() throws Exception {
_participants = new MockParticipantManager[_numNodes];
_gSetupTool.addCluster(CLUSTER_NAME, true);
for (int i = 0; i < _numNodes; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
}
// start dummy participants
for (int i = 0; i < _numNodes; i++) {
final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
// Set task callbacks
Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
taskFactoryReg.put("TaskOne", context -> new TaskOne(context, instanceName));
taskFactoryReg.put("TaskTwo", context -> new TaskTwo(context, instanceName));
taskFactoryReg.put("ControllableFailTask", context -> new Task() {
@Override
public TaskResult run() {
if (_failureCtl.get()) {
return new TaskResult(Status.FAILED, null);
} else {
return new TaskResult(Status.COMPLETED, null);
}
}
@Override
public void cancel() {
}
});
taskFactoryReg.put("SingleFailTask", context -> new SingleFailTask());
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
// Register a Task state model factory.
StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
stateMachine.registerStateModelFactory("Task",
new TaskStateModelFactory(_participants[i], taskFactoryReg));
_participants[i].syncStart();
}
// Start controller
String controllerName = CONTROLLER_PREFIX + "_0";
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
// Start an admin connection
_manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
InstanceType.ADMINISTRATOR, ZK_ADDR);
_manager.connect();
_driver = new TaskDriver(_manager);
}
示例24
@Test
public void test() throws Exception {
int taskRetryCount = 5;
int num_tasks = 5;
String jobResource = TestHelper.getTestMethodName();
JobConfig.Builder jobBuilder = new JobConfig.Builder();
jobBuilder.setCommand(MockTask.TASK_COMMAND).setTimeoutPerTask(10000)
.setMaxAttemptsPerTask(taskRetryCount).setFailureThreshold(Integer.MAX_VALUE);
// create each task configs.
final int abortedTask = 1;
final int failedTask = 2;
final int exceptionTask = 3;
List<TaskConfig> taskConfigs = new ArrayList<>();
for (int j = 0; j < num_tasks; j++) {
TaskConfig.Builder configBuilder = new TaskConfig.Builder().setTaskId("task_" + j);
switch (j) {
case abortedTask:
configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name());
break;
case failedTask:
configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name());
break;
case exceptionTask:
configBuilder.addConfig(MockTask.THROW_EXCEPTION, Boolean.TRUE.toString());
break;
default:
break;
}
configBuilder.setTargetPartition(String.valueOf(j));
taskConfigs.add(configBuilder.build());
}
jobBuilder.addTaskConfigs(taskConfigs);
Workflow flow =
WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// Wait until the job completes.
_driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
for (int i = 0; i < num_tasks; i++) {
TaskPartitionState state = ctx.getPartitionState(i);
int retriedCount = ctx.getPartitionNumAttempts(i);
String taskId = ctx.getTaskIdForPartition(i);
if (taskId.equals("task_" + abortedTask)) {
Assert.assertEquals(state, TaskPartitionState.TASK_ABORTED);
Assert.assertEquals(retriedCount, 1);
} else if (taskId.equals("task_" + failedTask) || taskId.equals("task_" + exceptionTask)) {
Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
Assert.assertEquals(taskRetryCount, retriedCount);
} else {
Assert.assertEquals(state, TaskPartitionState.COMPLETED);
Assert.assertEquals(retriedCount, 1);
}
}
}
示例25
@Override public TaskResult run() {
putUserContent("RaceTest", "RaceValue", Scope.WORKFLOW);
return new TaskResult(TaskResult.Status.COMPLETED, null);
}