diff options
author | Alan Conway <aconway@apache.org> | 2011-04-19 17:46:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-04-19 17:46:03 +0000 |
commit | 300bfee5f5de83f9f114caa21968f8ad918c44a3 (patch) | |
tree | 85ad46b51541991fa05900731b1b403a2018b693 | |
parent | 33e1a4b98e7d5ca7e569d03e918d92a7a56a5b14 (diff) | |
download | qpid-python-300bfee5f5de83f9f114caa21968f8ad918c44a3.tar.gz |
QPID-3215: cached exchange reference can cause cluster inconsistencies if exchange is deleted/recreated
SemanticState::route() uses a simple cache variable to avoid looking
up the exchange for every message. However if the exchange in question
is deleted, even if then recreated, this can cause inconsistencies in
a cluster.
Even in a stand-alone broker messages can be routed by a deleted
exchange because of the cache.
Fix is to mark the exchange deleted and check the status when using
the cached exchange.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1095144 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 48 |
5 files changed, 67 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 3f14d61144..569c82fd7a 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -161,7 +161,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0), broker(b) + sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 7199162ddc..e853155951 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -82,15 +82,15 @@ protected: private: Exchange* parent; }; - + typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList; typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; void doRoute(Deliverable& msg, ConstBindingList b); void routeIVE(); - + struct MatchQueue { - const boost::shared_ptr<Queue> queue; + const boost::shared_ptr<Queue> queue; MatchQueue(boost::shared_ptr<Queue> q); bool operator()(Exchange::Binding::shared_ptr b); }; @@ -195,7 +195,7 @@ public: virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - + //PersistableExchange: QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } @@ -228,14 +228,18 @@ public: bool routeWithAlternate(Deliverable& message); + void destroy() { destroyed = true; } + bool isDestroyed() const { return destroyed; } + protected: qpid::sys::Mutex bridgeLock; std::vector<DynamicBridge*> bridgeVector; Broker* broker; + bool destroyed; QPID_BROKER_EXTERN virtual void handleHelloRequest(); void propagateFedOp(const std::string& routingKey, const std::string& tags, - const std::string& op, const std::string& origin, + const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0); }; diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index 99b121cbce..1c8d26c4f7 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,7 +39,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c return declare(name, type, false, FieldTable()); } -pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, +pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, bool durable, const FieldTable& args){ RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); @@ -61,7 +61,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c }else{ FunctionMap::iterator i = factory.find(type); if (i == factory.end()) { - throw UnknownExchangeTypeException(); + throw UnknownExchangeTypeException(); } else { exchange = i->second(name, durable, args, parent, broker); } @@ -82,6 +82,7 @@ void ExchangeRegistry::destroy(const string& name){ RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i != exchanges.end()) { + i->second->destroy(); exchanges.erase(i); } } @@ -104,7 +105,7 @@ void ExchangeRegistry::registerType(const std::string& type, FactoryFunction f) } -namespace +namespace { const std::string empty; } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index ba1f989f7c..ce86253f4a 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -462,7 +462,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName) + if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) cacheExchange = session.getBroker().getExchanges().get(exchangeName); cacheExchange->setProperties(msg); diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 73c20d451d..12f7a2ca9a 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -479,6 +479,54 @@ acl allow all all for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) + def test_deleted_exchange(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + Verify stand-alone case + """ + cluster = self.cluster() + # Verify we do not route message via an exchange that has been destroyed. + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") # Should fail, exchange is deleted. + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + # FIXME aconway 2011-04-19: s0 is broken, new session + self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) + + def test_deleted_exchange_inconsistent(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + + Verify cluster inconsistency. + """ + cluster = self.cluster() + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + + cluster.start() + s1 = cluster[1].connect().session() + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + + self.assert_browse(s1, "q", ["foo"]) + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |