Java源码示例:org.apache.hadoop.hdfs.HDFSPolicyProvider

示例1
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
  
  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  dfsCluster.waitClusterUp();
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
  
  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not a HDFS: "+fs.getUri(), 
      fs instanceof DistributedFileSystem);
}
 
示例2
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);

  // Many of the tests expect a replication value of 1 in the output
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();

  dfsCluster.waitClusterUp();
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not a HDFS: "+fs.getUri(),
             fs instanceof DistributedFileSystem);
}
 
示例3
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

  tmpDir = new File(System.getProperty("test.build.data", "target"),
      UUID.randomUUID().toString()).getAbsoluteFile();
  final Path jksPath = new Path(tmpDir.toString(), "test.jks");
  conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
      JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());

  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  dfsCluster.waitClusterUp();
  createAKey("mykey", conf);
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");

  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not an HDFS: " + fs.getUri(),
      fs instanceof DistributedFileSystem);
}
 
示例4
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
  
  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  dfsCluster.waitClusterUp();
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
  
  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not a HDFS: "+fs.getUri(), 
      fs instanceof DistributedFileSystem);
}
 
示例5
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);

  // Many of the tests expect a replication value of 1 in the output
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();

  dfsCluster.waitClusterUp();
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not a HDFS: "+fs.getUri(),
             fs instanceof DistributedFileSystem);
}
 
示例6
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

  tmpDir = new File(System.getProperty("test.build.data", "target"),
      UUID.randomUUID().toString()).getAbsoluteFile();
  final Path jksPath = new Path(tmpDir.toString(), "test.jks");
  conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
      JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());

  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  dfsCluster.waitClusterUp();
  createAKey("mykey", conf);
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");

  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not an HDFS: " + fs.getUri(),
      fs instanceof DistributedFileSystem);
}
 
示例7
private void rewriteHadoopPolicyFile(File policyFile) throws IOException {
  FileWriter fos = new FileWriter(policyFile);
  PolicyProvider policyProvider = new HDFSPolicyProvider();
  fos.write("<configuration>\n");
  for (Service service : policyProvider.getServices()) {
    String key = service.getServiceKey();
    String value ="*";
    if (key.equals("security.refresh.policy.protocol.acl")) {
      value = DUMMY_ACL;
    }
    fos.write("<property><name>"+ key + "</name><value>" + value + 
              "</value></property>\n");
    System.err.println("<property><name>"+ key + "</name><value>" + value + 
        "</value></property>\n");
  }
  fos.write("</configuration>\n");
  fos.close();
}
 
示例8
private void rewriteHadoopPolicyFile(File policyFile) throws IOException {
  FileWriter fos = new FileWriter(policyFile);
  PolicyProvider policyProvider = new HDFSPolicyProvider();
  fos.write("<configuration>\n");
  for (Service service : policyProvider.getServices()) {
    String key = service.getServiceKey();
    String value ="*";
    if (key.equals("security.refresh.policy.protocol.acl")) {
      value = DUMMY_ACL;
    }
    fos.write("<property><name>"+ key + "</name><value>" + value + 
              "</value></property>\n");
    System.err.println("<property><name>"+ key + "</name><value>" + value + 
        "</value></property>\n");
  }
  fos.write("</configuration>\n");
  fos.close();
}
 
示例9
@Override // RefreshAuthorizationPolicyProtocol
public void refreshServiceAcl() throws IOException {
  checkNNStartup();
  if (!serviceAuthEnabled) {
    throw new AuthorizationException("Service Level Authorization not enabled!");
  }

  this.clientRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
  if (this.serviceRpcServer != null) {
    this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
  }
}
 
示例10
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
  this.jn = jn;
  
  Configuration confCopy = new Configuration(conf);
  
  // Ensure that nagling doesn't kick in, which could cause latency issues.
  confCopy.setBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
      true);
  
  InetSocketAddress addr = getAddress(confCopy);
  RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class,
      ProtobufRpcEngine.class);
  QJournalProtocolServerSideTranslatorPB translator =
      new QJournalProtocolServerSideTranslatorPB(this);
  BlockingService service = QJournalProtocolService
      .newReflectiveBlockingService(translator);
  
  this.server = new RPC.Builder(confCopy)
    .setProtocol(QJournalProtocolPB.class)
    .setInstance(service)
    .setBindAddress(addr.getHostName())
    .setPort(addr.getPort())
    .setNumHandlers(HANDLER_COUNT)
    .setVerbose(false)
    .build();

  // set service-level authorization security policy
  if (confCopy.getBoolean(
    CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
        server.refreshServiceAcl(confCopy, new HDFSPolicyProvider());
  }
}
 
