I have been trying to run my pipeline as a classic template on Dataflow.
The pipeline is supposed to read from a REST API and write the responses returned by the API into a BigQuery table.
When I limit the API to 1 result it runs successfully, but as soon as I raise the limit above 1 it fails with an error.
Any help is much appreciated. Here is my pipeline code:
#!/usr/bin/env python3
import apache_beam as beam
#from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
import setuptools
import logging
# Handling of API calls
import json
import requests
class get_api_data(beam.DoFn):
    def __init__(self):
        logging.debug("fetching api data")

    def process(self, dummy_start):
        api_url = "https://api.thedogapi.com/v1/breeds/?limit=05"
        logging.debug("Now fetching from %s", api_url)
        response = requests.get(api_url)
        return list(response.json())
def run(argv=None):
    beam_options = PipelineOptions(
        runner='direct',  # 'DataflowRunner',
        region='us-west2',
        project='projectid',
        job_name='api-to-bq-test1',  # Dataflow job name
        temp_location='gs://rb-munish-playground/temp',
    )

    # BigQuery table details
    table_spec = bigquery.TableReference(
        projectId='projectid',
        datasetId='dataflow_test',
        tableId='dogs')
    table_schema = {
        'fields': [
            {
                'name': 'weight', 'type': 'RECORD', 'mode': 'REPEATED',
                'fields': [{'name': 'imperial', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'metric', 'type': 'STRING', 'mode': 'NULLABLE'}]
            },
            {
                'name': 'height', 'type': 'RECORD', 'mode': 'REPEATED',
                'fields': [{'name': 'imperial', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'metric', 'type': 'STRING', 'mode': 'NULLABLE'}]
            },
            {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'bred_for', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'breed_group', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'life_span', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'temperament', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'origin', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'reference_image_id', 'type': 'STRING', 'mode': 'NULLABLE'},
            {
                'name': 'image', 'type': 'RECORD', 'mode': 'REPEATED',
                'fields': [{'name': 'height', 'type': 'INTEGER', 'mode': 'NULLABLE'},
                           {'name': 'id', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'url', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'width', 'type': 'INTEGER', 'mode': 'NULLABLE'}]
            },
        ]
    }
    # BigQueryDisposition.CREATE_IF_NEEDED:
    #   Specifies that the write operation should create a new table if one does not exist.
    #   If you use this value, you must provide a table schema. CREATE_IF_NEEDED is the default behavior.
    # BigQueryDisposition.CREATE_NEVER:
    #   Specifies that a table should never be created. If the destination table does not exist,
    #   the write operation fails.
    # BigQueryDisposition.WRITE_EMPTY:
    #   Specifies that the write operation should fail at runtime if the destination table is not empty.
    #   WRITE_EMPTY is the default behavior.
    # BigQueryDisposition.WRITE_TRUNCATE:
    #   Specifies that the write operation should replace an existing table. Any existing rows in the
    #   destination table are removed, and the new rows are added to the table.
    # BigQueryDisposition.WRITE_APPEND:
    #   Specifies that the write operation should append the rows to the end of the existing table.
    #p1 = beam.Pipeline(options=beam_options)
    with beam.Pipeline(options=beam_options) as pipeline:
        ingest_data = (
            pipeline
            | 'Create' >> beam.Create(['Start'])  # workaround to kickstart the pipeline
            | 'fetch API data' >> beam.ParDo(get_api_data())
            | 'write into gbq' >> beam.io.WriteToBigQuery(
                table=table_spec,
                schema=table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
            #| 'write to text' >> beam.io.WriteToText("./results.txt")
        )
        # The with-block runs the pipeline on exit, so no explicit pipeline.run() is needed here.


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Here is the error stack:
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 742, in process
self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 627, in wait_for_bq_job
raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_LOAD_apitobqtest1_LOAD_STEP_465_21cc6601da786b6d152c6a0338a36bf7_1d78c9d4d4674091a7952fd88df97035 failed. Error Result: <ErrorProto
location: 'gs://rb-munish-playground/temp/bq_load/843e8ffd21e346d5b754d3d15dff14da/rb-munish-playground.dataflow_test.dogs/35808f33-0892-4fe6-8cb0-5f90d7d11a89'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 2; errors: 1. Please look into the errors[] collection for more details.'
reason: 'invalid'> [while running 'write into gbq/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']
Look into the Cloud Storage folder you specified in the pipeline option temp_location, or in the custom_gcs_temp_location parameter of beam.io.WriteToBigQuery. There you should find the files that were staged for loading into BigQuery.
Separately, use the Cloud SDK and inspect your BigQuery load job with bq show -j; its error output tells you which field caused the failure. In my case, my schema was missing the liutravail field.
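As a rough illustration of that check, here is a minimal sketch. It assumes you have already copied one of the staged newline-delimited JSON load files from temp_location to a local file (staged_rows.json is a hypothetical name) and reuses the table_schema dict from the question; it only reports top-level keys that disagree with the declared schema fields.

# Rough local check; staged_rows.json is a hypothetical local copy of one staged
# NDJSON load file, and table_schema is the dict defined in the question above.
import json

schema_fields = {f['name'] for f in table_schema['fields']}

with open("staged_rows.json") as f:
    for line_no, line in enumerate(f, start=1):
        row = json.loads(line)
        extra = set(row) - schema_fields    # keys the schema does not declare
        missing = schema_fields - set(row)  # declared fields absent from this row
        if extra or missing:
            print(f"row {line_no}: extra={sorted(extra)}, missing={sorted(missing)}")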
As a general rule, do not make API requests inside a ParDo. Beam parallelizes the work items in that ParDo, so a pipeline can fan out to 1,000 or more concurrent requests and essentially turn into a JMeter load test; the API may not be able to handle that load.
Your example feeds a single element into the request, which is safe, although you most likely do not need Beam at all for this little data.
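If you still want Beam for the BigQuery write, one way to keep the fan-out at exactly one request is to call the API once outside the pipeline and hand the parsed rows to beam.Create. A minimal sketch under that assumption (it reuses table_spec and table_schema from the question and omits the pipeline options):

# Minimal sketch: fetch once outside any DoFn, then let Beam handle only the write.
# table_spec and table_schema are the objects defined in the question; options omitted.
import apache_beam as beam
import requests

breeds = requests.get("https://api.thedogapi.com/v1/breeds/?limit=5").json()

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Create rows' >> beam.Create(breeds)  # one element per breed, no API call in a ParDo
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table=table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )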