diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionContext.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 1 |
7 files changed, 30 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 00eb41a428..4274ce823e 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -27,6 +27,7 @@ namespace qpid { }} #include "Message.h" +#include "OwnershipToken.h" namespace qpid { namespace broker { @@ -56,6 +57,7 @@ namespace qpid { virtual void notify() = 0; virtual bool filter(boost::intrusive_ptr<Message>) { return true; } virtual bool accept(boost::intrusive_ptr<Message>) { return true; } + virtual OwnershipToken* getSession() = 0; virtual ~Consumer(){} }; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b3d8fda53b..628d969c69 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), consumerCount(0), - exclusive(false), + exclusive(0), noLocal(false), persistenceId(0) { @@ -91,9 +91,18 @@ void Queue::notifyDurableIOComplete() notify(); } +bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) +{ + return token && token->isLocal(msg->getPublisher()); +} + bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) { - return noLocal && owner && owner->isLocal(msg->getPublisher()); + //message is considered local if it was published on the same + //connection as that of the session which declared this queue + //exclusive (owner) or which has an exclusive subscription + //(exclusive) + return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ @@ -328,7 +337,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { return false; } -void Queue::consume(Consumer&, bool requestExclusive){ +void Queue::consume(Consumer& c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw AccessRefusedException( @@ -338,7 +347,7 @@ void Queue::consume(Consumer&, bool requestExclusive){ throw AccessRefusedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { - exclusive = true; + exclusive = c.getSession(); } } consumerCount++; @@ -352,7 +361,7 @@ void Queue::cancel(Consumer& c){ removeListener(c); Mutex::ScopedLock locker(consumerLock); consumerCount--; - if(exclusive) exclusive = false; + if(exclusive) exclusive = 0; if (mgmtObject.get() != 0){ mgmtObject->dec_consumers (); } @@ -485,10 +494,9 @@ void Queue::configure(const FieldTable& _settings) if (_policy->getMaxCount() || _policy->getMaxSize()) { setPolicy(_policy); } - if (owner) { - noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); - } + //set this regardless of owner to allow use of no-local with exclusive consumers also + noLocal = _settings.get(qpidNoLocal); + QPID_LOG(debug, "Configured queue with no-local=" << noLocal); if (mgmtObject.get() != 0) mgmtObject->set_arguments (_settings); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 8b92784b9a..18d28d32fb 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -70,7 +70,7 @@ namespace qpid { MessageStore* store; const OwnershipToken* owner; uint32_t consumerCount; - bool exclusive; + OwnershipToken* exclusive; bool noLocal; Listeners listeners; Messages messages; diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 2c2d099fb1..ab6b82a232 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -267,6 +267,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, msgCredit(0), byteCredit(0) {} +OwnershipToken* SemanticState::ConsumerImpl::getSession() +{ + return &(parent->session); +} + bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { allocateCredit(msg.payload); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 20a0239db0..84dc0fc5bb 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -80,6 +80,7 @@ class SemanticState : public framing::FrameHandler::Chains, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); + OwnershipToken* getSession(); bool deliver(QueuedMessage& msg); bool filter(boost::intrusive_ptr<Message> msg); bool accept(boost::intrusive_ptr<Message> msg); @@ -93,7 +94,7 @@ class SemanticState : public framing::FrameHandler::Chains, void stop(); void complete(DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } - bool isBlocked() const { return blocked; } + bool isBlocked() const { return blocked; } bool doOutput(); }; diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h index e3cc0a5fa3..7a277964ab 100644 --- a/cpp/src/qpid/broker/SessionContext.h +++ b/cpp/src/qpid/broker/SessionContext.h @@ -27,6 +27,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/sys/OutputControl.h" #include "ConnectionState.h" +#include "OwnershipToken.h" #include <boost/noncopyable.hpp> @@ -34,7 +35,7 @@ namespace qpid { namespace broker { -class SessionContext : public sys::OutputControl +class SessionContext : public OwnershipToken, public sys::OutputControl { public: virtual ~SessionContext(){} diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 1d454d9f4a..aec59a58bc 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -49,6 +49,7 @@ public: return true; }; void notify() {} + OwnershipToken* getSession() { return 0; } }; class FailOnDeliver : public Deliverable |