Python源码示例:concurrent.futures.wait()
示例1
def await_future_or_eol(connection_observer, remain_time, start_time, timeout, logger):
    """Poll until the observer's future exists, the observer is done, or time runs out.

    The observer's lifetime clock starts before ``connection_observer._future``
    is assigned (the assignment may be delayed by a non-empty commands queue),
    so this helper polls every 5 ms for whichever comes first.

    :param connection_observer: object exposing ``_future``, ``done()``,
        ``timeout`` and ``life_status`` (``start_time``, ``terminating_timeout``).
    :param remain_time: seconds still available when the call starts.
    :param start_time: ``time.time()`` reference of the caller's timeout clock.
    :param timeout: caller's timeout in seconds, measured from ``start_time``.
    :param logger: logger receiving the debug messages.
    :return: tuple ``(end_of_life, remain_time)``; ``end_of_life`` is True when
        the observer reported done before its future was created.
    """
    observer_done = False
    while connection_observer._future is None and remain_time > 0.0:
        time.sleep(0.005)
        if connection_observer.done():
            logger.debug("{} is done before creating future".format(connection_observer))
            observer_done = True
            break

        current = time.time()
        remain_time = timeout - (current - start_time)

        life_status = connection_observer.life_status
        lifetime_used = current - life_status.start_time
        lifetime_total = connection_observer.timeout + life_status.terminating_timeout
        # Whichever clock expires first wins: the caller's timeout or the
        # observer's own lifetime (timeout + terminating timeout).
        if lifetime_total - lifetime_used <= 0.0:
            remain_time = 0.0
        if remain_time <= 0.0:
            logger.debug("{} timeout before creating future".format(connection_observer))
    return observer_done, remain_time
示例2
def test_infinite_loop_stops(self):
    """An infinite loop can be stopped after at least one iteration."""

    class Inc(Runnable):
        """Bumps ``cnt`` by one and flags that it has run at least once."""

        def __init__(self):
            super(Inc, self).__init__()
            self.first_run = threading.Event()

        def next(self, state):
            self.first_run.set()
            return state.updated(cnt=state.cnt + 1)

    loop = Loop(Inc())
    future_state = loop.run(State(cnt=0))

    # Give the loop body up to one second to run once, then ask it to stop.
    loop.runnable.first_run.wait(timeout=1)
    loop.stop()

    final_count = future_state.result().cnt
    self.assertTrue(final_count >= 1)
示例3
def test_basic(self):
    """A Wait block stays pending until it is explicitly stopped."""
    blocker = Wait()
    pending = blocker.run(State(x=1))

    # Before stop(): the future must still be unfinished after the timeout.
    finished, unfinished = futures.wait({pending}, timeout=0.1)
    self.assertEqual(len(finished), 0)
    self.assertEqual(len(unfinished), 1)

    # After stop(): the future completes and still carries x == 1.
    blocker.stop()
    finished, unfinished = futures.wait({pending}, timeout=0.1)
    self.assertEqual(len(finished), 1)
    self.assertEqual(len(unfinished), 0)
    self.assertEqual(pending.result().x, 1)
示例4
def perform_requests(self):
    """Generator that walks all batches, then drains the in-flight futures.

    Installs ``self.exit_fast`` as the SIGINT and SIGTERM handler, yields
    ``None`` once per item produced by ``self.split_batch`` and finally
    yields ``True`` after ``self.futures`` has emptied.  ``self.state`` is set
    to ``b'E'`` at the start, ``b'R'`` while items are being yielded,
    ``b'W'`` while draining and ``b'D'`` once everything has finished.
    """
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, self.exit_fast)

    self.state = b'E'
    for q_batch in self.get_batch():
        for (_, _) in self.split_batch(q_batch):
            if self.state != b"R":
                self.state = b'R'
            yield

    # wait for all batches to finish before returning
    self.state = b'W'
    while self.futures:
        before = len(self.futures)
        self.futures = [fut for fut in self.futures if not fut.done()]
        after = len(self.futures)
        if before != after:
            # Only log when the number of outstanding requests changed.
            self.ui.debug('Waiting for final requests to finish. '
                          'remaining requests: {}'
                          ''.format(after))
        wait(self.futures, return_when=FIRST_COMPLETED)
    self.state = b'D'
    yield True
