Python源码示例:concurrent.futures.Future()
示例1
def set_exc_info(self, exc_info):
    """Sets the exception information of a ``Future``.

    Preserves tracebacks on Python 2.

    :arg exc_info: value stored on ``self._exc_info`` and passed to
        ``_TracebackLogger``; presumably a ``sys.exc_info()``-style
        triple -- confirm against callers.

    .. versionadded:: 4.0
    """
    self._exc_info = exc_info
    self._log_traceback = True
    if not _GC_CYCLE_FINALIZERS:
        # NOTE(review): presumably the fallback for interpreters that cannot
        # finalize objects caught in reference cycles -- confirm where
        # _GC_CYCLE_FINALIZERS is defined.
        self._tb_logger = _TracebackLogger(exc_info)

    try:
        self._set_done()
    finally:
        # Activate the logger after all callbacks have had a
        # chance to call result() or exception().
        if self._log_traceback and self._tb_logger is not None:
            self._tb_logger.activate()
    # Kept from the original: stores the same value assigned at the top.
    # NOTE(review): looks redundant unless something run by _set_done()
    # can replace _exc_info -- confirm before removing.
    self._exc_info = exc_info
示例2
def set_exc_info(self, exc_info):
    """Sets the exception information of a ``Future``.

    Preserves tracebacks on Python 2.

    :arg exc_info: value stored on ``self._exc_info`` and passed to
        ``_TracebackLogger``; presumably a ``sys.exc_info()``-style
        triple -- confirm against callers.

    .. versionadded:: 4.0
    """
    self._exc_info = exc_info
    self._log_traceback = True
    if not _GC_CYCLE_FINALIZERS:
        # NOTE(review): presumably the fallback for interpreters that cannot
        # finalize objects caught in reference cycles -- confirm where
        # _GC_CYCLE_FINALIZERS is defined.
        self._tb_logger = _TracebackLogger(exc_info)

    try:
        self._set_done()
    finally:
        # Activate the logger after all callbacks have had a
        # chance to call result() or exception().
        if self._log_traceback and self._tb_logger is not None:
            self._tb_logger.activate()
    # Kept from the original: stores the same value assigned at the top.
    # NOTE(review): looks redundant unless something run by _set_done()
    # can replace _exc_info -- confirm before removing.
    self._exc_info = exc_info
示例3
def future_set_exception_unless_cancelled(
    future: "Union[futures.Future[_T], Future[_T]]", exc: BaseException
) -> None:
    """Set the given ``exc`` as the `Future`'s exception.

    If the Future is already cancelled, logs the exception instead. If
    this logging is not desired, the caller should explicitly check
    the state of the Future and call ``Future.set_exception`` instead of
    this wrapper.

    Avoids ``asyncio.InvalidStateError`` when calling ``set_exception()`` on
    a cancelled `asyncio.Future`.

    .. versionadded:: 6.0
    """
    if future.cancelled():
        # The exception can no longer be stored on the future; log it so
        # that it does not disappear silently.
        app_log.error("Exception after Future was cancelled", exc_info=exc)
    else:
        future.set_exception(exc)
示例4
def future_set_exc_info(
    future: "Union[futures.Future[_T], Future[_T]]",
    exc_info: Tuple[
        Optional[type], Optional[BaseException], Optional[types.TracebackType]
    ],
) -> None:
    """Set the given ``exc_info`` as the `Future`'s exception.

    Understands both `asyncio.Future` and the extensions in older
    versions of Tornado to enable better tracebacks on Python 2.

    Only the exception instance (``exc_info[1]``) is used; it is handed to
    ``future_set_exception_unless_cancelled``.

    :raises Exception: if ``exc_info[1]`` is ``None``.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       If the future is already cancelled, this function is a no-op.
       (previously ``asyncio.InvalidStateError`` would be raised)
    """
    exception = exc_info[1]
    if exception is None:
        raise Exception("future_set_exc_info called with no exception")
    future_set_exception_unless_cancelled(future, exception)
