Java源码示例:com.alibaba.rocketmq.common.MQVersion
示例1
public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
MQBrokerException, RemotingException, MQClientException {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
for (Connection c : cc.getConnectionSet()) {
String clientId = c.getClientId();
if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
continue;
}
try {
ConsumerRunningInfo info =
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
infoMap.put(clientId, info);
}
catch (Exception e) {
}
}
if (!infoMap.isEmpty()) {
this.monitorListener.reportConsumerRunningInfo(infoMap);
}
}
示例2
private HashMap<String, String> prepareRuntimeInfo() {
//DefaultMessageStore.getRuntimeInfo
HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo();
runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CurrentVersion));
runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CurrentVersion));
runtimeInfo.put("msgPutTotalYesterdayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
runtimeInfo.put("msgGetTotalYesterdayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));
runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
runtimeInfo.put("sendThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
return runtimeInfo;
}
示例3
public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
MQBrokerException, RemotingException, MQClientException {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
for (Connection c : cc.getConnectionSet()) {
String clientId = c.getClientId();
if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
continue;
}
try {
ConsumerRunningInfo info =
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
infoMap.put(clientId, info);
} catch (Exception e) {
}
}
if (!infoMap.isEmpty()) {
this.monitorListener.reportConsumerRunningInfo(infoMap);
}
}
示例4
public void reportConsumerRunningInfo(final String consumerGroup)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
for (Connection c : cc.getConnectionSet()) {
String clientId = c.getClientId();
// 低于3.1.8版本,不支持此功能
if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
continue;
}
try {
ConsumerRunningInfo info =
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
infoMap.put(clientId, info);
}
catch (Exception e) {
}
}
if (!infoMap.isEmpty()) {
this.monitorListener.reportConsumerRunningInfo(infoMap);
}
}
示例5
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl =
new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig.getUnitName());
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",//
this.instanceIndex, //
this.clientId, //
this.clientConfig, //
MQVersion.getVersionDesc(MQVersion.CurrentVersion), RemotingCommand.getSerializeTypeConfigInThisServer());
}
示例6
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String group = commandLine.getOptionValue('g').trim();
String topic = commandLine.getOptionValue('t').trim();
ProducerConnection pc = defaultMQAdminExt.examineProducerConnectionInfo(group, topic);
int i = 1;
for (Connection conn : pc.getConnectionSet()) {
System.out.printf("%04d %-32s %-22s %-8s %s\n",//
i++,//
conn.getClientId(),//
conn.getClientAddr(),//
conn.getLanguage(),//
MQVersion.getVersionDesc(conn.getVersion())//
);
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
defaultMQAdminExt.shutdown();
}
}
示例7
static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
if (null != headers) {
for (Iterator<String> iter = headers.iterator(); iter.hasNext();) {
conn.addRequestProperty(iter.next(), iter.next());
}
}
conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CurrentVersion));
conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
String ts = String.valueOf(System.currentTimeMillis());
conn.addRequestProperty("Metaq-Client-RequestTS", ts);
}
示例8
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
this.instanceIndex, //
this.clientId, //
this.clientConfig, //
MQVersion.getVersionDesc(MQVersion.CurrentVersion), RemotingCommand.getSerializeTypeConfigInThisServer());
}
示例9
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String group = commandLine.getOptionValue('g').trim();
String topic = commandLine.getOptionValue('t').trim();
ProducerConnection pc = defaultMQAdminExt.examineProducerConnectionInfo(group, topic);
int i = 1;
for (Connection conn : pc.getConnectionSet()) {
System.out.printf("%04d %-32s %-22s %-8s %s%n",//
i++,//
conn.getClientId(),//
conn.getClientAddr(),//
conn.getLanguage(),//
MQVersion.getVersionDesc(conn.getVersion())//
);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
defaultMQAdminExt.shutdown();
}
}
示例10
private HashMap<String, String> prepareRuntimeInfo() {
HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo();
runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CurrentVersion));
runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CurrentVersion));
runtimeInfo.put("msgPutTotalYesterdayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
runtimeInfo.put("msgGetTotalYesterdayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));
runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
runtimeInfo.put("sendThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size()));
runtimeInfo.put("pullThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
return runtimeInfo;
}
示例11
static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
if (null != headers) {
for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) {
conn.addRequestProperty(iter.next(), iter.next());
}
}
conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CurrentVersion));
conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
String ts = String.valueOf(System.currentTimeMillis());
conn.addRequestProperty("Metaq-Client-RequestTS", ts);
}
示例12
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig
.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl =
new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}", //
this.instanceIndex, //
this.clientId, //
this.clientConfig, //
MQVersion.getVersionDesc(MQVersion.CurrentVersion));
}
示例13
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String group = commandLine.getOptionValue('g').trim();
String topic = commandLine.getOptionValue('t').trim();
ProducerConnection pc = defaultMQAdminExt.examineProducerConnectionInfo(group, topic);
int i = 1;
for (Connection conn : pc.getConnectionSet()) {
System.out.printf("%04d %-32s %-22s %-8s %s\n", //
i++, //
conn.getClientId(), //
conn.getClientAddr(), //
conn.getLanguage(), //
MQVersion.getVersionDesc(conn.getVersion())//
);
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
defaultMQAdminExt.shutdown();
}
}
示例14
private HashMap<String, String> prepareRuntimeInfo() {
HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo();
runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CurrentVersion));
runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CurrentVersion));
runtimeInfo.put("msgPutTotalYesterdayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
runtimeInfo.put("msgPutTotalTodayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
runtimeInfo.put("msgPutTotalTodayNow",
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
runtimeInfo.put("msgGetTotalYesterdayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
runtimeInfo.put("msgGetTotalTodayMorning",
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
runtimeInfo.put("msgGetTotalTodayNow",
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));
runtimeInfo.put("sendThreadPoolQueueSize",
String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
runtimeInfo.put("sendThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
return runtimeInfo;
}
示例15
static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
if (null != headers) {
for (Iterator<String> iter = headers.iterator(); iter.hasNext();) {
conn.addRequestProperty(iter.next(), iter.next());
}
}
conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CurrentVersion));
conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
// 其它
String ts = String.valueOf(System.currentTimeMillis());
conn.addRequestProperty("Metaq-Client-RequestTS", ts);
}
示例16
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",//
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
//sh mqadmin updateKvConfig相关
//sh mqadmin updateKvConfig相关
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
/*注册broker到nameserver. */
//broker感知到又新的topic 获取topic更新 删除,都会通知到所有nameserver
// 都会发送该报文,除了通知为,也是broker与nameserver的保活报文//把broker维护的topic配置推送给namserver, 同时把broker注册到Nameserver 或者BrokerController.start 每隔30s定时时间到,
// 都会发送该报文,除了通知为,也是broker与nameserver的保活报文
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { //broker版本高于3.0.11, 则和过滤服务器一起注册。
return this.registerBrokerWithFilterServer(ctx, request);
}
else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
//获取topic路由信息。
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
default:
break;
}
return null;
}
示例17
public String versionDesc() {
if (this.count != 0) {
return MQVersion.getVersionDesc(this.version);
}
return "";
}
示例18
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String group = commandLine.getOptionValue('g').trim();
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
int i = 1;
for (Connection conn : cc.getConnectionSet()) {
System.out.printf("%03d %-32s %-22s %-8s %s\n",//
i++,//
conn.getClientId(),//
conn.getClientAddr(),//
conn.getLanguage(),//
MQVersion.getVersionDesc(conn.getVersion())//
);
}
System.out.println("\nBelow is subscription:");
Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
i = 1;
while (it.hasNext()) {
Entry<String, SubscriptionData> entry = it.next();
SubscriptionData sd = entry.getValue();
System.out.printf("%03d Topic: %-40s SubExpression: %s\n",//
i++,//
sd.getTopic(),//
sd.getSubString()//
);
}
System.out.println("");
System.out.printf("ConsumeType: %s\n", cc.getConsumeType());
System.out.printf("MessageModel: %s\n", cc.getMessageModel());
System.out.printf("ConsumeFromWhere: %s\n", cc.getConsumeFromWhere());
}
catch (Exception e) {
e.printStackTrace();
}
finally {
defaultMQAdminExt.shutdown();
}
}
示例19
public String versionDesc() {
if (this.count != 0) {
return MQVersion.getVersionDesc(this.version);
}
return "";
}
示例20
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",//
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
default:
break;
}
return null;
}
示例21
public String versionDesc() {
if (this.count != 0) {
return MQVersion.getVersionDesc(this.version);
}
return "";
}
示例22
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String group = commandLine.getOptionValue('g').trim();
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
int i = 1;
for (Connection conn : cc.getConnectionSet()) {
System.out.printf("%03d %-32s %-22s %-8s %s%n",//
i++,//
conn.getClientId(),//
conn.getClientAddr(),//
conn.getLanguage(),//
MQVersion.getVersionDesc(conn.getVersion())//
);
}
System.out.println("\nBelow is subscription:");
Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
i = 1;
while (it.hasNext()) {
Entry<String, SubscriptionData> entry = it.next();
SubscriptionData sd = entry.getValue();
System.out.printf("%03d Topic: %-40s SubExpression: %s%n",//
i++,//
sd.getTopic(),//
sd.getSubString()//
);
}
System.out.println("");
System.out.printf("ConsumeType: %s%n", cc.getConsumeType());
System.out.printf("MessageModel: %s%n", cc.getMessageModel());
System.out.printf("ConsumeFromWhere: %s%n", cc.getConsumeFromWhere());
} catch (Exception e) {
e.printStackTrace();
} finally {
defaultMQAdminExt.shutdown();
}
}
示例23
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}", //
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 新版本Broker,支持Filter Server
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
// 低版本Broker,不支持Filter Server
else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KV_CONFIG_BY_VALUE:
return getKVConfigByValue(ctx, request);
case RequestCode.DELETE_KV_CONFIG_BY_VALUE:
return deleteKVConfigByValue(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
default:
break;
}
return null;
}
示例24
public String versionDesc() {
if (this.count != 0) {
return MQVersion.getVersionDesc(this.version);
}
return "";
}
示例25
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String group = commandLine.getOptionValue('g').trim();
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
// 打印连接
int i = 1;
for (Connection conn : cc.getConnectionSet()) {
System.out.printf("%03d %-32s %-22s %-8s %s\n", //
i++, //
conn.getClientId(), //
conn.getClientAddr(), //
conn.getLanguage(), //
MQVersion.getVersionDesc(conn.getVersion())//
);
}
// 打印订阅关系
System.out.println("\nBelow is subscription:");
Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
i = 1;
while (it.hasNext()) {
Entry<String, SubscriptionData> entry = it.next();
SubscriptionData sd = entry.getValue();
System.out.printf("%03d Topic: %-40s SubExpression: %s\n", //
i++, //
sd.getTopic(), //
sd.getSubString()//
);
}
// 打印其他订阅参数
System.out.println("");
System.out.printf("ConsumeType: %s\n", cc.getConsumeType());
System.out.printf("MessageModel: %s\n", cc.getMessageModel());
System.out.printf("ConsumeFromWhere: %s\n", cc.getConsumeFromWhere());
}
catch (Exception e) {
e.printStackTrace();
}
finally {
defaultMQAdminExt.shutdown();
}
}
示例26
/**
* Broker 主动通知 Consumer,offset 需要进行重置列表发生变化
*/
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
return response;
}
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
long consumerOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
return response;
}
long timeStampOffset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
if (isForce || timeStampOffset < consumerOffset) {
offsetTable.put(mq, timeStampOffset);
}
else {
offsetTable.put(mq, consumerOffset);
}
}
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
// Consumer在线
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
for (Channel channel : channelInfoTable.keySet()) {
int version = channelInfoTable.get(channel).getVersion();
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
new Object[] { topic, group, channelInfoTable.get(channel).getClientId() });
}
catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={}",
new Object[] { topic, group }, e);
}
}
else {
// 如果有一个客户端是不支持该功能的,则直接返回错误,需要应用方升级。
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. version={}",
RemotingHelper.parseChannelRemoteAddr(channel), MQVersion.getVersionDesc(version));
return response;
}
}
}
// Consumer不在线
else {
String errorInfo = String.format(
"Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", //
requestHeader.getGroup(), //
requestHeader.getTopic(), //
requestHeader.getTimestamp());
log.error(errorInfo);
response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
response.setRemark(errorInfo);
return response;
}
response.setCode(ResponseCode.SUCCESS);
ResetOffsetBody resBody = new ResetOffsetBody();
resBody.setOffsetTable(offsetTable);
response.setBody(resBody.encode());
return response;
}