@SuppressWarnings("deprecation")
private synchronized void commonInitialization(
DistributedLogConfiguration conf,
String ledgersPath,
EventLoopGroup eventLoopGroup,
StatsLogger statsLogger, HashedWheelTimer requestTimer)
throws IOException, InterruptedException {
ClientConfiguration bkConfig = new ClientConfiguration();
bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
bkConfig.setReadTimeout(conf.getBKClientReadTimeout());
bkConfig.setZkLedgersRootPath(ledgersPath);
bkConfig.setZkTimeout(conf.getBKClientZKSessionTimeoutMilliSeconds());
bkConfig.setNumWorkerThreads(conf.getBKClientNumberWorkerThreads());
bkConfig.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
bkConfig.setZkRequestRateLimit(conf.getBKClientZKRequestRateLimit());
bkConfig.setProperty(RegionAwareEnsemblePlacementPolicy.REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME,
DistributedLogConstants.DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME);
// reload configuration from dl configuration with settings prefixed with 'bkc.'
ConfUtils.loadConfiguration(bkConfig, conf, "bkc.");
Class<? extends DNSToSwitchMapping> dnsResolverCls;
try {
dnsResolverCls = conf.getEnsemblePlacementDnsResolverClass();
} catch (ConfigurationException e) {
LOG.error("Failed to load bk dns resolver : ", e);
throw new IOException("Failed to load bk dns resolver : ", e);
}
final DNSToSwitchMapping dnsResolver =
NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides());
try {
this.bkc = BookKeeper.forConfig(bkConfig)
.setZookeeper(zkc.get())
.setEventLoopGroup(eventLoopGroup)
.setStatsLogger(statsLogger)
.dnsResolver(dnsResolver)
.requestTimer(requestTimer)
.featureProvider(featureProvider.orNull())
.build();
} catch (BKException bke) {
throw new IOException(bke);
}
}
@SuppressWarnings("deprecation")
private synchronized void commonInitialization(
DistributedLogConfiguration conf, String ledgersPath,
ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer,
boolean registerExpirationHandler)
throws IOException, InterruptedException, KeeperException {
ClientConfiguration bkConfig = new ClientConfiguration();
bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
bkConfig.setReadTimeout(conf.getBKClientReadTimeout());
bkConfig.setZkLedgersRootPath(ledgersPath);
bkConfig.setZkTimeout(conf.getBKClientZKSessionTimeoutMilliSeconds());
bkConfig.setNumWorkerThreads(conf.getBKClientNumberWorkerThreads());
bkConfig.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
bkConfig.setZkRequestRateLimit(conf.getBKClientZKRequestRateLimit());
bkConfig.setProperty(RegionAwareEnsemblePlacementPolicy.REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME,
DistributedLogConstants.DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME);
// reload configuration from dl configuration with settings prefixed with 'bkc.'
ConfUtils.loadConfiguration(bkConfig, conf, "bkc.");
Class<? extends DNSToSwitchMapping> dnsResolverCls;
try {
dnsResolverCls = conf.getEnsemblePlacementDnsResolverClass();
} catch (ConfigurationException e) {
LOG.error("Failed to load bk dns resolver : ", e);
throw new IOException("Failed to load bk dns resolver : ", e);
}
final DNSToSwitchMapping dnsResolver =
NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides());
this.bkc = BookKeeper.newBuilder()
.config(bkConfig)
.zk(zkc.get())
.channelFactory(channelFactory)
.statsLogger(statsLogger)
.dnsResolver(dnsResolver)
.requestTimer(requestTimer)
.featureProvider(featureProvider.orNull())
.build();
if (registerExpirationHandler) {
sessionExpireWatcher = this.zkc.registerExpirationHandler(this);
}
}