diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 3 |
4 files changed, 54 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 8b3be54d15..209599cfdd 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -32,6 +32,7 @@ #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qpid/management/ManagementExchange.h" #include "qpid/log/Statement.h" #include "qpid/framing/AMQFrame.h" @@ -349,6 +350,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : { + _qmf::ArgsBrokerQueueMoveMessages& moveArgs= + dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + status = Manageable::STATUS_OK; + else + return Manageable::STATUS_INVALID_PARAMETER; + break; + } default: status = Manageable::STATUS_NOT_IMPLEMENTED; break; @@ -397,6 +407,22 @@ void Broker::connect( connect(addr.host, addr.port, false, failed, f); } +uint32_t Broker::queueMoveMessages( + const std::string& srcQueue, + const std::string& destQueue, + uint32_t qty) +{ + Queue::shared_ptr src_queue = queues.find(srcQueue); + if (!src_queue) + return 0; + Queue::shared_ptr dest_queue = queues.find(destQueue); + if (!dest_queue) + return 0; + + return src_queue->move(dest_queue, qty); +} + + boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 39b9c334eb..905b16d54f 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -175,6 +175,13 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::function2<void, int, std::string> failed, sys::ConnectionCodec::Factory* =0); + /** Move messages from one queue to another. + A zero quantity means to move all messages + */ + uint32_t queueMoveMessages( const std::string& srcQueue, + const std::string& destQueue, + uint32_t qty); + // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed // For the present just return the first ProtocolFactory registered. boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 8bbccda844..7dc6197fa2 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -405,10 +405,25 @@ uint32_t Queue::purge(const uint32_t purge_request){ uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages.empty()) - { + while((!purge_request || purge_count--) && !messages.empty()) { popAndDequeue(); - count++; + count++; + } + return count; +} + +uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { + Mutex::ScopedLock locker(messageLock); + uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t count = 0; // count how many were moved for returning + + while((!qty || move_count--) && !messages.empty()) { + QueuedMessage qmsg = messages.front(); + boost::intrusive_ptr<Message> msg = qmsg.payload; + destq->deliver(msg); // deliver message to the destination queue + messages.pop_front(); + dequeue(0, qmsg); + count++; } return count; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 8f6ae0b967..324e6fa1a1 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -161,6 +161,9 @@ namespace qpid { uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages + //move qty # of messages to destination Queue destq + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t getMessageCount() const; uint32_t getConsumerCount() const; inline const string& getName() const { return name; } |