示例5
def test_all_completed(self):
    """ALL_COMPLETED reports every future as finished and none as pending."""
    failing = self.executor.submit(divmod, 2, 0)
    succeeding = self.executor.submit(mul, 2, 21)
    everything = [SUCCESSFUL_FUTURE,
                  CANCELLED_AND_NOTIFIED_FUTURE,
                  EXCEPTION_FUTURE,
                  failing,
                  succeeding]

    finished, pending = futures.wait(everything,
                                     return_when=futures.ALL_COMPLETED)

    self.assertEqual(set(everything), finished)
    self.assertEqual(set(), pending)
示例6
def test_timeout(self):
    """With a 5 s timeout only the 6 s sleep is still pending."""
    quick = self.executor.submit(mul, 6, 7)
    slow = self.executor.submit(time.sleep, 6)
    already_done = [CANCELLED_AND_NOTIFIED_FUTURE,
                    EXCEPTION_FUTURE,
                    SUCCESSFUL_FUTURE]

    finished, pending = futures.wait(already_done + [quick, slow],
                                     timeout=5,
                                     return_when=futures.ALL_COMPLETED)

    self.assertEqual(set(already_done + [quick]), finished)
    self.assertEqual(set([slow]), pending)
示例7
def _block_and_handle_missing(method):
    """Decorate a future's file-retrieving method so it waits for completion.

    The wrapper blocks until the future (``self``) is done and then calls
    ``method``.  If that raises ``FileNotFoundError`` and the future holds an
    exception, the future's exception is raised instead (without chaining);
    otherwise the ``FileNotFoundError`` propagates unchanged.
    """
    @wraps(method)
    def wrapper(self):
        futures.wait((self,))  # Block until done
        try:
            result = method(self)
        except FileNotFoundError:
            # No output was found: report the job's own failure when the
            # future recorded one, otherwise let the lookup error through.
            if not self.exception():
                raise
            raise self.exception() from None
        return result
    return wrapper
示例8
def test_all_completed(self):
    """Waiting with ALL_COMPLETED leaves nothing pending."""
    raises_error = self.executor.submit(divmod, 2, 0)
    returns_value = self.executor.submit(mul, 2, 21)
    waited_on = [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 raises_error,
                 returns_value]

    outcome = futures.wait(waited_on, return_when=futures.ALL_COMPLETED)
    finished, pending = outcome

    # Every submitted or pre-built future must be reported as finished.
    self.assertEqual(set(waited_on), finished)
    self.assertEqual(set(), pending)
示例9
def test_timeout(self):
    """A wait that times out returns the unfinished future as pending."""
    fast_job = self.executor.submit(mul, 6, 7)
    slow_job = self.executor.submit(time.sleep, 6)
    expected_finished = [CANCELLED_AND_NOTIFIED_FUTURE,
                         EXCEPTION_FUTURE,
                         SUCCESSFUL_FUTURE,
                         fast_job]

    # The timeout (5 s) is shorter than the sleep (6 s) submitted above.
    finished, pending = futures.wait(expected_finished + [slow_job],
                                     timeout=5,
                                     return_when=futures.ALL_COMPLETED)

    self.assertEqual(set(expected_finished), finished)
    self.assertEqual(set([slow_job]), pending)
