Java源码示例:org.apache.hadoop.util.StopWatch

示例1
private void doPerfTest(int editsSize, int numEdits) throws Exception {
  byte[] data = new byte[editsSize];
  ch.newEpoch(1).get();
  ch.setEpoch(1);
  ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
  
  StopWatch sw = new StopWatch().start();
  for (int i = 1; i < numEdits; i++) {
    ch.sendEdits(1L, i, 1, data).get();
  }
  long time = sw.now(TimeUnit.MILLISECONDS);
  
  System.err.println("Wrote " + numEdits + " batches of " + editsSize +
      " bytes in " + time + "ms");
  float avgRtt = (float)time/(float)numEdits;
  long throughput = ((long)numEdits * editsSize * 1000L)/time;
  System.err.println("Time per batch: " + avgRtt + "ms");
  System.err.println("Throughput: " + throughput + " bytes/sec");
}
 
示例2
private void doPerfTest(int editsSize, int numEdits) throws Exception {
  byte[] data = new byte[editsSize];
  ch.newEpoch(1).get();
  ch.setEpoch(1);
  ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
  
  StopWatch sw = new StopWatch().start();
  for (int i = 1; i < numEdits; i++) {
    ch.sendEdits(1L, i, 1, data).get();
  }
  long time = sw.now(TimeUnit.MILLISECONDS);
  
  System.err.println("Wrote " + numEdits + " batches of " + editsSize +
      " bytes in " + time + "ms");
  float avgRtt = (float)time/(float)numEdits;
  long throughput = ((long)numEdits * editsSize * 1000L)/time;
  System.err.println("Time per batch: " + avgRtt + "ms");
  System.err.println("Throughput: " + throughput + " bytes/sec");
}
 
示例3
private void doAWrite() throws IOException {
  StopWatch sw = new StopWatch().start();
  stm.write(toWrite);
  stm.hflush();
  long micros = sw.now(TimeUnit.MICROSECONDS);
  quantiles.insert(micros);
}
 
示例4
public int run(String args[]) throws Exception {
  if (args.length != 1) {
    System.err.println(
      "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
      " <path to test file> ");
    System.err.println(
        "Configurations settable by -D options:\n" +
        "  num.threads [default 10] - how many threads to run\n" +
        "  write.size [default 511] - bytes per write\n" +
        "  num.writes [default 50000] - how many writes to perform");
    System.exit(1);
  }
  TestMultiThreadedHflush test = new TestMultiThreadedHflush();
  Configuration conf = getConf();
  Path p = new Path(args[0]);
  
  int numThreads = conf.getInt("num.threads", 10);
  int writeSize = conf.getInt("write.size", 511);
  int numWrites = conf.getInt("num.writes", 50000);
  int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);
  
  StopWatch sw = new StopWatch().start();
  test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
      replication);
  sw.stop();
  
  System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
  System.out.println("Latency quantiles (in microseconds):\n" +
      test.quantiles);
  return 0;
}
 
示例5
private void doAWrite() throws IOException {
  StopWatch sw = new StopWatch().start();
  stm.write(toWrite);
  stm.hflush();
  long micros = sw.now(TimeUnit.MICROSECONDS);
  quantiles.insert(micros);
}
 
示例6
public int run(String args[]) throws Exception {
  if (args.length != 1) {
    System.err.println(
      "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
      " <path to test file> ");
    System.err.println(
        "Configurations settable by -D options:\n" +
        "  num.threads [default 10] - how many threads to run\n" +
        "  write.size [default 511] - bytes per write\n" +
        "  num.writes [default 50000] - how many writes to perform");
    System.exit(1);
  }
  TestMultiThreadedHflush test = new TestMultiThreadedHflush();
  Configuration conf = getConf();
  Path p = new Path(args[0]);
  
  int numThreads = conf.getInt("num.threads", 10);
  int writeSize = conf.getInt("write.size", 511);
  int numWrites = conf.getInt("num.writes", 50000);
  int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);
  
  StopWatch sw = new StopWatch().start();
  test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
      replication);
  sw.stop();
  
  System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
  System.out.println("Latency quantiles (in microseconds):\n" +
      test.quantiles);
  return 0;
}
 
示例7
/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression. 
 * 
 * @param job the job to list input paths for
 * @return array of FileStatus objects
 * @throws IOException if zero items.
 */
protected List<FileStatus> listStatus(JobContext job
                                      ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }
  
  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);
  
  List<FileStatus> result = null;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Lists.newArrayList(locatedFiles);
  }
  
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: "
        + sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input paths to process : " + result.size()); 
  return result;
}
 
示例8
/**
 * Write a batch of edits to the journal.
 * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
 */
