summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-19 17:46:03 +0000
committerAlan Conway <aconway@apache.org>2011-04-19 17:46:03 +0000
commit300bfee5f5de83f9f114caa21968f8ad918c44a3 (patch)
tree85ad46b51541991fa05900731b1b403a2018b693
parent33e1a4b98e7d5ca7e569d03e918d92a7a56a5b14 (diff)
downloadqpid-python-300bfee5f5de83f9f114caa21968f8ad918c44a3.tar.gz
QPID-3215: cached exchange reference can cause cluster inconsistencies if exchange is deleted/recreated
SemanticState::route() uses a simple cache variable to avoid looking up the exchange for every message. However if the exchange in question is deleted, even if then recreated, this can cause inconsistencies in a cluster. Even in a stand-alone broker messages can be routed by a deleted exchange because of the cache. Fix is to mark the exchange deleted and check the status when using the cached exchange. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1095144 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h18
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py48
5 files changed, 67 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 3f14d61144..569c82fd7a 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -161,7 +161,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
+ sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 7199162ddc..e853155951 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -82,15 +82,15 @@ protected:
private:
Exchange* parent;
};
-
+
typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
void doRoute(Deliverable& msg, ConstBindingList b);
void routeIVE();
-
+
struct MatchQueue {
- const boost::shared_ptr<Queue> queue;
+ const boost::shared_ptr<Queue> queue;
MatchQueue(boost::shared_ptr<Queue> q);
bool operator()(Exchange::Binding::shared_ptr b);
};
@@ -195,7 +195,7 @@ public:
virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-
+
//PersistableExchange:
QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
@@ -228,14 +228,18 @@ public:
bool routeWithAlternate(Deliverable& message);
+ void destroy() { destroyed = true; }
+ bool isDestroyed() const { return destroyed; }
+
protected:
qpid::sys::Mutex bridgeLock;
std::vector<DynamicBridge*> bridgeVector;
Broker* broker;
+ bool destroyed;
QPID_BROKER_EXTERN virtual void handleHelloRequest();
void propagateFedOp(const std::string& routingKey, const std::string& tags,
- const std::string& op, const std::string& origin,
+ const std::string& op, const std::string& origin,
qpid::framing::FieldTable* extra_args=0);
};
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 99b121cbce..1c8d26c4f7 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
return declare(name, type, false, FieldTable());
}
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
bool durable, const FieldTable& args){
RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
@@ -61,7 +61,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
}else{
FunctionMap::iterator i = factory.find(type);
if (i == factory.end()) {
- throw UnknownExchangeTypeException();
+ throw UnknownExchangeTypeException();
} else {
exchange = i->second(name, durable, args, parent, broker);
}
@@ -82,6 +82,7 @@ void ExchangeRegistry::destroy(const string& name){
RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i != exchanges.end()) {
+ i->second->destroy();
exchanges.erase(i);
}
}
@@ -104,7 +105,7 @@ void ExchangeRegistry::registerType(const std::string& type, FactoryFunction f)
}
-namespace
+namespace
{
const std::string empty;
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index ba1f989f7c..ce86253f4a 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -462,7 +462,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName)
+ if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
cacheExchange->setProperties(msg);
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 73c20d451d..12f7a2ca9a 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -479,6 +479,54 @@ acl allow all all
for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
+ def test_deleted_exchange(self):
+ """QPID-3215: cached exchange reference can cause cluster inconsistencies
+ if exchange is deleted/recreated
+ Verify stand-alone case
+ """
+ cluster = self.cluster()
+ # Verify we do not route message via an exchange that has been destroyed.
+ cluster.start()
+ s0 = cluster[0].connect().session()
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+ self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+ send0 = s0.sender("ex/foo")
+ send0.send("foo")
+ self.assert_browse(s0, "q", ["foo"])
+ self.evaluate_address(s0, "ex;{delete:always}")
+ try:
+ send0.send("bar") # Should fail, exchange is deleted.
+ self.fail("Expected not-found exception")
+ except qpid.messaging.NotFound: pass
+ # FIXME aconway 2011-04-19: s0 is broken, new session
+ self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
+
+ def test_deleted_exchange_inconsistent(self):
+ """QPID-3215: cached exchange reference can cause cluster inconsistencies
+ if exchange is deleted/recreated
+
+ Verify cluster inconsistency.
+ """
+ cluster = self.cluster()
+ cluster.start()
+ s0 = cluster[0].connect().session()
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+ self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+ send0 = s0.sender("ex/foo")
+ send0.send("foo")
+ self.assert_browse(s0, "q", ["foo"])
+
+ cluster.start()
+ s1 = cluster[1].connect().session()
+ self.evaluate_address(s0, "ex;{delete:always}")
+ try:
+ send0.send("bar")
+ self.fail("Expected not-found exception")
+ except qpid.messaging.NotFound: pass
+
+ self.assert_browse(s1, "q", ["foo"])
+
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):