summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
1 files changed, 17 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b3d8fda53b..628d969c69 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
consumerCount(0),
- exclusive(false),
+ exclusive(0),
noLocal(false),
persistenceId(0)
{
@@ -91,9 +91,18 @@ void Queue::notifyDurableIOComplete()
notify();
}
+bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+{
+ return token && token->isLocal(msg->getPublisher());
+}
+
bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
{
- return noLocal && owner && owner->isLocal(msg->getPublisher());
+ //message is considered local if it was published on the same
+ //connection as that of the session which declared this queue
+ //exclusive (owner) or which has an exclusive subscription
+ //(exclusive)
+ return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
@@ -328,7 +337,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) {
return false;
}
-void Queue::consume(Consumer&, bool requestExclusive){
+void Queue::consume(Consumer& c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw AccessRefusedException(
@@ -338,7 +347,7 @@ void Queue::consume(Consumer&, bool requestExclusive){
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
} else {
- exclusive = true;
+ exclusive = c.getSession();
}
}
consumerCount++;
@@ -352,7 +361,7 @@ void Queue::cancel(Consumer& c){
removeListener(c);
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
- if(exclusive) exclusive = false;
+ if(exclusive) exclusive = 0;
if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
@@ -485,10 +494,9 @@ void Queue::configure(const FieldTable& _settings)
if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
}
- if (owner) {
- noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
- }
+ //set this regardless of owner to allow use of no-local with exclusive consumers also
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
if (mgmtObject.get() != 0)
mgmtObject->set_arguments (_settings);
}