summaryrefslogtreecommitdiff
path: root/ceilometermiddleware/swift.py
diff options
context:
space:
mode:
Diffstat (limited to 'ceilometermiddleware/swift.py')
-rw-r--r--ceilometermiddleware/swift.py66
1 files changed, 65 insertions, 1 deletions
diff --git a/ceilometermiddleware/swift.py b/ceilometermiddleware/swift.py
index 411351a..f9c7605 100644
--- a/ceilometermiddleware/swift.py
+++ b/ceilometermiddleware/swift.py
@@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file:
topic = notifications
# skip metering of requests from listed project ids
ignore_projects = <proj_uuid>, <proj_uuid2>
+ # Whether to send events to messaging driver in a background thread
+ nonblocking_notify = False
+ # Queue size for sending notifications in background thread (0=unlimited).
+ # New notifications will be discarded if the queue is full.
+ send_queue_size = 1000
+ # Logging level control
+ log_level = WARNING
"""
import functools
import logging
@@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement
from pycadf import metric as cadf_metric
from pycadf import resource as cadf_resource
import six
+import six.moves.queue as queue
import six.moves.urllib.parse as urlparse
+import threading
_LOG = logging.getLogger(__name__)
@@ -99,6 +108,9 @@ class InputProxy(object):
class Swift(object):
"""Swift middleware used for counting requests."""
+ event_queue = None
+ threadLock = threading.Lock()
+
def __init__(self, app, conf):
self._app = app
self.ignore_projects = [
@@ -122,6 +134,27 @@ class Swift(object):
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
+ _LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING')))
+
+ # NOTE: If the background thread's send queue fills up, the event will
+ # be discarded
+ #
+ # For backward compatibility we default to False and therefore wait for
+ # sending to complete. This causes swift proxy to hang if the
+ # destination is unavailable.
+ self.nonblocking_notify = conf.get('nonblocking_notify', False)
+
+ # Initialize the sending queue and thread, but only once
+ if self.nonblocking_notify and Swift.event_queue is None:
+ Swift.threadLock.acquire()
+ if Swift.event_queue is None:
+ send_queue_size = int(conf.get('send_queue_size', 1000))
+ Swift.event_queue = queue.Queue(send_queue_size)
+ Swift.event_sender = SendEventThread(self._notifier)
+ Swift.event_sender.start()
+ _LOG.debug('Started sender thread')
+ Swift.threadLock.release()
+
def __call__(self, env, start_response):
start_response_args = [None]
input_proxy = InputProxy(env['wsgi.input'])
@@ -244,7 +277,38 @@ class Swift(object):
metric=cadf_metric.Metric(
name='storage.objects.outgoing.bytes', unit='B')))
- self._notifier.info({}, 'objectstore.http.request', event.as_dict())
+ if self.nonblocking_notify:
+ try:
+ Swift.event_queue.put(event, False)
+ _LOG.debug('Event %s added to send queue', event.id)
+ except queue.Full:
+ _LOG.warning('Send queue FULL: Event %s not added', event.id)
+ else:
+ Swift.send_notification(self._notifier, event)
+
+ @staticmethod
+ def send_notification(notifier, event):
+ notifier.info({}, 'objectstore.http.request', event.as_dict())
+
+
+class SendEventThread(threading.Thread):
+
+ def __init__(self, notifier):
+ super(SendEventThread, self).__init__()
+ self.notifier = notifier
+ self.daemon = True
+
+ def run(self):
+ """Send events without blocking swift proxy."""
+ while True:
+ try:
+ _LOG.debug('Wait for event from send queue')
+ event = Swift.event_queue.get()
+ _LOG.debug('Got event %s from queue - now send it', event.id)
+ Swift.send_notification(self.notifier, event)
+ _LOG.debug('Event %s sent.', event.id)
+ except Exception:
+ _LOG.exception("SendEventThread loop exception")
def filter_factory(global_conf, **local_conf):