summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/ConnectionToken.h1
-rw-r--r--cpp/src/qpid/broker/OwnershipToken.h3
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.cpp5
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.h3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp18
-rw-r--r--cpp/src/qpid/broker/Queue.h2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp7
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h4
-rw-r--r--cpp/src/qpid/broker/SessionContext.h1
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp5
-rw-r--r--cpp/src/qpid/broker/SessionState.h3
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp8
-rw-r--r--python/cpp_failing_0-10.txt2
-rw-r--r--python/qpid/testlib.py7
-rw-r--r--python/tests_0-10/dtx.py7
-rw-r--r--python/tests_0-10/message.py31
16 files changed, 74 insertions, 33 deletions
diff --git a/cpp/src/qpid/broker/ConnectionToken.h b/cpp/src/qpid/broker/ConnectionToken.h
index 38b7d7d098..0e3b301897 100644
--- a/cpp/src/qpid/broker/ConnectionToken.h
+++ b/cpp/src/qpid/broker/ConnectionToken.h
@@ -30,6 +30,7 @@ namespace qpid {
*/
class ConnectionToken : public OwnershipToken {
public:
+ virtual bool isLocal(const ConnectionToken* t) const { return this == t; }
virtual ~ConnectionToken(){}
};
}
diff --git a/cpp/src/qpid/broker/OwnershipToken.h b/cpp/src/qpid/broker/OwnershipToken.h
index 15f333941b..effd2f5b3c 100644
--- a/cpp/src/qpid/broker/OwnershipToken.h
+++ b/cpp/src/qpid/broker/OwnershipToken.h
@@ -24,8 +24,11 @@
namespace qpid {
namespace broker {
+class ConnectionToken;
+
class OwnershipToken{
public:
+ virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual ~OwnershipToken(){}
};
diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp
index 7188ffbf40..43c3b1509e 100644
--- a/cpp/src/qpid/broker/PreviewSessionState.cpp
+++ b/cpp/src/qpid/broker/PreviewSessionState.cpp
@@ -90,6 +90,11 @@ ConnectionState& PreviewSessionState::getConnection() {
return getHandler()->getConnection();
}
+bool PreviewSessionState::isLocal(const ConnectionToken* t) const
+{
+ return isAttached() && &(handler->getConnection()) == t;
+}
+
void PreviewSessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h
index 6e8523317c..1aecb12e72 100644
--- a/cpp/src/qpid/broker/PreviewSessionState.h
+++ b/cpp/src/qpid/broker/PreviewSessionState.h
@@ -64,7 +64,7 @@ class PreviewSessionState : public framing::SessionState,
{
public:
~PreviewSessionState();
- bool isAttached() { return handler; }
+ bool isAttached() const { return handler; }
void detach();
void attach(PreviewSessionHandler& handler);
@@ -77,6 +77,7 @@ class PreviewSessionState : public framing::SessionState,
/** @pre isAttached() */
ConnectionState& getConnection();
+ bool isLocal(const ConnectionToken* t) const;
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 165830151d..436431fce1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -59,6 +59,7 @@ Queue::Queue(const string& _name, bool _autodelete,
owner(_owner),
consumerCount(0),
exclusive(false),
+ noLocal(false),
persistenceId(0)
{
if (parent != 0)
@@ -90,6 +91,10 @@ void Queue::notifyDurableIOComplete()
notify();
}
+bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+{
+ return noLocal && owner && owner->isLocal(msg->getPublisher());
+}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
@@ -97,9 +102,10 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
DeliverableMessage deliverable(msg);
alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
}
+ } else if (isLocal(msg)) {
+ //drop message
+ QPID_LOG(debug, "Dropping 'local' message from " << getName());
} else {
-
-
// if no store then mark as enqueued
if (!enqueue(0, msg)){
push(msg);
@@ -468,6 +474,7 @@ namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
+ const std::string qpidNoLocal("no-local");
}
void Queue::create(const FieldTable& _settings)
@@ -484,8 +491,13 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings)
{
std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
- if (_policy->getMaxCount() || _policy->getMaxSize())
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
+ }
+ if (owner) {
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ }
}
void Queue::destroy()
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index e33cd7e5d7..880b048103 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -70,6 +70,7 @@ namespace qpid {
const OwnershipToken* owner;
uint32_t consumerCount;
bool exclusive;
+ bool noLocal;
Listeners listeners;
Messages messages;
mutable qpid::sys::Mutex consumerLock;
@@ -118,6 +119,7 @@ namespace qpid {
bool acquire(const QueuedMessage& msg);
+ bool isLocal(boost::intrusive_ptr<Message>& msg);
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 2091e97584..3daf15f269 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -23,6 +23,7 @@
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
+#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -198,6 +199,12 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
+
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
+{
+ return session.isLocal(t);
+}
+
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index c8aa9008cc..fc182e0bb6 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -21,11 +21,12 @@
#include "HandlerImpl.h"
+#include "ConnectionToken.h"
+#include "OwnershipToken.h"
#include "qpid/Exception.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/SequenceSet.h"
-#include "OwnershipToken.h"
#include <vector>
#include <boost/function.hpp>
@@ -140,6 +141,7 @@ class Queue;
bool ifUnused, bool ifEmpty);
void purge(const std::string& queue);
framing::Queue010QueryResult query(const std::string& queue);
+ bool isLocal(const ConnectionToken* t) const;
};
class MessageHandlerImpl :
diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h
index a289310b15..e3cc0a5fa3 100644
--- a/cpp/src/qpid/broker/SessionContext.h
+++ b/cpp/src/qpid/broker/SessionContext.h
@@ -38,6 +38,7 @@ class SessionContext : public sys::OutputControl
{
public:
virtual ~SessionContext(){}
+ virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual ConnectionState& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 19fb0a4a79..d719bbe145 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -96,6 +96,11 @@ ConnectionState& SessionState::getConnection() {
return getHandler()->getConnection();
}
+bool SessionState::isLocal(const ConnectionToken* t) const
+{
+ return isAttached() && &(handler->getConnection()) == t;
+}
+
void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticState);
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 18acb6f096..4fc2ae4cc5 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -70,7 +70,7 @@ class SessionState : public framing::SessionState,
{
public:
~SessionState();
- bool isAttached() { return handler; }
+ bool isAttached() const { return handler; }
void detach();
void attach(SessionHandler& handler);
@@ -83,6 +83,7 @@ class SessionState : public framing::SessionState,
/** @pre isAttached() */
ConnectionState& getConnection();
+ bool isLocal(const ConnectionToken* t) const;
uint32_t getTimeout() const { return timeout; }
void setTimeout(uint32_t t) { timeout = t; }
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 46ec308bd2..1a8630eb54 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -44,8 +44,12 @@ void TxPublish::rollback() throw(){
}
void TxPublish::deliverTo(Queue::shared_ptr& queue){
- queues.push_back(queue);
- delivered = true;
+ if (!queue->isLocal(msg)) {
+ queues.push_back(queue);
+ delivered = true;
+ } else {
+ QPID_LOG(debug, "Won't enqueue local message for " << queue->getName());
+ }
}
TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 58cad3344b..cc1e57bca0 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -3,8 +3,6 @@ tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
tests.codec.FieldTableTestCase.test_field_table_name_value_pair
tests_0-10.execution.ExecutionTests.test_flush
tests_0-10.dtx.DtxTests.test_recover
-tests_0-10.message.MessageTests.test_consume_no_local
-tests_0-10.message.MessageTests.test_consume_no_local_awkward
tests_0-10.message.MessageTests.test_no_size
tests_0-10.message.MessageTests.test_qos_prefetch_count
tests_0-10.message.MessageTests.test_qos_prefetch_size
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index f633c4e77d..b6cf6d7446 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -361,3 +361,10 @@ class TestBase010(unittest.TestCase):
def tearDown(self):
if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index 2483c6f16d..042df521ae 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -667,10 +667,3 @@ class DtxTests(TestBase010):
dp=session.delivery_properties(routing_key=key)
mp=session.message_properties(correlation_id=id)
session.message_transfer(message=Message(dp, mp, body))
-
- def subscribe(self, session=None, **keys):
- session = session or self.session
- consumer_tag = keys["destination"]
- session.message_subscribe(**keys)
- session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
- session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index df9fde1593..2b1b446e7a 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -28,33 +28,33 @@ from qpid.content import Content
class MessageTests(TestBase010):
"""Tests for 'methods' on the amqp message 'class'"""
- def test_consume_no_local(self):
+ def test_no_local(self):
"""
Test that the no_local flag is honoured in the consume method
"""
session = self.session
- #setup, declare two queues:
+ #setup, declare two queues one of which excludes delivery of locally sent messages
session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
- session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
- #establish two consumers one of which excludes delivery of locally sent messages
+ session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+ #establish two consumers
self.subscribe(destination="local_included", queue="test-queue-1a")
- self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+ self.subscribe(destination="local_excluded", queue="test-queue-1b")
#send a message
- session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
- session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))
#check the queues of the two consumers
excluded = session.incoming("local_excluded")
included = session.incoming("local_included")
msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.body)
+ self.assertEqual("deliver-me", msg.body)
try:
excluded.get(timeout=1)
self.fail("Received locally published message though no_local=true")
except Empty: None
- def test_consume_no_local_awkward(self):
+ def test_no_local_awkward(self):
"""
If an exclusive queue gets a no-local delivered to it, that
@@ -67,19 +67,18 @@ class MessageTests(TestBase010):
session = self.session
#setup:
- session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
#establish consumer which excludes delivery of locally sent messages
- self.subscribe(destination="local_excluded", queue="test-queue", no_local=True)
+ self.subscribe(destination="local_excluded", queue="test-queue")
#send a 'local' message
- session.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local"))
#send a non local message
other = self.connect()
- session2 = other.session(1)
- session2.session_open()
- session2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
- session2.session_close()
+ session2 = other.session("my-session", 1)
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign"))
+ session2.close()
other.close()
#check that the second message only is delivered