提问者:小点点

数据流发布/子流作业卡住并不断修改消息的确认字符截止日期


我们一直将数据流与PythonBEAMSDK(2.34.02.35.0)一起用于两个不同的流作业,它们都具有Pub/Sub主题作为输入。其中一个作业在将消息写入云存储之前对其进行窗口输入和分组。另一个不应用任何窗口(但使用计时器)。这些作业目前处理的消息很少(大约每秒1条消息),并且每个作业都有一个工作人员。

在过去的几天里,这两项工作每天都被卡住一次,导致我们的系统中断。症状是:

  • Pub/Sub消息不再被确认。
  • 但是,订阅的拉取请求速率保持不变。
  • 消息上的确认字符截止时间不断延长。(1)
  • 后续步骤(在ReadFromPubSub之后)不处理任何消息。
  • 有时(但现在总是)我们会在日志中收到低级错误消息(2)(与我们的代码无关)。
  • 作业卡住之前是有关gRPC连接超时和其他网络相关错误的信息级日志。(3)然而,这些日志有时会在没有中断的情况下发生。
  • 数据流(正确)检测到系统延迟的增加。(4)

数据流不会触发任何自动缩放来响应订阅中堆积的消息。这可能是由于虚拟机的CPU保持在低水平(因为无论如何都不再处理消息)。此外,据我所知,消息只有在阶段结束时安全提交后才会被ACK(我们的两个作业都包含两个阶段)。但是数据流可能实际上仍然在读取消息(因为它会推动它们的确认字符截止日期)。但是这些消息永远不会离开ReadFromPubSub步骤。

手动删除VM辅助角色(从Compute Engine控制台中)会触发自动重新创建VM并取消冻结作业。Pub/Sub订阅的使用将恢复,一切恢复正常。

我们如何解决这个问题并确保我们的工作不会卡住?我们没有想法,因为数据流只产生很少的日志,我们的业务代码似乎对这种行为不负责任。

(1)数据流请求延长确认字符的截止日期。

(2)错误日志。ReadStream-process也可以是MergeBuckets-processNameOfAGroupByKeyStep是执行从阶段1到阶段2的步骤。

Stuck state: workflow-msec-<NameOfAGroupByKeyStep>/ReadStream-process, reporter: 0x4566bdcd4aa8, with stack:
--- Thread (name: futex-default-SDomainT/132) stack: ---
PC: @     0x55f23554d580  thread::(anonymous namespace)::FutexDomain::RawBlock()
@     0x55f23554d580  thread::(anonymous namespace)::FutexDomain::RawBlock()
@     0x55f23554cbbe  thread::(anonymous namespace)::FutexDomain::BlockCurrent()
@     0x55f2356e7aac  base::scheduling::Downcalls::UserSchedule()
@     0x55f2356e689e  AbslInternalPerThreadSemWait
@     0x55f235749e84  absl::CondVar::WaitCommon()
@     0x55f23554c221  thread::SelectUntil()
@     0x55f2345be1cb  dist_proc::dax::workflow::(anonymous namespace)::BatchingWindmillGetDataClient::GetData()
@     0x55f2345ac148  dist_proc::dax::workflow::StreamingRpcWindmillServiceStreamingServer::GetData()
@     0x55f234ae9f85  dist_proc::dax::workflow::WindmillServiceStreamingServerProxy::GetData()
@     0x55f234945ad3  dist_proc::dax::workflow::StateManager::PrefetchAll()
@     0x55f23494521b  dist_proc::dax::workflow::StateManager::ReadTag()
@     0x55f23493c3d6  dist_proc::dax::workflow::WindmillWindowingAPIDelegate::ReadKeyedStateImplVirtual()
@     0x55f2349420ed  dist_proc::dax::workflow::WindowingAPIDelegate::ReadKeyedStateImpl()
@     0x55f234941fd2  dist_proc::dax::workflow::WindmillCacheAccess::ReadKeyedStateImpl()
@     0x55f2346e6ec8  dist_proc::dax::workflow::CacheAccess::ReadStateFromCache<>()::{lambda()#1}::operator()()
@     0x55f2346e6e8e  absl::functional_internal::InvokeObject<>()
@     0x55f234942912  std::__u::__function::__policy_invoker<>::__call_impl<>()
@     0x55f2349c5927  dist_proc::dax::workflow::StateObjectsCache::ReadImpl()
@     0x55f2349c56f5  dist_proc::dax::workflow::StateObjectsCache::Read()

(3)中断之前的信息级日志,关于网络问题:

I0128 16:07:09.289409461     166 subchannel.cc:945]          subchannel 0x473cbc81a000 {address=ipv4:74.125.133.95:443, args=grpc.client_channel_factory=0x473cbfcb4690, grpc.default_authority=europe-west1-dataflowstreaming-pa.googleapis.com, grpc.dns_enable_srv_queries=1, grpc.http2_scheme=https, grpc.internal.channel_credentials=0x473cbf494f78, grpc.internal.security_connector=0x473cbb5f0230, grpc.internal.subchannel_pool=0x473cbf766870, grpc.keepalive_permit_without_calls=1, grpc.keepalive_time_ms=60000, grpc.keepalive_timeout_ms=60000, grpc.max_metadata_size=1048576, grpc.max_receive_message_length=-1, grpc.primary_user_agent=grpc-c++/1.44.0-dev, grpc.resource_quota=0x473cbf752ca8, grpc.server_uri=dns:///europe-west1-dataflowstreaming-pa.googleapis.com}: connect failed: {"created":"@1643386029.289272376","description":"Failed to connect to remote host: FD shutdown","file":"third_party/grpc/src/core/lib/iomgr/ev_poll_posix.cc","file_line":500,"grpc_status":14,"os_error":"Timeout occurred","referenced_errors":[{"created":"@1643386029.289234760","description":"connect() timed out","file":"third_party/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":114}],"target_address":"ipv4:74.125.133.95:443"}

(4)数据流正确检测系统滞后增加


共1个答案

匿名用户

在联系Google的支持团队后,我们从未得到问题所在的明确答案,但它停止了发生。我们刚刚得出结论,这是一个内部错误,最终由Dataflow团队修复。