summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDarren Hague <d.hague@sap.com>2016-06-13 15:48:30 +0100
committerDarren Hague <d.hague@sap.com>2016-07-11 16:37:13 +0100
commitc7cba1fe02e03744e5b45e54c84f3e971b450698 (patch)
tree52e4dde73718f1e077288ea82c15144a7e2f0139
parentc962fedb851c4bdc36f43b22d55efc82ca1d0dfd (diff)
downloadceilometermiddleware-c7cba1fe02e03744e5b45e54c84f3e971b450698.tar.gz
Add background thread notifier sending ability
Add ability to hand off notifier event sending to a queue processed by a background thread so as not to block swift proxy. This fixes an issue whereby if ceilometer's RabbitMQ went down then the swift proxy would wait for it to come back up, effectively coupling Swift's availabiilty to that of Ceilometer's RabbitMQ. Background sending is activated by setting config item 'nonblocking_notify' to True. Queue size defaults to 1000, which can be overridden by setting config item 'send_queue_size'. If the queue is full, new events are discarded. Change-Id: I3da2b88b2bc9b7fd8c572a0085fa1d78c4f54701
-rw-r--r--ceilometermiddleware/swift.py66
-rw-r--r--ceilometermiddleware/tests/test_swift.py26
2 files changed, 91 insertions, 1 deletions
diff --git a/ceilometermiddleware/swift.py b/ceilometermiddleware/swift.py
index 411351a..f9c7605 100644
--- a/ceilometermiddleware/swift.py
+++ b/ceilometermiddleware/swift.py
@@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file:
topic = notifications
# skip metering of requests from listed project ids
ignore_projects = <proj_uuid>, <proj_uuid2>
+ # Whether to send events to messaging driver in a background thread
+ nonblocking_notify = False
+ # Queue size for sending notifications in background thread (0=unlimited).
+ # New notifications will be discarded if the queue is full.
+ send_queue_size = 1000
+ # Logging level control
+ log_level = WARNING
"""
import functools
import logging
@@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement
from pycadf import metric as cadf_metric
from pycadf import resource as cadf_resource
import six
+import six.moves.queue as queue
import six.moves.urllib.parse as urlparse
+import threading
_LOG = logging.getLogger(__name__)
@@ -99,6 +108,9 @@ class InputProxy(object):
class Swift(object):
"""Swift middleware used for counting requests."""
+ event_queue = None
+ threadLock = threading.Lock()
+
def __init__(self, app, conf):
self._app = app
self.ignore_projects = [
@@ -122,6 +134,27 @@ class Swift(object):
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
+ _LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING')))
+
+ # NOTE: If the background thread's send queue fills up, the event will
+ # be discarded
+ #
+ # For backward compatibility we default to False and therefore wait for
+ # sending to complete. This causes swift proxy to hang if the
+ # destination is unavailable.
+ self.nonblocking_notify = conf.get('nonblocking_notify', False)
+
+ # Initialize the sending queue and thread, but only once
+ if self.nonblocking_notify and Swift.event_queue is None:
+ Swift.threadLock.acquire()
+ if Swift.event_queue is None:
+ send_queue_size = int(conf.get('send_queue_size', 1000))
+ Swift.event_queue = queue.Queue(send_queue_size)
+ Swift.event_sender = SendEventThread(self._notifier)
+ Swift.event_sender.start()
+ _LOG.debug('Started sender thread')
+ Swift.threadLock.release()
+
def __call__(self, env, start_response):
start_response_args = [None]
input_proxy = InputProxy(env['wsgi.input'])
@@ -244,7 +277,38 @@ class Swift(object):
metric=cadf_metric.Metric(
name='storage.objects.outgoing.bytes', unit='B')))
- self._notifier.info({}, 'objectstore.http.request', event.as_dict())
+ if self.nonblocking_notify:
+ try:
+ Swift.event_queue.put(event, False)
+ _LOG.debug('Event %s added to send queue', event.id)
+ except queue.Full:
+ _LOG.warning('Send queue FULL: Event %s not added', event.id)
+ else:
+ Swift.send_notification(self._notifier, event)
+
+ @staticmethod
+ def send_notification(notifier, event):
+ notifier.info({}, 'objectstore.http.request', event.as_dict())
+
+
+class SendEventThread(threading.Thread):
+
+ def __init__(self, notifier):
+ super(SendEventThread, self).__init__()
+ self.notifier = notifier
+ self.daemon = True
+
+ def run(self):
+ """Send events without blocking swift proxy."""
+ while True:
+ try:
+ _LOG.debug('Wait for event from send queue')
+ event = Swift.event_queue.get()
+ _LOG.debug('Got event %s from queue - now send it', event.id)
+ Swift.send_notification(self.notifier, event)
+ _LOG.debug('Event %s sent.', event.id)
+ except Exception:
+ _LOG.exception("SendEventThread loop exception")
def filter_factory(global_conf, **local_conf):
diff --git a/ceilometermiddleware/tests/test_swift.py b/ceilometermiddleware/tests/test_swift.py
index 2e7287a..4c0f43e 100644
--- a/ceilometermiddleware/tests/test_swift.py
+++ b/ceilometermiddleware/tests/test_swift.py
@@ -20,6 +20,7 @@ import six
from ceilometermiddleware import swift
from ceilometermiddleware.tests import base as tests_base
+from threading import Event
class FakeApp(object):
@@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase):
self.assertEqual('obj', metadata['object'])
self.assertEqual('get', data[2]['target']['action'])
+ def test_get_background(self):
+ notified = Event()
+ app = swift.Swift(FakeApp(),
+ {"nonblocking_notify": "True",
+ "send_queue_size": "1"})
+ req = FakeRequest('/1.0/account/container/obj',
+ environ={'REQUEST_METHOD': 'GET'})
+ with mock.patch('oslo_messaging.Notifier.info',
+ side_effect=lambda *args, **kwargs: notified.set()
+ ) as notify:
+ resp = app(req.environ, self.start_response)
+ self.assertEqual(["This string is 28 bytes long"], list(resp))
+ notified.wait()
+ self.assertEqual(1, len(notify.call_args_list))
+ data = notify.call_args_list[0][0]
+ self.assertEqual('objectstore.http.request', data[1])
+ self.assertEqual(28, data[2]['measurements'][0]['result'])
+ self.assertEqual('storage.objects.outgoing.bytes',
+ data[2]['measurements'][0]['metric']['name'])
+ metadata = data[2]['target']['metadata']
+ self.assertEqual('1.0', metadata['version'])
+ self.assertEqual('container', metadata['container'])
+ self.assertEqual('obj', metadata['object'])
+ self.assertEqual('get', data[2]['target']['action'])
+
def test_put(self):
app = swift.Swift(FakeApp(body=['']), {})
req = FakeRequest(