diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/ConnectionToken.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/OwnershipToken.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PreviewSessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PreviewSessionState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionContext.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.cpp | 8 |
12 files changed, 52 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/ConnectionToken.h b/cpp/src/qpid/broker/ConnectionToken.h index 38b7d7d098..0e3b301897 100644 --- a/cpp/src/qpid/broker/ConnectionToken.h +++ b/cpp/src/qpid/broker/ConnectionToken.h @@ -30,6 +30,7 @@ namespace qpid { */ class ConnectionToken : public OwnershipToken { public: + virtual bool isLocal(const ConnectionToken* t) const { return this == t; } virtual ~ConnectionToken(){} }; } diff --git a/cpp/src/qpid/broker/OwnershipToken.h b/cpp/src/qpid/broker/OwnershipToken.h index 15f333941b..effd2f5b3c 100644 --- a/cpp/src/qpid/broker/OwnershipToken.h +++ b/cpp/src/qpid/broker/OwnershipToken.h @@ -24,8 +24,11 @@ namespace qpid { namespace broker { +class ConnectionToken; + class OwnershipToken{ public: + virtual bool isLocal(const ConnectionToken* t) const = 0; virtual ~OwnershipToken(){} }; diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp index 7188ffbf40..43c3b1509e 100644 --- a/cpp/src/qpid/broker/PreviewSessionState.cpp +++ b/cpp/src/qpid/broker/PreviewSessionState.cpp @@ -90,6 +90,11 @@ ConnectionState& PreviewSessionState::getConnection() { return getHandler()->getConnection(); } +bool PreviewSessionState::isLocal(const ConnectionToken* t) const +{ + return isAttached() && &(handler->getConnection()) == t; +} + void PreviewSessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h index 6e8523317c..1aecb12e72 100644 --- a/cpp/src/qpid/broker/PreviewSessionState.h +++ b/cpp/src/qpid/broker/PreviewSessionState.h @@ -64,7 +64,7 @@ class PreviewSessionState : public framing::SessionState, { public: ~PreviewSessionState(); - bool isAttached() { return handler; } + bool isAttached() const { return handler; } void detach(); void attach(PreviewSessionHandler& handler); @@ -77,6 +77,7 @@ class PreviewSessionState : public framing::SessionState, /** @pre isAttached() */ ConnectionState& getConnection(); + bool isLocal(const ConnectionToken* t) const; uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 165830151d..436431fce1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -59,6 +59,7 @@ Queue::Queue(const string& _name, bool _autodelete, owner(_owner), consumerCount(0), exclusive(false), + noLocal(false), persistenceId(0) { if (parent != 0) @@ -90,6 +91,10 @@ void Queue::notifyDurableIOComplete() notify(); } +bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) +{ + return noLocal && owner && owner->isLocal(msg->getPublisher()); +} void Queue::deliver(boost::intrusive_ptr<Message>& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { @@ -97,9 +102,10 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ DeliverableMessage deliverable(msg); alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); } + } else if (isLocal(msg)) { + //drop message + QPID_LOG(debug, "Dropping 'local' message from " << getName()); } else { - - // if no store then mark as enqueued if (!enqueue(0, msg)){ push(msg); @@ -468,6 +474,7 @@ namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); + const std::string qpidNoLocal("no-local"); } void Queue::create(const FieldTable& _settings) @@ -484,8 +491,13 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings) { std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings)); - if (_policy->getMaxCount() || _policy->getMaxSize()) + if (_policy->getMaxCount() || _policy->getMaxSize()) { setPolicy(_policy); + } + if (owner) { + noLocal = _settings.get(qpidNoLocal); + QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + } } void Queue::destroy() diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e33cd7e5d7..880b048103 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -70,6 +70,7 @@ namespace qpid { const OwnershipToken* owner; uint32_t consumerCount; bool exclusive; + bool noLocal; Listeners listeners; Messages messages; mutable qpid::sys::Mutex consumerLock; @@ -118,6 +119,7 @@ namespace qpid { bool acquire(const QueuedMessage& msg); + bool isLocal(boost::intrusive_ptr<Message>& msg); /** * Delivers a message to the queue. Will record it as * enqueued if persistent then process it. diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 2091e97584..3daf15f269 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -23,6 +23,7 @@ #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" +#include "qpid/log/Statement.h" #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -198,6 +199,12 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() exclusiveQueues.erase(exclusiveQueues.begin()); } } + +bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const +{ + return session.isLocal(t); +} + Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index c8aa9008cc..fc182e0bb6 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -21,11 +21,12 @@ #include "HandlerImpl.h" +#include "ConnectionToken.h" +#include "OwnershipToken.h" #include "qpid/Exception.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/SequenceSet.h" -#include "OwnershipToken.h" #include <vector> #include <boost/function.hpp> @@ -140,6 +141,7 @@ class Queue; bool ifUnused, bool ifEmpty); void purge(const std::string& queue); framing::Queue010QueryResult query(const std::string& queue); + bool isLocal(const ConnectionToken* t) const; }; class MessageHandlerImpl : diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h index a289310b15..e3cc0a5fa3 100644 --- a/cpp/src/qpid/broker/SessionContext.h +++ b/cpp/src/qpid/broker/SessionContext.h @@ -38,6 +38,7 @@ class SessionContext : public sys::OutputControl { public: virtual ~SessionContext(){} + virtual bool isLocal(const ConnectionToken* t) const = 0; virtual ConnectionState& getConnection() = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 19fb0a4a79..d719bbe145 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -96,6 +96,11 @@ ConnectionState& SessionState::getConnection() { return getHandler()->getConnection(); } +bool SessionState::isLocal(const ConnectionToken* t) const +{ + return isAttached() && &(handler->getConnection()) == t; +} + void SessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticState); Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 18acb6f096..4fc2ae4cc5 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -70,7 +70,7 @@ class SessionState : public framing::SessionState, { public: ~SessionState(); - bool isAttached() { return handler; } + bool isAttached() const { return handler; } void detach(); void attach(SessionHandler& handler); @@ -83,6 +83,7 @@ class SessionState : public framing::SessionState, /** @pre isAttached() */ ConnectionState& getConnection(); + bool isLocal(const ConnectionToken* t) const; uint32_t getTimeout() const { return timeout; } void setTimeout(uint32_t t) { timeout = t; } diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 46ec308bd2..1a8630eb54 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -44,8 +44,12 @@ void TxPublish::rollback() throw(){ } void TxPublish::deliverTo(Queue::shared_ptr& queue){ - queues.push_back(queue); - delivered = true; + if (!queue->isLocal(msg)) { + queues.push_back(queue); + delivered = true; + } else { + QPID_LOG(debug, "Won't enqueue local message for " << queue->getName()); + } } TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) |