summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Consumer.h2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
-rw-r--r--cpp/src/qpid/broker/Queue.h2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp5
-rw-r--r--cpp/src/qpid/broker/SemanticState.h3
-rw-r--r--cpp/src/qpid/broker/SessionContext.h3
-rw-r--r--cpp/src/tests/QueueTest.cpp1
7 files changed, 30 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 00eb41a428..4274ce823e 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -27,6 +27,7 @@ namespace qpid {
}}
#include "Message.h"
+#include "OwnershipToken.h"
namespace qpid {
namespace broker {
@@ -56,6 +57,7 @@ namespace qpid {
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ virtual OwnershipToken* getSession() = 0;
virtual ~Consumer(){}
};
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b3d8fda53b..628d969c69 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
consumerCount(0),
- exclusive(false),
+ exclusive(0),
noLocal(false),
persistenceId(0)
{
@@ -91,9 +91,18 @@ void Queue::notifyDurableIOComplete()
notify();
}
+bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+{
+ return token && token->isLocal(msg->getPublisher());
+}
+
bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
{
- return noLocal && owner && owner->isLocal(msg->getPublisher());
+ //message is considered local if it was published on the same
+ //connection as that of the session which declared this queue
+ //exclusive (owner) or which has an exclusive subscription
+ //(exclusive)
+ return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
@@ -328,7 +337,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) {
return false;
}
-void Queue::consume(Consumer&, bool requestExclusive){
+void Queue::consume(Consumer& c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw AccessRefusedException(
@@ -338,7 +347,7 @@ void Queue::consume(Consumer&, bool requestExclusive){
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
} else {
- exclusive = true;
+ exclusive = c.getSession();
}
}
consumerCount++;
@@ -352,7 +361,7 @@ void Queue::cancel(Consumer& c){
removeListener(c);
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
- if(exclusive) exclusive = false;
+ if(exclusive) exclusive = 0;
if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
@@ -485,10 +494,9 @@ void Queue::configure(const FieldTable& _settings)
if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
}
- if (owner) {
- noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
- }
+ //set this regardless of owner to allow use of no-local with exclusive consumers also
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
if (mgmtObject.get() != 0)
mgmtObject->set_arguments (_settings);
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 8b92784b9a..18d28d32fb 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -70,7 +70,7 @@ namespace qpid {
MessageStore* store;
const OwnershipToken* owner;
uint32_t consumerCount;
- bool exclusive;
+ OwnershipToken* exclusive;
bool noLocal;
Listeners listeners;
Messages messages;
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 2c2d099fb1..ab6b82a232 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -267,6 +267,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0) {}
+OwnershipToken* SemanticState::ConsumerImpl::getSession()
+{
+ return &(parent->session);
+}
+
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
allocateCredit(msg.payload);
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 20a0239db0..84dc0fc5bb 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -80,6 +80,7 @@ class SemanticState : public framing::FrameHandler::Chains,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
+ OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
@@ -93,7 +94,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void stop();
void complete(DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
- bool isBlocked() const { return blocked; }
+ bool isBlocked() const { return blocked; }
bool doOutput();
};
diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h
index e3cc0a5fa3..7a277964ab 100644
--- a/cpp/src/qpid/broker/SessionContext.h
+++ b/cpp/src/qpid/broker/SessionContext.h
@@ -27,6 +27,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/OutputControl.h"
#include "ConnectionState.h"
+#include "OwnershipToken.h"
#include <boost/noncopyable.hpp>
@@ -34,7 +35,7 @@
namespace qpid {
namespace broker {
-class SessionContext : public sys::OutputControl
+class SessionContext : public OwnershipToken, public sys::OutputControl
{
public:
virtual ~SessionContext(){}
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 1d454d9f4a..aec59a58bc 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -49,6 +49,7 @@ public:
return true;
};
void notify() {}
+ OwnershipToken* getSession() { return 0; }
};
class FailOnDeliver : public Deliverable