示例10
def _wait_for(fs, no_green_return_when, on_all_green_cb,
              caller_name, timeout=None):
    """Wait on ``fs``, dispatching on whether the futures are green.

    With no green futures the call is delegated to ``futures.wait`` using
    ``no_green_return_when``; with only green futures ``on_all_green_cb`` is
    invoked with ``fs`` and ``timeout``.

    :raises RuntimeError: if ``fs`` mixes green and non-green futures;
        ``caller_name`` is used in the message.
    """
    green_count = 0
    for fut in fs:
        if isinstance(fut, futurist.GreenFuture):
            green_count += 1

    if green_count == 0:
        done, not_done = futures.wait(fs, timeout=timeout,
                                      return_when=no_green_return_when)
        return DoneAndNotDoneFutures(done, not_done)

    other_count = len(fs) - green_count
    if other_count:
        raise RuntimeError("Can not wait on %s green futures and %s"
                           " non-green futures in the same"
                           " `%s` call" % (green_count, other_count,
                                           caller_name))
    return on_all_green_cb(fs, timeout=timeout)
示例11
def _wait_for_all_green(fs, timeout=None):
    """Wait until every future in ``fs`` is done or ``timeout`` elapses.

    :param fs: collection of futures to wait on.
    :param timeout: value handed to the waiter event's ``wait``.
    :returns: ``DoneAndNotDoneFutures(done, not_done)`` built from the
        partition produced by ``_partition_futures``.
    """
    # Nothing to wait on: report two empty sets.
    if not fs:
        return DoneAndNotDoneFutures(set(), set())
    with _acquire_and_release_futures(fs):
        done, not_done = _partition_futures(fs)
        # Everything is already done, so no waiter is needed.
        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)
        # Install a single waiter on the unfinished futures only, passing
        # their count as the extra argument.
        waiter = _create_and_install_waiters(not_done,
                                             _AllGreenWaiter,
                                             len(not_done))
    # NOTE(review): the wait is assumed to happen after the futures have been
    # released (outside the ``with`` block) -- confirm against the upstream source.
    waiter.event.wait(timeout)
    # Detach the waiter from each future it was installed on.
    for f in not_done:
        with f._condition:
            f._waiters.remove(waiter)
    # Partition again so the result reflects the state after waiting.
    with _acquire_and_release_futures(fs):
        done, not_done = _partition_futures(fs)
        return DoneAndNotDoneFutures(done, not_done)
示例12
def _wait_for_any_green(fs, timeout=None):
    """Wait until at least one future in ``fs`` is done or ``timeout`` elapses.

    :param fs: collection of futures to wait on.
    :param timeout: value handed to the waiter event's ``wait``.
    :returns: ``DoneAndNotDoneFutures(done, not_done)`` built from the
        partition produced by ``_partition_futures``.
    """
    # Nothing to wait on: report two empty sets.
    if not fs:
        return DoneAndNotDoneFutures(set(), set())
    with _acquire_and_release_futures(fs):
        done, not_done = _partition_futures(fs)
        # Something is already done: return without installing a waiter.
        if done:
            return DoneAndNotDoneFutures(done, not_done)
        # The waiter is installed on all of ``fs`` (none of them is done here).
        waiter = _create_and_install_waiters(fs, _AnyGreenWaiter)
    # NOTE(review): the wait is assumed to happen after the futures have been
    # released (outside the ``with`` block) -- confirm against the upstream source.
    waiter.event.wait(timeout)
    # Detach the waiter from every future it was installed on.
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)
    # Partition again so the result reflects the state after waiting.
    with _acquire_and_release_futures(fs):
        done, not_done = _partition_futures(fs)
        return DoneAndNotDoneFutures(done, not_done)
示例13
def get_latch(num):
    """Get a countdown latch function for use in n threads.

    :param num: number of calls that must be made to the returned function
        before any caller is released.
    :return: a zero-argument callable; each call blocks until the callable
        has been invoked ``num`` times in total.
    """
    cv = threading.Condition()
    count = 0

    def countdown_latch():
        """Block until n-1 other threads have called."""
        nonlocal count
        with cv:
            count += 1
            # Wake *all* waiters so each re-checks the counter.  A plain
            # notify() wakes only one of them, so once the last thread
            # arrived the remaining waiters would stay blocked forever
            # whenever more than two threads share the latch.
            cv.notify_all()
            while count < num:
                cv.wait()

    return countdown_latch
