diff options
author | Ted Ross <tross@apache.org> | 2010-01-29 22:58:22 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-01-29 22:58:22 +0000 |
commit | 645461d645d5d6720bd4be7229e254787a109367 (patch) | |
tree | d1dd30a7102ffa9dfc293c52439b76b88286993c | |
parent | 6f16d9d4fa398056a817726ca8512f356422353d (diff) | |
download | qpid-python-645461d645d5d6720bd4be7229e254787a109367.tar.gz |
QPID-2365 - Reroute messages from a queue feature
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@904654 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 65 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/python/tests_0-10/management.py | 69 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 8 |
4 files changed, 134 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 6e813e936d..4ff3cf6d2c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -37,6 +37,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" +#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" #include <iostream> #include <algorithm> @@ -518,17 +519,43 @@ void Queue::purgeExpired() * purge_request == 0 then purge all messages * == N then purge N messages from queue * Sometimes purge_request == 1 to unblock the top of queue + * + * The dest exchange may be supplied to re-route messages through the exchange. + * It is safe to re-route messages such that they arrive back on the same queue, + * even if the queue is ordered by priority. */ -uint32_t Queue::purge(const uint32_t purge_request){ +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +{ Mutex::ScopedLock locker(messageLock); uint32_t purge_count = purge_request; // only comes into play if >0 + std::deque<DeliverableMessage> rerouteQueue; uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. while((!purge_request || purge_count--) && !messages.empty()) { + if (dest.get()) { + // + // If there is a destination exchange, stage the messages onto a reroute queue + // so they don't wind up getting purged more than once. + // + DeliverableMessage msg(getFront().payload); + rerouteQueue.push_back(msg); + } popAndDequeue(); count++; } + + // + // Re-route purged messages into the destination exchange. Note that there's no need + // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. + // + while (!rerouteQueue.empty()) { + DeliverableMessage msg(rerouteQueue.front()); + rerouteQueue.pop_front(); + dest->route(msg, msg.getMessage().getRoutingKey(), + msg.getMessage().getApplicationHeaders()); + } + return count; } @@ -1038,18 +1065,40 @@ ManagementObject* Queue::GetManagementObject (void) const return (ManagementObject*) mgmtObject; } -Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&) +Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); - switch (methodId) - { - case _qmf::Queue::METHOD_PURGE : - _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args; - purge (iargs.i_request); - status = Manageable::STATUS_OK; + switch (methodId) { + case _qmf::Queue::METHOD_PURGE : + { + _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; + purge(purgeArgs.i_request); + status = Manageable::STATUS_OK; + } + break; + + case _qmf::Queue::METHOD_REROUTE : + { + _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args; + boost::shared_ptr<Exchange> dest; + if (rerouteArgs.i_useAltExchange) + dest = alternateExchange; + else { + try { + dest = broker->getExchanges().get(rerouteArgs.i_exchange); + } catch(const std::exception&) { + status = Manageable::STATUS_PARAMETER_INVALID; + etext = "Exchange not found"; + break; + } + } + + purge(rerouteArgs.i_request, dest); + status = Manageable::STATUS_OK; + } break; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 513403d535..0b690e7cc5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -211,7 +211,7 @@ namespace qpid { bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages + uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages QPID_BROKER_EXTERN void purgeExpired(); //move qty # of messages to destination Queue destq diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py index 9dd03bbda4..677645fa2c 100644 --- a/qpid/python/tests_0-10/management.py +++ b/qpid/python/tests_0-10/management.py @@ -241,6 +241,75 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) + def test_reroute_queue(self): + """ + Test ability to reroute messages from the head of a queue. + Need to test moving all, 1 (top message) and N messages. + """ + self.startQmf() + session = self.session + "Set up test queue" + session.exchange_declare(exchange="alt.direct1", type="direct") + session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key") + session.exchange_declare(exchange="alt.direct2", type="direct") + session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key") + session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1") + session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key") + + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Reroute Message %d" % count + msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=msg) + + pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] + + "Reroute top message from reroute-queue to alternate exchange" + result = pq.reroute(1, True, "") + self.assertEqual(result.status, 0) + pq.update() + aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0] + self.assertEqual(pq.msgDepth,19) + self.assertEqual(aq.msgDepth,1) + + "Reroute top 9 messages from reroute-queue to alt.direct2" + result = pq.reroute(9, False, "alt.direct2") + self.assertEqual(result.status, 0) + pq.update() + aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] + self.assertEqual(pq.msgDepth,10) + self.assertEqual(aq.msgDepth,9) + + "Reroute using a non-existent exchange" + result = pq.reroute(0, False, "amq.nosuchexchange") + self.assertEqual(result.status, 4) + + "Reroute all messages from reroute-queue" + result = pq.reroute(0, False, "alt.direct2") + self.assertEqual(result.status, 0) + pq.update() + aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] + self.assertEqual(pq.msgDepth,0) + self.assertEqual(aq.msgDepth,19) + + "Make more messages" + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Reroute Message %d" % count + msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=msg) + + "Reroute onto the same queue" + result = pq.reroute(0, False, "amq.direct") + self.assertEqual(result.status, 0) + pq.update() + self.assertEqual(pq.msgDepth,20) + + def test_methods_async (self): """ """ diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index b2e732e9e3..f8be051c62 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -155,7 +155,13 @@ <statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/> <method name="purge" desc="Discard all or some messages on a queue"> - <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> + <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> + </method> + + <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange"> + <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> + <arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/> + <arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/> </method> </class> |