Java源码示例:org.apache.hadoop.util.StopWatch
示例1
private void doPerfTest(int editsSize, int numEdits) throws Exception {
byte[] data = new byte[editsSize];
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
StopWatch sw = new StopWatch().start();
for (int i = 1; i < numEdits; i++) {
ch.sendEdits(1L, i, 1, data).get();
}
long time = sw.now(TimeUnit.MILLISECONDS);
System.err.println("Wrote " + numEdits + " batches of " + editsSize +
" bytes in " + time + "ms");
float avgRtt = (float)time/(float)numEdits;
long throughput = ((long)numEdits * editsSize * 1000L)/time;
System.err.println("Time per batch: " + avgRtt + "ms");
System.err.println("Throughput: " + throughput + " bytes/sec");
}
示例2
private void doPerfTest(int editsSize, int numEdits) throws Exception {
byte[] data = new byte[editsSize];
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
StopWatch sw = new StopWatch().start();
for (int i = 1; i < numEdits; i++) {
ch.sendEdits(1L, i, 1, data).get();
}
long time = sw.now(TimeUnit.MILLISECONDS);
System.err.println("Wrote " + numEdits + " batches of " + editsSize +
" bytes in " + time + "ms");
float avgRtt = (float)time/(float)numEdits;
long throughput = ((long)numEdits * editsSize * 1000L)/time;
System.err.println("Time per batch: " + avgRtt + "ms");
System.err.println("Throughput: " + throughput + " bytes/sec");
}
示例3
private void doAWrite() throws IOException {
StopWatch sw = new StopWatch().start();
stm.write(toWrite);
stm.hflush();
long micros = sw.now(TimeUnit.MICROSECONDS);
quantiles.insert(micros);
}
示例4
public int run(String args[]) throws Exception {
if (args.length != 1) {
System.err.println(
"usage: " + TestMultiThreadedHflush.class.getSimpleName() +
" <path to test file> ");
System.err.println(
"Configurations settable by -D options:\n" +
" num.threads [default 10] - how many threads to run\n" +
" write.size [default 511] - bytes per write\n" +
" num.writes [default 50000] - how many writes to perform");
System.exit(1);
}
TestMultiThreadedHflush test = new TestMultiThreadedHflush();
Configuration conf = getConf();
Path p = new Path(args[0]);
int numThreads = conf.getInt("num.threads", 10);
int writeSize = conf.getInt("write.size", 511);
int numWrites = conf.getInt("num.writes", 50000);
int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
StopWatch sw = new StopWatch().start();
test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
replication);
sw.stop();
System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
System.out.println("Latency quantiles (in microseconds):\n" +
test.quantiles);
return 0;
}
示例5
private void doAWrite() throws IOException {
StopWatch sw = new StopWatch().start();
stm.write(toWrite);
stm.hflush();
long micros = sw.now(TimeUnit.MICROSECONDS);
quantiles.insert(micros);
}
示例6
public int run(String args[]) throws Exception {
if (args.length != 1) {
System.err.println(
"usage: " + TestMultiThreadedHflush.class.getSimpleName() +
" <path to test file> ");
System.err.println(
"Configurations settable by -D options:\n" +
" num.threads [default 10] - how many threads to run\n" +
" write.size [default 511] - bytes per write\n" +
" num.writes [default 50000] - how many writes to perform");
System.exit(1);
}
TestMultiThreadedHflush test = new TestMultiThreadedHflush();
Configuration conf = getConf();
Path p = new Path(args[0]);
int numThreads = conf.getInt("num.threads", 10);
int writeSize = conf.getInt("write.size", 511);
int numWrites = conf.getInt("num.writes", 50000);
int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
StopWatch sw = new StopWatch().start();
test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
replication);
sw.stop();
System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
System.out.println("Latency quantiles (in microseconds):\n" +
test.quantiles);
return 0;
}
示例7
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: "
+ sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
示例8
/**
* Write a batch of edits to the journal.
* {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
*/
synchronized void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkFormatted();
checkWriteRequest(reqInfo);
checkSync(curSegment != null,
"Can't write, no segment open");
if (curSegmentTxId != segmentTxId) {
// Sanity check: it is possible that the writer will fail IPCs
// on both the finalize() and then the start() of the next segment.
// This could cause us to continue writing to an old segment
// instead of rolling to a new one, which breaks one of the
// invariants in the design. If it happens, abort the segment
// and throw an exception.
JournalOutOfSyncException e = new JournalOutOfSyncException(
"Writer out of sync: it thinks it is writing segment " + segmentTxId
+ " but current segment is " + curSegmentTxId);
abortCurSegment();
throw e;
}
checkSync(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
long lastTxnId = firstTxnId + numTxns - 1;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
}
// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
// "catching up" with the rest. Hence we do not need to fsync.
boolean isLagging = lastTxnId <= committedTxnId.get();
boolean shouldFsync = !isLagging;
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
StopWatch sw = new StopWatch();
sw.start();
curSegment.flush(shouldFsync);
sw.stop();
long nanoSeconds = sw.now();
metrics.addSync(
TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
long milliSeconds = TimeUnit.MILLISECONDS.convert(
nanoSeconds, TimeUnit.NANOSECONDS);
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
" took " + milliSeconds + "ms");
}
if (isLagging) {
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);
}
metrics.batchesWritten.incr(1);
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
highestWrittenTxId = lastTxnId;
nextTxId = lastTxnId + 1;
}
示例9
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: "
+ sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
示例10
/**
* Write a batch of edits to the journal.
* {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
*/
synchronized void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkFormatted();
checkWriteRequest(reqInfo);
checkSync(curSegment != null,
"Can't write, no segment open");
if (curSegmentTxId != segmentTxId) {
// Sanity check: it is possible that the writer will fail IPCs
// on both the finalize() and then the start() of the next segment.
// This could cause us to continue writing to an old segment
// instead of rolling to a new one, which breaks one of the
// invariants in the design. If it happens, abort the segment
// and throw an exception.
JournalOutOfSyncException e = new JournalOutOfSyncException(
"Writer out of sync: it thinks it is writing segment " + segmentTxId
+ " but current segment is " + curSegmentTxId);
abortCurSegment();
throw e;
}
checkSync(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
long lastTxnId = firstTxnId + numTxns - 1;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
}
// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
// "catching up" with the rest. Hence we do not need to fsync.
boolean isLagging = lastTxnId <= committedTxnId.get();
boolean shouldFsync = !isLagging;
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
StopWatch sw = new StopWatch();
sw.start();
curSegment.flush(shouldFsync);
sw.stop();
long nanoSeconds = sw.now();
metrics.addSync(
TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
long milliSeconds = TimeUnit.MILLISECONDS.convert(
nanoSeconds, TimeUnit.NANOSECONDS);
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
" took " + milliSeconds + "ms");
}
if (isLagging) {
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);
}
metrics.batchesWritten.incr(1);
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
highestWrittenTxId = lastTxnId;
nextTxId = lastTxnId + 1;
}