Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--  qpid/cpp/src/tests/BrokerMgmtAgent.cpp       |   3
-rw-r--r--  qpid/cpp/src/tests/ClientSessionTest.cpp     |   7
-rw-r--r--  qpid/cpp/src/tests/ForkedBroker.cpp          |   3
-rw-r--r--  qpid/cpp/src/tests/MessagingSessionTests.cpp |  72
-rw-r--r--  qpid/cpp/src/tests/QueueTest.cpp             |   4
-rw-r--r--  qpid/cpp/src/tests/SocketProxy.h             |   4
-rw-r--r--  qpid/cpp/src/tests/brokertest.py             |  15
-rwxr-xr-x  qpid/cpp/src/tests/cluster_test_logs.py      |   2
-rwxr-xr-x  qpid/cpp/src/tests/cluster_tests.py          | 185
-rwxr-xr-x  qpid/cpp/src/tests/federation.py             |  13
10 files changed, 277 insertions, 31 deletions
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index d0c6668b72..1d5289dc90 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -599,13 +599,12 @@ namespace qpid {
     // populate the agent with multiple test objects
     const size_t objCount = 50;
     std::vector<TestManageable *> tmv;
-    uint32_t objLen;
 
     for (size_t i = 0; i < objCount; i++) {
         std::stringstream key;
        key << "testobj-" << i;
        TestManageable *tm = new TestManageable(agent, key.str());
-        objLen = tm->GetManagementObject()->writePropertiesSize();
+        (void) tm->GetManagementObject()->writePropertiesSize();
        agent->addObject(tm->GetManagementObject(), key.str());
        tmv.push_back(tm);
     }
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 939f8f2b88..3c0cff7350 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -271,8 +271,12 @@ QPID_AUTO_TEST_CASE(testOpenFailure) {
 QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
     Broker::Options opts;
     opts.queueCleanInterval = 1;
+    opts.queueFlowStopRatio = 0;
+    opts.queueFlowResumeRatio = 0;
     ClientSessionFixture fix(opts);
-    fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+    FieldTable args;
+    args.setInt("qpid.max_count",10);
+    fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
 
     for (uint i = 0; i < 10; i++) {
         Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
@@ -283,6 +287,7 @@ QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
     BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
     qpid::sys::sleep(2);
     BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u);
+    fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated
 }
 
 QPID_AUTO_TEST_CASE(testExpirationOnPop) {
diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp
index 53eaa7e1ce..10674b5175 100644
--- a/qpid/cpp/src/tests/ForkedBroker.cpp
+++ b/qpid/cpp/src/tests/ForkedBroker.cpp
@@ -68,8 +68,7 @@ ForkedBroker::~ForkedBroker() {
     }
 
     if (!dataDir.empty()) {
-        int unused_ret;         // Suppress warnings about ignoring return value.
-        unused_ret = ::system(("rm -rf "+dataDir).c_str());
+        (void) ::system(("rm -rf "+dataDir).c_str());
     }
 }
 
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 6aa4c63ed7..fae45a94d0 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -992,6 +992,78 @@ QPID_AUTO_TEST_CASE(testTtlForever)
     BOOST_CHECK(in.getTtl() == Duration::FOREVER);
 }
 
+QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber)
+{
+    TopicFixture fix;
+    std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str();
+    Sender sender = fix.session.createSender(fix.topic);
+    Receiver receiver1 = fix.session.createReceiver(address);
+    {
+        ScopedSuppressLogging sl;
+        try {
+            fix.session.createReceiver(address);
+            fix.session.sync();
+            BOOST_FAIL("Expected exception.");
+        } catch (const MessagingException& /*e*/) {}
+    }
+}
+
+QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber)
+{
+    TopicFixture fix;
+    std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str();
+    Receiver receiver1 = fix.session.createReceiver(address);
+    Receiver receiver2 = fix.session.createReceiver(address);
+    Sender sender = fix.session.createSender(fix.topic);
+    sender.send(Message("one"), true);
+    Message in = receiver1.fetch(Duration::IMMEDIATE);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("one"));
+    sender.send(Message("two"), true);
+    in = receiver2.fetch(Duration::IMMEDIATE);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("two"));
+    fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    const uint count(20);
+    for (uint i = 0; i < count; ++i) {
+        sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+    }
+
+    Session other = fix.connection.createSession();
+    Receiver receiver = other.createReceiver(fix.queue);
+    std::vector<Message> messages;
+    for (uint i = 0; i < count; ++i) {
+        Message msg = receiver.fetch();
+        BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+        messages.push_back(msg);
+    }
+    const uint batch = 10;
+    other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only
+
+    messages.clear();
+    other.sync();
+    other.close();
+
+    other = fix.connection.createSession();
+    receiver = other.createReceiver(fix.queue);
+    Message msg;
+    for (uint i = 0; i < (count-batch); ++i) {
+        msg = receiver.fetch();
+        BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
+    }
+    other.acknowledgeUpTo(msg);
+    other.sync();
+    other.close();
+
+    Message m;
+    //check queue is empty
+    BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index a752e3afec..6372aa93c3 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
     addMessagesToQueue(10, queue);
     BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
     ::usleep(300*1000);
-    queue.purgeExpired();
+    queue.purgeExpired(0);
     BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
 }
 
@@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
 
     addMessagesToQueue(10, *queue, 200, 400);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
-    QueueCleaner cleaner(queues, timer);
+    QueueCleaner cleaner(queues, &timer);
     cleaner.start(100 * qpid::sys::TIME_MSEC);
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h
index 0c6f39d62e..d195f11aa9 100644
--- a/qpid/cpp/src/tests/SocketProxy.h
+++ b/qpid/cpp/src/tests/SocketProxy.h
@@ -35,6 +35,8 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/log/Statement.h"
 
+#include <boost/lexical_cast.hpp>
+
 namespace qpid {
 namespace tests {
 
@@ -62,7 +64,7 @@ class SocketProxy : private qpid::sys::Runnable
     : closed(false), joined(true),
       port(listener.listen()), dropClient(), dropServer()
     {
-        client.connect(host, connectPort);
+        client.connect(host, boost::lexical_cast<std::string>(connectPort));
        joined = false;
        thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
     }
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index a19dd305e5..0415a667a2 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -157,8 +157,13 @@ class Popen(subprocess.Popen):
             try: self.kill()            # Just make sure its dead
             except: pass
         elif self.expect == EXPECT_RUNNING:
-            try: self.kill()
-            except: self.unexpected("expected running, exit code %d" % self.wait())
+            if self.poll() != None:
+                self.unexpected("expected running, exit code %d" % self.returncode)
+            else:
+                try:
+                    self.kill()
+                except Exception,e:
+                    self.unexpected("exception from kill: %s" % str(e))
         else:
             retry(lambda: self.poll() is not None)
             if self.returncode is None: # Still haven't stopped
@@ -544,6 +549,7 @@ class NumberedSender(Thread):
             "--broker", "localhost:%s"%broker.port(),
             "--address", "%s;{create:always}"%queue,
             "--failover-updates",
+            "--connection-options", "{reconnect:true}",
             "--content-stdin"
             ],
            expect=EXPECT_RUNNING,
@@ -562,6 +568,7 @@ class NumberedSender(Thread):
         try:
             self.sent = 0
             while not self.stopped:
+                self.sender.assert_running()
                 if self.max:
                     self.condition.acquire()
                     while not self.stopped and self.sent - self.received > self.max:
@@ -604,6 +611,7 @@ class NumberedReceiver(Thread):
             "--broker", "localhost:%s"%broker.port(),
             "--address", "%s;{create:always}"%queue,
             "--failover-updates",
+            "--connection-options", "{reconnect:true}",
             "--forever"
             ],
            expect=EXPECT_RUNNING,
@@ -611,15 +619,16 @@ class NumberedReceiver(Thread):
         self.lock = Lock()
         self.error = None
         self.sender = sender
+        self.received = 0
 
     def read_message(self):
         return int(self.receiver.stdout.readline())
 
     def run(self):
         try:
-            self.received = 0
             m = self.read_message()
             while m != -1:
