I am running an Apache Beam pipeline on GCP Dataflow and I am getting this error from the workers:
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done
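In fact, I got a bunch of them within about 2 minutes.
I am using the pipeline to write messages from PubSub to BigQuery. Inside the pipeline, when converting the PubSub messages into TableRows, I use FailsafeElement.
Is my conclusion correct? If so, is there a way to catch these poison pills with Apache Beam and log them or write them somewhere?
Update:
The pipeline is written in Java, and I am running the Dataflow pipeline with Streaming Engine.
This is the architecture of the pipeline:
The first part looks like this: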
PCollection<PubsubMessage> messages =
    pipeline.apply(
        "ReadPubSubSubscription",
        PubsubIO.readMessagesWithAttributes()
            .withIdAttribute("Some Attribute Field Name")
            .fromSubscription(options.getInputSubscription()));
So I think I am reading the PubSub messages with PubsubMessageWithAttributesCoder, which uses ByteArrayCoder for the payload.
The full stack trace is as follows:
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done.
org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:233)
org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:474)
org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:365)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:1636)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.access$2400(WindmillStateInternals.java:1583)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1683)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1674)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:999)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:97)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done.
org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AppendableInputStream$1.hasMoreElements(GrpcWindmillServer.java:1538)
java.base/java.io.SequenceInputStream.peekNextStream(SequenceInputStream.java:101)
java.base/java.io.SequenceInputStream.nextStream(SequenceInputStream.java:97)
java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:203)
org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AppendableInputStream.read(GrpcWindmillServer.java:1605)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.CodedInputStream$StreamDecoder.tryRefillBuffer(CodedInputStream.java:2786)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.CodedInputStream$StreamDecoder.isAtEnd(CodedInputStream.java:2709)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.CodedInputStream$StreamDecoder.readTag(CodedInputStream.java:2060)
org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse.<init>(Windmill.java:37549)
org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse.<init>(Windmill.java:37505)
org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse$1.parsePartialFrom(Windmill.java:40068)
org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse$1.parsePartialFrom(Windmill.java:40062)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:215)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:232)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:237)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.GeneratedMessageV3.parseWithIOException(GeneratedMessageV3.java:339)
org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse.parseFrom(Windmill.java:38173)
org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetDataStream.issueRequest(GrpcWindmillServer.java:1164)
org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetDataStream.requestKeyedData(GrpcWindmillServer.java:1117)
org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:220)
org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:474)
org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:365)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:1636)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.access$2400(WindmillStateInternals.java:1583)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1683)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1674)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:999)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:97)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
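Your basic idea is right. See "How to handle 'poison pill' messages when using Google Pub/Sub?", which discusses only the Pubsub part.
Are you running the Dataflow pipeline with Streaming Engine? Do you have a sample pipeline so that we can understand it better? Maybe a stack trace as well?
Regarding errors that happen before the messages are converted: in what format are you reading from Pub/Sub?
PubsubIO lets you specify a dead-letter topic when reading, which can cover the use case of malformed messages: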
PubsubIO.readProtos(Proto.class)
    .fromSubscription("projects/{project_id}/subscriptions/{input-subscription}")
    .withDeadLetterTopic("projects/{project_id}/topics/{output=dlq}")
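For failures that happen later, while converting a message into a row, the same dead-letter idea can be applied by hand. The following is only a minimal sketch in plain Java with no Beam dependency, so it shows the routing logic rather than a real transform. Every name in it (DeadLetterSketch, Failed, Result, toRow, convertAll) and the "key=value" payload format are assumptions made for illustration; they are not part of Beam or of the pipeline in the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names come from Beam or from the asker's pipeline.
class DeadLetterSketch {

    // Keeps the untouched payload next to the failure reason so it can be logged or replayed.
    static final class Failed {
        final byte[] originalPayload;
        final String error;

        Failed(byte[] originalPayload, String error) {
            this.originalPayload = originalPayload;
            this.error = error;
        }
    }

    // Two outputs: rows that converted cleanly, and messages that did not.
    static final class Result {
        final List<Map<String, String>> rows = new ArrayList<>();
        final List<Failed> deadLetters = new ArrayList<>();
    }

    // Stand-in for the message-to-row conversion.
    // Assumed format: comma-separated "key=value" pairs encoded as UTF-8.
    static Map<String, String> toRow(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        Map<String, String> row = new LinkedHashMap<>();
        for (String pair : text.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length != 2 || kv[0].isEmpty()) {
                throw new IllegalArgumentException("Malformed field: '" + pair + "'");
            }
            row.put(kv[0], kv[1]);
        }
        return row;
    }

    // Converts every payload; a failure is routed to the dead-letter list
    // instead of being rethrown, so one bad message cannot block the rest.
    static Result convertAll(List<byte[]> payloads) {
        Result result = new Result();
        for (byte[] payload : payloads) {
            try {
                result.rows.add(toRow(payload));
            } catch (RuntimeException e) {
                result.deadLetters.add(new Failed(payload, e.getMessage()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<byte[]> payloads = Arrays.asList(
            "id=1,name=a".getBytes(StandardCharsets.UTF_8),
            "garbage".getBytes(StandardCharsets.UTF_8));
        Result result = convertAll(payloads);
        System.out.println("rows=" + result.rows.size()
            + " deadLetters=" + result.deadLetters.size());
        for (Failed failed : result.deadLetters) {
            System.out.println("dead letter: " + failed.error);
        }
    }
}
```

In a real pipeline the two lists would correspond to two outputs of the conversion step, with the failed output written somewhere it can be inspected. Note that this only catches exceptions thrown by the conversion code itself; whether it would catch the error in the stack trace above depends on where that error is actually raised.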