diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b3d8fda53b..628d969c69 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), consumerCount(0), - exclusive(false), + exclusive(0), noLocal(false), persistenceId(0) { @@ -91,9 +91,18 @@ void Queue::notifyDurableIOComplete() notify(); } +bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) +{ + return token && token->isLocal(msg->getPublisher()); +} + bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) { - return noLocal && owner && owner->isLocal(msg->getPublisher()); + //message is considered local if it was published on the same + //connection as that of the session which declared this queue + //exclusive (owner) or which has an exclusive subscription + //(exclusive) + return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ @@ -328,7 +337,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { return false; } -void Queue::consume(Consumer&, bool requestExclusive){ +void Queue::consume(Consumer& c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw AccessRefusedException( @@ -338,7 +347,7 @@ void Queue::consume(Consumer&, bool requestExclusive){ throw AccessRefusedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { - exclusive = true; + exclusive = c.getSession(); } } consumerCount++; @@ -352,7 +361,7 @@ void Queue::cancel(Consumer& c){ removeListener(c); Mutex::ScopedLock locker(consumerLock); consumerCount--; - if(exclusive) exclusive = false; + if(exclusive) exclusive = 0; if (mgmtObject.get() != 0){ mgmtObject->dec_consumers (); } @@ -485,10 +494,9 @@ void Queue::configure(const FieldTable& _settings) if (_policy->getMaxCount() || _policy->getMaxSize()) { setPolicy(_policy); } - if (owner) { - noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); - } + //set this regardless of owner to allow use of no-local with exclusive consumers also + noLocal = _settings.get(qpidNoLocal); + QPID_LOG(debug, "Configured queue with no-local=" << noLocal); if (mgmtObject.get() != 0) mgmtObject->set_arguments (_settings); } |