我正在使用RabbitMQ Stomp的持久订阅(这里留档)。根据留档,当客户端使用相同的id重新连接(订阅)时,他应该得到所有排队的消息。然而,即使消息在服务器端排队,我也无法取回任何东西。下面是我正在使用的代码:
RabbitMQ版本:3.6.0
客户代码:
var sock;
var stomp;
var messageCount = 0;
var stompConnect = function() {
sock = new SockJS(options.url);
stomp = Stomp.over(sock);
stomp.connect({}, function(frame) {
debug('Connected: ', frame);
console.log(frame);
var id = stomp.subscribe('<url>' + options.source + "." + options.type + "." + options.id, function(d) {
console.log(messageCount);
messageCount = messageCount + 1;
}, {'auto-delete' : false, 'persistent' : true , 'id' : 'unique_id', 'ack' : 'client'});
}, function(err) {
console.log(err);
debug('error', err, err.stack);
setTimeout(stompConnect, 10);
});
};
服务器代码:
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(final MessageBrokerRegistry config) {
config.enableStompBrokerRelay("<endpoint>", "<endpoint>").setRelayHost(host)
.setSystemLogin(username).setSystemPasscode(password).setClientLogin(username)
.setClientPasscode(password);
}
@Override
public void registerStompEndpoints(final StompEndpointRegistry registry) {
registry.addEndpoint("<endpoint>").setAllowedOrigins("*").withSockJS();
}
}
我正在执行的步骤:
哎呀!失去了连接
我尝试了以下方法:
'ack':'client'
标头。这导致所有消息都被排挤出队列,但是没有一条到达客户端。我在完成这个SO答案后添加了这个标头。d. ack();
在函数中,在递增message ageCount之前。这会导致服务器端在会话关闭后尝试ack消息时出错(由于断开连接)。此外,在某些情况下,当我重新连接排队消息的数量少于100时,我能够获得所有消息。然而,一旦它超过100,什么都不会发生(不确定这是否与问题有关)。
我不知道问题是否存在于服务器端或客户端。有输入吗?
最后,我能够找到(并解决)问题。我们使用nginx作为代理,它的proxy_buffering
设置为on
(默认值),看看这里的留档。
它是这么说的:
启用缓冲后,nginx会尽快收到代理服务器的响应,并将其保存到proxy_buffer_size和proxy_buffers指令设置的缓冲区中。
因此,消息被缓冲(延迟),导致断开连接。我们尝试绕过nginx,它工作正常,然后我们禁用了代理缓冲,现在似乎工作正常,即使使用nginx代理。