summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-01-29 22:58:22 +0000
committerTed Ross <tross@apache.org>2010-01-29 22:58:22 +0000
commit645461d645d5d6720bd4be7229e254787a109367 (patch)
treed1dd30a7102ffa9dfc293c52439b76b88286993c
parent6f16d9d4fa398056a817726ca8512f356422353d (diff)
downloadqpid-python-645461d645d5d6720bd4be7229e254787a109367.tar.gz
QPID-2365 - Reroute messages from a queue feature
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@904654 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp65
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/python/tests_0-10/management.py69
-rw-r--r--qpid/specs/management-schema.xml8
4 files changed, 134 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 6e813e936d..4ff3cf6d2c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -37,6 +37,7 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
+#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
#include <iostream>
#include <algorithm>
@@ -518,17 +519,43 @@ void Queue::purgeExpired()
* purge_request == 0 then purge all messages
* == N then purge N messages from queue
* Sometimes purge_request == 1 to unblock the top of queue
+ *
+ * The dest exchange may be supplied to re-route messages through the exchange.
+ * It is safe to re-route messages such that they arrive back on the same queue,
+ * even if the queue is ordered by priority.
*/
-uint32_t Queue::purge(const uint32_t purge_request){
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+{
Mutex::ScopedLock locker(messageLock);
uint32_t purge_count = purge_request; // only comes into play if >0
+ std::deque<DeliverableMessage> rerouteQueue;
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
while((!purge_request || purge_count--) && !messages.empty()) {
+ if (dest.get()) {
+ //
+ // If there is a destination exchange, stage the messages onto a reroute queue
+ // so they don't wind up getting purged more than once.
+ //
+ DeliverableMessage msg(getFront().payload);
+ rerouteQueue.push_back(msg);
+ }
popAndDequeue();
count++;
}
+
+ //
+ // Re-route purged messages into the destination exchange. Note that there's no need
+ // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
+ //
+ while (!rerouteQueue.empty()) {
+ DeliverableMessage msg(rerouteQueue.front());
+ rerouteQueue.pop_front();
+ dest->route(msg, msg.getMessage().getRoutingKey(),
+ msg.getMessage().getApplicationHeaders());
+ }
+
return count;
}
@@ -1038,18 +1065,40 @@ ManagementObject* Queue::GetManagementObject (void) const
return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&)
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
- switch (methodId)
- {
- case _qmf::Queue::METHOD_PURGE :
- _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args;
- purge (iargs.i_request);
- status = Manageable::STATUS_OK;
+ switch (methodId) {
+ case _qmf::Queue::METHOD_PURGE :
+ {
+ _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
+ purge(purgeArgs.i_request);
+ status = Manageable::STATUS_OK;
+ }
+ break;
+
+ case _qmf::Queue::METHOD_REROUTE :
+ {
+ _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
+ boost::shared_ptr<Exchange> dest;
+ if (rerouteArgs.i_useAltExchange)
+ dest = alternateExchange;
+ else {
+ try {
+ dest = broker->getExchanges().get(rerouteArgs.i_exchange);
+ } catch(const std::exception&) {
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ etext = "Exchange not found";
+ break;
+ }
+ }
+
+ purge(rerouteArgs.i_request, dest);
+ status = Manageable::STATUS_OK;
+ }
break;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 513403d535..0b690e7cc5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -211,7 +211,7 @@ namespace qpid {
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
QPID_BROKER_EXTERN void purgeExpired();
//move qty # of messages to destination Queue destq
diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py
index 9dd03bbda4..677645fa2c 100644
--- a/qpid/python/tests_0-10/management.py
+++ b/qpid/python/tests_0-10/management.py
@@ -241,6 +241,75 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
+ def test_reroute_queue(self):
+ """
+ Test ability to reroute messages from the head of a queue.
+ Need to test moving all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up test queue"
+ session.exchange_declare(exchange="alt.direct1", type="direct")
+ session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
+ session.exchange_declare(exchange="alt.direct2", type="direct")
+ session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
+ session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
+ session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
+
+ "Reroute top message from reroute-queue to alternate exchange"
+ result = pq.reroute(1, True, "")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
+ self.assertEqual(pq.msgDepth,19)
+ self.assertEqual(aq.msgDepth,1)
+
+ "Reroute top 9 messages from reroute-queue to alt.direct2"
+ result = pq.reroute(9, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,10)
+ self.assertEqual(aq.msgDepth,9)
+
+ "Reroute using a non-existent exchange"
+ result = pq.reroute(0, False, "amq.nosuchexchange")
+ self.assertEqual(result.status, 4)
+
+ "Reroute all messages from reroute-queue"
+ result = pq.reroute(0, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,0)
+ self.assertEqual(aq.msgDepth,19)
+
+ "Make more messages"
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ "Reroute onto the same queue"
+ result = pq.reroute(0, False, "amq.direct")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ self.assertEqual(pq.msgDepth,20)
+
+
def test_methods_async (self):
"""
"""
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index b2e732e9e3..f8be051c62 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -155,7 +155,13 @@
<statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/>
<method name="purge" desc="Discard all or some messages on a queue">
- <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+ <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+ </method>
+
+ <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange">
+ <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+ <arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
+ <arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/>
</method>
</class>