提问者:小点点

如何使用GCP云SQL作为数据流源和/或汇Python?


使用Google CloudSQL作为数据流读取源和/或接收器是否有任何指导?

在Apache BeamPythonSDK2.1.0留档中,没有一章提到谷歌云SQL。但是有关于BigQuery的文章。

当我阅读教程从关系数据库执行ETL到BigQuery时,我看到他们在这个过程中使用导出的数据作为文件的来源。这意味着中间必须有一个导出步骤,这并不理想。

在特定情况下使用云SQL时,是否需要注意特定问题?对于两个源作为接收器?


共2个答案

匿名用户

BeamPythonSDK没有从MySQL/Postgres数据库读取数据的内置转换。尽管如此,编写自定义转换来执行此操作应该不会太麻烦。您可以这样做:

with beam.Pipeline() as p:
  query_result_pc = (p 
                     | beam.Create(['select a,b,c from table1'])
                     | beam.ParDo(QueryMySqlFn(host='...', user='...'))
                     | beam.Reshuffle())

要连接到MySQL,我们将使用mysql特定的库mysql.连接器,但您可以为Postgres/etc使用适当的库。

您的查询功能是:

import mysql.connector


class QueryMySqlFn(beam.DoFn):

  def __init__(self, **server_configuration):
    self.config = server_configuration

  def start_bundle(self):
      self.mydb = mysql.connector.connect(**self.config)
      self.cursor = mydb.cursor()

  def process(self, query):
    self.cursor.execute(query)
    for result in self.cursor:
      yield result

对于Postgres,您可以使用凄凉2或任何其他允许您连接到它的库:

import psycopg2

class QueryPostgresFn(beam.DoFn):

  def __init__(self, **server_config):
    self.config = server_config

  def process(self, query):
    con = psycopg2.connect(**self.config)
    cur = con.cursor()

    cur.execute(query)
    return cur.fetchall()

FAQ

  • 为什么要在那里进行beam. Reshuffle转换?-因为QueryMySqlFn不会并行化从数据库中读取数据。重新洗牌将确保我们的数据在下游并行化以进行进一步处理。

匿名用户

有一个很好的库https://github.com/pysql-beam/pysql-beamSQL摄取,请通过示例,它支持像MySQL和PostgreqlRDBMS。

它提供了读写选项,如下所示,我们可以从谷歌云SQL读取数据:

from pysql_beam.sql_io.sql import ReadFromSQL

....
ReadFromSQL(host=options.host, port=options.port,
        username=options.username, password=options.password,
        database=options.database,
        query=options.source_query,
        wrapper=PostgresWrapper,
        batch=100000)