提问者:小点点

为什么我收到 50% 的 GCP 发布/订阅消息重复?


我正在运行一个分析管道。

  • 吞吐量为每秒约11条消息。
  • 我的Pub/Sub主题包含2M预定的消息。
  • 80个GCE实例正在并行拉取消息。

这是我的主题和订阅:

gcloud pubsub topics create pipeline-input

gcloud beta pubsub subscriptions create pipeline-input-sub \
    --topic pipeline-input \
    --ack-deadline 600 \
    --expiration-period never \
    --dead-letter-topic dead-letter

以下是我提取消息的方法:

import { PubSub, Message } from '@google-cloud/pubsub'

const pubSubClient = new PubSub()

const queue: Message[] = []

const populateQueue = async () => {
  const subscription = pubSubClient.subscription('pipeline-input-sub', {
    flowControl: {
      maxMessages: 5
    }
  })
  const messageHandler = async (message: Message) => {
    queue.push(message)
  }
  subscription.on('message', messageHandler)
}

const processQueueMessage = () => {
  const message = queue.shift()
  try {
    ...
    message.ack()
  } catch {
    ...
    message.nack()
  }
  processQueueMessage()
}

processQueueMessage()

处理时间约为7秒。

这是许多类似的重复案例之一。同一消息会向不同的 GCE 实例传递 5 (!!!) 次:

  • 03:37:42.377
  • 03:45:20.883
  • 03:48:14.262
  • 04:01:33.848
  • 05:57:45.141

所有 5 次消息都已成功处理和 .ack() ed。输出包含的消息比输入多 50%!我很清楚“至少一次”的行为,但我认为它可能会重复 0.01% 的消息,而不是其中的 50%。

主题输入100%没有重复。我通过云监视器验证了主题输入法和未确认消息的数量。数字匹配:在发布/订阅主题中没有重复。

更新:

    < li >看起来所有这些副本都是由于ack截止日期过期而创建的。我100%确定我在600秒的期限前确认了99.9%的邮件。

共1个答案

匿名用户

预计会出现一些重复,尽管 50% 的重复率肯定很高。第一个问题是,这些是发布端重复项还是订阅端重复项?前者是在重试发布同一邮件时创建的,从而导致同一邮件的多次发布。这些消息将具有不同的消息 ID。后者是由将同一消息重新传递给订阅者引起的。这些消息具有相同的消息 ID(尽管确认 ID 不同)。

听起来您已经验证了这些是订阅端的重复。因此,正如您所提到的,可能的原因是过期的ack截止日期。问题是,为什么消息超过了ack截止日期?需要注意的一点是,当使用客户端库时,订阅中设置的ack截止日期不是使用的那个。相反,客户端库尝试根据客户端库设置和99%的ack延迟来优化ack截止日期。然后它会更新消息的租约,直到FlowControl对象的max_lease_duration属性传递到订阅方法。这默认为一小时。

因此,为了使消息保持租用状态,客户端库必须能够向服务器发送修改AckDeadline请求。重复请求的一个可能原因是客户端无法发送这些请求,可能是由于机器过载。运行此管道的机器是否在做其他工作?如果是这样,它们可能在CPU、内存或网络方面过载,无法发送修改AckDeadline请求,也无法及时处理消息。

消息批处理也可能会影响您确认消息的能力。作为优化,发布/订阅系统存储成批消息的确认,而不是单个消息的确认。因此,批中的所有消息都必须得到确认,以便所有消息都得到确认。因此,如果一批中有五条消息,并且确认了其中的四条,但是没有确认最后一条消息,那么这五条消息都将被重新发送。有一些缓存可以尽量减少这种情况,但这仍然是可能的。有一篇媒体文章对此进行了更详细的讨论(参见“消息重新传递

批处理和重复之间的这种耦合是我们正在积极改进的。我希望这个问题在某个时候停止。同时,如果您可以控制发布者,则可以将批处理设置中的< code>max_messages属性设置为1,以防止消息的批处理。

如果这些都无济于事,最好打开一个支持案例,并提供一些重复消息的项目名称、订阅名称和消息 ID。工程师可以更详细地调查重新传递单个消息的原因。