summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r--qpid/cpp/src/tests/brokertest.py52
1 files changed, 27 insertions, 25 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index f4a20eda5a..b2b4d89b0f 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -198,16 +198,17 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
+ self.wait()
def kill(self):
- try: subprocess.Popen.kill(self)
+ try:
+ subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGKILL)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
+ self.wait()
def _cleanup(self):
"""Clean up after a dead process"""
@@ -555,22 +556,22 @@ class NumberedSender(Thread):
"""
def __init__(self, broker, max_depth=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ connection_options=Cluster.CONNECTION_OPTIONS,
+ failover_updates=True, url=None):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
"""
Thread.__init__(self)
+ cmd = ["qpid-send",
+ "--broker", url or broker.host_port(),
+ "--address", "%s;{create:always}"%queue,
+ "--connection-options", "{%s}"%(connection_options),
+ "--content-stdin"
+ ]
+ if failover_updates: cmd += ["--failover-updates"]
self.sender = broker.test.popen(
- ["qpid-send",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
- "--content-stdin"
- ],
- expect=EXPECT_RUNNING,
- stdin=PIPE)
+ cmd, expect=EXPECT_RUNNING, stdin=PIPE)
self.condition = Condition()
self.max = max_depth
self.received = 0
@@ -617,30 +618,31 @@ class NumberedReceiver(Thread):
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
- def __init__(self, broker, sender = None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ def __init__(self, broker, sender=None, queue="test-queue",
+ connection_options=Cluster.CONNECTION_OPTIONS,
+ failover_updates=True, url=None):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
Thread.__init__(self)
self.test = broker.test
+ cmd = ["qpid-receive",
+ "--broker", url or broker.host_port(),
+ "--address", "%s;{create:always}"%queue,
+ "--connection-options", "{%s}"%(connection_options),
+ "--forever"
+ ]
+ if failover_updates: cmd += [ "--failover-updates" ]
self.receiver = self.test.popen(
- ["qpid-receive",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
- "--forever"
- ],
- expect=EXPECT_RUNNING,
- stdout=PIPE)
+ cmd, expect=EXPECT_RUNNING, stdout=PIPE)
self.lock = Lock()
self.error = None
self.sender = sender
self.received = 0
def read_message(self):
- return int(self.receiver.stdout.readline())
+ n = int(self.receiver.stdout.readline())
+ return n
def run(self):
try: