summaryrefslogtreecommitdiff
path: root/java/tools/bin/testkit.py
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /java/tools/bin/testkit.py
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/bin/testkit.py')
-rwxr-xr-xjava/tools/bin/testkit.py278
1 files changed, 0 insertions, 278 deletions
diff --git a/java/tools/bin/testkit.py b/java/tools/bin/testkit.py
deleted file mode 100755
index 1c2ad598b8..0000000000
--- a/java/tools/bin/testkit.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time, string, traceback
-from brokertest import *
-from qpid.messaging import *
-
-
-try:
- import java.lang.System
- _cp = java.lang.System.getProperty("java.class.path");
-except ImportError:
- _cp = checkenv("QP_CP")
-
-class Formatter:
-
- def __init__(self, message):
- self.message = message
- self.environ = {"M": self.message,
- "P": self.message.properties,
- "C": self.message.content}
-
- def __getitem__(self, st):
- return eval(st, self.environ)
-
-# The base test case has support for launching the generic
-# receiver and sender through the TestLauncher with all the options.
-#
-class JavaClientTest(BrokerTest):
- """Base Case for Java Test cases"""
-
- client_class = "org.apache.qpid.testkit.TestLauncher"
-
- # currently there is no transparent reconnection.
- # temp hack: just creating the queue here and closing it.
- def start_error_watcher(self,broker=None):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- ssn.close()
-
- def store_module_args(self):
- if BrokerTest.store_lib:
- return ["--load-module", BrokerTest.store_lib]
- else:
- print "Store module not present."
- return [""]
-
- def client(self,**options):
- cmd = ["java","-cp",_cp]
-
- cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
- cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
- cmd += ["-Dport=" + str(options.get("port",5672))]
- cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
- cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
- cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
- cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
- cmd += ["-Ddurable=" + str(options.get("durable",False))]
- cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
- cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
- cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
- cmd += ["-Dsender=" + str(options.get("sender",False))]
- cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
- cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
- cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
- cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
- cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
- cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
- cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
- cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
- cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
- cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
- cmd += [self.client_class]
- cmd += [options.get("address", "my_queue; {create: always}")]
-
- #print str(options.get("port",5672))
- return cmd
-
- # currently there is no transparent reconnection.
- # temp hack: just creating a receiver and closing session soon after.
- def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- i = run_time/error_ck_freq
- is_error = False
- for j in range(i):
- not_empty = True
- while not_empty:
- try:
- m = err_watcher.fetch(timeout=error_ck_freq)
- ssn.acknowledge()
- print "Java process notified of an error"
- self.print_error(m)
- is_error = True
- except messaging.Empty, e:
- not_empty = False
-
- ssn.close()
- return is_error
-
- def print_error(self,msg):
- print msg.properties.get("exception-trace")
-
- def verify(self, receiver,sender):
- sender_running = receiver.is_running()
- receiver_running = sender.is_running()
-
- self.assertTrue(receiver_running,"Receiver has exited prematually")
- self.assertTrue(sender_running,"Sender has exited prematually")
-
- def start_sender_and_receiver(self,**options):
-
- receiver_opts = options
- receiver_opts["receiver"]=True
- receiver = self.popen(self.client(**receiver_opts),
- expect=EXPECT_RUNNING)
-
- sender_opts = options
- sender_opts["sender"]=True
- sender = self.popen(self.client(**sender_opts),
- expect=EXPECT_RUNNING)
-
- return receiver, sender
-
- def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
- if options.get("durable",False)==True:
- cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
- else:
- cluster = Cluster(self, count=count)
- return cluster
-
-class ConcurrencyTest(JavaClientTest):
- """A concurrency test suite for the JMS client"""
- skip = False
-
- def base_case(self,**options):
- if self.skip :
- print "Skipping test"
- return
-
- cluster = self.start_cluster(count=2,**options)
- self.start_error_watcher(broker=cluster[0])
- options["port"] = port=cluster[0].port()
-
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver, sender = self.start_sender_and_receiver(**options)
- self.monitor_clients(broker=cluster[0],run_time=180)
- self.verify(receiver,sender)
-
- def test_multiplexing_con(self):
- """Tests multiple sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,test_name=self.id())
-
- def test_multiplexing_con_with_tx(self):
- """Tests multiple transacted sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_rcv(self):
- """Tests multiple sessions with sync receive"""
-
- self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
-
- def test_multiplexing_con_with_durable_sub(self):
- """Tests multiple sessions with durable subs"""
-
- self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_ack(self):
- """Tests multiple sessions with sync ack"""
-
- self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_pub(self):
- """Tests multiple sessions with sync pub"""
-
- self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
-
- def test_multiple_cons_and_ssns(self):
- """Tests multiple connections and sessions"""
-
- self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
-
-
-class SoakTest(JavaClientTest):
- """A soak test suite for the JMS client"""
-
- def base_case(self,**options):
- cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
- options["port"] = port=cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver,sender = self.start_sender_and_receiver(**options)
- is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
-
- if (is_error):
- print "The sender or receiver didn't start properly. Exiting test."
- return
- else:
- "Print no error !"
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg= None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- print "iteration : " + str(i)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
-
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
-
-
- def test_failover(self) :
- """Test basic failover"""
-
- self.base_case(test_name=self.id())
-
-
- def test_failover_with_durablesub(self):
- """Test failover with durable subscriber"""
-
- self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
-
-
- def test_failover_with_sync_rcv(self):
- """Test failover with sync receive"""
-
- self.base_case(sync_rcv=True,test_name=self.id())
-
-
- def test_failover_with_sync_ack(self):
- """Test failover with sync ack"""
-
- self.base_case(sync_ack=True,test_name=self.id())
-
-
- def test_failover_with_noprefetch(self):
- """Test failover with no prefetch"""
-
- self.base_case(max_prefetch=1,test_name=self.id())
-
-
- def test_failover_with_multiple_cons_and_ssns(self):
- """Test failover with multiple connections and sessions"""
-
- self.base_case(use_unique_dests=True,address="amq.topic",
- con_count=10,ssn_per_con=25,test_name=self.id())