+                self.receiver.assert_running()
                 assert(m <= self.received) # Check for missing messages
                 if (m == self.received): # Ignore duplicates
                     self.received += 1
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index 9f7d1e2f6c..a0ce8fb9c3 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -54,7 +54,7 @@ def filter_log(log):
         'caught up',
         'active for links|Passivating links|Activating links',
         'info Connection.* connected to', # UpdateClient connection
-        'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+        'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
         'warning Broker closed connection: 200, OK',
         'task late',
         'task overran',
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index caa41fa001..c1d9103f08 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,11 +18,12 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
 from qpid import datatypes, messaging
 from brokertest import *
 from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
 from threading import Thread, Lock, Condition
 from logging import getLogger
 from itertools import chain
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
             destination="amq.direct",
             message=qpid.datatypes.Message(props, "content"))
 
+        # Try message with TTL and differnet headers/properties
+        cluster[0].send_message("q", Message(durable=True, ttl=100000))
+        cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+        cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
         # Now update a new member and compare their dumps.
         cluster.start(args=["--test-store-dump", "updatee.dump"])
         assert readfile("direct.dump") == readfile("updatee.dump")
+
         os.remove("direct.dump")
         os.remove("updatee.dump")
 
@@ -253,6 +260,7 @@ acl allow all all
         client was attached.
         """
         args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+        # First broker will be killed.
         cluster0 = self.cluster(1, args=args)
         cluster1 = self.cluster(1, args=args)
         assert 0 == subprocess.call(
@@ -287,6 +295,7 @@ acl allow all all
 
         # Force a change of elder
         cluster0.start()
+        cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
         cluster0[0].kill()
         time.sleep(2) # Allow a management interval to pass.
         # Verify logs are consistent
@@ -525,7 +534,7 @@ acl allow all all
         receiver.wait()
         q_obj.update()
         assert not q_obj.flowStopped
-        assert q_obj.msgDepth == 0
+        self.assertEqual(q_obj.msgDepth, 0)
 
         # verify that the sender has become unblocked
         sender.join(timeout=5)
@@ -685,6 +694,25 @@ acl allow all all
 
         self.assert_browse(s1, "q", ["foo"])
 
+    def test_ttl_consistent(self):
+        """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+        messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+        messages.append(Message("x"))
+        cluster = self.cluster(2)
+        sender = cluster[0].connect().session().sender("q;{create:always}")
+
+        def fetch(b):
+            receiver = b.connect().session().receiver("q;{create:always}")
+            while receiver.fetch().content != "x": pass
+
+        for m in messages: sender.send(m, sync=False)
+        for m in messages: sender.send(m, sync=False)
+        fetch(cluster[0])
+        fetch(cluster[1])
+        for m in messages: sender.send(m, sync=False)
+        cluster.start()
+        fetch(cluster[2])
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):
@@ -697,22 +725,28 @@ class LongTests(BrokerTest):
 
         # Original cluster will all be killed so expect exit with failure
         cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+        for b in cluster: b.ready()     # Wait for brokers to be ready
         for b in cluster: ErrorGenerator(b)
 
         # Start sender and receiver threads
         cluster[0].declare_queue("test-queue")
-        sender = NumberedSender(cluster[1], 1000) # Max queue depth
-        receiver = NumberedReceiver(cluster[2], sender)
+        sender = NumberedSender(cluster[0], 1000) # Max queue depth
+        receiver = NumberedReceiver(cluster[0], sender)
         receiver.start()
         sender.start()
+        # Wait for sender & receiver to get up and running
+        retry(lambda: receiver.received > 0)
 
         # Kill original brokers, start new ones for the duration.
         endtime = time.time() + self.duration()
         i = 0
         while time.time() < endtime:
+            sender.sender.assert_running()
+            receiver.receiver.assert_running()
             cluster[i].kill()
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
+            for b in cluster[i:]: b.ready()
             ErrorGenerator(b)
             time.sleep(5)
         sender.stop()
@@ -782,7 +816,7 @@ class LongTests(BrokerTest):
             args += ["--log-enable=trace+:management"]
         # Use store if present.
         if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
-        cluster = self.cluster(3, args)
+        cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
 
         clients = [] # Per-broker list of clients that only connect to one broker.
         mclients = [] # Management clients that connect to every broker in the cluster.
