diff options
author | Charles E. Rolke <chug@apache.org> | 2013-05-23 19:38:09 +0000 |
---|---|---|
committer | Charles E. Rolke <chug@apache.org> | 2013-05-23 19:38:09 +0000 |
commit | 03ffba4b8d4c3bb86b607362e4950da2d37d774a (patch) | |
tree | f7438492b2fa3f43a9075ac0010215ef063c020c | |
parent | d741dd4b6ea280bda9b887bf2fd826e891eea34f (diff) | |
download | qpid-python-03ffba4b8d4c3bb86b607362e4950da2d37d774a.tar.gz |
QPID-4650: C++ Broker method to redirect messages between two queues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1485836 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 131 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 25 | ||||
-rw-r--r-- | cpp/src/tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | specs/management-schema.xml | 10 | ||||
-rw-r--r-- | tools/src/py/qpidtoollibs/broker.py | 5 |
7 files changed, 197 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 94583aa507..8a7b8c106c 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -52,10 +52,13 @@ #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueRedirect.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qmf/org/apache/qpid/broker/EventQueueRedirect.h" +#include "qmf/org/apache/qpid/broker/EventQueueRedirectCancelled.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" @@ -576,6 +579,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_QUEUEREDIRECT: + { + string srcQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_sourceQueue); + string tgtQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_targetQueue); + QPID_LOG (debug, "Broker::queueRedirect source queue:" << srcQueue << " to target queue " << tgtQueue); + status = queueRedirect(srcQueue, tgtQueue); + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -1046,6 +1057,120 @@ bool Broker::getLogHiresTimestamp() } +Manageable::status_t Broker::queueRedirect(const std::string& srcQueue, + const std::string& tgtQueue) +{ + Queue::shared_ptr srcQ(queues.find(srcQueue)); + if (!srcQ) { + QPID_LOG(error, "Queue redirect failed: source queue not found: " + << srcQueue); + return Manageable::STATUS_UNKNOWN_OBJECT; + } + + if (!tgtQueue.empty()) { + // NonBlank target queue creates partnership + Queue::shared_ptr tgtQ(queues.find(tgtQueue)); + if (!tgtQ) { + QPID_LOG(error, "Queue redirect failed: target queue not found: " + << tgtQueue); + return Manageable::STATUS_UNKNOWN_OBJECT; + } + + if (srcQueue.compare(tgtQueue) == 0) { + QPID_LOG(error, "Queue redirect source queue: " + << tgtQueue << " cannot be its own target"); + return Manageable::STATUS_USER; + } + + if (srcQ->isAutoDelete()) { + QPID_LOG(error, "Queue redirect source queue: " + << srcQueue << " is autodelete and can not be part of redirect"); + return Manageable::STATUS_USER; + } + + if (tgtQ->isAutoDelete()) { + QPID_LOG(error, "Queue redirect target queue: " + << tgtQueue << " is autodelete and can not be part of redirect"); + return Manageable::STATUS_USER; + } + + if (srcQ->getRedirectPeer()) { + QPID_LOG(error, "Queue redirect source queue: " + << srcQueue << " is already redirected"); + return Manageable::STATUS_USER; + } + + if (tgtQ->getRedirectPeer()) { + QPID_LOG(error, "Queue redirect target queue: " + << tgtQueue << " is already redirected"); + return Manageable::STATUS_USER; + } + + // Start the backup overflow partnership + srcQ->setRedirectPeer(tgtQ, true); + tgtQ->setRedirectPeer(srcQ, false); + + // Set management state + srcQ->setMgmtRedirectState(tgtQueue, true, true); + tgtQ->setMgmtRedirectState(srcQueue, true, false); + + // Management event + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventQueueRedirect(srcQueue, tgtQueue)); + } + + QPID_LOG(info, "Queue redirect complete. queue: " + << srcQueue << " target queue: " << tgtQueue); + return Manageable::STATUS_OK; + } else { + // Blank target queue destroys partnership + Queue::shared_ptr tgtQ(srcQ->getRedirectPeer()); + if (!tgtQ) { + QPID_LOG(error, "Queue redirect source queue: " + << srcQueue << " is not in redirected"); + return Manageable::STATUS_USER; + } + + if (!srcQ->isRedirectSource()) { + QPID_LOG(error, "Queue redirect source queue: " + << srcQueue << " is not a redirect source"); + return Manageable::STATUS_USER; + } + + queueRedirectDestroy(srcQ, tgtQ, true); + + return Manageable::STATUS_OK; + } +} + + +void Broker::queueRedirectDestroy(Queue::shared_ptr srcQ, + Queue::shared_ptr tgtQ, + bool moveMsgs) { + QPID_LOG(notice, "Queue redirect destroyed. queue: " << srcQ->getName() + << " target queue: " << tgtQ->getName()); + + tgtQ->setMgmtRedirectState(empty, false, false); + srcQ->setMgmtRedirectState(empty, false, false); + + if (moveMsgs) { + // TODO: this 'move' works in the static case but has no + // actual locking that does what redirect needs when + // there is a lot of traffic in flight. + tgtQ->move(srcQ, 0); + } + + Queue::shared_ptr np; + + tgtQ->setRedirectPeer(np, false); + srcQ->setRedirectPeer(np, false); + + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventQueueRedirectCancelled(srcQ->getName(), tgtQ->getName())); + } +} + + const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const { static TransportInfo nullTransportInfo; TransportMap::const_iterator i @@ -1135,7 +1260,6 @@ bool Broker::deferDeliveryImpl(const std::string&, const Message&) const std::string Broker::TCP_TRANSPORT("tcp"); - std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( const std::string& name, const QueueSettings& settings, @@ -1210,6 +1334,11 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, if (check) check(queue); if (acl) acl->recordDestroyQueue(name); + Queue::shared_ptr peerQ(queue->getRedirectPeer()); + if (peerQ) + queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ, + queue->isRedirectSource() ? peerQ : queue, + false); queues.destroy(name, connectionId, userId); queue->destroyed(); } else { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index c2032ef629..44b09239c4 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -165,6 +165,8 @@ class Broker : public sys::Runnable, public Plugin::Target, const ConnectionState* context); Manageable::status_t setTimestampConfig(const bool receive, const ConnectionState* context); + Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue); + void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs); boost::shared_ptr<sys::Poller> poller; std::auto_ptr<sys::Timer> timer; Options config; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 4c9058e78b..f82fc815c9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -138,6 +138,7 @@ QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOp } + Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {} bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw() { @@ -186,7 +187,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, broker(b), deleted(false), barrier(*this), - allocator(new FifoDistributor( *messages )) + allocator(new FifoDistributor( *messages )), + redirectSource(false) { if (settings.maxDepth.hasCount()) current.setCount(0); if (settings.maxDepth.hasSize()) current.setSize(0); @@ -267,6 +269,15 @@ bool Queue::accept(const Message& msg) void Queue::deliver(Message msg, TxBuffer* txn) { + if (redirectPeer) { + redirectPeer->deliverTo(msg, txn); + } else { + deliverTo(msg, txn); + } +} + +void Queue::deliverTo(Message msg, TxBuffer* txn) +{ if (accept(msg)) { if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); @@ -1123,6 +1134,7 @@ void Queue::unbind(ExchangeRegistry& exchanges) bindings.unbind(exchanges, shared_from_this()); } + uint64_t Queue::getPersistenceId() const { return persistenceId; @@ -1626,5 +1638,19 @@ void Queue::addArgument(const string& key, const types::Variant& value) { if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap()); } + +void Queue::setRedirectPeer ( Queue::shared_ptr peer, bool isSrc) { + Mutex::ScopedLock locker(messageLock); + redirectPeer = peer; + redirectSource = isSrc; +} + +void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) { + if (mgmtObject != 0) { + mgmtObject->set_redirectPeer(enabled ? peer : ""); + mgmtObject->set_redirectSource(isSrc); + } +} + }} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index ee9c54df29..68d793c970 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -82,6 +82,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { public: typedef boost::function1<bool, const Message&> MessagePredicate; + + typedef boost::shared_ptr<Queue> shared_ptr; + protected: struct UsageBarrier { @@ -169,6 +172,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::shared_ptr<MessageDistributor> allocator; boost::scoped_ptr<Selector> selector; + // Redirect source and target refer to each other. Only one is source. + Queue::shared_ptr redirectPeer; + bool redirectSource; + virtual void push(Message& msg, bool isRecovery=false); bool accept(const Message&); void process(Message& msg); @@ -202,8 +209,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, public: - typedef boost::shared_ptr<Queue> shared_ptr; - typedef std::vector<shared_ptr> vector; QPID_BROKER_EXTERN Queue(const std::string& name, @@ -250,10 +255,16 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN bool dequeueMessageAt(const qpid::framing::SequenceNumber& position); /** + * Delivers a message to the queue or to overflow partner. + */ + QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0); + /** * Delivers a message to the queue. Will record it as * enqueued if persistent then process it. */ - QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0); + private: + QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0); + public: /** * Returns a message to the in-memory queue (due to lack * of acknowledegement from a receiver). If a consumer is @@ -428,6 +439,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Add an argument to be included in management messages about this queue. */ QPID_BROKER_EXTERN void addArgument(const std::string& key, const types::Variant& value); + /** + * Atomic Redirect + */ + QPID_BROKER_EXTERN void setRedirectPeer ( Queue::shared_ptr peer, bool isSrc ); + QPID_BROKER_EXTERN Queue::shared_ptr getRedirectPeer() { return redirectPeer; } + QPID_BROKER_EXTERN bool isRedirectSource() const { return redirectSource; } + QPID_BROKER_EXTERN void setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ); + friend class QueueFactory; }; } diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index f3ee40621f..af96944b2d 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -349,6 +349,7 @@ endif (BUILD_MSSQL) if (BUILD_MSCLFS) add_test (store_tests_clfs ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix} MSSQL-CLFS) endif (BUILD_MSCLFS) +add_test (queue_redirect ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_queue_redirect${test_script_suffix}) endif (PYTHON_EXECUTABLE) add_library(test_store MODULE test_store.cpp) diff --git a/specs/management-schema.xml b/specs/management-schema.xml index 58c6d59716..cee3c921ba 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -189,6 +189,10 @@ <arg name="logHires" dir="I" type="bool" desc="True to enable enable high resolution timestamp in logs."/> </method> + <method name="queueRedirect" desc="Enable/disable delivery redirect for indicated queues"> + <arg name="sourceQueue" dir="I" type="sstr" desc="Source queue."/> + <arg name="targetQueue" dir="I" type="sstr" desc="Redirect target queue. Blank disables redirect."/> + </method> </class> @@ -278,6 +282,9 @@ <statistic name="flowStopped" type="bool" desc="Flow control active."/> <statistic name="flowStoppedCount" type="count32" desc="Number of times flow control was activated for this queue"/> + <statistic name="redirectPeer" type="sstr" desc="Partner queue for redirected pair"/> + <statistic name="redirectSource" type="bool" desc="This queue is the redirect source"/> + <method name="purge" desc="Discard all or some messages on a queue"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> <arg name="filter" dir="I" type="map" desc="if specified, purge only those messages matching this filter"/> @@ -546,6 +553,7 @@ <arg name="reason" type="lstr" desc="Reason for a failure"/> <arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/> <arg name="user" type="sstr" desc="Authentication identity"/> + <arg name="qTarget" type="sstr" desc="Redirect target queue"/> <arg name="msgDepth" type="count64" desc="Current size of queue in messages"/> <arg name="byteDepth" type="count64" desc="Current size of queue in bytes"/> <arg name="properties" type="map" desc="optional identifying information sent by the remote"/> @@ -566,6 +574,8 @@ <event name="unsubscribe" sev="inform" args="rhost, user, dest"/> <event name="queueThresholdCrossedUpward" sev="inform" args="qName, msgDepth, byteDepth"/> <event name="queueThresholdCrossedDownward" sev="inform" args="qName, msgDepth, byteDepth"/> + <event name="queueRedirect" sev="inform" args="qName, qTarget"/> + <event name="queueRedirectCancelled" sev="inform" args="qName, qTarget"/> <!-- The following are deprecated --> <event name="queueThresholdExceeded" sev="warn" args="qName, msgDepth, byteDepth"/> diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py index c496ab0908..4fad8cc8ad 100644 --- a/tools/src/py/qpidtoollibs/broker.py +++ b/tools/src/py/qpidtoollibs/broker.py @@ -292,6 +292,11 @@ class BrokerAgent(object): 'routingKey': key} return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + def Redirect(self, sourceQueue, targetQueue): + args = {'sourceQueue': sourceQueue, + 'targetQueue': targetQueue} + return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker") + def create(self, _type, name, properties={}, strict=False): """Create an object of the specified type""" args = {'type': _type, |