diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-31 12:51:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-31 12:51:36 +0000 |
commit | 36cfeb13b8ad4b532f7f9c2b48ac2353e6217bcd (patch) | |
tree | 3131f539a580a7ef905623787d86ba2cdb515f62 | |
parent | 4c0600f324a10a59cf505b991e687d9888fa3a16 (diff) | |
download | qpid-python-36cfeb13b8ad4b532f7f9c2b48ac2353e6217bcd.tar.gz |
Re-introduced old 'no-local' behaviour for exclusive queues via a proprietary arg to queue.declare.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@642981 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/ConnectionToken.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/OwnershipToken.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PreviewSessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PreviewSessionState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionContext.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.cpp | 8 | ||||
-rw-r--r-- | python/cpp_failing_0-10.txt | 2 | ||||
-rw-r--r-- | python/qpid/testlib.py | 7 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 7 | ||||
-rw-r--r-- | python/tests_0-10/message.py | 31 |
16 files changed, 74 insertions, 33 deletions
diff --git a/cpp/src/qpid/broker/ConnectionToken.h b/cpp/src/qpid/broker/ConnectionToken.h index 38b7d7d098..0e3b301897 100644 --- a/cpp/src/qpid/broker/ConnectionToken.h +++ b/cpp/src/qpid/broker/ConnectionToken.h @@ -30,6 +30,7 @@ namespace qpid { */ class ConnectionToken : public OwnershipToken { public: + virtual bool isLocal(const ConnectionToken* t) const { return this == t; } virtual ~ConnectionToken(){} }; } diff --git a/cpp/src/qpid/broker/OwnershipToken.h b/cpp/src/qpid/broker/OwnershipToken.h index 15f333941b..effd2f5b3c 100644 --- a/cpp/src/qpid/broker/OwnershipToken.h +++ b/cpp/src/qpid/broker/OwnershipToken.h @@ -24,8 +24,11 @@ namespace qpid { namespace broker { +class ConnectionToken; + class OwnershipToken{ public: + virtual bool isLocal(const ConnectionToken* t) const = 0; virtual ~OwnershipToken(){} }; diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp index 7188ffbf40..43c3b1509e 100644 --- a/cpp/src/qpid/broker/PreviewSessionState.cpp +++ b/cpp/src/qpid/broker/PreviewSessionState.cpp @@ -90,6 +90,11 @@ ConnectionState& PreviewSessionState::getConnection() { return getHandler()->getConnection(); } +bool PreviewSessionState::isLocal(const ConnectionToken* t) const +{ + return isAttached() && &(handler->getConnection()) == t; +} + void PreviewSessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h index 6e8523317c..1aecb12e72 100644 --- a/cpp/src/qpid/broker/PreviewSessionState.h +++ b/cpp/src/qpid/broker/PreviewSessionState.h @@ -64,7 +64,7 @@ class PreviewSessionState : public framing::SessionState, { public: ~PreviewSessionState(); - bool isAttached() { return handler; } + bool isAttached() const { return handler; } void detach(); void attach(PreviewSessionHandler& handler); @@ -77,6 +77,7 @@ class PreviewSessionState : public framing::SessionState, /** @pre isAttached() */ ConnectionState& getConnection(); + bool isLocal(const ConnectionToken* t) const; uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 165830151d..436431fce1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -59,6 +59,7 @@ Queue::Queue(const string& _name, bool _autodelete, owner(_owner), consumerCount(0), exclusive(false), + noLocal(false), persistenceId(0) { if (parent != 0) @@ -90,6 +91,10 @@ void Queue::notifyDurableIOComplete() notify(); } +bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) +{ + return noLocal && owner && owner->isLocal(msg->getPublisher()); +} void Queue::deliver(boost::intrusive_ptr<Message>& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { @@ -97,9 +102,10 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ DeliverableMessage deliverable(msg); alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); } + } else if (isLocal(msg)) { + //drop message + QPID_LOG(debug, "Dropping 'local' message from " << getName()); } else { - - // if no store then mark as enqueued if (!enqueue(0, msg)){ push(msg); @@ -468,6 +474,7 @@ namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); + const std::string qpidNoLocal("no-local"); } void Queue::create(const FieldTable& _settings) @@ -484,8 +491,13 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings) { std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings)); - if (_policy->getMaxCount() || _policy->getMaxSize()) + if (_policy->getMaxCount() || _policy->getMaxSize()) { setPolicy(_policy); + } + if (owner) { + noLocal = _settings.get(qpidNoLocal); + QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + } } void Queue::destroy() diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e33cd7e5d7..880b048103 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -70,6 +70,7 @@ namespace qpid { const OwnershipToken* owner; uint32_t consumerCount; bool exclusive; + bool noLocal; Listeners listeners; Messages messages; mutable qpid::sys::Mutex consumerLock; @@ -118,6 +119,7 @@ namespace qpid { bool acquire(const QueuedMessage& msg); + bool isLocal(boost::intrusive_ptr<Message>& msg); /** * Delivers a message to the queue. Will record it as * enqueued if persistent then process it. diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 2091e97584..3daf15f269 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -23,6 +23,7 @@ #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" +#include "qpid/log/Statement.h" #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -198,6 +199,12 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() exclusiveQueues.erase(exclusiveQueues.begin()); } } + +bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const +{ + return session.isLocal(t); +} + Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index c8aa9008cc..fc182e0bb6 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -21,11 +21,12 @@ #include "HandlerImpl.h" +#include "ConnectionToken.h" +#include "OwnershipToken.h" #include "qpid/Exception.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/SequenceSet.h" -#include "OwnershipToken.h" #include <vector> #include <boost/function.hpp> @@ -140,6 +141,7 @@ class Queue; bool ifUnused, bool ifEmpty); void purge(const std::string& queue); framing::Queue010QueryResult query(const std::string& queue); + bool isLocal(const ConnectionToken* t) const; }; class MessageHandlerImpl : diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h index a289310b15..e3cc0a5fa3 100644 --- a/cpp/src/qpid/broker/SessionContext.h +++ b/cpp/src/qpid/broker/SessionContext.h @@ -38,6 +38,7 @@ class SessionContext : public sys::OutputControl { public: virtual ~SessionContext(){} + virtual bool isLocal(const ConnectionToken* t) const = 0; virtual ConnectionState& getConnection() = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 19fb0a4a79..d719bbe145 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -96,6 +96,11 @@ ConnectionState& SessionState::getConnection() { return getHandler()->getConnection(); } +bool SessionState::isLocal(const ConnectionToken* t) const +{ + return isAttached() && &(handler->getConnection()) == t; +} + void SessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticState); Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 18acb6f096..4fc2ae4cc5 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -70,7 +70,7 @@ class SessionState : public framing::SessionState, { public: ~SessionState(); - bool isAttached() { return handler; } + bool isAttached() const { return handler; } void detach(); void attach(SessionHandler& handler); @@ -83,6 +83,7 @@ class SessionState : public framing::SessionState, /** @pre isAttached() */ ConnectionState& getConnection(); + bool isLocal(const ConnectionToken* t) const; uint32_t getTimeout() const { return timeout; } void setTimeout(uint32_t t) { timeout = t; } diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 46ec308bd2..1a8630eb54 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -44,8 +44,12 @@ void TxPublish::rollback() throw(){ } void TxPublish::deliverTo(Queue::shared_ptr& queue){ - queues.push_back(queue); - delivered = true; + if (!queue->isLocal(msg)) { + queues.push_back(queue); + delivered = true; + } else { + QPID_LOG(debug, "Won't enqueue local message for " << queue->getName()); + } } TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 58cad3344b..cc1e57bca0 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -3,8 +3,6 @@ tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair tests.codec.FieldTableTestCase.test_field_table_name_value_pair tests_0-10.execution.ExecutionTests.test_flush tests_0-10.dtx.DtxTests.test_recover -tests_0-10.message.MessageTests.test_consume_no_local -tests_0-10.message.MessageTests.test_consume_no_local_awkward tests_0-10.message.MessageTests.test_no_size tests_0-10.message.MessageTests.test_qos_prefetch_count tests_0-10.message.MessageTests.test_qos_prefetch_size diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index f633c4e77d..b6cf6d7446 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -361,3 +361,10 @@ class TestBase010(unittest.TestCase): def tearDown(self): if not self.session.error(): self.session.close(timeout=10) self.conn.close(timeout=10) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index 2483c6f16d..042df521ae 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -667,10 +667,3 @@ class DtxTests(TestBase010): dp=session.delivery_properties(routing_key=key) mp=session.message_properties(correlation_id=id) session.message_transfer(message=Message(dp, mp, body)) - - def subscribe(self, session=None, **keys): - session = session or self.session - consumer_tag = keys["destination"] - session.message_subscribe(**keys) - session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) - session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index df9fde1593..2b1b446e7a 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -28,33 +28,33 @@ from qpid.content import Content class MessageTests(TestBase010): """Tests for 'methods' on the amqp message 'class'""" - def test_consume_no_local(self): + def test_no_local(self): """ Test that the no_local flag is honoured in the consume method """ session = self.session - #setup, declare two queues: + #setup, declare two queues one of which excludes delivery of locally sent messages session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) - session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True) - #establish two consumers one of which excludes delivery of locally sent messages + session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) + #establish two consumers self.subscribe(destination="local_included", queue="test-queue-1a") - self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) + self.subscribe(destination="local_excluded", queue="test-queue-1b") #send a message - session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) - session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) #check the queues of the two consumers excluded = session.incoming("local_excluded") included = session.incoming("local_included") msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.body) + self.assertEqual("deliver-me", msg.body) try: excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") except Empty: None - def test_consume_no_local_awkward(self): + def test_no_local_awkward(self): """ If an exclusive queue gets a no-local delivered to it, that @@ -67,19 +67,18 @@ class MessageTests(TestBase010): session = self.session #setup: - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) #establish consumer which excludes delivery of locally sent messages - self.subscribe(destination="local_excluded", queue="test-queue", no_local=True) + self.subscribe(destination="local_excluded", queue="test-queue") #send a 'local' message - session.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local")) #send a non local message other = self.connect() - session2 = other.session(1) - session2.session_open() - session2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign")) - session2.session_close() + session2 = other.session("my-session", 1) + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign")) + session2.close() other.close() #check that the second message only is delivered |