示例11
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);
  
  // Many of the tests expect a replication value of 1 in the output
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
  
  // Build racks and hosts configuration to test dfsAdmin -printTopology
  String [] racks =  {"/rack1", "/rack1", "/rack2", "/rack2",
                      "/rack2", "/rack3", "/rack4", "/rack4" };
  String [] hosts = {"host1", "host2", "host3", "host4",
                     "host5", "host6", "host7", "host8" };
  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(8)
                                               .racks(racks)
                                               .hosts(hosts)
                                               .build();
  dfsCluster.waitClusterUp();
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
  
  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not a HDFS: "+fs.getUri(),
             fs instanceof DistributedFileSystem);
}
 
示例12
@Override // RefreshAuthorizationPolicyProtocol
public void refreshServiceAcl() throws IOException {
  checkNNStartup();
  if (!serviceAuthEnabled) {
    throw new AuthorizationException("Service Level Authorization not enabled!");
  }

  this.clientRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
  if (this.serviceRpcServer != null) {
    this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
  }
}
 
示例13
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
  this.jn = jn;
  
  Configuration confCopy = new Configuration(conf);
  
  // Ensure that nagling doesn't kick in, which could cause latency issues.
  confCopy.setBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
      true);
  
  InetSocketAddress addr = getAddress(confCopy);
  RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class,
      ProtobufRpcEngine.class);
  QJournalProtocolServerSideTranslatorPB translator =
      new QJournalProtocolServerSideTranslatorPB(this);
  BlockingService service = QJournalProtocolService
      .newReflectiveBlockingService(translator);
  
  this.server = new RPC.Builder(confCopy)
    .setProtocol(QJournalProtocolPB.class)
    .setInstance(service)
    .setBindAddress(addr.getHostName())
    .setPort(addr.getPort())
    .setNumHandlers(HANDLER_COUNT)
    .setVerbose(false)
    .build();

  // set service-level authorization security policy
  if (confCopy.getBoolean(
    CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
        server.refreshServiceAcl(confCopy, new HDFSPolicyProvider());
  }
}
 
示例14
@Before
@Override
public void setUp() throws Exception {
  super.setUp();
  conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
      HDFSPolicyProvider.class, PolicyProvider.class);
  
  // Many of the tests expect a replication value of 1 in the output
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
  
  // Build racks and hosts configuration to test dfsAdmin -printTopology
  String [] racks =  {"/rack1", "/rack1", "/rack2", "/rack2",
                      "/rack2", "/rack3", "/rack4", "/rack4" };
  String [] hosts = {"host1", "host2", "host3", "host4",
                     "host5", "host6", "host7", "host8" };
  dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(8)
                                               .racks(racks)
                                               .hosts(hosts)
                                               .build();
  dfsCluster.waitClusterUp();
  namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
  
  username = System.getProperty("user.name");

  fs = dfsCluster.getFileSystem();
  assertTrue("Not a HDFS: "+fs.getUri(),
             fs instanceof DistributedFileSystem);
}
 
示例15
@Override
public Service[] getServices() {
  Service[] hdfsServices = new HDFSPolicyProvider().getServices();
  Service[] mrServices = new MapReducePolicyProvider().getServices();
  
  Service[] hadoopServices = 
    new Service[hdfsServices.length + mrServices.length];
  System.arraycopy(hdfsServices, 0, hadoopServices, 0, hdfsServices.length);
  System.arraycopy(mrServices, 0, hadoopServices, hdfsServices.length, 
                   mrServices.length);

  return hadoopServices;
}
 
