From c6184203bb4ca59d2d9fabc6ac338c5b6faed288 Mon Sep 17 00:00:00 2001 From: "brian.quinlan" Date: Sat, 13 Nov 2010 10:03:32 +0000 Subject: Reintegrate feedback to head --- LICENSE | 21 -- docs/Makefile | 88 ----- docs/conf.py | 194 ----------- docs/index.rst | 345 ------------------- docs/make.bat | 112 ------ python2/crawl.py | 68 ---- python2/futures/__init__.py | 18 - python2/futures/_base.py | 541 ----------------------------- python2/futures/process.py | 337 ------------------ python2/futures/thread.py | 136 -------- python2/primes.py | 47 --- python2/setup.py | 18 - python2/test_futures.py | 811 ------------------------------------------- python3/crawl.py | 68 ---- python3/futures/__init__.py | 18 - python3/futures/_base.py | 540 ----------------------------- python3/futures/process.py | 337 ------------------ python3/futures/thread.py | 136 -------- python3/primes.py | 47 --- python3/setup.py | 18 - python3/test_futures.py | 819 -------------------------------------------- 21 files changed, 4719 deletions(-) delete mode 100644 LICENSE delete mode 100644 docs/Makefile delete mode 100644 docs/conf.py delete mode 100644 docs/index.rst delete mode 100644 docs/make.bat delete mode 100644 python2/crawl.py delete mode 100644 python2/futures/__init__.py delete mode 100644 python2/futures/_base.py delete mode 100644 python2/futures/process.py delete mode 100644 python2/futures/thread.py delete mode 100644 python2/primes.py delete mode 100755 python2/setup.py delete mode 100644 python2/test_futures.py delete mode 100644 python3/crawl.py delete mode 100644 python3/futures/__init__.py delete mode 100644 python3/futures/_base.py delete mode 100644 python3/futures/process.py delete mode 100644 python3/futures/thread.py delete mode 100644 python3/primes.py delete mode 100755 python3/setup.py delete mode 100644 python3/test_futures.py diff --git a/LICENSE b/LICENSE deleted file mode 100644 index c430db0..0000000 --- a/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -Copyright 2009 Brian Quinlan. All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - - 1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - 2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY BRIAN QUINLAN "AS IS" AND ANY EXPRESS OR IMPLIED -WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT -HALL THE FREEBSD PROJECT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE -OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/docs/Makefile b/docs/Makefile deleted file mode 100644 index f69d840..0000000 --- a/docs/Makefile +++ /dev/null @@ -1,88 +0,0 @@ -# Makefile for Sphinx documentation -# - -# You can set these variables from the command line. -SPHINXOPTS = -SPHINXBUILD = sphinx-build -PAPER = - -# Internal variables. -PAPEROPT_a4 = -D latex_paper_size=a4 -PAPEROPT_letter = -D latex_paper_size=letter -ALLSPHINXOPTS = -d _build/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . - -.PHONY: help clean html dirhtml pickle json htmlhelp qthelp latex changes linkcheck doctest - -help: - @echo "Please use \`make ' where is one of" - @echo " html to make standalone HTML files" - @echo " dirhtml to make HTML files named index.html in directories" - @echo " pickle to make pickle files" - @echo " json to make JSON files" - @echo " htmlhelp to make HTML files and a HTML help project" - @echo " qthelp to make HTML files and a qthelp project" - @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" - @echo " changes to make an overview of all changed/added/deprecated items" - @echo " linkcheck to check all external links for integrity" - @echo " doctest to run all doctests embedded in the documentation (if enabled)" - -clean: - -rm -rf _build/* - -html: - $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) _build/html - @echo - @echo "Build finished. The HTML pages are in _build/html." - -dirhtml: - $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) _build/dirhtml - @echo - @echo "Build finished. The HTML pages are in _build/dirhtml." - -pickle: - $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) _build/pickle - @echo - @echo "Build finished; now you can process the pickle files." - -json: - $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) _build/json - @echo - @echo "Build finished; now you can process the JSON files." - -htmlhelp: - $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) _build/htmlhelp - @echo - @echo "Build finished; now you can run HTML Help Workshop with the" \ - ".hhp project file in _build/htmlhelp." - -qthelp: - $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) _build/qthelp - @echo - @echo "Build finished; now you can run "qcollectiongenerator" with the" \ - ".qhcp project file in _build/qthelp, like this:" - @echo "# qcollectiongenerator _build/qthelp/futures.qhcp" - @echo "To view the help file:" - @echo "# assistant -collectionFile _build/qthelp/futures.qhc" - -latex: - $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) _build/latex - @echo - @echo "Build finished; the LaTeX files are in _build/latex." - @echo "Run \`make all-pdf' or \`make all-ps' in that directory to" \ - "run these through (pdf)latex." - -changes: - $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) _build/changes - @echo - @echo "The overview file is in _build/changes." - -linkcheck: - $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) _build/linkcheck - @echo - @echo "Link check complete; look for any errors in the above output " \ - "or in _build/linkcheck/output.txt." - -doctest: - $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) _build/doctest - @echo "Testing of doctests in the sources finished, look at the " \ - "results in _build/doctest/output.txt." diff --git a/docs/conf.py b/docs/conf.py deleted file mode 100644 index 9f033fa..0000000 --- a/docs/conf.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# -# futures documentation build configuration file, created by -# sphinx-quickstart on Wed Jun 3 19:35:34 2009. -# -# This file is execfile()d with the current directory set to its containing dir. -# -# Note that not all possible configuration values are present in this -# autogenerated file. -# -# All configuration values have a default; values that are commented out -# serve to show the default. - -import sys, os - -# If extensions (or modules to document with autodoc) are in another directory, -# add these directories to sys.path here. If the directory is relative to the -# documentation root, use os.path.abspath to make it absolute, like shown here. -#sys.path.append(os.path.abspath('.')) - -# -- General configuration ----------------------------------------------------- - -# Add any Sphinx extension module names here, as strings. They can be extensions -# coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = [] - -# Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] - -# The suffix of source filenames. -source_suffix = '.rst' - -# The encoding of source files. -#source_encoding = 'utf-8' - -# The master toctree document. -master_doc = 'index' - -# General information about the project. -project = u'futures' -copyright = u'2009, Brian Quinlan' - -# The version info for the project you're documenting, acts as replacement for -# |version| and |release|, also used in various other places throughout the -# built documents. -# -# The short X.Y version. -version = '0.1' -# The full version, including alpha/beta/rc tags. -release = '0.1' - -# The language for content autogenerated by Sphinx. Refer to documentation -# for a list of supported languages. -#language = None - -# There are two options for replacing |today|: either, you set today to some -# non-false value, then it is used: -#today = '' -# Else, today_fmt is used as the format for a strftime call. -#today_fmt = '%B %d, %Y' - -# List of documents that shouldn't be included in the build. -#unused_docs = [] - -# List of directories, relative to source directory, that shouldn't be searched -# for source files. -exclude_trees = ['_build'] - -# The reST default role (used for this markup: `text`) to use for all documents. -#default_role = None - -# If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True - -# If true, the current module name will be prepended to all description -# unit titles (such as .. function::). -#add_module_names = True - -# If true, sectionauthor and moduleauthor directives will be shown in the -# output. They are ignored by default. -#show_authors = False - -# The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' - -# A list of ignored prefixes for module index sorting. -#modindex_common_prefix = [] - - -# -- Options for HTML output --------------------------------------------------- - -# The theme to use for HTML and HTML Help pages. Major themes that come with -# Sphinx are currently 'default' and 'sphinxdoc'. -html_theme = 'default' - -# Theme options are theme-specific and customize the look and feel of a theme -# further. For a list of options available for each theme, see the -# documentation. -#html_theme_options = {} - -# Add any paths that contain custom themes here, relative to this directory. -#html_theme_path = [] - -# The name for this set of Sphinx documents. If None, it defaults to -# " v documentation". -#html_title = None - -# A shorter title for the navigation bar. Default is the same as html_title. -#html_short_title = None - -# The name of an image file (relative to this directory) to place at the top -# of the sidebar. -#html_logo = None - -# The name of an image file (within the static path) to use as favicon of the -# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 -# pixels large. -#html_favicon = None - -# Add any paths that contain custom static files (such as style sheets) here, -# relative to this directory. They are copied after the builtin static files, -# so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] - -# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, -# using the given strftime format. -#html_last_updated_fmt = '%b %d, %Y' - -# If true, SmartyPants will be used to convert quotes and dashes to -# typographically correct entities. -#html_use_smartypants = True - -# Custom sidebar templates, maps document names to template names. -#html_sidebars = {} - -# Additional templates that should be rendered to pages, maps page names to -# template names. -#html_additional_pages = {} - -# If false, no module index is generated. -#html_use_modindex = True - -# If false, no index is generated. -#html_use_index = True - -# If true, the index is split into individual pages for each letter. -#html_split_index = False - -# If true, links to the reST sources are added to the pages. -#html_show_sourcelink = True - -# If true, an OpenSearch description file will be output, and all pages will -# contain a tag referring to it. The value of this option must be the -# base URL from which the finished HTML is served. -#html_use_opensearch = '' - -# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml"). -#html_file_suffix = '' - -# Output file base name for HTML help builder. -htmlhelp_basename = 'futuresdoc' - - -# -- Options for LaTeX output -------------------------------------------------- - -# The paper size ('letter' or 'a4'). -#latex_paper_size = 'letter' - -# The font size ('10pt', '11pt' or '12pt'). -#latex_font_size = '10pt' - -# Grouping the document tree into LaTeX files. List of tuples -# (source start file, target name, title, author, documentclass [howto/manual]). -latex_documents = [ - ('index', 'futures.tex', u'futures Documentation', - u'Brian Quinlan', 'manual'), -] - -# The name of an image file (relative to this directory) to place at the top of -# the title page. -#latex_logo = None - -# For "manual" documents, if this is true, then toplevel headings are parts, -# not chapters. -#latex_use_parts = False - -# Additional stuff for the LaTeX preamble. -#latex_preamble = '' - -# Documents to append as an appendix to all manuals. -#latex_appendices = [] - -# If false, no module index is generated. -#latex_use_modindex = True diff --git a/docs/index.rst b/docs/index.rst deleted file mode 100644 index ac006a8..0000000 --- a/docs/index.rst +++ /dev/null @@ -1,345 +0,0 @@ -:mod:`futures` --- Asynchronous computation -=========================================== - -.. module:: futures - :synopsis: Execute computations asynchronously using threads or processes. - -The :mod:`futures` module provides a high-level interface for asynchronously -executing callables. - -The asynchronous execution can be be performed by threads using -:class:`ThreadPoolExecutor` or seperate processes using -:class:`ProcessPoolExecutor`. Both implement the same interface, which is -defined by the abstract :class:`Executor` class. - -Executor Objects ----------------- - -:class:`Executor` is an abstract class that provides methods to execute calls -asynchronously. It should not be used directly, but through its two -subclasses: :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`. - -.. method:: Executor.submit(fn, *args, **kwargs) - - Schedules the callable to be executed as *fn*(*\*args*, *\*\*kwargs*) and - returns a :class:`Future` representing the execution of the callable. - -:: - - with ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(pow, 323, 1235) - print(future.result()) - -.. method:: Executor.map(func, *iterables, timeout=None) - - Equivalent to map(*func*, *\*iterables*) but func is executed asynchronously - and several calls to *func* may be made concurrently. The returned iterator - raises a :exc:`TimeoutError` if :meth:`__next__()` is called and the result - isn't available after *timeout* seconds from the original call to - :meth:`map()`. *timeout* can be an int or float. If *timeout* is not - specified or ``None`` then there is no limit to the wait time. If a call - raises an exception then that exception will be raised when its value is - retrieved from the iterator. - -.. method:: Executor.shutdown(wait=True) - - Signal the executor that it should free any resources that it is using when - the currently pending futures are done executing. Calls to - :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will - raise :exc:`RuntimeError`. - - If *wait* is `True` then this method will not return until all the pending - futures are done executing and the resources associated with the executor - have been freed. If *wait* is `False` then this method will return - immediately and the resources associated with the executor will be freed - when all pending futures are done executing. Regardless of the value of - *wait*, the entire Python program will not exit until all pending futures - are done executing. - - You can avoid having to call this method explicitly if you use the `with` - statement, which will shutdown the `Executor` (waiting as if - `Executor.shutdown` were called with *wait* set to `True`): - -:: - - import shutil - with ThreadPoolExecutor(max_workers=4) as e: - e.submit(shutil.copy, 'src1.txt', 'dest1.txt') - e.submit(shutil.copy, 'src2.txt', 'dest2.txt') - e.submit(shutil.copy, 'src3.txt', 'dest3.txt') - e.submit(shutil.copy, 'src3.txt', 'dest4.txt') - - -ThreadPoolExecutor Objects --------------------------- - -The :class:`ThreadPoolExecutor` class is an :class:`Executor` subclass that uses -a pool of threads to execute calls asynchronously. - -Deadlock can occur when the callable associated with a :class:`Future` waits on -the results of another :class:`Future`. For example: - -:: - - import time - def wait_on_b(): - time.sleep(5) - print(b.result()) # b will never complete because it is waiting on a. - return 5 - - def wait_on_a(): - time.sleep(5) - print(a.result()) # a will never complete because it is waiting on b. - return 6 - - - executor = ThreadPoolExecutor(max_workers=2) - a = executor.submit(wait_on_b) - b = executor.submit(wait_on_a) - -And: - -:: - - def wait_on_future(): - f = executor.submit(pow, 5, 2) - # This will never complete because there is only one worker thread and - # it is executing this function. - print(f.result()) - - executor = ThreadPoolExecutor(max_workers=1) - executor.submit(wait_on_future) - -.. class:: ThreadPoolExecutor(max_workers) - - Executes calls asynchronously using at pool of at most *max_workers* threads. - -.. _threadpoolexecutor-example: - -ThreadPoolExecutor Example -^^^^^^^^^^^^^^^^^^^^^^^^^^ -:: - - import futures - import urllib.request - - URLS = ['http://www.foxnews.com/', - 'http://www.cnn.com/', - 'http://europe.wsj.com/', - 'http://www.bbc.co.uk/', - 'http://some-made-up-domain.com/'] - - def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() - - with futures.ThreadPoolExecutor(max_workers=5) as executor: - future_to_url = dict((executor.submit(load_url, url, 60), url) - for url in URLS) - - for future in futures.as_completed(future_to_url): - url = future_to_url[future] - if future.exception() is not None: - print('%r generated an exception: %s' % (url, - future.exception())) - else: - print('%r page is %d bytes' % (url, len(future.result()))) - -ProcessPoolExecutor Objects ---------------------------- - -The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that -uses a pool of processes to execute calls asynchronously. -:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which -allows it to side-step the :term:`Global Interpreter Lock` but also means that -only picklable objects can be executed and returned. - -Calling :class:`Executor` or :class:`Future` methods from a callable submitted -to a :class:`ProcessPoolExecutor` will result in deadlock. - -.. class:: ProcessPoolExecutor(max_workers=None) - - Executes calls asynchronously using a pool of at most *max_workers* - processes. If *max_workers* is ``None`` or not given then as many worker - processes will be created as the machine has processors. - -.. _processpoolexecutor-example: - -ProcessPoolExecutor Example -^^^^^^^^^^^^^^^^^^^^^^^^^^^ -:: - - import math - - PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 1099726899285419] - - def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - - def main(): - with futures.ProcessPoolExecutor() as executor: - for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - print('%d is prime: %s' % (number, prime)) - - if __name__ == '__main__': - main() - -Future Objects --------------- - -The :class:`Future` class encapulates the asynchronous execution of a callable. -:class:`Future` instances are created by :meth:`Executor.submit`. - -.. method:: Future.cancel() - - Attempt to cancel the call. If the call is currently being executed then - it cannot be cancelled and the method will return `False`, otherwise the call - will be cancelled and the method will return `True`. - -.. method:: Future.cancelled() - - Return `True` if the call was successfully cancelled. - -.. method:: Future.running() - - Return `True` if the call is currently being executed and cannot be - cancelled. - -.. method:: Future.done() - - Return `True` if the call was successfully cancelled or finished running. - -.. method:: Future.result(timeout=None) - - Return the value returned by the call. If the call hasn't yet completed then - this method will wait up to *timeout* seconds. If the call hasn't completed - in *timeout* seconds then a :exc:`TimeoutError` will be raised. *timeout* can - be an int or float.If *timeout* is not specified or ``None`` then there is no - limit to the wait time. - - If the future is cancelled before completing then :exc:`CancelledError` will - be raised. - - If the call raised then this method will raise the same exception. - -.. method:: Future.exception(timeout=None) - - Return the exception raised by the call. If the call hasn't yet completed - then this method will wait up to *timeout* seconds. If the call hasn't - completed in *timeout* seconds then a :exc:`TimeoutError` will be raised. - *timeout* can be an int or float. If *timeout* is not specified or ``None`` - then there is no limit to the wait time. - - If the future is cancelled before completing then :exc:`CancelledError` will - be raised. - - If the call completed without raising then ``None`` is returned. - -.. method:: Future.add_done_callback(fn) - - Attaches the callable *fn* to the future. *fn* will be called, with the - future as its only argument, when the future is cancelled or finishes - running. - - Added callables are called in the order that they were added and are always - called in a thread belonging to the process that added them. If the callable - raises an :exc:`Exception` then it will be logged and ignored. If the - callable raises another :exc:`BaseException` then the behavior is not - defined. - - If the future has already completed or been cancelled then *fn* will be - called immediately. - -Internal Future Methods -^^^^^^^^^^^^^^^^^^^^^^^ - -The following :class:`Future` methods are meant for use in unit tests and -:class:`Executor` implementations. - -.. method:: Future.set_running_or_notify_cancel() - - This method should only be called by :class:`Executor` implementations before - executing the work associated with the :class:`Future` and by unit tests. - - If the method returns `False` then the :class:`Future` was cancelled i.e. - :meth:`Future.cancel` was called and returned `True`. Any threads waiting - on the :class:`Future` completing (i.e. through :func:`as_completed` or - :func:`wait`) will be woken up. - - If the method returns `True` then the :class:`Future` was not cancelled - and has been put in the running state i.e. calls to - :meth:`Future.running` will return `True`. - - This method can only be called once and cannot be called after - :meth:`Future.set_result` or :meth:`Future.set_exception` have been - called. - -.. method:: Future.set_result(result) - - Sets the result of the work associated with the :class:`Future` to *result*. - - This method should only be used by Executor implementations and unit tests. - -.. method:: Future.set_exception(exception) - - Sets the result of the work associated with the :class:`Future` to the - :class:`Exception` *exception*. - - This method should only be used by Executor implementations and unit tests. - -Module Functions ----------------- - -.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED) - - Wait for the :class:`Future` instances (possibly created by different - :class:`Executor` instances) given by *fs* to complete. Returns a named - 2-tuple of sets. The first set, named "done", contains the futures that - completed (finished or were cancelled) before the wait completed. The second - set, named "not_done", contains uncompleted futures. - - *timeout* can be used to control the maximum number of seconds to wait before - returning. *timeout* can be an int or float. If *timeout* is not specified or - ``None`` then there is no limit to the wait time. - - *return_when* indicates when this function should return. It must be one of - the following constants: - - +-----------------------------+----------------------------------------+ - | Constant | Description | - +=============================+========================================+ - | :const:`FIRST_COMPLETED` | The function will return when any | - | | future finishes or is cancelled. | - +-----------------------------+----------------------------------------+ - | :const:`FIRST_EXCEPTION` | The function will return when any | - | | future finishes by raising an | - | | exception. If no future raises an | - | | exception then it is equivalent to | - | | `ALL_COMPLETED`. | - +-----------------------------+----------------------------------------+ - | :const:`ALL_COMPLETED` | The function will return when all | - | | futures finish or are cancelled. | - +-----------------------------+----------------------------------------+ - -.. function:: as_completed(fs, timeout=None) - - Returns an iterator over the :class:`Future` instances (possibly created - by different :class:`Executor` instances) given by *fs* that yields futures - as they complete (finished or were cancelled). Any futures that completed - before :func:`as_completed()` was called will be yielded first. The returned - iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is called and - the result isn't available after *timeout* seconds from the original call - to :func:`as_completed()`. *timeout* can be an int or float. If *timeout* - is not specified or ``None`` then there is no limit to the wait time. diff --git a/docs/make.bat b/docs/make.bat deleted file mode 100644 index 3e8021b..0000000 --- a/docs/make.bat +++ /dev/null @@ -1,112 +0,0 @@ -@ECHO OFF - -REM Command file for Sphinx documentation - -set SPHINXBUILD=sphinx-build -set ALLSPHINXOPTS=-d _build/doctrees %SPHINXOPTS% . -if NOT "%PAPER%" == "" ( - set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS% -) - -if "%1" == "" goto help - -if "%1" == "help" ( - :help - echo.Please use `make ^` where ^ is one of - echo. html to make standalone HTML files - echo. dirhtml to make HTML files named index.html in directories - echo. pickle to make pickle files - echo. json to make JSON files - echo. htmlhelp to make HTML files and a HTML help project - echo. qthelp to make HTML files and a qthelp project - echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter - echo. changes to make an overview over all changed/added/deprecated items - echo. linkcheck to check all external links for integrity - echo. doctest to run all doctests embedded in the documentation if enabled - goto end -) - -if "%1" == "clean" ( - for /d %%i in (_build\*) do rmdir /q /s %%i - del /q /s _build\* - goto end -) - -if "%1" == "html" ( - %SPHINXBUILD% -b html %ALLSPHINXOPTS% _build/html - echo. - echo.Build finished. The HTML pages are in _build/html. - goto end -) - -if "%1" == "dirhtml" ( - %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% _build/dirhtml - echo. - echo.Build finished. The HTML pages are in _build/dirhtml. - goto end -) - -if "%1" == "pickle" ( - %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% _build/pickle - echo. - echo.Build finished; now you can process the pickle files. - goto end -) - -if "%1" == "json" ( - %SPHINXBUILD% -b json %ALLSPHINXOPTS% _build/json - echo. - echo.Build finished; now you can process the JSON files. - goto end -) - -if "%1" == "htmlhelp" ( - %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% _build/htmlhelp - echo. - echo.Build finished; now you can run HTML Help Workshop with the ^ -.hhp project file in _build/htmlhelp. - goto end -) - -if "%1" == "qthelp" ( - %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% _build/qthelp - echo. - echo.Build finished; now you can run "qcollectiongenerator" with the ^ -.qhcp project file in _build/qthelp, like this: - echo.^> qcollectiongenerator _build\qthelp\futures.qhcp - echo.To view the help file: - echo.^> assistant -collectionFile _build\qthelp\futures.ghc - goto end -) - -if "%1" == "latex" ( - %SPHINXBUILD% -b latex %ALLSPHINXOPTS% _build/latex - echo. - echo.Build finished; the LaTeX files are in _build/latex. - goto end -) - -if "%1" == "changes" ( - %SPHINXBUILD% -b changes %ALLSPHINXOPTS% _build/changes - echo. - echo.The overview file is in _build/changes. - goto end -) - -if "%1" == "linkcheck" ( - %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% _build/linkcheck - echo. - echo.Link check complete; look for any errors in the above output ^ -or in _build/linkcheck/output.txt. - goto end -) - -if "%1" == "doctest" ( - %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% _build/doctest - echo. - echo.Testing of doctests in the sources finished, look at the ^ -results in _build/doctest/output.txt. - goto end -) - -:end diff --git a/python2/crawl.py b/python2/crawl.py deleted file mode 100644 index f1d29ca..0000000 --- a/python2/crawl.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Compare the speed of downloading URLs sequentially vs. using futures.""" - -import datetime -import functools -import futures.thread -import time -import timeit -import urllib2 - -URLS = ['http://www.google.com/', - 'http://www.apple.com/', - 'http://www.ibm.com', - 'http://www.thisurlprobablydoesnotexist.com', - 'http://www.slashdot.org/', - 'http://www.python.org/', - 'http://www.bing.com/', - 'http://www.facebook.com/', - 'http://www.yahoo.com/', - 'http://www.youtube.com/', - 'http://www.blogger.com/'] - -def load_url(url, timeout): - return urllib2.urlopen(url, timeout=timeout).read() - -def download_urls_sequential(urls, timeout=60): - url_to_content = {} - for url in urls: - try: - url_to_content[url] = load_url(url, timeout=timeout) - except: - pass - return url_to_content - -def download_urls_with_executor(urls, executor, timeout=60): - try: - url_to_content = {} - future_to_url = dict((executor.submit(load_url, url, timeout), url) - for url in urls) - - for future in futures.as_completed(future_to_url): - try: - url_to_content[future_to_url[future]] = future.result() - except: - pass - return url_to_content - finally: - executor.shutdown() - -def main(): - for name, fn in [('sequential', - functools.partial(download_urls_sequential, URLS)), - ('processes', - functools.partial(download_urls_with_executor, - URLS, - futures.ProcessPoolExecutor(10))), - ('threads', - functools.partial(download_urls_with_executor, - URLS, - futures.ThreadPoolExecutor(10)))]: - print name.ljust(12), - start = time.time() - url_map = fn() - print '%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS)) - -if __name__ == '__main__': - main() diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py deleted file mode 100644 index 8331d53..0000000 --- a/python2/futures/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Execute computations asynchronously using threads or processes.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -from futures._base import (FIRST_COMPLETED, - FIRST_EXCEPTION, - ALL_COMPLETED, - CancelledError, - TimeoutError, - Future, - Executor, - wait, - as_completed) -from futures.process import ProcessPoolExecutor -from futures.thread import ThreadPoolExecutor diff --git a/python2/futures/_base.py b/python2/futures/_base.py deleted file mode 100644 index ed7a094..0000000 --- a/python2/futures/_base.py +++ /dev/null @@ -1,541 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import collections -import functools -import logging -import threading -import time - -FIRST_COMPLETED = 'FIRST_COMPLETED' -FIRST_EXCEPTION = 'FIRST_EXCEPTION' -ALL_COMPLETED = 'ALL_COMPLETED' - -# Possible future states (for internal use by the futures package). -PENDING = 'PENDING' -RUNNING = 'RUNNING' -# The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and _Waiter.add_cancelled() was called by a worker. -CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' -FINISHED = 'FINISHED' - -_FUTURE_STATES = [ - PENDING, - RUNNING, - CANCELLED, - CANCELLED_AND_NOTIFIED, - FINISHED -] - -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - -# Logger for internal use by the futures package. -LOGGER = logging.getLogger("futures") -STDERR_HANDLER = logging.StreamHandler() -LOGGER.addHandler(STDERR_HANDLER) - -class Error(Exception): - """Base class for all future-related exceptions.""" - pass - -class CancelledError(Error): - """The Future was cancelled.""" - pass - -class TimeoutError(Error): - """The operation exceeded the given deadline.""" - pass - -class _Waiter(object): - """Provides the event that wait() and as_completed() block on.""" - def __init__(self): - self.event = threading.Event() - self.finished_futures = [] - - def add_result(self, future): - self.finished_futures.append(future) - - def add_exception(self, future): - self.finished_futures.append(future) - - def add_cancelled(self, future): - self.finished_futures.append(future) - -class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" - - def add_result(self, future): - super(_FirstCompletedWaiter, self).add_result(future) - self.event.set() - - def add_exception(self, future): - super(_FirstCompletedWaiter, self).add_exception(future) - self.event.set() - - def add_cancelled(self, future): - super(_FirstCompletedWaiter, self).add_cancelled(future) - self.event.set() - -class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" - - def __init__(self, num_pending_calls, stop_on_exception): - self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception - super(_AllCompletedWaiter, self).__init__() - - def _decrement_pending_calls(self): - self.num_pending_calls -= 1 - if not self.num_pending_calls: - self.event.set() - - def add_result(self, future): - super(_AllCompletedWaiter, self).add_result(future) - self._decrement_pending_calls() - - def add_exception(self, future): - super(_AllCompletedWaiter, self).add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() - - def add_cancelled(self, future): - super(_AllCompletedWaiter, self).add_cancelled(future) - self._decrement_pending_calls() - -class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions.""" - - def __init__(self, futures): - self.futures = sorted(futures, key=id) - - def __enter__(self): - for future in self.futures: - future._condition.acquire() - - def __exit__(self, *args): - for future in self.futures: - future._condition.release() - -def _create_and_install_waiters(fs, return_when): - if return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter - -def as_completed(fs, timeout=None): - """An iterator over the given futures that yields each as it completes. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - iterate over. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator that yields the given Futures as they complete (finished or - cancelled). - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - """ - if timeout is not None: - end_time = timeout + time.time() - - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) - - try: - for future in finished: - yield future - - while pending: - if timeout is None: - wait_timeout = None - else: - wait_timeout = end_time - time.time() - if wait_timeout < 0: - raise TimeoutError( - '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) - - waiter.event.wait(timeout) - - for future in waiter.finished_futures[:]: - yield future - waiter.finished_futures.remove(future) - pending.remove(future) - - finally: - for f in fs: - f._waiters.remove(waiter) - -DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') -def wait(fs, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the given sequence to complete. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - wait upon. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when this function should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises an exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - - Returns: - A named 2-tuple of sets. The first set, named 'done', contains the - futures that completed (is finished or cancelled) before the wait - completed. The second set, named 'not_done', contains uncompleted - futures. - """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done - - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) - - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - - waiter = _create_and_install_waiters(fs, return_when) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) - -class Future(object): - """Represents the result of an asynchronous computation.""" - - def __init__(self): - """Initializes the future. Should not be called by clients.""" - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._waiters = [] - self._done_callbacks = [] - - def _invoke_callbacks(self): - for callback in self._done_callbacks: - try: - callback(self) - except Exception: - LOGGER.exception('exception calling callback for %r', self) - - def __repr__(self): - with self._condition: - if self._state == FINISHED: - if self._exception: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state]) - - def cancel(self): - """Cancel the future if possible. - - Returns True if the future was cancelled, False otherwise. A future - cannot be cancelled if it is running or has already completed. - """ - with self._condition: - if self._state in [RUNNING, FINISHED]: - return False - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - return True - - self._state = CANCELLED - self._condition.notify_all() - - self._invoke_callbacks() - return True - - def cancelled(self): - """Return True if the future has cancelled.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - - def running(self): - """Return True if the future is currently executing.""" - with self._condition: - return self._state == RUNNING - - def done(self): - """Return True of the future was cancelled or finished executing.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def add_done_callback(self, fn): - """Attaches a callable that will be called when the future finishes. - - Args: - fn: A callable that will be called with this future as its only - argument when the future completes or is cancelled. The callable - will always be called by a thread in the same process in which - it was added. If the future has already completed or been - cancelled then the callable will be called immediately. These - callables are called in the order that they were added. - """ - with self._condition: - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: - self._done_callbacks.append(fn) - return - fn(self) - - def result(self, timeout=None): - """Return the result of the call that the future represents. - - Args: - timeout: The number of seconds to wait for the result if the future - isn't done. If None, then there is no limit on the wait time. - - Returns: - The result of the call that the future represents. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - Exception: If the call raised then that exception will be raised. - """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - - def exception(self, timeout=None): - """Return the exception raised by the call that the future represents. - - Args: - timeout: The number of seconds to wait for the exception if the - future isn't done. If None, then there is no limit on the wait - time. - - Returns: - The exception raised by the call that the future represents or None - if the call completed without raising. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() - - # The following methods should only be used by Executors and in tests. - def set_running_or_notify_cancel(self): - """Mark the future as running or process any cancel notifications. - - Should only be used by Executor implementations and unit tests. - - If the future has been cancelled (cancel() was called and returned - True) then any threads waiting on the future completing (though calls - to as_completed() or wait()) are notified and False is returned. - - If the future was not cancelled then it is put in the running state - (future calls to running() will return True) and True is returned. - - This method should be called by Executor implementations before - executing the work associated with this future. If this method returns - False then the work should not be executed. - - Returns: - False if the Future was cancelled, True otherwise. - - Raises: - RuntimeError: if this method was already called or if set_result() - or set_exception() was called. - """ - with self._condition: - if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. - return False - elif self._state == PENDING: - self._state = RUNNING - return True - else: - LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) - raise RuntimeError('Future in unexpected state') - - def set_result(self, result): - """Sets the return value of work associated with the future. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._result = result - self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) - self._condition.notify_all() - self._invoke_callbacks() - - def set_exception(self, exception): - """Sets the result of the future as being the given exception. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._exception = exception - self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) - self._condition.notify_all() - self._invoke_callbacks() - -class Executor(object): - """This is an abstract base class for concrete asynchronous executors.""" - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - raise NotImplementedError() - - def map(self, fn, *iterables, **kwargs): - """Returns a iterator equivalent to map(fn, iter). - - Args: - fn: A callable that will take take as many arguments as there are - passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. - """ - timeout = kwargs.get('timeout') - if timeout is not None: - end_time = timeout + time.time() - - fs = [self.submit(fn, *args) for args in zip(*iterables)] - - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() - - def shutdown(self, wait=True): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - executor have been reclaimed. - """ - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False diff --git a/python2/futures/process.py b/python2/futures/process.py deleted file mode 100644 index ec48377..0000000 --- a/python2/futures/process.py +++ /dev/null @@ -1,337 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ProcessPoolExecutor. - -The follow diagram and text describe the data-flow through the system: - -|======================= In-process =====================|== Out-of-process ==| - -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | -| | | 7 | | | | ... | | | -| Process | | ... | | Local | +-----------+ | Process | -| Pool | +----------+ | Worker | | #1..n | -| Executor | | Thread | | | -| | +----------- + | | +-----------+ | | -| | <=> | Work Items | <=> | | <= | Result Q | <= | | -| | +------------+ | | +-----------+ | | -| | | 6: call() | | | | ... | | | -| | | future | | | | 4, result | | | -| | | ... | | | | 3, except | | | -+----------+ +------------+ +--------+ +-----------+ +---------+ - -Executor.submit() called: -- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict -- adds the id of the _WorkItem to the "Work Ids" queue - -Local worker thread: -- reads work ids from the "Work Ids" queue and looks up the corresponding - WorkItem from the "Work Items" dict: if the work item has been cancelled then - it is simply removed from the dict, otherwise it is repackaged as a - _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" - until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because - calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). -- reads _ResultItems from "Result Q", updates the future stored in the - "Work Items" dict and deletes the dict entry - -Process #1..n: -- reads _CallItems from "Call Q", executes the calls, and puts the resulting - _ResultItems in "Request Q" -""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -import _base -import Queue -import multiprocessing -import threading -import weakref - -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -# Controls how many more calls than processes will be queued in the call queue. -# A smaller number will mean that processes spend more time idle waiting for -# work while a larger number will make Future.cancel() succeed less frequently -# (Futures in the call queue cannot be cancelled). -EXTRA_QUEUED_CALLS = 1 - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - -class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): - self.work_id = work_id - self.exception = exception - self.result = result - -class _CallItem(object): - def __init__(self, work_id, fn, args, kwargs): - self.work_id = work_id - self.fn = fn - self.args = args - self.kwargs = kwargs - -def _process_worker(call_queue, result_queue, shutdown): - """Evaluates calls from call_queue and places the results in result_queue. - - This worker is run in a seperate process. - - Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and - evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written - to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. - """ - while True: - try: - call_item = call_queue.get(block=True, timeout=0.1) - except Queue.Empty: - if shutdown.is_set(): - return - else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) - -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. - - This function never blocks. - - Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. - """ - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except Queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue - -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): - """Manages the communication between this process and the worker processes. - - This function is run in a local thread. - - Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the - process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. - """ - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) - - try: - result_item = result_queue.get(block=True, timeout=0.1) - except Queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor - else: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - -class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): - """Initializes a new ProcessPoolExecutor instance. - - Args: - max_workers: The maximum number of processes that can be used to - execute the given calls. If None or not given then as many - worker processes will be created as the machine has processors. - """ - _remove_dead_thread_references() - - if max_workers is None: - self._max_workers = multiprocessing.cpu_count() - else: - self._max_workers = max_workers - - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() - self._work_ids = Queue.Queue() - self._queue_management_thread = None - self._processes = set() - - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() - self._shutdown_lock = threading.Lock() - self._queue_count = 0 - self._pending_work_items = {} - - def _start_queue_management_thread(self): - if self._queue_management_thread is None: - self._queue_management_thread = threading.Thread( - target=_queue_manangement_worker, - args=(weakref.ref(self), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue, - self._shutdown_process_event)) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) - - def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) - p.start() - self._processes.add(p) - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._pending_work_items[self._queue_count] = w - self._work_ids.put(self._queue_count) - self._queue_count += 1 - - self._start_queue_management_thread() - self._adjust_process_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True - if wait: - if self._queue_management_thread: - self._queue_management_thread.join() - # To reduce the risk of openning too many files, remove references to - # objects that use file descriptors. - self._queue_management_thread = None - self._call_queue = None - self._result_queue = None - self._shutdown_process_event = None - self._processes = None - shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/python2/futures/thread.py b/python2/futures/thread.py deleted file mode 100644 index 3f1584a..0000000 --- a/python2/futures/thread.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ThreadPoolExecutor.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -import _base -import Queue -import threading -import weakref - -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException as e: - self.future.set_exception(e) - else: - self.future.set_result(result) - -def _worker(executor_reference, work_queue): - try: - while True: - try: - work_item = work_queue.get(block=True, timeout=0.1) - except Queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor - else: - work_item.run() - except BaseException as e: - _base.LOGGER.critical('Exception in worker', exc_info=True) - -class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): - """Initializes a new ThreadPoolExecutor instance. - - Args: - max_workers: The maximum number of threads that can be used to - execute the given calls. - """ - _remove_dead_thread_references() - - self._max_workers = max_workers - self._work_queue = Queue.Queue() - self._threads = set() - self._shutdown = False - self._shutdown_lock = threading.Lock() - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._work_queue.put(w) - self._adjust_thread_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def _adjust_thread_count(self): - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. - if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) - t.daemon = True - t.start() - self._threads.add(t) - _thread_references.add(weakref.ref(t)) - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - if wait: - for t in self._threads: - t.join() - shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/python2/primes.py b/python2/primes.py deleted file mode 100644 index fa6c355..0000000 --- a/python2/primes.py +++ /dev/null @@ -1,47 +0,0 @@ -import futures -import math -import time - -PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 117450548693743, - 993960000099397] - -def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - -def sequential(): - return list(map(is_prime, PRIMES)) - -def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def main(): - for name, fn in [('sequential', sequential), - ('processes', with_process_pool_executor), - ('threads', with_thread_pool_executor)]: - print name.ljust(12), - start = time.time() - if fn() != [True] * len(PRIMES): - print 'failed' - else: - print '%.2f seconds' % (time.time() - start) - -if __name__ == '__main__': - main() diff --git a/python2/setup.py b/python2/setup.py deleted file mode 100755 index fcd05f2..0000000 --- a/python2/setup.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python3 - -from distutils.core import setup - -setup(name='futures3', - version='1.0', - description='Java-style futures implementation in Python 3.x', - author='Brian Quinlan', - author_email='brian@sweetapp.com', - url='http://code.google.com/p/pythonfutures', - download_url='http://pypi.python.org/pypi/futures3/', - packages=['futures'], - license='BSD', - classifiers=['License :: OSI Approved :: BSD License', - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 3'] - ) diff --git a/python2/test_futures.py b/python2/test_futures.py deleted file mode 100644 index 2d5672b..0000000 --- a/python2/test_futures.py +++ /dev/null @@ -1,811 +0,0 @@ -import logging -import multiprocessing -import re -import StringIO -import sys -import threading -from test import test_support -import time -import unittest - -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - -import futures -from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER, wait) -import futures.process - -def create_future(state=PENDING, exception=None, result=None): - f = Future() - f._state = state - f._exception = exception - f._result = result - return f - -PENDING_FUTURE = create_future(state=PENDING) -RUNNING_FUTURE = create_future(state=RUNNING) -CANCELLED_FUTURE = create_future(state=CANCELLED) -CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) -SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - -def mul(x, y): - return x * y - -class Call(object): - """A call that can be submitted to a future.Executor for testing. - - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = multiprocessing.Event() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) - assert r == 0 - else: - self.CALL_LOCKS[handle].wait() - - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) - assert r != 0 - else: - self.CALL_LOCKS[handle].set() - - def __init__(self, manual_finish=False, result=42): - self._called_event = self._create_event() - self._can_finish = self._create_event() - - self._result = result - - if not manual_finish: - self._signal_event(self._can_finish) - - def wait_on_called(self): - self._wait_on_event(self._called_event) - - def set_can(self): - self._signal_event(self._can_finish) - - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - - return self._result - - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] - -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() - -class MapCall(Call): - def __init__(self, result=42): - super(MapCall, self).__init__(manual_finish=True, result=result) - - def __call__(self, manual_finish): - if manual_finish: - super(MapCall, self).__call__() - return self._result - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.submit, - pow, 2, 5) - - - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_threads_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._threads), 3) - self.executor.shutdown() - for t in self.executor._threads: - t.join() - - def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for t in executor._threads: - t.join() - - def test_del_shutdown(self): - executor = futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) - threads = executor._threads - del executor - - for t in threads: - t.join() - -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_processes_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._processes), 5) - processes = self.executor._processes - self.executor.shutdown() - - for p in processes: - p.join() - - def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for p in self.executor._processes: - p.join() - - def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) - queue_management_thread = executor._queue_management_thread - processes = executor._processes - del executor - - queue_management_thread.join() - for p in processes: - p.join() - -class WaitTests(unittest.TestCase): - def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([future1]), done) - self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() - - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) - - - finally: - call1.close() - call2.close() - - def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([EXCEPTION_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set(), pending) - - - finally: - call1.close() - call2.close() - - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertTrue( - futures.process.EXTRA_QUEUED_CALLS <= 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEquals(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=1, - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEquals(set([future2]), pending) - - - finally: - call1.close() - call2.close() - - -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class AsCompletedTests(unittest.TestCase): - # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. - def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEquals(set( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() - - def test_zero_timeout(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() - -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_submit(self): - future = self.executor.submit(pow, 2, 8) - self.assertEquals(256, future.result()) - - def test_submit_keyword(self): - future = self.executor.submit(mul, 2, y=8) - self.assertEquals(16, future.result()) - - def test_map(self): - self.assertEqual( - list(self.executor.map(pow, range(10), range(10))), - list(map(pow, range(10), range(10)))) - - def test_map_exception(self): - i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.next(), (0, 1)) - self.assertEqual(i.next(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.next) - - def test_map_timeout(self): - results = [] - timeout_call = MapCall() - try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=1): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() - - self.assertEquals([42, 42], results) - -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class FutureTests(unittest.TestCase): - def test_done_callback_with_result(self): - self.callback_result = None - def fn(callback_future): - self.callback_result = callback_future.result() - - f = Future() - f.add_done_callback(fn) - f.set_result(5) - self.assertEquals(5, self.callback_result) - - def test_done_callback_with_exception(self): - self.callback_exception = None - def fn(callback_future): - self.callback_exception = callback_future.exception() - - f = Future() - f.add_done_callback(fn) - f.set_exception(Exception('test')) - self.assertEquals(('test',), self.callback_exception.args) - - def test_done_callback_with_cancel(self): - self.was_cancelled = None - def fn(callback_future): - self.was_cancelled = callback_future.cancelled() - - f = Future() - f.add_done_callback(fn) - self.assertTrue(f.cancel()) - self.assertTrue(self.was_cancelled) - - def test_done_callback_raises(self): - LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = StringIO.StringIO() - handler = logging.StreamHandler(logging_stream) - LOGGER.addHandler(handler) - try: - self.raising_was_called = False - self.fn_was_called = False - - def raising_fn(callback_future): - self.raising_was_called = True - raise Exception('doh!') - - def fn(callback_future): - self.fn_was_called = True - - f = Future() - f.add_done_callback(raising_fn) - f.add_done_callback(fn) - f.set_result(5) - self.assertTrue(self.raising_was_called) - self.assertTrue(self.fn_was_called) - self.assertTrue('Exception: doh!' in logging_stream.getvalue()) - finally: - LOGGER.removeHandler(handler) - LOGGER.addHandler(STDERR_HANDLER) - - def test_done_callback_already_successful(self): - self.callback_result = None - def fn(callback_future): - self.callback_result = callback_future.result() - - f = Future() - f.set_result(5) - f.add_done_callback(fn) - self.assertEquals(5, self.callback_result) - - def test_done_callback_already_failed(self): - self.callback_exception = None - def fn(callback_future): - self.callback_exception = callback_future.exception() - - f = Future() - f.set_exception(Exception('test')) - f.add_done_callback(fn) - self.assertEquals(('test',), self.callback_exception.args) - - def test_done_callback_already_cancelled(self): - self.was_cancelled = None - def fn(callback_future): - self.was_cancelled = callback_future.cancelled() - - f = Future() - self.assertTrue(f.cancel()) - f.add_done_callback(fn) - self.assertTrue(self.was_cancelled) - - def test_repr(self): - self.assertTrue(re.match('', - repr(PENDING_FUTURE))) - self.assertTrue(re.match('', - repr(RUNNING_FUTURE))) - self.assertTrue(re.match('', - repr(CANCELLED_FUTURE))) - self.assertTrue(re.match('', - repr(CANCELLED_AND_NOTIFIED_FUTURE))) - self.assertTrue(re.match( - '', - repr(EXCEPTION_FUTURE))) - self.assertTrue(re.match( - '', - repr(SUCCESSFUL_FUTURE))) - - - def test_cancel(self): - f1 = create_future(state=PENDING) - f2 = create_future(state=RUNNING) - f3 = create_future(state=CANCELLED) - f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) - f6 = create_future(state=FINISHED, result=5) - - self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) - - self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) - - self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) - - self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) - - self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) - - self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) - - def test_cancelled(self): - self.assertFalse(PENDING_FUTURE.cancelled()) - self.assertFalse(RUNNING_FUTURE.cancelled()) - self.assertTrue(CANCELLED_FUTURE.cancelled()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) - self.assertFalse(EXCEPTION_FUTURE.cancelled()) - self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) - - def test_done(self): - self.assertFalse(PENDING_FUTURE.done()) - self.assertFalse(RUNNING_FUTURE.done()) - self.assertTrue(CANCELLED_FUTURE.done()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) - self.assertTrue(EXCEPTION_FUTURE.done()) - self.assertTrue(SUCCESSFUL_FUTURE.done()) - - def test_running(self): - self.assertFalse(PENDING_FUTURE.running()) - self.assertTrue(RUNNING_FUTURE.running()) - self.assertFalse(CANCELLED_FUTURE.running()) - self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) - self.assertFalse(EXCEPTION_FUTURE.running()) - self.assertFalse(SUCCESSFUL_FUTURE.running()) - - def test_result_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.result, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) - self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) - - def test_result_with_success(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.set_result(42) - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertEquals(f1.result(timeout=5), 42) - - def test_result_with_cancel(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.cancel() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertRaises(futures.CancelledError, f1.result, timeout=5) - - def test_exception_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.exception, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) - self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) - self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) - - def test_exception_with_success(self): - def notification(): - # Wait until the main thread is waiting for the exception. - time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._exception = IOError() - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) - -def test_main(): - test_support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) - -if __name__ == "__main__": - test_main() diff --git a/python3/crawl.py b/python3/crawl.py deleted file mode 100644 index 7135682..0000000 --- a/python3/crawl.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Compare the speed of downloading URLs sequentially vs. using futures.""" - -import datetime -import functools -import futures.thread -import time -import timeit -import urllib.request - -URLS = ['http://www.google.com/', - 'http://www.apple.com/', - 'http://www.ibm.com', - 'http://www.thisurlprobablydoesnotexist.com', - 'http://www.slashdot.org/', - 'http://www.python.org/', - 'http://www.bing.com/', - 'http://www.facebook.com/', - 'http://www.yahoo.com/', - 'http://www.youtube.com/', - 'http://www.blogger.com/'] - -def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() - -def download_urls_sequential(urls, timeout=60): - url_to_content = {} - for url in urls: - try: - url_to_content[url] = load_url(url, timeout=timeout) - except: - pass - return url_to_content - -def download_urls_with_executor(urls, executor, timeout=60): - try: - url_to_content = {} - future_to_url = dict((executor.submit(load_url, url, timeout), url) - for url in urls) - - for future in futures.as_completed(future_to_url): - try: - url_to_content[future_to_url[future]] = future.result() - except: - pass - return url_to_content - finally: - executor.shutdown() - -def main(): - for name, fn in [('sequential', - functools.partial(download_urls_sequential, URLS)), - ('processes', - functools.partial(download_urls_with_executor, - URLS, - futures.ProcessPoolExecutor(10))), - ('threads', - functools.partial(download_urls_with_executor, - URLS, - futures.ThreadPoolExecutor(10)))]: - print('%s: ' % name.ljust(12), end='') - start = time.time() - url_map = fn() - print('%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS))) - -if __name__ == '__main__': - main() diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py deleted file mode 100644 index 8331d53..0000000 --- a/python3/futures/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Execute computations asynchronously using threads or processes.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -from futures._base import (FIRST_COMPLETED, - FIRST_EXCEPTION, - ALL_COMPLETED, - CancelledError, - TimeoutError, - Future, - Executor, - wait, - as_completed) -from futures.process import ProcessPoolExecutor -from futures.thread import ThreadPoolExecutor diff --git a/python3/futures/_base.py b/python3/futures/_base.py deleted file mode 100644 index 561f4d2..0000000 --- a/python3/futures/_base.py +++ /dev/null @@ -1,540 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import collections -import functools -import logging -import threading -import time - -FIRST_COMPLETED = 'FIRST_COMPLETED' -FIRST_EXCEPTION = 'FIRST_EXCEPTION' -ALL_COMPLETED = 'ALL_COMPLETED' - -# Possible future states (for internal use by the futures package). -PENDING = 'PENDING' -RUNNING = 'RUNNING' -# The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and _Waiter.add_cancelled() was called by a worker. -CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' -FINISHED = 'FINISHED' - -_FUTURE_STATES = [ - PENDING, - RUNNING, - CANCELLED, - CANCELLED_AND_NOTIFIED, - FINISHED -] - -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - -# Logger for internal use by the futures package. -LOGGER = logging.getLogger("futures") -STDERR_HANDLER = logging.StreamHandler() -LOGGER.addHandler(STDERR_HANDLER) - -class Error(Exception): - """Base class for all future-related exceptions.""" - pass - -class CancelledError(Error): - """The Future was cancelled.""" - pass - -class TimeoutError(Error): - """The operation exceeded the given deadline.""" - pass - -class _Waiter(object): - """Provides the event that wait() and as_completed() block on.""" - def __init__(self): - self.event = threading.Event() - self.finished_futures = [] - - def add_result(self, future): - self.finished_futures.append(future) - - def add_exception(self, future): - self.finished_futures.append(future) - - def add_cancelled(self, future): - self.finished_futures.append(future) - -class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" - - def add_result(self, future): - super().add_result(future) - self.event.set() - - def add_exception(self, future): - super().add_exception(future) - self.event.set() - - def add_cancelled(self, future): - super().add_cancelled(future) - self.event.set() - -class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" - - def __init__(self, num_pending_calls, stop_on_exception): - self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception - super().__init__() - - def _decrement_pending_calls(self): - self.num_pending_calls -= 1 - if not self.num_pending_calls: - self.event.set() - - def add_result(self, future): - super().add_result(future) - self._decrement_pending_calls() - - def add_exception(self, future): - super().add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() - - def add_cancelled(self, future): - super().add_cancelled(future) - self._decrement_pending_calls() - -class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions.""" - - def __init__(self, futures): - self.futures = sorted(futures, key=id) - - def __enter__(self): - for future in self.futures: - future._condition.acquire() - - def __exit__(self, *args): - for future in self.futures: - future._condition.release() - -def _create_and_install_waiters(fs, return_when): - if return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter - -def as_completed(fs, timeout=None): - """An iterator over the given futures that yields each as it completes. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - iterate over. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator that yields the given Futures as they complete (finished or - cancelled). - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - """ - if timeout is not None: - end_time = timeout + time.time() - - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) - - try: - for future in finished: - yield future - - while pending: - if timeout is None: - wait_timeout = None - else: - wait_timeout = end_time - time.time() - if wait_timeout < 0: - raise TimeoutError( - '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) - - waiter.event.wait(timeout) - - for future in waiter.finished_futures[:]: - yield future - waiter.finished_futures.remove(future) - pending.remove(future) - - finally: - for f in fs: - f._waiters.remove(waiter) - -DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') -def wait(fs, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the given sequence to complete. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - wait upon. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when this function should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises an exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - - Returns: - A named 2-tuple of sets. The first set, named 'done', contains the - futures that completed (is finished or cancelled) before the wait - completed. The second set, named 'not_done', contains uncompleted - futures. - """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done - - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) - - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - - waiter = _create_and_install_waiters(fs, return_when) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) - -class Future(object): - """Represents the result of an asynchronous computation.""" - - def __init__(self): - """Initializes the future. Should not be called by clients.""" - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._waiters = [] - self._done_callbacks = [] - - def _invoke_callbacks(self): - for callback in self._done_callbacks: - try: - callback(self) - except Exception: - LOGGER.exception('exception calling callback for %r', self) - - def __repr__(self): - with self._condition: - if self._state == FINISHED: - if self._exception: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state]) - - def cancel(self): - """Cancel the future if possible. - - Returns True if the future was cancelled, False otherwise. A future - cannot be cancelled if it is running or has already completed. - """ - with self._condition: - if self._state in [RUNNING, FINISHED]: - return False - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - return True - - self._state = CANCELLED - self._condition.notify_all() - - self._invoke_callbacks() - return True - - def cancelled(self): - """Return True if the future has cancelled.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - - def running(self): - """Return True if the future is currently executing.""" - with self._condition: - return self._state == RUNNING - - def done(self): - """Return True of the future was cancelled or finished executing.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def add_done_callback(self, fn): - """Attaches a callable that will be called when the future finishes. - - Args: - fn: A callable that will be called with this future as its only - argument when the future completes or is cancelled. The callable - will always be called by a thread in the same process in which - it was added. If the future has already completed or been - cancelled then the callable will be called immediately. These - callables are called in the order that they were added. - """ - with self._condition: - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: - self._done_callbacks.append(fn) - return - fn(self) - - def result(self, timeout=None): - """Return the result of the call that the future represents. - - Args: - timeout: The number of seconds to wait for the result if the future - isn't done. If None, then there is no limit on the wait time. - - Returns: - The result of the call that the future represents. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - Exception: If the call raised then that exception will be raised. - """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - - def exception(self, timeout=None): - """Return the exception raised by the call that the future represents. - - Args: - timeout: The number of seconds to wait for the exception if the - future isn't done. If None, then there is no limit on the wait - time. - - Returns: - The exception raised by the call that the future represents or None - if the call completed without raising. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() - - # The following methods should only be used by Executors and in tests. - def set_running_or_notify_cancel(self): - """Mark the future as running or process any cancel notifications. - - Should only be used by Executor implementations and unit tests. - - If the future has been cancelled (cancel() was called and returned - True) then any threads waiting on the future completing (though calls - to as_completed() or wait()) are notified and False is returned. - - If the future was not cancelled then it is put in the running state - (future calls to running() will return True) and True is returned. - - This method should be called by Executor implementations before - executing the work associated with this future. If this method returns - False then the work should not be executed. - - Returns: - False if the Future was cancelled, True otherwise. - - Raises: - RuntimeError: if this method was already called or if set_result() - or set_exception() was called. - """ - with self._condition: - if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. - return False - elif self._state == PENDING: - self._state = RUNNING - return True - else: - LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) - raise RuntimeError('Future in unexpected state') - - def set_result(self, result): - """Sets the return value of work associated with the future. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._result = result - self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) - self._condition.notify_all() - self._invoke_callbacks() - - def set_exception(self, exception): - """Sets the result of the future as being the given exception. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._exception = exception - self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) - self._condition.notify_all() - self._invoke_callbacks() - -class Executor(object): - """This is an abstract base class for concrete asynchronous executors.""" - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - raise NotImplementedError() - - def map(self, fn, *iterables, timeout=None): - """Returns a iterator equivalent to map(fn, iter). - - Args: - fn: A callable that will take take as many arguments as there are - passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. - """ - if timeout is not None: - end_time = timeout + time.time() - - fs = [self.submit(fn, *args) for args in zip(*iterables)] - - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() - - def shutdown(self, wait=True): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - executor have been reclaimed. - """ - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False diff --git a/python3/futures/process.py b/python3/futures/process.py deleted file mode 100644 index 6de870f..0000000 --- a/python3/futures/process.py +++ /dev/null @@ -1,337 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ProcessPoolExecutor. - -The follow diagram and text describe the data-flow through the system: - -|======================= In-process =====================|== Out-of-process ==| - -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | -| | | 7 | | | | ... | | | -| Process | | ... | | Local | +-----------+ | Process | -| Pool | +----------+ | Worker | | #1..n | -| Executor | | Thread | | | -| | +----------- + | | +-----------+ | | -| | <=> | Work Items | <=> | | <= | Result Q | <= | | -| | +------------+ | | +-----------+ | | -| | | 6: call() | | | | ... | | | -| | | future | | | | 4, result | | | -| | | ... | | | | 3, except | | | -+----------+ +------------+ +--------+ +-----------+ +---------+ - -Executor.submit() called: -- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict -- adds the id of the _WorkItem to the "Work Ids" queue - -Local worker thread: -- reads work ids from the "Work Ids" queue and looks up the corresponding - WorkItem from the "Work Items" dict: if the work item has been cancelled then - it is simply removed from the dict, otherwise it is repackaged as a - _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" - until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because - calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). -- reads _ResultItems from "Result Q", updates the future stored in the - "Work Items" dict and deletes the dict entry - -Process #1..n: -- reads _CallItems from "Call Q", executes the calls, and puts the resulting - _ResultItems in "Request Q" -""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -from futures import _base -import queue -import multiprocessing -import threading -import weakref - -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -# Controls how many more calls than processes will be queued in the call queue. -# A smaller number will mean that processes spend more time idle waiting for -# work while a larger number will make Future.cancel() succeed less frequently -# (Futures in the call queue cannot be cancelled). -EXTRA_QUEUED_CALLS = 1 - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - -class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): - self.work_id = work_id - self.exception = exception - self.result = result - -class _CallItem(object): - def __init__(self, work_id, fn, args, kwargs): - self.work_id = work_id - self.fn = fn - self.args = args - self.kwargs = kwargs - -def _process_worker(call_queue, result_queue, shutdown): - """Evaluates calls from call_queue and places the results in result_queue. - - This worker is run in a seperate process. - - Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and - evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written - to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. - """ - while True: - try: - call_item = call_queue.get(block=True, timeout=0.1) - except queue.Empty: - if shutdown.is_set(): - return - else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) - -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. - - This function never blocks. - - Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. - """ - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue - -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): - """Manages the communication between this process and the worker processes. - - This function is run in a local thread. - - Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the - process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. - """ - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) - - try: - result_item = result_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor - else: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - -class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): - """Initializes a new ProcessPoolExecutor instance. - - Args: - max_workers: The maximum number of processes that can be used to - execute the given calls. If None or not given then as many - worker processes will be created as the machine has processors. - """ - _remove_dead_thread_references() - - if max_workers is None: - self._max_workers = multiprocessing.cpu_count() - else: - self._max_workers = max_workers - - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() - self._work_ids = queue.Queue() - self._queue_management_thread = None - self._processes = set() - - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() - self._shutdown_lock = threading.Lock() - self._queue_count = 0 - self._pending_work_items = {} - - def _start_queue_management_thread(self): - if self._queue_management_thread is None: - self._queue_management_thread = threading.Thread( - target=_queue_manangement_worker, - args=(weakref.ref(self), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue, - self._shutdown_process_event)) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) - - def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) - p.start() - self._processes.add(p) - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._pending_work_items[self._queue_count] = w - self._work_ids.put(self._queue_count) - self._queue_count += 1 - - self._start_queue_management_thread() - self._adjust_process_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True - if wait: - if self._queue_management_thread: - self._queue_management_thread.join() - # To reduce the risk of openning too many files, remove references to - # objects that use file descriptors. - self._queue_management_thread = None - self._call_queue = None - self._result_queue = None - self._shutdown_process_event = None - self._processes = None - shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/python3/futures/thread.py b/python3/futures/thread.py deleted file mode 100644 index a2d96bf..0000000 --- a/python3/futures/thread.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ThreadPoolExecutor.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -from futures import _base -import queue -import threading -import weakref - -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException as e: - self.future.set_exception(e) - else: - self.future.set_result(result) - -def _worker(executor_reference, work_queue): - try: - while True: - try: - work_item = work_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor - else: - work_item.run() - except BaseException as e: - _base.LOGGER.critical('Exception in worker', exc_info=True) - -class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): - """Initializes a new ThreadPoolExecutor instance. - - Args: - max_workers: The maximum number of threads that can be used to - execute the given calls. - """ - _remove_dead_thread_references() - - self._max_workers = max_workers - self._work_queue = queue.Queue() - self._threads = set() - self._shutdown = False - self._shutdown_lock = threading.Lock() - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._work_queue.put(w) - self._adjust_thread_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def _adjust_thread_count(self): - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. - if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) - t.daemon = True - t.start() - self._threads.add(t) - _thread_references.add(weakref.ref(t)) - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - if wait: - for t in self._threads: - t.join() - shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/python3/primes.py b/python3/primes.py deleted file mode 100644 index 5152fcb..0000000 --- a/python3/primes.py +++ /dev/null @@ -1,47 +0,0 @@ -import futures -import math -import time - -PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 117450548693743, - 993960000099397] - -def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - -def sequential(): - return list(map(is_prime, PRIMES)) - -def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def main(): - for name, fn in [('sequential', sequential), - ('processes', with_process_pool_executor), - ('threads', with_thread_pool_executor)]: - print('%s: ' % name.ljust(12), end='') - start = time.time() - if fn() != [True] * len(PRIMES): - print('failed') - else: - print('%.2f seconds' % (time.time() - start)) - -if __name__ == '__main__': - main() diff --git a/python3/setup.py b/python3/setup.py deleted file mode 100755 index fcd05f2..0000000 --- a/python3/setup.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python3 - -from distutils.core import setup - -setup(name='futures3', - version='1.0', - description='Java-style futures implementation in Python 3.x', - author='Brian Quinlan', - author_email='brian@sweetapp.com', - url='http://code.google.com/p/pythonfutures', - download_url='http://pypi.python.org/pypi/futures3/', - packages=['futures'], - license='BSD', - classifiers=['License :: OSI Approved :: BSD License', - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 3'] - ) diff --git a/python3/test_futures.py b/python3/test_futures.py deleted file mode 100644 index 98eea27..0000000 --- a/python3/test_futures.py +++ /dev/null @@ -1,819 +0,0 @@ -import io -import logging -import multiprocessing -import sys -import threading -import test.support -import time -import unittest - -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - -import futures -from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER, wait) -import futures.process - -def create_future(state=PENDING, exception=None, result=None): - f = Future() - f._state = state - f._exception = exception - f._result = result - return f - -PENDING_FUTURE = create_future(state=PENDING) -RUNNING_FUTURE = create_future(state=RUNNING) -CANCELLED_FUTURE = create_future(state=CANCELLED) -CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) -SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - -def mul(x, y): - return x * y - -class Call(object): - """A call that can be submitted to a future.Executor for testing. - - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = multiprocessing.Event() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) - assert r == 0 - else: - self.CALL_LOCKS[handle].wait() - - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) - assert r != 0 - else: - self.CALL_LOCKS[handle].set() - - def __init__(self, manual_finish=False, result=42): - self._called_event = self._create_event() - self._can_finish = self._create_event() - - self._result = result - - if not manual_finish: - self._signal_event(self._can_finish) - - def wait_on_called(self): - self._wait_on_event(self._called_event) - - def set_can(self): - self._signal_event(self._can_finish) - - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - - return self._result - - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] - -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() - -class MapCall(Call): - def __init__(self, result=42): - super().__init__(manual_finish=True, result=result) - - def __call__(self, manual_finish): - if manual_finish: - super().__call__() - return self._result - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.submit, - pow, 2, 5) - - - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_threads_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._threads), 3) - self.executor.shutdown() - for t in self.executor._threads: - t.join() - - def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for t in executor._threads: - t.join() - - def test_del_shutdown(self): - executor = futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) - threads = executor._threads - del executor - - for t in threads: - t.join() - -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_processes_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._processes), 5) - processes = self.executor._processes - self.executor.shutdown() - - for p in processes: - p.join() - - def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for p in self.executor._processes: - p.join() - - def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) - queue_management_thread = executor._queue_management_thread - processes = executor._processes - del executor - - queue_management_thread.join() - for p in processes: - p.join() - -class WaitTests(unittest.TestCase): - def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([future1]), done) - self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() - - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) - - - finally: - call1.close() - call2.close() - - def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([EXCEPTION_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set(), pending) - - - finally: - call1.close() - call2.close() - - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertLessEqual( - futures.process.EXTRA_QUEUED_CALLS, - 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEquals(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=1, - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEquals(set([future2]), pending) - - - finally: - call1.close() - call2.close() - - -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class AsCompletedTests(unittest.TestCase): - # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. - def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEquals(set( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() - - def test_zero_timeout(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() - -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_submit(self): - future = self.executor.submit(pow, 2, 8) - self.assertEquals(256, future.result()) - - def test_submit_keyword(self): - future = self.executor.submit(mul, 2, y=8) - self.assertEquals(16, future.result()) - - def test_map(self): - self.assertEqual( - list(self.executor.map(pow, range(10), range(10))), - list(map(pow, range(10), range(10)))) - - def test_map_exception(self): - i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.__next__(), (0, 1)) - self.assertEqual(i.__next__(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.__next__) - - def test_map_timeout(self): - results = [] - timeout_call = MapCall() - try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=1): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() - - self.assertEquals([42, 42], results) - -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class FutureTests(unittest.TestCase): - def test_done_callback_with_result(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.add_done_callback(fn) - f.set_result(5) - self.assertEquals(5, callback_result) - - def test_done_callback_with_exception(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.add_done_callback(fn) - f.set_exception(Exception('test')) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_with_cancel(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - f.add_done_callback(fn) - self.assertTrue(f.cancel()) - self.assertTrue(was_cancelled) - - def test_done_callback_raises(self): - LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = io.StringIO() - handler = logging.StreamHandler(logging_stream) - LOGGER.addHandler(handler) - try: - raising_was_called = False - fn_was_called = False - - def raising_fn(callback_future): - nonlocal raising_was_called - raising_was_called = True - raise Exception('doh!') - - def fn(callback_future): - nonlocal fn_was_called - fn_was_called = True - - f = Future() - f.add_done_callback(raising_fn) - f.add_done_callback(fn) - f.set_result(5) - self.assertTrue(raising_was_called) - self.assertTrue(fn_was_called) - self.assertIn('Exception: doh!', logging_stream.getvalue()) - finally: - LOGGER.removeHandler(handler) - LOGGER.addHandler(STDERR_HANDLER) - - def test_done_callback_already_successful(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.set_result(5) - f.add_done_callback(fn) - self.assertEquals(5, callback_result) - - def test_done_callback_already_failed(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.set_exception(Exception('test')) - f.add_done_callback(fn) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_already_cancelled(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - self.assertTrue(f.cancel()) - f.add_done_callback(fn) - self.assertTrue(was_cancelled) - - def test_repr(self): - self.assertRegexpMatches(repr(PENDING_FUTURE), - '') - self.assertRegexpMatches(repr(RUNNING_FUTURE), - '') - self.assertRegexpMatches(repr(CANCELLED_FUTURE), - '') - self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '') - self.assertRegexpMatches( - repr(EXCEPTION_FUTURE), - '') - self.assertRegexpMatches( - repr(SUCCESSFUL_FUTURE), - '') - - - def test_cancel(self): - f1 = create_future(state=PENDING) - f2 = create_future(state=RUNNING) - f3 = create_future(state=CANCELLED) - f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) - f6 = create_future(state=FINISHED, result=5) - - self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) - - self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) - - self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) - - self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) - - self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) - - self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) - - def test_cancelled(self): - self.assertFalse(PENDING_FUTURE.cancelled()) - self.assertFalse(RUNNING_FUTURE.cancelled()) - self.assertTrue(CANCELLED_FUTURE.cancelled()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) - self.assertFalse(EXCEPTION_FUTURE.cancelled()) - self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) - - def test_done(self): - self.assertFalse(PENDING_FUTURE.done()) - self.assertFalse(RUNNING_FUTURE.done()) - self.assertTrue(CANCELLED_FUTURE.done()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) - self.assertTrue(EXCEPTION_FUTURE.done()) - self.assertTrue(SUCCESSFUL_FUTURE.done()) - - def test_running(self): - self.assertFalse(PENDING_FUTURE.running()) - self.assertTrue(RUNNING_FUTURE.running()) - self.assertFalse(CANCELLED_FUTURE.running()) - self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) - self.assertFalse(EXCEPTION_FUTURE.running()) - self.assertFalse(SUCCESSFUL_FUTURE.running()) - - def test_result_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.result, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) - self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) - - def test_result_with_success(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.set_result(42) - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertEquals(f1.result(timeout=5), 42) - - def test_result_with_cancel(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.cancel() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertRaises(futures.CancelledError, f1.result, timeout=5) - - def test_exception_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.exception, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) - self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) - self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) - - def test_exception_with_success(self): - def notification(): - # Wait until the main thread is waiting for the exception. - time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._exception = IOError() - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) - -def test_main(): - test.support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) - -if __name__ == "__main__": - test_main() -- cgit v1.2.1