We have been using Dataflow with the Python Beam SDK (2.34.0 and 2.35.0) for two different streaming jobs, both of which have a Pub/Sub topic as input. One job windows and groups the messages before writing them to Cloud Storage. The other applies no windowing (but uses timers). Both jobs currently handle very little traffic (about 1 message per second), and each runs on a single worker.
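For context, the first job's shape looks roughly like the sketch below. This is not our actual code: the subscription, output bucket, window size, and step names (including NameOfAGroupByKeyStep, which the logs below refer to) are placeholders.

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

SUBSCRIPTION = "projects/my-project/subscriptions/my-subscription"  # placeholder
OUTPUT_DIR = "gs://my-bucket/output/"                               # placeholder

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # Messages arrive as bytes from the Pub/Sub subscription.
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
        # Window the unbounded stream before grouping.
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        | "KeyByConstant" >> beam.Map(lambda msg: (None, msg))
        # This GroupByKey is the boundary between stage 1 and stage 2.
        | "NameOfAGroupByKeyStep" >> beam.GroupByKey()
        | "FormatBatch" >> beam.Map(lambda kv: b"\n".join(kv[1]).decode("utf-8"))
        | "WriteToGCS" >> fileio.WriteToFiles(
            path=OUTPUT_DIR, sink=lambda dest: fileio.TextSink())
    )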
Over the past few days, both jobs have gotten stuck once a day, causing outages in our system. The symptoms are:
- The job stops processing messages: the step right after ReadFromPubSub no longer processes anything.
- Dataflow does not trigger any autoscaling in response to the messages piling up in the subscription, probably because the VM's CPU stays low (since no messages are being processed anyway).
- As far as I know, messages are only ACKed once they have been safely committed at the end of a stage (both of our jobs contain two stages). Dataflow probably keeps reading the messages (since it keeps extending their ack deadlines), but they never leave the ReadFromPubSub step.
Manually deleting the worker VM (from the Compute Engine console) triggers an automatic recreation of the VM and unfreezes the job. Consumption of the Pub/Sub subscription resumes and everything goes back to normal.
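For reference, the same workaround can be scripted instead of using the Compute Engine console; a minimal sketch with the google-cloud-compute client is shown below. The project, zone, and worker instance name are placeholders (the real worker name is the one visible in the Compute Engine console).

from google.cloud import compute_v1

client = compute_v1.InstancesClient()
# Deleting the worker VM lets Dataflow recreate it and unfreezes the job,
# as described above. All three values are placeholders.
operation = client.delete(
    project="my-project",
    zone="europe-west1-b",
    instance="my-job-01280915-abcd-harness-xyz",
)
operation.result()  # wait for the deletion to complete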
How can we fix this and make sure our jobs do not get stuck? We are out of ideas: Dataflow produces very few logs, and our business code does not seem to be responsible for this behavior.
Some additional observations:
(1) Dataflow requests extensions of the Pub/Sub ack deadline.
(2) Error logs such as the one below (ReadStream-process can also be MergeBuckets-process; NameOfAGroupByKeyStep is the step that performs the transition from stage 1 to stage 2):
Stuck state: workflow-msec-<NameOfAGroupByKeyStep>/ReadStream-process, reporter: 0x4566bdcd4aa8, with stack:
--- Thread (name: futex-default-SDomainT/132) stack: ---
PC: @ 0x55f23554d580 thread::(anonymous namespace)::FutexDomain::RawBlock()
@ 0x55f23554d580 thread::(anonymous namespace)::FutexDomain::RawBlock()
@ 0x55f23554cbbe thread::(anonymous namespace)::FutexDomain::BlockCurrent()
@ 0x55f2356e7aac base::scheduling::Downcalls::UserSchedule()
@ 0x55f2356e689e AbslInternalPerThreadSemWait
@ 0x55f235749e84 absl::CondVar::WaitCommon()
@ 0x55f23554c221 thread::SelectUntil()
@ 0x55f2345be1cb dist_proc::dax::workflow::(anonymous namespace)::BatchingWindmillGetDataClient::GetData()
@ 0x55f2345ac148 dist_proc::dax::workflow::StreamingRpcWindmillServiceStreamingServer::GetData()
@ 0x55f234ae9f85 dist_proc::dax::workflow::WindmillServiceStreamingServerProxy::GetData()
@ 0x55f234945ad3 dist_proc::dax::workflow::StateManager::PrefetchAll()
@ 0x55f23494521b dist_proc::dax::workflow::StateManager::ReadTag()
@ 0x55f23493c3d6 dist_proc::dax::workflow::WindmillWindowingAPIDelegate::ReadKeyedStateImplVirtual()
@ 0x55f2349420ed dist_proc::dax::workflow::WindowingAPIDelegate::ReadKeyedStateImpl()
@ 0x55f234941fd2 dist_proc::dax::workflow::WindmillCacheAccess::ReadKeyedStateImpl()
@ 0x55f2346e6ec8 dist_proc::dax::workflow::CacheAccess::ReadStateFromCache<>()::{lambda()#1}::operator()()
@ 0x55f2346e6e8e absl::functional_internal::InvokeObject<>()
@ 0x55f234942912 std::__u::__function::__policy_invoker<>::__call_impl<>()
@ 0x55f2349c5927 dist_proc::dax::workflow::StateObjectsCache::ReadImpl()
@ 0x55f2349c56f5 dist_proc::dax::workflow::StateObjectsCache::Read()
(3) INFO-level logs just before the outage, pointing to a network issue:
I0128 16:07:09.289409461 166 subchannel.cc:945] subchannel 0x473cbc81a000 {address=ipv4:74.125.133.95:443, args=grpc.client_channel_factory=0x473cbfcb4690, grpc.default_authority=europe-west1-dataflowstreaming-pa.googleapis.com, grpc.dns_enable_srv_queries=1, grpc.http2_scheme=https, grpc.internal.channel_credentials=0x473cbf494f78, grpc.internal.security_connector=0x473cbb5f0230, grpc.internal.subchannel_pool=0x473cbf766870, grpc.keepalive_permit_without_calls=1, grpc.keepalive_time_ms=60000, grpc.keepalive_timeout_ms=60000, grpc.max_metadata_size=1048576, grpc.max_receive_message_length=-1, grpc.primary_user_agent=grpc-c++/1.44.0-dev, grpc.resource_quota=0x473cbf752ca8, grpc.server_uri=dns:///europe-west1-dataflowstreaming-pa.googleapis.com}: connect failed: {"created":"@1643386029.289272376","description":"Failed to connect to remote host: FD shutdown","file":"third_party/grpc/src/core/lib/iomgr/ev_poll_posix.cc","file_line":500,"grpc_status":14,"os_error":"Timeout occurred","referenced_errors":[{"created":"@1643386029.289234760","description":"connect() timed out","file":"third_party/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":114}],"target_address":"ipv4:74.125.133.95:443"}
(4) Dataflow correctly detects the increase in system lag.
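Since the growing system lag is the one signal that reliably shows up when a job freezes, it can at least be used to detect the stuck state early. Below is a minimal sketch that reads the lag back from Cloud Monitoring with the google-cloud-monitoring client; the project ID, job name, threshold, and lookback window are placeholders, and the resource label used in the filter is an assumption to verify against the dataflow_job monitored-resource documentation.

import time
from google.cloud import monitoring_v3

PROJECT_ID = "my-project"            # placeholder
JOB_NAME = "my-streaming-job"        # placeholder
LAG_THRESHOLD_SECONDS = 600          # placeholder alerting threshold

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
# Look at the last 30 minutes of the job's system lag metric.
interval = monitoring_v3.TimeInterval(
    {"start_time": {"seconds": now - 1800}, "end_time": {"seconds": now}}
)
results = client.list_time_series(
    request={
        "name": f"projects/{PROJECT_ID}",
        "filter": (
            'metric.type = "dataflow.googleapis.com/job/system_lag" '
            f'AND resource.labels.job_name = "{JOB_NAME}"'
        ),
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
for series in results:
    # Take the most recent point; the metric may be reported as int64 or double.
    latest = max(series.points, key=lambda p: p.interval.end_time)
    lag = latest.value.int64_value or latest.value.double_value
    if lag > LAG_THRESHOLD_SECONDS:
        print(f"Job {JOB_NAME} looks stuck: system lag is {lag:.0f}s")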
After contacting Google's support team, we never got a clear answer about what the problem was, but it stopped happening. We concluded that it was an internal bug that was eventually fixed by the Dataflow team.