summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/BrokerMgmtAgent.cpp3
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp7
-rw-r--r--qpid/cpp/src/tests/ForkedBroker.cpp3
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp72
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp4
-rw-r--r--qpid/cpp/src/tests/SocketProxy.h4
-rw-r--r--qpid/cpp/src/tests/brokertest.py15
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py185
-rwxr-xr-xqpid/cpp/src/tests/federation.py13
10 files changed, 277 insertions, 31 deletions
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index d0c6668b72..1d5289dc90 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -599,13 +599,12 @@ namespace qpid {
// populate the agent with multiple test objects
const size_t objCount = 50;
std::vector<TestManageable *> tmv;
- uint32_t objLen;
for (size_t i = 0; i < objCount; i++) {
std::stringstream key;
key << "testobj-" << i;
TestManageable *tm = new TestManageable(agent, key.str());
- objLen = tm->GetManagementObject()->writePropertiesSize();
+ (void) tm->GetManagementObject()->writePropertiesSize();
agent->addObject(tm->GetManagementObject(), key.str());
tmv.push_back(tm);
}
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 939f8f2b88..3c0cff7350 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -271,8 +271,12 @@ QPID_AUTO_TEST_CASE(testOpenFailure) {
QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
Broker::Options opts;
opts.queueCleanInterval = 1;
+ opts.queueFlowStopRatio = 0;
+ opts.queueFlowResumeRatio = 0;
ClientSessionFixture fix(opts);
- fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+ FieldTable args;
+ args.setInt("qpid.max_count",10);
+ fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
for (uint i = 0; i < 10; i++) {
Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
@@ -283,6 +287,7 @@ QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
qpid::sys::sleep(2);
BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u);
+ fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated
}
QPID_AUTO_TEST_CASE(testExpirationOnPop) {
diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp
index 53eaa7e1ce..10674b5175 100644
--- a/qpid/cpp/src/tests/ForkedBroker.cpp
+++ b/qpid/cpp/src/tests/ForkedBroker.cpp
@@ -68,8 +68,7 @@ ForkedBroker::~ForkedBroker() {
}
if (!dataDir.empty())
{
- int unused_ret; // Suppress warnings about ignoring return value.
- unused_ret = ::system(("rm -rf "+dataDir).c_str());
+ (void) ::system(("rm -rf "+dataDir).c_str());
}
}
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 6aa4c63ed7..fae45a94d0 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -992,6 +992,78 @@ QPID_AUTO_TEST_CASE(testTtlForever)
BOOST_CHECK(in.getTtl() == Duration::FOREVER);
}
+QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber)
+{
+ TopicFixture fix;
+ std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str();
+ Sender sender = fix.session.createSender(fix.topic);
+ Receiver receiver1 = fix.session.createReceiver(address);
+ {
+ ScopedSuppressLogging sl;
+ try {
+ fix.session.createReceiver(address);
+ fix.session.sync();
+ BOOST_FAIL("Expected exception.");
+ } catch (const MessagingException& /*e*/) {}
+ }
+}
+
+QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber)
+{
+ TopicFixture fix;
+ std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str();
+ Receiver receiver1 = fix.session.createReceiver(address);
+ Receiver receiver2 = fix.session.createReceiver(address);
+ Sender sender = fix.session.createSender(fix.topic);
+ sender.send(Message("one"), true);
+ Message in = receiver1.fetch(Duration::IMMEDIATE);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("one"));
+ sender.send(Message("two"), true);
+ in = receiver2.fetch(Duration::IMMEDIATE);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("two"));
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ const uint count(20);
+ for (uint i = 0; i < count; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+
+ Session other = fix.connection.createSession();
+ Receiver receiver = other.createReceiver(fix.queue);
+ std::vector<Message> messages;
+ for (uint i = 0; i < count; ++i) {
+ Message msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ messages.push_back(msg);
+ }
+ const uint batch = 10;
+ other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only
+
+ messages.clear();
+ other.sync();
+ other.close();
+
+ other = fix.connection.createSession();
+ receiver = other.createReceiver(fix.queue);
+ Message msg;
+ for (uint i = 0; i < (count-batch); ++i) {
+ msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
+ }
+ other.acknowledgeUpTo(msg);
+ other.sync();
+ other.close();
+
+ Message m;
+ //check queue is empty
+ BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index a752e3afec..6372aa93c3 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
addMessagesToQueue(10, queue);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
::usleep(300*1000);
- queue.purgeExpired();
+ queue.purgeExpired(0);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
}
@@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, timer);
+ QueueCleaner cleaner(queues, &timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h
index 0c6f39d62e..d195f11aa9 100644
--- a/qpid/cpp/src/tests/SocketProxy.h
+++ b/qpid/cpp/src/tests/SocketProxy.h
@@ -35,6 +35,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
+
namespace qpid {
namespace tests {
@@ -62,7 +64,7 @@ class SocketProxy : private qpid::sys::Runnable
: closed(false), joined(true),
port(listener.listen()), dropClient(), dropServer()
{
- client.connect(host, connectPort);
+ client.connect(host, boost::lexical_cast<std::string>(connectPort));
joined = false;
thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
}
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index a19dd305e5..0415a667a2 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -157,8 +157,13 @@ class Popen(subprocess.Popen):
try: self.kill() # Just make sure its dead
except: pass
elif self.expect == EXPECT_RUNNING:
- try: self.kill()
- except: self.unexpected("expected running, exit code %d" % self.wait())
+ if self.poll() != None:
+ self.unexpected("expected running, exit code %d" % self.returncode)
+ else:
+ try:
+ self.kill()
+ except Exception,e:
+ self.unexpected("exception from kill: %s" % str(e))
else:
retry(lambda: self.poll() is not None)
if self.returncode is None: # Still haven't stopped
@@ -544,6 +549,7 @@ class NumberedSender(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{reconnect:true}",
"--content-stdin"
],
expect=EXPECT_RUNNING,
@@ -562,6 +568,7 @@ class NumberedSender(Thread):
try:
self.sent = 0
while not self.stopped:
+ self.sender.assert_running()
if self.max:
self.condition.acquire()
while not self.stopped and self.sent - self.received > self.max:
@@ -604,6 +611,7 @@ class NumberedReceiver(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{reconnect:true}",
"--forever"
],
expect=EXPECT_RUNNING,
@@ -611,15 +619,16 @@ class NumberedReceiver(Thread):
self.lock = Lock()
self.error = None
self.sender = sender
+ self.received = 0
def read_message(self):
return int(self.receiver.stdout.readline())
def run(self):
try:
- self.received = 0
m = self.read_message()
while m != -1:
+ self.receiver.assert_running()
assert(m <= self.received) # Check for missing messages
if (m == self.received): # Ignore duplicates
self.received += 1
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index 9f7d1e2f6c..a0ce8fb9c3 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -54,7 +54,7 @@ def filter_log(log):
'caught up',
'active for links|Passivating links|Activating links',
'info Connection.* connected to', # UpdateClient connection
- 'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+ 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index caa41fa001..c1d9103f08 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,11 +18,12 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
+ # Try message with TTL and differnet headers/properties
+ cluster[0].send_message("q", Message(durable=True, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
+
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -253,6 +260,7 @@ acl allow all all
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ # First broker will be killed.
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
assert 0 == subprocess.call(
@@ -287,6 +295,7 @@ acl allow all all
# Force a change of elder
cluster0.start()
+ cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
cluster0[0].kill()
time.sleep(2) # Allow a management interval to pass.
# Verify logs are consistent
@@ -525,7 +534,7 @@ acl allow all all
receiver.wait()
q_obj.update()
assert not q_obj.flowStopped
- assert q_obj.msgDepth == 0
+ self.assertEqual(q_obj.msgDepth, 0)
# verify that the sender has become unblocked
sender.join(timeout=5)
@@ -685,6 +694,25 @@ acl allow all all
self.assert_browse(s1, "q", ["foo"])
+ def test_ttl_consistent(self):
+ """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+ messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+ messages.append(Message("x"))
+ cluster = self.cluster(2)
+ sender = cluster[0].connect().session().sender("q;{create:always}")
+
+ def fetch(b):
+ receiver = b.connect().session().receiver("q;{create:always}")
+ while receiver.fetch().content != "x": pass
+
+ for m in messages: sender.send(m, sync=False)
+ for m in messages: sender.send(m, sync=False)
+ fetch(cluster[0])
+ fetch(cluster[1])
+ for m in messages: sender.send(m, sync=False)
+ cluster.start()
+ fetch(cluster[2])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -697,22 +725,28 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ for b in cluster: b.ready() # Wait for brokers to be ready
for b in cluster: ErrorGenerator(b)
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[1], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[2], sender)
+ sender = NumberedSender(cluster[0], 1000) # Max queue depth
+ receiver = NumberedReceiver(cluster[0], sender)
receiver.start()
sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ for b in cluster[i:]: b.ready()
ErrorGenerator(b)
time.sleep(5)
sender.stop()
@@ -782,7 +816,7 @@ class LongTests(BrokerTest):
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
- cluster = self.cluster(3, args)
+ cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
clients = [] # Per-broker list of clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
@@ -806,7 +840,7 @@ class LongTests(BrokerTest):
endtime = time.time() + self.duration()
# For long duration, first run is a quarter of the duration.
- runtime = max(5, self.duration() / 4.0)
+ runtime = min(5.0, self.duration() / 3.0)
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
@@ -817,7 +851,7 @@ class LongTests(BrokerTest):
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]
- b.expect = EXPECT_EXIT_FAIL
+ b.ready()
b.kill()
# Stop the brokers clients and all the mclients.
for c in clients[alive] + mclients:
@@ -827,11 +861,15 @@ class LongTests(BrokerTest):
mclients = []
# Start another broker and clients
alive += 1
- cluster.start()
+ cluster.start(expect=EXPECT_EXIT_FAIL)
+ cluster[-1].ready() # Wait till its ready
start_clients(cluster[-1])
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
+ for b in cluster[alive:]:
+ b.ready() # Verify still alive
+ b.kill()
# Verify that logs are consistent
cluster_test_logs.verify_logs()
@@ -844,7 +882,7 @@ class LongTests(BrokerTest):
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
- cluster_test_logs.verify_logs()
+ cluster_test_logs.verify_logs()
def test_flowlimit_failover(self):
"""Test fail-over during continuous send-receive with flow control
@@ -853,34 +891,149 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
- #for b in cluster: ErrorGenerator(b)
+ for b in cluster: b.ready() # Wait for brokers to be ready
# create a queue with rather draconian flow control settings
ssn0 = cluster[0].connect().session()
s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
- receiver = NumberedReceiver(cluster[2])
+ receiver = NumberedReceiver(cluster[0])
receiver.start()
- senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+ senders = [NumberedSender(cluster[0]) for i in range(1,3)]
for s in senders:
s.start()
+ # Wait for senders & receiver to get up and running
+ retry(lambda: receiver.received > 2*senders)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration();
i = 0
while time.time() < endtime:
+ for s in senders: s.sender.assert_running()
+ receiver.receiver.assert_running()
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
- #ErrorGenerator(b)
time.sleep(5)
- #b = cluster[0]
- #b.startQmf()
for s in senders:
s.stop()
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
+ def test_ttl_failover(self):
+ """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+ class Client(StoppableThread):
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.connection = broker.connect(reconnect=True)
+ self.auto_fetch_reconnect_urls(self.connection)
+ self.session = self.connection.session()
+
+ def auto_fetch_reconnect_urls(self, conn):
+ """Replacment for qpid.messaging.util version which is noisy"""
+ ssn = conn.session("auto-fetch-reconnect-urls")
+ rcv = ssn.receiver("amq.failover")
+ rcv.capacity = 10
+
+ def main():
+ while True:
+ try:
+ msg = rcv.fetch()
+ qpid.messaging.util.set_reconnect_urls(conn, msg)
+ ssn.acknowledge(msg, sync=False)
+ except messaging.exceptions.LinkClosed: return
+ except messaging.exceptions.ConnectionError: return
+
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+ thread.setDaemon(True)
+ thread.start()
+
+ def stop(self):
+ StoppableThread.stop(self)
+ self.connection.detach()
+
+ class Sender(Client):
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.sent = 0 # Number of messages _reliably_ sent.
+ self.sender = self.session.sender(address, capacity=1000)
+
+ def send_counted(self, ttl):
+ self.sender.send(Message(str(self.sent), ttl=ttl))
+ self.sent += 1
+
+ def run(self):
+ while not self.stopped:
+ choice = random.randint(0,4)
+ if choice == 0: self.send_counted(None) # No ttl
+ elif choice == 1: self.send_counted(100000) # Large ttl
+ else: # Small ttl, might expire
+ self.sender.send(Message("", ttl=random.random()/10))
+ self.sender.send(Message("z"), sync=True) # Chaser.
+
+ class Receiver(Client):
+
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.received = 0 # Number of non-empty (reliable) messages received.
+ self.receiver = self.session.receiver(address, capacity=1000)
+ def run(self):
+ try:
+ while True:
+ m = self.receiver.fetch(1)
+ if m.content == "z": break
+ if m.content: # Ignore unreliable messages
+ # Ignore duplicates
+ if int(m.content) == self.received: self.received += 1
+ except Exception,e: self.error = e
+
+ # def test_ttl_failover
+
+ # Original cluster will all be killed so expect exit with failure
+ # Set small purge interval.
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # Python client failover produces noisy WARN logs, disable temporarily
+ logger = logging.getLogger()
+ log_level = logger.getEffectiveLevel()
+ logger.setLevel(logging.ERROR)
+ try:
+ # Start sender and receiver threads
+ receiver = Receiver(cluster[0], "q;{create:always}")
+ receiver.start()
+ sender = Sender(cluster[0], "q;{create:always}")
+ sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
+
+ # Kill brokers in a cycle.
+ endtime = time.time() + self.duration()
+ runtime = min(5.0, self.duration() / 4.0)
+ i = 0
+ while time.time() < endtime:
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ b.ready()
+ time.sleep(runtime)
+ sender.stop()
+ receiver.stop()
+ for b in cluster[i:]:
+ b.ready() # Check it didn't crash
+ b.kill()
+ self.assertEqual(sender.sent, receiver.received)
+ cluster_test_logs.verify_logs()
+
+ finally:
+ # Detach to avoid slow reconnect attempts during shut-down if test fails.
+ sender.connection.detach()
+ receiver.connection.detach()
+ logger.setLevel(log_level)
class StoreTests(BrokerTest):
"""
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 201b06a4a2..49bdecdd95 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -649,10 +649,17 @@ class FederationTests(TestBase010):
self.verify_cleanup()
- def test_dynamic_headers(self):
+ def test_dynamic_headers_any(self):
+ self.do_test_dynamic_headers('any')
+
+ def test_dynamic_headers_all(self):
+ self.do_test_dynamic_headers('all')
+
+
+ def do_test_dynamic_headers(self, match_mode):
session = self.session
r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
- r_session = r_conn.session("test_dynamic_headers")
+ r_session = r_conn.session("test_dynamic_headers_%s" % match_mode)
session.exchange_declare(exchange="fed.headers", type="headers")
r_session.exchange_declare(exchange="fed.headers", type="headers")
@@ -671,7 +678,7 @@ class FederationTests(TestBase010):
sleep(5)
session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
- session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+ session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':match_mode, 'class':'first'})
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")