summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp19
1 files changed, 17 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b4160edbd6..6496840b0b 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -917,6 +917,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
+ buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
buffer.put(settings);
if (policy.get()) {
buffer.put(*policy);
@@ -925,7 +926,9 @@ void Queue::encode(Buffer& buffer) const
uint32_t Queue::encodedSize() const
{
- return name.size() + 1/*short string size octet*/ + settings.encodedSize()
+ return name.size() + 1/*short string size octet*/
+ + (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
+ + settings.encodedSize()
+ (policy.get() ? (*policy).encodedSize() : 0);
}
@@ -933,7 +936,10 @@ Queue::shared_ptr Queue::decode ( QueueRegistry& queues, Buffer& buffer, bool re
{
string name;
buffer.getShortString(name);
+ string altExch;
+ buffer.getShortString(altExch);
std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ result.first->alternateExchangeName.assign(altExch);
buffer.get(result.first->settings);
result.first->configure(result.first->settings, recovering );
if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
@@ -1048,8 +1054,17 @@ void Queue::setQueueEventManager(QueueEvents& mgr)
eventMgr = &mgr;
}
-void Queue::recoveryComplete()
+void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
+ // set the alternate exchange
+ if (!alternateExchangeName.empty()) {
+ try {
+ Exchange::shared_ptr ae = exchanges.get(alternateExchangeName);
+ setAlternateExchange(ae);
+ } catch (const NotFoundException&) {
+ QPID_LOG(warning, "Could not set alternate exchange \"" << alternateExchangeName << "\" on queue \"" << name << "\": exchange does not exist.");
+ }
+ }
//process any pending dequeues
for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
pendingDequeues.clear();