Java源码示例:org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader

示例1
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
    final long timeoutMillis)
    throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
    MQBrokerException {
    QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
            return consumeTimeSpanBody.getConsumeTimeSpanSet();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
示例2
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
    final long timeoutMillis)
    throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
    MQBrokerException {
    QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
            return consumeTimeSpanBody.getConsumeTimeSpanSet();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
示例3
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
    final long timeoutMillis)
    throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
    MQBrokerException {
    QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);

    String acturallyAddr = getActurallyBrokerAddr(addr);
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), acturallyAddr),
        request, timeoutMillis);
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
            return consumeTimeSpanBody.getConsumeTimeSpanSet();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
示例4
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
    final long timeoutMillis)
    throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
    MQBrokerException {
    QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
            return consumeTimeSpanBody.getConsumeTimeSpanSet();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
示例5
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
    throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
    MQBrokerException {
    QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
            return consumeTimeSpanBody.getConsumeTimeSpanSet();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
示例6
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
    final long timeoutMillis)
    throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
    MQBrokerException {
    QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
            return consumeTimeSpanBody.getConsumeTimeSpanSet();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
示例7
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryConsumeTimeSpanRequestHeader requestHeader =
        (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);

    final String topic = requestHeader.getTopic();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    if (null == topicConfig) {
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("topic[" + topic + "] not exist");
        return response;
    }

    List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        QueueTimeSpan timeSpan = new QueueTimeSpan();
        MessageQueue mq = new MessageQueue();
        mq.setTopic(topic);
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setQueueId(i);
        timeSpan.setMessageQueue(mq);

        long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
        timeSpan.setMinTimeStamp(minTime);

        long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
        timeSpan.setMaxTimeStamp(maxTime);

        long consumeTime;
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getGroup(), topic, i);
        if (consumerOffset > 0) {
            consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
        } else {
            consumeTime = minTime;
        }
        timeSpan.setConsumeTimeStamp(consumeTime);

        long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
        if (consumerOffset < maxBrokerOffset) {
            long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
            timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
        }
        timeSpanSet.add(timeSpan);
    }

    QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
    queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
    response.setBody(queryConsumeTimeSpanBody.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例8
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        QueryConsumeTimeSpanRequestHeader requestHeader =
            (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);

        final String topic = requestHeader.getTopic();
//        按topic从缓存中查询出topic配置信息=》
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + topic + "] not exist");
            return response;
        }

        List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
            QueueTimeSpan timeSpan = new QueueTimeSpan();
            MessageQueue mq = new MessageQueue();
            mq.setTopic(topic);
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setQueueId(i);
            timeSpan.setMessageQueue(mq);

//            按topic、queueId查询最早消息的时间=》
            long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
            timeSpan.setMinTimeStamp(minTime);

//            按topic、queueId查询最大的offset=》
            long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
//            按topic、queueId、consumerOffset查询最大时间=》
            long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
            timeSpan.setMaxTimeStamp(maxTime);

            long consumeTime;
//            按consumerGroup、topic、queueId查询consumerOffset=》
            long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
                requestHeader.getGroup(), topic, i);
            if (consumerOffset > 0) {
//                按topic、queueId、consumerOffset查询消费时间=》
                consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
            } else {
                consumeTime = minTime;
            }
            timeSpan.setConsumeTimeStamp(consumeTime);

//            按topic、queueId查询最大的offset=》
            long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
            if (consumerOffset < maxBrokerOffset) {
//                按topic、queueId、consumerOffset查询消费时间=》
                long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
                timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
            }
            timeSpanSet.add(timeSpan);
        }

        QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
        queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
        response.setBody(queryConsumeTimeSpanBody.encode());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
 