示例14
def process(self, queue, workflow):
    """Drain ``queue.__futures__`` and return ``workflow.result()``.

    While ``queue.__futures__`` is non-empty, block until at least one of
    those futures completes and pass the completed set to ``queue.progress``.
    """
    while queue.__futures__:
        outcome = wait(queue.__futures__, return_when=FIRST_COMPLETED)
        queue.progress(outcome.done)

    return workflow.result()
示例15
def download_batch(self, batch: Mapping[str, DatabaseMedia]):
    """Download a batch of media items collected in download_photo_media.

    Fresh 'base_url' values are required because they have a limited
    lifespan; they are obtained with a single call to the service function
    mediaItems.batchGet.  A ``RequestException`` hands the batch to
    ``find_bad_items``; a ``KeyboardInterrupt`` cancels the tracked download
    futures, waits for them and is then re-raised.
    """
    try:
        response = self._api.mediaItems.batchGet.execute(mediaItemIds=batch.keys())
        r_json = response.json()
        if r_json.get("pageToken"):
            log.error("Ops - Batch size too big, some items dropped!")

        for i, result in enumerate(r_json["mediaItemResults"]):
            media_item_json = result.get("mediaItem")
            if media_item_json:
                media_item = batch.get(media_item_json["id"])
                self.download_file(media_item, media_item_json)
                continue

            # No "mediaItem" entry in this result: log it and move on.
            log.warning("Null response in mediaItems.batchGet %s", batch.keys())
            log.debug(
                "Null response in mediaItems.batchGet"
                "for item %d in\n\n %s \n\n which is \n%s",
                i,
                str(r_json),
                str(result),
            )

    except RequestException:
        self.find_bad_items(batch)

    except KeyboardInterrupt:
        log.warning("Cancelling download threads ...")
        for pending in self.pool_future_to_media:
            pending.cancel()
        futures.wait(self.pool_future_to_media)
        log.warning("Cancelled download threads")
        raise
示例16
def download_file(self, media_item: DatabaseMedia, media_json: dict):
    """Farm a single media download off to the thread pool.

    Uses a dictionary of Futures -> mediaItem to track downloads that are
    currently scheduled/running. When a Future is done it calls
    do_download_complete to remove the Future from the dictionary and
    complete processing of the media item.

    :param media_item: the item to download; stored against its Future.
    :param media_json: API response for the item; must contain "baseUrl".
    """
    # Function-scope import so the block does not depend on how the module
    # imports concurrent.futures.
    from concurrent.futures import FIRST_COMPLETED, wait

    base_url = media_json["baseUrl"]

    # we dont want a massive queue so wait until at least one thread is free
    while len(self.pool_future_to_media) >= self.max_threads:
        # Block until at least one download finishes instead of polling
        # future.done() in a tight loop, which burned a full CPU core while
        # the pool was saturated.
        finished, _ = wait(self.pool_future_to_media, return_when=FIRST_COMPLETED)
        # Keep the dictionary's order when handing the finished futures over;
        # do_download_complete does the main-thread work and is expected to
        # remove them from the dictionary.
        done_list = [
            future for future in self.pool_future_to_media if future in finished
        ]
        self.do_download_complete(done_list)

    # start a new background download
    self.files_download_started += 1
    log.info(
        "downloading %d %s", self.files_download_started, media_item.relative_path
    )
    future = self.download_pool.submit(self.do_download_file, base_url, media_item)
    self.pool_future_to_media[future] = media_item
