diff options
author | Ted Ross <tross@apache.org> | 2009-08-13 19:18:03 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-08-13 19:18:03 +0000 |
commit | 0fcbf08aeb53760b51f627d93135d9385b1aff30 (patch) | |
tree | 5573991d3fed8b3a33a76a96aeefc6c5749440a0 | |
parent | 50084def8309c02b4c646670cf1b4d1fe161ddd7 (diff) | |
download | qpid-python-0fcbf08aeb53760b51f627d93135d9385b1aff30.tar.gz |
QPID-1971 - bind in fedOpReorigen mode is not threadsafe for TopicExchange
Applied patch from Ken Giusti.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803996 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/DirectExchange.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TopicExchange.cpp | 23 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 124 |
3 files changed, 159 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 454b497f11..b9f24dee5f 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -92,12 +92,24 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (bk.fedBinding.count() == 0) unbind(queue, routingKey, 0); } else if (fedOp == fedOpReorigin) { - for (std::map<string, BoundKey>::iterator iter = bindings.begin(); - iter != bindings.end(); iter++) { - const BoundKey& bk = iter->second; - if (bk.fedBinding.hasLocal()) { - propagateFedOp(iter->first, string(), fedOpBind, string()); + /** gather up all the keys that need rebinding in a local vector + * while holding the lock. Then propagate once the lock is + * released + */ + std::vector<std::string> keys2prop; + { + Mutex::ScopedLock l(lock); + for (Bindings::iterator iter = bindings.begin(); + iter != bindings.end(); iter++) { + const BoundKey& bk = iter->second; + if (bk.fedBinding.hasLocal()) { + keys2prop.push_back(iter->first); + } } + } /* lock dropped */ + for (std::vector<std::string>::const_iterator key = keys2prop.begin(); + key != keys2prop.end(); key++) { + propagateFedOp( *key, string(), fedOpBind, string()); } } diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 6c8832179f..6bf0b104ea 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -225,12 +225,25 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons if (reallyUnbind) unbind(queue, routingPattern, 0); } else if (fedOp == fedOpReorigin) { - for (BindingMap::iterator iter = bindings.begin(); - iter != bindings.end(); iter++) { - const BoundKey& bk = iter->second; - if (bk.fedBinding.hasLocal()) { - propagateFedOp(iter->first, string(), fedOpBind, string()); + /** gather up all the keys that need rebinding in a local vector + * while holding the lock. Then propagate once the lock is + * released + */ + std::vector<std::string> keys2prop; + { + RWlock::ScopedRlock l(lock); + for (BindingMap::iterator iter = bindings.begin(); + iter != bindings.end(); iter++) { + const BoundKey& bk = iter->second; + + if (bk.fedBinding.hasLocal()) { + keys2prop.push_back(iter->first); + } } + } /* lock dropped */ + for (std::vector<std::string>::const_iterator key = keys2prop.begin(); + key != keys2prop.end(); key++) { + propagateFedOp( *key, string(), fedOpBind, string()); } } diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index d8b91061a8..daf831f3ed 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -455,7 +455,131 @@ class FederationTests(TestBase010): sleep(3) self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + def test_dynamic_topic_reorigin(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_topic_reorigin") + + session.exchange_declare(exchange="fed.topic_reorigin", type="topic") + r_session.exchange_declare(exchange="fed.topic_reorigin", type="topic") + + session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic") + r_session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.topic_reorigin_2", binding_key="ft-key.one.#") + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.topic_reorigin", binding_key="ft-key.#") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="ft-key.one.two") + r_session.message_transfer(destination="fed.topic_reorigin", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = bridge2.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + + def test_dynamic_direct_reorigin(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_direct_reorigin") + + session.exchange_declare(exchange="fed.direct_reorigin", type="direct") + r_session.exchange_declare(exchange="fed.direct_reorigin", type="direct") + + session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct") + r_session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.direct_reorigin_2", binding_key="ft-key.two") + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.direct_reorigin", binding_key="ft-key.one") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="ft-key.one") + r_session.message_transfer(destination="fed.direct_reorigin", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = bridge2.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + + def getProperty(self, msg, name): for h in msg.headers: if hasattr(h, name): return getattr(h, name) |