Asked by: 小点点

Go Apache Beam GCP Dataflow: sink for pubsub not found, check that sink library specifies alwayslink = 1


I'm using the Apache Beam Go SDK to build a simple Dataflow pipeline that fetches data from a query and publishes it to pub/sub with the following code:

package main

import (
    "context"
    "flag"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "gitlab.com/bq-to-pubsub/infra/env"
    "gitlab.com/bq-to-pubsub/sources"
    "gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
    flag.Parse()
    ctx := context.Background()
    beam.Init()
    log.Info(ctx, "Creating new pipeline")
    pipeline, scope := beam.NewPipelineWithRoot()
    project := gcpopts.GetProject(ctx)

    ppData := pp.Query(scope, project)
    ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
    pubsubio.Write(scope, "project", "topic", ppMessages)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

When my pipeline runs on Google Cloud Dataflow, I get the following error:

Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: sink for pubsub not found, check that sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: sink for pubsub not found, check that sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: sink for pubsub not found, check that sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: sink for pubsub not found, check that sink library specifies alwayslink = 1.

I've read this thread, but I'm not sure how it was resolved.

Any ideas?


1 Answer

Anonymous user

Is the job running in streaming mode or batch mode? I'm guessing batch. It's likely that the internal Dataflow runner used for batch mode doesn't have the pubsub sink linked in.

Unfortunately, at this time the Go SDK doesn't provide a local "fallback" for writing to pubsub that the batch runner could use.

That said, you should be able to unblock yourself fairly easily by writing your own DoFn that publishes to PubSub using the standard Go package: https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing
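
For reference, the publishing flow described in that link boils down to roughly the following; this is a minimal sketch, and publishOne, projectID, topicID and the payload are illustrative names, not something prescribed by the package:

import (
    "context"
    "fmt"

    "cloud.google.com/go/pubsub"
)

// publishOne publishes a single payload and waits for the server to acknowledge it.
func publishOne(ctx context.Context, projectID, topicID string, data []byte) error {
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        return err
    }
    defer client.Close()

    // Publish is asynchronous; Get blocks until the message has been sent.
    result := client.Topic(topicID).Publish(ctx, &pubsub.Message{Data: data})
    id, err := result.Get(ctx)
    if err != nil {
        return err
    }
    fmt.Println("published message with ID:", id)
    return nil
}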

Roughly, you'd write something like the following.

const batchSize = 100 // or whatever criteria you want

var (
    // Assuming everything is in one project.
    clientOnce   sync.Once
    pubSubClient *pubsub.Client
    clientErr    error
)

type PubSubSinkFn struct {
    Project, Topic string // Whatever configuration you need.

    client *pubsub.Client // The client is safe to use from multiple goroutines.
    batch  []*myMessage   // Per-bundle batch.
}

func (fn *PubSubSinkFn) Setup(ctx context.Context) error {
    // Create the client inside the sync.Once so it can be shared by all bundles.
    clientOnce.Do(func() {
        pubSubClient, clientErr = pubsub.NewClient(ctx, fn.Project)
    })
    if clientErr != nil {
        return clientErr
    }
    fn.client = pubSubClient
    return nil
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v *myMessage) {
    fn.batch = append(fn.batch, v)
    if len(fn.batch) >= batchSize {
        fn.publishBatch(ctx)
    }
}

func (fn *PubSubSinkFn) FinishBundle(ctx context.Context) {
    fn.publishBatch(ctx)
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) {
    // Use fn.client to publish the batch, then reset it.
    fn.batch = nil
}

// When constructing your pipeline:
beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)
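
One more thing to keep in mind: structural DoFns like this generally need to be registered with the SDK so that remote workers (such as Dataflow harnesses) can deserialize them. A minimal sketch, assuming the DoFn lives in the same package:

import (
    "reflect"

    "github.com/apache/beam/sdks/go/pkg/beam"
)

func init() {
    // Register the DoFn type so Dataflow workers can reconstruct it at runtime.
    beam.RegisterType(reflect.TypeOf((*PubSubSinkFn)(nil)).Elem())
}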