我试图运行一个Python(版本2.7.1')脚本,我使用pymysql包创建一个表到数据库从CSV文件。
它在我的本地系统中正确运行,但是,在Google Cloud Dataflow中作为管道的一部分运行相同的脚本时出现问题。
我的Python函数如下:
class charge_to_db(beam.DoFn):
def process(self, element):
import pymysql
with open(element, 'r') as f:
data = f.read().decode("UTF-8")
datalist = []
for line in data.split('\n'):
datalist.append(line.split(','))
db = pymysql.connect(host='IPaddress', user='root', password='mypassword', database='stack_model')
cursor = db.cursor()
cursor.execute("DROP TABLE IF EXISTS stack_convergence")
# create column names from the first line in fList
up = "upper_bnd"
primal = "primal"
d = "dualit"
gap = "gap_rel"
teta = "teta"
alpha = "alpha"
imba = "imba_avg"
price = "price_avg"
# create STUDENT table // place a comma after each new column except the last
queryCreateConvergenceTable = """CREATE TABLE stack_convergence(
{} float not null,
{} float not null,
{} float not null,
{} float not null,
{} float not null,
{} float not null,
{} float not null,
{} float not null )""".format(up, primal, d, gap, teta, alpha, imba, price)
cursor.execute(queryCreateConvergenceTable)
在云中运行此函数时,我收到以下错误:
RuntimeError: OperationalError: (2003, 'Can\'t connect to MySQL server on \'35.195.1.40\' (110 "Connection timed out")')
我不知道为什么会发生这个错误,因为它在本地系统中正确运行,所以从本地系统我可以访问我的云SQL实例,但不能从云中的数据流访问。
为什么会出现这个错误?
在Dataflow上,您不能将IP列入白名单以使Dataflow能够访问SQL实例。如果您要使用Java,最简单的方法是使用JdbcIO/JDBC套接字工厂。
但是由于您使用的是Python,那么使用Python特定的数据库连接工具模仿JdbcIO.read()的实现会有所帮助。在更改了一些云SQL设置并添加了相关的python代码后,有一个相关的问题。
如果这看起来很复杂,或者您可以将数据从云SQL导出到云存储,然后从云存储加载。