示例17
def _shutdown_thread_pools(self) -> None:
    """Drop the queued scans, shut every thread pool down and run the GC."""
    self._queued_server_scans = []

    for pool in self._thread_pools:
        pool.shutdown(wait=True)
    self._thread_pools = []

    # Why an explicit collection: the Future objects created by
    # ThreadPoolExecutor.submit() hold on to a surprising amount of memory:
    # https://stackoverflow.com/questions/45946274/rss-memory-usage-from-concurrent-futures
    # https://stackoverflow.com/questions/53104082/using-threadpoolexecutor-with-reduced-memory-footprint
    # https://stackoverflow.com/questions/34770169/using-concurrent-futures-without-running-out-of-ram
    # In a long-running host (such as a web app) the GC tends not to reclaim
    # them right away even though all the work has completed, so memory usage
    # would keep growing without this call.
    gc.collect()
示例18
def start(self, jobs=None):
    """
    Engine starts to run jobs

    Adds every job, then repeatedly waits for the first pending result to
    complete and adds whatever follow-up job(s) that result produced.  The
    engine is always torn down on exit.

    :param jobs: A list contains at least one job
    :return: None
    """
    try:
        if not jobs:
            logger.warning("CloudConnectEngine just exits with no jobs to run")
            return
        for job in jobs:
            self._add_job(job)
        while not self._shutdown:
            logger.info("CloudConnectEngine starts to run...")
            if not self._pending_job_results:
                logger.info("CloudConnectEngine has no more jobs to run")
                break
            # check the intermediate results to find the done jobs and not
            # done jobs
            done_and_not_done_jobs = cf.wait(self._pending_job_results,
                                             return_when=cf.FIRST_COMPLETED)
            self._pending_job_results = done_and_not_done_jobs.not_done
            done_job_results = done_and_not_done_jobs.done
            for future in done_job_results:
                # get the result of each done jobs and add new jobs to the
                # engine if the result spawns more jobs
                result = future.result()
                if result:
                    if isinstance(result, Iterable):
                        for temp in result:
                            self._add_job(temp)
                    else:
                        self._add_job(result)
    except Exception:
        # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are no longer swallowed; they now propagate to the
        # caller after the ``finally`` clause has torn the engine down.
        logger.exception("CloudConnectEngine encountered exception")
    finally:
        self._teardown()
示例19
def _teardown(self):
    """
    Stop every pending job, then block until the executor has shut down.

    ``self._shutdown`` is set first; the jobs in ``self._pending_jobs`` are
    stopped while holding ``self._lock``.

    :return: None
    """
    logger.info("CloudConnectEngine is going to tear down...")
    self._shutdown = True

    with self._lock:
        for pending_job in self._pending_jobs:
            pending_job.stop()

    self._executor.shutdown(wait=True)
    logger.info("CloudConnectEngine successfully tears down")
示例20
def start(self, jobs=None):
    """
    Engine starts to run jobs

    Every job is added up front; the loop then waits for the first pending
    result to finish and adds any job(s) that result returned.  Teardown
    always runs when this method exits.

    :param jobs: A list contains at least one job
    :return: None
    """
    try:
        if not jobs:
            logger.warning("CloudConnectEngine just exits with no jobs to run")
            return
        for job in jobs:
            self._add_job(job)
        while not self._shutdown:
            logger.info("CloudConnectEngine starts to run...")
            if not self._pending_job_results:
                logger.info("CloudConnectEngine has no more jobs to run")
                break
            # check the intermediate results to find the done jobs and not
            # done jobs
            done_and_not_done_jobs = cf.wait(self._pending_job_results,
                                             return_when=cf.FIRST_COMPLETED)
            self._pending_job_results = done_and_not_done_jobs.not_done
            done_job_results = done_and_not_done_jobs.done
            for future in done_job_results:
                # get the result of each done jobs and add new jobs to the
                # engine if the result spawns more jobs
                result = future.result()
                if result:
                    if isinstance(result, Iterable):
                        for temp in result:
                            self._add_job(temp)
                    else:
                        self._add_job(result)
    except Exception:
        # A bare ``except:`` also caught KeyboardInterrupt and SystemExit and
        # turned them into a log line; catching Exception lets those two
        # propagate once the ``finally`` clause has run the teardown.
        logger.exception("CloudConnectEngine encountered exception")
    finally:
        self._teardown()
