使用Google CloudSQL作为数据流读取源和/或接收器是否有任何指导?
在Apache BeamPythonSDK2.1.0留档中,没有一章提到谷歌云SQL。但是有关于BigQuery的文章。
当我阅读教程从关系数据库执行ETL到BigQuery时,我看到他们在这个过程中使用导出的数据作为文件的来源。这意味着中间必须有一个导出步骤,这并不理想。
在特定情况下使用云SQL时,是否需要注意特定问题?对于两个源作为接收器?
BeamPythonSDK没有从MySQL/Postgres数据库读取数据的内置转换。尽管如此,编写自定义转换来执行此操作应该不会太麻烦。您可以这样做:
with beam.Pipeline() as p:
query_result_pc = (p
| beam.Create(['select a,b,c from table1'])
| beam.ParDo(QueryMySqlFn(host='...', user='...'))
| beam.Reshuffle())
要连接到MySQL,我们将使用mysql特定的库mysql.连接器,但您可以为Postgres/etc使用适当的库。
您的查询功能是:
import mysql.connector
class QueryMySqlFn(beam.DoFn):
def __init__(self, **server_configuration):
self.config = server_configuration
def start_bundle(self):
self.mydb = mysql.connector.connect(**self.config)
self.cursor = mydb.cursor()
def process(self, query):
self.cursor.execute(query)
for result in self.cursor:
yield result
对于Postgres,您可以使用凄凉2
或任何其他允许您连接到它的库:
import psycopg2
class QueryPostgresFn(beam.DoFn):
def __init__(self, **server_config):
self.config = server_config
def process(self, query):
con = psycopg2.connect(**self.config)
cur = con.cursor()
cur.execute(query)
return cur.fetchall()
FAQ
beam. Reshuffle
转换?-因为QueryMySqlFn
不会并行化从数据库中读取数据。重新洗牌将确保我们的数据在下游并行化以进行进一步处理。
有一个很好的库https://github.com/pysql-beam/pysql-beamSQL摄取,请通过示例,它支持像MySQL和PostgreqlRDBMS。
它提供了读写选项,如下所示,我们可以从谷歌云SQL读取数据:
from pysql_beam.sql_io.sql import ReadFromSQL
....
ReadFromSQL(host=options.host, port=options.port,
username=options.username, password=options.password,
database=options.database,
query=options.source_query,
wrapper=PostgresWrapper,
batch=100000)