diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-10-28 17:30:04 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-10-28 17:30:04 +0000 |
commit | db5a5ae4ae773642c02c76a0c5c2723e8342f5a6 (patch) | |
tree | fd879e23af901700536661c01c3ebfd192a5074f | |
parent | 195476aa648061eb8866046cca0cfd7869d9212b (diff) | |
download | qpid-python-db5a5ae4ae773642c02c76a0c5c2723e8342f5a6.tar.gz |
Fixed problem where broker does not persist the alternate exchange setting to the store for durable exchanges, and these settings were being lost upon recovery.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@830687 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 6 |
5 files changed, 28 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 9468ba0b62..4ebe126969 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -25,6 +25,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/broker/DeliverableMessage.h" using namespace qpid::broker; @@ -202,16 +203,19 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe { string name; string type; + string altName; FieldTable args; buffer.getShortString(name); bool durable(buffer.getOctet()); buffer.getShortString(type); + buffer.getShortString(altName); buffer.get(args); try { Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first; exch->sequenceNo = args.getAsInt64(qpidSequenceCounter); + exch->alternateName.assign(altName); return exch; } catch (const UnknownExchangeTypeException&) { QPID_LOG(warning, "Could not create exchange " << name << "; type " << type << " is not recognised"); @@ -224,6 +228,7 @@ void Exchange::encode(Buffer& buffer) const buffer.putShortString(name); buffer.putOctet(durable); buffer.putShortString(getType()); + buffer.putShortString(alternate.get() ? alternate->getName() : string("")); if (args.isSet(qpidSequenceCounter)) args.setInt64(std::string(qpidSequenceCounter),sequenceNo); buffer.put(args); @@ -234,9 +239,22 @@ uint32_t Exchange::encodedSize() const return name.size() + 1/*short string size*/ + 1 /*durable*/ + getType().size() + 1/*short string size*/ + + (alternate.get() ? alternate->getName().size() : 0) + 1/*short string size*/ + args.encodedSize(); } +void Exchange::recoveryComplete(ExchangeRegistry& exchanges) +{ + if (!alternateName.empty()) { + try { + Exchange::shared_ptr ae = exchanges.get(alternateName); + setAlternate(ae); + } catch (const NotFoundException&) { + QPID_LOG(warning, "Could not set alternate exchange \"" << alternateName << "\": does not exist."); + } + } +} + ManagementObject* Exchange::GetManagementObject (void) const { return (ManagementObject*) mgmtExchange; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 9bea376c28..d630f7ae24 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -59,6 +59,7 @@ public: private: const std::string name; const bool durable; + std::string alternateName; boost::shared_ptr<Exchange> alternate; uint32_t alternateUsers; mutable uint64_t persistenceId; @@ -172,6 +173,10 @@ public: void removeDynamicBridge(DynamicBridge* db); virtual bool supportsDynamicBinding() { return false; } Broker* getBroker() const { return broker; } + /** + * Notify exchange that recovery has completed. + */ + void recoveryComplete(ExchangeRegistry& exchanges); protected: qpid::sys::Mutex bridgeLock; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index c20a53e598..2b75a8f3cf 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -71,7 +71,7 @@ class ExchangeRegistry{ /** Call f for each exchange in the registry. */ template <class F> void eachExchange(F f) const { - qpid::sys::RWlock::ScopedWlock l(lock); + qpid::sys::RWlock::ScopedRlock l(lock); for (ExchangeMap::const_iterator i = exchanges.begin(); i != exchanges.end(); ++i) f(i->second); } diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 5bc4cdf960..1b36b2b110 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -157,6 +157,7 @@ void RecoveryManagerImpl::recoveryComplete() { //notify all queues queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1)); + exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges))); } RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 2ac6d66e62..65eadda68a 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -88,13 +88,13 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const try{ std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); if (response.second) { - if (durable) { - getBroker().getStore().create(*response.first, args); - } if (alternate) { response.first->setAlternate(alternate); alternate->incAlternateUsers(); } + if (durable) { + getBroker().getStore().create(*response.first, args); + } } else { checkType(response.first, type); checkAlternate(response.first, alternate); |