summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles E. Rolke <chug@apache.org>2013-05-23 19:38:09 +0000
committerCharles E. Rolke <chug@apache.org>2013-05-23 19:38:09 +0000
commit03ffba4b8d4c3bb86b607362e4950da2d37d774a (patch)
treef7438492b2fa3f43a9075ac0010215ef063c020c
parentd741dd4b6ea280bda9b887bf2fd826e891eea34f (diff)
downloadqpid-python-03ffba4b8d4c3bb86b607362e4950da2d37d774a.tar.gz
QPID-4650: C++ Broker method to redirect messages between two queues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1485836 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Broker.cpp131
-rw-r--r--cpp/src/qpid/broker/Broker.h2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp28
-rw-r--r--cpp/src/qpid/broker/Queue.h25
-rw-r--r--cpp/src/tests/CMakeLists.txt1
-rw-r--r--specs/management-schema.xml10
-rw-r--r--tools/src/py/qpidtoollibs/broker.py5
7 files changed, 197 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 94583aa507..8a7b8c106c 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -52,10 +52,13 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueRedirect.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qmf/org/apache/qpid/broker/EventQueueRedirect.h"
+#include "qmf/org/apache/qpid/broker/EventQueueRedirectCancelled.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
@@ -576,6 +579,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_QUEUEREDIRECT:
+ {
+ string srcQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_sourceQueue);
+ string tgtQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_targetQueue);
+ QPID_LOG (debug, "Broker::queueRedirect source queue:" << srcQueue << " to target queue " << tgtQueue);
+ status = queueRedirect(srcQueue, tgtQueue);
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -1046,6 +1057,120 @@ bool Broker::getLogHiresTimestamp()
}
+Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
+ const std::string& tgtQueue)
+{
+ Queue::shared_ptr srcQ(queues.find(srcQueue));
+ if (!srcQ) {
+ QPID_LOG(error, "Queue redirect failed: source queue not found: "
+ << srcQueue);
+ return Manageable::STATUS_UNKNOWN_OBJECT;
+ }
+
+ if (!tgtQueue.empty()) {
+ // NonBlank target queue creates partnership
+ Queue::shared_ptr tgtQ(queues.find(tgtQueue));
+ if (!tgtQ) {
+ QPID_LOG(error, "Queue redirect failed: target queue not found: "
+ << tgtQueue);
+ return Manageable::STATUS_UNKNOWN_OBJECT;
+ }
+
+ if (srcQueue.compare(tgtQueue) == 0) {
+ QPID_LOG(error, "Queue redirect source queue: "
+ << tgtQueue << " cannot be its own target");
+ return Manageable::STATUS_USER;
+ }
+
+ if (srcQ->isAutoDelete()) {
+ QPID_LOG(error, "Queue redirect source queue: "
+ << srcQueue << " is autodelete and can not be part of redirect");
+ return Manageable::STATUS_USER;
+ }
+
+ if (tgtQ->isAutoDelete()) {
+ QPID_LOG(error, "Queue redirect target queue: "
+ << tgtQueue << " is autodelete and can not be part of redirect");
+ return Manageable::STATUS_USER;
+ }
+
+ if (srcQ->getRedirectPeer()) {
+ QPID_LOG(error, "Queue redirect source queue: "
+ << srcQueue << " is already redirected");
+ return Manageable::STATUS_USER;
+ }
+
+ if (tgtQ->getRedirectPeer()) {
+ QPID_LOG(error, "Queue redirect target queue: "
+ << tgtQueue << " is already redirected");
+ return Manageable::STATUS_USER;
+ }
+
+ // Start the backup overflow partnership
+ srcQ->setRedirectPeer(tgtQ, true);
+ tgtQ->setRedirectPeer(srcQ, false);
+
+ // Set management state
+ srcQ->setMgmtRedirectState(tgtQueue, true, true);
+ tgtQ->setMgmtRedirectState(srcQueue, true, false);
+
+ // Management event
+ if (managementAgent.get()) {
+ managementAgent->raiseEvent(_qmf::EventQueueRedirect(srcQueue, tgtQueue));
+ }
+
+ QPID_LOG(info, "Queue redirect complete. queue: "
+ << srcQueue << " target queue: " << tgtQueue);
+ return Manageable::STATUS_OK;
+ } else {
+ // Blank target queue destroys partnership
+ Queue::shared_ptr tgtQ(srcQ->getRedirectPeer());
+ if (!tgtQ) {
+ QPID_LOG(error, "Queue redirect source queue: "
+ << srcQueue << " is not in redirected");
+ return Manageable::STATUS_USER;
+ }
+
+ if (!srcQ->isRedirectSource()) {
+ QPID_LOG(error, "Queue redirect source queue: "
+ << srcQueue << " is not a redirect source");
+ return Manageable::STATUS_USER;
+ }
+
+ queueRedirectDestroy(srcQ, tgtQ, true);
+
+ return Manageable::STATUS_OK;
+ }
+}
+
+
+void Broker::queueRedirectDestroy(Queue::shared_ptr srcQ,
+ Queue::shared_ptr tgtQ,
+ bool moveMsgs) {
+ QPID_LOG(notice, "Queue redirect destroyed. queue: " << srcQ->getName()
+ << " target queue: " << tgtQ->getName());
+
+ tgtQ->setMgmtRedirectState(empty, false, false);
+ srcQ->setMgmtRedirectState(empty, false, false);
+
+ if (moveMsgs) {
+ // TODO: this 'move' works in the static case but has no
+ // actual locking that does what redirect needs when
+ // there is a lot of traffic in flight.
+ tgtQ->move(srcQ, 0);
+ }
+
+ Queue::shared_ptr np;
+
+ tgtQ->setRedirectPeer(np, false);
+ srcQ->setRedirectPeer(np, false);
+
+ if (managementAgent.get()) {
+ managementAgent->raiseEvent(_qmf::EventQueueRedirectCancelled(srcQ->getName(), tgtQ->getName()));
+ }
+}
+
+
const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const {
static TransportInfo nullTransportInfo;
TransportMap::const_iterator i
@@ -1135,7 +1260,6 @@ bool Broker::deferDeliveryImpl(const std::string&, const Message&)
const std::string Broker::TCP_TRANSPORT("tcp");
-
std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
const std::string& name,
const QueueSettings& settings,
@@ -1210,6 +1334,11 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId,
if (check) check(queue);
if (acl)
acl->recordDestroyQueue(name);
+ Queue::shared_ptr peerQ(queue->getRedirectPeer());
+ if (peerQ)
+ queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ,
+ queue->isRedirectSource() ? peerQ : queue,
+ false);
queues.destroy(name, connectionId, userId);
queue->destroyed();
} else {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index c2032ef629..44b09239c4 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -165,6 +165,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
const ConnectionState* context);
Manageable::status_t setTimestampConfig(const bool receive,
const ConnectionState* context);
+ Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue);
+ void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs);
boost::shared_ptr<sys::Poller> poller;
std::auto_ptr<sys::Timer> timer;
Options config;
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 4c9058e78b..f82fc815c9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -138,6 +138,7 @@ QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOp
}
+
Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {}
bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
{
@@ -186,7 +187,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
broker(b),
deleted(false),
barrier(*this),
- allocator(new FifoDistributor( *messages ))
+ allocator(new FifoDistributor( *messages )),
+ redirectSource(false)
{
if (settings.maxDepth.hasCount()) current.setCount(0);
if (settings.maxDepth.hasSize()) current.setSize(0);
@@ -267,6 +269,15 @@ bool Queue::accept(const Message& msg)
void Queue::deliver(Message msg, TxBuffer* txn)
{
+ if (redirectPeer) {
+ redirectPeer->deliverTo(msg, txn);
+ } else {
+ deliverTo(msg, txn);
+ }
+}
+
+void Queue::deliverTo(Message msg, TxBuffer* txn)
+{
if (accept(msg)) {
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
@@ -1123,6 +1134,7 @@ void Queue::unbind(ExchangeRegistry& exchanges)
bindings.unbind(exchanges, shared_from_this());
}
+
uint64_t Queue::getPersistenceId() const
{
return persistenceId;
@@ -1626,5 +1638,19 @@ void Queue::addArgument(const string& key, const types::Variant& value) {
if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
}
+
+void Queue::setRedirectPeer ( Queue::shared_ptr peer, bool isSrc) {
+ Mutex::ScopedLock locker(messageLock);
+ redirectPeer = peer;
+ redirectSource = isSrc;
+}
+
+void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) {
+ if (mgmtObject != 0) {
+ mgmtObject->set_redirectPeer(enabled ? peer : "");
+ mgmtObject->set_redirectSource(isSrc);
+ }
+}
+
}}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index ee9c54df29..68d793c970 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -82,6 +82,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
public:
typedef boost::function1<bool, const Message&> MessagePredicate;
+
+ typedef boost::shared_ptr<Queue> shared_ptr;
+
protected:
struct UsageBarrier
{
@@ -169,6 +172,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
boost::shared_ptr<MessageDistributor> allocator;
boost::scoped_ptr<Selector> selector;
+ // Redirect source and target refer to each other. Only one is source.
+ Queue::shared_ptr redirectPeer;
+ bool redirectSource;
+
virtual void push(Message& msg, bool isRecovery=false);
bool accept(const Message&);
void process(Message& msg);
@@ -202,8 +209,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
public:
- typedef boost::shared_ptr<Queue> shared_ptr;
-
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
@@ -250,10 +255,16 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN bool dequeueMessageAt(const qpid::framing::SequenceNumber& position);
/**
+ * Delivers a message to the queue or to overflow partner.
+ */
+ QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0);
+ /**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
*/
- QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0);
+ private:
+ QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0);
+ public:
/**
* Returns a message to the in-memory queue (due to lack
* of acknowledegement from a receiver). If a consumer is
@@ -428,6 +439,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Add an argument to be included in management messages about this queue. */
QPID_BROKER_EXTERN void addArgument(const std::string& key, const types::Variant& value);
+ /**
+ * Atomic Redirect
+ */
+ QPID_BROKER_EXTERN void setRedirectPeer ( Queue::shared_ptr peer, bool isSrc );
+ QPID_BROKER_EXTERN Queue::shared_ptr getRedirectPeer() { return redirectPeer; }
+ QPID_BROKER_EXTERN bool isRedirectSource() const { return redirectSource; }
+ QPID_BROKER_EXTERN void setMgmtRedirectState( std::string peer, bool enabled, bool isSrc );
+
friend class QueueFactory;
};
}
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index f3ee40621f..af96944b2d 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -349,6 +349,7 @@ endif (BUILD_MSSQL)
if (BUILD_MSCLFS)
add_test (store_tests_clfs ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix} MSSQL-CLFS)
endif (BUILD_MSCLFS)
+add_test (queue_redirect ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_queue_redirect${test_script_suffix})
endif (PYTHON_EXECUTABLE)
add_library(test_store MODULE test_store.cpp)
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index 58c6d59716..cee3c921ba 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -189,6 +189,10 @@
<arg name="logHires" dir="I" type="bool" desc="True to enable enable high resolution timestamp in logs."/>
</method>
+ <method name="queueRedirect" desc="Enable/disable delivery redirect for indicated queues">
+ <arg name="sourceQueue" dir="I" type="sstr" desc="Source queue."/>
+ <arg name="targetQueue" dir="I" type="sstr" desc="Redirect target queue. Blank disables redirect."/>
+ </method>
</class>
@@ -278,6 +282,9 @@
<statistic name="flowStopped" type="bool" desc="Flow control active."/>
<statistic name="flowStoppedCount" type="count32" desc="Number of times flow control was activated for this queue"/>
+ <statistic name="redirectPeer" type="sstr" desc="Partner queue for redirected pair"/>
+ <statistic name="redirectSource" type="bool" desc="This queue is the redirect source"/>
+
<method name="purge" desc="Discard all or some messages on a queue">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
<arg name="filter" dir="I" type="map" desc="if specified, purge only those messages matching this filter"/>
@@ -546,6 +553,7 @@
<arg name="reason" type="lstr" desc="Reason for a failure"/>
<arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
<arg name="user" type="sstr" desc="Authentication identity"/>
+ <arg name="qTarget" type="sstr" desc="Redirect target queue"/>
<arg name="msgDepth" type="count64" desc="Current size of queue in messages"/>
<arg name="byteDepth" type="count64" desc="Current size of queue in bytes"/>
<arg name="properties" type="map" desc="optional identifying information sent by the remote"/>
@@ -566,6 +574,8 @@
<event name="unsubscribe" sev="inform" args="rhost, user, dest"/>
<event name="queueThresholdCrossedUpward" sev="inform" args="qName, msgDepth, byteDepth"/>
<event name="queueThresholdCrossedDownward" sev="inform" args="qName, msgDepth, byteDepth"/>
+ <event name="queueRedirect" sev="inform" args="qName, qTarget"/>
+ <event name="queueRedirectCancelled" sev="inform" args="qName, qTarget"/>
<!-- The following are deprecated -->
<event name="queueThresholdExceeded" sev="warn" args="qName, msgDepth, byteDepth"/>
diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py
index c496ab0908..4fad8cc8ad 100644
--- a/tools/src/py/qpidtoollibs/broker.py
+++ b/tools/src/py/qpidtoollibs/broker.py
@@ -292,6 +292,11 @@ class BrokerAgent(object):
'routingKey': key}
return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
+ def Redirect(self, sourceQueue, targetQueue):
+ args = {'sourceQueue': sourceQueue,
+ 'targetQueue': targetQueue}
+ return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker")
+
def create(self, _type, name, properties={}, strict=False):
"""Create an object of the specified type"""
args = {'type': _type,