diff options
author | Ted Ross <tross@apache.org> | 2010-01-29 22:58:22 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-01-29 22:58:22 +0000 |
commit | 726b23f43478a85b961365e4de3a9302a261f6b3 (patch) | |
tree | d1403b4c5ee3fcb1681c502cdd95277bbcb768bd /cpp | |
parent | 73ad8a2de26f0c7830aacb608b4b6ea44914f683 (diff) | |
download | qpid-python-726b23f43478a85b961365e4de3a9302a261f6b3.tar.gz |
QPID-2365 - Reroute messages from a queue feature
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904654 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 65 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 2 |
2 files changed, 58 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 6e813e936d..4ff3cf6d2c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -37,6 +37,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" +#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" #include <iostream> #include <algorithm> @@ -518,17 +519,43 @@ void Queue::purgeExpired() * purge_request == 0 then purge all messages * == N then purge N messages from queue * Sometimes purge_request == 1 to unblock the top of queue + * + * The dest exchange may be supplied to re-route messages through the exchange. + * It is safe to re-route messages such that they arrive back on the same queue, + * even if the queue is ordered by priority. */ -uint32_t Queue::purge(const uint32_t purge_request){ +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +{ Mutex::ScopedLock locker(messageLock); uint32_t purge_count = purge_request; // only comes into play if >0 + std::deque<DeliverableMessage> rerouteQueue; uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. while((!purge_request || purge_count--) && !messages.empty()) { + if (dest.get()) { + // + // If there is a destination exchange, stage the messages onto a reroute queue + // so they don't wind up getting purged more than once. + // + DeliverableMessage msg(getFront().payload); + rerouteQueue.push_back(msg); + } popAndDequeue(); count++; } + + // + // Re-route purged messages into the destination exchange. Note that there's no need + // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. + // + while (!rerouteQueue.empty()) { + DeliverableMessage msg(rerouteQueue.front()); + rerouteQueue.pop_front(); + dest->route(msg, msg.getMessage().getRoutingKey(), + msg.getMessage().getApplicationHeaders()); + } + return count; } @@ -1038,18 +1065,40 @@ ManagementObject* Queue::GetManagementObject (void) const return (ManagementObject*) mgmtObject; } -Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&) +Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); - switch (methodId) - { - case _qmf::Queue::METHOD_PURGE : - _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args; - purge (iargs.i_request); - status = Manageable::STATUS_OK; + switch (methodId) { + case _qmf::Queue::METHOD_PURGE : + { + _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; + purge(purgeArgs.i_request); + status = Manageable::STATUS_OK; + } + break; + + case _qmf::Queue::METHOD_REROUTE : + { + _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args; + boost::shared_ptr<Exchange> dest; + if (rerouteArgs.i_useAltExchange) + dest = alternateExchange; + else { + try { + dest = broker->getExchanges().get(rerouteArgs.i_exchange); + } catch(const std::exception&) { + status = Manageable::STATUS_PARAMETER_INVALID; + etext = "Exchange not found"; + break; + } + } + + purge(rerouteArgs.i_request, dest); + status = Manageable::STATUS_OK; + } break; } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 513403d535..0b690e7cc5 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -211,7 +211,7 @@ namespace qpid { bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages + uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages QPID_BROKER_EXTERN void purgeExpired(); //move qty # of messages to destination Queue destq |