summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-10-29 20:41:22 +0000
committerKim van der Riet <kpvdr@apache.org>2009-10-29 20:41:22 +0000
commit1439f10f1f98edc05a12c64b44005f9c2fbc124e (patch)
treed9d3de114e5df5d5aa373a6eed342c168b0c2e70 /cpp/src/qpid
parent93e2dbae88c8960b82f46e499b5d91c20547702f (diff)
downloadqpid-python-1439f10f1f98edc05a12c64b44005f9c2fbc124e.tar.gz
Fixed problem of queue alternate-exchange property not being persisted on persistent queues, and on recovery this property is lost. No tests exist as yet for this.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@831082 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp19
-rw-r--r--cpp/src/qpid/broker/Queue.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp4
3 files changed, 21 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b4160edbd6..6496840b0b 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -917,6 +917,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
+ buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
buffer.put(settings);
if (policy.get()) {
buffer.put(*policy);
@@ -925,7 +926,9 @@ void Queue::encode(Buffer& buffer) const
uint32_t Queue::encodedSize() const
{
- return name.size() + 1/*short string size octet*/ + settings.encodedSize()
+ return name.size() + 1/*short string size octet*/
+ + (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
+ + settings.encodedSize()
+ (policy.get() ? (*policy).encodedSize() : 0);
}
@@ -933,7 +936,10 @@ Queue::shared_ptr Queue::decode ( QueueRegistry& queues, Buffer& buffer, bool re
{
string name;
buffer.getShortString(name);
+ string altExch;
+ buffer.getShortString(altExch);
std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ result.first->alternateExchangeName.assign(altExch);
buffer.get(result.first->settings);
result.first->configure(result.first->settings, recovering );
if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
@@ -1048,8 +1054,17 @@ void Queue::setQueueEventManager(QueueEvents& mgr)
eventMgr = &mgr;
}
-void Queue::recoveryComplete()
+void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
+ // set the alternate exchange
+ if (!alternateExchangeName.empty()) {
+ try {
+ Exchange::shared_ptr ae = exchanges.get(alternateExchangeName);
+ setAlternateExchange(ae);
+ } catch (const NotFoundException&) {
+ QPID_LOG(warning, "Could not set alternate exchange \"" << alternateExchangeName << "\" on queue \"" << name << "\": exchange does not exist.");
+ }
+ }
//process any pending dequeues
for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
pendingDequeues.clear();
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index a2dad96fe0..5d9fbebc7d 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -97,6 +97,7 @@ namespace qpid {
std::auto_ptr<QueuePolicy> policy;
bool policyExceeded;
QueueBindings bindings;
+ std::string alternateExchangeName;
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
@@ -332,7 +333,7 @@ namespace qpid {
/**
* Notify queue that recovery has completed.
*/
- void recoveryComplete();
+ void recoveryComplete(ExchangeRegistry& exchanges);
// For cluster update
QueueListeners& getListeners();
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 1b36b2b110..9c3a2b5571 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -155,8 +155,8 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer
void RecoveryManagerImpl::recoveryComplete()
{
- //notify all queues
- queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
+ //notify all queues and exchanges
+ queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1, boost::ref(exchanges)));
exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
}