示例5
def future_add_done_callback(  # noqa: F811
    future: "Union[futures.Future[_T], Future[_T]]", callback: Callable[..., None]
) -> None:
    """Arrange to call ``callback`` when ``future`` is complete.

    ``callback`` is invoked with one argument, the ``future``.

    If ``future`` is already done, ``callback`` is invoked immediately.
    This may differ from the behavior of ``Future.add_done_callback``,
    which makes no such guarantee.

    .. versionadded:: 5.0
    """
    if future.done():
        # Run synchronously instead of going through the future's own
        # callback mechanism.
        callback(future)
    else:
        future.add_done_callback(callback)
示例6
def fetch_all_topics(self, workers: int) -> None:
    """
    Distribute stream ids across threads in order to fetch
    topics concurrently.

    The keys of ``self.stream_dict`` are split into ``workers`` interleaved
    slices (slice ``i`` takes every ``workers``-th key starting at index
    ``i``) and each slice is passed to ``self.get_topics_in_stream`` on a
    thread pool of ``workers`` threads.

    :raises ServerConnectionFailure: if ``self.exception_safe_result``
        returns a truthy value for any submitted job; the message lists
        the failing jobs as ``fetch_topics[<index>]``.
    """
    list_of_streams = list(self.stream_dict)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        thread_objects = {
            i: executor.submit(self.get_topics_in_stream,
                               list_of_streams[i::workers])
            for i in range(workers)
        }  # type: Dict[int, Future[str]]
        wait(thread_objects.values())

    # A truthy entry is treated as a failure for that job.
    results = {
        str(name): self.exception_safe_result(thread_object)
        for name, thread_object in thread_objects.items()
    }  # type: Dict[str, str]
    if any(results.values()):
        failures = ['fetch_topics[{}]'.format(name)
                    for name, result in results.items()
                    if result]
        raise ServerConnectionFailure(", ".join(failures))
示例7
def download_file(self, media_item: DatabaseMedia, media_json: dict):
    """Farm a single media download off to the thread pool.

    Uses a dictionary of Futures -> mediaItem to track downloads that are
    currently scheduled/running. When a Future is done it calls
    do_download_complete to remove the Future from the dictionary and
    complete processing of the media item.

    :param media_item: item handed to ``do_download_file`` and recorded
        against the new future.
    :param media_json: mapping that must contain the key ``"baseUrl"``.
    """
    base_url = media_json["baseUrl"]

    # we dont want a massive queue so wait until at least one thread is free
    # NOTE(review): this loop polls without sleeping or blocking, so it
    # spins while the pool is full -- consider waiting on the futures.
    while len(self.pool_future_to_media) >= self.max_threads:
        # check which futures are done, complete the main thread work
        # and remove them from the dictionary
        done_list = [
            future for future in self.pool_future_to_media if future.done()
        ]
        self.do_download_complete(done_list)

    # start a new background download
    self.files_download_started += 1
    log.info(
        "downloading %d %s", self.files_download_started, media_item.relative_path
    )
    future = self.download_pool.submit(self.do_download_file, base_url, media_item)
    self.pool_future_to_media[future] = media_item
示例8
def result_for_completed_scan_jobs(
    cls, server_info: ServerConnectivityInfo, completed_scan_jobs: List[Future]
) -> ScanCommandResult:
    """Build the scan command result from the completed scan jobs.

    Collects ``future.result()`` for every job, in order, and passes the
    list to ``cls.result_cls`` as ``results_field``. ``server_info`` is not
    used here.

    NOTE(review): takes ``cls``, so presumably decorated as a classmethod
    outside this excerpt -- confirm.

    :raises AssertionError: if the number of jobs differs from
        ``cls._scan_jobs_count``.
    """
    if len(completed_scan_jobs) != cls._scan_jobs_count:
        raise AssertionError("Did not receive all the scan jobs that needed to be completed")

    job_results = [future.result() for future in completed_scan_jobs]
    return cls.result_cls(results_field=job_results)  # type: ignore
