Asked by: 小点点

Apache Beam pipeline and poison pills


I'm running an Apache Beam pipeline on GCP Dataflow and I'm getting this error from the worker:

Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done

In fact, I got a bunch of them within about 2 minutes.

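I'm using the pipeline to write messages from PubSub to BigQuery. Inside the pipeline, when converting the PubSub messages to TableRows, I'm using fail-safe elements (FailsafeElement).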

Is my conclusion correct? If so, is there a way to catch these poison pills with Apache Beam and log them or write them somewhere?

Update:

The pipeline is written in Java, and I'm running the Dataflow pipeline with Streaming Engine.

This is the architecture of the pipeline:

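  1. Read messages from Pub/Sub
  2. Convert the PubsubMessages to TableRows
  3. Perform some local computations on the data
  4. Write the successful records to BigQuery
  5. Write the failed records to BigQuery (dead-letter table)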
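
Steps 2, 4, and 5 above follow the usual dead-letter pattern: a conversion failure is caught and routed to a separate output instead of propagating. A minimal sketch in plain Java, deliberately without Beam classes. DeadLetterSketch, Failure, Result, and the toy key=value converter are hypothetical names invented for illustration; they are not the code of the pipeline in the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for steps 2, 4 and 5; no Beam classes are used.
class DeadLetterSketch {

    // Hypothetical failure record: the raw payload plus the reason it failed.
    static final class Failure {
        final byte[] payload;
        final String error;

        Failure(byte[] payload, String error) {
            this.payload = payload;
            this.error = error;
        }
    }

    // Hypothetical result holder: one list per destination table.
    static final class Result {
        final List<Map<String, String>> rows = new ArrayList<>();
        final List<Failure> deadLetters = new ArrayList<>();
    }

    // Toy converter (assumption): expects "key=value" text and rejects anything else.
    static Map<String, String> toRow(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int eq = text.indexOf('=');
        if (eq <= 0) {
            throw new IllegalArgumentException("expected key=value but got: " + text);
        }
        Map<String, String> row = new LinkedHashMap<>();
        row.put(text.substring(0, eq), text.substring(eq + 1));
        return row;
    }

    // Convert every payload; a failure is captured instead of propagating.
    static Result convertAll(List<byte[]> payloads) {
        Result result = new Result();
        for (byte[] payload : payloads) {
            try {
                result.rows.add(toRow(payload));
            } catch (RuntimeException e) {
                result.deadLetters.add(new Failure(payload, e.getMessage()));
            }
        }
        return result;
    }
}
```

A catch block like this only sees exceptions thrown by the conversion itself. An exception raised elsewhere, for example inside the runner as in the stack trace below, never reaches it.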

The first part (reading from Pub/Sub) looks like this:

PCollection<PubsubMessage> messages =
                pipeline.apply(
                        "ReadPubSubSubscription",
                        PubsubIO.readMessagesWithAttributes()
                                .withIdAttribute("Some Attribute Field Name")
                                .fromSubscription(options.getInputSubscription()));

So I believe I'm reading the PubSub messages with PubsubMessageWithAttributesCoder, which uses ByteArrayCoder for the payload.
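
If that is right, the payload travels as raw bytes and is not interpreted until the conversion step. A minimal sketch of strict decoding at that point, assuming the payload is meant to be UTF-8 text (the question does not say so). PayloadDecoding and decodeUtf8Strict are hypothetical names. Unlike new String(bytes, UTF_8), which silently substitutes replacement characters, a decoder set to REPORT rejects malformed input, so a bad payload can be detected and sent to the dead-letter table.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

class PayloadDecoding {

    // Returns the decoded text, or Optional.empty() if the bytes are not valid UTF-8.
    static Optional<String> decodeUtf8Strict(byte[] payload) {
        try {
            return Optional.of(
                StandardCharsets.UTF_8
                    .newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(payload))
                    .toString());
        } catch (CharacterCodingException e) {
            // Malformed byte sequence: let the caller decide where the payload goes.
            return Optional.empty();
        }
    }
}
```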

The full stack trace is as follows:

Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done.
        org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:233)
        org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:474)
        org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:365)
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:1636)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.access$2400(WindmillStateInternals.java:1583)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1683)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1674)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:999)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:97)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done.
        org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AppendableInputStream$1.hasMoreElements(GrpcWindmillServer.java:1538)
        java.base/java.io.SequenceInputStream.peekNextStream(SequenceInputStream.java:101)
        java.base/java.io.SequenceInputStream.nextStream(SequenceInputStream.java:97)
        java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:203)
        org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AppendableInputStream.read(GrpcWindmillServer.java:1605)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.CodedInputStream$StreamDecoder.tryRefillBuffer(CodedInputStream.java:2786)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.CodedInputStream$StreamDecoder.isAtEnd(CodedInputStream.java:2709)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.CodedInputStream$StreamDecoder.readTag(CodedInputStream.java:2060)
        org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse.<init>(Windmill.java:37549)
        org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse.<init>(Windmill.java:37505)
        org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse$1.parsePartialFrom(Windmill.java:40068)
        org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse$1.parsePartialFrom(Windmill.java:40062)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:215)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:232)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:237)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
        org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.GeneratedMessageV3.parseWithIOException(GeneratedMessageV3.java:339)
        org.apache.beam.runners.dataflow.worker.windmill.Windmill$KeyedGetDataResponse.parseFrom(Windmill.java:38173)
        org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetDataStream.issueRequest(GrpcWindmillServer.java:1164)
        org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetDataStream.requestKeyedData(GrpcWindmillServer.java:1117)
        org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:220)
        org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:474)
        org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:365)
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:1636)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.access$2400(WindmillStateInternals.java:1583)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1683)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag$1.read(WindmillStateInternals.java:1674)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:999)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:97)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)

1 answer

Anonymous user

Your basic idea is right. See "How to handle 'poison pill' messages when using Google Pub/Sub?", which discusses only the Pub/Sub part.

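Are you running the Dataflow pipeline with Streaming Engine? Do you have a sample of the pipeline so that we can understand it better? Perhaps a stack trace as well?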

Regarding errors that occur before the message is converted: in what format are you reading from Pub/Sub?

PubsubIO lets you specify a dead-letter topic when reading, which can cover the use case of malformed messages:

PubsubIO.readProtos(Proto.class)
    .fromSubscription("projects/{project_id}/subscriptions/{input-subscription}")
    .withDeadLetterTopic("projects/{project_id}/topics/{output-dlq}")
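
The two string arguments are full Pub/Sub resource paths with the placeholders filled in. A small sketch of building them, assuming you keep the project id and the short resource names in separate variables. PubsubPaths and its methods are hypothetical helpers, not part of Beam.

```java
// Hypothetical helper that fills in the placeholders of the resource paths above.
class PubsubPaths {

    // Builds "projects/<projectId>/subscriptions/<name>".
    static String subscription(String projectId, String name) {
        return String.format("projects/%s/subscriptions/%s", projectId, name);
    }

    // Builds "projects/<projectId>/topics/<name>".
    static String topic(String projectId, String name) {
        return String.format("projects/%s/topics/%s", projectId, name);
    }
}
```

For example, PubsubPaths.topic("my-project", "my-dlq") yields "projects/my-project/topics/my-dlq", where both argument values are made up.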
