Python源码示例:asyncio.DefaultEventLoopPolicy()
示例1
def test_get_event_loop_returns_running_loop(self):
class Policy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
raise NotImplementedError
loop = None
old_policy = asyncio.get_event_loop_policy()
try:
asyncio.set_event_loop_policy(Policy())
loop = asyncio.new_event_loop()
self.assertIs(asyncio._get_running_loop(), None)
async def func():
self.assertIs(asyncio.get_event_loop(), loop)
self.assertIs(asyncio._get_running_loop(), loop)
loop.run_until_complete(func())
finally:
asyncio.set_event_loop_policy(old_policy)
if loop is not None:
loop.close()
self.assertIs(asyncio._get_running_loop(), None)
示例2
def test_get_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
loop = policy.get_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
self.assertIs(policy._local._loop, loop)
self.assertIs(loop, policy.get_event_loop())
loop.close()
示例3
def test_get_event_loop_calls_set_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
with mock.patch.object(
policy, "set_event_loop",
wraps=policy.set_event_loop) as m_set_event_loop:
loop = policy.get_event_loop()
# policy._local._loop must be set through .set_event_loop()
# (the unix DefaultEventLoopPolicy needs this call to attach
# the child watcher correctly)
m_set_event_loop.assert_called_with(loop)
loop.close()
示例4
def test_get_event_loop_after_set_none(self):
policy = asyncio.DefaultEventLoopPolicy()
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)
示例5
def test_get_event_loop_thread(self, m_current_thread):
def f():
policy = asyncio.DefaultEventLoopPolicy()
self.assertRaises(RuntimeError, policy.get_event_loop)
th = threading.Thread(target=f)
th.start()
th.join()
示例6
def test_new_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
loop = policy.new_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
示例7
def test_set_event_loop_policy(self):
self.assertRaises(
AssertionError, asyncio.set_event_loop_policy, object())
old_policy = asyncio.get_event_loop_policy()
policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(policy)
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)
示例8
def create_policy(self):
    """Return a new ``asyncio.DefaultEventLoopPolicy`` instance."""
    policy_cls = asyncio.DefaultEventLoopPolicy
    return policy_cls()
示例9
def __init__(self, application=None):
    """Set up instance state around a wrapped ``DefaultEventLoopPolicy``.

    Args:
        application: Stored unchanged as ``self._application``; defaults
            to ``None``.

    The wrapped policy's ``new_event_loop`` is replaced by this object's
    own ``new_event_loop``, while ``get_event_loop`` and ``set_event_loop``
    on this object are bound directly to the wrapped policy's methods.
    """
    self._application = application
    self._default_loop = None

    # Watcher state: starts empty, with a lock created alongside it.
    self._watcher = None
    self._watcher_lock = threading.Lock()

    wrapped = asyncio.DefaultEventLoopPolicy()
    wrapped.new_event_loop = self.new_event_loop
    self._policy = wrapped

    self.get_event_loop = wrapped.get_event_loop
    self.set_event_loop = wrapped.set_event_loop
示例10
def test_get_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
loop = policy.get_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
self.assertIs(policy._local._loop, loop)
self.assertIs(loop, policy.get_event_loop())
loop.close()
示例11
def test_get_event_loop_calls_set_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
with mock.patch.object(
policy, "set_event_loop",
wraps=policy.set_event_loop) as m_set_event_loop:
loop = policy.get_event_loop()
# policy._local._loop must be set through .set_event_loop()
# (the unix DefaultEventLoopPolicy needs this call to attach
# the child watcher correctly)
m_set_event_loop.assert_called_with(loop)
loop.close()
示例12
def test_get_event_loop_after_set_none(self):
policy = asyncio.DefaultEventLoopPolicy()
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)
示例13
def test_get_event_loop_thread(self, m_current_thread):
def f():
policy = asyncio.DefaultEventLoopPolicy()
self.assertRaises(RuntimeError, policy.get_event_loop)
th = threading.Thread(target=f)
th.start()
th.join()
示例14
def test_new_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
loop = policy.new_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
示例15
def test_set_event_loop_policy(self):
self.assertRaises(
AssertionError, asyncio.set_event_loop_policy, object())
old_policy = asyncio.get_event_loop_policy()
policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(policy)
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)
示例16
def create_policy(self):
    """Return a new ``asyncio.DefaultEventLoopPolicy`` instance."""
    policy_cls = asyncio.DefaultEventLoopPolicy
    return policy_cls()
