summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-31 12:51:36 +0000
committerGordon Sim <gsim@apache.org>2008-03-31 12:51:36 +0000
commit36cfeb13b8ad4b532f7f9c2b48ac2353e6217bcd (patch)
tree3131f539a580a7ef905623787d86ba2cdb515f62 /cpp/src
parent4c0600f324a10a59cf505b991e687d9888fa3a16 (diff)
downloadqpid-python-36cfeb13b8ad4b532f7f9c2b48ac2353e6217bcd.tar.gz
Re-introduced old 'no-local' behaviour for exclusive queues via a proprietary arg to queue.declare.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@642981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/ConnectionToken.h1
-rw-r--r--cpp/src/qpid/broker/OwnershipToken.h3
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.cpp5
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.h3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp18
-rw-r--r--cpp/src/qpid/broker/Queue.h2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp7
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h4
-rw-r--r--cpp/src/qpid/broker/SessionContext.h1
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp5
-rw-r--r--cpp/src/qpid/broker/SessionState.h3
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp8
12 files changed, 52 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/ConnectionToken.h b/cpp/src/qpid/broker/ConnectionToken.h
index 38b7d7d098..0e3b301897 100644
--- a/cpp/src/qpid/broker/ConnectionToken.h
+++ b/cpp/src/qpid/broker/ConnectionToken.h
@@ -30,6 +30,7 @@ namespace qpid {
*/
class ConnectionToken : public OwnershipToken {
public:
+ virtual bool isLocal(const ConnectionToken* t) const { return this == t; }
virtual ~ConnectionToken(){}
};
}
diff --git a/cpp/src/qpid/broker/OwnershipToken.h b/cpp/src/qpid/broker/OwnershipToken.h
index 15f333941b..effd2f5b3c 100644
--- a/cpp/src/qpid/broker/OwnershipToken.h
+++ b/cpp/src/qpid/broker/OwnershipToken.h
@@ -24,8 +24,11 @@
namespace qpid {
namespace broker {
+class ConnectionToken;
+
class OwnershipToken{
public:
+ virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual ~OwnershipToken(){}
};
diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp
index 7188ffbf40..43c3b1509e 100644
--- a/cpp/src/qpid/broker/PreviewSessionState.cpp
+++ b/cpp/src/qpid/broker/PreviewSessionState.cpp
@@ -90,6 +90,11 @@ ConnectionState& PreviewSessionState::getConnection() {
return getHandler()->getConnection();
}
+bool PreviewSessionState::isLocal(const ConnectionToken* t) const
+{
+ return isAttached() && &(handler->getConnection()) == t;
+}
+
void PreviewSessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h
index 6e8523317c..1aecb12e72 100644
--- a/cpp/src/qpid/broker/PreviewSessionState.h
+++ b/cpp/src/qpid/broker/PreviewSessionState.h
@@ -64,7 +64,7 @@ class PreviewSessionState : public framing::SessionState,
{
public:
~PreviewSessionState();
- bool isAttached() { return handler; }
+ bool isAttached() const { return handler; }
void detach();
void attach(PreviewSessionHandler& handler);
@@ -77,6 +77,7 @@ class PreviewSessionState : public framing::SessionState,
/** @pre isAttached() */
ConnectionState& getConnection();
+ bool isLocal(const ConnectionToken* t) const;
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 165830151d..436431fce1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -59,6 +59,7 @@ Queue::Queue(const string& _name, bool _autodelete,
owner(_owner),
consumerCount(0),
exclusive(false),
+ noLocal(false),
persistenceId(0)
{
if (parent != 0)
@@ -90,6 +91,10 @@ void Queue::notifyDurableIOComplete()
notify();
}
+bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+{
+ return noLocal && owner && owner->isLocal(msg->getPublisher());
+}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
@@ -97,9 +102,10 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
DeliverableMessage deliverable(msg);
alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
}
+ } else if (isLocal(msg)) {
+ //drop message
+ QPID_LOG(debug, "Dropping 'local' message from " << getName());
} else {
-
-
// if no store then mark as enqueued
if (!enqueue(0, msg)){
push(msg);
@@ -468,6 +474,7 @@ namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
+ const std::string qpidNoLocal("no-local");
}
void Queue::create(const FieldTable& _settings)
@@ -484,8 +491,13 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings)
{
std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
- if (_policy->getMaxCount() || _policy->getMaxSize())
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
+ }
+ if (owner) {
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ }
}
void Queue::destroy()
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index e33cd7e5d7..880b048103 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -70,6 +70,7 @@ namespace qpid {
const OwnershipToken* owner;
uint32_t consumerCount;
bool exclusive;
+ bool noLocal;
Listeners listeners;
Messages messages;
mutable qpid::sys::Mutex consumerLock;
@@ -118,6 +119,7 @@ namespace qpid {
bool acquire(const QueuedMessage& msg);
+ bool isLocal(boost::intrusive_ptr<Message>& msg);
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 2091e97584..3daf15f269 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -23,6 +23,7 @@
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
+#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -198,6 +199,12 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
+
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
+{
+ return session.isLocal(t);
+}
+
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index c8aa9008cc..fc182e0bb6 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -21,11 +21,12 @@
#include "HandlerImpl.h"
+#include "ConnectionToken.h"
+#include "OwnershipToken.h"
#include "qpid/Exception.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/SequenceSet.h"
-#include "OwnershipToken.h"
#include <vector>
#include <boost/function.hpp>
@@ -140,6 +141,7 @@ class Queue;
bool ifUnused, bool ifEmpty);
void purge(const std::string& queue);
framing::Queue010QueryResult query(const std::string& queue);
+ bool isLocal(const ConnectionToken* t) const;
};
class MessageHandlerImpl :
diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h
index a289310b15..e3cc0a5fa3 100644
--- a/cpp/src/qpid/broker/SessionContext.h
+++ b/cpp/src/qpid/broker/SessionContext.h
@@ -38,6 +38,7 @@ class SessionContext : public sys::OutputControl
{
public:
virtual ~SessionContext(){}
+ virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual ConnectionState& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 19fb0a4a79..d719bbe145 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -96,6 +96,11 @@ ConnectionState& SessionState::getConnection() {
return getHandler()->getConnection();
}
+bool SessionState::isLocal(const ConnectionToken* t) const
+{
+ return isAttached() && &(handler->getConnection()) == t;
+}
+
void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticState);
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 18acb6f096..4fc2ae4cc5 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -70,7 +70,7 @@ class SessionState : public framing::SessionState,
{
public:
~SessionState();
- bool isAttached() { return handler; }
+ bool isAttached() const { return handler; }
void detach();
void attach(SessionHandler& handler);
@@ -83,6 +83,7 @@ class SessionState : public framing::SessionState,
/** @pre isAttached() */
ConnectionState& getConnection();
+ bool isLocal(const ConnectionToken* t) const;
uint32_t getTimeout() const { return timeout; }
void setTimeout(uint32_t t) { timeout = t; }
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 46ec308bd2..1a8630eb54 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -44,8 +44,12 @@ void TxPublish::rollback() throw(){
}
void TxPublish::deliverTo(Queue::shared_ptr& queue){
- queues.push_back(queue);
- delivered = true;
+ if (!queue->isLocal(msg)) {
+ queues.push_back(queue);
+ delivered = true;
+ } else {
+ QPID_LOG(debug, "Won't enqueue local message for " << queue->getName());
+ }
}
TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)