示例21
def _teardown(self):
    """
    Internal shutdown path: flag the engine as shut down, call ``stop`` on
    each pending job under the lock, then wait for the thread pool to finish.

    :return: None
    """
    logger.info("CloudConnectEngine is going to tear down...")
    self._shutdown = True

    with self._lock:
        for running_job in self._pending_jobs:
            running_job.stop()

    # Blocks until the executor has shut down.
    self._executor.shutdown(wait=True)
    logger.info("CloudConnectEngine successfully tears down")
示例22
def wait_tasks(futures, timeout=None, return_when=ALL_COMPLETED, raise_exceptions=True):
    """Wait for the not-yet-finished futures and optionally re-raise a failure.

    Futures that are already done when the call starts are left out of the
    wait, so their exceptions are never raised here.

    :param futures: iterable of futures.
    :param timeout: forwarded to ``concurrent.futures.wait``.
    :param return_when: forwarded to ``concurrent.futures.wait``.
    :param raise_exceptions: when true, raise the exception of a future that
        finished with one during the wait.
    :return: None
    """
    still_running = []
    for fut in futures:
        if not fut.done():
            still_running.append(fut)

    finished = wait(still_running, timeout=timeout, return_when=return_when).done
    if not raise_exceptions:
        return
    for fut in finished:
        if fut.cancelled():
            continue
        if fut.exception() is not None:
            fut.result()  # raises the exception
示例23
def wait_tasks_or_abort(futures, timeout=60, kill_switch_ev=None):
    """Wait for ``futures``; on the first failure optionally abort the rest.

    If waiting raises and ``kill_switch_ev`` is given, the event is set and
    all tasks are waited for (up to ``timeout`` seconds, without raising)
    before the original exception is raised again.

    :param futures: futures to wait on.
    :param timeout: seconds allowed for the tasks to finish after the kill
        switch has been set.
    :param kill_switch_ev: optional event set to signal the abort.
    """
    coordinator = LazySingletonTasksCoordinator
    try:
        coordinator.wait_tasks(futures, return_when=FIRST_EXCEPTION, raise_exceptions=True)
    except Exception as error:
        if kill_switch_ev is not None:
            # Used when we want both: raise the exception and wait for all
            # tasks to finish.
            kill_switch_ev.set()
            coordinator.wait_tasks(futures, return_when=ALL_COMPLETED,
                                   raise_exceptions=False, timeout=timeout)
        raise error
示例24
def shutdown(self, timeout=None):
    """Mark the pool as shut down and shut the workers pool down.

    Does nothing when ``self.is_shutdown`` is already true.

    :param timeout: when not ``None``, ``await_termination`` is called with
        it first and the workers pool is shut down with ``wait=True``;
        otherwise the workers pool is shut down with ``wait=False``.
    """
    if self.is_shutdown:
        return

    block_on_pool = timeout is not None
    with type(self)._POOL_LOCK:
        self.is_shutdown = True
        if block_on_pool:
            self.await_termination(timeout=timeout)
        self._workers_pool.shutdown(wait=block_on_pool)
示例25
def _stop(self, no_wait=False):
    """Signal the threaded function to exit and optionally wait for it.

    :param no_wait: when true, return right after setting the stop flag.
    :raises MolerException: if ``self._is_done`` is not set within
        ``self._stop_timeout`` seconds.
    """
    self._stop_running.set()  # force threaded-function to exit
    if no_wait:
        return

    stopped_in_time = self._is_done.wait(timeout=self._stop_timeout)
    if stopped_in_time:
        return

    err_msg = "Failed to stop thread-running function within {} sec".format(self._stop_timeout)
    # TODO: should we break current thread or just set this exception inside connection-observer
    #       (is it symmetric to failed-start ?)
    # may cause leaking resources - no call to moler_conn.unsubscribe()
    raise MolerException(err_msg)