示例17
def pytest_generate_tests(metafunc):  # type: ignore
    """Parametrize the ``loop_factory`` fixture with event loop policies.

    Does nothing unless the test requests ``loop_factory``. The loops to
    use come from ``metafunc.config.option.aiohttp_loop``: a
    comma-separated list of names where a trailing ``?`` marks a loop as
    optional (skipped silently when unavailable). The value ``'all'``
    expands to ``'pyloop,uvloop?,tokio?'``.

    Raises:
        ValueError: If a required loop name is not available.
    """
    if 'loop_factory' not in metafunc.fixturenames:
        return

    loops = metafunc.config.option.aiohttp_loop
    avail_factories = {'pyloop': asyncio.DefaultEventLoopPolicy}

    if uvloop is not None:  # pragma: no cover
        avail_factories['uvloop'] = uvloop.EventLoopPolicy

    if tokio is not None:  # pragma: no cover
        avail_factories['tokio'] = tokio.EventLoopPolicy

    if loops == 'all':
        loops = 'pyloop,uvloop?,tokio?'

    factories = {}  # type: ignore
    for name in loops.split(','):
        # Strip surrounding whitespace before looking for the optional
        # marker, so "uvloop? " is still recognised as optional.
        name = name.strip()
        required = not name.endswith('?')
        name = name.strip(' ?')
        if name not in avail_factories:  # pragma: no cover
            if required:
                # Report every loop that could have been requested, not
                # just the ones already selected in this run.
                raise ValueError(
                    "Unknown loop '%s', available loops: %s" % (
                        name, list(avail_factories.keys())))
            else:
                continue
        factories[name] = avail_factories[name]
    metafunc.parametrize("loop_factory",
                         list(factories.values()),
                         ids=list(factories.keys()))
示例18
def test_get_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
loop = policy.get_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
self.assertIs(policy._local._loop, loop)
self.assertIs(loop, policy.get_event_loop())
loop.close()
示例19
def test_get_event_loop_calls_set_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
with mock.patch.object(
policy, "set_event_loop",
wraps=policy.set_event_loop) as m_set_event_loop:
loop = policy.get_event_loop()
# policy._local._loop must be set through .set_event_loop()
# (the unix DefaultEventLoopPolicy needs this call to attach
# the child watcher correctly)
m_set_event_loop.assert_called_with(loop)
loop.close()
示例20
def test_get_event_loop_after_set_none(self):
policy = asyncio.DefaultEventLoopPolicy()
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)
示例21
def test_get_event_loop_thread(self, m_current_thread):
def f():
policy = asyncio.DefaultEventLoopPolicy()
self.assertRaises(RuntimeError, policy.get_event_loop)
th = threading.Thread(target=f)
th.start()
th.join()
示例22
def test_new_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
loop = policy.new_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
示例23
def test_set_event_loop_policy(self):
self.assertRaises(
AssertionError, asyncio.set_event_loop_policy, object())
old_policy = asyncio.get_event_loop_policy()
policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(policy)
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)
示例24
def create_policy(self):
    """Return a new ``asyncio.DefaultEventLoopPolicy`` instance."""
    policy_cls = asyncio.DefaultEventLoopPolicy
    return policy_cls()
示例25
def test_get_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
loop = policy.get_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
self.assertIs(policy._local._loop, loop)
self.assertIs(loop, policy.get_event_loop())
loop.close()
示例26
def test_get_event_loop_calls_set_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
with mock.patch.object(
policy, "set_event_loop",
wraps=policy.set_event_loop) as m_set_event_loop:
loop = policy.get_event_loop()
# policy._local._loop must be set through .set_event_loop()
# (the unix DefaultEventLoopPolicy needs this call to attach
# the child watcher correctly)
m_set_event_loop.assert_called_with(loop)
loop.close()
示例27
def test_get_event_loop_after_set_none(self):
policy = asyncio.DefaultEventLoopPolicy()
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)
示例28
def test_get_event_loop_thread(self, m_current_thread):
def f():
policy = asyncio.DefaultEventLoopPolicy()
self.assertRaises(RuntimeError, policy.get_event_loop)
th = threading.Thread(target=f)
th.start()
th.join()
示例29
def test_new_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
loop = policy.new_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
示例30
def test_set_event_loop_policy(self):
self.assertRaises(
AssertionError, asyncio.set_event_loop_policy, object())
old_policy = asyncio.get_event_loop_policy()
policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(policy)
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)