Asked by: 小点点

Python Apache Beam error "InvalidSchema: No connection adapters were found for" when requesting an API URL that contains a space


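The following example reads data from a REST API inside an Apache Beam pipeline. The request works when run locally, but fails to fetch the API data inside the Dataflow pipeline. The API is requested with: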

response = requests.get(url, auth=HTTPDigestAuth(self.USER, self.PASSWORD), headers=headers)

where the url string is:

url = "https://host:port/car('power%203')/speed"
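The %20 in that URL is the percent-encoded space in 'power 3'. If the key comes from a variable, a minimal sketch using the standard library's urllib.parse.quote can build the same string. build_speed_url is a hypothetical helper, and host:port is the placeholder from the question.

```python
from urllib.parse import quote


def build_speed_url(base: str, key: str) -> str:
    """Hypothetical helper: percent-encode only the key inside car('...').

    quote() turns the space into %20; the literal parentheses and quotes
    around the key are left untouched because they are not passed to quote().
    """
    return f"{base}/car('{quote(key)}')/speed"


url = build_speed_url("https://host:port", "power 3")
print(url)  # https://host:port/car('power%203')/speed
```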

The pipeline fails with the following error. Note the extra \ around 'power%203':

InvalidSchema: No connection adapters were found for '(("https://host:post/car(\'power%203\')/speed",),)' [while running 'fetch API data']

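The idea is to develop and test the pipeline locally, then run it in production on GCP Dataflow. The request works outside the pipeline but fails inside the Apache Beam Python pipeline. Running the pipeline with the DirectRunner, either in a WSL2 Ubuntu conda Python 3.9 environment or on a cloud JupyterHub, returns the same error. The full pipeline example is below: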

import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import requests
import json
from requests.auth import HTTPDigestAuth

class get_api_data(beam.DoFn):
    def __init__(self, url):
        self.url = url,
        self.USER = 'user' 
        self.PASSWORD = 'password'

    def process(self, buffer=[]):        
        logging.info(self.url)
        headers = {
            'Prefer': f'data.maxpagesize=2000',
        }        
        response = requests.get(self.url, auth=HTTPDigestAuth(self.USER, self.PASSWORD), headers=headers)
        buffer = response.json()['value']
        return buffer


class Split(beam.DoFn):
    def process(self, element):
        try:
            etag = element['etag']
            car_id = element['carID']
            power = element['power']
            speed = element['speed']
        except KeyError as e:
            # a missing field raises KeyError (not ValueError); log it and skip the record
            logging.error(e)
            return []

        return [{
            'etag': str(etag),
            'car_id': str(car_id),
            'power': int(power),
            'speed': float(speed),
        }]

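def run(argv=None):
    url = "https://host:port/car('power%203')/speed"
    # build pipeline options from command-line args (e.g. --runner=DirectRunner)
    pipeline_options = PipelineOptions(argv)
    p1 = beam.Pipeline(options=pipeline_options)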
    ingest_data = (
        p1
        | 'Start Pipeline' >> beam.Create([None])
        | 'fetch API data' >> beam.ParDo(get_api_data(url)) 
        | 'split records' >> beam.ParDo(Split())
        | 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )

    result = p1.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
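One way to surface a malformed URL earlier is to keep the HTTP call in a small plain function that checks the URL type before sending, so it can be tested without Beam or network access. This is a minimal sketch: fetch_values, FakeResponse and fake_get are hypothetical names, and the fake response only mimics the json()["value"] shape used in get_api_data.

```python
from typing import Any, Callable, List


def fetch_values(url: Any, get: Callable[..., Any], **kwargs: Any) -> List[dict]:
    """Hypothetical helper: call get(url, **kwargs) and return the JSON 'value' list.

    Raises TypeError early if url is not a str (for example an accidental 1-tuple),
    instead of letting the HTTP library fail later with a less obvious message.
    """
    if not isinstance(url, str):
        raise TypeError(f"url must be a str, got {type(url).__name__}: {url!r}")
    response = get(url, **kwargs)
    return response.json()["value"]


class FakeResponse:
    """Stand-in for requests.Response so the helper can be exercised offline."""

    def json(self) -> dict:
        return {"value": [{"etag": "e1", "carID": "c1", "power": 3, "speed": 12.5}]}


def fake_get(url: str, **kwargs: Any) -> FakeResponse:
    # Ignores url and kwargs; always returns the same canned response.
    return FakeResponse()


print(fetch_values("https://host:port/car('power%203')/speed", fake_get))
```

Inside the DoFn, process could then call fetch_values(self.url, requests.get, auth=HTTPDigestAuth(self.USER, self.PASSWORD), headers=headers), passing the real requests.get as get.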

This has me confused. I would appreciate any suggestions or comments on why the URL string gets mangled.


1 answer

Anonymous user

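Remove the comma after url in the get_api_data class; that should fix the problem. With the trailing comma, self.url = url, assigns a one-element tuple rather than the URL string, so requests.get receives a tuple instead of a URL and cannot find a connection adapter for it.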

class get_api_data(beam.DoFn):
    def __init__(self, url):
        self.url = url  # no trailing comma, so self.url stays a str
        self.USER = 'user' 
        self.PASSWORD = 'password'
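A short, standard-library-only sketch of what the trailing comma does. It assumes that requests turns a non-string URL into text with str() and picks a connection adapter by URL prefix (https://, http://), which is my understanding of how requests behaves; url_attr and as_text are illustrative names. The repr() step shows where the \' escaping in the error message comes from.

```python
url = "https://host:port/car('power%203')/speed"

url_attr = url,          # trailing comma: this builds a 1-tuple, not a str
print(type(url_attr))    # <class 'tuple'>

# str() of the tuple uses the element's repr, which wraps it in double quotes
as_text = str(url_attr)
print(as_text)           # ("https://host:port/car('power%203')/speed",)

# An error message that shows repr() of this text must escape the single quotes,
# because the text now contains both quote characters
print(repr(as_text))     # '("https://host:port/car(\'power%203\')/speed",)'

# Adapters are matched by prefix, and the text no longer starts with "https://"
print(as_text.startswith("https://"))  # False
```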