summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-04-06 18:18:44 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-04-06 18:18:44 +0000
commit7e080f3ef470dcec94079f3d7e59edbf4c791844 (patch)
treeda103251d43bc432098bf977224caf437b661cab
parente70e9f746a726130b848f45563b28a015b4e3fa2 (diff)
downloadqpid-python-7e080f3ef470dcec94079f3d7e59edbf4c791844.tar.gz
QPID-2482: prevent duplication of messages that match multiple binding keys on a topic exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931257 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp11
-rwxr-xr-xcpp/src/tests/federation.py51
2 files changed, 60 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 6e53ef5fd2..66ace42cfa 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/TopicExchange.h"
+#include "qpid/log/Statement.h"
#include <algorithm>
@@ -214,6 +215,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
+ QPID_LOG(debug, "Bound [" << routingPattern << "] to queue " << queue->getName()
+ << " (origin=" << fedOrigin << ")");
}
} else if (fedOp == fedOpUnbind) {
{
@@ -274,6 +277,7 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
}
+ QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName());
if (propagate)
propagateFedOp(routingKey, string(), fedOpUnbind, string());
@@ -294,16 +298,19 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
{
- Binding::vector mb;
BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
+ std::set<std::string> qSet;
{
RWlock::ScopedRlock l(lock);
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (match(i->first, routingKey)) {
Binding::vector& qv(i->second.bindingVector);
for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
- b->push_back(*j);
+ // do not duplicate queues on the binding list
+ if (qSet.insert(j->get()->queue->getName()).second) {
+ b->push_back(*j);
+ }
}
}
}
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 15fa858c68..5f269c8363 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -738,6 +738,57 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_dynamic_topic_nodup(self):
+ """Verify that a message whose routing key matches more than one
+ binding does not get duplicated to the same queue.
+ """
+ session = self.session
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_dynamic_topic_nodup")
+
+ session.exchange_declare(exchange="fed.topic", type="topic")
+ r_session.exchange_declare(exchange="fed.topic", type="topic")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="red.*")
+ session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="*.herring")
+
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="red.herring")
+ r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ self.verify_cleanup()
+
+
+
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)