Java Source Code Examples: org.apache.hadoop.yarn.util.RackResolver
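org.apache.hadoop.yarn.util.RackResolver maps a host name to its network location (rack path) using the topology plugin configured under net.topology.node.switch.mapping.impl. The pattern that recurs throughout the examples below is the same: call RackResolver.init(Configuration) once during service setup, then call RackResolver.resolve(host) and read the rack from the returned Node's getNetworkLocation(). Before the examples, here is a minimal sketch of that pattern; the class name and host are placeholders, and without a topology mapping configured every host is expected to resolve to the default rack:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.util.RackResolver;

public class RackResolverQuickStart {
  public static void main(String[] args) {
    // init() must run before resolve(); it reads the topology mapping
    // class/script from the Configuration.
    Configuration conf = new Configuration();
    RackResolver.init(conf);

    // Placeholder host name; resolve() returns a Node whose network
    // location is the rack path, e.g. "/default-rack" when no mapping
    // is configured.
    Node node = RackResolver.resolve("worker01.example.com");
    System.out.println(node.getNetworkLocation());
  }
}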
Example 1
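Resolving each node in a list to its rack; nodes whose rack cannot be determined are logged and skipped.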
private Set<String> resolveRacks(List<String> nodes) {
  Set<String> racks = new HashSet<String>();
  if (nodes != null) {
    for (String node : nodes) {
      // Ensure node requests are accompanied by requests for
      // corresponding rack
      String rack = RackResolver.resolve(node).getNetworkLocation();
      if (rack == null) {
        LOG.warn("Failed to resolve rack for node " + node + ".");
      } else {
        racks.add(rack);
      }
    }
  }
  return racks;
}
Example 2
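Computing a container's locality (NODE_LOCAL, RACK_LOCAL, or OFF_SWITCH) by comparing its host and rack against the task's data-local hosts and racks.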
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();
  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
Example 3
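A task attempt constructor (apparently from Apache Tez, given TezTaskID and TezBuilderUtils) that calls RackResolver.init(conf) as part of construction.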
@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
    TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
    boolean isRescheduled,
    Resource resource, ContainerContext containerContext, boolean leafVertex) {
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  this.readLock = rwLock.readLock();
  this.writeLock = rwLock.writeLock();
  this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
  this.eventHandler = eventHandler;
  //Reported status
  this.conf = conf;
  this.clock = clock;
  this.taskHeartbeatHandler = taskHeartbeatHandler;
  this.appContext = appContext;
  this.reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);
  RackResolver.init(conf);
  this.stateMachine = stateMachineFactory.make(this);
  this.isRescheduled = isRescheduled;
  this.taskResource = resource;
  this.containerContext = containerContext;
  this.leafVertex = leafVertex;
}
Example 4
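A held-container wrapper that resolves and caches the container node's rack at construction time.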
HeldContainer(Container container,
    long nextScheduleTime,
    long containerExpiryTime,
    CookieContainerRequest firstTaskInfo,
    ContainerSignatureMatcher signatureMatcher) {
  this.container = container;
  this.nextScheduleTime = nextScheduleTime;
  if (firstTaskInfo != null) {
    this.lastTaskInfo = firstTaskInfo;
    this.lastAssignedContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
  }
  this.localityMatchLevel = LocalityMatchLevel.NODE;
  this.containerExpiryTime = containerExpiryTime;
  this.rack = RackResolver.resolve(container.getNodeId().getHost())
      .getNetworkLocation();
  this.signatureMatcher = signatureMatcher;
}
Example 5
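A ResourceManager service initializer that calls RackResolver.init(conf) while reading heartbeat and minimum-allocation settings; the GCORES keys are not in stock YARN and appear to come from a GPU-aware fork.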
@Override
protected void serviceInit(Configuration conf) throws Exception {
  resourceTrackerAddress = conf.getSocketAddr(
      YarnConfiguration.RM_BIND_HOST,
      YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
  RackResolver.init(conf);
  nextHeartBeatInterval =
      conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
  if (nextHeartBeatInterval <= 0) {
    throw new YarnRuntimeException("Invalid Configuration. "
        + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
        + " should be larger than 0.");
  }
  minAllocMb = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
  minAllocVcores = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
  minAllocGcores = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_GCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_GCORES);
  minimumNodeManagerVersion = conf.get(
      YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
      YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
  super.serviceInit(conf);
}
Example 6
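A MapReduce AM allocator's serviceInit (judging by MRJobConfig), which initializes RackResolver among the reduce ramp-up, preemption, and job-limit settings.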
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);
  reduceSlowStart = conf.getFloat(
      MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
      DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);
  maxReduceRampupLimit = conf.getFloat(
      MRJobConfig.MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT,
      MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT);
  maxReducePreemptionLimit = conf.getFloat(
      MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
      MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
  allocationDelayThresholdMs = conf.getInt(
      MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
      MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000; // sec -> ms
  maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
      MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
  maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
      MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
  RackResolver.init(conf);
  retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
      MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
  mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
  reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
  // Init startTime to current time. If all goes well, it will be reset after
  // first attempt to contact RM.
  retrystartTime = System.currentTimeMillis();
}
Example 7
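A variant of Example 5 without the GCORES minimum-allocation setting.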
@Override
protected void serviceInit(Configuration conf) throws Exception {
  resourceTrackerAddress = conf.getSocketAddr(
      YarnConfiguration.RM_BIND_HOST,
      YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
  RackResolver.init(conf);
  nextHeartBeatInterval =
      conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
  if (nextHeartBeatInterval <= 0) {
    throw new YarnRuntimeException("Invalid Configuration. "
        + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
        + " should be larger than 0.");
  }
  minAllocMb = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
  minAllocVcores = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
  minimumNodeManagerVersion = conf.get(
      YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
      YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
  super.serviceInit(conf);
}
Example 8
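A variant of Example 6 without the node-label expressions.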
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);
  //this.reduceScheduler.serviceInit(conf);
  reduceSlowStart = conf.getFloat(
      MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
      DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);
  maxReduceRampupLimit = conf.getFloat(
      MRJobConfig.MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT,
      MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT);
  maxReducePreemptionLimit = conf.getFloat(
      MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
      MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
  allocationDelayThresholdMs = conf.getInt(
      MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
      MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000; // sec -> ms
  maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
      MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
  maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
      MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
  RackResolver.init(conf);
  retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
      MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
  // Init startTime to current time. If all goes well, it will be reset after
  // first attempt to contact RM.
  retrystartTime = System.currentTimeMillis();
}
Example 9
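Registering a leaf task under both its data-local hosts and their racks, resolving each host's rack the first time the host is seen.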
private void addLeafTask(TaskAttemptToSchedulerEvent event) {
  TaskAttempt taskAttempt = event.getTaskAttempt();
  List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
  for (DataLocation location : locations) {
    String host = location.getHost();
    leafTaskHosts.add(host);
    HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
    if (hostVolumeMapping == null) {
      String rack = RackResolver.resolve(host).getNetworkLocation();
      hostVolumeMapping = new HostVolumeMapping(host, rack);
      leafTaskHostMapping.put(host, hostVolumeMapping);
    }
    hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added attempt req to host " + host);
    }
    HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
    if (list == null) {
      list = new HashSet<>();
      leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
    }
    list.add(taskAttempt.getId());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
    }
  }
  leafTasks.add(taskAttempt.getId());
}
Example 10
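Indexing a fragment per host and disk as well as per rack, with RackResolver supplying the rack key.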
private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
  // update the fragment maps per host
  String normalizeHost = NetUtils.normalizeHost(host);
  Map<Integer, FragmentsPerDisk> diskFragmentMap;
  if (fragmentHostMapping.containsKey(normalizeHost)) {
    diskFragmentMap = fragmentHostMapping.get(normalizeHost);
  } else {
    diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>();
    fragmentHostMapping.put(normalizeHost, diskFragmentMap);
  }
  FragmentsPerDisk fragmentsPerDisk;
  if (diskFragmentMap.containsKey(diskId)) {
    fragmentsPerDisk = diskFragmentMap.get(diskId);
  } else {
    fragmentsPerDisk = new FragmentsPerDisk(diskId);
    diskFragmentMap.put(diskId, fragmentsPerDisk);
  }
  fragmentsPerDisk.addFragmentPair(fragmentPair);
  // update the fragment maps per rack
  String rack = RackResolver.resolve(normalizeHost).getNetworkLocation();
  Set<FragmentPair> fragmentPairList;
  if (rackFragmentMapping.containsKey(rack)) {
    fragmentPairList = rackFragmentMapping.get(rack);
  } else {
    fragmentPairList = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
    rackFragmentMapping.put(rack, fragmentPairList);
  }
  fragmentPairList.add(fragmentPair);
}
Example 11
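Removing a fragment from the per-host/per-disk and per-rack indexes of Example 10, pruning entries that become empty.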
@Override
public void removeFragment(FragmentPair fragmentPair) {
  boolean removed = false;
  for (String eachHost : fragmentPair.getLeftFragment().getHosts()) {
    String normalizedHost = NetUtils.normalizeHost(eachHost);
    Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
    for (Entry<Integer, FragmentsPerDisk> entry : diskFragmentMap.entrySet()) {
      FragmentsPerDisk fragmentsPerDisk = entry.getValue();
      removed = fragmentsPerDisk.removeFragmentPair(fragmentPair);
      if (removed) {
        if (fragmentsPerDisk.size() == 0) {
          diskFragmentMap.remove(entry.getKey());
        }
        if (diskFragmentMap.size() == 0) {
          fragmentHostMapping.remove(normalizedHost);
        }
        break;
      }
    }
    String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
    if (rackFragmentMapping.containsKey(rack)) {
      Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
      fragmentPairs.remove(fragmentPair);
      if (fragmentPairs.size() == 0) {
        rackFragmentMapping.remove(rack);
      }
    }
  }
  if (removed) {
    fragmentNum--;
  }
}
Example 12
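Fetching a rack-local fragment for a host by resolving the host's rack.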
/**
 * Randomly select a fragment among the fragments stored on nodes in the same
 * rack as the given host.
 * @param host the host whose rack is used for the lookup
 * @return a randomly selected fragment, or null if the rack has none
 */
@Override
public FragmentPair getRackLocalFragment(String host) {
  String rack = RackResolver.resolve(host).getNetworkLocation();
  if (rackFragmentMapping.containsKey(rack)) {
    Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
    if (!fragmentPairs.isEmpty()) {
      return fragmentPairs.iterator().next();
    }
  }
  return null;
}
Example 13
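Service init (apparently from Apache Tajo, given TajoConf) that initializes RackResolver and installs a shutdown hook.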
@Override
public void init(Configuration conf) {
  this.systemConf = (TajoConf) conf;
  RackResolver.init(systemConf);
  Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
  super.init(conf);
}
Example 14
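An older variant of Example 9, keyed by QueryUnitAttempt rather than TaskAttempt.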
private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
  QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
  List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
  for (DataLocation location : locations) {
    String host = location.getHost();
    HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
    if (hostVolumeMapping == null) {
      String rack = RackResolver.resolve(host).getNetworkLocation();
      hostVolumeMapping = new HostVolumeMapping(host, rack);
      leafTaskHostMapping.put(host, hostVolumeMapping);
    }
    hostVolumeMapping.addQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added attempt req to host " + host);
    }
    LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
    if (list == null) {
      list = new LinkedList<QueryUnitAttemptId>();
      leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
    }
    if (!list.contains(queryUnitAttempt.getId())) {
      list.add(queryUnitAttempt.getId());
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
    }
  }
  leafTasks.add(queryUnitAttempt.getId());
}
Example 15
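A small memoizing wrapper that caches host-to-rack resolutions in a map.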
public String resolve(String host) {
  if (hostRackMap.containsKey(host)) {
    return hostRackMap.get(host);
  } else {
    String rack = RackResolver.resolve(host).getNetworkLocation();
    hostRackMap.put(host, rack);
    return rack;
  }
}
Example 16
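Assigning a newly allocated container: the node's rack is resolved first so a matching request can be found by location.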
@Override
public CookieContainerRequest assignNewContainer(Container container) {
  String location = RackResolver.resolve(container.getNodeId().getHost())
      .getNetworkLocation();
  CookieContainerRequest assigned = getMatchingRequestWithPriority(container,
      location);
  doBookKeepingForAssignedContainer(assigned, container, location, false);
  return assigned;
}
Example 17
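A variant of Example 4 without the ContainerSignatureMatcher parameter.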
HeldContainer(Container container,
    long nextScheduleTime,
    long containerExpiryTime,
    CookieContainerRequest firstTaskInfo) {
  this.container = container;
  this.nextScheduleTime = nextScheduleTime;
  if (firstTaskInfo != null) {
    this.lastTaskInfo = firstTaskInfo;
    this.firstContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
  }
  this.localityMatchLevel = LocalityMatchLevel.NODE;
  this.containerExpiryTime = containerExpiryTime;
  this.rack = RackResolver.resolve(container.getNodeId().getHost())
      .getNetworkLocation();
}
Example 18
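A JUnit @Before method that initializes RackResolver with a fresh Configuration before each test.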
@SuppressWarnings("unchecked")
@Before
public void setup() {
  amrmClient = new TezAMRMClientAsync(new AMRMClientImpl(),
      1000, mock(AMRMClientAsync.CallbackHandler.class));
  RackResolver.init(new Configuration());
}
Example 19
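The minimal serviceInit pattern: initialize RackResolver before the service starts resolving hosts.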
@Override
protected void serviceInit(Configuration conf) throws Exception {
  RackResolver.init(conf);
  super.serviceInit(conf);
}
Example 20
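A MapReduce TaskAttemptImpl constructor that initializes RackResolver and pre-resolves the racks of all data-local hosts; setGpuCores is not in stock MapReduce and appears to come from a GPU-aware fork.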
public TaskAttemptImpl(TaskId taskId, int i,
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  oldJobId = TypeConverter.fromYarn(taskId.getJobId());
  this.conf = conf;
  this.clock = clock;
  attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
  attemptId.setTaskId(taskId);
  attemptId.setId(i);
  this.taskAttemptListener = taskAttemptListener;
  this.appContext = appContext;
  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);
  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();
  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;
  // TODO: create the resource request for this task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setGpuCores(
      getGpuRequired(conf, taskId.getTaskType()));
  this.dataLocalHosts = resolveHosts(dataLocalHosts);
  RackResolver.init(conf);
  this.dataLocalRacks = new HashSet<String>();
  for (String host : this.dataLocalHosts) {
    this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
  }
  locality = Locality.OFF_SWITCH;
  avataar = Avataar.VIRGIN;
  // This "this leak" is okay because the retained pointer is in an
  // instance variable.
  stateMachine = stateMachineFactory.make(this);
}
Example 21
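A job-history test that installs a custom DNSToSwitchMapping (MyResolver) and initializes RackResolver with it.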
@Test(timeout = 50000)
public void testScanningOldDirs() throws Exception {
  LOG.info("STARTING testScanningOldDirs");
  try {
    Configuration conf = new Configuration();
    conf.setClass(
        NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
        MyResolver.class, DNSToSwitchMapping.class);
    RackResolver.init(conf);
    MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
        true);
    app.submit(conf);
    Job job = app.getContext().getAllJobs().values().iterator().next();
    JobId jobId = job.getID();
    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
    app.waitForState(job, JobState.SUCCEEDED);
    // make sure all events are flushed
    app.waitForState(Service.STATE.STOPPED);
    HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
    hfm.init(conf);
    HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
    Assert.assertNotNull("Unable to locate job history", fileInfo);
    // force the manager to "forget" the job
    hfm.deleteJobFromJobListCache(fileInfo);
    final int msecPerSleep = 10;
    int msecToSleep = 10 * 1000;
    while (fileInfo.isMovePending() && msecToSleep > 0) {
      Assert.assertTrue(!fileInfo.didMoveFail());
      msecToSleep -= msecPerSleep;
      Thread.sleep(msecPerSleep);
    }
    Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
    fileInfo = hfm.getFileInfo(jobId);
    hfm.stop();
    Assert.assertNotNull("Unable to locate old job history", fileInfo);
    Assert.assertTrue("HistoryFileManager not shutdown properly",
        hfm.moveToDoneExecutor.isTerminated());
  } finally {
    LOG.info("FINISHED testScanningOldDirs");
  }
}
Example 22
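A history-cleanup test using the same custom topology mapping; the file becomes eligible for deletion once the maximum history age is exceeded.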
/**
 * Test cleaning of old history files. Files should be deleted after 1 week by
 * default.
 */
@Test(timeout = 15000)
public void testDeleteFileInfo() throws Exception {
  LOG.info("STARTING testDeleteFileInfo");
  try {
    Configuration conf = new Configuration();
    conf.setClass(
        NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
        MyResolver.class, DNSToSwitchMapping.class);
    RackResolver.init(conf);
    MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
        true);
    app.submit(conf);
    Job job = app.getContext().getAllJobs().values().iterator().next();
    JobId jobId = job.getID();
    app.waitForState(job, JobState.SUCCEEDED);
    // make sure all events are flushed
    app.waitForState(Service.STATE.STOPPED);
    HistoryFileManager hfm = new HistoryFileManager();
    hfm.init(conf);
    HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
    hfm.initExisting();
    // wait for the files to move from the done_intermediate directory to
    // the done directory
    while (fileInfo.isMovePending()) {
      Thread.sleep(300);
    }
    Assert.assertNotNull(hfm.jobListCache.values());
    // try to remove fileInfo
    hfm.clean();
    // check that fileInfo is not deleted
    Assert.assertFalse(fileInfo.isDeleted());
    // set the max history age so the file becomes eligible for deletion
    hfm.setMaxHistoryAge(-1);
    hfm.clean();
    hfm.stop();
    Assert.assertTrue("Thread pool shutdown",
        hfm.moveToDoneExecutor.isTerminated());
    // should be deleted!
    Assert.assertTrue("file should be deleted ", fileInfo.isDeleted());
  } finally {
    LOG.info("FINISHED testDeleteFileInfo");
  }
}
Example 23
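A smoke test of JobHistory methods, again initializing RackResolver with a custom DNSToSwitchMapping.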
/**
 * Simple test of some methods of JobHistory.
 */
@Test(timeout = 20000)
public void testJobHistoryMethods() throws Exception {
  LOG.info("STARTING testJobHistoryMethods");
  try {
    Configuration configuration = new Configuration();
    configuration.setClass(
        NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
        MyResolver.class, DNSToSwitchMapping.class);
    RackResolver.init(configuration);
    MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
        true);
    app.submit(configuration);
    Job job = app.getContext().getAllJobs().values().iterator().next();
    app.waitForState(job, JobState.SUCCEEDED);
    JobHistory jobHistory = new JobHistory();
    jobHistory.init(configuration);
    // Method getAllJobs
    Assert.assertEquals(1, jobHistory.getAllJobs().size());
    // and with ApplicationId
    Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size());
    JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default",
        0L, System.currentTimeMillis() + 1, 0L,
        System.currentTimeMillis() + 1, JobState.SUCCEEDED);
    Assert.assertEquals(1, jobsinfo.getJobs().size());
    Assert.assertNotNull(jobHistory.getApplicationAttemptId());
    // test Application Id
    Assert.assertEquals("application_0_0000", jobHistory.getApplicationID()
        .toString());
    Assert.assertEquals("Job History Server", jobHistory.getApplicationName());
    // method does not work
    Assert.assertNull(jobHistory.getEventHandler());
    // method does not work
    Assert.assertNull(jobHistory.getClock());
    // method does not work
    Assert.assertNull(jobHistory.getClusterInfo());
  } finally {
    LOG.info("FINISHED testJobHistoryMethods");
  }
}
Example 24
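A variant of Example 20 without the GPU resource setting.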
public TaskAttemptImpl(TaskId taskId, int i,
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  oldJobId = TypeConverter.fromYarn(taskId.getJobId());
  this.conf = conf;
  this.clock = clock;
  attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
  attemptId.setTaskId(taskId);
  attemptId.setId(i);
  this.taskAttemptListener = taskAttemptListener;
  this.appContext = appContext;
  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);
  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();
  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;
  // TODO: create the resource request for this task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));
  this.dataLocalHosts = resolveHosts(dataLocalHosts);
  RackResolver.init(conf);
  this.dataLocalRacks = new HashSet<String>();
  for (String host : this.dataLocalHosts) {
    this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
  }
  locality = Locality.OFF_SWITCH;
  avataar = Avataar.VIRGIN;
  // This "this leak" is okay because the retained pointer is in an
  // instance variable.
  stateMachine = stateMachineFactory.make(this);
}