Java源码示例:org.apache.hadoop.hbase.replication.ReplicationEndpoint

示例1
/**
 * Forwards the post-create replication endpoint hook to the wrapped region server observer.
 * The plugin class loader is activated before the delegate call and is always deactivated
 * afterwards, even when the delegate throws.
 *
 * @param ctx      the observer context handed in by the coprocessor framework
 * @param endpoint the endpoint that was just created
 * @return whatever endpoint the wrapped observer returns
 */
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
	if (LOG.isDebugEnabled()) {
		LOG.debug("==> RangerAuthorizationCoprocessor.postCreateReplicationEndPoint()");
	}

	final ReplicationEndpoint delegateResult;
	try {
		// Activation sits inside the try so that deactivation also runs if activation fails.
		activatePluginClassLoader();
		delegateResult = implRegionServerObserver.postCreateReplicationEndPoint(ctx, endpoint);
	} finally {
		deactivatePluginClassLoader();
	}

	if (LOG.isDebugEnabled()) {
		LOG.debug("<== RangerAuthorizationCoprocessor.postCreateReplicationEndPoint()");
	}
	return delegateResult;
}
 
示例2
/**
 * Builds the replication endpoint for the current peer.
 * <p>
 * When the peer config names no endpoint implementation, an
 * {@link HBaseInterClusterReplicationEndpoint} is created directly; otherwise the configured
 * class is instantiated reflectively through its no-arg constructor. If the server is an
 * {@link HRegionServer}, its coprocessor host is given a chance to replace the endpoint.
 *
 * @return the endpoint to use; the coprocessor replacement when one is returned, otherwise the
 *         freshly created endpoint
 * @throws IllegalArgumentException if the configured class has no no-arg constructor or its
 *         constructor throws
 */
private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
  RegionServerCoprocessorHost coprocessorHost = server instanceof HRegionServer
      ? ((HRegionServer) server).getRegionServerCoprocessorHost()
      : null;
  String endpointClassName = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

  ReplicationEndpoint created;
  if (endpointClassName != null) {
    try {
      Class<? extends ReplicationEndpoint> endpointClass =
          Class.forName(endpointClassName).asSubclass(ReplicationEndpoint.class);
      created = endpointClass.getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException e) {
      throw new IllegalArgumentException(e);
    }
  } else {
    // No implementation configured: use the inter-cluster endpoint without reflection.
    created = new HBaseInterClusterReplicationEndpoint();
  }

  if (coprocessorHost == null) {
    return created;
  }
  // A non-null result from the hook takes precedence over the endpoint created above.
  ReplicationEndpoint fromHook = coprocessorHost.postCreateReplicationEndPoint(created);
  return fromHook != null ? fromHook : created;
}
 
示例3
/**
 * Initializes the given endpoint with a freshly built {@link ReplicationEndpoint.Context},
 * starts it, and waits up to {@code waitOnEndpointSeconds} seconds for it to be running.
 *
 * @param replicationEndpoint the endpoint to initialize and start
 * @throws IOException      declared for endpoint initialization
 * @throws TimeoutException declared for the bounded wait on the endpoint reaching running state
 */
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
  // Table descriptors are only taken from the server when it is an HRegionServer.
  TableDescriptors descriptors =
      server instanceof HRegionServer ? ((HRegionServer) server).getTableDescriptors() : null;
  ReplicationEndpoint.Context endpointContext =
      new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
          replicationPeer.getId(), clusterId, replicationPeer, metrics, descriptors, server);
  replicationEndpoint.init(endpointContext);
  replicationEndpoint.start();
  replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
 
示例4
/**
 * Runs the post-create replication endpoint hook through the registered region server
 * observers.
 *
 * @param endpoint the endpoint that was just created; it is the initial result of the operation
 * @return {@code endpoint} itself when no coprocessor environments are registered, otherwise
 *         the result of executing the observer operation
 * @throws IOException if the observer operation fails
 */
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  if (!this.coprocEnvironments.isEmpty()) {
    ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint> hookOperation =
        new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
            rsObserverGetter, endpoint) {
          @Override
          public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
            // Each observer is handed the operation's current result.
            return observer.postCreateReplicationEndPoint(this, getResult());
          }
        };
    return execOperationWithResult(hookOperation);
  }
  // Nothing registered: hand the endpoint back untouched.
  return endpoint;
}
 
示例5
/**
 * Replays edits to a secondary region replica by calling
 * {@code RegionReplicaReplicationEndpoint.replicate()} directly, then verifies the rows are
 * visible in the secondary region.
 */
