summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
Diffstat (limited to 'tools')
-rwxr-xr-xtools/simulator.py47
1 files changed, 34 insertions, 13 deletions
diff --git a/tools/simulator.py b/tools/simulator.py
index 0a2af19..1ce2419 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -22,6 +22,7 @@ import json
import logging
import os
import random
+import signal
import six
import string
import sys
@@ -40,6 +41,8 @@ RANDOM_GENERATOR = None
CURRENT_PID = None
CLIENTS = []
MESSAGES = []
+IS_RUNNING = True
+SERVERS = []
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
{notify-server,notify-client,rpc-server,rpc-client} ...
@@ -115,7 +118,9 @@ class MessageStatsCollector(object):
threading.Timer(1.0, self.monitor).start() # schedule in a second
def monitor(self):
- threading.Timer(1.0, self.monitor).start()
+ global IS_RUNNING
+ if IS_RUNNING:
+ threading.Timer(1.0, self.monitor).start()
now = time.time()
count = len(self.buffer)
@@ -381,17 +386,18 @@ def generate_messages(messages_count):
def run_server(server, duration=None):
- try:
- server.start()
- if duration:
- with timeutils.StopWatch(duration) as stop_watch:
- while not stop_watch.expired():
- time.sleep(1)
- server.stop()
- server.wait()
- except KeyboardInterrupt: # caught SIGINT
- LOG.info('Caught SIGINT, terminating')
- time.sleep(1) # wait for stats collector to process the last second
+ global IS_RUNNING
+ SERVERS.append(server)
+ server.start()
+ if duration:
+ with timeutils.StopWatch(duration) as stop_watch:
+ while not stop_watch.expired() and IS_RUNNING:
+ time.sleep(1)
+ server.stop()
+ IS_RUNNING = False
+ server.wait()
+ LOG.info('The server is terminating')
+ time.sleep(1) # wait for stats collector to process the last second
def rpc_server(transport, target, wait_before_answer, executor, duration):
@@ -429,20 +435,24 @@ def spawn_notify_clients(threads, topic, transport, message_count,
def send_messages(client_id, client_builder, messages_count, duration):
+ global IS_RUNNING
client = client_builder()
CLIENTS.append(client)
if duration:
with timeutils.StopWatch(duration) as stop_watch:
- while not stop_watch.expired():
+ while not stop_watch.expired() and IS_RUNNING:
client.send_msg()
eventlet.sleep()
+ IS_RUNNING = False
else:
LOG.debug("Sending %d messages using client %d",
messages_count, client_id)
for _ in six.moves.range(0, messages_count):
client.send_msg()
eventlet.sleep()
+ if not IS_RUNNING:
+ break
LOG.debug("Client %d has sent %d messages", client_id, messages_count)
time.sleep(1) # wait for replies to be collected
@@ -521,6 +531,14 @@ def write_json_file(filename, output):
LOG.info('Stats are written into %s', filename)
+def signal_handler(signum, frame):
+ global IS_RUNNING
+ IS_RUNNING = False
+ LOG.info('Signal %s is caught. Interrupting the execution', signum)
+ for server in SERVERS:
+ server.stop()
+
+
def _setup_logging(is_debug):
log_level = logging.DEBUG if is_debug else logging.INFO
logging.basicConfig(
@@ -625,6 +643,9 @@ def main():
cfg.CONF.prog = os.path.basename(__file__)
cfg.CONF.project = 'oslo.messaging'
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGINT, signal_handler)
+
if args.mode == 'rpc-server':
target = messaging.Target(topic=args.topic, server=args.server)
if args.url.startswith('zmq'):