summaryrefslogtreecommitdiff
path: root/tools/simulator.py
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-12-11 04:16:57 +0000
committerGerrit Code Review <review@openstack.org>2015-12-11 04:16:57 +0000
commit213176657d55a65ecd2dfebc7320651f5442761c (patch)
tree6eda9c30c6bbbf11a175fc7509452a4044b61377 /tools/simulator.py
parent4b6144a3db4b2d10b0b4fa7e437ed264e7460df1 (diff)
parent4dd644ac201ee0fe247d648a2f735998416bf2c7 (diff)
downloadoslo-messaging-213176657d55a65ecd2dfebc7320651f5442761c.tar.gz
Merge "batch notification listener"
Diffstat (limited to 'tools/simulator.py')
-rwxr-xr-xtools/simulator.py97
1 files changed, 69 insertions, 28 deletions
diff --git a/tools/simulator.py b/tools/simulator.py
index 03afe08..c3f9415 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -79,14 +79,34 @@ class LoggingNoParsingFilter(logging.Filter):
return True
-class NotifyEndpoint(object):
- def __init__(self):
+class Monitor(object):
+ def __init__(self, show_stats=False, *args, **kwargs):
+ self._count = self._prev_count = 0
+ self.show_stats = show_stats
+ if self.show_stats:
+ self._monitor()
+
+ def _monitor(self):
+ threading.Timer(1.0, self._monitor).start()
+ print ("%d msg was received per second"
+ % (self._count - self._prev_count))
+ self._prev_count = self._count
+
+ def info(self, *args, **kwargs):
+ self._count += 1
+
+
+class NotifyEndpoint(Monitor):
+ def __init__(self, *args, **kwargs):
+ super(NotifyEndpoint, self).__init__(*args, **kwargs)
self.cache = []
def info(self, ctxt, publisher_id, event_type, payload, metadata):
+ super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type,
+ payload, metadata)
LOG.info('msg rcv')
LOG.info("%s %s %s %s" % (ctxt, publisher_id, event_type, payload))
- if payload not in self.cache:
+ if not self.show_stats and payload not in self.cache:
LOG.info('requeue msg')
self.cache.append(payload)
for i in range(15):
@@ -97,8 +117,8 @@ class NotifyEndpoint(object):
return messaging.NotificationResult.HANDLED
-def notify_server(transport):
- endpoints = [NotifyEndpoint()]
+def notify_server(transport, show_stats):
+ endpoints = [NotifyEndpoint(show_stats)]
target = messaging.Target(topic='n-t1')
server = notify.get_notification_listener(transport, [target],
endpoints, executor='eventlet')
@@ -106,8 +126,41 @@ def notify_server(transport):
server.wait()
-class RpcEndpoint(object):
- def __init__(self, wait_before_answer):
+class BatchNotifyEndpoint(Monitor):
+ def __init__(self, *args, **kwargs):
+ super(BatchNotifyEndpoint, self).__init__(*args, **kwargs)
+ self.cache = []
+
+ def info(self, messages):
+ super(BatchNotifyEndpoint, self).info(messages)
+ self._count += len(messages) - 1
+
+ LOG.info('msg rcv')
+ LOG.info("%s" % messages)
+ if not self.show_stats and messages not in self.cache:
+ LOG.info('requeue msg')
+ self.cache.append(messages)
+ for i in range(15):
+ eventlet.sleep(1)
+ return messaging.NotificationResult.REQUEUE
+ else:
+ LOG.info('ack msg')
+ return messaging.NotificationResult.HANDLED
+
+
+def batch_notify_server(transport, show_stats):
+ endpoints = [BatchNotifyEndpoint(show_stats)]
+ target = messaging.Target(topic='n-t1')
+ server = notify.get_batch_notification_listener(
+ transport, [target],
+ endpoints, executor='eventlet',
+ batch_size=1000, batch_time=5)
+ server.start()
+ server.wait()
+
+
+class RpcEndpoint(Monitor):
+ def __init__(self, wait_before_answer, show_stats):
self.count = None
self.wait_before_answer = wait_before_answer
@@ -126,27 +179,8 @@ class RpcEndpoint(object):
return "OK: %s" % message
-class RpcEndpointMonitor(RpcEndpoint):
- def __init__(self, *args, **kwargs):
- super(RpcEndpointMonitor, self).__init__(*args, **kwargs)
-
- self._count = self._prev_count = 0
- self._monitor()
-
- def _monitor(self):
- threading.Timer(1.0, self._monitor).start()
- print ("%d msg was received per second"
- % (self._count - self._prev_count))
- self._prev_count = self._count
-
- def info(self, *args, **kwargs):
- self._count += 1
- super(RpcEndpointMonitor, self).info(*args, **kwargs)
-
-
def rpc_server(transport, target, wait_before_answer, executor, show_stats):
- endpoint_cls = RpcEndpointMonitor if show_stats else RpcEndpoint
- endpoints = [endpoint_cls(wait_before_answer)]
+ endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
server = rpc.get_rpc_server(transport, target, endpoints,
executor=executor)
server.start()
@@ -244,6 +278,11 @@ def main():
help='notify/rpc server/client mode')
server = subparsers.add_parser('notify-server')
+ server.add_argument('--show-stats', dest='show_stats',
+ type=bool, default=True)
+ server = subparsers.add_parser('batch-notify-server')
+ server.add_argument('--show-stats', dest='show_stats',
+ type=bool, default=True)
client = subparsers.add_parser('notify-client')
client.add_argument('-p', dest='threads', type=int, default=1,
help='number of client threads')
@@ -302,7 +341,9 @@ def main():
rpc_server(transport, target, args.wait_before_answer, args.executor,
args.show_stats)
elif args.mode == 'notify-server':
- notify_server(transport)
+ notify_server(transport, args.show_stats)
+ elif args.mode == 'batch-notify-server':
+ batch_notify_server(transport, args.show_stats)
elif args.mode == 'notify-client':
threads_spawner(args.threads, notifier, transport, args.messages,
args.wait_after_msg, args.timeout)