@Test
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
  openRegion(HTU, rs0, hriSecondary);
  RegionReplicaReplicationEndpoint endpoint = new RegionReplicaReplicationEndpoint();

  // The endpoint is initialized from a mocked context backed by the test region server.
  ReplicationEndpoint.Context mockContext = mock(ReplicationEndpoint.Context.class);
  when(mockContext.getConfiguration()).thenReturn(HTU.getConfiguration());
  when(mockContext.getMetrics()).thenReturn(mock(MetricsSource.class));
  when(mockContext.getServer()).thenReturn(rs0);
  when(mockContext.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
  endpoint.init(mockContext);
  endpoint.startAsync();

  // Write rows to the primary; the test expects one collected entry per row.
  HTU.loadNumericRows(table, f, 0, 1000);
  Assert.assertEquals(1000, entries.size());

  // Push the collected entries through the endpoint under a made-up WAL group id.
  ReplicateContext replicateContext = new ReplicateContext()
      .setEntries(Lists.newArrayList(entries))
      .setWalGroupId("fakeWALGroup");
  endpoint.replicate(replicateContext);
  endpoint.stop();

  Region secondaryRegion = rs0.getRegion(hriSecondary.getEncodedName());
  HTU.verifyNumericRows(secondaryRegion, f, 0, 1000);

  // Clean up the rows and close the secondary region again.
  HTU.deleteNumericRows(table, f, 0, 1000);
  closeRegion(HTU, rs0, hriSecondary);
}
 
示例6
/**
 * @return the replication endpoint currently held by this instance
 */
@Override
public ReplicationEndpoint getReplicationEndpoint() {
  return replicationEndpoint;
}
 
示例7
/**
 * Ships one batch of WAL entries to the replication endpoint, retrying for as long as this
 * worker is active.
 * <p>
 * An empty batch only advances the log position. Otherwise the batch is throttled, handed to
 * the endpoint, and on success the HFile references are cleaned up, the log position is
 * advanced, and the source metrics are updated. If the endpoint reports the batch as not
 * replicated the loop simply tries again; if anything throws, the method sleeps via
 * {@code sleepForRetries} before the next attempt.
 *
 * @param entryBatch the batch of WAL entries to ship
 */
private void shipEdits(WALEntryBatch entryBatch) {
  List<Entry> entries = entryBatch.getWalEntries();
  int sleepMultiplier = 0;
  if (entries.isEmpty()) {
    updateLogPosition(entryBatch);
    return;
  }
  int currentSize = (int) entryBatch.getHeapSize();
  int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
  source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1)
      .getKey().getWriteTime());
  while (isActive()) {
    try {
      try {
        source.tryThrottle(currentSize);
      } catch (InterruptedException e) {
        LOG.debug("Interrupted while sleeping for throttling control");
        Thread.currentThread().interrupt();
        // current thread might be interrupted to terminate
        // directly go back to while() for confirm this
        continue;
      }
      // create replicateContext here, so the entries can be GC'd upon return from this call
      // stack
      ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
      replicateContext.setEntries(entries).setSize(currentSize);
      replicateContext.setWalGroupId(walGroupId);
      replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

      long startTimeNs = System.nanoTime();
      // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
      boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
      long endTimeNs = System.nanoTime();

      if (!replicated) {
        // Endpoint did not accept the batch; retry the same batch on the next iteration.
        continue;
      } else {
        sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
      }
      // Clean up hfile references
      for (Entry entry : entries) {
        cleanUpHFileRefs(entry.getEdit());
        LOG.trace("shipped entry {}: ", entry);
      }
      // Log and clean up WAL logs
      updateLogPosition(entryBatch);

      //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
      //this sizeExcludeBulkLoad has to use same calculation that when calling
      //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
      //same variable: totalBufferUsed
      source.postShipEdits(entries, sizeExcludeBulkLoad);
      // FIXME check relationship between wal group and overall
      source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
        entryBatch.getNbHFiles());
      source.getSourceMetrics().setAgeOfLastShippedOp(
        entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
      source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

      // The summary is gated on TRACE, so it is also emitted at TRACE; previously it was
      // logged with debug() inside a trace guard, mislabelling the message's level.
      if (LOG.isTraceEnabled()) {
        LOG.trace("Replicated {} entries or {} operations in {} ms",
            entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
      }
      break;
    } catch (Exception ex) {
      LOG.warn("{} threw unknown exception:",
        source.getReplicationEndpoint().getClass().getName(), ex);
      // Back off before retrying; the multiplier only grows when the sleep helper says so.
      if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
        maxRetriesMultiplier)) {
        sleepMultiplier++;
      }
    }
  }
}
 
示例8
/**
 * Creates an endpoint that keeps a reference to another endpoint as its delegate, together
 * with the visibility label service.
 *
 * @param endpoint                the endpoint stored as the delegate
 * @param visibilityLabelsService the visibility label service stored on this instance
 */
