我创建了一个自定义模板,它使用ReadFromBigQuery
I/O连接器从BigQuery读取。我这样使用它:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--query',
help='Query to retrieve from BigQuery acting as data source.')
parser.add_argument(
'--bucket',
default='mybucketname',
help='Bucket name for staging, temp and schema files.')
options = PipelineOptions()
args = options.view_as(CustomOptions)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'myproject'
google_cloud_options.region = 'europe-west1'
google_cloud_options.staging_location = 'gs://{}/staging/'.format(args.bucket)
google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket)
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).setup_file = './setup.py'
def run():
with beam.Pipeline(options=options) as p:
(
p
| "Read from BigQuery" >> beam.io.ReadFromBigQuery(
query=args.query,
use_standard_sql=True,
flatten_results=False)
...
)
setup.py
import setuptools
REQUIRED_PACKAGES = [
'apache-beam',
'apache-beam[gcp]',
'google-cloud-storage'
]
setuptools.setup(
name='ProcessEmailMetrics',
version='0.0.1',
description='Workflow to process email metrics.',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages(),
include_package_data=True
)
最后,这是我如何使数据流API调用我的云函数:
import google.auth
from googleapiclient.discovery import build
credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)
query = """
SELECT ...
"""
BODY = {
'jobName': 'process-data',
'gcsPath': 'gs://mybucket/templates/my_template',
'parameters': {
'query' : query
}
}
req = service.projects().locations().templates().create(
projectId='myproject',
location='europe-west1',
body=BODY
)
req.execute()
我通过在侦听Pub/Sub主题的Cloud Function上启动模板进行API调用来开始这项工作。如果我仅在该主题上发布一条消息,则管道将完成,没有任何错误。但是,如果我从同一个Cloud Function执行启动多个作业,则会收到两个不同的错误。
第一个是关于丢失的文件。前两个错误属于这种类型:
访问https://www.googleapis.com/storage/v1/b/my-bucket/o/tmp/6b2d2ba6-1/bigquery-table-dump-000000000003.json?alt=media
第二个是索引错误超出范围,同样是在读取ReadFromBigQuery
上生成的avro文件时。接下来的三个错误属于这种类型:
2021-08-13 12:03:48.656来自worker的CESTError消息:Traceback(最近一次调用是最后一次):File"/usr/local/lib/python3.7/site-不包/dataflow_worker/native_operations",第651行,do_workwork_executor.执行()File"/usr/local/lib/python3.7/site-不包/dataflow_worker/dataflow_worker",第179行,在执行op.start()File"dataflow_worker/dataflow_worker",第38行,dataflow_worker.native_operations.NativeReadOper.start File"dataflow_worker/native_operations",第39行,dataflow_worker.native_operations.NativeReadOper.start File"dataflow_worker/apache_beam",第44行,dataflow_worker.native_operations.NativeReadOperation.start File"dataflow_worker/_source_bundles",第48行,dataflow_worker.native_operations.NativeReadOper.start File"/usr/local/lib/python3.7/site-不包/apache_beam/io/source_ix",第84行,在self中读取记录._source_bundles[source_ix
在这五个错误发生后,我的管道失败并停止了。
似乎ReadFromBigQuery
连接器正在寻找一个包含一些实际上不存在或已被弄乱的BigQuery行的临时文件。
正如我所说的,如果我只启动一个数据流作业,它会毫无错误地完成,所以我有两个假设。
>
可能与我的Cloud Function有关。当两条消息发布时间太接近时,函数没有时间进入睡眠状态,可能文件路径就这样乱了。
build
数据流服务时,cache_discovery=False
选项会产生这个问题吗?也许,这是由于我的模板是如何编码的:
选项。view_as(SetupOptions)。save_main_session=True
选项是问题的关键吗?上的不同时间位置google_cloud_options.temp_location='gs://{}/tmp/'. format(args.bucket)
用于每个作业执行?我需要能够在同一个Cloud Function执行上启动多个Dataflow作业,因此实际行为不符合我的项目需求。
这是我失败工作的ID之一:2021-08-13_02_54_10-11165491620802897150
。
知道怎么解决吗?
更新:
版本
python: 3.7.3 (on Cloud Shell)
beam: 2.31.0 (on Cloud Shell)
beam: undefined (on setup.py)
我认为问题是两个管道都在执行Bigquery导出到同一个临时目录中,并且它们相互干扰。您可以为每个管道提供不同的目录,如下所示:
您可以尝试为ReadFromBigQuery转换提供单独的GCS位置吗?您可以这样做:
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
...
parser.add_value_provider_argument(
'--export_location',
help='GCS location to perform Bigquery export')
...
在您的管道中,您将单独传递此导出位置:
def run():
with beam.Pipeline(options=options) as p:
(
p
| "Read from BigQuery" >> beam.io.ReadFromBigQuery(
query=args.query,
use_standard_sql=True,
flatten_results=False,
gcs_location=options.export_location)
...
)
最后,每次启动管道时都会自动生成一个新管道:
BODY = {
'jobName': 'process-data',
'gcsPath': 'gs://mybucket/templates/my_template',
'parameters': {
'query' : query,
'export_location': 'gs://mybucket/templates/my_template/tmp/' + str(uuid.uuid4())
}
}
req = service.projects().locations().templates().create(
projectId='myproject',
location='europe-west1',
body=BODY
)
req.execute()