summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorCharles E. Rolke <chug@apache.org>2013-05-23 19:38:09 +0000
committerCharles E. Rolke <chug@apache.org>2013-05-23 19:38:09 +0000
commit1d942470fa243680260b1e1f299e95a52fb0624e (patch)
treef7438492b2fa3f43a9075ac0010215ef063c020c /cpp/src/qpid/broker/Queue.cpp
parentcd8de8b4d9737f43894eca5c751802fdb85638ad (diff)
downloadqpid-python-1d942470fa243680260b1e1f299e95a52fb0624e.tar.gz
QPID-4650: C++ Broker method to redirect messages between two queues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1485836 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp28
1 files changed, 27 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 4c9058e78b..f82fc815c9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -138,6 +138,7 @@ QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOp
}
+
Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {}
bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
{
@@ -186,7 +187,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
broker(b),
deleted(false),
barrier(*this),
- allocator(new FifoDistributor( *messages ))
+ allocator(new FifoDistributor( *messages )),
+ redirectSource(false)
{
if (settings.maxDepth.hasCount()) current.setCount(0);
if (settings.maxDepth.hasSize()) current.setSize(0);
@@ -267,6 +269,15 @@ bool Queue::accept(const Message& msg)
void Queue::deliver(Message msg, TxBuffer* txn)
{
+ if (redirectPeer) {
+ redirectPeer->deliverTo(msg, txn);
+ } else {
+ deliverTo(msg, txn);
+ }
+}
+
+void Queue::deliverTo(Message msg, TxBuffer* txn)
+{
if (accept(msg)) {
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
@@ -1123,6 +1134,7 @@ void Queue::unbind(ExchangeRegistry& exchanges)
bindings.unbind(exchanges, shared_from_this());
}
+
uint64_t Queue::getPersistenceId() const
{
return persistenceId;
@@ -1626,5 +1638,19 @@ void Queue::addArgument(const string& key, const types::Variant& value) {
if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
}
+
+void Queue::setRedirectPeer ( Queue::shared_ptr peer, bool isSrc) {
+ Mutex::ScopedLock locker(messageLock);
+ redirectPeer = peer;
+ redirectSource = isSrc;
+}
+
+void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) {
+ if (mgmtObject != 0) {
+ mgmtObject->set_redirectPeer(enabled ? peer : "");
+ mgmtObject->set_redirectSource(isSrc);
+ }
+}
+
}}