diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-22 12:05:52 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-22 12:05:52 +0000 |
commit | 61959e29ee69f9cebb61b845272eededaec6f11e (patch) | |
tree | 2900fad5b05b665219197e2cbd6c7254b476d09e /cpp/src/qpid/broker/Queue.cpp | |
parent | 2b8e82875776feb8393c7791975acc9cf9fdb5e1 (diff) | |
download | qpid-python-61959e29ee69f9cebb61b845272eededaec6f11e.tar.gz |
QPID-944: do no-local checking where requested when there is an exclusive subscription active
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650450 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b3d8fda53b..628d969c69 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), consumerCount(0), - exclusive(false), + exclusive(0), noLocal(false), persistenceId(0) { @@ -91,9 +91,18 @@ void Queue::notifyDurableIOComplete() notify(); } +bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) +{ + return token && token->isLocal(msg->getPublisher()); +} + bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) { - return noLocal && owner && owner->isLocal(msg->getPublisher()); + //message is considered local if it was published on the same + //connection as that of the session which declared this queue + //exclusive (owner) or which has an exclusive subscription + //(exclusive) + return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ @@ -328,7 +337,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { return false; } -void Queue::consume(Consumer&, bool requestExclusive){ +void Queue::consume(Consumer& c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw AccessRefusedException( @@ -338,7 +347,7 @@ void Queue::consume(Consumer&, bool requestExclusive){ throw AccessRefusedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { - exclusive = true; + exclusive = c.getSession(); } } consumerCount++; @@ -352,7 +361,7 @@ void Queue::cancel(Consumer& c){ removeListener(c); Mutex::ScopedLock locker(consumerLock); consumerCount--; - if(exclusive) exclusive = false; + if(exclusive) exclusive = 0; if (mgmtObject.get() != 0){ mgmtObject->dec_consumers (); } @@ -485,10 +494,9 @@ void Queue::configure(const FieldTable& _settings) if (_policy->getMaxCount() || _policy->getMaxSize()) { setPolicy(_policy); } - if (owner) { - noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); - } + //set this regardless of owner to allow use of no-local with exclusive consumers also + noLocal = _settings.get(qpidNoLocal); + QPID_LOG(debug, "Configured queue with no-local=" << noLocal); if (mgmtObject.get() != 0) mgmtObject->set_arguments (_settings); } |