Python源码示例:asyncio.gather()
示例1
def _async_wait_for_process(
future_process: Any,
out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout,
err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr
) -> CommandOutput:
"""Awaits the creation and completion of an asynchronous process.
Args:
future_process: The eventually created process.
out: Where to write stuff emitted by the process' stdout.
err: Where to write stuff emitted by the process' stderr.
Returns:
A (captured output, captured error output, return code) triplet.
"""
process = await future_process
future_output = _async_forward(process.stdout, out)
future_err_output = _async_forward(process.stderr, err)
output, err_output = await asyncio.gather(future_output, future_err_output)
await process.wait()
return CommandOutput(output, err_output, process.returncode)
示例2
def test_main(self):
# Need to run within a Task, as the scope manager depends
# on Task.current_task()
async def main_task():
with self.tracer.start_active_span('parent'):
tasks = self.submit_callbacks()
await asyncio.gather(*tasks)
self.loop.create_task(main_task())
stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 4)
self.loop.run_forever()
spans = self.tracer.finished_spans()
self.assertEquals(len(spans), 4)
self.assertNamesEqual(spans, ['task', 'task', 'task', 'parent'])
for i in range(3):
self.assertSameTrace(spans[i], spans[-1])
self.assertIsChildOf(spans[i], spans[-1])
示例3
def test_websocket_non_regression_bug_105(event_loop, server):
# This test will check a fix to a race condition which happens if the user is trying
# to connect using the same client twice at the same time
# See bug #105
url = f"ws://{server.hostname}:{server.port}/graphql"
print(f"url = {url}")
sample_transport = WebsocketsTransport(url=url)
client = Client(transport=sample_transport)
# Create a coroutine which start the connection with the transport but does nothing
async def client_connect(client):
async with client:
await asyncio.sleep(2 * MS)
# Create two tasks which will try to connect using the same client (not allowed)
connect_task1 = asyncio.ensure_future(client_connect(client))
connect_task2 = asyncio.ensure_future(client_connect(client))
with pytest.raises(TransportAlreadyConnected):
await asyncio.gather(connect_task1, connect_task2)
示例4
def make_query_loop(tmpdir, config_data, registry):
query_loops = []
def make_loop():
config_file = tmpdir / "config.yaml"
config_file.write_text(yaml.dump(config_data), "utf-8")
with config_file.open() as fh:
config = load_config(fh, logging.getLogger())
registry.create_metrics(config.metrics.values())
query_loop = loop.QueryLoop(config, registry, logging)
query_loops.append(query_loop)
return query_loop
yield make_loop
await asyncio.gather(
*(query_loop.stop() for query_loop in query_loops), return_exceptions=True,
)
示例5
def _do_ops(self, ops):
try:
for r in await asyncio.gather(*ops, return_exceptions=True):
if isinstance(r, MessageNotModifiedError):
logging.debug("db not modified", exc_info=r)
elif isinstance(r, Exception):
raise r # Makes more sense to raise even for MessageEditTimeExpiredError
elif not isinstance(r, Message):
logging.debug("unknown ret from gather, %r", r)
except MessageEditTimeExpiredError:
logging.debug("Making new channel.")
_db = self.db
self.db = None
await self._client(DeleteChannelRequest(channel=_db))
return True
return False
示例6
def test_main(self):
# Need to run within a Task, as the scope manager depends
# on Task.current_task()
async def main_task():
with self.tracer.start_active_span('parent'):
tasks = self.submit_callbacks()
await asyncio.gather(*tasks)
self.loop.create_task(main_task())
stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 4)
self.loop.run_forever()
spans = self.tracer.finished_spans()
self.assertEqual(len(spans), 4)
self.assertNamesEqual(spans, ['task', 'task', 'task', 'parent'])
for i in range(3):
self.assertSameTrace(spans[i], spans[-1])
self.assertIsChildOf(spans[i], spans[-1])
示例7
def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
if not tasks:
return
for task in tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*tasks, loop=loop, return_exceptions=True))
for task in tasks:
if not task.cancelled() and task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during shutdown",
"exception": task.exception(),
"task": task,
}
)
示例8
def test_task_local() -> None:
local_ = TaskLocal()
queue: asyncio.Queue = asyncio.Queue()
tasks = 2
for _ in range(tasks):
queue.put_nowait(None)
async def _test_local(value: int) -> int:
local_.test = value
await queue.get()
queue.task_done()
await queue.join()
return local_.test
futures = [asyncio.ensure_future(_test_local(value)) for value in range(tasks)]
asyncio.gather(*futures)
for value, future in enumerate(futures):
assert (await future) == value
示例9
def import_file(filename):
log.info("import_file: {}".format(filename))
loop = globals["loop"]
max_concurrent_tasks = config.get("max_concurrent_tasks")
tasks = []
with open(filename, 'r') as fh:
for line in fh:
line = line.rstrip()
#loop.run_until_complete(import_line(line))
tasks.append(asyncio.ensure_future(import_line(line)))
if len(tasks) < max_concurrent_tasks:
continue # get next line
# got a batch, move them out!
loop.run_until_complete(asyncio.gather(*tasks))
tasks = []
# finish any remaining tasks
loop.run_until_complete(asyncio.gather(*tasks))
globals["files_read"] += 1
示例10
def _get_dispatches(
self, payload: Payload, request: Request
) -> Tuple[Payload, Set[str]]:
# Run all dispatchers to form our initial set of worker plugins to run
worker_plugins: Set[str] = set(
self.always_dispatch
) if self.always_dispatch else set()
dispatch_results: List[Set[str]] = await asyncio.gather( # type: ignore
*[
self._apply_dispatcher(dispatcher, payload, request)
for dispatcher in self._loaded_dispatcher_plugins.values()
]
)
for dispatch_result in dispatch_results:
worker_plugins.update(dispatch_result)
return payload, worker_plugins
示例11
def test_increment_lock(self):
"""Test that we can't produce a race condition in .increment."""
await self.cog.redis.set("test_key", 0)
tasks = []
# Increment this a lot in different tasks
for _ in range(100):
task = asyncio.create_task(
self.cog.redis.increment("test_key", 1)
)
tasks.append(task)
await asyncio.gather(*tasks)
# Confirm that the value has been incremented the exact right number of times.
value = await self.cog.redis.get("test_key")
self.assertEqual(value, 100)
示例12
def refresh_inventory(self) -> None:
"""Refresh internal documentation inventory."""
log.debug("Refreshing documentation inventory...")
# Clear the old base URLS and inventories to ensure
# that we start from a fresh local dataset.
# Also, reset the cache used for fetching documentation.
self.base_urls.clear()
self.inventories.clear()
self.renamed_symbols.clear()
async_cache.cache = OrderedDict()
# Run all coroutines concurrently - since each of them performs a HTTP
# request, this speeds up fetching the inventory data heavily.
coros = [
self.update_single(
package["package"], package["base_url"], package["inventory_url"]
) for package in await self.bot.api_client.get('bot/documentation-links')
]
await asyncio.gather(*coros)
示例13
def main():
loop = asyncio.get_event_loop()
tsk = asyncio.ensure_future(pubsub(), loop=loop)
async def publish():
pub = await aioredis.create_redis(
'redis://localhost')
while not tsk.done():
# wait for clients to subscribe
while True:
subs = await pub.pubsub_numsub('channel:1')
if subs[b'channel:1'] == 1:
break
await asyncio.sleep(0, loop=loop)
# publish some messages
for msg in ['one', 'two', 'three']:
await pub.publish('channel:1', msg)
# send stop word
await pub.publish('channel:1', STOPWORD)
pub.close()
await pub.wait_closed()
loop.run_until_complete(asyncio.gather(publish(), tsk, loop=loop))
示例14
def pipeline(self):
"""Returns :class:`Pipeline` object to execute bulk of commands.
It is provided for convenience.
Commands can be pipelined without it.
Example:
>>> pipe = redis.pipeline()
>>> fut1 = pipe.incr('foo') # NO `await` as it will block forever!
>>> fut2 = pipe.incr('bar')
>>> result = await pipe.execute()
>>> result
[1, 1]
>>> await asyncio.gather(fut1, fut2)
[1, 1]
>>> #
>>> # The same can be done without pipeline:
>>> #
>>> fut1 = redis.incr('foo') # the 'INCRY foo' command already sent
>>> fut2 = redis.incr('bar')
>>> await asyncio.gather(fut1, fut2)
[2, 2]
"""
return Pipeline(self._pool_or_conn, self.__class__)
示例15
def _do_close(self):
# TODO: lock
tasks = []
task, self._monitor_task = self._monitor_task, None
task.cancel()
tasks.append(task)
while self._pools:
pool = self._pools.pop(0)
pool.close()
tasks.append(pool.wait_closed())
while self._masters:
_, pool = self._masters.popitem()
pool.close()
tasks.append(pool.wait_closed())
while self._slaves:
_, pool = self._slaves.popitem()
pool.close()
tasks.append(pool.wait_closed())
await asyncio.gather(*tasks)
示例16
def execute_pubsub(self, command, *channels):
"""Executes Redis (p)subscribe/(p)unsubscribe commands.
ConnectionsPool picks separate connection for pub/sub
and uses it until explicitly closed or disconnected
(unsubscribing from all channels/patterns will leave connection
locked for pub/sub use).
There is no auto-reconnect for this PUB/SUB connection.
Returns asyncio.gather coroutine waiting for all channels/patterns
to receive answers.
"""
conn, address = self.get_connection(command)
if conn is not None:
return conn.execute_pubsub(command, *channels)
else:
return self._wait_execute_pubsub(address, command, channels, {})
示例17
def test_pool_size_growth(create_pool, server):
pool = await create_pool(
server.tcp_address,
minsize=1, maxsize=1)
done = set()
tasks = []
async def task1(i):
with (await pool):
assert pool.size <= pool.maxsize
assert pool.freesize == 0
await asyncio.sleep(0.2)
done.add(i)
async def task2():
with (await pool):
assert pool.size <= pool.maxsize
assert pool.freesize >= 0
assert done == {0, 1}
for _ in range(2):
tasks.append(asyncio.ensure_future(task1(_)))
tasks.append(asyncio.ensure_future(task2()))
await asyncio.gather(*tasks)
示例18
def test_subscribe_concurrency(create_redis, server):
sub = await create_redis(server.tcp_address)
pub = await create_redis(server.tcp_address)
async def subscribe(*args):
return await sub.subscribe(*args)
async def publish(*args):
await asyncio.sleep(0)
return await pub.publish(*args)
res = await asyncio.gather(
subscribe('channel:0'),
publish('channel:0', 'Hello'),
subscribe('channel:1'),
)
(ch1,), subs, (ch2,) = res
assert ch1.name == b'channel:0'
assert subs == 1
assert ch2.name == b'channel:1'
示例19
def test_blpop_blocking_features(redis, create_redis, server):
key1, key2 = b'key:blpop:1', b'key:blpop:2'
value = b'blpop:value:2'
other_redis = await create_redis(server.tcp_address)
# create blocking task in separate connection
consumer = other_redis.blpop(key1, key2)
producer_task = asyncio.ensure_future(
push_data_with_sleep(redis, key2, value))
results = await asyncio.gather(consumer, producer_task)
assert results[0] == [key2, value]
assert results[1] == 1
# wait for data with timeout, list is emtpy, so blpop should
# return None in 1 sec
waiter = redis.blpop(key1, key2, timeout=1)
test_value = await waiter
assert test_value is None
other_redis.close()
示例20
def test_brpoplpush_blocking_features(redis, create_redis, server):
source = b'key:brpoplpush:12'
value = b'brpoplpush:value:2'
destkey = b'destkey:brpoplpush:2'
other_redis = await create_redis(
server.tcp_address)
# create blocking task
consumer_task = other_redis.brpoplpush(source, destkey)
producer_task = asyncio.ensure_future(
push_data_with_sleep(redis, source, value))
results = await asyncio.gather(consumer_task, producer_task)
assert results[0] == value
assert results[1] == 1
# make sure that all values stored in new destkey list
test_value = await redis.lrange(destkey, 0, -1)
assert test_value == [value]
# wait for data with timeout, list is emtpy, so brpoplpush should
# return None in 1 sec
waiter = redis.brpoplpush(source, destkey, timeout=1)
test_value = await waiter
assert test_value is None
other_redis.close()
示例21
def _cleanup_remaining_tasks(loop, logger):
# https://stackoverflow.com/questions/30765606/whats-the-correct-way-to-clean-up-after-an-interrupted-event-loop
# https://medium.com/python-pandemonium/asyncio-coroutine-patterns-beyond-await-a6121486656f
# Handle shutdown gracefully by waiting for all tasks to be cancelled
not_done_tasks = [task for task in asyncio.Task.all_tasks(loop=loop) if not task.done()]
if not_done_tasks:
logger.info("cancelling all remaining tasks")
# NOTE: following code cancels all tasks - possibly not ours as well
remaining_tasks = asyncio.gather(*not_done_tasks, loop=loop, return_exceptions=True)
remaining_tasks.add_done_callback(lambda t: loop.stop())
logger.debug("remaining tasks = {}".format(not_done_tasks))
remaining_tasks.cancel()
# Keep the event loop running until it is either destroyed or all
# tasks have really terminated
loop.run_until_complete(remaining_tasks)
示例22
def _cleanup_remaining_tasks(loop, logger):
# https://stackoverflow.com/questions/30765606/whats-the-correct-way-to-clean-up-after-an-interrupted-event-loop
# https://medium.com/python-pandemonium/asyncio-coroutine-patterns-beyond-await-a6121486656f
# Handle shutdown gracefully by waiting for all tasks to be cancelled
not_done_tasks = [task for task in asyncio.Task.all_tasks(loop=loop) if not task.done()]
if not_done_tasks:
logger.info("cancelling all remaining tasks")
# NOTE: following code cancels all tasks - possibly not ours as well
remaining_tasks = asyncio.gather(*not_done_tasks, loop=loop, return_exceptions=True)
remaining_tasks.add_done_callback(lambda t: loop.stop())
logger.debug("remaining tasks = {}".format(not_done_tasks))
remaining_tasks.cancel()
# Keep the event loop running until it is either destroyed or all
# tasks have really terminated
loop.run_until_complete(remaining_tasks)
示例23
def list_images(self) -> Sequence[Mapping[str, Any]]:
known_registries = await get_known_registries(self.etcd)
reverse_aliases = await self._scan_reverse_aliases()
data = await self.etcd.get_prefix('images')
coros = []
for registry, images in data.items():
if registry == '_aliases':
continue
for image, tags in images.items():
if image == '':
continue
if tags == '1':
continue
for tag, image_info in tags.items():
if tag == '':
continue
raw_ref = f'{etcd_unquote(registry)}/{etcd_unquote(image)}:{tag}'
ref = ImageRef(raw_ref, known_registries)
coros.append(self._parse_image(ref, image_info, reverse_aliases))
result = await asyncio.gather(*coros)
return result
示例24
def test_websocket_subscription_task_cancel(
event_loop, client_and_server, subscription_str
):
session, server = client_and_server
count = 10
subscription = gql(subscription_str.format(count=count))
async def task_coro():
nonlocal count
async for result in session.subscribe(subscription):
number = result["number"]
print(f"Number received: {number}")
assert number == count
count -= 1
task = asyncio.ensure_future(task_coro())
async def cancel_task_coro():
nonlocal task
await asyncio.sleep(11 * MS)
task.cancel()
cancel_task = asyncio.ensure_future(cancel_task_coro())
await asyncio.gather(task, cancel_task)
assert count > 0
示例25
def test_websocket_subscription_close_transport(
event_loop, client_and_server, subscription_str
):
session, server = client_and_server
count = 10
subscription = gql(subscription_str.format(count=count))
async def task_coro():
nonlocal count
async for result in session.subscribe(subscription):
number = result["number"]
print(f"Number received: {number}")
assert number == count
count -= 1
task = asyncio.ensure_future(task_coro())
async def close_transport_task_coro():
nonlocal task
await asyncio.sleep(11 * MS)
await session.transport.close()
close_transport_task = asyncio.ensure_future(close_transport_task_coro())
await asyncio.gather(task, close_transport_task)
assert count > 0
示例26
def test_websocket_two_queries_in_parallel(
event_loop, client_and_server, query_str
):
session, server = client_and_server
query = gql(query_str)
result1 = None
result2 = None
async def task1_coro():
nonlocal result1
result1 = await session.execute(query)
async def task2_coro():
nonlocal result2
result2 = await session.execute(query)
task1 = asyncio.ensure_future(task1_coro())
task2 = asyncio.ensure_future(task2_coro())
await asyncio.gather(task1, task2)
print("Query1 received:", result1)
print("Query2 received:", result2)
assert result1 == result2
示例27
def stop(self):
"""Stop timed query execution."""
coros = (call.stop() for call in self._timed_calls.values())
await asyncio.gather(*coros, return_exceptions=True)
self._timed_calls.clear()
coros = (db.close() for db in self._databases)
await asyncio.gather(*coros, return_exceptions=True)
示例28
def run_aperiodic_queries(self):
"""Run queries on request."""
coros = (
self._execute_query(query, dbname)
for query in self._aperiodic_queries
for dbname in query.databases
)
await asyncio.gather(*coros, return_exceptions=True)
示例29
def test_connect_lock(self, caplog, db):
"""The connect method has a lock to prevent concurrent calls."""
with caplog.at_level(logging.DEBUG):
await asyncio.gather(db.connect(), db.connect())
assert caplog.messages == ['connected to database "db"']
示例30
def test_execute_no_keep_disconnect_after_pending_queries(self):
"""The db is disconnected only after pending queries are run."""
db = DataBase("db", "sqlite://", keep_connected=False)
query1 = Query(
"query1", ["db"], [QueryMetric("metric1", [])], "SELECT 1.0 AS metric1"
)
query2 = Query(
"query1", ["db"], [QueryMetric("metric2", [])], "SELECT 1.0 AS metric2"
)
await db.connect()
await asyncio.gather(db.execute(query1), db.execute(query2))
assert not db.connected