示例9
def result_for_completed_scan_jobs(
    cls, server_info: ServerConnectivityInfo, completed_scan_jobs: List[Future]
) -> ScanCommandResult:
    """Unconditionally fail while processing the completed scan jobs.

    Neither argument is inspected.

    NOTE(review): presumably a deliberately failing implementation used to
    exercise error handling -- confirm against its callers.

    :raises RuntimeError: always.
    """
    raise RuntimeError("Ran into a problem when processing results")
示例10
def all_queued_scan_jobs(self) -> Set[Future]:
    """Return every queued scan job, across all scan commands, as one set.

    Merges all the values of ``self.queued_scan_jobs_per_scan_command``.
    A new set is built on every call; it is empty when the mapping is.
    """
    return set().union(*self.queued_scan_jobs_per_scan_command.values())
示例11
def _shutdown_thread_pools(self) -> None:
self._queued_server_scans = []
for thread_pool in self._thread_pools:
thread_pool.shutdown(wait=True)
self._thread_pools = []
# Force garbage collection because for some reason the Future objects created by ThreadPoolExecutor.submit()
# take a ton of memory (compared to what they do - holding a function to call and its arguments):
# https://stackoverflow.com/questions/45946274/rss-memory-usage-from-concurrent-futures
# https://stackoverflow.com/questions/53104082/using-threadpoolexecutor-with-reduced-memory-footprint
# https://stackoverflow.com/questions/34770169/using-concurrent-futures-without-running-out-of-ram
# We force garbage collection here to ensure memory usage does not balloon when running SSLyze in some kind
# of long-running app (such as a web app). Otherwise, the GC tends to not cleanup all the Future objects right
# away (although at this point, all the work has been completed) and memory usage goes up like crazy
gc.collect()
示例12
def result_for_completed_scan_jobs(
    cls, server_info: "ServerConnectivityInfo", completed_scan_jobs: List[Future]
) -> _ScanCommandResultTypeVar:
    """Transform the completed scan jobs for a given scan command into a result.

    This version has no implementation: the body is empty, so calling it
    returns ``None``.

    NOTE(review): presumably an abstract classmethod whose decorators sit
    outside this excerpt and which subclasses override -- confirm.
    """
示例13
def __del__(self):
if self.formatted_tb:
app_log.error('Future exception was never retrieved: %s',
''.join(self.formatted_tb).rstrip())
示例14
def result(self, timeout=None):
    """If the operation succeeded, return its result.  If it failed,
    re-raise its exception.

    This method takes a ``timeout`` argument for compatibility with
    `concurrent.futures.Future` but it is an error to call it
    before the `Future` is done, so the ``timeout`` is never used.
    """
    self._clear_tb_log()
    # A non-None result is returned straight away; a result of None falls
    # through to the exception and "done" checks below.
    if self._result is not None:
        return self._result
    if self._exc_info is not None:
        raise_exc_info(self._exc_info)
    self._check_done()
    return self._result
示例15
def exception(self, timeout=None):
    """If the operation raised an exception, return the `Exception`
    object.  Otherwise returns None.

    This method takes a ``timeout`` argument for compatibility with
    `concurrent.futures.Future` but it is an error to call it
    before the `Future` is done, so the ``timeout`` is never used.
    """
    self._clear_tb_log()
    if self._exc_info is None:
        # No exception recorded: _check_done() runs before returning None.
        self._check_done()
        return None
    # Index 1 of the stored exc_info is the exception instance.
    return self._exc_info[1]
示例16
def add_done_callback(self, fn):
    """Attaches the given callback to the `Future`.

    It will be invoked with the `Future` as its argument when the Future
    has finished running and its result is available.  In Tornado
    consider using `.IOLoop.add_future` instead of calling
    `add_done_callback` directly.
    """
    if self._done:
        # Already finished: call immediately instead of queueing.
        fn(self)
    else:
        self._callbacks.append(fn)
