我在python中创建了一个云函数,它可以监控是否在云存储中创建或修改了任何文件,如果是,则使用我在apache光束中创建的模板在数据流中触发一个作业,该模板已经配置为从云中的存储中获取文件并将其发送到bigquery表。我部署了“main.py”函数,它部署了规范,在云函数中变得绿色,但它什么都不做,我不知道我哪里出错了。
云功能
def startDataflowProcess(data, context):
from googleapiclient.discovery import build
#replace with your projectID
project = "curso-dataflow-beam-347613"
job = project + " " + str(data['timeCreated'])
#path of the dataflow template on google storage bucket
template = "gs://curso-apachebeam-dataflow/template/storage_to_bigquery"
inputFile = "gs://" + str(data['bucket']) + "/" + str(data['dataset.csv'])
#user defined parameters to pass to the dataflow pipeline job
parameters = {
'inputFile': inputFile,
}
#tempLocation is the path on GCS to store temp files generated during the dataflow job
environment = {'tempLocation': 'gs://curso-apachebeam-dataflow/temp'}
service = build('dataflow', 'v1b3', cache_discovery=False)
#below API is used when we want to pass the location of the dataflow job
request = service.projects().locations().templates().launch(
projectId=project,
gcsPath=template,
location='southamerica-east1',
body={
'jobName': job,
'parameters': parameters,
'environment':environment
},
)
response = request.execute()
print(str(response))
模型数据流
import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = {
'project': 'curso-dataflow-beam-347613' ,
'runner': 'DataflowRunner',
'region': 'southamerica-east1',
'staging_location': 'gs://curso-apachebeam-dataflow/temp',
'temp_location': 'gs://curso-apachebeam-dataflow/temp',
'template_location': 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery',
'save_main_session' : True }
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p1 = beam.Pipeline(options=pipeline_options)
serviceAccount = r'C:\Users\paulo.santos\Documents\CURSO DATA FLOW\Python-master\curso-dataflow-beam-347613-c998cb1e5f49.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= serviceAccount
def criar_dict(record):
dict_ = {}
dict_['name']
dict_['company'] = record[1]
dict_['pin'] = record[2]
return(dict_)
table_schema = 'name:STRING, company:STRING, pin:INTEGER'
tabela = 'projeto_curso_dataflow.Curso_dataflow_projeto'
Tabela_Dados = (
p1
| "Importar Dados" >> beam.io.ReadFromText(r"gs://bucket_trigger_dataflow_storage_to_bigquery/dataset.csv", skip_header_lines = 2)
| "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
| "Criar um dic" >> beam.Map(lambda record: criar_dict(record))
| "Gravar no BigQuery" >> beam.io.WriteToBigQuery(
tabela,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
custom_gcs_temp_location = 'gs://curso-apachebeam-dataflow/temp' )
)
p1.run()
BUG报告
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/curso-dataflow-beam-347613/locations/southamerica-east1/templates:launch?gcsPath=gs%3A%2F%2Fcurso-apachebeam-dataflow%2Ftemplate%2Fstorage_to_bigquery&alt=json returned "(87fc63e208f59924): The workflow could not be created. Causes: (87fc63e208f5949f): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]". Details: "(87fc63e208f59924): The workflow could not be created. Causes: (87fc63e208f5949f): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]">
at.execute ( /layers/google.python.pip/pip/lib/python3.7/site-packages/googleapiclient/http.py:937 )
at.positional_wrapper ( /layers/google.python.pip/pip/lib/python3.7/site-packages/googleapiclient/_helpers.py:131 )
at.startDataflowProcess ( /workspace/main.py:29 )
at.view_func ( /layers/google.python.pip/pip/lib/python3.7/site-packages/functions_framework/__init__.py:171 )
at.dispatch_request ( /layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py:1502 )
at.full_dispatch_request ( /layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py:1516 )
at.full_dispatch_request ( /layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py:1518 )
at.wsgi_app ( /layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py:2073 )
tp/topics/cloud-functions-en4vxzzki3woz4k7qfnl3b36zy [POST]
Traceback (most recent call last):
File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 2073, in wsgi_app
response = self.full_dispatch_request()
File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 1518, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 1516, in full_dispatch_request
rv = self.dispatch_request()
File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 1502, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
File "/layers/google.python.pip/pip/lib/python3.7/site-packages/functions_framework/__init__.py", line 171, in view_func
function(data, context)
File "/workspace/main.py", line 29, in startDataflowProcess
response = request.execute()
File "/layers/google.python.pip/pip/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 131, in positional_wrapper
return wrapped(*args, **kwargs)
File "/layers/google.python.pip/pip/lib/python3.7/site-packages/googleapiclient/http.py", line 937, in execute
raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/curso-dataflow-beam-347613/locations/southamerica-east1/templates:launch?gcsPath=gs%3A%2F%2Fcurso-apachebeam-dataflow%2Ftemplate%2Fstorage_to_bigquery&alt=json returned "(6e270cbeb486518f): The workflow could not be created. Causes: (6e270cbeb48651ce): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]". Details: "(6e270cbeb486518f): The workflow could not be created. Causes: (6e270cbeb48651ce): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]">
日志
insertId: "ptesrsc3ih"
labels: {
dataflow.googleapis.com/job_id: "2022-05-04_08_59_01-7937326881475810309"
dataflow.googleapis.com/job_name: "curso-dataflow-beam-347613 2022-05-04T15:58:55.872Z"
dataflow.googleapis.com/log_type: "system"
dataflow.googleapis.com/region: "southamerica-east1"
}
logName: "projects/curso-dataflow-beam-347613/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2022-05-04T15:59:03.802251063Z"
resource: {
labels: {
job_id: "2022-05-04_08_59_01-7937326881475810309"
job_name: "curso-dataflow-beam-347613 2022-05-04T15:58:55.872Z"
project_id: "curso-dataflow-beam-347613"
region: "southamerica-east1"
step_id: ""
}
type: "dataflow_step"
}
severity: "DEBUG"
textPayload: "The workflow could not be created. Causes: Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]"
timestamp: "2022-05-04T15:59:01.944998497Z"
这似乎是遗留模板的问题。请改用flex模板。