summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-09-23 18:42:45 +0000
committerTed Ross <tross@apache.org>2008-09-23 18:42:45 +0000
commitf5cf315acd4ae95d12c020187c9623a16d061e92 (patch)
treec7d912d15be46200ee4de5f66de7b95902cf0cf7 /qpid/cpp
parent574c338d3731043fd598040ccba211e2e339d977 (diff)
downloadqpid-python-f5cf315acd4ae95d12c020187c9623a16d061e92.tar.gz
QPID-1290 - Patch from William Henry
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@698275 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h7
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp21
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
4 files changed, 54 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 8b3be54d15..209599cfdd 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -32,6 +32,7 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
@@ -349,6 +350,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : {
+ _qmf::ArgsBrokerQueueMoveMessages& moveArgs=
+ dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+ status = Manageable::STATUS_OK;
+ else
+ return Manageable::STATUS_INVALID_PARAMETER;
+ break;
+ }
default:
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
@@ -397,6 +407,22 @@ void Broker::connect(
connect(addr.host, addr.port, false, failed, f);
}
+uint32_t Broker::queueMoveMessages(
+ const std::string& srcQueue,
+ const std::string& destQueue,
+ uint32_t qty)
+{
+ Queue::shared_ptr src_queue = queues.find(srcQueue);
+ if (!src_queue)
+ return 0;
+ Queue::shared_ptr dest_queue = queues.find(destQueue);
+ if (!dest_queue)
+ return 0;
+
+ return src_queue->move(dest_queue, qty);
+}
+
+
boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 39b9c334eb..905b16d54f 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -175,6 +175,13 @@ class Broker : public sys::Runnable, public Plugin::Target,
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* =0);
+ /** Move messages from one queue to another.
+ A zero quantity means to move all messages
+ */
+ uint32_t queueMoveMessages( const std::string& srcQueue,
+ const std::string& destQueue,
+ uint32_t qty);
+
// TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
// For the present just return the first ProtocolFactory registered.
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 8bbccda844..7dc6197fa2 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -405,10 +405,25 @@ uint32_t Queue::purge(const uint32_t purge_request){
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty())
- {
+ while((!purge_request || purge_count--) && !messages.empty()) {
popAndDequeue();
- count++;
+ count++;
+ }
+ return count;
+}
+
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+ Mutex::ScopedLock locker(messageLock);
+ uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t count = 0; // count how many were moved for returning
+
+ while((!qty || move_count--) && !messages.empty()) {
+ QueuedMessage qmsg = messages.front();
+ boost::intrusive_ptr<Message> msg = qmsg.payload;
+ destq->deliver(msg); // deliver message to the destination queue
+ messages.pop_front();
+ dequeue(0, qmsg);
+ count++;
}
return count;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 8f6ae0b967..324e6fa1a1 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -161,6 +161,9 @@ namespace qpid {
uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
+ //move qty # of messages to destination Queue destq
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }