提问者:小点点

Kubeflow - 如何从 Kubeflow 中的 Jupyter 笔记本连接到 Kubeflow Pipelines 端点(endpoint)


在 Kubeflow 的 Jupyter 笔记本中,尽管已按照文档“从同一集群连接到 Kubeflow Pipelines - 多用户模式”进行操作,与 Kubeflow Pipelines 的连接仍然失败。

import os
import kfp

# Read the token from the file whose path is stored in the
# KF_PIPELINES_SA_TOKEN_PATH environment variable.
with open(os.environ['KF_PIPELINES_SA_TOKEN_PATH'], "r") as f:
    TOKEN = f.read()
# Build the client with only the token; no host argument is passed.
# NOTE(review): the MaxRetryError in the traceback below reports
# host='localhost', port=80, so the client presumably falls back to a
# local default endpoint when no host is given — confirm against kfp docs.
client = kfp.Client(
    existing_token=TOKEN
)
# This is the call that raises in the traceback below.
print(client.list_pipelines())
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in _new_conn(self)
    168         try:
--> 169             conn = connection.create_connection(
    170                 (self._dns_host, self.port), self.timeout, **extra_kw

/opt/conda/lib/python3.8/site-packages/urllib3/util/connection.py in create_connection(address, timeout, source_address, socket_options)
     95     if err is not None:
---> 96         raise err
     97 

/opt/conda/lib/python3.8/site-packages/urllib3/util/connection.py in create_connection(address, timeout, source_address, socket_options)
     85                 sock.bind(source_address)
---> 86             sock.connect(sa)
     87             return sock

ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

NewConnectionError                        Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    698             # Make the request on the httplib connection object.
--> 699             httplib_response = self._make_request(
    700                 conn,

/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    393             else:
--> 394                 conn.request(method, url, **httplib_request_kw)
    395 

/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in request(self, method, url, body, headers)
    233             headers["User-Agent"] = _get_default_user_agent()
--> 234         super(HTTPConnection, self).request(method, url, body=body, headers=headers)
    235 

/opt/conda/lib/python3.8/http/client.py in request(self, method, url, body, headers, encode_chunked)
   1251         """Send a complete request to the server."""
-> 1252         self._send_request(method, url, body, headers, encode_chunked)
   1253 

/opt/conda/lib/python3.8/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1297             body = _encode(body, 'body')
-> 1298         self.endheaders(body, encode_chunked=encode_chunked)
   1299 

/opt/conda/lib/python3.8/http/client.py in endheaders(self, message_body, encode_chunked)
   1246             raise CannotSendHeader()
-> 1247         self._send_output(message_body, encode_chunked=encode_chunked)
   1248 

/opt/conda/lib/python3.8/http/client.py in _send_output(self, message_body, encode_chunked)
   1006         del self._buffer[:]
-> 1007         self.send(msg)
   1008 

/opt/conda/lib/python3.8/http/client.py in send(self, data)
    946             if self.auto_open:
--> 947                 self.connect()
    948             else:

/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in connect(self)
    199     def connect(self):
--> 200         conn = self._new_conn()
    201         self._prepare_conn(conn)

/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in _new_conn(self)
    180         except SocketError as e:
--> 181             raise NewConnectionError(
    182                 self, "Failed to establish a new connection: %s" % e

NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f5b1ac2e2b0>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

MaxRetryError                             Traceback (most recent call last)
<ipython-input-26-245cf5dc3b72> in <module>
      3     existing_token=TOKEN
      4 )
----> 5 print(client.list_pipelines())

/opt/conda/lib/python3.8/site-packages/kfp/_client.py in list_pipelines(self, page_token, page_size, sort_by)
    543       A response object including a list of pipelines and next page token.
    544     """
--> 545     return self._pipelines_api.list_pipelines(page_token=page_token, page_size=page_size, sort_by=sort_by)
    546 
    547   def list_pipeline_versions(self, pipeline_id: str, page_token='', page_size=10, sort_by=''):

/opt/conda/lib/python3.8/site-packages/kfp_server_api/api/pipeline_service_api.py in list_pipelines(self, **kwargs)
   1210         """
   1211         kwargs['_return_http_data_only'] = True
-> 1212         return self.list_pipelines_with_http_info(**kwargs)  # noqa: E501
   1213 
   1214     def list_pipelines_with_http_info(self, **kwargs):  # noqa: E501

/opt/conda/lib/python3.8/site-packages/kfp_server_api/api/pipeline_service_api.py in list_pipelines_with_http_info(self, **kwargs)
   1311         auth_settings = ['Bearer']  # noqa: E501
   1312 
-> 1313         return self.api_client.call_api(
   1314             '/apis/v1beta1/pipelines', 'GET',
   1315             path_params,

/opt/conda/lib/python3.8/site-packages/kfp_server_api/api_client.py in call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, async_req, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host)
    362         """
    363         if not async_req:
--> 364             return self.__call_api(resource_path, method,
    365                                    path_params, query_params, header_params,
    366                                    body, post_params, files,

/opt/conda/lib/python3.8/site-packages/kfp_server_api/api_client.py in __call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host)
    179         try:
    180             # perform request and return response
--> 181             response_data = self.request(
    182                 method, url, query_params=query_params, headers=header_params,
    183                 post_params=post_params, body=body,

/opt/conda/lib/python3.8/site-packages/kfp_server_api/api_client.py in request(self, method, url, query_params, headers, post_params, body, _preload_content, _request_timeout)
    387         """Makes the HTTP request using RESTClient."""
    388         if method == "GET":
--> 389             return self.rest_client.GET(url,
    390                                         query_params=query_params,
    391                                         _preload_content=_preload_content,

/opt/conda/lib/python3.8/site-packages/kfp_server_api/rest.py in GET(self, url, headers, query_params, _preload_content, _request_timeout)
    228     def GET(self, url, headers=None, query_params=None, _preload_content=True,
    229             _request_timeout=None):
--> 230         return self.request("GET", url,
    231                             headers=headers,
    232                             _preload_content=_preload_content,

/opt/conda/lib/python3.8/site-packages/kfp_server_api/rest.py in request(self, method, url, query_params, headers, body, post_params, _preload_content, _request_timeout)
    206             # For `GET`, `HEAD`
    207             else:
--> 208                 r = self.pool_manager.request(method, url,
    209                                               fields=query_params,
    210                                               preload_content=_preload_content,

/opt/conda/lib/python3.8/site-packages/urllib3/request.py in request(self, method, url, fields, headers, **urlopen_kw)
     72 
     73         if method in self._encode_url_methods:
---> 74             return self.request_encode_url(
     75                 method, url, fields=fields, headers=headers, **urlopen_kw
     76             )

/opt/conda/lib/python3.8/site-packages/urllib3/request.py in request_encode_url(self, method, url, fields, headers, **urlopen_kw)
     94             url += "?" + urlencode(fields)
     95 
---> 96         return self.urlopen(method, url, **extra_kw)
     97 
     98     def request_encode_body(

/opt/conda/lib/python3.8/site-packages/urllib3/poolmanager.py in urlopen(self, method, url, redirect, **kw)
    373             response = conn.urlopen(method, url, **kw)
    374         else:
--> 375             response = conn.urlopen(method, u.request_uri, **kw)
    376 
    377         redirect_location = redirect and response.get_redirect_location()

/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    781                 "Retrying (%r) after connection broken by '%r': %s", retries, err, url
    782             )
--> 783             return self.urlopen(
    784                 method,
    785                 url,

/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    781                 "Retrying (%r) after connection broken by '%r': %s", retries, err, url
    782             )
--> 783             return self.urlopen(
    784                 method,
    785                 url,

/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    781                 "Retrying (%r) after connection broken by '%r': %s", retries, err, url
    782             )
--> 783             return self.urlopen(
    784                 method,
    785                 url,

/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    753                 e = ProtocolError("Connection aborted.", e)
    754 
--> 755             retries = retries.increment(
    756                 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
    757             )

/opt/conda/lib/python3.8/site-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
    572 
    573         if new_retry.is_exhausted():
--> 574             raise MaxRetryError(_pool, url, error or ResponseError(cause))
    575 
    576         log.debug("Incremented Retry for (url='%s'): %r", url, new_retry)

MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/pipelines?page_token=&page_size=10&sort_by= (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5b1ac2e2b0>: Failed to establish a new connection: [Errno 111] Connection refused'))

这个问题在下面的 GitHub issue 中有过讨论,但没有明确的答案。

  • [多用户] 在集群内的 Jupyter 笔记本中调用 kfp.Client().create_run_from_pipeline_func 失败 #4440

共1个答案

匿名用户

import os

import kfp

# Load the token from the file named by the KF_PIPELINES_SA_TOKEN_PATH
# environment variable. It is read here but not handed to the client:
# passing existing_token=TOKEN is not required.
with open(os.environ['KF_PIPELINES_SA_TOKEN_PATH'], "r") as token_file:
    TOKEN = token_file.read()

# Give the client an explicit host: the ml-pipeline service on port 8888.
# Using host='http://ml-pipeline-ui.kubeflow.svc.cluster.local:80' instead
# does not work, as it later causes
# "HTTP response body: RBAC: access denied".
client = kfp.Client(host='http://ml-pipeline.kubeflow.svc.cluster.local:8888')

pipelines = client.list_pipelines()
print(pipelines)
{'next_page_token': None,
 'pipelines': [{'created_at': datetime.datetime(2022, 5, 22, 2, 5, 33, tzinfo=tzlocal()),
                'default_version': {'code_source_url': None,
                                    'created_at': datetime.datetime(2022, 5, 22, 2, 5, 33, tzinfo=tzlocal()),
                                    'id': 'b693a0d3-b11c-4c5b-b3f9-6158382948d6',
                                    'name': '[Demo] XGBoost - Iterative model '
                                            'training',
                                    'package_url': None,
                                    'parameters': None,
                                    'resource_references': [{'key': {'id': 'b693a0d3-b11c-4c5b-b3f9-6158382948d6',
                                                                     'type': 'PIPELINE'},
                                                             'name': None,
                                                             'relationship': 'OWNER'}]},
                'description': '[source '
                               'code](https://github.com/kubeflow/pipelines/blob/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/core/train_until_good/train_until_good.py) '
                               'This sample demonstrates iterative training '
                               'using a train-eval-check recursive loop. The '
                               'main pipeline trains the initial model and '
                               'then gradually trains the model some more '
                               'until the model evaluation metrics are good '
                               'enough.',
                'error': None,
                'id': 'b693a0d3-b11c-4c5b-b3f9-6158382948d6',
                'name': '[Demo] XGBoost - Iterative model training',
                'parameters': None,
                'resource_references': None,
                'url': None},
               {'created_at': datetime.datetime(2022, 5, 22, 2, 5, 34, tzinfo=tzlocal()),
                'default_version': {'code_source_url': None,
                                    'created_at': datetime.datetime(2022, 5, 22, 2, 5, 34, tzinfo=tzlocal()),
                                    'id': 'c65b4f2e-362d-41a8-8f5c-9b944830029e',
                                    'name': '[Demo] TFX - Taxi tip prediction '
                                            'model trainer',
                                    'package_url': None,
                                    'parameters': [{'name': 'pipeline-root',
                                                    'value': 'gs://{{kfp-default-bucket}}/tfx_taxi_simple/{{workflow.uid}}'},
                                                   {'name': 'module-file',
                                                    'value': '/opt/conda/lib/python3.7/site-packages/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py'},
                                                   {'name': 'push_destination',
                                                    'value': '{"filesystem": '
                                                             '{"base_directory": '
                                                             '"gs://your-bucket/serving_model/tfx_taxi_simple"}}'}],
                                    'resource_references': [{'key': {'id': 'c65b4f2e-362d-41a8-8f5c-9b944830029e',
                                                                     'type': 'PIPELINE'},
                                                             'name': None,
                                                             'relationship': 'OWNER'}]},
                'description': '[source '
                               'code](https://github.com/kubeflow/pipelines/tree/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/core/parameterized_tfx_oss) '
                               '[GCP Permission '
                               'requirements](https://github.com/kubeflow/pipelines/blob/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/core/parameterized_tfx_oss#permission). '
                               'Example pipeline that does classification with '
                               'model analysis based on a public tax cab '
                               'dataset.',
                'error': None,
                'id': 'c65b4f2e-362d-41a8-8f5c-9b944830029e',
                'name': '[Demo] TFX - Taxi tip prediction model trainer',
                'parameters': [{'name': 'pipeline-root',
                                'value': 'gs://{{kfp-default-bucket}}/tfx_taxi_simple/{{workflow.uid}}'},
                               {'name': 'module-file',
                                'value': '/opt/conda/lib/python3.7/site-packages/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py'},
                               {'name': 'push_destination',
                                'value': '{"filesystem": {"base_directory": '
                                         '"gs://your-bucket/serving_model/tfx_taxi_simple"}}'}],
                'resource_references': None,
                'url': None},
               {'created_at': datetime.datetime(2022, 5, 22, 2, 5, 35, tzinfo=tzlocal()),
                'default_version': {'code_source_url': None,
                                    'created_at': datetime.datetime(2022, 5, 22, 2, 5, 35, tzinfo=tzlocal()),
                                    'id': '56bb7063-ade0-4074-9721-b063f42c46fd',
                                    'name': '[Tutorial] Data passing in python '
                                            'components',
                                    'package_url': None,
                                    'parameters': None,
                                    'resource_references': [{'key': {'id': '56bb7063-ade0-4074-9721-b063f42c46fd',
                                                                     'type': 'PIPELINE'},
                                                             'name': None,
                                                             'relationship': 'OWNER'}]},
                'description': '[source '
                               'code](https://github.com/kubeflow/pipelines/tree/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/tutorials/Data%20passing%20in%20python%20components) '
                               'Shows how to pass data between python '
                               'components.',
                'error': None,
                'id': '56bb7063-ade0-4074-9721-b063f42c46fd',
                'name': '[Tutorial] Data passing in python components',
                'parameters': None,
                'resource_references': None,
                'url': None},
               {'created_at': datetime.datetime(2022, 5, 22, 2, 5, 36, tzinfo=tzlocal()),
                'default_version': {'code_source_url': None,
                                    'created_at': datetime.datetime(2022, 5, 22, 2, 5, 36, tzinfo=tzlocal()),
                                    'id': '36b09aa0-a317-4ad4-a0ed-ddf55a485eb0',
                                    'name': '[Tutorial] DSL - Control '
                                            'structures',
                                    'package_url': None,
                                    'parameters': None,
                                    'resource_references': [{'key': {'id': '36b09aa0-a317-4ad4-a0ed-ddf55a485eb0',
                                                                     'type': 'PIPELINE'},
                                                             'name': None,
                                                             'relationship': 'OWNER'}]},
                'description': '[source '
                               'code](https://github.com/kubeflow/pipelines/tree/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/tutorials/DSL%20-%20Control%20structures) '
                               'Shows how to use conditional execution and '
                               'exit handlers. This pipeline will randomly '
                               'fail to demonstrate that the exit handler gets '
                               'executed even in case of failure.',
                'error': None,
                'id': '36b09aa0-a317-4ad4-a0ed-ddf55a485eb0',
                'name': '[Tutorial] DSL - Control structures',
                'parameters': None,
                'resource_references': None,
                'url': None},
               {'created_at': datetime.datetime(2022, 5, 24, 6, 46, 45, tzinfo=tzlocal()),
                'default_version': {'code_source_url': None,
                                    'created_at': datetime.datetime(2022, 5, 24, 6, 46, 45, tzinfo=tzlocal()),
                                    'id': 'da2bc8b4-27f2-4aa3-befb-c53487d9db49',
                                    'name': 'test',
                                    'package_url': None,
                                    'parameters': [{'name': 'a', 'value': '1'},
                                                   {'name': 'b', 'value': '7'}],
                                    'resource_references': [{'key': {'id': 'da2bc8b4-27f2-4aa3-befb-c53487d9db49',
                                                                     'type': 'PIPELINE'},
                                                             'name': None,
                                                             'relationship': 'OWNER'}]},
                'description': 'test',
                'error': None,
                'id': 'da2bc8b4-27f2-4aa3-befb-c53487d9db49',
                'name': 'test',
                'parameters': [{'name': 'a', 'value': '1'},
                               {'name': 'b', 'value': '7'}],
                'resource_references': None,
                'url': None}],
 'total_size': 5}