Asked by: 小点点

Cloud Function trigger that monitors Cloud Storage and launches a Dataflow job


I created a Cloud Function in Python that watches for any file created or modified in Cloud Storage and, when that happens, launches a Dataflow job from a template I built with Apache Beam. The template is already configured to read the file from Cloud Storage and write it to a BigQuery table. I deployed the function ("main.py") and the deployment succeeded (it turned green in Cloud Functions), but it does nothing, and I can't tell where I went wrong.

Cloud Function

def startDataflowProcess(data, context):
    from googleapiclient.discovery import build
    #replace with your projectID
    project = "curso-dataflow-beam-347613"
    #Dataflow job names may only contain lowercase letters, digits and hyphens,
    #so avoid the spaces and colons in the raw timestamp
    job = project + "-" + str(data['timeCreated']).lower().replace(':', '-').replace('.', '-')
    #path of the dataflow template on google storage bucket
    template = "gs://curso-apachebeam-dataflow/template/storage_to_bigquery"
    #the Cloud Storage event payload exposes the object path as 'name'
    inputFile = "gs://" + str(data['bucket']) + "/" + str(data['name'])
    #user defined parameters to pass to the dataflow pipeline job

    parameters = {
        'inputFile': inputFile,
    }
    #tempLocation is the path on GCS to store temp files generated during the dataflow job
    environment = {'tempLocation': 'gs://curso-apachebeam-dataflow/temp'}

    service = build('dataflow', 'v1b3', cache_discovery=False)
    #below API is used when we want to pass the location of the dataflow job
    request = service.projects().locations().templates().launch(
        projectId=project,
        gcsPath=template,
        location='southamerica-east1',
        body={
            'jobName': job,
            'parameters': parameters,
            'environment':environment
        },
    )
    response = request.execute()
    print(str(response))
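For reference, the data argument of a Cloud Storage trigger carries the object's metadata directly, so the fields used above come straight from the event payload. A trimmed, illustrative sketch of what data looks like (the values below are made up):

#Illustrative excerpt of the Cloud Storage event payload for a
#google.storage.object.finalize trigger; real payloads carry more fields
data = {
    'bucket': 'bucket_trigger_dataflow_storage_to_bigquery',
    'name': 'dataset.csv',                      #object path within the bucket
    'timeCreated': '2022-05-04T15:58:55.872Z',  #creation timestamp
    'contentType': 'text/csv',
}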

Dataflow template

import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = {
    'project': 'curso-dataflow-beam-347613' ,
    'runner': 'DataflowRunner',
    'region': 'southamerica-east1',
    'staging_location': 'gs://curso-apachebeam-dataflow/temp',
    'temp_location': 'gs://curso-apachebeam-dataflow/temp',
    'template_location': 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery',
    'save_main_session' : True }

pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p1 = beam.Pipeline(options=pipeline_options)

serviceAccount = r'C:\Users\paulo.santos\Documents\CURSO DATA FLOW\Python-master\curso-dataflow-beam-347613-c998cb1e5f49.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= serviceAccount


def criar_dict(record):
    #record holds the CSV columns: [name, company, pin]
    dict_ = {}
    dict_['name'] = record[0]
    dict_['company'] = record[1]
    dict_['pin'] = record[2]
    return dict_

table_schema = 'name:STRING, company:STRING, pin:INTEGER'
tabela = 'projeto_curso_dataflow.Curso_dataflow_projeto'

Tabela_Dados = (
    p1
    | "Importar Dados" >> beam.io.ReadFromText(r"gs://bucket_trigger_dataflow_storage_to_bigquery/dataset.csv", skip_header_lines = 2)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Criar um dic" >> beam.Map(lambda record: criar_dict(record)) 
    | "Gravar no BigQuery" >> beam.io.WriteToBigQuery(
                              tabela,
                              schema=table_schema,
                              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                              custom_gcs_temp_location = 'gs://curso-apachebeam-dataflow/temp' )
)

p1.run()
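Note that the input path above is hardcoded into ReadFromText, so when p1.run() stages the classic template to template_location, the template never declares an inputFile runtime parameter, which is exactly what the launch error below complains about. A minimal sketch of how the read could be rewired, assuming a hypothetical options class StorageToBigQueryOptions and keeping the parameter name inputFile that the Cloud Function already passes:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class StorageToBigQueryOptions(PipelineOptions):
    #hypothetical options class declaring the template's runtime parameters
    @classmethod
    def _add_argparse_args(cls, parser):
        #a ValueProvider argument becomes a runtime parameter of the
        #classic template, so launches are allowed to pass --inputFile
        parser.add_value_provider_argument(
            '--inputFile', type=str,
            help='GCS path of the CSV file to load')

pipeline_options = PipelineOptions.from_dictionary({
    'project': 'curso-dataflow-beam-347613',
    'runner': 'DataflowRunner',
    'region': 'southamerica-east1',
    'staging_location': 'gs://curso-apachebeam-dataflow/temp',
    'temp_location': 'gs://curso-apachebeam-dataflow/temp',
    'template_location': 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery',
    'save_main_session': True})
custom_options = pipeline_options.view_as(StorageToBigQueryOptions)

p1 = beam.Pipeline(options=pipeline_options)
#ReadFromText accepts a ValueProvider, so the actual path is resolved when
#the template is launched rather than baked into the template graph
linhas = (
    p1
    | "Importar Dados" >> beam.io.ReadFromText(
        custom_options.inputFile, skip_header_lines=2)
)

The rest of the pipeline (split, dict, WriteToBigQuery) would stay the same; only the source changes.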

Error report

googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/curso-dataflow-beam-347613/locations/southamerica-east1/templates:launch?gcsPath=gs%3A%2F%2Fcurso-apachebeam-dataflow%2Ftemplate%2Fstorage_to_bigquery&alt=json returned "(87fc63e208f59924): The workflow could not be created. Causes: (87fc63e208f5949f): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]". Details: "(87fc63e208f59924): The workflow could not be created. Causes: (87fc63e208f5949f): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]">

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 1518, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 1516, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.7/site-packages/flask/app.py", line 1502, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/layers/google.python.pip/pip/lib/python3.7/site-packages/functions_framework/__init__.py", line 171, in view_func
    function(data, context)
  File "/workspace/main.py", line 29, in startDataflowProcess
    response = request.execute()
  File "/layers/google.python.pip/pip/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 131, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/layers/google.python.pip/pip/lib/python3.7/site-packages/googleapiclient/http.py", line 937, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/curso-dataflow-beam-347613/locations/southamerica-east1/templates:launch?gcsPath=gs%3A%2F%2Fcurso-apachebeam-dataflow%2Ftemplate%2Fstorage_to_bigquery&alt=json returned "(6e270cbeb486518f): The workflow could not be created. Causes: (6e270cbeb48651ce): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]". Details: "(6e270cbeb486518f): The workflow could not be created. Causes: (6e270cbeb48651ce): Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]">

Logs

insertId: "ptesrsc3ih"
labels: {
dataflow.googleapis.com/job_id: "2022-05-04_08_59_01-7937326881475810309"
dataflow.googleapis.com/job_name: "curso-dataflow-beam-347613 2022-05-04T15:58:55.872Z"
dataflow.googleapis.com/log_type: "system"
dataflow.googleapis.com/region: "southamerica-east1"
}
logName: "projects/curso-dataflow-beam-347613/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2022-05-04T15:59:03.802251063Z"
resource: {
labels: {
job_id: "2022-05-04_08_59_01-7937326881475810309"
job_name: "curso-dataflow-beam-347613 2022-05-04T15:58:55.872Z"
project_id: "curso-dataflow-beam-347613"
region: "southamerica-east1"
step_id: ""
}
type: "dataflow_step"
}
severity: "DEBUG"
textPayload: "The workflow could not be created. Causes: Found unexpected parameters: ['inputFile' (perhaps you meant 'update')]"
timestamp: "2022-05-04T15:59:01.944998497Z"

1 Answer

Anonymous user

This looks like a limitation of classic (legacy) templates: a classic template only accepts the runtime parameters that the pipeline declared as ValueProviders when the template was built. Your template hardcodes the input path inside ReadFromText, so it exposes no inputFile parameter, and the launch request is rejected with "Found unexpected parameters: ['inputFile']". Either declare inputFile as a ValueProvider and rebuild the classic template, or switch to a Flex Template.
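If you go the Flex Template route, the Cloud Function changes very little: after building the template with gcloud dataflow flex-template build, the launch call moves to the flexTemplates endpoint, which accepts arbitrary pipeline parameters. A hedged sketch, assuming the spec file was written to the hypothetical path gs://curso-apachebeam-dataflow/template/storage_to_bigquery_flex.json:

def startFlexTemplate(data, context):
    from googleapiclient.discovery import build
    project = "curso-dataflow-beam-347613"
    service = build('dataflow', 'v1b3', cache_discovery=False)
    #flexTemplates.launch takes the GCS path of the template spec file
    #(produced by "gcloud dataflow flex-template build") inside the body,
    #and pipeline parameters go into launchParameter.parameters
    request = service.projects().locations().flexTemplates().launch(
        projectId=project,
        location='southamerica-east1',
        body={
            'launchParameter': {
                'jobName': 'storage-to-bigquery',  #must be unique among active jobs
                'containerSpecGcsPath': 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery_flex.json',
                'parameters': {
                    'inputFile': "gs://" + str(data['bucket']) + "/" + str(data['name']),
                },
            },
        },
    )
    print(str(request.execute()))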