diff options
| author | Charles E. Rolke <chug@apache.org> | 2013-05-23 19:38:09 +0000 |
|---|---|---|
| committer | Charles E. Rolke <chug@apache.org> | 2013-05-23 19:38:09 +0000 |
| commit | 1d942470fa243680260b1e1f299e95a52fb0624e (patch) | |
| tree | f7438492b2fa3f43a9075ac0010215ef063c020c /cpp/src/qpid/broker/Queue.cpp | |
| parent | cd8de8b4d9737f43894eca5c751802fdb85638ad (diff) | |
| download | qpid-python-1d942470fa243680260b1e1f299e95a52fb0624e.tar.gz | |
QPID-4650: C++ Broker method to redirect messages between two queues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1485836 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 4c9058e78b..f82fc815c9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -138,6 +138,7 @@ QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOp } + Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {} bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw() { @@ -186,7 +187,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, broker(b), deleted(false), barrier(*this), - allocator(new FifoDistributor( *messages )) + allocator(new FifoDistributor( *messages )), + redirectSource(false) { if (settings.maxDepth.hasCount()) current.setCount(0); if (settings.maxDepth.hasSize()) current.setSize(0); @@ -267,6 +269,15 @@ bool Queue::accept(const Message& msg) void Queue::deliver(Message msg, TxBuffer* txn) { + if (redirectPeer) { + redirectPeer->deliverTo(msg, txn); + } else { + deliverTo(msg, txn); + } +} + +void Queue::deliverTo(Message msg, TxBuffer* txn) +{ if (accept(msg)) { if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); @@ -1123,6 +1134,7 @@ void Queue::unbind(ExchangeRegistry& exchanges) bindings.unbind(exchanges, shared_from_this()); } + uint64_t Queue::getPersistenceId() const { return persistenceId; @@ -1626,5 +1638,19 @@ void Queue::addArgument(const string& key, const types::Variant& value) { if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap()); } + +void Queue::setRedirectPeer ( Queue::shared_ptr peer, bool isSrc) { + Mutex::ScopedLock locker(messageLock); + redirectPeer = peer; + redirectSource = isSrc; +} + +void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) { + if (mgmtObject != 0) { + mgmtObject->set_redirectPeer(enabled ? peer : ""); + mgmtObject->set_redirectSource(isSrc); + } +} + }} |
