summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-09-29 01:57:44 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-09-29 01:57:44 +0000
commit4728497e7697fb70ec2301f60e8819c0a135cce8 (patch)
tree78db03e03a70577b66b6a5da33871bb920cad3fc
parent61817dfa2e0f4e56420f59522e055c1cfbd531b6 (diff)
downloadqpid-python-4728497e7697fb70ec2301f60e8819c0a135cce8.tar.gz
Improved the error handling and modified the parameters to pass in an address string when invoking a receiver or a sender.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1002447 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xjava/testkit/testkit.py77
1 files changed, 47 insertions, 30 deletions
diff --git a/java/testkit/testkit.py b/java/testkit/testkit.py
index 7410e3ef45..4e6ee9b586 100755
--- a/java/testkit/testkit.py
+++ b/java/testkit/testkit.py
@@ -30,6 +30,17 @@ try:
except ImportError:
_cp = checkenv("QP_CP")
+class Formatter:
+
+ def __init__(self, message):
+ self.message = message
+ self.environ = {"M": self.message,
+ "P": self.message.properties,
+ "C": self.message.content}
+
+ def __getitem__(self, st):
+ return eval(st, self.environ)
+
# The base test case has support for launching the generic
# receiver and sender through the TestLauncher with all the options.
#
@@ -47,15 +58,14 @@ class JavaClientTest(BrokerTest):
def client(self,**options):
cmd = ["java","-cp",_cp]
+
cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
cmd += ["-Dport=" + str(options.get("port",5672))]
cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
- cmd += ["-Dssn_count=" + str(options.get("ssn_count",1))]
- cmd += ["-Dqueue_name=" + options.get("queue_name","queue")]
- cmd += ["-Dexchange_name=" + options.get("exchange_name","amq.direct")]
- cmd += ["-Drouting_key=" + options.get("routing_key","routing_key")]
- cmd += ["-Dunique_dests=" + str(options.get("unique_dests",True))]
+ cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
+ cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
+ cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",True))]
cmd += ["-Ddurable=" + str(options.get("durable",False))]
cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
@@ -66,11 +76,12 @@ class JavaClientTest(BrokerTest):
cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))]
cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
- cmd += ["-Dreliability=" + options.get("reliability", "exactly_once")]
+ cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
cmd += [self.client_class]
+ cmd += [options.get("address", "my_queue; {create: always}")]
- print str(options.get("port",5672))
+ #print str(options.get("port",5672))
return cmd
# currently there is no transparent reconnection.
@@ -79,21 +90,21 @@ class JavaClientTest(BrokerTest):
ssn = broker.connect().session()
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
i = run_time/error_ck_freq
- for j in range(i):
- try:
- m = err_watcher.fetch(timeout=error_ck_freq)
- print "Java process notified of an error"
- print self.check_for_error(m)
- except messaging.Empty, e:
- pass # do nothing
+ for j in range(i):
+ not_empty = True
+ while not_empty:
+ try:
+ m = err_watcher.fetch(timeout=error_ck_freq)
+ ssn.acknowledge()
+ print "Java process notified of an error"
+ self.check_for_error(m)
+ except messaging.Empty, e:
+ not_empty = False
+
ssn.close()
def check_for_error(self,msg):
- raise Exception("Error:%s \nTime:%s\nTrace:%s\n" %
- (msg.properties.get("desc"),
- msg.properties.get("time"),
- msg.properties.get("exception-trace")
- ))
+ print msg.properties.get("exception-trace")
def verify(self, receiver,sender):
sender_running = receiver.is_running()
@@ -115,14 +126,18 @@ class ConcurrencyTest(JavaClientTest):
self.start_error_watcher(broker=cluster[0])
receiver = self.popen(self.client(receiver=True,
- ssn_count=25,
+ ssn_per_con=25,
port=p,
+ address="amq.topic",
+ use_unique_dests=True,
test_name=self.id()),
expect=EXPECT_RUNNING)
sender = self.popen(self.client(sender=True,
- ssn_count=25,
+ ssn_per_con=25,
port=p,
+ address="amq.topic",
+ use_unique_dests=True,
test_name=self.id()),
expect=EXPECT_RUNNING)
@@ -140,16 +155,20 @@ class ConcurrencyTest(JavaClientTest):
self.start_error_watcher(broker=cluster[0])
receiver = self.popen(self.client(receiver=True,
- ssn_count=25,
+ ssn_per_con=25,
port=p,
transacted=True,
+ address="amq.topic",
+ use_unique_dests=True,
test_name=self.id()),
expect=EXPECT_RUNNING)
sender = self.popen(self.client(sender=True,
- ssn_count=25,
+ ssn_per_con=25,
port=p,
transacted=True,
+ address="amq.topic",
+ use_unique_dests=True,
test_name=self.id()),
expect=EXPECT_RUNNING)
@@ -165,28 +184,27 @@ class SoakTest(JavaClientTest):
p = cluster[0].port()
self.start_error_watcher(broker=cluster[0])
receiver = self.popen(self.client(receiver=True,
- ssn_count=1,
+ ssn_per_con=1,
port=p,
- reliability="at_least_once",
test_name=self.id()),
expect=EXPECT_RUNNING)
sender = self.popen(self.client(sender=True,
- ssn_count=1,
+ ssn_per_con=1,
port=p,
- reliability="at_least_once",
test_name=self.id()),
expect=EXPECT_RUNNING)
# grace period for java clients to get the failover properly setup.
time.sleep(30)
- error_msg=None
+ error_msg= None
# Kill original brokers, start new ones.
try:
for i in range(8):
cluster[i].kill()
b=cluster.start()
self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+ print "iteration : " + str(i)
except ConnectError, e1:
error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
@@ -195,5 +213,4 @@ class SoakTest(JavaClientTest):
self.verify(receiver,sender)
if error_msg:
- raise Exception(error_msg)
-
+ raise Exception(error_msg)