示例16
@Override
public Service[] getServices() {
  Service[] hdfsServices = new HDFSPolicyProvider().getServices();
  Service[] mrServices = new MapReducePolicyProvider().getServices();
  
  Service[] hadoopServices = 
    new Service[hdfsServices.length + mrServices.length];
  System.arraycopy(hdfsServices, 0, hadoopServices, 0, hdfsServices.length);
  System.arraycopy(mrServices, 0, hadoopServices, hdfsServices.length, 
                   mrServices.length);

  return hadoopServices;
}
 
示例17
/**
 * Initialize name-node.
 * 
 * @param conf the configuration
 */
private void initialize(Configuration conf) throws IOException {
  InetSocketAddress socAddr = NameNode.getAddress(conf);
  int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
  
  // set service-level authorization security policy
  if (serviceAuthEnabled = 
        conf.getBoolean(
          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
    PolicyProvider policyProvider = 
      (PolicyProvider)(ReflectionUtils.newInstance(
          conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
              HDFSPolicyProvider.class, PolicyProvider.class), 
          conf));
    SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
  }

  // create rpc server 
  this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
                              handlerCount, false, conf);

  // The rpc-server port can be ephemeral... ensure we have the correct info
  this.serverAddress = this.server.getListenerAddress(); 
  FileSystem.setDefaultUri(conf, getUri(serverAddress));
  LOG.info("Namenode up at: " + this.serverAddress);

  myMetrics = new NameNodeMetrics(conf, this);

  this.namesystem = new FSNamesystem(this, conf);
  startHttpServer(conf);
  this.server.start();  //start RPC server   
  startTrashEmptier(conf);
}
 
示例18
private void initIpcServer(Configuration conf) throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
  
  // Add all the RPC protocols that the Datanode implements    
  RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
      ProtobufRpcEngine.class);
  ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = 
        new ClientDatanodeProtocolServerSideTranslatorPB(this);
  BlockingService service = ClientDatanodeProtocolService
      .newReflectiveBlockingService(clientDatanodeProtocolXlator);
  ipcServer = new RPC.Builder(conf)
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager).build();
  
  InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
      new InterDatanodeProtocolServerSideTranslatorPB(this);
  service = InterDatanodeProtocolService
      .newReflectiveBlockingService(interDatanodeProtocolXlator);
  DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
      ipcServer);

  TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
      new TraceAdminProtocolServerSideTranslatorPB(this);
  BlockingService traceAdminService = TraceAdminService
      .newReflectiveBlockingService(traceAdminXlator);
  DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
      ipcServer);

  LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());

  // set service-level authorization security policy
  if (conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
    ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
  }
}
 
示例19
@Override
protected PolicyProvider getPolicyProvider() {
  return new HDFSPolicyProvider();
}
 
示例20
private void initIpcServer(Configuration conf) throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
  
  // Add all the RPC protocols that the Datanode implements    
  RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
      ProtobufRpcEngine.class);
  ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = 
        new ClientDatanodeProtocolServerSideTranslatorPB(this);
  BlockingService service = ClientDatanodeProtocolService
      .newReflectiveBlockingService(clientDatanodeProtocolXlator);
  ipcServer = new RPC.Builder(conf)
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager).build();
  
  InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
      new InterDatanodeProtocolServerSideTranslatorPB(this);
  service = InterDatanodeProtocolService
      .newReflectiveBlockingService(interDatanodeProtocolXlator);
  DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
      ipcServer);

  TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
      new TraceAdminProtocolServerSideTranslatorPB(this);
  BlockingService traceAdminService = TraceAdminService
      .newReflectiveBlockingService(traceAdminXlator);
  DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
      ipcServer);

  LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());

  // set service-level authorization security policy
  if (conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
    ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
  }
}
 
示例21
@Override
protected PolicyProvider getPolicyProvider() {
  return new HDFSPolicyProvider();
}
 
示例22
/**
 * Initialize name-node.
 * 
 */
