diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-22 12:05:52 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-22 12:05:52 +0000 |
commit | ab24602c21632ffb3f0748331819b2e099b188da (patch) | |
tree | e74bfdae3be3c0220a9d8124d371802d84ea1eeb | |
parent | ceabd6c884c0f9f5315a13b6d7207895cd79ac6f (diff) | |
download | qpid-python-ab24602c21632ffb3f0748331819b2e099b188da.tar.gz |
QPID-944: do no-local checking where requested when there is an exclusive subscription active
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@650450 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionContext.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 1 | ||||
-rw-r--r-- | qpid/python/tests_0-10/message.py | 28 | ||||
-rw-r--r-- | qpid/python/tests_0-10/queue.py | 2 |
9 files changed, 59 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 00eb41a428..4274ce823e 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -27,6 +27,7 @@ namespace qpid { }} #include "Message.h" +#include "OwnershipToken.h" namespace qpid { namespace broker { @@ -56,6 +57,7 @@ namespace qpid { virtual void notify() = 0; virtual bool filter(boost::intrusive_ptr<Message>) { return true; } virtual bool accept(boost::intrusive_ptr<Message>) { return true; } + virtual OwnershipToken* getSession() = 0; virtual ~Consumer(){} }; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b3d8fda53b..628d969c69 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), consumerCount(0), - exclusive(false), + exclusive(0), noLocal(false), persistenceId(0) { @@ -91,9 +91,18 @@ void Queue::notifyDurableIOComplete() notify(); } +bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) +{ + return token && token->isLocal(msg->getPublisher()); +} + bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) { - return noLocal && owner && owner->isLocal(msg->getPublisher()); + //message is considered local if it was published on the same + //connection as that of the session which declared this queue + //exclusive (owner) or which has an exclusive subscription + //(exclusive) + return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ @@ -328,7 +337,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { return false; } -void Queue::consume(Consumer&, bool requestExclusive){ +void Queue::consume(Consumer& c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw AccessRefusedException( @@ -338,7 +347,7 @@ void Queue::consume(Consumer&, bool requestExclusive){ throw AccessRefusedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { - exclusive = true; + exclusive = c.getSession(); } } consumerCount++; @@ -352,7 +361,7 @@ void Queue::cancel(Consumer& c){ removeListener(c); Mutex::ScopedLock locker(consumerLock); consumerCount--; - if(exclusive) exclusive = false; + if(exclusive) exclusive = 0; if (mgmtObject.get() != 0){ mgmtObject->dec_consumers (); } @@ -485,10 +494,9 @@ void Queue::configure(const FieldTable& _settings) if (_policy->getMaxCount() || _policy->getMaxSize()) { setPolicy(_policy); } - if (owner) { - noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); - } + //set this regardless of owner to allow use of no-local with exclusive consumers also + noLocal = _settings.get(qpidNoLocal); + QPID_LOG(debug, "Configured queue with no-local=" << noLocal); if (mgmtObject.get() != 0) mgmtObject->set_arguments (_settings); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 8b92784b9a..18d28d32fb 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -70,7 +70,7 @@ namespace qpid { MessageStore* store; const OwnershipToken* owner; uint32_t consumerCount; - bool exclusive; + OwnershipToken* exclusive; bool noLocal; Listeners listeners; Messages messages; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 2c2d099fb1..ab6b82a232 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -267,6 +267,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, msgCredit(0), byteCredit(0) {} +OwnershipToken* SemanticState::ConsumerImpl::getSession() +{ + return &(parent->session); +} + bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { allocateCredit(msg.payload); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 20a0239db0..84dc0fc5bb 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -80,6 +80,7 @@ class SemanticState : public framing::FrameHandler::Chains, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); + OwnershipToken* getSession(); bool deliver(QueuedMessage& msg); bool filter(boost::intrusive_ptr<Message> msg); bool accept(boost::intrusive_ptr<Message> msg); @@ -93,7 +94,7 @@ class SemanticState : public framing::FrameHandler::Chains, void stop(); void complete(DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } - bool isBlocked() const { return blocked; } + bool isBlocked() const { return blocked; } bool doOutput(); }; diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index e3cc0a5fa3..7a277964ab 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -27,6 +27,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/sys/OutputControl.h" #include "ConnectionState.h" +#include "OwnershipToken.h" #include <boost/noncopyable.hpp> @@ -34,7 +35,7 @@ namespace qpid { namespace broker { -class SessionContext : public sys::OutputControl +class SessionContext : public OwnershipToken, public sys::OutputControl { public: virtual ~SessionContext(){} diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 1d454d9f4a..aec59a58bc 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -49,6 +49,7 @@ public: return true; }; void notify() {} + OwnershipToken* getSession() { return 0; } }; class FailOnDeliver : public Deliverable diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index 2b1b446e7a..8302515b2f 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -92,6 +92,34 @@ class MessageTests(TestBase010): #check queue is empty self.assertEqual(0, session.queue_query(queue="test-queue").message_count) + def test_no_local_exclusive_subscribe(self): + """ + Test that the no_local flag is honoured in the consume method + """ + session = self.session + + #setup, declare two queues one of which excludes delivery of + #locally sent messages but is not declared as exclusive + session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'}) + #establish two consumers + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True) + + #send a message + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) + + #check the queues of the two consumers + excluded = session.incoming("local_excluded") + included = session.incoming("local_included") + msg = included.get(timeout=1) + self.assertEqual("deliver-me", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + def test_consume_exclusive(self): """ diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py index 758794dd52..97e7a92b87 100644 --- a/qpid/python/tests_0-10/queue.py +++ b/qpid/python/tests_0-10/queue.py @@ -223,7 +223,7 @@ class QueueTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b")) session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c")) session.queue_delete(queue="delete-me") - #check that it has gone be declaring passively + #check that it has gone by declaring passively try: session.queue_declare(queue="delete-me", passive=True) self.fail("Queue has not been deleted") |