diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-12-11 04:16:57 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-12-11 04:16:57 +0000 |
commit | 213176657d55a65ecd2dfebc7320651f5442761c (patch) | |
tree | 6eda9c30c6bbbf11a175fc7509452a4044b61377 /tools/simulator.py | |
parent | 4b6144a3db4b2d10b0b4fa7e437ed264e7460df1 (diff) | |
parent | 4dd644ac201ee0fe247d648a2f735998416bf2c7 (diff) | |
download | oslo-messaging-213176657d55a65ecd2dfebc7320651f5442761c.tar.gz |
Merge "batch notification listener"
Diffstat (limited to 'tools/simulator.py')
-rwxr-xr-x | tools/simulator.py | 97 |
1 files changed, 69 insertions, 28 deletions
diff --git a/tools/simulator.py b/tools/simulator.py index 03afe08..c3f9415 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -79,14 +79,34 @@ class LoggingNoParsingFilter(logging.Filter): return True -class NotifyEndpoint(object): - def __init__(self): +class Monitor(object): + def __init__(self, show_stats=False, *args, **kwargs): + self._count = self._prev_count = 0 + self.show_stats = show_stats + if self.show_stats: + self._monitor() + + def _monitor(self): + threading.Timer(1.0, self._monitor).start() + print ("%d msg was received per second" + % (self._count - self._prev_count)) + self._prev_count = self._count + + def info(self, *args, **kwargs): + self._count += 1 + + +class NotifyEndpoint(Monitor): + def __init__(self, *args, **kwargs): + super(NotifyEndpoint, self).__init__(*args, **kwargs) self.cache = [] def info(self, ctxt, publisher_id, event_type, payload, metadata): + super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type, + payload, metadata) LOG.info('msg rcv') LOG.info("%s %s %s %s" % (ctxt, publisher_id, event_type, payload)) - if payload not in self.cache: + if not self.show_stats and payload not in self.cache: LOG.info('requeue msg') self.cache.append(payload) for i in range(15): @@ -97,8 +117,8 @@ class NotifyEndpoint(object): return messaging.NotificationResult.HANDLED -def notify_server(transport): - endpoints = [NotifyEndpoint()] +def notify_server(transport, show_stats): + endpoints = [NotifyEndpoint(show_stats)] target = messaging.Target(topic='n-t1') server = notify.get_notification_listener(transport, [target], endpoints, executor='eventlet') @@ -106,8 +126,41 @@ def notify_server(transport): server.wait() -class RpcEndpoint(object): - def __init__(self, wait_before_answer): +class BatchNotifyEndpoint(Monitor): + def __init__(self, *args, **kwargs): + super(BatchNotifyEndpoint, self).__init__(*args, **kwargs) + self.cache = [] + + def info(self, messages): + super(BatchNotifyEndpoint, self).info(messages) + self._count += len(messages) - 1 + + LOG.info('msg rcv') + LOG.info("%s" % messages) + if not self.show_stats and messages not in self.cache: + LOG.info('requeue msg') + self.cache.append(messages) + for i in range(15): + eventlet.sleep(1) + return messaging.NotificationResult.REQUEUE + else: + LOG.info('ack msg') + return messaging.NotificationResult.HANDLED + + +def batch_notify_server(transport, show_stats): + endpoints = [BatchNotifyEndpoint(show_stats)] + target = messaging.Target(topic='n-t1') + server = notify.get_batch_notification_listener( + transport, [target], + endpoints, executor='eventlet', + batch_size=1000, batch_time=5) + server.start() + server.wait() + + +class RpcEndpoint(Monitor): + def __init__(self, wait_before_answer, show_stats): self.count = None self.wait_before_answer = wait_before_answer @@ -126,27 +179,8 @@ class RpcEndpoint(object): return "OK: %s" % message -class RpcEndpointMonitor(RpcEndpoint): - def __init__(self, *args, **kwargs): - super(RpcEndpointMonitor, self).__init__(*args, **kwargs) - - self._count = self._prev_count = 0 - self._monitor() - - def _monitor(self): - threading.Timer(1.0, self._monitor).start() - print ("%d msg was received per second" - % (self._count - self._prev_count)) - self._prev_count = self._count - - def info(self, *args, **kwargs): - self._count += 1 - super(RpcEndpointMonitor, self).info(*args, **kwargs) - - def rpc_server(transport, target, wait_before_answer, executor, show_stats): - endpoint_cls = RpcEndpointMonitor if show_stats else RpcEndpoint - endpoints = [endpoint_cls(wait_before_answer)] + endpoints = [RpcEndpoint(wait_before_answer, show_stats)] server = rpc.get_rpc_server(transport, target, endpoints, executor=executor) server.start() @@ -244,6 +278,11 @@ def main(): help='notify/rpc server/client mode') server = subparsers.add_parser('notify-server') + server.add_argument('--show-stats', dest='show_stats', + type=bool, default=True) + server = subparsers.add_parser('batch-notify-server') + server.add_argument('--show-stats', dest='show_stats', + type=bool, default=True) client = subparsers.add_parser('notify-client') client.add_argument('-p', dest='threads', type=int, default=1, help='number of client threads') @@ -302,7 +341,9 @@ def main(): rpc_server(transport, target, args.wait_before_answer, args.executor, args.show_stats) elif args.mode == 'notify-server': - notify_server(transport) + notify_server(transport, args.show_stats) + elif args.mode == 'batch-notify-server': + batch_notify_server(transport, args.show_stats) elif args.mode == 'notify-client': threads_spawner(args.threads, notifier, transport, args.messages, args.wait_after_msg, args.timeout) |