我正在尝试在 Google Cloud Platform 上以 Dataflow 作业的形式运行 Apache Beam 管道。
我的项目结构如下:
root_dir/
    __init__.py
    setup.py
    main.py
    utils/
        __init__.py
        log_util.py
        config_util.py
这是我的setup.py
import setuptools

# Packaging metadata for the pipeline project. find_packages() picks up every
# directory that contains an __init__.py, which here includes `utils`.
setuptools.setup(
    name='dataflow_example',
    version='1.0',
    install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        # NOTE(review): per the worker error quoted in the answer below
        # (Beam SDK base version 2.32.0 vs Dataflow Python worker 2.28.0),
        # installing apache-beam from here can put a different Beam SDK
        # version on the workers than the one they were started with.
        # The answer below removes this requirement.
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2",
    ],
    packages=setuptools.find_packages(),
)
这是我的管道代码:
import math
import apache_beam as beam
from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions
from utils.log_util import LogUtil
from utils.config_util import ConfigUtil
class DataflowExample:
    """Launch a Dataflow job that counts the rows returned by a BigQuery query."""

    # Class-level default; every instance replaces it in __init__.
    config = {}

    def __init__(self):
        # Project, region and bucket are read from the "config" module that
        # the project-local ConfigUtil helper loads.
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        """Build the pipeline, run it on Dataflow and wait for it to finish.

        Raises:
            Exception: any failure while building or running the pipeline is
                logged through LogUtil and then re-raised, so the caller (and
                the process exit status) sees the failure instead of it being
                silently swallowed.
        """
        try:
            LogUtil.log_n_notify(log_type="info", msg="Dataflow started")
            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"
            beam_options = {
                "project": self.project,
                "region": self.region,
                # Dataflow job names may only contain [-a-z0-9], must start
                # with a letter and end with a letter or digit: no "_".
                "job_name": "dataflow-example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/",
                # NOTE(review): no "setup_file" entry is given here, so the
                # local `utils` package is not shipped to the workers. That is
                # the cause of the ModuleNotFoundError this question describes.
            }
            options = PipelineOptions(**beam_options, save_main_session=True)
            # Leaving the `with` block runs the pipeline and waits for it.
            with beam.Pipeline(options=options) as pipeline:
                (
                    pipeline
                    # ReadFromBigQuery is already a PTransform, so it is
                    # applied directly instead of being wrapped in beam.io.Read.
                    | 'Read from BQ ' >> beam.io.ReadFromBigQuery(
                        query=query, use_standard_sql=True)
                    | 'Count records' >> beam.combiners.Count.Globally()
                    | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )
            LogUtil.log_n_notify(log_type="info", msg="Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")
            raise
class PrintCount(beam.DoFn):
    """Log the total row count and how many batches of `batch_size` it makes."""

    def __init__(self):
        # Chain to DoFn's initializer before adding our own state.
        super().__init__()
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        """Log `row_count` and the number of batches needed to cover it.

        Args:
            row_count: the single element produced by Count.Globally().
            batch_size: records per batch, passed as an extra ParDo argument.

        Emits no output elements. Any error (e.g. batch_size == 0) is logged
        rather than raised.
        """
        try:
            current_date = datetime.today().date()
            # Exact integer ceiling division. math.ceil(a / b) goes through a
            # float and can round incorrectly for very large counts.
            total = -(-row_count // batch_size)
            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")
            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process - {str(e)}")
# Script entry point, run as `python3 main.py`.
# NOTE: keep `df_example` as a module-level name. execute_pipeline passes
# save_main_session=True, which pickles the __main__ globals for the workers,
# so renaming or removing it changes what gets shipped.
if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()
该管道的功能是:用查询从 BigQuery 表中读取数据,统计记录条数,并记录总记录数以及按批大小(batch_size)计算出的批次数。
我在 Cloud Shell 中使用命令 `python3 main.py` 运行该作业。
Dataflow 作业虽然能够启动,但几分钟后工作节点抛出错误:“ModuleNotFoundError: No module named 'utils'”。
“utils”文件夹可用,相同的代码在使用“DirectRunner”执行时可以正常工作。
`log_util` 和 `config_util` 文件分别是用于日志记录和获取配置的自定义工具文件。
另外,我也尝试过使用 `setup_file` 选项,即 `python3 main.py --setup_file <setup.py 的路径>`。
如何使用“DataflowRunner”解决ModuleNotFoundError?
以社区 Wiki 形式发布。经 @GopinathS 确认,错误及修复方法如下:
工作节点上出现的错误是:Beam SDK 基础版本 2.32.0 与 Dataflow Python 工作节点版本 2.28.0 不匹配。请检查 Dataflow 工作节点的启动日志,并确保安装了正确版本的 Beam SDK。
要修复此问题,需要从 setup.py 的 install_requires 中删除 “apache-beam[gcp]>=2.20.0”,以免在工作节点上另行安装一个不同版本的 Beam SDK,从而导致上述版本不匹配。更新后的 setup.py:
import setuptools

# Packaging metadata passed to Beam through the "setup_file" pipeline option.
# apache-beam[gcp] is intentionally NOT listed in install_requires: it was
# removed to fix the mismatch between the Beam SDK base version and the
# Dataflow Python worker version.
setuptools.setup(
    name='dataflow_example',
    version='1.0',
    install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "wheel>=0.36.2",
    ],
    # Picks up every directory containing an __init__.py, including `utils`.
    packages=setuptools.find_packages(),
)
并更新管道代码中的 `beam_options`:
beam_options = {
    "project": self.project,
    "region": self.region,
    # Dataflow job names may only contain [-a-z0-9], must start with a
    # letter and end with a letter or digit, so "_" is not allowed.
    "job_name": "dataflow-example",
    "runner": "DataflowRunner",
    "temp_location": f"gs://{self.bucket}/temp_location/",
    # Ships the local packages (e.g. `utils`) to the workers. The path is
    # relative to the current working directory, so launch from root_dir.
    "setup_file": "./setup.py",
}
还要确保一次传递所有管道选项,而不是部分传递。
如果通过命令行传入 `--setup_file`,就需要先解析参数,再把它合并到 `beam_options` 中;为了省去这一步,我直接在 `beam_options` 中加入了 `"setup_file": "./setup.py"`。
在隔离网络中,Dataflow 安装与平台相关(需要编译)的软件包时可能会遇到问题:没有网络就无法编译它们。也可能它会尝试安装这些包,但由于无法编译而改为下载 wheel?我也不确定。
如果仍想使用 psycopg2(含二进制文件)或 google-cloud-secret-manager(本身不含二进制文件,但其依赖项含有)这类包,就需要把所有本身不含二进制文件(即 `none-any` 的 wheel)且依赖项也不含二进制文件的包通过 requirements.txt 安装,其余的包则以 wheel 文件的形式通过 `--extra_packages` 参数提供。示例:
# Fragment of the pipeline launch command (e.g. `python3 main.py ...`).
# The wheel file names are placeholders; each --extra_packages flag names
# one local wheel file to hand to the workers.
--extra_packages=package_1_needed_by_2-manylinux.whl \
--extra_packages=package_2_needed_by_3-manylinux.whl \
--extra_packages=what-you-need_needing_3-none-any.whl