summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-08-13 19:18:03 +0000
committerTed Ross <tross@apache.org>2009-08-13 19:18:03 +0000
commit0fcbf08aeb53760b51f627d93135d9385b1aff30 (patch)
tree5573991d3fed8b3a33a76a96aeefc6c5749440a0
parent50084def8309c02b4c646670cf1b4d1fe161ddd7 (diff)
downloadqpid-python-0fcbf08aeb53760b51f627d93135d9385b1aff30.tar.gz
QPID-1971 - bind in fedOpReorigen mode is not threadsafe for TopicExchange
Applied patch from Ken Giusti. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803996 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp23
-rwxr-xr-xqpid/cpp/src/tests/federation.py124
3 files changed, 159 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index 454b497f11..b9f24dee5f 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -92,12 +92,24 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
if (bk.fedBinding.count() == 0)
unbind(queue, routingKey, 0);
} else if (fedOp == fedOpReorigin) {
- for (std::map<string, BoundKey>::iterator iter = bindings.begin();
- iter != bindings.end(); iter++) {
- const BoundKey& bk = iter->second;
- if (bk.fedBinding.hasLocal()) {
- propagateFedOp(iter->first, string(), fedOpBind, string());
+ /** gather up all the keys that need rebinding in a local vector
+ * while holding the lock. Then propagate once the lock is
+ * released
+ */
+ std::vector<std::string> keys2prop;
+ {
+ Mutex::ScopedLock l(lock);
+ for (Bindings::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+ if (bk.fedBinding.hasLocal()) {
+ keys2prop.push_back(iter->first);
+ }
}
+ } /* lock dropped */
+ for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+ key != keys2prop.end(); key++) {
+ propagateFedOp( *key, string(), fedOpBind, string());
}
}
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index 6c8832179f..6bf0b104ea 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -225,12 +225,25 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
if (reallyUnbind)
unbind(queue, routingPattern, 0);
} else if (fedOp == fedOpReorigin) {
- for (BindingMap::iterator iter = bindings.begin();
- iter != bindings.end(); iter++) {
- const BoundKey& bk = iter->second;
- if (bk.fedBinding.hasLocal()) {
- propagateFedOp(iter->first, string(), fedOpBind, string());
+ /** gather up all the keys that need rebinding in a local vector
+ * while holding the lock. Then propagate once the lock is
+ * released
+ */
+ std::vector<std::string> keys2prop;
+ {
+ RWlock::ScopedRlock l(lock);
+ for (BindingMap::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+
+ if (bk.fedBinding.hasLocal()) {
+ keys2prop.push_back(iter->first);
+ }
}
+ } /* lock dropped */
+ for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+ key != keys2prop.end(); key++) {
+ propagateFedOp( *key, string(), fedOpBind, string());
}
}
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index d8b91061a8..daf831f3ed 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -455,7 +455,131 @@ class FederationTests(TestBase010):
sleep(3)
self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+ def test_dynamic_topic_reorigin(self):
+ session = self.session
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_dynamic_topic_reorigin")
+
+ session.exchange_declare(exchange="fed.topic_reorigin", type="topic")
+ r_session.exchange_declare(exchange="fed.topic_reorigin", type="topic")
+
+ session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic")
+ r_session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ session.queue_declare(queue="fed2", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed2", exchange="fed.topic_reorigin_2", binding_key="ft-key.one.#")
+ self.subscribe(queue="fed2", destination="f2")
+ queue2 = session.incoming("f2")
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+ result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
+ bridge2 = qmf.getObjects(_class="bridge")[1]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.topic_reorigin", binding_key="ft-key.#")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="ft-key.one.two")
+ r_session.message_transfer(destination="fed.topic_reorigin", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = bridge2.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+ def test_dynamic_direct_reorigin(self):
+ session = self.session
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_dynamic_direct_reorigin")
+
+ session.exchange_declare(exchange="fed.direct_reorigin", type="direct")
+ r_session.exchange_declare(exchange="fed.direct_reorigin", type="direct")
+
+ session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct")
+ r_session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ session.queue_declare(queue="fed2", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed2", exchange="fed.direct_reorigin_2", binding_key="ft-key.two")
+ self.subscribe(queue="fed2", destination="f2")
+ queue2 = session.incoming("f2")
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+ result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
+ bridge2 = qmf.getObjects(_class="bridge")[1]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.direct_reorigin", binding_key="ft-key.one")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="ft-key.one")
+ r_session.message_transfer(destination="fed.direct_reorigin", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = bridge2.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)