diff options
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 11 | ||||
-rwxr-xr-x | cpp/src/tests/federation.py | 51 |
2 files changed, 60 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 6e53ef5fd2..66ace42cfa 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/TopicExchange.h" +#include "qpid/log/Statement.h" #include <algorithm> @@ -214,6 +215,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } + QPID_LOG(debug, "Bound [" << routingPattern << "] to queue " << queue->getName() + << " (origin=" << fedOrigin << ")"); } } else if (fedOp == fedOpUnbind) { { @@ -274,6 +277,7 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); } + QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName()); if (propagate) propagateFedOp(routingKey, string(), fedOpUnbind, string()); @@ -294,16 +298,19 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) { - Binding::vector mb; BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); PreRoute pr(msg, this); + std::set<std::string> qSet; { RWlock::ScopedRlock l(lock); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (match(i->first, routingKey)) { Binding::vector& qv(i->second.bindingVector); for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ - b->push_back(*j); + // do not duplicate queues on the binding list + if (qSet.insert(j->get()->queue->getName()).second) { + b->push_back(*j); + } } } } diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 15fa858c68..5f269c8363 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -738,6 +738,57 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_topic_nodup(self): + """Verify that a message whose routing key matches more than one + binding does not get duplicated to the same queue. + """ + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_topic_nodup") + + session.exchange_declare(exchange="fed.topic", type="topic") + r_session.exchange_declare(exchange="fed.topic", type="topic") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="red.*") + session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="*.herring") + + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="red.herring") + r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + + def getProperty(self, msg, name): for h in msg.headers: if hasattr(h, name): return getattr(h, name) |