I am trying to use Google Dataflow with the Python SDK to read a table from a Google Spanner database and write it to a text file as a backup. I wrote the following script:
from __future__ import absolute_import
import argparse
import itertools
import logging
import re
import time
import datetime as dt
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet
BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'
class SpannerSource(iobase.BoundedSource):
    def __init__(self):
        logging.info('Enter __init__')
        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID
        }
        self.SpannerClient = Client

    def estimate_size(self):
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY

        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):  # this is not called when using the DataflowRunner!
        logging.info('Enter read')
        # instantiate spanner client
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # read the column names of the table
        table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
        table_fields.consume_all()
        self.columns = [x[0] for x in table_fields]
        keyset = KeySet(all_=True)

        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        # iterate over rows
        results.consume_all()
        for row in results:
            JSON_row = {
                self.columns[i]: row[i] for i in range(len(self.columns))
            }
            yield JSON_row

    def split(self, start_position=None, stop_position=None):
        # this should not be called since the source is unsplittable
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # because the source is unsplittable (for now), only a single source is returned
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)
def run(argv=None):
    """Main entry point"""
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
    # pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=pipeline_options)

    output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
    output | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='')  # if this line is commented, job completes but does not do anything

    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
However, this script only runs correctly with the DirectRunner: when I run it with the DataflowRunner, it runs for a while without producing any output and then exits with the error:
"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."
Sometimes it just keeps running forever without producing any output.
Moreover, if I comment out the 'output = ...' line, the job completes, but without actually reading the data.
It appears that the DataflowRunner calls the source's estimate_size function, but never its read or get_range_tracker functions.
Does anyone have an idea of what could be causing this? I know there is a (more complete) Java SDK with an experimental Spanner source/sink available, but I would rather stick with Python if possible.
Thanks
Google has since added support for backing up Spanner with Dataflow: you can pick the corresponding template when creating a Dataflow job.
More info: https://cloud.google.com/blog/products/gcp/cloud-spanner-adds-import-export-functionality-to-ease-data-movement
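If you would rather launch that export programmatically instead of through the console, here is a minimal sketch using the Dataflow templates REST API from Python. The template path and the instanceId/databaseId/outputDir parameter names are assumptions based on the Google-provided "Cloud Spanner to GCS Avro" template, not something confirmed by this post, so check the template documentation before relying on them.

# Hypothetical sketch: launch the Google-provided Spanner export template from Python.
# The gcsPath and the parameter names below are assumptions; verify them against the
# current template documentation.
from googleapiclient.discovery import build

def launch_spanner_export(project_id, instance_id, database_id, output_dir,
                          job_name='spanner-export'):
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().templates().launch(
        projectId=project_id,
        gcsPath='gs://dataflow-templates/latest/Cloud_Spanner_to_GCS_Avro',  # assumed template location
        body={
            'jobName': job_name,
            'parameters': {
                'instanceId': instance_id,
                'databaseId': database_id,
                'outputDir': output_dir,
            },
        },
    )
    return request.execute()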
I have reworked my code following the suggestion to simply use a ParDo instead of the BoundedSource class. For reference, here is my solution; I am sure there are many ways to improve it, and I would be happy to hear opinions. In particular, I was surprised that I have to create a dummy PColl to start the pipeline (if I don't, I get an error
AttributeError: 'PBegin' object has no attribute 'windowing'
that I could not work around). The dummy PColl feels a bit like a hack.
from __future__ import absolute_import
import datetime as dt
import logging
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet
BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/some_folder/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_database'
JOB_NAME = 'my_jobname'
class ReadTables(beam.DoFn):
    def __init__(self, project, instance, database):
        super(ReadTables, self).__init__()
        self._project = project
        self._instance = instance
        self._database = database

    def process(self, element):
        # get list of tables in the database
        table_names_row = Client(self._project).instance(self._instance).database(self._database).execute_sql('SELECT t.table_name FROM information_schema.tables AS t')
        for row in table_names_row:
            if row[0] in [u'COLUMNS', u'INDEXES', u'INDEX_COLUMNS', u'SCHEMATA', u'TABLES']:  # skip these
                continue
            yield row[0]

class ReadSpannerTable(beam.DoFn):
    def __init__(self, project, instance, database):
        super(ReadSpannerTable, self).__init__()
        self._project = project
        self._instance = instance
        self._database = database

    def process(self, element):
        # first read the columns present in the table
        table_fields = Client(self._project).instance(self._instance).database(self._database).execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % element)
        columns = [x[0] for x in table_fields]

        # next, read the actual data in the table
        keyset = KeySet(all_=True)
        results_streamed_set = Client(self._project).instance(self._instance).database(self._database).read(table=element, columns=columns, keyset=keyset)

        for row in results_streamed_set:
            JSON_row = { columns[i]: row[i] for i in xrange(len(columns)) }
            yield (element, JSON_row)  # output pairs of (table_name, data)
def run(argv=None):
    """Main entry point"""
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(SetupOptions).requirements_file = "requirements.txt"
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=pipeline_options)

    init = p | 'Begin pipeline' >> beam.Create(["test"])  # have to create a dummy transform to initialize the pipeline, surely there is a better way?
    tables = init | 'Get tables from Spanner' >> beam.ParDo(ReadTables(PROJECT_ID, INSTANCE_ID, DATABASE_ID))  # read the tables in the db
    rows = (tables | 'Get rows from Spanner table' >> beam.ParDo(ReadSpannerTable(PROJECT_ID, INSTANCE_ID, DATABASE_ID))  # for each table, read the entries
                   | 'Group by table' >> beam.GroupByKey()
                   | 'Formatting' >> beam.Map(lambda (table_name, rows): (table_name, list(rows))))  # have to force to list here (the DataflowRunner produces _UnwindowedValues)

    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
    rows | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime, file_name_suffix='')

    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
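As a side note on the dummy PColl: newer Beam SDK releases expose beam.Impulse(), which emits a single empty element and can seed a pipeline without the Create(["test"]) workaround. A minimal sketch, assuming an SDK/runner combination where Impulse is supported for batch jobs (this is not taken from the original post):

# Sketch only: seed the pipeline with Impulse instead of a dummy Create.
tables = (p
          | 'Seed' >> beam.Impulse()  # emits a single empty element
          | 'Get tables from Spanner' >> beam.ParDo(ReadTables(PROJECT_ID, INSTANCE_ID, DATABASE_ID)))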