public VisibilityReplicationEndpoint(
    ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  this.visibilityLabelsService = visibilityLabelsService;
  this.delegator = endpoint;
}
 
示例9
/**
 * Wraps the newly created endpoint in a {@link VisibilityReplicationEndpoint}.
 *
 * @param ctx      the observer context (not used here)
 * @param endpoint the endpoint to wrap
 * @return a new {@link VisibilityReplicationEndpoint} built from {@code endpoint} and this
 *         observer's visibility label service
 */
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  ReplicationEndpoint visibilityAware =
      new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
  return visibilityAware;
}
 
示例10
/**
 * Pass-through implementation of the post-create hook.
 *
 * @param ctx      the observer context (not used here)
 * @param endpoint the endpoint that was just created
 * @return {@code endpoint}, unchanged
 */
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx,
    ReplicationEndpoint endpoint) {
  // No wrapping or replacement is done by this implementation.
  return endpoint;
}
 
示例11
/**
 * Validates a replication peer configuration.
 * <p>
 * Checks, in order: that a configured endpoint class can be instantiated; the cluster key
 * (skipped when the configured endpoint is not an {@link HBaseInterClusterReplicationEndpoint});
 * that include/exclude namespace and table-cfs settings agree with the replicate_all flag;
 * sync-replication settings when enabled; and the configured WAL entry filters.
 *
 * @param peerConfig the peer configuration to validate
 * @throws DoNotRetryIOException if the endpoint class cannot be instantiated or the
 *         include/exclude settings contradict the replicate_all flag
 */
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
  String endpointClassName = peerConfig.getReplicationEndpointImpl();
  boolean needClusterKeyCheck = true;
  if (!StringUtils.isBlank(endpointClassName)) {
    // Instantiate the configured class once to prove that it is usable.
    ReplicationEndpoint candidate;
    try {
      Class<? extends ReplicationEndpoint> endpointClass =
        Class.forName(endpointClassName).asSubclass(ReplicationEndpoint.class);
      candidate = endpointClass.getDeclaredConstructor().newInstance();
    } catch (Throwable e) {
      throw new DoNotRetryIOException(
        "Can not instantiate configured replication endpoint class=" + endpointClassName,
        e);
    }
    // The cluster key is only checked for the inter-cluster endpoint.
    needClusterKeyCheck = candidate instanceof HBaseInterClusterReplicationEndpoint;
  }
  if (needClusterKeyCheck) {
    checkClusterKey(peerConfig.getClusterKey());
  }

  if (peerConfig.replicateAllUserTables()) {
    // replicate_all is set: only exclude-namespaces / exclude-table-cfs may be configured.
    boolean includeListConfigured =
      (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
        || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty());
    if (includeListConfigured) {
      throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
        "when you want replicate all cluster");
    }
    checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
      peerConfig.getExcludeTableCFsMap());
  } else {
    // replicate_all is not set: only namespaces / table-cfs may be configured.
    boolean excludeListConfigured =
      (peerConfig.getExcludeNamespaces() != null
        && !peerConfig.getExcludeNamespaces().isEmpty())
        || (peerConfig.getExcludeTableCFsMap() != null
          && !peerConfig.getExcludeTableCFsMap().isEmpty());
    if (excludeListConfigured) {
      throw new DoNotRetryIOException(
        "Need clean exclude-namespaces or exclude-table-cfs config firstly" +
          " when replicate_all flag is false");
    }
    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
      peerConfig.getTableCFsMap());
  }

  if (peerConfig.isSyncReplication()) {
    checkPeerConfigForSyncReplication(peerConfig);
  }

  checkConfiguredWALEntryFilters(peerConfig);
}
 
示例12
/**
 * Shared driver for the "ignored edits" tests: edits that belong to a table which has been
 * disabled, dropped, or had its region replication reduced must be filtered by the region
 * replica endpoint, and edits to another table must still replicate afterwards.
 *
 * @param dropTable          when true, the second table is deleted after being disabled
 * @param disableReplication when true (and {@code dropTable} is false), the second table's
 *                           region replication is lowered and the table is re-enabled
 */
