I have a simple Apache Beam program that reads an Avro file from GCP Cloud Storage and writes it to BigQuery.
# Import logging and standard libraries
import logging
import os
import datetime
# Import the Apache Beam library
import apache_beam as beam
from apache_beam import window
from google.cloud import storage
from google.oauth2 import service_account
from google.cloud import language

KEY_PATH = '/home/kiruba/tutorials/gcp-pubsub/bigqueryprojectkey.json'
credentials = service_account.Credentials.from_service_account_file(KEY_PATH)
client = language.LanguageServiceClient(credentials=credentials)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_PATH
PROJECT = 'bigqueryproject-XXXX'
BUCKET = 'XXXX_customdataflow1'
currentdatetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET_PATH = 'pubsub_avro/cloud_storage_transfer/avrofile_' + currentdatetime + '.avro'
# Instantiate a Cloud Storage client with the service-account credentials
client = storage.Client(credentials=credentials)
# Import pipeline options
from apache_beam.options.pipeline_options import PipelineOptions
# Set log level to INFO
root = logging.getLogger()
root.setLevel(logging.INFO)
class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        # print(os.path.getsize('/home/kiruba/tutorials/gcp-pubsub/pubsubprojectdir/avrofile_20220624161520.avro'))
        print(element)
        return [len(element)]
# Create a pipeline
p = beam.Pipeline(options=PipelineOptions())
# Create a PCollection from the AVRO file and write it to BigQuery
transaction = (p
    | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro('gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220703112646.avro')
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('bigqueryproject-XXXX:movielens.movietitle_pubsub'))
# Run the pipeline
def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]
    result = p.run()
    # Wait until pipeline processing is complete
    result.wait_until_finish()
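# NOTE: argv is constructed above but never passed to PipelineOptions or
# p.run(), and run() itself is never called anywhere in the script, so the
# pipeline only sees whatever flags PipelineOptions() picks up from the
# command line (sys.argv).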
I execute the program with the following command:
python 03_05_classictemplate_pipeline_avro_bigquery.py --project bigqueryproject-XXXX --runner DataflowRunner --staging_location gs://XXXX_customdataflow/staging --temp_location gs://XXXX_customdataflow/staging --region=us-central1
and I get the following error:
{'gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220624161520.avro': HttpUnauthorizedError('HttpError accessing <https://www.googleapis.com/storage/v1/b/my_bigquery_poc/o/pubsub_avro%2Fcloud_storage_transfer%2Favrofile_20220624161520.avro?alt=json>: response: <{\'x-guploader-uploadid\': \'ADPycds1MWSKBe3m3NRS2Z9YTc1gvjXyoGIXN7HCr8DrWacKDa0w-6-ImqyFubVP4ewIhsp09dCnSbRYi67sZbClEo_CtHJMlSlR\', \'content-type\': \'application/json; charset=UTF-8\', \'date\': \'Sat, 02 Jul 2022 04:21:18 GMT\', \'vary\': \'Origin, X-Origin\', \'www-authenticate\': \'Bearer realm="https://accounts.google.com/"\', \'cache-control\': \'no-cache, no-store, max-age=0, must-revalidate\', \'expires\': \'Mon, 01 Jan 1990 00:00:00 GMT\', \'pragma\': \'no-cache\', \'content-length\': \'432\', \'server\': \'UploadServer\', \'alt-svc\': \'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"\', \'status\': \'401\'}>, content <{\n "error": {\n "code": 401,\n "message": "Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object.",\n "errors": [\n {\n "message": "Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object.",\n "domain": "global",\n "reason": "required",\n "locationType": "header",\n "location": "Authorization"\n }\n ]\n }\n}\n>')}
Why does my program still treat the access as anonymous? How do I pass valid service-account credentials to the Dataflow runner?
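For reference, here is a minimal sketch of what explicitly wiring the options (including a worker service account) into the pipeline could look like, instead of relying on command-line flags. The service-account email below is a hypothetical placeholder and would need storage.objects.get on the source bucket; this uses Beam's GoogleCloudOptions and StandardOptions views:

from apache_beam.options.pipeline_options import (
    PipelineOptions, GoogleCloudOptions, StandardOptions)

options = PipelineOptions()
gcp_opts = options.view_as(GoogleCloudOptions)
gcp_opts.project = PROJECT
gcp_opts.region = 'us-central1'
gcp_opts.staging_location = 'gs://{0}/staging/'.format(BUCKET)
gcp_opts.temp_location = 'gs://{0}/staging/'.format(BUCKET)
# Hypothetical service account for the Dataflow workers; it must be able to
# read the source bucket and write to the BigQuery table
gcp_opts.service_account_email = 'dataflow-worker@bigqueryproject-XXXX.iam.gserviceaccount.com'
options.view_as(StandardOptions).runner = 'DataflowRunner'

p = beam.Pipeline(options=options)

Note that the 401 above appears to be raised during local graph construction (ReadFromAvro validates the gs:// pattern before the job is submitted), so the submitting machine also needs working application-default credentials, e.g. GOOGLE_APPLICATION_CREDENTIALS exported in the shell before starting Python, or gcloud auth application-default login.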
Update: it finally worked.