summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-01-29 22:58:22 +0000
committerTed Ross <tross@apache.org>2010-01-29 22:58:22 +0000
commit726b23f43478a85b961365e4de3a9302a261f6b3 (patch)
treed1403b4c5ee3fcb1681c502cdd95277bbcb768bd /cpp
parent73ad8a2de26f0c7830aacb608b4b6ea44914f683 (diff)
downloadqpid-python-726b23f43478a85b961365e4de3a9302a261f6b3.tar.gz
QPID-2365 - Reroute messages from a queue feature
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904654 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp65
-rw-r--r--cpp/src/qpid/broker/Queue.h2
2 files changed, 58 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 6e813e936d..4ff3cf6d2c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -37,6 +37,7 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
+#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
#include <iostream>
#include <algorithm>
@@ -518,17 +519,43 @@ void Queue::purgeExpired()
* purge_request == 0 then purge all messages
* == N then purge N messages from queue
* Sometimes purge_request == 1 to unblock the top of queue
+ *
+ * The dest exchange may be supplied to re-route messages through the exchange.
+ * It is safe to re-route messages such that they arrive back on the same queue,
+ * even if the queue is ordered by priority.
*/
-uint32_t Queue::purge(const uint32_t purge_request){
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+{
Mutex::ScopedLock locker(messageLock);
uint32_t purge_count = purge_request; // only comes into play if >0
+ std::deque<DeliverableMessage> rerouteQueue;
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
while((!purge_request || purge_count--) && !messages.empty()) {
+ if (dest.get()) {
+ //
+ // If there is a destination exchange, stage the messages onto a reroute queue
+ // so they don't wind up getting purged more than once.
+ //
+ DeliverableMessage msg(getFront().payload);
+ rerouteQueue.push_back(msg);
+ }
popAndDequeue();
count++;
}
+
+ //
+ // Re-route purged messages into the destination exchange. Note that there's no need
+ // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
+ //
+ while (!rerouteQueue.empty()) {
+ DeliverableMessage msg(rerouteQueue.front());
+ rerouteQueue.pop_front();
+ dest->route(msg, msg.getMessage().getRoutingKey(),
+ msg.getMessage().getApplicationHeaders());
+ }
+
return count;
}
@@ -1038,18 +1065,40 @@ ManagementObject* Queue::GetManagementObject (void) const
return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&)
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
- switch (methodId)
- {
- case _qmf::Queue::METHOD_PURGE :
- _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args;
- purge (iargs.i_request);
- status = Manageable::STATUS_OK;
+ switch (methodId) {
+ case _qmf::Queue::METHOD_PURGE :
+ {
+ _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
+ purge(purgeArgs.i_request);
+ status = Manageable::STATUS_OK;
+ }
+ break;
+
+ case _qmf::Queue::METHOD_REROUTE :
+ {
+ _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
+ boost::shared_ptr<Exchange> dest;
+ if (rerouteArgs.i_useAltExchange)
+ dest = alternateExchange;
+ else {
+ try {
+ dest = broker->getExchanges().get(rerouteArgs.i_exchange);
+ } catch(const std::exception&) {
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ etext = "Exchange not found";
+ break;
+ }
+ }
+
+ purge(rerouteArgs.i_request, dest);
+ status = Manageable::STATUS_OK;
+ }
break;
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 513403d535..0b690e7cc5 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -211,7 +211,7 @@ namespace qpid {
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
QPID_BROKER_EXTERN void purgeExpired();
//move qty # of messages to destination Queue destq