synchronized void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  checkFormatted();
  checkWriteRequest(reqInfo);

  checkSync(curSegment != null,
      "Can't write, no segment open");
  
  if (curSegmentTxId != segmentTxId) {
    // Sanity check: it is possible that the writer will fail IPCs
    // on both the finalize() and then the start() of the next segment.
    // This could cause us to continue writing to an old segment
    // instead of rolling to a new one, which breaks one of the
    // invariants in the design. If it happens, abort the segment
    // and throw an exception.
    JournalOutOfSyncException e = new JournalOutOfSyncException(
        "Writer out of sync: it thinks it is writing segment " + segmentTxId
        + " but current segment is " + curSegmentTxId);
    abortCurSegment();
    throw e;
  }
    
  checkSync(nextTxId == firstTxnId,
      "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
  
  long lastTxnId = firstTxnId + numTxns - 1;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
  }

  // If the edit has already been marked as committed, we know
  // it has been fsynced on a quorum of other nodes, and we are
  // "catching up" with the rest. Hence we do not need to fsync.
  boolean isLagging = lastTxnId <= committedTxnId.get();
  boolean shouldFsync = !isLagging;
  
  curSegment.writeRaw(records, 0, records.length);
  curSegment.setReadyToFlush();
  StopWatch sw = new StopWatch();
  sw.start();
  curSegment.flush(shouldFsync);
  sw.stop();

  long nanoSeconds = sw.now();
  metrics.addSync(
      TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
  long milliSeconds = TimeUnit.MILLISECONDS.convert(
      nanoSeconds, TimeUnit.NANOSECONDS);

  if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
    LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
             " took " + milliSeconds + "ms");
  }

  if (isLagging) {
    // This batch of edits has already been committed on a quorum of other
    // nodes. So, we are in "catch up" mode. This gets its own metric.
    metrics.batchesWrittenWhileLagging.incr(1);
  }
  
  metrics.batchesWritten.incr(1);
  metrics.bytesWritten.incr(records.length);
  metrics.txnsWritten.incr(numTxns);
  
  highestWrittenTxId = lastTxnId;
  nextTxId = lastTxnId + 1;
}
 
示例9
/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression. 
 * 
 * @param job the job to list input paths for
 * @return array of FileStatus objects
 * @throws IOException if zero items.
 */
protected List<FileStatus> listStatus(JobContext job
                                      ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }
  
  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);
  
  List<FileStatus> result = null;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Lists.newArrayList(locatedFiles);
  }
  
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: "
        + sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input paths to process : " + result.size()); 
  return result;
}
 
示例10
/**
 * Write a batch of edits to the journal.
 * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
 */
synchronized void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  checkFormatted();
  checkWriteRequest(reqInfo);

  checkSync(curSegment != null,
      "Can't write, no segment open");
  
  if (curSegmentTxId != segmentTxId) {
    // Sanity check: it is possible that the writer will fail IPCs
    // on both the finalize() and then the start() of the next segment.
    // This could cause us to continue writing to an old segment
    // instead of rolling to a new one, which breaks one of the
    // invariants in the design. If it happens, abort the segment
    // and throw an exception.
    JournalOutOfSyncException e = new JournalOutOfSyncException(
        "Writer out of sync: it thinks it is writing segment " + segmentTxId
        + " but current segment is " + curSegmentTxId);
    abortCurSegment();
    throw e;
  }
    
  checkSync(nextTxId == firstTxnId,
      "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
  
  long lastTxnId = firstTxnId + numTxns - 1;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
  }

  // If the edit has already been marked as committed, we know
  // it has been fsynced on a quorum of other nodes, and we are
  // "catching up" with the rest. Hence we do not need to fsync.
  boolean isLagging = lastTxnId <= committedTxnId.get();
  boolean shouldFsync = !isLagging;
  
  curSegment.writeRaw(records, 0, records.length);
  curSegment.setReadyToFlush();
  StopWatch sw = new StopWatch();
  sw.start();
  curSegment.flush(shouldFsync);
  sw.stop();

  long nanoSeconds = sw.now();
  metrics.addSync(
      TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
  long milliSeconds = TimeUnit.MILLISECONDS.convert(
      nanoSeconds, TimeUnit.NANOSECONDS);

  if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
    LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
             " took " + milliSeconds + "ms");
  }

  if (isLagging) {
    // This batch of edits has already been committed on a quorum of other
    // nodes. So, we are in "catch up" mode. This gets its own metric.
    metrics.batchesWrittenWhileLagging.incr(1);
  }
  
  metrics.batchesWritten.incr(1);
  metrics.bytesWritten.incr(records.length);
  metrics.txnsWritten.incr(numTxns);
  
  highestWrittenTxId = lastTxnId;
  nextTxId = lastTxnId + 1;
}