author     Joel Wright <joel.wright@sohonet.com>   2014-04-04 21:13:01 +0200
committer  Joel Wright <joel.wright@sohonet.com>   2014-08-26 14:14:21 +0200
commit     24673f8d19fe2f48964f528369081c37e880ec47 (patch)
tree       8cd0cabfc9b8d858339da556fe561674dd8bc83a /swiftclient/multithreading.py
parent     d97ec374cb1ef91c34e49302842e5a151ee3e476 (diff)
download   python-swiftclient-24673f8d19fe2f48964f528369081c37e880ec47.tar.gz
Add importable SwiftService incorporating shell.py logic
Currently, code that imports swiftclient has to provide its own logic for
operations such as creating large objects and performing parallel uploads
and downloads. This patch adds a SwiftService class that incorporates the
high-level logic from swiftclient/shell.py and makes that functionality
available to Python code as well as through the shell. It also ports
shell.py to use the new class, and updates swiftclient/multithreading.py
so that SwiftService can be used for multiple operations while using only
one thread pool.

Change-Id: I08c5796b4c01001d79fd571651c3017c16462ffd
Implements: blueprint bin-swift-logic-as-importable-library
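For orientation, caller-side use of the new importable service might look roughly like the sketch below. SwiftService itself lives in swiftclient/service.py rather than in this file, so the upload method, the result dictionaries, and the context-manager form shown here are assumptions drawn from the commit message, not details confirmed by this diff.

    # Hypothetical sketch only: method names and result keys are assumptions.
    from swiftclient.service import SwiftService

    with SwiftService() as swift:
        # Uploads run through the library's shared thread pools instead of
        # ad-hoc threading in the calling code.
        for result in swift.upload('my-container', ['photo.jpg', 'notes.txt']):
            if not result.get('success'):
                print('upload failed: %s' % result.get('error'))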
Diffstat (limited to 'swiftclient/multithreading.py')
-rw-r--r--  swiftclient/multithreading.py  286
1 file changed, 88 insertions(+), 198 deletions(-)
diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py
index d187091..a2dcd71 100644
--- a/swiftclient/multithreading.py
+++ b/swiftclient/multithreading.py
@@ -15,171 +15,21 @@
from __future__ import print_function
-from itertools import chain
import six
import sys
-from time import sleep
-from six.moves.queue import Queue
-from threading import Thread
-from traceback import format_exception
-from swiftclient.exceptions import ClientException
+from concurrent.futures import ThreadPoolExecutor
+from six.moves.queue import PriorityQueue
-class StopWorkerThreadSignal(object):
- pass
-
-
-class QueueFunctionThread(Thread):
- """
- Calls ``func`` for each item in ``queue``; ``func`` is called with a
- de-queued item as the first arg followed by ``*args`` and ``**kwargs``.
-
- Any exceptions raised by ``func`` are stored in :attr:`self.exc_infos`.
-
- If the optional kwarg ``store_results`` is specified, it must be a list and
- each result of invoking ``func`` will be appended to that list.
-
- Putting a :class:`StopWorkerThreadSignal` instance into queue will cause
- this thread to exit.
- """
-
- def __init__(self, queue, func, *args, **kwargs):
- """
- :param queue: A :class:`Queue` object from which work jobs will be
- pulled.
- :param func: A callable which will be invoked with a dequeued item
- followed by ``*args`` and ``**kwargs``.
- :param \*args: Optional positional arguments for ``func``.
- :param \*\*kwargs: Optional kwargs for func. If the kwarg
- ``store_results`` is specified, its value must be a
- list, and every result from invoking ``func`` will
- be appended to the supplied list. The kwarg
- ``store_results`` will not be passed into ``func``.
- """
- Thread.__init__(self)
- self.queue = queue
- self.func = func
- self.args = args
- self.kwargs = kwargs
- self.exc_infos = []
- self.store_results = kwargs.pop('store_results', None)
-
- def run(self):
- while True:
- item = self.queue.get()
- if isinstance(item, StopWorkerThreadSignal):
- break
- try:
- result = self.func(item, *self.args, **self.kwargs)
- if self.store_results is not None:
- self.store_results.append(result)
- except Exception:
- self.exc_infos.append(sys.exc_info())
-
-
-class QueueFunctionManager(object):
+class OutputManager(object):
"""
- A context manager to handle the life-cycle of a single :class:`Queue`
- and a list of associated :class:`QueueFunctionThread` instances.
-
- This class is not usually instantiated directly. Instead, call the
- :meth:`MultiThreadingManager.queue_manager` object method,
- which will return an instance of this class.
-
- When entering the context, ``thread_count`` :class:`QueueFunctionThread`
- instances are created and started. The input queue is returned. Inside
- the context, any work item put into the queue will get worked on by one of
- the :class:`QueueFunctionThread` instances.
-
- When the context is exited, all threads are sent a
- :class:`StopWorkerThreadSignal` instance and then all threads are waited
- upon. Finally, any exceptions from any of the threads are reported on via
- the supplied ``thread_manager``'s :meth:`error` method. If an
- ``error_counter`` list was supplied on instantiation, its first element is
- incremented once for every exception which occurred.
- """
-
- def __init__(self, func, thread_count, thread_manager, thread_args=None,
- thread_kwargs=None, error_counter=None,
- connection_maker=None):
- """
- :param func: The worker function which will be passed into each
- :class:`QueueFunctionThread`'s constructor.
- :param thread_count: The number of worker threads to run.
- :param thread_manager: An instance of :class:`MultiThreadingManager`.
- :param thread_args: Optional positional arguments to be passed into
- each invocation of ``func`` after the de-queued
- work item.
- :param thread_kwargs: Optional keyword arguments to be passed into each
- invocation of ``func``. If a list is supplied as
- the ``store_results`` keyword argument, it will
- be filled with every result of invoking ``func``
- in all threads.
- :param error_counter: Optional list containing one integer. If
- supplied, the list's first element will be
- incremented once for each exception in any
- thread. This happens only when exiting the
- context.
- :param connection_maker: Optional callable. If supplied, this callable
- will be invoked once per created thread, and
- the result will be passed into func after the
- de-queued work item but before ``thread_args``
- and ``thread_kwargs``. This is used to ensure
- each thread has its own connection to Swift.
- """
- self.func = func
- self.thread_count = thread_count
- self.thread_manager = thread_manager
- self.error_counter = error_counter
- self.connection_maker = connection_maker
- self.queue = Queue(10000)
- self.thread_list = []
- self.thread_args = thread_args if thread_args else ()
- self.thread_kwargs = thread_kwargs if thread_kwargs else {}
-
- def __enter__(self):
- for _junk in range(self.thread_count):
- if self.connection_maker:
- thread_args = (self.connection_maker(),) + self.thread_args
- else:
- thread_args = self.thread_args
- qf_thread = QueueFunctionThread(self.queue, self.func,
- *thread_args, **self.thread_kwargs)
- qf_thread.start()
- self.thread_list.append(qf_thread)
- return self.queue
-
- def __exit__(self, exc_type, exc_value, traceback):
- for thread in [t for t in self.thread_list if t.isAlive()]:
- self.queue.put(StopWorkerThreadSignal())
-
- while any(map(QueueFunctionThread.is_alive, self.thread_list)):
- sleep(0.05)
-
- for thread in self.thread_list:
- for info in thread.exc_infos:
- if self.error_counter:
- self.error_counter[0] += 1
- if isinstance(info[1], ClientException):
- self.thread_manager.error(str(info[1]))
- else:
- self.thread_manager.error(''.join(format_exception(*info)))
-
-
-class MultiThreadingManager(object):
- """
- One object to manage context for multi-threading. This should make
- bin/swift less error-prone and allow us to test this code.
+ One object to manage and provide helper functions for output.
This object is a context manager and returns itself into the context. When
entering the context, two printing threads are created (see below) and they
are waited on and cleaned up when exiting the context.
- A convenience method, :meth:`queue_manager`, is provided to create a
- :class:`QueueFunctionManager` context manager (a thread-pool with an
- associated input queue for work items).
-
Also, thread-safe printing to two streams is provided. The
:meth:`print_msg` method will print to the supplied ``print_stream``
(defaults to ``sys.stdout``) and the :meth:`error` method will print to the
@@ -198,39 +48,29 @@ class MultiThreadingManager(object):
def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
"""
:param print_stream: The stream to which :meth:`print_msg` sends
- formatted messages
+ formatted messages.
:param error_stream: The stream to which :meth:`error` sends formatted
- messages
+ messages.
On Python 2, Unicode messages are encoded to utf8.
"""
self.print_stream = print_stream
- self.printer = QueueFunctionManager(self._print, 1, self)
+ self.print_pool = ThreadPoolExecutor(max_workers=1)
self.error_stream = error_stream
- self.error_printer = QueueFunctionManager(self._print_error, 1, self)
+ self.error_print_pool = ThreadPoolExecutor(max_workers=1)
self.error_count = 0
def __enter__(self):
- self.printer.__enter__()
- self.error_printer.__enter__()
return self
def __exit__(self, exc_type, exc_value, traceback):
- self.error_printer.__exit__(exc_type, exc_value, traceback)
- self.printer.__exit__(exc_type, exc_value, traceback)
-
- def queue_manager(self, func, thread_count, *args, **kwargs):
- connection_maker = kwargs.pop('connection_maker', None)
- error_counter = kwargs.pop('error_counter', None)
- return QueueFunctionManager(func, thread_count, self, thread_args=args,
- thread_kwargs=kwargs,
- connection_maker=connection_maker,
- error_counter=error_counter)
+ self.error_print_pool.__exit__(exc_type, exc_value, traceback)
+ self.print_pool.__exit__(exc_type, exc_value, traceback)
def print_msg(self, msg, *fmt_args):
if fmt_args:
msg = msg % fmt_args
- self.printer.queue.put(msg)
+ self.print_pool.submit(self._print, msg)
def print_items(self, items, offset=DEFAULT_OFFSET, skip_missing=False):
lines = []
@@ -241,36 +81,10 @@ class MultiThreadingManager(object):
lines.append((template % (k, v)).rstrip())
self.print_msg('\n'.join(lines))
- def print_headers(self, headers, meta_prefix='', exclude_headers=None,
- offset=DEFAULT_OFFSET):
- exclude_headers = exclude_headers or []
- meta_headers = []
- other_headers = []
- template = '%%%ds: %%s' % offset
- for key, value in headers.items():
- if key.startswith(meta_prefix):
- meta_key = 'Meta %s' % key[len(meta_prefix):].title()
- meta_headers.append(template % (meta_key, value))
- elif key not in exclude_headers:
- other_headers.append(template % (key.title(), value))
- self.print_msg('\n'.join(chain(meta_headers, other_headers)))
-
- def headers_to_items(self, headers, meta_prefix='', exclude_headers=None):
- exclude_headers = exclude_headers or []
- meta_items = []
- other_items = []
- for key, value in headers.items():
- if key.startswith(meta_prefix):
- meta_key = 'Meta %s' % key[len(meta_prefix):].title()
- meta_items.append((meta_key, value))
- elif key not in exclude_headers:
- other_items.append((key.title(), value))
- return meta_items + other_items
-
def error(self, msg, *fmt_args):
if fmt_args:
msg = msg % fmt_args
- self.error_printer.queue.put(msg)
+ self.error_print_pool.submit(self._print_error, msg)
def _print(self, item, stream=None):
if stream is None:
@@ -282,3 +96,79 @@ class MultiThreadingManager(object):
def _print_error(self, item):
self.error_count += 1
return self._print(item, stream=self.error_stream)
+
+
+class MultiThreadingManager(object):
+ """
+ One object to manage context for multi-threading. This should make
+ bin/swift less error-prone and allow us to test this code.
+ """
+
+ def __init__(self, create_connection, segment_threads=10,
+ object_dd_threads=10, object_uu_threads=10,
+ container_threads=10):
+
+ """
+ :param segment_threads: The number of threads allocated to segment
+ uploads
+ :param object_dd_threads: The number of threads allocated to object
+ download/delete jobs
+ :param object_uu_threads: The number of threads allocated to object
+ upload/update based jobs
+ :param container_threads: The number of threads allocated to
+ container/account level jobs
+ """
+ self.segment_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=segment_threads)
+ self.object_dd_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=object_dd_threads)
+ self.object_uu_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=object_uu_threads)
+ self.container_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=container_threads)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.segment_pool.__exit__(exc_type, exc_value, traceback)
+ self.object_dd_pool.__exit__(exc_type, exc_value, traceback)
+ self.object_uu_pool.__exit__(exc_type, exc_value, traceback)
+ self.container_pool.__exit__(exc_type, exc_value, traceback)
+
+
+class ConnectionThreadPoolExecutor(ThreadPoolExecutor):
+ """
+ A wrapper class to maintain a pool of connections alongside the thread
+ pool. We start by creating a priority queue of connections, and each job
+ submitted takes one of those connections (initialising if necessary) and
+ passes it as the first arg to the executed function.
+
+ At the end of execution that connection is returned to the queue.
+
+ By using a PriorityQueue we avoid creating more connections than required.
+ We will only create as many connections as are required concurrently.
+ """
+ def __init__(self, create_connection, max_workers):
+ self._connections = PriorityQueue()
+ self._create_connection = create_connection
+ for p in range(0, max_workers):
+ self._connections.put((p, None))
+ super(ConnectionThreadPoolExecutor, self).__init__(max_workers)
+
+ def submit(self, fn, *args, **kwargs):
+ def conn_fn():
+ priority = None
+ conn = None
+ try:
+ # If we get a connection we must put it back later
+ (priority, conn) = self._connections.get()
+ if conn is None:
+ conn = self._create_connection()
+ conn_args = (conn,) + args
+ return fn(*conn_args, **kwargs)
+ finally:
+ if priority is not None:
+ self._connections.put((priority, conn))
+
+ return super(ConnectionThreadPoolExecutor, self).submit(conn_fn)
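A rough usage sketch of the classes added above (not part of the commit itself): make_connection and delete_object below are placeholders for whatever connection factory and per-job callable the caller supplies; only OutputManager, MultiThreadingManager, and the connection-prepending behaviour of ConnectionThreadPoolExecutor.submit come from this diff.

    # Usage sketch, assuming a caller-supplied connection factory.
    from swiftclient.client import Connection
    from swiftclient.multithreading import MultiThreadingManager, OutputManager

    def make_connection():
        # Placeholder factory; the auth URL and credentials are illustrative.
        return Connection(authurl='http://example.com/auth/v1.0',
                          user='tester', key='secret')

    def delete_object(conn, container, obj):
        # Each submitted job receives a pooled connection as its first argument.
        conn.delete_object(container, obj)

    with OutputManager() as out, MultiThreadingManager(make_connection) as manager:
        futures = [manager.object_dd_pool.submit(delete_object, 'photos', name)
                   for name in ('a.jpg', 'b.jpg', 'c.jpg')]
        for future in futures:
            try:
                future.result()
            except Exception as err:
                out.error('delete failed: %s', err)
        out.print_msg('submitted %d deletes', len(futures))

Because ConnectionThreadPoolExecutor returns each connection to its PriorityQueue when a job finishes, at most max_workers connections are ever created, matching the behaviour described in its docstring.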