在 Kubeflow 的 Jupyter 笔记本中,尽管已按照官方文档“从同一集群连接到 Kubeflow Pipelines(多用户模式)”一节进行操作,与 Kubeflow Pipelines 的连接仍然失败。
import os

import kfp

# Read the token from the file whose path is stored in the
# KF_PIPELINES_SA_TOKEN_PATH environment variable.
with open(os.environ['KF_PIPELINES_SA_TOKEN_PATH'], "r") as f:
    TOKEN = f.read()

# NOTE: no `host` argument is passed. The MaxRetryError in the traceback
# below shows the request going to host='localhost', port=80, where the
# connection is refused.
client = kfp.Client(
    existing_token=TOKEN
)

# Fails with ConnectionRefusedError / MaxRetryError (see traceback below).
print(client.list_pipelines())
---------------------------------------------------------------------------
ConnectionRefusedError Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in _new_conn(self)
168 try:
--> 169 conn = connection.create_connection(
170 (self._dns_host, self.port), self.timeout, **extra_kw
/opt/conda/lib/python3.8/site-packages/urllib3/util/connection.py in create_connection(address, timeout, source_address, socket_options)
95 if err is not None:
---> 96 raise err
97
/opt/conda/lib/python3.8/site-packages/urllib3/util/connection.py in create_connection(address, timeout, source_address, socket_options)
85 sock.bind(source_address)
---> 86 sock.connect(sa)
87 return sock
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
NewConnectionError Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
698 # Make the request on the httplib connection object.
--> 699 httplib_response = self._make_request(
700 conn,
/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
393 else:
--> 394 conn.request(method, url, **httplib_request_kw)
395
/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in request(self, method, url, body, headers)
233 headers["User-Agent"] = _get_default_user_agent()
--> 234 super(HTTPConnection, self).request(method, url, body=body, headers=headers)
235
/opt/conda/lib/python3.8/http/client.py in request(self, method, url, body, headers, encode_chunked)
1251 """Send a complete request to the server."""
-> 1252 self._send_request(method, url, body, headers, encode_chunked)
1253
/opt/conda/lib/python3.8/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
1297 body = _encode(body, 'body')
-> 1298 self.endheaders(body, encode_chunked=encode_chunked)
1299
/opt/conda/lib/python3.8/http/client.py in endheaders(self, message_body, encode_chunked)
1246 raise CannotSendHeader()
-> 1247 self._send_output(message_body, encode_chunked=encode_chunked)
1248
/opt/conda/lib/python3.8/http/client.py in _send_output(self, message_body, encode_chunked)
1006 del self._buffer[:]
-> 1007 self.send(msg)
1008
/opt/conda/lib/python3.8/http/client.py in send(self, data)
946 if self.auto_open:
--> 947 self.connect()
948 else:
/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in connect(self)
199 def connect(self):
--> 200 conn = self._new_conn()
201 self._prepare_conn(conn)
/opt/conda/lib/python3.8/site-packages/urllib3/connection.py in _new_conn(self)
180 except SocketError as e:
--> 181 raise NewConnectionError(
182 self, "Failed to establish a new connection: %s" % e
NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f5b1ac2e2b0>: Failed to establish a new connection: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
MaxRetryError Traceback (most recent call last)
<ipython-input-26-245cf5dc3b72> in <module>
3 existing_token=TOKEN
4 )
----> 5 print(client.list_pipelines())
/opt/conda/lib/python3.8/site-packages/kfp/_client.py in list_pipelines(self, page_token, page_size, sort_by)
543 A response object including a list of pipelines and next page token.
544 """
--> 545 return self._pipelines_api.list_pipelines(page_token=page_token, page_size=page_size, sort_by=sort_by)
546
547 def list_pipeline_versions(self, pipeline_id: str, page_token='', page_size=10, sort_by=''):
/opt/conda/lib/python3.8/site-packages/kfp_server_api/api/pipeline_service_api.py in list_pipelines(self, **kwargs)
1210 """
1211 kwargs['_return_http_data_only'] = True
-> 1212 return self.list_pipelines_with_http_info(**kwargs) # noqa: E501
1213
1214 def list_pipelines_with_http_info(self, **kwargs): # noqa: E501
/opt/conda/lib/python3.8/site-packages/kfp_server_api/api/pipeline_service_api.py in list_pipelines_with_http_info(self, **kwargs)
1311 auth_settings = ['Bearer'] # noqa: E501
1312
-> 1313 return self.api_client.call_api(
1314 '/apis/v1beta1/pipelines', 'GET',
1315 path_params,
/opt/conda/lib/python3.8/site-packages/kfp_server_api/api_client.py in call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, async_req, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host)
362 """
363 if not async_req:
--> 364 return self.__call_api(resource_path, method,
365 path_params, query_params, header_params,
366 body, post_params, files,
/opt/conda/lib/python3.8/site-packages/kfp_server_api/api_client.py in __call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host)
179 try:
180 # perform request and return response
--> 181 response_data = self.request(
182 method, url, query_params=query_params, headers=header_params,
183 post_params=post_params, body=body,
/opt/conda/lib/python3.8/site-packages/kfp_server_api/api_client.py in request(self, method, url, query_params, headers, post_params, body, _preload_content, _request_timeout)
387 """Makes the HTTP request using RESTClient."""
388 if method == "GET":
--> 389 return self.rest_client.GET(url,
390 query_params=query_params,
391 _preload_content=_preload_content,
/opt/conda/lib/python3.8/site-packages/kfp_server_api/rest.py in GET(self, url, headers, query_params, _preload_content, _request_timeout)
228 def GET(self, url, headers=None, query_params=None, _preload_content=True,
229 _request_timeout=None):
--> 230 return self.request("GET", url,
231 headers=headers,
232 _preload_content=_preload_content,
/opt/conda/lib/python3.8/site-packages/kfp_server_api/rest.py in request(self, method, url, query_params, headers, body, post_params, _preload_content, _request_timeout)
206 # For `GET`, `HEAD`
207 else:
--> 208 r = self.pool_manager.request(method, url,
209 fields=query_params,
210 preload_content=_preload_content,
/opt/conda/lib/python3.8/site-packages/urllib3/request.py in request(self, method, url, fields, headers, **urlopen_kw)
72
73 if method in self._encode_url_methods:
---> 74 return self.request_encode_url(
75 method, url, fields=fields, headers=headers, **urlopen_kw
76 )
/opt/conda/lib/python3.8/site-packages/urllib3/request.py in request_encode_url(self, method, url, fields, headers, **urlopen_kw)
94 url += "?" + urlencode(fields)
95
---> 96 return self.urlopen(method, url, **extra_kw)
97
98 def request_encode_body(
/opt/conda/lib/python3.8/site-packages/urllib3/poolmanager.py in urlopen(self, method, url, redirect, **kw)
373 response = conn.urlopen(method, url, **kw)
374 else:
--> 375 response = conn.urlopen(method, u.request_uri, **kw)
376
377 redirect_location = redirect and response.get_redirect_location()
/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
781 "Retrying (%r) after connection broken by '%r': %s", retries, err, url
782 )
--> 783 return self.urlopen(
784 method,
785 url,
/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
781 "Retrying (%r) after connection broken by '%r': %s", retries, err, url
782 )
--> 783 return self.urlopen(
784 method,
785 url,
/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
781 "Retrying (%r) after connection broken by '%r': %s", retries, err, url
782 )
--> 783 return self.urlopen(
784 method,
785 url,
/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
753 e = ProtocolError("Connection aborted.", e)
754
--> 755 retries = retries.increment(
756 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
757 )
/opt/conda/lib/python3.8/site-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
572
573 if new_retry.is_exhausted():
--> 574 raise MaxRetryError(_pool, url, error or ResponseError(cause))
575
576 log.debug("Incremented Retry for (url='%s'): %r", url, new_retry)
MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/pipelines?page_token=&page_size=10&sort_by= (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5b1ac2e2b0>: Failed to establish a new connection: [Errno 111] Connection refused'))
这个问题在下面的 GitHub issue 中进行过讨论,但没有明确的答案。
下面的代码显式指定了 `host`,可以成功列出管道:
import os

import kfp

# Read the token from the file whose path is stored in the
# KF_PIPELINES_SA_TOKEN_PATH environment variable. It is not passed to the
# client below (existing_token is commented out); the read is kept from the
# original snippet.
with open(os.environ['KF_PIPELINES_SA_TOKEN_PATH'], "r") as f:
    TOKEN = f.read()

client = kfp.Client(
    # Explicit host: the `ml-pipeline` service address on port 8888.
    host='http://ml-pipeline.kubeflow.svc.cluster.local:8888',
    # The UI service does not work as a host; it later causes
    # "HTTP response body: RBAC: access denied":
    # host='http://ml-pipeline-ui.kubeflow.svc.cluster.local:80',
    # Passing the token is not required with the host above:
    # existing_token=TOKEN,
)

# With the explicit host the call succeeds; its output is shown below.
print(client.list_pipelines())
{'next_page_token': None,
'pipelines': [{'created_at': datetime.datetime(2022, 5, 22, 2, 5, 33, tzinfo=tzlocal()),
'default_version': {'code_source_url': None,
'created_at': datetime.datetime(2022, 5, 22, 2, 5, 33, tzinfo=tzlocal()),
'id': 'b693a0d3-b11c-4c5b-b3f9-6158382948d6',
'name': '[Demo] XGBoost - Iterative model '
'training',
'package_url': None,
'parameters': None,
'resource_references': [{'key': {'id': 'b693a0d3-b11c-4c5b-b3f9-6158382948d6',
'type': 'PIPELINE'},
'name': None,
'relationship': 'OWNER'}]},
'description': '[source '
'code](https://github.com/kubeflow/pipelines/blob/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/core/train_until_good/train_until_good.py) '
'This sample demonstrates iterative training '
'using a train-eval-check recursive loop. The '
'main pipeline trains the initial model and '
'then gradually trains the model some more '
'until the model evaluation metrics are good '
'enough.',
'error': None,
'id': 'b693a0d3-b11c-4c5b-b3f9-6158382948d6',
'name': '[Demo] XGBoost - Iterative model training',
'parameters': None,
'resource_references': None,
'url': None},
{'created_at': datetime.datetime(2022, 5, 22, 2, 5, 34, tzinfo=tzlocal()),
'default_version': {'code_source_url': None,
'created_at': datetime.datetime(2022, 5, 22, 2, 5, 34, tzinfo=tzlocal()),
'id': 'c65b4f2e-362d-41a8-8f5c-9b944830029e',
'name': '[Demo] TFX - Taxi tip prediction '
'model trainer',
'package_url': None,
'parameters': [{'name': 'pipeline-root',
'value': 'gs://{{kfp-default-bucket}}/tfx_taxi_simple/{{workflow.uid}}'},
{'name': 'module-file',
'value': '/opt/conda/lib/python3.7/site-packages/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py'},
{'name': 'push_destination',
'value': '{"filesystem": '
'{"base_directory": '
'"gs://your-bucket/serving_model/tfx_taxi_simple"}}'}],
'resource_references': [{'key': {'id': 'c65b4f2e-362d-41a8-8f5c-9b944830029e',
'type': 'PIPELINE'},
'name': None,
'relationship': 'OWNER'}]},
'description': '[source '
'code](https://github.com/kubeflow/pipelines/tree/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/core/parameterized_tfx_oss) '
'[GCP Permission '
'requirements](https://github.com/kubeflow/pipelines/blob/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/core/parameterized_tfx_oss#permission). '
'Example pipeline that does classification with '
'model analysis based on a public tax cab '
'dataset.',
'error': None,
'id': 'c65b4f2e-362d-41a8-8f5c-9b944830029e',
'name': '[Demo] TFX - Taxi tip prediction model trainer',
'parameters': [{'name': 'pipeline-root',
'value': 'gs://{{kfp-default-bucket}}/tfx_taxi_simple/{{workflow.uid}}'},
{'name': 'module-file',
'value': '/opt/conda/lib/python3.7/site-packages/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py'},
{'name': 'push_destination',
'value': '{"filesystem": {"base_directory": '
'"gs://your-bucket/serving_model/tfx_taxi_simple"}}'}],
'resource_references': None,
'url': None},
{'created_at': datetime.datetime(2022, 5, 22, 2, 5, 35, tzinfo=tzlocal()),
'default_version': {'code_source_url': None,
'created_at': datetime.datetime(2022, 5, 22, 2, 5, 35, tzinfo=tzlocal()),
'id': '56bb7063-ade0-4074-9721-b063f42c46fd',
'name': '[Tutorial] Data passing in python '
'components',
'package_url': None,
'parameters': None,
'resource_references': [{'key': {'id': '56bb7063-ade0-4074-9721-b063f42c46fd',
'type': 'PIPELINE'},
'name': None,
'relationship': 'OWNER'}]},
'description': '[source '
'code](https://github.com/kubeflow/pipelines/tree/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/tutorials/Data%20passing%20in%20python%20components) '
'Shows how to pass data between python '
'components.',
'error': None,
'id': '56bb7063-ade0-4074-9721-b063f42c46fd',
'name': '[Tutorial] Data passing in python components',
'parameters': None,
'resource_references': None,
'url': None},
{'created_at': datetime.datetime(2022, 5, 22, 2, 5, 36, tzinfo=tzlocal()),
'default_version': {'code_source_url': None,
'created_at': datetime.datetime(2022, 5, 22, 2, 5, 36, tzinfo=tzlocal()),
'id': '36b09aa0-a317-4ad4-a0ed-ddf55a485eb0',
'name': '[Tutorial] DSL - Control '
'structures',
'package_url': None,
'parameters': None,
'resource_references': [{'key': {'id': '36b09aa0-a317-4ad4-a0ed-ddf55a485eb0',
'type': 'PIPELINE'},
'name': None,
'relationship': 'OWNER'}]},
'description': '[source '
'code](https://github.com/kubeflow/pipelines/tree/c8a18bde299f2fdf5f72144f15887915b8d11520/samples/tutorials/DSL%20-%20Control%20structures) '
'Shows how to use conditional execution and '
'exit handlers. This pipeline will randomly '
'fail to demonstrate that the exit handler gets '
'executed even in case of failure.',
'error': None,
'id': '36b09aa0-a317-4ad4-a0ed-ddf55a485eb0',
'name': '[Tutorial] DSL - Control structures',
'parameters': None,
'resource_references': None,
'url': None},
{'created_at': datetime.datetime(2022, 5, 24, 6, 46, 45, tzinfo=tzlocal()),
'default_version': {'code_source_url': None,
'created_at': datetime.datetime(2022, 5, 24, 6, 46, 45, tzinfo=tzlocal()),
'id': 'da2bc8b4-27f2-4aa3-befb-c53487d9db49',
'name': 'test',
'package_url': None,
'parameters': [{'name': 'a', 'value': '1'},
{'name': 'b', 'value': '7'}],
'resource_references': [{'key': {'id': 'da2bc8b4-27f2-4aa3-befb-c53487d9db49',
'type': 'PIPELINE'},
'name': None,
'relationship': 'OWNER'}]},
'description': 'test',
'error': None,
'id': 'da2bc8b4-27f2-4aa3-befb-c53487d9db49',
'name': 'test',
'parameters': [{'name': 'a', 'value': '1'},
{'name': 'b', 'value': '7'}],
'resource_references': None,
'url': None}],
'total_size': 5}