我正在尝试将数据从API导入我的GBQ,并希望使用数据流。
由于未知和难以想象的原因,API只是返回一个". csv.gz"的URL,然后我需要在将数据推送到GBQ之前下载和处理它。
此外,API具有不记名令牌的身份验证,因此我正在寻找一种方法来处理数据的下载和解析以及身份验证,并发现:
pd.read_csv('https://app.SOMEPROVIDER.com/api/reporting/download/SOMEID.csv.gz', storage_options={'Authorization': 'Bearer '+ bearer_token}, compression='gzip', header=0, sep=',', quotechar='"')
当在我的本地Beam管道中使用它时,它的工作效果非常好。
但是,一旦我将管道上传到数据流并在那里运行它,我就会收到错误消息
ValueError:storage_options传递文件对象或非fsspec文件路径
完整的痕迹:
"apache_beam/runners/common.py", line 1223, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 572, in
apache_beam.runners.common.SimpleInvoker.invoke_process File
".\filename.py", line 144, in process File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
610, in read_csv return _read(filepath_or_buffer, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
462, in _read parser = TextFileReader(filepath_or_buffer, **kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
819, in __init__ self._engine = self._make_engine(self.engine) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1050, in _make_engine return mapping[engine](self.f, **self.options) #
type: ignore[call-arg] File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1867, in __init__ self._open_handles(src, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1362, in _open_handles self.handles = get_handle( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
558, in get_handle ioargs = _get_filepath_or_buffer( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
286, in _get_filepath_or_buffer raise ValueError( ValueError:
storage_options passed with file object or non-fsspec file path During
handling of the above exception, another exception occurred: Traceback
(most recent call last): File
"/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py",
line 651, in do_work work_executor.execute() File
"/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py",
line 179, in execute op.start() File
"dataflow_worker/shuffle_operations.py", line 63, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 353, in
apache_beam.runners.worker.operations.Operation.output File
"apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 353, in
apache_beam.runners.worker.operations.Operation.output File
"apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/worker/operations.py", line 713, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/common.py", line 1225, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1290, in
apache_beam.runners.common.DoFnRunner._reraise_augmented File
"apache_beam/runners/common.py", line 1223, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 752, in
apache_beam.runners.common.PerWindowInvoker.invoke_process File
"apache_beam/runners/common.py", line 875, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1386, in
apache_beam.runners.common._OutputProcessor.process_outputs File
"apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/worker/operations.py", line 713, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/common.py", line 1225, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented File
"apache_beam/runners/common.py", line 1223, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 572, in
apache_beam.runners.common.SimpleInvoker.invoke_process File
".\filename.py", line 144, in process File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
610, in read_csv return _read(filepath_or_buffer, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
462, in _read parser = TextFileReader(filepath_or_buffer, **kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
819, in __init__ self._engine = self._make_engine(self.engine) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1050, in _make_engine return mapping[engine](self.f, **self.options) #
type: ignore[call-arg] File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1867, in __init__ self._open_handles(src, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1362, in _open_handles self.handles = get_handle( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
558, in get_handle ioargs = _get_filepath_or_buffer( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
286, in _get_filepath_or_buffer raise ValueError( ValueError:
storage_options passed with file object or non-fsspec file path [while
running 'Fetch actual report data'] ```
有人知道为什么这在本地有效但在云中无效吗?我假设这可能与文件系统和临时文件有关-但错误消息没有多大意义……
根据熊猫文档,storage_options参数交给urllib用于https链接,而只交给fsspec用于s3和gcs路径
原来这只是一个版本问题。存储选项参数作为授权信息的解释在数据流图像中包含的熊猫版本中不存在,当我通过带有-extra_package
参数的最新可用熊猫版本的本地“轮子”时,问题自行解决了。