diff options
author | Darren Hague <d.hague@sap.com> | 2016-06-13 15:48:30 +0100 |
---|---|---|
committer | Darren Hague <d.hague@sap.com> | 2016-07-11 16:37:13 +0100 |
commit | c7cba1fe02e03744e5b45e54c84f3e971b450698 (patch) | |
tree | 52e4dde73718f1e077288ea82c15144a7e2f0139 /ceilometermiddleware/swift.py | |
parent | c962fedb851c4bdc36f43b22d55efc82ca1d0dfd (diff) | |
download | ceilometermiddleware-c7cba1fe02e03744e5b45e54c84f3e971b450698.tar.gz |
Add background thread notifier sending ability
Add ability to hand off notifier event sending to a queue processed by
a background thread so as not to block swift proxy. This fixes an
issue whereby if ceilometer's RabbitMQ went down then the swift proxy
would wait for it to come back up, effectively coupling Swift's
availabiilty to that of Ceilometer's RabbitMQ.
Background sending is activated by setting config item
'nonblocking_notify' to True.
Queue size defaults to 1000, which can be overridden by setting config
item 'send_queue_size'. If the queue is full, new events are discarded.
Change-Id: I3da2b88b2bc9b7fd8c572a0085fa1d78c4f54701
Diffstat (limited to 'ceilometermiddleware/swift.py')
-rw-r--r-- | ceilometermiddleware/swift.py | 66 |
1 files changed, 65 insertions, 1 deletions
diff --git a/ceilometermiddleware/swift.py b/ceilometermiddleware/swift.py index 411351a..f9c7605 100644 --- a/ceilometermiddleware/swift.py +++ b/ceilometermiddleware/swift.py @@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file: topic = notifications # skip metering of requests from listed project ids ignore_projects = <proj_uuid>, <proj_uuid2> + # Whether to send events to messaging driver in a background thread + nonblocking_notify = False + # Queue size for sending notifications in background thread (0=unlimited). + # New notifications will be discarded if the queue is full. + send_queue_size = 1000 + # Logging level control + log_level = WARNING """ import functools import logging @@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement from pycadf import metric as cadf_metric from pycadf import resource as cadf_resource import six +import six.moves.queue as queue import six.moves.urllib.parse as urlparse +import threading _LOG = logging.getLogger(__name__) @@ -99,6 +108,9 @@ class InputProxy(object): class Swift(object): """Swift middleware used for counting requests.""" + event_queue = None + threadLock = threading.Lock() + def __init__(self, app, conf): self._app = app self.ignore_projects = [ @@ -122,6 +134,27 @@ class Swift(object): if self.reseller_prefix and self.reseller_prefix[-1] != '_': self.reseller_prefix += '_' + _LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING'))) + + # NOTE: If the background thread's send queue fills up, the event will + # be discarded + # + # For backward compatibility we default to False and therefore wait for + # sending to complete. This causes swift proxy to hang if the + # destination is unavailable. + self.nonblocking_notify = conf.get('nonblocking_notify', False) + + # Initialize the sending queue and thread, but only once + if self.nonblocking_notify and Swift.event_queue is None: + Swift.threadLock.acquire() + if Swift.event_queue is None: + send_queue_size = int(conf.get('send_queue_size', 1000)) + Swift.event_queue = queue.Queue(send_queue_size) + Swift.event_sender = SendEventThread(self._notifier) + Swift.event_sender.start() + _LOG.debug('Started sender thread') + Swift.threadLock.release() + def __call__(self, env, start_response): start_response_args = [None] input_proxy = InputProxy(env['wsgi.input']) @@ -244,7 +277,38 @@ class Swift(object): metric=cadf_metric.Metric( name='storage.objects.outgoing.bytes', unit='B'))) - self._notifier.info({}, 'objectstore.http.request', event.as_dict()) + if self.nonblocking_notify: + try: + Swift.event_queue.put(event, False) + _LOG.debug('Event %s added to send queue', event.id) + except queue.Full: + _LOG.warning('Send queue FULL: Event %s not added', event.id) + else: + Swift.send_notification(self._notifier, event) + + @staticmethod + def send_notification(notifier, event): + notifier.info({}, 'objectstore.http.request', event.as_dict()) + + +class SendEventThread(threading.Thread): + + def __init__(self, notifier): + super(SendEventThread, self).__init__() + self.notifier = notifier + self.daemon = True + + def run(self): + """Send events without blocking swift proxy.""" + while True: + try: + _LOG.debug('Wait for event from send queue') + event = Swift.event_queue.get() + _LOG.debug('Got event %s from queue - now send it', event.id) + Swift.send_notification(self.notifier, event) + _LOG.debug('Event %s sent.', event.id) + except Exception: + _LOG.exception("SendEventThread loop exception") def filter_factory(global_conf, **local_conf): |