提问者:小点点

在MassTransit中使用TTL调度ActiveMQ消息


我正在使用mastransit来调度一条消息,以便稍后交付,使用ActiveMQ的调度插件。

var provider = new ActiveMqScheduleMessageProvider(_sendEndpointProvider);
var scheduler = new MessageScheduler(provider, _bus.Topology);

var isPublishAddressFetched = _bus.Topology
    .Publish<TMessage>()
    .TryGetPublishAddress(_bus.Address, out var publishAddress);
if (!isPublishAddressFetched)
    throw new InvalidOperationException("Publish address could not be fetched from " + _bus.Address);

using var combinedCancellationTokenSource = GetCombinedCancellationTokenWithTimeout(cancellationToken);

await scheduler.ScheduleSend(
    publishAddress,
    delay,
    message,
    combinedCancellationTokenSource.Token);

这很好,但我不知道如何在此场景中指定TTL(使用MessageScheduler)。有什么想法吗?


共1个答案

匿名用户

要在使用消息调度程序时设置SendContext的属性,可以创建一个执行管道并将其传递给调度程序方法。

还有,消息调度程序已经有了一个内置的SchedulePublish方法,所以没有理由做上面所有的工作-- 只要调用它就行了。

DateTime scheduledTime = DateTime.UtcNow + TimeSpan.FromMinutes(2);

await scheduler.SchedulePublish(scheduledTime, new Message(), 
    Pipe.Execute<SendContext<Message>>(x => x.TimeToLive  = TimeSpan.FromSeconds(30)));