示例9
/**
 * 查询消费的timespan
 * @param ctx ;
 * @param request ;
 * @return ;
 * @throws RemotingCommandException ;
 */
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryConsumeTimeSpanRequestHeader requestHeader =
        (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);

    final String topic = requestHeader.getTopic();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    if (null == topicConfig) {
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("topic[" + topic + "] not exist");
        return response;
    }

    List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        QueueTimeSpan timeSpan = new QueueTimeSpan();
        MessageQueue mq = new MessageQueue();
        mq.setTopic(topic);
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setQueueId(i);
        timeSpan.setMessageQueue(mq);

        long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
        timeSpan.setMinTimeStamp(minTime);

        long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
        timeSpan.setMaxTimeStamp(maxTime);

        long consumeTime;
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getGroup(), topic, i);
        if (consumerOffset > 0) {
            consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
        } else {
            consumeTime = minTime;
        }
        timeSpan.setConsumeTimeStamp(consumeTime);

        long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
        if (consumerOffset < maxBrokerOffset) {
            long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
            timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
        }
        timeSpanSet.add(timeSpan);
    }

    QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
    queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
    response.setBody(queryConsumeTimeSpanBody.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例10
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryConsumeTimeSpanRequestHeader requestHeader =
        (QueryConsumeTimeSpanRequestHeader)request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);

    final String topic = requestHeader.getTopic();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    if (null == topicConfig) {
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("topic[" + topic + "] not exist");
        return response;
    }

    List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        QueueTimeSpan timeSpan = new QueueTimeSpan();
        MessageQueue mq = new MessageQueue();
        mq.setTopic(topic);
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setQueueId(i);
        timeSpan.setMessageQueue(mq);

        long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
        timeSpan.setMinTimeStamp(minTime);

        long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
        long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
        timeSpan.setMaxTimeStamp(maxTime);

        long consumeTime;
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getGroup(), topic, i);
        if (consumerOffset > 0) {
            consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
        } else {
            consumeTime = minTime;
        }
        timeSpan.setConsumeTimeStamp(consumeTime);

        long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), i);
        if (consumerOffset < maxBrokerOffset) {
            long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
            timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
        }
        timeSpanSet.add(timeSpan);
    }

    QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
    queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
    response.setBody(queryConsumeTimeSpanBody.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例11
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryConsumeTimeSpanRequestHeader requestHeader =
        (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);

    final String topic = requestHeader.getTopic();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    if (null == topicConfig) {
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("topic[" + topic + "] not exist");
        return response;
    }

    List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        QueueTimeSpan timeSpan = new QueueTimeSpan();
        MessageQueue mq = new MessageQueue();
        mq.setTopic(topic);
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setQueueId(i);
        timeSpan.setMessageQueue(mq);

        long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
        timeSpan.setMinTimeStamp(minTime);

        long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
        timeSpan.setMaxTimeStamp(maxTime);

        long consumeTime;
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getGroup(), topic, i);
        if (consumerOffset > 0) {
            consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
        } else {
            consumeTime = minTime;
        }
        timeSpan.setConsumeTimeStamp(consumeTime);

        long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
        if (consumerOffset < maxBrokerOffset) {
            long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
            timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
        }
        timeSpanSet.add(timeSpan);
    }

    QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
    queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
    response.setBody(queryConsumeTimeSpanBody.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例12
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryConsumeTimeSpanRequestHeader requestHeader =
        (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);

    final String topic = requestHeader.getTopic();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    if (null == topicConfig) {
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("topic[" + topic + "] not exist");
        return response;
    }

    List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        QueueTimeSpan timeSpan = new QueueTimeSpan();
        MessageQueue mq = new MessageQueue();
        mq.setTopic(topic);
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setQueueId(i);
        timeSpan.setMessageQueue(mq);

        long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
        timeSpan.setMinTimeStamp(minTime);

        long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
        timeSpan.setMaxTimeStamp(maxTime);

        long consumeTime;
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getGroup(), topic, i);
        if (consumerOffset > 0) {
            consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
        } else {
            consumeTime = minTime;
        }
        timeSpan.setConsumeTimeStamp(consumeTime);

        long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
        if (consumerOffset < maxBrokerOffset) {
            long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
            timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
        }
        timeSpanSet.add(timeSpan);
    }

    QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
    queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
    response.setBody(queryConsumeTimeSpanBody.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例13
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryConsumeTimeSpanRequestHeader requestHeader =
        (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);

    final String topic = requestHeader.getTopic();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    if (null == topicConfig) {
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("topic[" + topic + "] not exist");
        return response;
    }

    List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        QueueTimeSpan timeSpan = new QueueTimeSpan();
        MessageQueue mq = new MessageQueue();
        mq.setTopic(topic);
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setQueueId(i);
        timeSpan.setMessageQueue(mq);

        long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
        timeSpan.setMinTimeStamp(minTime);

        long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
        timeSpan.setMaxTimeStamp(maxTime);

        long consumeTime;
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getGroup(), topic, i);
        if (consumerOffset > 0) {
            consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
        } else {
            consumeTime = minTime;
        }
        timeSpan.setConsumeTimeStamp(consumeTime);

        long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
        if (consumerOffset < maxBrokerOffset) {
            long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
            timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
        }
        timeSpanSet.add(timeSpan);
    }

    QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
    queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
    response.setBody(queryConsumeTimeSpanBody.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}