示例26
def _execute_till_eol(self, connection_observer, connection_observer_future, max_timeout, await_timeout,
                      remain_time):
    """Wait for the observer's future until it is done or its lifetime ends.

    NOTE(review): the nesting below was reconstructed from a source whose
    indentation had been lost -- confirm against the upstream file.

    :param connection_observer: observer whose ``done()``, ``timeout`` and
        ``life_status`` are consulted.
    :param connection_observer_future: future to wait on; when falsy,
        ``connection_observer._future`` is used instead.
    :param max_timeout: when truthy, wait once for the whole ``remain_time``
        instead of polling every ``self._tick``.
    :param await_timeout: passed as ``timeout`` to ``self._wait_for_time_out``.
    :param remain_time: seconds left when the call starts.
    :return: True if the future finished or the observer reported done
        (the submitted future is cancelled in that case), otherwise False.
    """
    eol_remain_time = remain_time
    # either we wait forced-max-timeout or we check done-status each 0.1sec tick
    if eol_remain_time > 0.0:
        future = connection_observer_future or connection_observer._future
        assert future is not None
        if max_timeout:
            # Single blocking wait covering all of the remaining time.
            done, not_done = wait([future], timeout=remain_time)
            if (future in done) or connection_observer.done():
                self._cancel_submitted_future(connection_observer, future)
                return True
            # Not finished in time: run the timeout handling ...
            self._wait_for_time_out(connection_observer, connection_observer_future,
                                    timeout=await_timeout)
            # ... then give the observer its terminating timeout, if it has one.
            if connection_observer.life_status.terminating_timeout > 0.0:
                connection_observer.life_status.in_terminating = True
                done, not_done = wait([future], timeout=connection_observer.life_status.terminating_timeout)
                if (future in done) or connection_observer.done():
                    self._cancel_submitted_future(connection_observer, future)
                    return True
        else:
            # Polling mode: wake up every tick and recompute the remaining time.
            while eol_remain_time > 0.0:
                done, not_done = wait([future], timeout=self._tick)
                if (future in done) or connection_observer.done():
                    self._cancel_submitted_future(connection_observer, future)
                    return True
                already_passed = time.time() - connection_observer.life_status.start_time
                # End of life = observer timeout plus its terminating timeout.
                eol_timeout = connection_observer.timeout + connection_observer.life_status.terminating_timeout
                eol_remain_time = eol_timeout - already_passed
                timeout = connection_observer.timeout
                remain_time = timeout - already_passed
                if remain_time <= 0.0:
                    # The plain timeout has passed; the loop keeps running
                    # until the end-of-life time is used up as well.
                    self._wait_for_time_out(connection_observer, connection_observer_future,
                                            timeout=await_timeout)
                    if not connection_observer.life_status.in_terminating:
                        connection_observer.life_status.in_terminating = True
    else:
        # No time left on entry: only wait for the observer to become done.
        self._wait_for_not_started_connection_observer_is_done(connection_observer=connection_observer)
    return False
示例27
def _wait_for_not_started_connection_observer_is_done(self, connection_observer):
    """Sleep in ticks until the observer is done or its terminating timeout elapses.

    :param connection_observer: observer exposing ``done()`` and
        ``life_status.terminating_timeout``.
    :return: None
    """
    # Have to wait till connection_observer is done with terminating timeout.
    remaining = connection_observer.life_status.terminating_timeout
    began_at = time.time()
    while True:
        if connection_observer.done() or not remaining > 0.0:
            break
        time.sleep(self._tick)
        # The terminating timeout is re-read on every tick.
        remaining = began_at + connection_observer.life_status.terminating_timeout - time.time()
