summaryrefslogtreecommitdiff
path: root/tools/simulator.py
diff options
context:
space:
mode:
authorYulia Portnova <yportnova@mirantis.com>2016-01-11 18:46:40 +0200
committerYulia Portnova <yportnova@mirantis.com>2016-01-11 18:46:40 +0200
commit2c8f39312f9b103b12586323a5d88f188c787d41 (patch)
tree20c038a38e93d8d59d28a1be358cde712951b168 /tools/simulator.py
parent7265a82a4e9369e561931dfc6b1be2f93f148d0c (diff)
downloadoslo-messaging-2c8f39312f9b103b12586323a5d88f188c787d41.tar.gz
Add duration option to simulator.py
Change-Id: I992fdc1e22ee0debed34b4beb62cbd563351d12f
Diffstat (limited to 'tools/simulator.py')
-rwxr-xr-xtools/simulator.py39
1 files changed, 30 insertions, 9 deletions
diff --git a/tools/simulator.py b/tools/simulator.py
index 6ab9e6d..0bc0c3b 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -165,8 +165,10 @@ class RpcEndpoint(Monitor):
def __init__(self, wait_before_answer, show_stats):
self.count = None
self.wait_before_answer = wait_before_answer
+ self.messages_received = 0
def info(self, ctxt, message):
+ self.messages_received += 1
i = int(message.split(' ')[-1])
if self.count is None:
self.count = i
@@ -184,7 +186,7 @@ class RpcEndpoint(Monitor):
class RPCClient(object):
def __init__(self, transport, target, timeout, method, wait_after_msg):
self.client = rpc.RPCClient(transport, target)
- self.client.prepare(timeout=timeout)
+ self.client = self.client.prepare(timeout=timeout)
self.method = method
self.bytes = 0
self.msg_sent = 0
@@ -222,11 +224,21 @@ def init_msg(messages_count):
LOG.info("Messages has been prepared")
-def rpc_server(transport, target, wait_before_answer, executor, show_stats):
+def rpc_server(transport, target, wait_before_answer, executor, show_stats,
+ duration):
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
server = rpc.get_rpc_server(transport, target, endpoints,
executor=executor)
server.start()
+ if duration:
+ start_t = time.time()
+ while time.time() - start_t < duration:
+ time.sleep(1)
+ server.stop()
+ server.wait()
+ LOG.info("Received total messages: %d",
+ server.dispatcher.endpoints[0].messages_received)
+ return
server.wait()
@@ -238,14 +250,20 @@ def threads_spawner(threads, method, *args, **kwargs):
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
- messages_count):
- LOG.debug("Sending %d messages using client %d", messages_count, c_id)
+ messages_count, duration):
rpc_method = _rpc_cast if is_cast else _rpc_call
client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
RPC_CLIENTS.append(client)
- for _ in xrange(0, messages_count):
- client.send_msg()
- LOG.debug("Client %d has sent all messages", c_id)
+
+ if duration:
+ start_time = time.time()
+ while time.time() - start_time < duration:
+ client.send_msg()
+ else:
+ LOG.debug("Sending %d messages using client %d", messages_count, c_id)
+ for _ in xrange(0, messages_count):
+ client.send_msg()
+ LOG.debug("Client %d has sent %d messages", c_id, messages_count)
def _rpc_call(client, msg):
@@ -304,6 +322,9 @@ def main():
parser.add_argument('-tp', '--topic', dest='topic',
default="profiler_topic",
help="Topic to publish/receive messages to/from.")
+ parser.add_argument('-l', dest='duration', type=int,
+ help='send messages for certain time')
+
subparsers = parser.add_subparsers(dest='mode',
help='notify/rpc server/client mode')
@@ -369,7 +390,7 @@ def main():
cfg.CONF.rpc_zmq_matchmaker = "redis"
transport._driver.matchmaker._redis.flushdb()
rpc_server(transport, target, args.wait_before_answer, args.executor,
- args.show_stats)
+ args.show_stats, args.duration)
elif args.mode == 'notify-server':
notify_server(transport, args.show_stats)
elif args.mode == 'batch-notify-server':
@@ -383,7 +404,7 @@ def main():
start = datetime.datetime.now()
threads_spawner(args.threads, send_msg, transport, target,
args.wait_after_msg, args.timeout, args.is_cast,
- args.messages)
+ args.messages, args.duration)
time_elapsed = (datetime.datetime.now() - start).total_seconds()
msg_count = 0