Python source code examples: asyncio.LifoQueue()
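asyncio.LifoQueue behaves exactly like asyncio.Queue except that get() returns the most recently added item, so it acts as an async-aware stack. As a quick orientation before the collected examples, here is a minimal, self-contained sketch of the core API they all rely on (put_nowait/get plus the task_done/join bookkeeping); everything in it is illustrative:

import asyncio

async def main():
    stack = asyncio.LifoQueue()
    for i in (1, 3, 2):
        stack.put_nowait(i)
    # LIFO: items come back in reverse insertion order -> 2, 3, 1
    while not stack.empty():
        print(await stack.get())

asyncio.run(main())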
Example 1
async def _push_object_producer(queue: asyncio.LifoQueue, object_service_root: str, object_service_headers: dict,
multipart_chunk_size: int, upload_chunk_size: int,
objects: List[PushObject]) -> None:
"""Async method to populate the queue with upload requests
Args:
queue: The current work queue
object_service_root: The root URL to use for all objects, including the namespace and dataset name
object_service_headers: The headers to use when requesting signed urls, including auth info
        multipart_chunk_size: Size in bytes at which to break a file apart for multipart uploading
upload_chunk_size: Size in bytes for streaming IO chunks
objects: A list of PushObjects to push
Returns:
None
"""
for obj in objects:
presigned_request = PresignedS3Upload(object_service_root,
object_service_headers,
multipart_chunk_size,
upload_chunk_size,
obj)
await queue.put(presigned_request)
Example 2
async def _pull_object_producer(queue: asyncio.LifoQueue, object_service_root: str, object_service_headers: dict,
download_chunk_size: int, objects: List[PullObject]) -> None:
"""Async method to populate the queue with download requests
Args:
queue: The current work queue
object_service_root: The root URL to use for all objects, including the namespace and dataset name
object_service_headers: The headers to use when requesting signed urls, including auth info
download_chunk_size: Size in bytes for streaming IO chunks
        objects: A list of PullObjects to pull
Returns:
None
"""
for obj in objects:
# Create object destination dir if needed
obj_dir, _ = obj.object_path.rsplit('/', 1)
os.makedirs(obj_dir, exist_ok=True) # type: ignore
# Populate queue with item for object
presigned_request = PresignedS3Download(object_service_root,
object_service_headers,
download_chunk_size,
obj)
await queue.put(presigned_request)
Example 3
def __init__(self, nethost: str, netport: int,
database: str,
user: str,
concurrency: int,
protocol: str,
**kwargs):
super().__init__(**kwargs)
self._compiler_pool_size = 0
if protocol != self.get_proto_name():
raise RuntimeError(f'unknown protocol {protocol!r}')
if concurrency <= 0 or concurrency > defines.HTTP_PORT_MAX_CONCURRENCY:
raise RuntimeError(
                f'concurrency must be greater than 0 and '
                f'no greater than {defines.HTTP_PORT_MAX_CONCURRENCY}')
self._compilers = asyncio.LifoQueue()
self._pgcons = asyncio.LifoQueue()
self._compilers_list = []
self._pgcons_list = []
self._nethost = nethost
self._netport = netport
self.database = database
self.user = user
self.concurrency = concurrency
self.last_minute_requests = windowedsum.WindowedSum()
self._http_proto_server = None
self._http_request_logger = None
self._query_cache = cache.StatementsCache(
maxsize=defines.HTTP_PORT_QUERY_CACHE_SIZE)
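A LIFO queue is a natural fit for resource pools like _compilers and _pgcons above: get() hands back the most recently released resource, so under light load a few "hot" connections are reused while the rest stay idle. A minimal sketch of that checkout/checkin pattern (the pool contents here are just placeholder strings):

import asyncio

async def use_pool():
    pool: asyncio.LifoQueue = asyncio.LifoQueue()
    for name in ("conn-a", "conn-b"):
        pool.put_nowait(name)

    conn = await pool.get()      # "conn-b": the most recently added
    try:
        print("using", conn)
    finally:
        pool.put_nowait(conn)    # released last, so it is reused first

asyncio.run(use_pool())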
Example 4
async def using_queues():
q = asyncio.Queue()
q.put_nowait('Hello')
await q.get()
await q.put('world')
q.get_nowait()
pq = asyncio.PriorityQueue()
stack = asyncio.LifoQueue()
Example 5
def test_order(self):
q = asyncio.LifoQueue(loop=self.loop)
for i in [1, 3, 2]:
q.put_nowait(i)
items = [q.get_nowait() for _ in range(3)]
self.assertEqual([2, 3, 1], items)
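Note that this test (from an older asyncio test suite) passes an explicit event loop; the loop argument to the queue constructors was deprecated in Python 3.8 and removed in 3.10. A modern equivalent of the same ordering check, assuming a plain unittest setup rather than the original harness:

import asyncio
import unittest

class TestLifoOrder(unittest.TestCase):
    def test_order(self):
        async def run():
            q = asyncio.LifoQueue()
            for i in [1, 3, 2]:
                q.put_nowait(i)
            return [q.get_nowait() for _ in range(3)]
        # LIFO: last in, first out
        self.assertEqual([2, 3, 1], asyncio.run(run()))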
Example 6
async def _process_standard_upload(self, queue: asyncio.LifoQueue, session: aiohttp.ClientSession,
presigned_request: PresignedS3Upload, progress_update_fn: Callable) -> None:
"""Method to handle the standard single request upload workflow.
    If a presigned URL has not been generated, get it. If it has, put the file contents.
Args:
queue: The current work queue
session: The current aiohttp session
presigned_request: the current PresignedS3Upload object to process
progress_update_fn: A callable with arg "completed_bytes" (int) indicating how many bytes have been
                            uploaded since the last call
Returns:
None
"""
if not presigned_request.is_presigned:
# Fetch the signed URL
await presigned_request.get_presigned_s3_url(session)
queue.put_nowait(presigned_request)
else:
# Process S3 Upload
try:
await presigned_request.put_object(session, progress_update_fn)
self.successful_requests.append(presigned_request)
presigned_request.remove_compressed_file()
except Exception:
presigned_request.remove_compressed_file()
raise
Example 7
def _init():
parallelism = config.get('parallelism', 2)
if parallelism < 2:
logger.warning('Parallelism less than 2, custom judge will not be supported.')
logger.info('Using parallelism: %d', parallelism)
sandboxes_task = create_sandboxes(parallelism)
global _lock, _queue
_lock = Lock()
_queue = LifoQueue()
put_sandbox(*get_event_loop().run_until_complete(sandboxes_task))
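put_sandbox and get_sandbox are not shown in this example. A plausible sketch of their shape, assuming _lock is an asyncio.Lock used to serialize checkouts and the queue simply holds ready sandboxes (hypothetical; the real project may differ):

import asyncio

_lock: asyncio.Lock = asyncio.Lock()
_queue: asyncio.LifoQueue = asyncio.LifoQueue()

def put_sandbox(*sandboxes):
    for sandbox in sandboxes:
        _queue.put_nowait(sandbox)

async def get_sandbox():
    # LIFO: the most recently returned sandbox is handed out first,
    # keeping a warm sandbox busy instead of cycling through all of them
    async with _lock:
        return await _queue.get()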
Example 8
async def _async__init__(self):
if self._initialized:
return
if self._initializing:
raise errors.InterfaceError(
'pool is being initialized in another task')
if self._closed:
raise errors.InterfaceError('pool is closed')
self._initializing = True
self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
for _ in range(self._maxsize):
ch = PoolConnectionHolder(
self,
on_acquire=self._on_acquire,
on_release=self._on_release)
self._holders.append(ch)
self._queue.put_nowait(ch)
try:
await self._initialize()
return self
finally:
self._initializing = False
self._initialized = True
Example 9
async def _initialize(self):
self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
for _ in range(self._maxsize):
ch = PoolConnectionHolder(
self,
max_queries=self._max_queries,
max_inactive_time=self._max_inactive_connection_lifetime,
setup=self._setup)
self._holders.append(ch)
self._queue.put_nowait(ch)
if self._minsize:
# Since we use a LIFO queue, the first items in the queue will be
# the last ones in `self._holders`. We want to pre-connect the
# first few connections in the queue, therefore we want to walk
# `self._holders` in reverse.
# Connect the first connection holder in the queue so that it
# can record `_working_addr` and `_working_opts`, which will
# speed up successive connection attempts.
first_ch = self._holders[-1] # type: PoolConnectionHolder
await first_ch.connect()
if self._minsize > 1:
connect_tasks = []
for i, ch in enumerate(reversed(self._holders[:-1])):
# `minsize - 1` because we already have first_ch
if i >= self._minsize - 1:
break
connect_tasks.append(ch.connect())
await asyncio.gather(*connect_tasks)
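The reverse-iteration comment above rests on a simple LIFO property: the holder pushed last (self._holders[-1]) is the one the first get() returns. A quick illustrative check of that invariant:

import asyncio

q = asyncio.LifoQueue()
holders = ["h0", "h1", "h2"]
for h in holders:
    q.put_nowait(h)
assert q.get_nowait() is holders[-1]   # "h2": pushed last, served first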
Example 10
async def _process_multipart_upload(self, queue: asyncio.LifoQueue, session: aiohttp.ClientSession,
presigned_request: PresignedS3Upload, progress_update_fn: Callable) -> None:
"""Method to handle the complex multipart upload workflow.
1. Create a multipart upload and get the ID
2. Upload all parts
    3. Complete the upload and mark the PresignedS3Upload object as successful
Args:
queue: The current work queue
session: The current aiohttp session
presigned_request: the current PresignedS3Upload object to process
progress_update_fn: A callable with arg "completed_bytes" (int) indicating how many bytes have been
                            uploaded since the last call
Returns:
None
"""
if not presigned_request.multipart_upload_id:
# Create a multipart upload and create parts
await presigned_request.prepare_multipart_upload(session)
# Requeue for more processing
queue.put_nowait(presigned_request)
else:
try:
if presigned_request.current_part:
if not presigned_request.is_presigned:
# Fetch the signed URL
await presigned_request.get_presigned_s3_url(session)
queue.put_nowait(presigned_request)
else:
# Process S3 Upload, mark the part as done, and requeue it
etag = await presigned_request.put_object(session, progress_update_fn)
presigned_request.mark_current_part_complete(etag)
queue.put_nowait(presigned_request)
else:
# If you get here, you are done and should complete the upload
await presigned_request.complete_multipart_upload(session)
self.successful_requests.append(presigned_request)
except Exception:
presigned_request.remove_compressed_file()
raise
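Re-queuing the same PresignedS3Upload after each step turns the work queue into a small state machine: every pass through a consumer advances the upload by one stage (create the upload, sign a part, PUT a part, complete), and because the queue is LIFO the requeued item is picked up again immediately, ahead of uploads that have not yet started, which keeps its signed URLs fresh. A stripped-down, illustrative sketch of that requeue pattern:

import asyncio

async def worker(queue: asyncio.LifoQueue):
    while True:
        job = await queue.get()
        try:
            job["step"] += 1              # advance one stage per pass
            if job["step"] < 3:
                queue.put_nowait(job)     # requeue; LIFO resumes it next
        finally:
            queue.task_done()

async def main():
    queue: asyncio.LifoQueue = asyncio.LifoQueue()
    queue.put_nowait({"name": "obj", "step": 0})
    w = asyncio.ensure_future(worker(queue))
    await queue.join()                    # returns once every pass is done
    w.cancel()

asyncio.run(main())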
Example 11
async def _push_object_consumer(self, queue: asyncio.LifoQueue, session: aiohttp.ClientSession,
progress_update_fn: Callable) -> None:
"""Async Queue consumer worker for pushing objects to the object service/s3
Args:
queue: The current work queue
session: The current aiohttp session
progress_update_fn: A callable with arg "completed_bytes" (int) indicating how many bytes have been
                            uploaded since the last call
Returns:
None
"""
while True:
presigned_request: PresignedS3Upload = await queue.get()
try:
if presigned_request.skip_object is False:
if presigned_request.is_multipart:
# Run multipart upload workflow
await self._process_multipart_upload(queue, session, presigned_request, progress_update_fn)
else:
# Run standard, single-request workflow
await self._process_standard_upload(queue, session, presigned_request, progress_update_fn)
else:
                # Object skipped because it already exists in the backend (object-level de-duplication)
                logger.info(f"Skipping duplicate upload {presigned_request.object_details.dataset_path}")
progress_update_fn(os.path.getsize(presigned_request.object_details.object_path))
self.successful_requests.append(presigned_request)
except Exception as err:
logger.exception(err)
self.failed_requests.append(presigned_request)
if presigned_request.is_multipart and presigned_request.multipart_upload_id is not None:
# Make best effort to abort a multipart upload if needed
try:
await presigned_request.abort_multipart_upload(session)
except Exception as err:
logger.error(f"An error occured while trying to abort multipart upload "
f"{presigned_request.multipart_upload_id} for {presigned_request.object_id}")
logger.exception(err)
# Notify the queue that the item has been processed
queue.task_done()
Example 12
async def _run_push_pipeline(self, object_service_root: str, object_service_headers: dict,
objects: List[PushObject], progress_update_fn: Callable,
multipart_chunk_size: int, upload_chunk_size: int = 4194304,
num_workers: int = 4) -> None:
"""Method to run the async upload pipeline
Args:
object_service_root: The root URL to use for all objects, including the namespace and dataset name
object_service_headers: The headers to use when requesting signed urls, including auth info
objects: A list of PushObjects to push
progress_update_fn: A callable with arg "completed_bytes" (int) indicating how many bytes have been
                            uploaded since the last call
        multipart_chunk_size: Size in bytes at which to break a file apart for multipart uploading
upload_chunk_size: Size in bytes for streaming IO chunks
num_workers: the number of consumer workers to start
    Returns:
        None
    """
    # We use a LifoQueue so that S3 uploads start as soon as they are ready, which helps keep pre-signed
    # URLs from expiring before they can be used when there are many files.
queue: asyncio.LifoQueue = asyncio.LifoQueue()
async with aiohttp.ClientSession() as session:
# Start workers
workers = []
for i in range(num_workers):
task = asyncio.ensure_future(self._push_object_consumer(queue, session, progress_update_fn))
workers.append(task)
# Populate the work queue
await self._push_object_producer(queue,
object_service_root,
object_service_headers,
multipart_chunk_size,
upload_chunk_size,
objects)
        # wait until the consumers have processed all items
await queue.join()
        # the workers are still waiting for work, so cancel them
for worker in workers:
worker.cancel()
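One subtlety in this teardown (shared by the pull pipeline below): worker.cancel() only requests cancellation. If the caller needs the workers fully unwound before moving on, it should also await them; a hedged variation on the shutdown above:

import asyncio

async def shutdown(workers: list) -> None:
    # cancel() merely schedules cancellation; gather() waits until each
    # worker task has actually finished unwinding
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)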
Example 13
async def _run_pull_pipeline(self, object_service_root: str, object_service_headers: dict,
objects: List[PullObject], progress_update_fn: Callable,
download_chunk_size: int = 4194304, num_workers: int = 4) -> None:
"""Method to run the async download pipeline
Args:
object_service_root: The root URL to use for all objects, including the namespace and dataset name
object_service_headers: The headers to use when requesting signed urls, including auth info
        objects: A list of PullObjects to pull
progress_update_fn: A callable with arg "completed_bytes" (int) indicating how many bytes have been
                            downloaded since the last call
download_chunk_size: Size in bytes for streaming IO chunks
num_workers: the number of consumer workers to start
    Returns:
        None
    """
    # We use a LifoQueue so that S3 downloads start as soon as they are ready, which helps keep pre-signed
    # URLs from expiring before they can be used when there are many files.
queue: asyncio.LifoQueue = asyncio.LifoQueue()
timeout = aiohttp.ClientTimeout(total=None, connect=None,
sock_connect=None, sock_read=None)
async with aiohttp.ClientSession(timeout=timeout) as session:
workers = []
for i in range(num_workers):
task = asyncio.ensure_future(self._pull_object_consumer(queue, session, progress_update_fn))
workers.append(task)
# Populate the work queue
await self._pull_object_producer(queue,
object_service_root,
object_service_headers,
download_chunk_size,
objects)
        # wait until the consumers have processed all items
await queue.join()
        # the workers are still waiting for work, so cancel them
for worker in workers:
worker.cancel()