示例17
def set_result(self, result):
    """Sets the result of a ``Future``.

    It is undefined to call any of the ``set`` methods more than once
    on the same object.
    """
    # Store the value first so it is in place when _set_done() runs.
    self._result = result
    self._set_done()
示例18
def __del__(self):
if not self._log_traceback:
# set_exception() was not called, or result() or exception()
# has consumed the exception
return
tb = traceback.format_exception(*self._exc_info)
app_log.error('Future %r exception was never retrieved: %s',
self, ''.join(tb).rstrip())
示例19
def __del__(self):
if self.formatted_tb:
app_log.error('Future exception was never retrieved: %s',
''.join(self.formatted_tb).rstrip())
示例20
def result(self, timeout=None):
    """If the operation succeeded, return its result.  If it failed,
    re-raise its exception.

    This method takes a ``timeout`` argument for compatibility with
    `concurrent.futures.Future` but it is an error to call it
    before the `Future` is done, so the ``timeout`` is never used.
    """
    self._clear_tb_log()
    # A non-None result is returned straight away; a result of None falls
    # through to the exception and "done" checks below.
    if self._result is not None:
        return self._result
    if self._exc_info is not None:
        raise_exc_info(self._exc_info)
    self._check_done()
    return self._result
示例21
def exception(self, timeout=None):
    """If the operation raised an exception, return the `Exception`
    object.  Otherwise returns None.

    This method takes a ``timeout`` argument for compatibility with
    `concurrent.futures.Future` but it is an error to call it
    before the `Future` is done, so the ``timeout`` is never used.
    """
    self._clear_tb_log()
    if self._exc_info is None:
        # No exception recorded: _check_done() runs before returning None.
        self._check_done()
        return None
    # Index 1 of the stored exc_info is the exception instance.
    return self._exc_info[1]
示例22
def set_result(self, result):
    """Sets the result of a ``Future``.

    It is undefined to call any of the ``set`` methods more than once
    on the same object.
    """
    # Store the value first so it is in place when _set_done() runs.
    self._result = result
    self._set_done()
示例23
def set_exception(self, exception):
    """Sets the exception of a ``Future``.

    Builds a ``(type, value, traceback)`` triple from ``exception`` and
    delegates to ``set_exc_info``.
    """
    # Not every exception object carries ``__traceback__``; fall back to
    # None when the attribute is missing.
    tb = getattr(exception, '__traceback__', None)
    self.set_exc_info((exception.__class__, exception, tb))
示例24
def __del__(self):
if not self._log_traceback:
# set_exception() was not called, or result() or exception()
# has consumed the exception
return
tb = traceback.format_exception(*self._exc_info)
app_log.error('Future %r exception was never retrieved: %s',
self, ''.join(tb).rstrip())
示例25
def __init__(self):
    """Initialise the instance's bookkeeping attributes.

    Creates a fresh ``Future``, sets every boolean flag to ``False`` and
    every optional reference to ``None``, and takes the float tolerance
    from ``DEFAULT_FLOAT_TOL``.
    """
    self._future = Future()
    self._awaited = False
    self._scheduler = None
    self._success = False
    # NOTE(review): presumably a duration and an absolute time limit set
    # later by other methods -- confirm where they are assigned.
    self._timeout = None
    self._deadline = None
    self._timedout = False
    # FIXME: float_tol should be moved to ArsdkExpectationBase
    self._float_tol = DEFAULT_FLOAT_TOL
示例26
def submit(self, fn, *args, **kwargs):
    """Schedule ``fn(*args, **kwargs)`` on a newly started thread.

    A new ``futures.Future`` and a new ``threading.Thread`` are created for
    every call; the thread runs ``_worker`` with the future, ``fn`` and its
    arguments.

    :returns: the ``futures.Future`` that was passed to ``_worker``.
    :raises RuntimeError: if ``self._shutdown`` is set.
    """
    # The lock is held for the whole call, including the thread start.
    with self._shutdown_lock:
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')

        f = futures.Future()
        t = threading.Thread(target=_worker, args=(f, fn, args, kwargs))
        t.start()
        return f