示例28
def load(data_dir, data_name, batch_size, resize_wh,
         crop_locs, crop_wh, total_num=None, max_workers=48):
    """Yield ``(images, labels)`` batches, loading the images in threads.

    :param data_dir: dataset root; images are looked up under
        ``<data_dir>/ILSVRC2012_img_val``.
    :param data_name: forwarded to ``get_files``.
    :param batch_size: number of files per batch.
    :param resize_wh: forwarded to ``load_single``.
    :param crop_locs: forwarded to ``load_single``; a list contributes
        ``len(crop_locs)`` crops per image, the value ``10`` contributes ten,
        anything else one.
    :param crop_wh: side length of each crop.
    :param total_num: forwarded to ``get_files``.
    :param max_workers: size of the per-batch thread pool (default 48, the
        value that used to be hard-coded).
    :return: generator of ``(X, labels)`` where ``X`` is a float32 array
        reshaped to ``(-1, crop_wh, crop_wh, 3)`` and ``labels`` is the
        matching slice of the labels returned by ``get_files``.
    """
    files, labels = get_files(data_dir, data_name, total_num)
    total_num = len(labels)
    image_dir = "%s/ILSVRC2012_img_val" % data_dir

    for batch_start in range(0, total_num, batch_size):
        batch_end = batch_start + batch_size

        data_spec = [batch_size, 1, crop_wh, crop_wh, 3]
        if isinstance(crop_locs, list):
            data_spec[1] = len(crop_locs)
        elif crop_locs == 10:
            data_spec[1] = 10
        X = np.zeros(data_spec, np.float32)

        jobs = []
        with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
            for f in files[batch_start:batch_end]:
                filename = os.path.join(image_dir, f)
                if os.path.isfile(filename):
                    # Arguments are passed positionally; the previous
                    # ``(*(filename, ...))`` form is a parenthesised starred
                    # expression, which Python rejects as a syntax error.
                    jobs.append(executor.submit(
                        load_single, filename, resize_wh, crop_wh, crop_locs))

        # Leaving the ``with`` block already waits for the submitted jobs;
        # the explicit wait is kept as a cheap safeguard.
        cf.wait(jobs)

        # Missing files submit no job, so trailing rows of X stay zero.
        for (k, out) in enumerate(jobs):
            X[k] = out.result()

        yield X.reshape((-1, crop_wh, crop_wh, 3)), \
            labels[batch_start:batch_end]

        del X
示例29
def run(self, funcs):
    """Run a set of functions in parallel, returning their results.

    Make sure any function you pass exits with a reasonable timeout. If it
    doesn't return within the timeout, or its result is ignored because of an
    exception in a separate thread, it will continue to stick around until it
    finishes, including blocking process exit.

    Args:
      funcs: An iterable of functions or iterable of args to functools.partial.

    Returns:
      A list of return values with the values matching the order in funcs.

    Raises:
      Propagates the first exception encountered in one of the functions.
    """
    calls = [f if callable(f) else functools.partial(*f) for f in funcs]

    if len(calls) == 1:  # Ignore threads if it's not needed.
        only_call = calls[0]
        return [only_call()]

    if len(calls) > self._workers:  # Lazy init and grow as needed.
        self.shutdown()
        self._workers = len(calls)
        self._executor = futures.ThreadPoolExecutor(self._workers)

    futs = [self._executor.submit(call) for call in calls]
    done, not_done = futures.wait(futs, self._timeout, futures.FIRST_EXCEPTION)

    # Make sure to propagate any exceptions.
    for fut in done:
        if fut.cancelled() or fut.exception() is None:
            continue
        if not_done:
            # Some calls haven't finished: cancel them and drop the thread
            # pool so it is recreated later. Otherwise a thread could run
            # forever and block parallel calls.
            for straggler in not_done:
                straggler.cancel()
            self.shutdown(False)  # Don't wait, they may be deadlocked.
        raise fut.exception()

    # Either done or timed out, so don't wait again.
    return [fut.result(timeout=0) for fut in futs]
示例30
def shutdown(self, wait=True):
    """Shut the current executor down, if any, and reset the worker count.

    :param wait: forwarded to the executor's ``shutdown``.
    """
    executor = self._executor
    if not executor:
        return
    executor.shutdown(wait)
    self._executor = None
    self._workers = 0