@Override
public Properties submitJob(Properties jobProps) throws IOException {
    // Builds a single EMR step from the job properties and submits it to an
    // existing cluster. Returns the same Properties instance with the new
    // step id recorded under "stepId".
    EMRJobConfig emrJobConfig = new EMRJobConfig(jobProps);
    Utils.checkNotNull(emrJobConfig.getClusterId(), "EMR Cluster Id");
    StepConfig stepConfig = new StepConfig()
        .withName(emrJobConfig.getJobName())
        .withActionOnFailure(ActionOnFailure.CONTINUE) // check if action on failure needs to be configurable
        .withHadoopJarStep(new HadoopJarStepConfig()
            .withJar(emrJobConfig.getDriverJarPath())
            .withMainClass(emrJobConfig.getDriverMainClass()).withArgs(
                emrJobConfig.getArchives(),
                emrJobConfig.getLibjars(),
                emrJobConfig.getUniquePrefix(),
                emrJobConfig.getJavaOpts(),
                emrJobConfig.getLogLevel()
            ));
    // Parameterized logging: let SLF4J stringify lazily instead of calling
    // toString() eagerly (which runs even when debug is disabled).
    LOG.debug("Step config is {}", stepConfig);
    AddJobFlowStepsResult addJobFlowStepsResult = getEmrClient(emrClusterConfig).addJobFlowSteps(
        new AddJobFlowStepsRequest()
            .withJobFlowId(emrJobConfig.getClusterId())
            .withSteps(stepConfig));
    // Defensive check: fail with a descriptive IOException (already declared)
    // rather than an IndexOutOfBoundsException if EMR returned no step ids.
    List<String> stepIds = addJobFlowStepsResult.getStepIds();
    if (stepIds == null || stepIds.isEmpty()) {
        throw new IOException("EMR returned no step ids for cluster " + emrJobConfig.getClusterId());
    }
    jobProps.setProperty("stepId", stepIds.get(0));
    return jobProps;
}
/**
 * Builds a {@link Submitter} that adds the compiled steps to an already-running
 * EMR cluster identified by {@code clusterId}.
 *
 * <p>Submission is wrapped in a polling retry executor: transient AWS failures
 * are retried with a backoff between 30 seconds and 5 minutes, while
 * deterministic AWS errors (per {@code Aws::isDeterministicException}) abort
 * immediately. {@code runOnce} persists the resulting step ids in {@code state}
 * so a re-executed attempt does not submit the steps a second time.
 */
private Submitter existingClusterSubmitter(AmazonElasticMapReduce emr, String tag, StepCompiler stepCompiler, String clusterId, Filer filer)
{
    return () -> {
        List<String> stepIds = pollingRetryExecutor(state, "submission")
                .retryUnless(AmazonServiceException.class, Aws::isDeterministicException)
                .withRetryInterval(DurationInterval.of(Duration.ofSeconds(30), Duration.ofMinutes(5)))
                .runOnce(new TypeReference<List<String>>() {}, s -> {
                    // Order matters: the runner must be prepared and steps
                    // compiled before staging, so every referenced file is
                    // known when stageFiles() uploads to S3.
                    RemoteFile runner = prepareRunner(filer, tag);
                    // Compile steps
                    stepCompiler.compile(runner);
                    // Stage files to S3
                    filer.stageFiles();
                    AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
                            .withJobFlowId(clusterId)
                            .withSteps(stepCompiler.stepConfigs());
                    int steps = request.getSteps().size();
                    logger.info("Submitting {} EMR step(s) to {}", steps, clusterId);
                    AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
                    // Log each submitted step name alongside the id EMR assigned it.
                    logSubmittedSteps(clusterId, steps, i -> request.getSteps().get(i).getName(), i -> result.getStepIds().get(i));
                    // Copy into an immutable list before persisting as state.
                    return ImmutableList.copyOf(result.getStepIds());
                });
        return SubmissionResult.ofExistingCluster(clusterId, stepIds);
    };
}
/**
 * Submits a single Spark step (via command-runner.jar) to an existing EMR
 * cluster and returns the id of the newly created step.
 *
 * @param paramsStr comma-separated arguments passed verbatim to command-runner.jar
 * @param clusterId id of the target (already running) EMR cluster
 * @return the step id assigned by EMR
 * @throws IllegalStateException if the AWS_REGION environment variable is not set
 */
protected String fireEMRJob(String paramsStr, String clusterId) {
    // Fail fast with a clear message instead of an opaque NPE inside
    // Regions.fromName when AWS_REGION is missing.
    String region = System.getenv("AWS_REGION");
    if (region == null) {
        throw new IllegalStateException("AWS_REGION environment variable is not set");
    }
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
    emr.setRegion(Region.getRegion(Regions.fromName(region)));
    String[] params = paramsStr.split(",");
    HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs(params);
    StepConfig sparkStep = new StepConfig()
            .withName("Spark Step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(sparkStepConf);
    // Varargs withSteps replaces the double-brace-initialized ArrayList,
    // which created an anonymous inner class holding a reference to the
    // enclosing instance. The previously built "Enable debugging" step and
    // the Spark Application object were never added to the request, so they
    // have been removed as dead code.
    AddJobFlowStepsRequest request = new AddJobFlowStepsRequest(clusterId)
            .withSteps(sparkStep);
    AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
    return result.getStepIds().get(0);
}