示例27
def submit(self, bufs, task_id=None):
    """Submits work to the outgoing_q.

    The outgoing_q is a queue that an external process listens on for
    new work. This method behaves like a submit call as described here:
    `Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_

    Parameters
    ----------
    bufs
        Pickled buffer with (b'<Function>', b'<args>', b'<kwargs>')
    task_id
        Optional id for the task. When falsy, the incremented value of
        ``self._task_counter`` is used instead.

    Returns
    -------
    Future
        The future stored in ``self.tasks`` under the task id.

    Raises
    ------
    Whatever ``self._executor_exception`` holds, if
    ``self._executor_bad_state`` is set.
    """
    if self._executor_bad_state.is_set():
        raise self._executor_exception

    # The counter advances on every call, even when a task_id is supplied.
    self._task_counter += 1
    # NOTE(review): the truthiness check also replaces an explicit
    # task_id of 0 with the counter value -- confirm that is intended.
    if not task_id:
        task_id = self._task_counter

    task_future = Future()
    self.tasks[task_id] = task_future

    # This needs to be a byte buffer
    # We want a cleaner header to the task here for the downstream systems
    # to appropriately route tasks
    msg = {"task_id": task_id,
           "buffer": bufs}

    # Post task to the outgoing queue
    self.outgoing_q.put(msg)

    # Return the future
    return task_future
示例28
def submit(self, fn, *args, **kwargs):
    """Blocking version of `Executor.submit()`.  Returns a resolved
    `Future`.

    ``fn(*args, **kwargs)`` runs synchronously in the caller. Its return
    value is wrapped as ``Present(result=...)``; an ``Exception`` raised
    inside the ``try`` is wrapped as ``Present(exception=...)`` instead of
    propagating. Exceptions that do not derive from ``Exception`` are not
    caught.
    """
    try:
        return Present(result=fn(*args, **kwargs))
    except Exception as exc:
        return Present(exception=exc)
示例29
def submit(
    self, fn: Callable[..., _T], *args: Any, **kwargs: Any
) -> "futures.Future[_T]":
    """Run ``fn(*args, **kwargs)`` synchronously and return a future for it.

    The call happens in the caller before this method returns. The return
    value is stored on a new ``futures.Future`` via
    ``future_set_result_unless_cancelled``; an ``Exception`` raised inside
    the ``try`` is stored via ``future_set_exc_info`` instead of
    propagating.
    """
    future = futures.Future()  # type: futures.Future[_T]
    try:
        future_set_result_unless_cancelled(future, fn(*args, **kwargs))
    except Exception:
        future_set_exc_info(future, sys.exc_info())
    return future
示例30
def chain_future(a: "Future[_T]", b: "Future[_T]") -> None:
    """Chain two futures together so that when one completes, so does the other.

    The result (success or failure) of ``a`` will be copied to ``b``, unless
    ``b`` has already been completed or cancelled by the time ``a`` finishes.

    .. versionchanged:: 5.0

       Now accepts both Tornado/asyncio `Future` objects and
       `concurrent.futures.Future`.

    """

    def copy(future: "Future[_T]") -> None:
        # Done-callback for ``a``: mirrors its outcome onto ``b``.
        assert future is a
        if b.done():
            return
        if hasattr(a, "exc_info") and a.exc_info() is not None:  # type: ignore
            # Futures exposing exc_info() are copied through
            # future_set_exc_info.
            future_set_exc_info(b, a.exc_info())  # type: ignore
            return
        exc = a.exception()
        if exc is not None:
            b.set_exception(exc)
        else:
            b.set_result(a.result())

    if isinstance(a, Future):
        future_add_done_callback(a, copy)
    else:
        # concurrent.futures.Future
        from tornado.ioloop import IOLoop

        IOLoop.current().add_future(a, copy)