summaryrefslogtreecommitdiff
path: root/tools/simulator.py
diff options
context:
space:
mode:
authorYulia Portnova <yportnova@mirantis.com>2016-02-17 15:03:41 +0200
committerDmitry Mescheryakov <dmescheryakov@mirantis.com>2016-02-24 11:58:12 +0300
commitd70dfc2f7e051444d2f02b8539c8a2e4ae1988aa (patch)
treedcd49ca5a424f1b6fc704139e15c85b441f45df4 /tools/simulator.py
parent629632bfff840cb6f65d56f63b99ec6684900d13 (diff)
downloadoslo-messaging-d70dfc2f7e051444d2f02b8539c8a2e4ae1988aa.tar.gz
Added duration to notify server/client
Change-Id: I4feeeec0c69305d92dce5baf60502a39ebe6b247
Diffstat (limited to 'tools/simulator.py')
-rwxr-xr-xtools/simulator.py90
1 files changed, 51 insertions, 39 deletions
diff --git a/tools/simulator.py b/tools/simulator.py
index 1551955..d9ca8d8 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -33,6 +33,7 @@ from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import notify # noqa
from oslo_messaging import rpc # noqa
+from oslo_utils import timeutils
LOG = logging.getLogger()
RANDOM_VARIABLE = None
@@ -121,13 +122,12 @@ class NotifyEndpoint(Monitor):
return messaging.NotificationResult.HANDLED
-def notify_server(transport, topic, show_stats):
+def notify_server(transport, topic, show_stats, duration):
endpoints = [NotifyEndpoint(show_stats)]
target = messaging.Target(topic=topic)
server = notify.get_notification_listener(transport, [target],
endpoints, executor='eventlet')
- server.start()
- server.wait()
+ run_server(server, duration=duration)
class BatchNotifyEndpoint(Monitor):
@@ -152,15 +152,14 @@ class BatchNotifyEndpoint(Monitor):
return messaging.NotificationResult.HANDLED
-def batch_notify_server(transport, topic, show_stats):
+def batch_notify_server(transport, topic, show_stats, duration):
endpoints = [BatchNotifyEndpoint(show_stats)]
target = messaging.Target(topic=topic)
server = notify.get_batch_notification_listener(
transport, [target],
endpoints, executor='eventlet',
batch_size=1000, batch_time=5)
- server.start()
- server.wait()
+ run_server(server, duration=duration)
class RpcEndpoint(Monitor):
@@ -226,23 +225,32 @@ def init_msg(messages_count):
LOG.info("Messages has been prepared")
+def run_server(server, duration=None):
+ server.start()
+ if duration:
+ with timeutils.StopWatch(duration) as stop_watch:
+ while not stop_watch.expired():
+ time.sleep(1)
+ server.stop()
+ server.wait()
+
+
def rpc_server(transport, target, wait_before_answer, executor, show_stats,
duration):
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
server = rpc.get_rpc_server(transport, target, endpoints,
executor=executor)
LOG.debug("starting RPC server for target %s", target)
- server.start()
- if duration:
- start_t = time.time()
- while time.time() - start_t < duration:
- time.sleep(1)
- server.stop()
- server.wait()
- LOG.info("Received total messages: %d",
- server.dispatcher.endpoints[0].messages_received)
- return
- server.wait()
+ run_server(server, duration=duration)
+ LOG.info("Received total messages: %d",
+ server.dispatcher.endpoints[0].messages_received)
+
+
+def spawn_notify_clients(threads, *args, **kwargs):
+ p = eventlet.GreenPool(size=threads)
+ for i in range(0, threads):
+ p.spawn_n(notifier, i, *args, **kwargs)
+ p.waitall()
def spawn_rpc_clients(threads, transport, targets,
@@ -256,13 +264,6 @@ def spawn_rpc_clients(threads, transport, targets,
p.waitall()
-def threads_spawner(threads, method, *args, **kwargs):
- p = eventlet.GreenPool(size=threads)
- for i in range(0, threads):
- p.spawn_n(method, i, *args, **kwargs)
- p.waitall()
-
-
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
messages_count, duration):
rpc_method = _rpc_cast if is_cast else _rpc_call
@@ -270,9 +271,9 @@ def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
RPC_CLIENTS.append(client)
if duration:
- start_time = time.time()
- while time.time() - start_time < duration:
- client.send_msg()
+ with timeutils.StopWatch(duration) as stop_watch:
+ while not stop_watch.expired():
+ client.send_msg()
else:
LOG.debug("Sending %d messages using client %d", messages_count, c_id)
for _ in six.moves.range(0, messages_count):
@@ -298,22 +299,30 @@ def _rpc_cast(client, msg):
LOG.debug("SENT: %s", msg)
-def notifier(_id, topic, transport, messages, wait_after_msg, timeout):
+def notifier(_id, topic, transport, messages, wait_after_msg, timeout,
+ duration):
n1 = notify.Notifier(transport,
driver='messaging',
topic=topic).prepare(
publisher_id='publisher-%d' % _id)
- msg = 0
- for i in range(0, messages):
- msg = 1 + msg
- ctxt = {}
- payload = dict(msg=msg, vm='test', otherdata='ahah')
- LOG.debug("send msg")
- LOG.debug(payload)
+ payload = dict(msg=0, vm='test', otherdata='ahah')
+ ctxt = {}
+
+ def send_notif():
+ payload['msg'] += 1
+ LOG.debug("sending notification %s", payload)
n1.info(ctxt, 'compute.start1', payload)
if wait_after_msg > 0:
time.sleep(wait_after_msg)
+ if duration:
+ with timeutils.StopWatch(duration) as stop_watch:
+ while not stop_watch.expired():
+ send_notif()
+ else:
+ for i in range(0, messages):
+ send_notif()
+
def _setup_logging(is_debug):
log_level = logging.DEBUG if is_debug else logging.INFO
@@ -360,6 +369,7 @@ def main():
server = subparsers.add_parser('batch-notify-server')
server.add_argument('--show-stats', dest='show_stats',
type=bool, default=True)
+
client = subparsers.add_parser('notify-client')
client.add_argument('-p', dest='threads', type=int, default=1,
help='number of client threads')
@@ -420,12 +430,14 @@ def main():
rpc_server(transport, target, args.wait_before_answer, args.executor,
args.show_stats, args.duration)
elif args.mode == 'notify-server':
- notify_server(transport, args.topic, args.show_stats)
+ notify_server(transport, args.topic, args.show_stats, args.duration)
elif args.mode == 'batch-notify-server':
- batch_notify_server(transport, args.topic, args.show_stats)
+ batch_notify_server(transport, args.topic, args.show_stats,
+ args.duration)
elif args.mode == 'notify-client':
- threads_spawner(args.threads, notifier, args.topic, transport,
- args.messages, args.wait_after_msg, args.timeout)
+ spawn_notify_clients(args.threads, args.topic, transport,
+ args.messages, args.wait_after_msg, args.timeout,
+ args.duration)
elif args.mode == 'rpc-client':
init_msg(args.messages)
targets = [target.partition('.')[::2] for target in args.targets]