summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-22 12:05:52 +0000
committerGordon Sim <gsim@apache.org>2008-04-22 12:05:52 +0000
commitab24602c21632ffb3f0748331819b2e099b188da (patch)
treee74bfdae3be3c0220a9d8124d371802d84ea1eeb
parentceabd6c884c0f9f5315a13b6d7207895cd79ac6f (diff)
downloadqpid-python-ab24602c21632ffb3f0748331819b2e099b188da.tar.gz
QPID-944: do no-local checking where requested when there is an exclusive subscription active
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@650450 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h3
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp1
-rw-r--r--qpid/python/tests_0-10/message.py28
-rw-r--r--qpid/python/tests_0-10/queue.py2
9 files changed, 59 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 00eb41a428..4274ce823e 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -27,6 +27,7 @@ namespace qpid {
}}
#include "Message.h"
+#include "OwnershipToken.h"
namespace qpid {
namespace broker {
@@ -56,6 +57,7 @@ namespace qpid {
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ virtual OwnershipToken* getSession() = 0;
virtual ~Consumer(){}
};
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index b3d8fda53b..628d969c69 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
consumerCount(0),
- exclusive(false),
+ exclusive(0),
noLocal(false),
persistenceId(0)
{
@@ -91,9 +91,18 @@ void Queue::notifyDurableIOComplete()
notify();
}
+bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+{
+ return token && token->isLocal(msg->getPublisher());
+}
+
bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
{
- return noLocal && owner && owner->isLocal(msg->getPublisher());
+ //message is considered local if it was published on the same
+ //connection as that of the session which declared this queue
+ //exclusive (owner) or which has an exclusive subscription
+ //(exclusive)
+ return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
@@ -328,7 +337,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) {
return false;
}
-void Queue::consume(Consumer&, bool requestExclusive){
+void Queue::consume(Consumer& c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw AccessRefusedException(
@@ -338,7 +347,7 @@ void Queue::consume(Consumer&, bool requestExclusive){
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
} else {
- exclusive = true;
+ exclusive = c.getSession();
}
}
consumerCount++;
@@ -352,7 +361,7 @@ void Queue::cancel(Consumer& c){
removeListener(c);
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
- if(exclusive) exclusive = false;
+ if(exclusive) exclusive = 0;
if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
@@ -485,10 +494,9 @@ void Queue::configure(const FieldTable& _settings)
if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
}
- if (owner) {
- noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
- }
+ //set this regardless of owner to allow use of no-local with exclusive consumers also
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
if (mgmtObject.get() != 0)
mgmtObject->set_arguments (_settings);
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 8b92784b9a..18d28d32fb 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -70,7 +70,7 @@ namespace qpid {
MessageStore* store;
const OwnershipToken* owner;
uint32_t consumerCount;
- bool exclusive;
+ OwnershipToken* exclusive;
bool noLocal;
Listeners listeners;
Messages messages;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2c2d099fb1..ab6b82a232 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -267,6 +267,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0) {}
+OwnershipToken* SemanticState::ConsumerImpl::getSession()
+{
+ return &(parent->session);
+}
+
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
allocateCredit(msg.payload);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 20a0239db0..84dc0fc5bb 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -80,6 +80,7 @@ class SemanticState : public framing::FrameHandler::Chains,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
+ OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
@@ -93,7 +94,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void stop();
void complete(DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
- bool isBlocked() const { return blocked; }
+ bool isBlocked() const { return blocked; }
bool doOutput();
};
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index e3cc0a5fa3..7a277964ab 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -27,6 +27,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/OutputControl.h"
#include "ConnectionState.h"
+#include "OwnershipToken.h"
#include <boost/noncopyable.hpp>
@@ -34,7 +35,7 @@
namespace qpid {
namespace broker {
-class SessionContext : public sys::OutputControl
+class SessionContext : public OwnershipToken, public sys::OutputControl
{
public:
virtual ~SessionContext(){}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 1d454d9f4a..aec59a58bc 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -49,6 +49,7 @@ public:
return true;
};
void notify() {}
+ OwnershipToken* getSession() { return 0; }
};
class FailOnDeliver : public Deliverable
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index 2b1b446e7a..8302515b2f 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -92,6 +92,34 @@ class MessageTests(TestBase010):
#check queue is empty
self.assertEqual(0, session.queue_query(queue="test-queue").message_count)
+ def test_no_local_exclusive_subscribe(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ session = self.session
+
+ #setup, declare two queues one of which excludes delivery of
+ #locally sent messages but is not declared as exclusive
+ session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'})
+ #establish two consumers
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True)
+
+ #send a message
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))
+
+ #check the queues of the two consumers
+ excluded = session.incoming("local_excluded")
+ included = session.incoming("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("deliver-me", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
def test_consume_exclusive(self):
"""
diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py
index 758794dd52..97e7a92b87 100644
--- a/qpid/python/tests_0-10/queue.py
+++ b/qpid/python/tests_0-10/queue.py
@@ -223,7 +223,7 @@ class QueueTests(TestBase010):
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b"))
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c"))
session.queue_delete(queue="delete-me")
- #check that it has gone be declaring passively
+ #check that it has gone by declaring passively
try:
session.queue_declare(queue="delete-me", passive=True)
self.fail("Queue has not been deleted")