我正在运行一个分析管道。
这是我的主题和订阅:
gcloud pubsub topics create pipeline-input
gcloud beta pubsub subscriptions create pipeline-input-sub \
--topic pipeline-input \
--ack-deadline 600 \
--expiration-period never \
--dead-letter-topic dead-letter
以下是我提取消息的方法:
import { PubSub, Message } from '@google-cloud/pubsub'
const pubSubClient = new PubSub()
const queue: Message[] = []
const populateQueue = async () => {
const subscription = pubSubClient.subscription('pipeline-input-sub', {
flowControl: {
maxMessages: 5
}
})
const messageHandler = async (message: Message) => {
queue.push(message)
}
subscription.on('message', messageHandler)
}
const processQueueMessage = () => {
const message = queue.shift()
try {
...
message.ack()
} catch {
...
message.nack()
}
processQueueMessage()
}
processQueueMessage()
处理时间约为7秒。
这是许多类似的重复案例之一。同一消息会向不同的 GCE 实例传递 5 (!!!) 次:
所有 5 次消息都已成功处理和 .ack()
ed。输出包含的消息比输入多 50%!我很清楚“至少一次”的行为,但我认为它可能会重复 0.01% 的消息,而不是其中的 50%。
主题输入100%没有重复。我通过云监视器验证了主题输入法和未确认消息的数量。数字匹配:在发布/订阅主题中没有重复。
更新:
预计会出现一些重复,尽管 50% 的重复率肯定很高。第一个问题是,这些是发布端重复项还是订阅端重复项?前者是在重试发布同一邮件时创建的,从而导致同一邮件的多次发布。这些消息将具有不同的消息 ID。后者是由将同一消息重新传递给订阅者引起的。这些消息具有相同的消息 ID(尽管确认 ID 不同)。
听起来您已经验证了这些是订阅端的重复。因此,正如您所提到的,可能的原因是过期的ack截止日期。问题是,为什么消息超过了ack截止日期?需要注意的一点是,当使用客户端库时,订阅中设置的ack截止日期不是使用的那个。相反,客户端库尝试根据客户端库设置和99%的ack延迟来优化ack截止日期。然后它会更新消息的租约,直到FlowControl
对象的max_lease_duration
属性传递到订阅
方法。这默认为一小时。
因此,为了使消息保持租用状态,客户端库必须能够向服务器发送修改AckDeadline
请求。重复请求的一个可能原因是客户端无法发送这些请求,可能是由于机器过载。运行此管道的机器是否在做其他工作?如果是这样,它们可能在CPU、内存或网络方面过载,无法发送修改AckDeadline
请求,也无法及时处理消息。
消息批处理也可能会影响您确认消息的能力。作为优化,发布/订阅系统存储成批消息的确认,而不是单个消息的确认。因此,批中的所有消息都必须得到确认,以便所有消息都得到确认。因此,如果一批中有五条消息,并且确认了其中的四条,但是没有确认最后一条消息,那么这五条消息都将被重新发送。有一些缓存可以尽量减少这种情况,但这仍然是可能的。有一篇媒体文章对此进行了更详细的讨论(参见“消息重新传递
批处理和重复之间的这种耦合是我们正在积极改进的。我希望这个问题在某个时候停止。同时,如果您可以控制发布者,则可以将批处理设置中的< code>max_messages属性设置为1,以防止消息的批处理。
如果这些都无济于事,最好打开一个支持案例,并提供一些重复消息的项目名称、订阅名称和消息 ID。工程师可以更详细地调查重新传递单个消息的原因。