diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-09-29 01:57:44 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-09-29 01:57:44 +0000 |
commit | 4728497e7697fb70ec2301f60e8819c0a135cce8 (patch) | |
tree | 78db03e03a70577b66b6a5da33871bb920cad3fc | |
parent | 61817dfa2e0f4e56420f59522e055c1cfbd531b6 (diff) | |
download | qpid-python-4728497e7697fb70ec2301f60e8819c0a135cce8.tar.gz |
Improved the error handling and modified the parameters to pass in an address string when invoking a receiver or a sender.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1002447 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | java/testkit/testkit.py | 77 |
1 files changed, 47 insertions, 30 deletions
diff --git a/java/testkit/testkit.py b/java/testkit/testkit.py index 7410e3ef45..4e6ee9b586 100755 --- a/java/testkit/testkit.py +++ b/java/testkit/testkit.py @@ -30,6 +30,17 @@ try: except ImportError: _cp = checkenv("QP_CP") +class Formatter: + + def __init__(self, message): + self.message = message + self.environ = {"M": self.message, + "P": self.message.properties, + "C": self.message.content} + + def __getitem__(self, st): + return eval(st, self.environ) + # The base test case has support for launching the generic # receiver and sender through the TestLauncher with all the options. # @@ -47,15 +58,14 @@ class JavaClientTest(BrokerTest): def client(self,**options): cmd = ["java","-cp",_cp] + cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] cmd += ["-Dhost=" + options.get("host","127.0.0.1")] cmd += ["-Dport=" + str(options.get("port",5672))] cmd += ["-Dcon_count=" + str(options.get("con_count",1))] - cmd += ["-Dssn_count=" + str(options.get("ssn_count",1))] - cmd += ["-Dqueue_name=" + options.get("queue_name","queue")] - cmd += ["-Dexchange_name=" + options.get("exchange_name","amq.direct")] - cmd += ["-Drouting_key=" + options.get("routing_key","routing_key")] - cmd += ["-Dunique_dests=" + str(options.get("unique_dests",True))] + cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] + cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] + cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",True))] cmd += ["-Ddurable=" + str(options.get("durable",False))] cmd += ["-Dtransacted=" + str(options.get("transacted",False))] cmd += ["-Dreceiver=" + str(options.get("receiver",False))] @@ -66,11 +76,12 @@ class JavaClientTest(BrokerTest): cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))] cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] - cmd += ["-Dreliability=" + options.get("reliability", "exactly_once")] + cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] cmd += ["-Dlog.level=" + options.get("log.level", "warn")] cmd += [self.client_class] + cmd += [options.get("address", "my_queue; {create: always}")] - print str(options.get("port",5672)) + #print str(options.get("port",5672)) return cmd # currently there is no transparent reconnection. @@ -79,21 +90,21 @@ class JavaClientTest(BrokerTest): ssn = broker.connect().session() err_watcher = ssn.receiver("control; {create:always}", capacity=1) i = run_time/error_ck_freq - for j in range(i): - try: - m = err_watcher.fetch(timeout=error_ck_freq) - print "Java process notified of an error" - print self.check_for_error(m) - except messaging.Empty, e: - pass # do nothing + for j in range(i): + not_empty = True + while not_empty: + try: + m = err_watcher.fetch(timeout=error_ck_freq) + ssn.acknowledge() + print "Java process notified of an error" + self.check_for_error(m) + except messaging.Empty, e: + not_empty = False + ssn.close() def check_for_error(self,msg): - raise Exception("Error:%s \nTime:%s\nTrace:%s\n" % - (msg.properties.get("desc"), - msg.properties.get("time"), - msg.properties.get("exception-trace") - )) + print msg.properties.get("exception-trace") def verify(self, receiver,sender): sender_running = receiver.is_running() @@ -115,14 +126,18 @@ class ConcurrencyTest(JavaClientTest): self.start_error_watcher(broker=cluster[0]) receiver = self.popen(self.client(receiver=True, - ssn_count=25, + ssn_per_con=25, port=p, + address="amq.topic", + use_unique_dests=True, test_name=self.id()), expect=EXPECT_RUNNING) sender = self.popen(self.client(sender=True, - ssn_count=25, + ssn_per_con=25, port=p, + address="amq.topic", + use_unique_dests=True, test_name=self.id()), expect=EXPECT_RUNNING) @@ -140,16 +155,20 @@ class ConcurrencyTest(JavaClientTest): self.start_error_watcher(broker=cluster[0]) receiver = self.popen(self.client(receiver=True, - ssn_count=25, + ssn_per_con=25, port=p, transacted=True, + address="amq.topic", + use_unique_dests=True, test_name=self.id()), expect=EXPECT_RUNNING) sender = self.popen(self.client(sender=True, - ssn_count=25, + ssn_per_con=25, port=p, transacted=True, + address="amq.topic", + use_unique_dests=True, test_name=self.id()), expect=EXPECT_RUNNING) @@ -165,28 +184,27 @@ class SoakTest(JavaClientTest): p = cluster[0].port() self.start_error_watcher(broker=cluster[0]) receiver = self.popen(self.client(receiver=True, - ssn_count=1, + ssn_per_con=1, port=p, - reliability="at_least_once", test_name=self.id()), expect=EXPECT_RUNNING) sender = self.popen(self.client(sender=True, - ssn_count=1, + ssn_per_con=1, port=p, - reliability="at_least_once", test_name=self.id()), expect=EXPECT_RUNNING) # grace period for java clients to get the failover properly setup. time.sleep(30) - error_msg=None + error_msg= None # Kill original brokers, start new ones. try: for i in range(8): cluster[i].kill() b=cluster.start() self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) + print "iteration : " + str(i) except ConnectError, e1: error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) @@ -195,5 +213,4 @@ class SoakTest(JavaClientTest): self.verify(receiver,sender) if error_msg: - raise Exception(error_msg) - + raise Exception(error_msg) |