private void initialize() throws IOException {    
  // set service-level authorization security policy
  if (serviceAuthEnabled =
      getConf().getBoolean(
          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
    PolicyProvider policyProvider = 
      (PolicyProvider)(ReflectionUtils.newInstance(
          getConf().getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
              HDFSPolicyProvider.class, PolicyProvider.class), 
          getConf()));
    SecurityUtil.setPolicy(new ConfiguredPolicy(getConf(), policyProvider));
  }

  // This is a check that the port is free
  // create a socket and bind to it, throw exception if port is busy
  // This has to be done before we are reading Namesystem not to waste time and fail fast
  InetSocketAddress clientSocket = NameNode.getAddress(getConf());
  ServerSocket socket = new ServerSocket();
  socket.bind(clientSocket);
  socket.close();
  InetSocketAddress dnSocket = NameNode.getDNProtocolAddress(getConf());
  if (dnSocket != null) {
    socket = new ServerSocket();
    socket.bind(dnSocket);
    socket.close();
    //System.err.println("Tested " + dnSocket);
  }
  
  long serverVersion = ClientProtocol.versionID;
  this.clientProtocolMethodsFingerprint = ProtocolSignature
      .getMethodsSigFingerPrint(ClientProtocol.class, serverVersion);
  
  myMetrics = new NameNodeMetrics(getConf(), this);

  this.clusterName = getConf().get("dfs.cluster.name");
  this.namesystem = new FSNamesystem(this, getConf());
  // HACK: from removal of FSNamesystem.getFSNamesystem().
  JspHelper.fsn = this.namesystem;

  this.startDNServer();
  startHttpServer(getConf());
}
 
示例23
private void initConfig(Configuration conf) throws IOException {
  if (conf.get("slave.host.name") != null) {
    machineName = conf.get("slave.host.name");   
  }
  if (machineName == null) {
    machineName = DNS.getDefaultHost(
                                   conf.get("dfs.datanode.dns.interface","default"),
                                   conf.get("dfs.datanode.dns.nameserver","default"));
  }
  // Allow configuration to delay block reports to find bugs
  artificialBlockReceivedDelay = conf.getInt(
    "dfs.datanode.artificialBlockReceivedDelay", 0);
  if (conf.getBoolean(
      ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
    PolicyProvider policyProvider = (PolicyProvider) (ReflectionUtils
        .newInstance(conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
            HDFSPolicyProvider.class, PolicyProvider.class), conf));
    SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
  }
  this.socketTimeout = conf.getInt("dfs.socket.timeout",
      HdfsConstants.READ_TIMEOUT);
  this.socketReadExtentionTimeout = conf.getInt(
      HdfsConstants.DFS_DATANODE_READ_EXTENSION,
      HdfsConstants.READ_TIMEOUT_EXTENSION);
  this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
      HdfsConstants.WRITE_TIMEOUT);
  this.socketWriteExtentionTimeout = conf.getInt(
      HdfsConstants.DFS_DATANODE_WRITE_EXTENTSION,
      HdfsConstants.WRITE_TIMEOUT_EXTENSION);
  
  /* Based on results on different platforms, we might need set the default 
   * to false on some of them. */
  this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed",
                                           true);

  // TODO: remove the global setting and change data protocol to support
  // per session setting for this value.
  this.ignoreChecksumWhenRead = conf.getBoolean("dfs.datanode.read.ignore.checksum",
      false);

  this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
  
  this.deletedReportInterval =
    conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
  // Calculate the full block report interval
  int fullReportMagnifier = conf.getInt("dfs.fullblockreport.magnifier", 2);
  this.blockReportInterval = fullReportMagnifier * deletedReportInterval;
  this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
  long heartbeatRecheckInterval = conf.getInt(
      "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
  this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
      10 * heartBeatInterval;
  
  this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",
      BLOCKREPORT_INITIAL_DELAY) * 1000L;
  if (this.initialBlockReportDelay >= blockReportInterval) {
    this.initialBlockReportDelay = 0;
    LOG.info("dfs.blockreport.initialDelay is greater than "
        + "dfs.blockreport.intervalMsec."
        + " Setting initial delay to 0 msec:");
  }

  // do we need to sync block file contents to disk when blockfile is closed?
  this.syncOnClose = conf.getBoolean("dfs.datanode.synconclose", false);
  
  this.minDiskCheckIntervalMsec = conf.getLong(
      "dfs.datnode.checkdisk.mininterval",
      FSConstants.MIN_INTERVAL_CHECK_DIR_MSEC);
}