diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 65 |
1 files changed, 57 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 6e813e936d..4ff3cf6d2c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -37,6 +37,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" +#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" #include <iostream> #include <algorithm> @@ -518,17 +519,43 @@ void Queue::purgeExpired() * purge_request == 0 then purge all messages * == N then purge N messages from queue * Sometimes purge_request == 1 to unblock the top of queue + * + * The dest exchange may be supplied to re-route messages through the exchange. + * It is safe to re-route messages such that they arrive back on the same queue, + * even if the queue is ordered by priority. */ -uint32_t Queue::purge(const uint32_t purge_request){ +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +{ Mutex::ScopedLock locker(messageLock); uint32_t purge_count = purge_request; // only comes into play if >0 + std::deque<DeliverableMessage> rerouteQueue; uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. while((!purge_request || purge_count--) && !messages.empty()) { + if (dest.get()) { + // + // If there is a destination exchange, stage the messages onto a reroute queue + // so they don't wind up getting purged more than once. + // + DeliverableMessage msg(getFront().payload); + rerouteQueue.push_back(msg); + } popAndDequeue(); count++; } + + // + // Re-route purged messages into the destination exchange. Note that there's no need + // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. + // + while (!rerouteQueue.empty()) { + DeliverableMessage msg(rerouteQueue.front()); + rerouteQueue.pop_front(); + dest->route(msg, msg.getMessage().getRoutingKey(), + msg.getMessage().getApplicationHeaders()); + } + return count; } @@ -1038,18 +1065,40 @@ ManagementObject* Queue::GetManagementObject (void) const return (ManagementObject*) mgmtObject; } -Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&) +Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); - switch (methodId) - { - case _qmf::Queue::METHOD_PURGE : - _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args; - purge (iargs.i_request); - status = Manageable::STATUS_OK; + switch (methodId) { + case _qmf::Queue::METHOD_PURGE : + { + _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; + purge(purgeArgs.i_request); + status = Manageable::STATUS_OK; + } + break; + + case _qmf::Queue::METHOD_REROUTE : + { + _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args; + boost::shared_ptr<Exchange> dest; + if (rerouteArgs.i_useAltExchange) + dest = alternateExchange; + else { + try { + dest = broker->getExchanges().get(rerouteArgs.i_exchange); + } catch(const std::exception&) { + status = Manageable::STATUS_PARAMETER_INVALID; + etext = "Exchange not found"; + break; + } + } + + purge(rerouteArgs.i_request, dest); + status = Manageable::STATUS_OK; + } break; } |