summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-07-13 09:54:25 +0000
committerGerrit Code Review <review@openstack.org>2016-07-13 09:54:25 +0000
commitbf17fdab166d52a8e22bf2e4aacd108610d46306 (patch)
treec0f74cf46a02409359e5df0ca57beee1efcf090b
parentab8c985588786cf0279a17bf052fff9b99617684 (diff)
parentc7cba1fe02e03744e5b45e54c84f3e971b450698 (diff)
downloadceilometermiddleware-bf17fdab166d52a8e22bf2e4aacd108610d46306.tar.gz
Merge "Add background thread notifier sending ability"
-rw-r--r--ceilometermiddleware/swift.py66
-rw-r--r--ceilometermiddleware/tests/test_swift.py26
2 files changed, 91 insertions, 1 deletions
diff --git a/ceilometermiddleware/swift.py b/ceilometermiddleware/swift.py
index 411351a..f9c7605 100644
--- a/ceilometermiddleware/swift.py
+++ b/ceilometermiddleware/swift.py
@@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file:
topic = notifications
# skip metering of requests from listed project ids
ignore_projects = <proj_uuid>, <proj_uuid2>
+ # Whether to send events to messaging driver in a background thread
+ nonblocking_notify = False
+ # Queue size for sending notifications in background thread (0=unlimited).
+ # New notifications will be discarded if the queue is full.
+ send_queue_size = 1000
+ # Logging level control
+ log_level = WARNING
"""
import functools
import logging
@@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement
from pycadf import metric as cadf_metric
from pycadf import resource as cadf_resource
import six
+import six.moves.queue as queue
import six.moves.urllib.parse as urlparse
+import threading
_LOG = logging.getLogger(__name__)
@@ -99,6 +108,9 @@ class InputProxy(object):
class Swift(object):
"""Swift middleware used for counting requests."""
+ event_queue = None
+ threadLock = threading.Lock()
+
def __init__(self, app, conf):
self._app = app
self.ignore_projects = [
@@ -122,6 +134,27 @@ class Swift(object):
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
+ _LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING')))
+
+ # NOTE: If the background thread's send queue fills up, the event will
+ # be discarded
+ #
+ # For backward compatibility we default to False and therefore wait for
+ # sending to complete. This causes swift proxy to hang if the
+ # destination is unavailable.
+ self.nonblocking_notify = conf.get('nonblocking_notify', False)
+
+ # Initialize the sending queue and thread, but only once
+ if self.nonblocking_notify and Swift.event_queue is None:
+ Swift.threadLock.acquire()
+ if Swift.event_queue is None:
+ send_queue_size = int(conf.get('send_queue_size', 1000))
+ Swift.event_queue = queue.Queue(send_queue_size)
+ Swift.event_sender = SendEventThread(self._notifier)
+ Swift.event_sender.start()
+ _LOG.debug('Started sender thread')
+ Swift.threadLock.release()
+
def __call__(self, env, start_response):
start_response_args = [None]
input_proxy = InputProxy(env['wsgi.input'])
@@ -244,7 +277,38 @@ class Swift(object):
metric=cadf_metric.Metric(
name='storage.objects.outgoing.bytes', unit='B')))
- self._notifier.info({}, 'objectstore.http.request', event.as_dict())
+ if self.nonblocking_notify:
+ try:
+ Swift.event_queue.put(event, False)
+ _LOG.debug('Event %s added to send queue', event.id)
+ except queue.Full:
+ _LOG.warning('Send queue FULL: Event %s not added', event.id)
+ else:
+ Swift.send_notification(self._notifier, event)
+
+ @staticmethod
+ def send_notification(notifier, event):
+ notifier.info({}, 'objectstore.http.request', event.as_dict())
+
+
+class SendEventThread(threading.Thread):
+
+ def __init__(self, notifier):
+ super(SendEventThread, self).__init__()
+ self.notifier = notifier
+ self.daemon = True
+
+ def run(self):
+ """Send events without blocking swift proxy."""
+ while True:
+ try:
+ _LOG.debug('Wait for event from send queue')
+ event = Swift.event_queue.get()
+ _LOG.debug('Got event %s from queue - now send it', event.id)
+ Swift.send_notification(self.notifier, event)
+ _LOG.debug('Event %s sent.', event.id)
+ except Exception:
+ _LOG.exception("SendEventThread loop exception")
def filter_factory(global_conf, **local_conf):
diff --git a/ceilometermiddleware/tests/test_swift.py b/ceilometermiddleware/tests/test_swift.py
index 2e7287a..4c0f43e 100644
--- a/ceilometermiddleware/tests/test_swift.py
+++ b/ceilometermiddleware/tests/test_swift.py
@@ -20,6 +20,7 @@ import six
from ceilometermiddleware import swift
from ceilometermiddleware.tests import base as tests_base
+from threading import Event
class FakeApp(object):
@@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase):
self.assertEqual('obj', metadata['object'])
self.assertEqual('get', data[2]['target']['action'])
+ def test_get_background(self):
+ notified = Event()
+ app = swift.Swift(FakeApp(),
+ {"nonblocking_notify": "True",
+ "send_queue_size": "1"})
+ req = FakeRequest('/1.0/account/container/obj',
+ environ={'REQUEST_METHOD': 'GET'})
+ with mock.patch('oslo_messaging.Notifier.info',
+ side_effect=lambda *args, **kwargs: notified.set()
+ ) as notify:
+ resp = app(req.environ, self.start_response)
+ self.assertEqual(["This string is 28 bytes long"], list(resp))
+ notified.wait()
+ self.assertEqual(1, len(notify.call_args_list))
+ data = notify.call_args_list[0][0]
+ self.assertEqual('objectstore.http.request', data[1])
+ self.assertEqual(28, data[2]['measurements'][0]['result'])
+ self.assertEqual('storage.objects.outgoing.bytes',
+ data[2]['measurements'][0]['metric']['name'])
+ metadata = data[2]['target']['metadata']
+ self.assertEqual('1.0', metadata['version'])
+ self.assertEqual('container', metadata['container'])
+ self.assertEqual('obj', metadata['object'])
+ self.assertEqual('get', data[2]['target']['action'])
+
def test_put(self):
app = swift.Swift(FakeApp(body=['']), {})
req = FakeRequest(