I'm using the Go SDK with Apache Beam to build a simple Dataflow pipeline that fetches data from a query and publishes it to Pub/Sub using the following code:
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"

	"gitlab.com/bq-to-pubsub/infra/env"
	"gitlab.com/bq-to-pubsub/sources"
	"gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
	flag.Parse()
	ctx := context.Background()
	beam.Init()

	log.Info(ctx, "Creating new pipeline")
	pipeline, scope := beam.NewPipelineWithRoot()

	project := gcpopts.GetProject(ctx)

	ppData := pp.Query(scope, project)
	ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
	pubsubio.Write(scope, "project", "topic", ppMessages)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
When my pipeline runs on Google Cloud Dataflow, I get the following error:
Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: could not find sink for pubsub, check that the sink library specifies alwayslink = 1. (the same root cause is repeated for each of the 4 attempts on pp10112132-vhzf-harness-p8v0)
I've read that thread, but I'm not sure how it was resolved.
Any ideas?
Is the job running in streaming or batch mode? I'd guess batch. It may be that the internal Dataflow runner used for batch mode doesn't have the Pub/Sub sink linked in.
Unfortunately, at this time the Go SDK doesn't provide a native "fallback" for writing to Pub/Sub that the batch runner could use.
That said, it should be easy to unblock yourself by writing your own DoFn that publishes to Pub/Sub with the standard Go package: https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing
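For reference, a minimal sketch of that package's publish pattern (the project ID, topic name, and payload here are placeholders, not anything specific to your pipeline):

client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
if err != nil {
	// handle the error
}
defer client.Close()

topic := client.Topic("my-topic") // placeholder topic name
result := topic.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
if _, err := result.Get(ctx); err != nil { // Get blocks until the server acknowledges the publish
	// handle the error
}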
Wrapped in a DoFn, it would look roughly like the following.
var (
	// Assuming everything is in one project.
	clientOnce   sync.Once
	pubSubClient *pubsub.Client
)

type PubSubSinkFn struct {
	Project, Topic string         // Whatever configuration you need.
	client         *pubsub.Client // The client is safe to use from multiple goroutines.
	topic          *pubsub.Topic
	batch          []*myMessage // Per-bundle batch; myMessage stands in for your element type.
}

func (fn *PubSubSinkFn) Setup(ctx context.Context) {
	// Create the client inside the sync.Once so it can be shared by all bundles.
	clientOnce.Do(func() {
		var err error
		pubSubClient, err = pubsub.NewClient(ctx, fn.Project)
		if err != nil {
			panic(err)
		}
	})
	fn.client = pubSubClient
	fn.topic = fn.client.Topic(fn.Topic)
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v *myMessage) {
	fn.batch = append(fn.batch, v)
	if len(fn.batch) > batchSize { // or whatever criteria you want
		fn.publishBatch(ctx)
	}
}

func (fn *PubSubSinkFn) FinishBundle(ctx context.Context) {
	fn.publishBatch(ctx)
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) {
	// Use fn.client to publish the batch, then clear it for the next one.
	var results []*pubsub.PublishResult
	for _, m := range fn.batch {
		// Assumes myMessage carries its payload in a Data []byte field.
		results = append(results, fn.topic.Publish(ctx, &pubsub.Message{Data: m.Data}))
	}
	for _, r := range results {
		if _, err := r.Get(ctx); err != nil {
			log.Errorf(ctx, "Failed to publish message: %v", err)
		}
	}
	fn.batch = nil
}

// When constructing your pipeline:
beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)
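One thing this sketch leaves out: for a struct-based DoFn to run on a remote runner such as Dataflow, the type has to be registered with the SDK so it can be serialized to the workers. With the SDK paths you're already importing, I'd expect something along these lines to do it (newer SDK versions have a dedicated register package instead):

func init() {
	// Register the DoFn type so the runner can reconstruct it on remote workers.
	beam.RegisterType(reflect.TypeOf((*PubSubSinkFn)(nil)).Elem())
}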