@@ -806,7 +840,7 @@ class LongTests(BrokerTest):
 
         endtime = time.time() + self.duration()
         # For long duration, first run is a quarter of the duration.
-        runtime = max(5, self.duration() / 4.0)
+        runtime = min(5.0, self.duration() / 3.0)
         alive = 0 # First live cluster member
         for i in range(len(cluster)): start_clients(cluster[i])
         start_mclients(cluster[alive])
@@ -817,7 +851,7 @@ class LongTests(BrokerTest):
             for b in cluster[alive:]: b.ready() # Check if a broker crashed.
             # Kill the first broker, expect the clients to fail.
             b = cluster[alive]
-            b.expect = EXPECT_EXIT_FAIL
+            b.ready()
             b.kill()
             # Stop the brokers clients and all the mclients.
             for c in clients[alive] + mclients:
@@ -827,11 +861,15 @@ class LongTests(BrokerTest):
             mclients = []
             # Start another broker and clients
             alive += 1
-            cluster.start()
+            cluster.start(expect=EXPECT_EXIT_FAIL)
+            cluster[-1].ready() # Wait till its ready
             start_clients(cluster[-1])
             start_mclients(cluster[alive])
 
         for c in chain(mclients, *clients):
             c.stop()
+        for b in cluster[alive:]:
+            b.ready() # Verify still alive
+            b.kill()
         # Verify that logs are consistent
         cluster_test_logs.verify_logs()
@@ -844,7 +882,7 @@ class LongTests(BrokerTest):
         end = time.time() + self.duration()
         while (time.time() < end): # Get a management interval
             for i in xrange(1000): cluster[0].connect().close()