private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
    throws Exception {
  // tests having edits from a disabled or dropped table is handled correctly by skipping those
  // entries and further edits after the edits from dropped/disabled table can be replicated
  // without problems.
  final TableName tableName = TableName.valueOf(
    name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
  HTableDescriptor htd = HTU.createTableDescriptor(tableName);
  int regionReplication = 3;
  htd.setRegionReplication(regionReplication);
  HTU.deleteTableIfAny(tableName);

  HTU.getAdmin().createTable(htd);
  // The second table's name depends on which scenario is being exercised.
  TableName toBeDisabledTable = TableName.valueOf(
    dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
  HTU.deleteTableIfAny(toBeDisabledTable);
  // From here on htd describes the second table, not tableName.
  htd = HTU.createTableDescriptor(TableName.valueOf(toBeDisabledTable.toString()),
    HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
    HColumnDescriptor.DEFAULT_KEEP_DELETED);
  htd.setRegionReplication(regionReplication);
  HTU.getAdmin().createTable(htd);

  // both tables are created, now pause replication
  HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());

  // now that the replication is disabled, write to the table to be dropped, then drop the table.

  Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
  Table table = connection.getTable(tableName);
  Table tableToBeDisabled = connection.getTable(toBeDisabledTable);

  HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);

  // Look up a region of the second table so a WAL entry can be built for it by hand.
  RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
  HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
  byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();

  // A single Put cell wrapped in a WAL entry keyed to the second table's region.
  Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
      .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
  Entry entry = new Entry(
    new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
      new WALEdit()
          .add(cell));
  HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
  if (dropTable) {
    HTU.getAdmin().deleteTable(toBeDisabledTable);
  } else if (disableReplication) {
    // Lower region replication from 3 to 1, then bring the table back online.
    htd.setRegionReplication(regionReplication - 2);
    HTU.getAdmin().modifyTable(htd);
    HTU.getAdmin().enableTable(toBeDisabledTable);
  }

  // Drive a RegionReplicaReplicationEndpoint directly against region server 0, with mocked
  // metrics so the filtered-edit count can be verified.
  HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
  MetricsSource metrics = mock(MetricsSource.class);
  ReplicationEndpoint.Context ctx =
    new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
      HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
      UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
        .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
      metrics, rs.getTableDescriptors(), rs);
  RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
  rrpe.init(ctx);
  rrpe.start();
  ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
  // The same entry is submitted twice; both copies are expected to be reported as filtered.
  repCtx.setEntries(Lists.newArrayList(entry, entry));
  assertTrue(rrpe.replicate(repCtx));
  verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
  rrpe.stop();
  if (disableReplication) {
    // enable replication again so that we can verify replication
    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
    htd.setRegionReplication(regionReplication);
    HTU.getAdmin().modifyTable(htd);
    HTU.getAdmin().enableTable(toBeDisabledTable);
  }

  try {
    // load data into the first table (tableName), i.e. the one that was never disabled or
    // dropped
    HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);

    // now enable the replication
    HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());

    verifyReplication(tableName, regionReplication, 0, 1000);
  } finally {
    // Release client resources and remove the second table regardless of the outcome.
    table.close();
    rl.close();
    tableToBeDisabled.close();
    HTU.deleteTableIfAny(toBeDisabledTable);
    connection.close();
  }
}
 
示例13
/**
 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly.
 * <p>
 * {@code terminate} is run on a separate single-thread executor and the test waits (for at
 * most twice the configured retry sleep) for that call to finish. The executor is always shut
 * down afterwards so its worker thread does not outlive the test, including when the wait
 * fails.
 */
@Test
public void testTerminateTimeout() throws Exception {
  ReplicationSource source = new ReplicationSource();
  ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      // not calling notifyStopped() here causes the caller of stop() to get a Future that never
      // completes
    }
  };
  replicationEndpoint.start();
  ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
  Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
  Configuration testConf = HBaseConfiguration.create();
  testConf.setInt("replication.source.maxretriesmultiplier", 1);
  ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
  Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
  source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
    p -> OptionalLong.empty(), null);
  ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    Future<?> future = executor.submit(new Runnable() {

      @Override
      public void run() {
        source.terminate("testing source termination");
      }
    });
    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return future.isDone();
      }
    });
  } finally {
    // Interrupt and discard the worker so a stuck terminate() call cannot leak the thread.
    executor.shutdownNow();
  }
}
 
示例14
/**
 * Creates the test endpoint by passing both arguments straight to the superclass constructor.
 *
 * @param endpoint                the endpoint forwarded to the superclass
 * @param visibilityLabelsService the visibility label service forwarded to the superclass
 */
public VisibilityReplicationEndPointForTest(
    ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  super(endpoint, visibilityLabelsService);
}
 
示例15
/**
 * Returns the replication endpoint used by this replication source.
 *
 * @return the replication endpoint used by this replication source
 */
ReplicationEndpoint getReplicationEndpoint();
 
示例16
/**
 * Hook invoked after the replication endpoint has been instantiated. The default
 * implementation hands back the endpoint it was given, unchanged.
 *
 * @param ctx the environment to interact with the framework and region server
 * @param endpoint the base endpoint for replication
 * @return the endpoint to use during replication
 */
default ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx,
    ReplicationEndpoint endpoint) {
  // Default behaviour: no wrapping or replacement.
  return endpoint;
}