summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-10-28 17:30:04 +0000
committerKim van der Riet <kpvdr@apache.org>2009-10-28 17:30:04 +0000
commitdb5a5ae4ae773642c02c76a0c5c2723e8342f5a6 (patch)
treefd879e23af901700536661c01c3ebfd192a5074f
parent195476aa648061eb8866046cca0cfd7869d9212b (diff)
downloadqpid-python-db5a5ae4ae773642c02c76a0c5c2723e8342f5a6.tar.gz
Fixed problem where broker does not persist the alternate exchange setting to the store for durable exchanges, and these settings were being lost upon recovery.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@830687 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp18
-rw-r--r--cpp/src/qpid/broker/Exchange.h5
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h2
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp1
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp6
5 files changed, 28 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 9468ba0b62..4ebe126969 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -25,6 +25,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/DeliverableMessage.h"
using namespace qpid::broker;
@@ -202,16 +203,19 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe
{
string name;
string type;
+ string altName;
FieldTable args;
buffer.getShortString(name);
bool durable(buffer.getOctet());
buffer.getShortString(type);
+ buffer.getShortString(altName);
buffer.get(args);
try {
Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first;
exch->sequenceNo = args.getAsInt64(qpidSequenceCounter);
+ exch->alternateName.assign(altName);
return exch;
} catch (const UnknownExchangeTypeException&) {
QPID_LOG(warning, "Could not create exchange " << name << "; type " << type << " is not recognised");
@@ -224,6 +228,7 @@ void Exchange::encode(Buffer& buffer) const
buffer.putShortString(name);
buffer.putOctet(durable);
buffer.putShortString(getType());
+ buffer.putShortString(alternate.get() ? alternate->getName() : string(""));
if (args.isSet(qpidSequenceCounter))
args.setInt64(std::string(qpidSequenceCounter),sequenceNo);
buffer.put(args);
@@ -234,9 +239,22 @@ uint32_t Exchange::encodedSize() const
return name.size() + 1/*short string size*/
+ 1 /*durable*/
+ getType().size() + 1/*short string size*/
+ + (alternate.get() ? alternate->getName().size() : 0) + 1/*short string size*/
+ args.encodedSize();
}
+void Exchange::recoveryComplete(ExchangeRegistry& exchanges)
+{
+ if (!alternateName.empty()) {
+ try {
+ Exchange::shared_ptr ae = exchanges.get(alternateName);
+ setAlternate(ae);
+ } catch (const NotFoundException&) {
+ QPID_LOG(warning, "Could not set alternate exchange \"" << alternateName << "\": does not exist.");
+ }
+ }
+}
+
ManagementObject* Exchange::GetManagementObject (void) const
{
return (ManagementObject*) mgmtExchange;
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 9bea376c28..d630f7ae24 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -59,6 +59,7 @@ public:
private:
const std::string name;
const bool durable;
+ std::string alternateName;
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
@@ -172,6 +173,10 @@ public:
void removeDynamicBridge(DynamicBridge* db);
virtual bool supportsDynamicBinding() { return false; }
Broker* getBroker() const { return broker; }
+ /**
+ * Notify exchange that recovery has completed.
+ */
+ void recoveryComplete(ExchangeRegistry& exchanges);
protected:
qpid::sys::Mutex bridgeLock;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index c20a53e598..2b75a8f3cf 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -71,7 +71,7 @@ class ExchangeRegistry{
/** Call f for each exchange in the registry. */
template <class F> void eachExchange(F f) const {
- qpid::sys::RWlock::ScopedWlock l(lock);
+ qpid::sys::RWlock::ScopedRlock l(lock);
for (ExchangeMap::const_iterator i = exchanges.begin(); i != exchanges.end(); ++i)
f(i->second);
}
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 5bc4cdf960..1b36b2b110 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -157,6 +157,7 @@ void RecoveryManagerImpl::recoveryComplete()
{
//notify all queues
queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
+ exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
}
RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold)
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 2ac6d66e62..65eadda68a 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -88,13 +88,13 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
try{
std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
if (response.second) {
- if (durable) {
- getBroker().getStore().create(*response.first, args);
- }
if (alternate) {
response.first->setAlternate(alternate);
alternate->incAlternateUsers();
}
+ if (durable) {
+ getBroker().getStore().create(*response.first, args);
+ }
} else {
checkType(response.first, type);
checkAlternate(response.first, alternate);