-        cluster_test_logs.verify_logs()
+            cluster_test_logs.verify_logs()
 
     def test_flowlimit_failover(self):
         """Test fail-over during continuous send-receive with flow control
@@ -853,34 +891,149 @@ class LongTests(BrokerTest):
 
         # Original cluster will all be killed so expect exit with failure
         cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
-        #for b in cluster: ErrorGenerator(b)
+        for b in cluster: b.ready()     # Wait for brokers to be ready
 
         # create a queue with rather draconian flow control settings
         ssn0 = cluster[0].connect().session()
         s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
 
-        receiver = NumberedReceiver(cluster[2])
+        receiver = NumberedReceiver(cluster[0])
         receiver.start()
-        senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+        senders = [NumberedSender(cluster[0]) for i in range(1,3)]
         for s in senders:
             s.start()
+        # Wait for senders & receiver to get up and running
+        retry(lambda: receiver.received > 2*senders)
 
         # Kill original brokers, start new ones for the duration.
         endtime = time.time() + self.duration();
         i = 0
         while time.time() < endtime:
+            for s in senders: s.sender.assert_running()
+            receiver.receiver.assert_running()
+            for b in cluster[i:]: b.ready() # Check if any broker crashed.
             cluster[i].kill()
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
-            #ErrorGenerator(b)
             time.sleep(5)
-            #b = cluster[0]
-            #b.startQmf()
 
         for s in senders: s.stop()
         receiver.stop()
         for i in range(i, len(cluster)):
             cluster[i].kill()
 
+    def test_ttl_failover(self):
+        """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+        class Client(StoppableThread):
+
+            def __init__(self, broker):
+                StoppableThread.__init__(self)
+                self.connection = broker.connect(reconnect=True)
+                self.auto_fetch_reconnect_urls(self.connection)
+                self.session = self.connection.session()
+
+            def auto_fetch_reconnect_urls(self, conn):
+                """Replacment for qpid.messaging.util version which is noisy"""
+                ssn = conn.session("auto-fetch-reconnect-urls")
+                rcv = ssn.receiver("amq.failover")
+                rcv.capacity = 10
+
+                def main():
+                    while True:
+                        try:
+                            msg = rcv.fetch()
+                            qpid.messaging.util.set_reconnect_urls(conn, msg)
+                            ssn.acknowledge(msg, sync=False)
+                        except messaging.exceptions.LinkClosed: return
+                        except messaging.exceptions.ConnectionError: return
+
+                thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+                thread.setDaemon(True)
+                thread.start()
+
+            def stop(self):
+                StoppableThread.stop(self)
+                self.connection.detach()
+
+        class Sender(Client):
+            def __init__(self, broker, address):
+                Client.__init__(self, broker)
+                self.sent = 0           # Number of messages _reliably_ sent.
+                self.sender = self.session.sender(address, capacity=1000)
+
+            def send_counted(self, ttl):
+                self.sender.send(Message(str(self.sent), ttl=ttl))
+                self.sent += 1
+
+            def run(self):
+                while not self.stopped:
+                    choice = random.randint(0,4)
+                    if choice == 0: self.send_counted(None) # No ttl
+                    elif choice == 1: self.send_counted(100000) # Large ttl
+                    else: # Small ttl, might expire
+                        self.sender.send(Message("", ttl=random.random()/10))
+                self.sender.send(Message("z"), sync=True) # Chaser.
+
+        class Receiver(Client):
+
+            def __init__(self, broker, address):
+                Client.__init__(self, broker)
+                self.received = 0 # Number of non-empty (reliable) messages received.
+                self.receiver = self.session.receiver(address, capacity=1000)
+            def run(self):
+                try:
+                    while True:
+                        m = self.receiver.fetch(1)
+                        if m.content == "z": break
+                        if m.content: # Ignore unreliable messages
+                            # Ignore duplicates
+                            if int(m.content) == self.received: self.received += 1
+                except Exception,e: self.error = e
+
+        # def test_ttl_failover
+
+        # Original cluster will all be killed so expect exit with failure
+        # Set small purge interval.
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+        for b in cluster: b.ready() # Wait for brokers to be ready
+
+        # Python client failover produces noisy WARN logs, disable temporarily
+        logger = logging.getLogger()
+        log_level = logger.getEffectiveLevel()
+        logger.setLevel(logging.ERROR)
+        try:
+            # Start sender and receiver threads
+            receiver = Receiver(cluster[0], "q;{create:always}")
+            receiver.start()
+            sender = Sender(cluster[0], "q;{create:always}")
+            sender.start()
+            # Wait for sender & receiver to get up and running
+            retry(lambda: receiver.received > 0)
+
+            # Kill brokers in a cycle.
+            endtime = time.time() + self.duration()
+            runtime = min(5.0, self.duration() / 4.0)
+            i = 0
+            while time.time() < endtime:
+                for b in cluster[i:]: b.ready() # Check if any broker crashed.
+                cluster[i].kill()
+                i += 1
+                b = cluster.start(expect=EXPECT_EXIT_FAIL)
+                b.ready()
+                time.sleep(runtime)
+            sender.stop()
+            receiver.stop()
+            for b in cluster[i:]:
+                b.ready() # Check it didn't crash
+                b.kill()
+            self.assertEqual(sender.sent, receiver.received)
+            cluster_test_logs.verify_logs()
+
+        finally:
+            # Detach to avoid slow reconnect attempts during shut-down if test fails.
+            sender.connection.detach()
+            receiver.connection.detach()
+            logger.setLevel(log_level)
 class StoreTests(BrokerTest):
     """
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 201b06a4a2..49bdecdd95 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -649,10 +649,17 @@ class FederationTests(TestBase010):
 
         self.verify_cleanup()
 
-    def test_dynamic_headers(self):
+    def test_dynamic_headers_any(self):
+        self.do_test_dynamic_headers('any')
+
+    def test_dynamic_headers_all(self):
+        self.do_test_dynamic_headers('all')
+
+
+    def do_test_dynamic_headers(self, match_mode):
         session = self.session
         r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
-        r_session = r_conn.session("test_dynamic_headers")
+        r_session = r_conn.session("test_dynamic_headers_%s" % match_mode)
 
         session.exchange_declare(exchange="fed.headers", type="headers")
         r_session.exchange_declare(exchange="fed.headers", type="headers")
@@ -671,7 +678,7 @@ class FederationTests(TestBase010):
         sleep(5)
 
         session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
-        session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+        session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':match_mode, 'class':'first'})
         self.subscribe(queue="fed1", destination="f1")
         queue = session.incoming("f1")
 