提问者:小点点

使用最新的pythonapache_beamcloud datafow sdk创建用于从云数据存储读取的自定义源


最近云数据流python sdk可用,我决定使用它。不幸的是,从云数据存储读取的支持还没有到来,所以我不得不依靠编写自定义源代码,这样我就可以像promise的那样利用动态拆分、进度估计等的好处。我确实彻底研究了留档,但无法将碎片放在一起,这样我就可以加快整个过程。

更明确地说,我的第一个方法是:

  1. 查询云数据存储
  2. 创建ParDo函数并将返回的查询传递给它。

但是这样花了13分钟来迭代200k条目。

所以我决定编写自定义源代码来有效地读取实体。但是由于我对将各个部分组合在一起缺乏理解,我无法实现这一目标。有人能帮助我创建自定义源以从数据存储中读取吗?

编辑:对于第一种方法,我的要点链接是:https://gist.github.com/shriyanka/cbf30bbfbf277deed4bac0c526cf01f1

谢谢你。


共1个答案

匿名用户

在您提供的代码中,对Datastore的访问发生在管道构建之前:

query = client.query(kind='User').fetch()

这将执行整个查询并在涉及BeamSDK之前读取所有实体。

更准确地说,fetch()返回对查询结果的惰性迭代,当您在beam. Create(query)构建管道时,它们会被迭代-但是,这再次发生在您的主程序中,在管道开始之前。最有可能的是,这需要13分钟,而不是管道本身(但请随意提供一个作业ID以便我们可以更深入地了解)。您可以通过对代码进行小的更改来验证这一点:

query = list(client.query(kind='User').fetch())

但是,我认为您的意图是并行读取和处理实体。

特别是对于云数据存储,自定义源API不是这样做的最佳选择。原因是底层云数据存储API本身目前没有提供实现自定义源“好东西”所需的属性,例如进度估计和动态拆分,因为它的查询API非常通用(不像Cloud Bigtable,它总是返回按键排序的结果,因此例如,您可以通过查看当前键来估计进度)。

我们目前正在重写JavaCloud Datastore连接器以使用不同的方法,该方法使用ParDo来拆分查询,并使用ParDo来读取每个子查询。有关详细信息,请参阅此拉取请求。