Python源码示例:docker.errors()
示例1
def test_export(docker_tasker, no_container):
if MOCK:
mock_docker()
container_dict = docker_tasker.create_container(INPUT_IMAGE, command=["/bin/bash"])
container_id = container_dict['Id']
try:
if no_container:
with pytest.raises(docker.errors.APIError):
docker_tasker.export_container('NOT_THERE')
else:
export_generator = docker_tasker.export_container(container_id)
for _ in export_generator:
pass
finally:
docker_tasker.remove_container(container_id)
示例2
def test_retry_method(retry_times):
my_args = ('some', 'new')
my_kwargs = {'one': 'first', 'two': 'second'}
(flexmock(sys.modules[__name__])
.should_call('my_func')
.with_args(*my_args, **my_kwargs)
.times(retry_times + 1))
(flexmock(time)
.should_receive('sleep')
.and_return(None))
if retry_times >= 0:
with pytest.raises(docker.errors.APIError):
retry(my_func, *my_args, retry=retry_times, **my_kwargs)
else:
retry(my_func, *my_args, retry=retry_times, **my_kwargs)
示例3
def __init__(self):
self._containers = None
self._images = None # displayed images
self._all_images = None # docker images -a
self._df = None
kwargs = {"version": "auto"}
kwargs.update(docker.utils.kwargs_from_env(assert_hostname=False))
try:
APIClientClass = docker.Client # 1.x
except AttributeError:
APIClientClass = docker.APIClient # 2.x
try:
self.client = APIClientClass(**kwargs)
except docker.errors.DockerException as ex:
raise TerminateApplication("can't establish connection to docker daemon: {0}".format(str(ex)))
self.scratch_image = RootImage(self)
# backend queries
示例4
def __init__(self, docker_config: DockerHostConfig, mock_client=None) -> None:
self.name = docker_config.name
self.docker_config = docker_config
if not docker_config.tls:
tls = None
else:
tls = docker.tls.TLSConfig(client_cert=(docker_config.tls_cert, docker_config.tls_key), verify=docker_config.tls_ca)
# Simplify testing
if mock_client is not None:
self.cli = mock_client
return
try:
self.cli = docker.DockerClient(base_url=docker_config.address, version="auto", tls=tls)
except docker.errors.DockerException as e:
raise ZoeException("Cannot connect to Docker host {} at address {}: {}".format(docker_config.name, docker_config.address, str(e)))
示例5
def terminate_container(self, docker_id: str, delete=False) -> None:
"""
Terminate a container.
:param docker_id: The container to terminate
:type docker_id: str
:param delete: If True, also delete the container files
:type delete: bool
:return: None
"""
try:
cont = self.cli.containers.get(docker_id)
except docker.errors.NotFound:
return
try:
if delete:
cont.remove(force=True)
else:
cont.stop(timeout=5)
except docker.errors.NotFound:
pass
except docker.errors.APIError as e:
log.warning(str(e))
示例6
def list(self, only_label=None, status=None) -> List[dict]:
"""
List running or defined containers.
:param only_label: filter containers with only a certain label
:param status: filter containers with only a certain status (one of restarting, running, paused, exited)
:return: a list of containers
"""
filters = {}
if only_label is not None:
filters['label'] = only_label
if status is not None:
filters['status'] = status
try:
ret = self.cli.containers.list(all=True, filters=filters)
except docker.errors.APIError as ex:
raise ZoeException(str(ex))
except requests.exceptions.RequestException as ex:
raise ZoeException(str(ex))
conts = []
for cont_info in ret:
conts.append(self._container_summary(cont_info))
return conts
示例7
def stats(self, docker_id: str, stream: bool):
"""Retrieves container stats based on resource usage."""
try:
cont = self.cli.containers.get(docker_id)
except docker.errors.NotFound:
raise ZoeException('Container not found')
except docker.errors.APIError as e:
raise ZoeException('Docker API error: {}'.format(e))
try:
return cont.stats(stream=stream)
except docker.errors.APIError as e:
raise ZoeException('Docker API error: {}'.format(e))
except requests.exceptions.ReadTimeout:
raise ZoeException('Read timeout')
except ValueError:
raise ZoeException('Docker API decoding error')
示例8
def logs(self, docker_id: str, stream: bool, follow=None):
"""
Retrieves the logs of the selected container.
:param docker_id:
:param stream:
:param follow:
:return:
"""
try:
cont = self.cli.containers.get(docker_id)
except (docker.errors.NotFound, docker.errors.APIError):
return None
try:
return cont.logs(stdout=True, stderr=True, follow=follow, stream=stream, timestamps=True, tail='all')
except docker.errors.APIError:
return None
示例9
def update(self, docker_id, cpu_quota=None, mem_reservation=None, mem_limit=None):
"""Update the resource reservation for a container."""
kwargs = {}
if cpu_quota is not None:
kwargs['cpu_quota'] = cpu_quota
if mem_reservation is not None:
kwargs['mem_reservation'] = mem_reservation
if mem_limit is not None:
kwargs['mem_limit'] = mem_limit
try:
cont = self.cli.containers.get(docker_id)
except (docker.errors.NotFound, docker.errors.APIError):
return
try:
cont.update(**kwargs)
except docker.errors.APIError:
pass
示例10
def build_apb(project, dockerfile=None, tag=None):
if dockerfile is None:
dockerfile = "Dockerfile"
spec = get_spec(project)
if 'version' not in spec:
print("APB spec does not have a listed version. Please update apb.yml")
exit(1)
if not tag:
tag = spec['name']
update_dockerfile(project, dockerfile)
print("Building APB using tag: [%s]" % tag)
try:
client = create_docker_client()
client.images.build(path=project, tag=tag, dockerfile=dockerfile)
except docker.errors.DockerException:
print("Error accessing the docker API. Is the daemon running?")
raise
print("Successfully built APB image: %s" % tag)
return tag
示例11
def image_available(self) -> bool:
"""Do we currently see an up-to-date Docker image?
Returns:
True if we've gotten an Image ID stored.
"""
if not self._image_id:
try:
self._image_id = self._client.images.get(name=self.image_tag).id
return True
except docker.errors.ImageNotFound:
return False
else:
return True
# Working with Containers
示例12
def pytest_report_header(config):
logger = get_logger('report_header')
msg = []
try:
client = create_client()
metadata = client.api.inspect_container('pytest_inline_scan')
except docker.errors.NotFound:
logger.info("No running container was found, can't add info to report header")
metadata = {'Config': {'Labels': {}}}
msg = ['Docker: Anchore inline_scan container not running yet']
except DockerException as e:
logger.exception('Unable to connect to a docker socket')
msg = ['Anchore Version: Unable to connect to a docker socket']
msg.append('Error: %s' % str(e))
return msg
labels = metadata['Config']['Labels']
version = labels.get('version', 'unknown')
commit = labels.get('anchore_commit', 'unknown')
msg.extend([
'Anchore Version: %s' % version,
'Anchore Commit: %s' % commit
])
return msg
示例13
def testDockerPipeChainErrorDetection(self, disableCaching=True):
"""
By default, executing cmd1 | cmd2 | ... | cmdN, will only return an
error if cmdN fails. This can lead to all manor of errors being
silently missed. This tests to make sure that the piping API for
dockerCall() throws an exception if non-last commands in the chain fail.
"""
options = Job.Runner.getDefaultOptions(os.path.join(self.tempDir,
'jobstore'))
options.logLevel = self.dockerTestLogLevel
options.workDir = self.tempDir
options.clean = 'always'
options.caching = disableCaching
A = Job.wrapJobFn(_testDockerPipeChainErrorFn)
rv = Job.Runner.startToil(A, options)
assert rv == True
示例14
def docker_logs(self, container_id, show_stdout, show_stderr, follow):
try:
return (self.cli.logs(container=container_id, stdout=show_stdout, stderr=show_stderr, follow=follow))\
.decode('utf-8')
except docker.errors.APIError as ex:
if "configured logging reader does not support reading" in str(ex):
message = "Docker logging driver is not set to be 'json-file' or 'journald'"
DagdaLogger.get_logger().error(message)
raise DagdaError(message)
else:
message = "Unexpected exception of type {0} occurred: {1!r}" \
.format(type(ex).__name__, str(ex))
DagdaLogger.get_logger().error(message)
raise ex
# Creates container and return the container id
示例15
def test_handle_missing_squash_cache(experimental_daemon, squashcache):
run_docker_make("-f data/secret-squash.yml cache-test invisible-secret")
client = helpers.get_client()
cachelayer = client.images.get("invisible-secret")
firstimg = client.images.get("cache-test")
for _id in ("cache-test", firstimg.id, "invisible_secret", cachelayer.id):
try:
client.images.remove(_id)
except docker.errors.ImageNotFound:
pass
# Make sure the image can rebuild even if original layers are missing
run_docker_make("-f data/secret-squash.yml cache-test")
# Sanity check - makes sure that the first image was in fact removed and not used for cache
assert client.images.get("cache-test").id != firstimg.id
示例16
def ensure_local_net(
network_name: str = DOCKER_STARCRAFT_NETWORK,
subnet_cidr: str = SUBNET_CIDR
) -> None:
"""
Create docker local net if not found.
:raises docker.errors.APIError
"""
logger.info(f"checking whether docker has network {network_name}")
ipam_pool = docker.types.IPAMPool(subnet=subnet_cidr)
ipam_config = docker.types.IPAMConfig(pool_configs=[ipam_pool])
networks = docker_client.networks.list(names=DOCKER_STARCRAFT_NETWORK)
output = networks[0].short_id if networks else None
if not output:
logger.info("network not found, creating ...")
output = docker_client.networks.create(DOCKER_STARCRAFT_NETWORK, ipam=ipam_config).short_id
logger.debug(f"docker network id: {output}")
示例17
def stream_error(chunk, operation, repo, tag):
"""Translate docker stream errors into a more digestable format"""
# grab the generic error and strip the useless Error: portion
message = chunk['error'].replace('Error: ', '')
# not all errors provide the code
if 'code' in chunk['errorDetail']:
# permission denied on the repo
if chunk['errorDetail']['code'] == 403:
message = 'Permission Denied attempting to {} image {}:{}'.format(operation, repo, tag)
raise RegistryException(message)
示例18
def setup_module(module):
if MOCK:
return
d = docker.Client() # TODO: current version of python-docker does not have Client anymore
try:
d.inspect_image(INPUT_IMAGE)
setattr(module, 'HAS_IMAGE', True)
except docker.errors.APIError:
d.pull(INPUT_IMAGE)
setattr(module, 'HAS_IMAGE', False)
示例19
def test_run_invalid_command(docker_tasker):
if MOCK:
mock_docker(should_raise_error={'start': []})
try:
with pytest.raises(docker.errors.APIError):
docker_tasker.run(input_image_name, command=COMMAND)
finally:
# remove the container
containers = docker_tasker.tasker.d.containers(all=True)
container_id = [c for c in containers if c["Command"] == COMMAND][0]['Id']
docker_tasker.remove_container(container_id)
示例20
def clean(command_args, prog, description):
parser = argparse.ArgumentParser(prog=prog, description=description)
_debug(parser)
parser.add_argument(
"-f",
"--force",
action="store_true",
help="Whether to remove images of running containers",
)
parser.add_argument(
"--no-download",
dest="download",
action="store_false",
help="If set, will not attempt to pull images before removing existing",
)
args = parser.parse_args(command_args)
coloredlogs.install(level=args.log_level)
docker_client = docker.from_env(version="auto")
if args.download:
logger.info("Pulling the latest docker images")
_pull_docker_images(docker_client)
logger.info("Deleting old docker images")
tags = set()
for image in docker_client.images.list():
tags.update(image.tags)
# If dev version, only remove old dev-tagged containers
for tag in sorted(tags):
if images.is_old(tag):
logger.info(f"Attempting to remove tag {tag}")
try:
docker_client.images.remove(tag, force=args.force)
logger.info(f"* Tag {tag} removed")
except docker.errors.APIError as e:
if not args.force and "(must force)" in str(e):
logger.exception(f"Cannot delete tag {tag}. Must use `--force`")
else:
raise
示例21
def display_inspect(self):
try:
return json.dumps(self.inspect().response, indent=2)
except docker.errors.NotFound:
raise NotAvailableAnymore()
示例22
def metadata_get(self, path, cached=True):
"""
get metadata from inspect, specified by path
:param path: list of str
:param cached: bool, use cached version of inspect if available
"""
try:
value = graceful_chain_get(self.inspect(cached=cached).response, *path)
except docker.errors.NotFound:
logger.warning("object %s is not available anymore", self)
raise NotAvailableAnymore()
return value
示例23
def layers(self):
"""
similar as parent images, except that it uses /history API endpoint
:return:
"""
# sample output:
# {
# "Created": 1457116802,
# "Id": "sha256:507cb13a216097710f0d234668bf64a4c92949c573ba15eba13d05aad392fe04",
# "Size": 204692029,
# "Tags": [
# "docker.io/fedora:latest"
# ],
# "Comment": "",
# "CreatedBy": "/bin/sh -c #(nop) ADD file:bcb5e5c... in /"
# }
try:
response = self.d.history(self.image_id)
except docker.errors.NotFound:
raise NotAvailableAnymore()
layers = []
for l in response:
layer_id = l["Id"]
if layer_id == "<missing>":
layers.append(DockerImage(l, self.docker_backend))
else:
layers.append(self.docker_backend.get_image_by_id(layer_id))
return layers
示例24
def inspect(self, cached=False):
if self._inspect is None or cached is False:
try:
self._inspect = self.d.inspect_image(self.image_id)
except docker.errors.NotFound:
self._inspect = self._inspect or {}
return self._inspect
示例25
def net(self):
"""
get ACTIVE port mappings of a container
:return: dict:
{
"host_port": "container_port"
}
"""
try:
return NetData(self.inspect(cached=True).response)
except docker.errors.NotFound:
raise NotAvailableAnymore()
示例26
def inspect(self, cached=False):
if cached is False or self._inspect is None:
try:
self._inspect = self.d.inspect_container(self.container_id)
except docker.errors.NotFound:
self._inspect = self._inspect or {}
return self._inspect
示例27
def logs(self, follow=False, lines="all"):
# when tail is set to all, it takes ages to populate widget
# docker-py does `inspect` in the background
try:
logs_data = self.d.logs(self.container_id, stream=follow, tail=lines)
except docker.errors.NotFound:
return None
else:
return logs_data
示例28
def handle_cleanup_exception(docker_error):
# It is okay to have these errors as
# it means resource not exist or being removed or killed already
allowed_errors = [404, 409]
if docker_error.status_code in allowed_errors:
pass
else:
raise
示例29
def stream_error(chunk, operation, repo, tag):
"""Translate docker stream errors into a more digestable format"""
# grab the generic error and strip the useless Error: portion
message = chunk['error'].replace('Error: ', '')
# not all errors provide the code
if 'code' in chunk['errorDetail']:
# permission denied on the repo
if chunk['errorDetail']['code'] == 403:
message = 'Permission Denied attempting to {} image {}:{}'.format(operation, repo, tag)
raise RegistryException(message)
示例30
def pull_image(self, image_name):
"""Pulls an image in the docker engine."""
try:
self.cli.images.pull(image_name)
except docker.errors.APIError as e:
log.error('Cannot download image {}: {}'.format(image_name, e))
raise ZoeException('Cannot download image {}: {}'.format(image_name, e))