Java source code examples: org.apache.hadoop.hbase.replication.ReplicationEndpoint
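ReplicationEndpoint is the pluggable abstraction through which an HBase region server ships WAL edits to a replication peer. An endpoint is created per replication peer, follows a Guava-Service-style lifecycle (init, start, awaitRunning, stop), and can be decorated by region server coprocessors through the postCreateReplicationEndPoint hook. The examples below come from the HBase code base and from projects built on it, such as Apache Ranger.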
Example 1
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  final ReplicationEndpoint ret;

  if (LOG.isDebugEnabled()) {
    LOG.debug("==> RangerAuthorizationCoprocessor.postCreateReplicationEndPoint()");
  }

  try {
    activatePluginClassLoader();
    ret = implRegionServerObserver.postCreateReplicationEndPoint(ctx, endpoint);
  } finally {
    deactivatePluginClassLoader();
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("<== RangerAuthorizationCoprocessor.postCreateReplicationEndPoint()");
  }

  return ret;
}
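Apache Ranger's coprocessor does not handle the hook itself: it swaps in an isolated plugin classloader, delegates to the real observer implementation, and restores the original classloader in the finally block so the swap is undone even if the delegate throws.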
Example 2
private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
  RegionServerCoprocessorHost rsServerHost = null;
  if (server instanceof HRegionServer) {
    rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
  }
  String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

  ReplicationEndpoint replicationEndpoint;
  if (replicationEndpointImpl == null) {
    // Default to HBase inter-cluster replication endpoint; skip reflection
    replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
  } else {
    try {
      replicationEndpoint = Class.forName(replicationEndpointImpl)
        .asSubclass(ReplicationEndpoint.class)
        .getDeclaredConstructor()
        .newInstance();
    } catch (NoSuchMethodException | InvocationTargetException e) {
      throw new IllegalArgumentException(e);
    }
  }
  if (rsServerHost != null) {
    ReplicationEndpoint newReplicationEndPoint =
      rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
    if (newReplicationEndPoint != null) {
      // Override the newly created endpoint from the hook with configured end point
      replicationEndpoint = newReplicationEndPoint;
    }
  }
  return replicationEndpoint;
}
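Note the ordering here: the class named by ReplicationPeerConfig.getReplicationEndpointImpl() (or the stock HBaseInterClusterReplicationEndpoint when none is configured) is instantiated first, and only then offered to the coprocessor host, whose non-null return value replaces it. Example 11 below shows the validation applied to the same configuration when a peer is registered.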
Example 3
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
  TableDescriptors tableDescriptors = null;
  if (server instanceof HRegionServer) {
    tableDescriptors = ((HRegionServer) server).getTableDescriptors();
  }
  replicationEndpoint
    .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
      replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
  replicationEndpoint.start();
  replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
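The Context passed to init() bundles everything an endpoint may need: the hosting server, the local and peer configurations, the filesystem, the peer and cluster ids, the peer itself, a metrics source, and the table descriptors. The bounded awaitRunning() call makes a misbehaving endpoint fail startup with a TimeoutException rather than hang the replication source.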
Example 4
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  if (this.coprocEnvironments.isEmpty()) {
    return endpoint;
  }
  return execOperationWithResult(
    new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
        rsObserverGetter, endpoint) {
      @Override
      public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
        return observer.postCreateReplicationEndPoint(this, getResult());
      }
    });
}
Example 5
@Test
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
  // tests replaying the edits to a secondary region replica using the RRRE.replicate()
  openRegion(HTU, rs0, hriSecondary);
  RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();

  ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
  when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
  when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
  when(context.getServer()).thenReturn(rs0);
  when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());

  replicator.init(context);
  replicator.startAsync();

  // load some data to primary
  HTU.loadNumericRows(table, f, 0, 1000);

  Assert.assertEquals(1000, entries.size());
  // replay the edits to the secondary using replay callable
  final String fakeWalGroupId = "fakeWALGroup";
  replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
    .setWalGroupId(fakeWalGroupId));
  replicator.stop();

  Region region = rs0.getRegion(hriSecondary.getEncodedName());
  HTU.verifyNumericRows(region, f, 0, 1000);

  HTU.deleteNumericRows(table, f, 0, 1000);
  closeRegion(HTU, rs0, hriSecondary);
}
Example 6
@Override
public ReplicationEndpoint getReplicationEndpoint() {
  return this.replicationEndpoint;
}
Example 7
/**
 * Do the shipping logic
 */
private void shipEdits(WALEntryBatch entryBatch) {
  List<Entry> entries = entryBatch.getWalEntries();
  int sleepMultiplier = 0;
  if (entries.isEmpty()) {
    updateLogPosition(entryBatch);
    return;
  }
  int currentSize = (int) entryBatch.getHeapSize();
  int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
  source.getSourceMetrics()
    .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
  while (isActive()) {
    try {
      try {
        source.tryThrottle(currentSize);
      } catch (InterruptedException e) {
        LOG.debug("Interrupted while sleeping for throttling control");
        Thread.currentThread().interrupt();
        // current thread might be interrupted to terminate
        // directly go back to while() for confirm this
        continue;
      }
      // create replicateContext here, so the entries can be GC'd upon return from this call
      // stack
      ReplicationEndpoint.ReplicateContext replicateContext =
        new ReplicationEndpoint.ReplicateContext();
      replicateContext.setEntries(entries).setSize(currentSize);
      replicateContext.setWalGroupId(walGroupId);
      replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

      long startTimeNs = System.nanoTime();
      // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
      boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
      long endTimeNs = System.nanoTime();

      if (!replicated) {
        continue;
      } else {
        sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
      }
      // Clean up hfile references
      for (Entry entry : entries) {
        cleanUpHFileRefs(entry.getEdit());
        LOG.trace("shipped entry {}: ", entry);
      }
      // Log and clean up WAL logs
      updateLogPosition(entryBatch);

      // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
      // this sizeExcludeBulkLoad has to use same calculation that when calling
      // acquireBufferQuota() in ReplicationSourceWALReader because they maintain
      // same variable: totalBufferUsed
      source.postShipEdits(entries, sizeExcludeBulkLoad);
      // FIXME check relationship between wal group and overall
      source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
        entryBatch.getNbHFiles());
      source.getSourceMetrics().setAgeOfLastShippedOp(
        entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
      source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
      if (LOG.isTraceEnabled()) {
        // use trace here to match the isTraceEnabled() guard above
        LOG.trace("Replicated {} entries or {} operations in {} ms",
          entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
      }
      break;
    } catch (Exception ex) {
      LOG.warn("{} threw unknown exception:",
        source.getReplicationEndpoint().getClass().getName(), ex);
      if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries,
        sleepMultiplier, maxRetriesMultiplier)) {
        sleepMultiplier++;
      }
    }
  }
}
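This shipper loop is the endpoint's main caller: it throttles on batch size, hands the batch to replicate() with an adaptive timeout, retries immediately when replicate() returns false, and backs off with a growing sleepMultiplier when it throws. Only after a successful ship does it advance the WAL position, release the buffer quota via postShipEdits(), and update the source metrics.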
Example 8
public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  this.delegator = endpoint;
  this.visibilityLabelsService = visibilityLabelsService;
}
Example 9
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
}
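Examples 8 and 9 together show the decorator pattern the hook enables: the VisibilityController coprocessor (registered via hbase.coprocessor.regionserver.classes) wraps the framework-created endpoint in a VisibilityReplicationEndpoint, which delegates to the original endpoint while transforming the visibility-label tags carried in the shipped cells.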
Example 10
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
Example 11
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
  String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
  boolean checkClusterKey = true;
  if (!StringUtils.isBlank(replicationEndpointImpl)) {
    // try creating an instance
    ReplicationEndpoint endpoint;
    try {
      endpoint = Class.forName(replicationEndpointImpl)
        .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
    } catch (Throwable e) {
      throw new DoNotRetryIOException(
        "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
        e);
    }
    // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
    if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
      checkClusterKey = false;
    }
  }
  if (checkClusterKey) {
    checkClusterKey(peerConfig.getClusterKey());
  }

  if (peerConfig.replicateAllUserTables()) {
    // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
    // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
    // cluster.
    if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
      (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
      throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
        "when you want replicate all cluster");
    }
    checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
      peerConfig.getExcludeTableCFsMap());
  } else {
    // If replicate_all flag is false, it means all user tables can't be replicated to peer
    // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
    // cluster.
    if ((peerConfig.getExcludeNamespaces() != null &&
      !peerConfig.getExcludeNamespaces().isEmpty()) ||
      (peerConfig.getExcludeTableCFsMap() != null &&
      !peerConfig.getExcludeTableCFsMap().isEmpty())) {
      throw new DoNotRetryIOException(
        "Need clean exclude-namespaces or exclude-table-cfs config firstly" +
        " when replicate_all flag is false");
    }
    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
      peerConfig.getTableCFsMap());
  }

  if (peerConfig.isSyncReplication()) {
    checkPeerConfigForSyncReplication(peerConfig);
  }

  checkConfiguredWALEntryFilters(peerConfig);
}
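As a usage sketch, not part of the sources quoted on this page, the snippet below shows the public Admin call that triggers this validation; the peer id and ZooKeeper quorum are placeholder values. Since no replicationEndpointImpl is set, the default HBaseInterClusterReplicationEndpoint applies and the cluster key is checked.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class AddPeerSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
        .setClusterKey("zk1,zk2,zk3:2181:/hbase") // placeholder peer cluster key
        .setReplicateAllUserTables(true)          // namespaces/table-cfs must then stay unset
        .build();
      // A misconfigured peer fails fast with DoNotRetryIOException from checkPeerConfig().
      admin.addReplicationPeer("1", peerConfig);
    }
  }
}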
Example 12
private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
    throws Exception {
  // tests having edits from a disabled or dropped table is handled correctly by skipping those
  // entries and further edits after the edits from dropped/disabled table can be replicated
  // without problems.
  final TableName tableName = TableName.valueOf(
    name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
  HTableDescriptor htd = HTU.createTableDescriptor(tableName);
  int regionReplication = 3;
  htd.setRegionReplication(regionReplication);
  HTU.deleteTableIfAny(tableName);
  HTU.getAdmin().createTable(htd);

  TableName toBeDisabledTable = TableName.valueOf(
    dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
  HTU.deleteTableIfAny(toBeDisabledTable);
  htd = HTU.createTableDescriptor(TableName.valueOf(toBeDisabledTable.toString()),
    HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
    HColumnDescriptor.DEFAULT_KEEP_DELETED);
  htd.setRegionReplication(regionReplication);
  HTU.getAdmin().createTable(htd);

  // both tables are created, now pause replication
  HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());

  // now that the replication is disabled, write to the table to be dropped, then drop the table.
  Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
  Table table = connection.getTable(tableName);
  Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
  HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);

  RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
  HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
  byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();

  Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
    .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
  Entry entry = new Entry(
    new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
    new WALEdit()
      .add(cell));

  HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
  if (dropTable) {
    HTU.getAdmin().deleteTable(toBeDisabledTable);
  } else if (disableReplication) {
    htd.setRegionReplication(regionReplication - 2);
    HTU.getAdmin().modifyTable(htd);
    HTU.getAdmin().enableTable(toBeDisabledTable);
  }

  HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
  MetricsSource metrics = mock(MetricsSource.class);
  ReplicationEndpoint.Context ctx =
    new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
      HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
      UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
        .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
      metrics, rs.getTableDescriptors(), rs);
  RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
  rrpe.init(ctx);
  rrpe.start();
  ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
  repCtx.setEntries(Lists.newArrayList(entry, entry));
  assertTrue(rrpe.replicate(repCtx));
  verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
  rrpe.stop();
  if (disableReplication) {
    // enable replication again so that we can verify replication
    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
    htd.setRegionReplication(regionReplication);
    HTU.getAdmin().modifyTable(htd);
    HTU.getAdmin().enableTable(toBeDisabledTable);
  }

  try {
    // load the data to the table
    HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
    // now enable the replication
    HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
    verifyReplication(tableName, regionReplication, 0, 1000);
  } finally {
    table.close();
    rl.close();
    tableToBeDisabled.close();
    HTU.deleteTableIfAny(toBeDisabledTable);
    connection.close();
  }
}
Example 13
/**
 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
 */
@Test
public void testTerminateTimeout() throws Exception {
  ReplicationSource source = new ReplicationSource();
  ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      // not calling notifyStopped() here causes the caller of stop() to get a Future that never
      // completes
    }
  };
  replicationEndpoint.start();
  ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
  Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
  Configuration testConf = HBaseConfiguration.create();
  testConf.setInt("replication.source.maxretriesmultiplier", 1);
  ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
  Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
  source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
    p -> OptionalLong.empty(), null);
  ExecutorService executor = Executors.newSingleThreadExecutor();
  Future<?> future = executor.submit(new Runnable() {
    @Override
    public void run() {
      source.terminate("testing source termination");
    }
  });
  long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
  Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      return future.isDone();
    }
  });
}
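Because the anonymous endpoint's doStop() never calls notifyStopped(), the future returned by stop() never completes; the test verifies that ReplicationSource.terminate() still returns within its retry budget (replication.source.sleepforretries times maxretriesmultiplier) instead of blocking forever.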
Example 14
public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  super(endpoint, visibilityLabelsService);
}
Example 15
/**
 * @return the replication endpoint used by this replication source
 */
ReplicationEndpoint getReplicationEndpoint();
Example 16
/**
 * This will be called after the replication endpoint is instantiated.
 * @param ctx the environment to interact with the framework and region server.
 * @param endpoint - the base endpoint for replication
 * @return the endpoint to use during replication.
 */
default ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
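Putting the pieces together, here is a minimal sketch of a custom endpoint in the spirit of the examples above. It is illustrative only: LoggingReplicationEndpoint is an invented name, it extends BaseReplicationEndpoint as HBase's bundled endpoints do, and a real implementation would ship the entries to an external system and return false on failure so the shipper in Example 7 retries.
import java.util.UUID;

import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class LoggingReplicationEndpoint extends BaseReplicationEndpoint {

  // A stable id identifying the peer; random per instance only for this sketch.
  private final UUID peerUUID = UUID.randomUUID();

  @Override
  public UUID getPeerUUID() {
    // Used by the framework to detect and break replication loops.
    return peerUUID;
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    for (Entry entry : replicateContext.getEntries()) {
      System.out.println("would replicate: " + entry.getKey());
    }
    return true; // ack the batch so the shipper can advance the WAL position
  }

  @Override
  protected void doStart() {
    notifyStarted(); // transition the Guava service to RUNNING
  }

  @Override
  protected void doStop() {
    notifyStopped(); // unlike the test double in Example 13, complete the stop future
  }
}
Wiring such a class in is then just a matter of setting its fully qualified name as the replicationEndpointImpl of a ReplicationPeerConfig, as Examples 2 and 11 show.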