summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-23 14:50:56 +0000
committerGordon Sim <gsim@apache.org>2007-10-23 14:50:56 +0000
commit47c3698db7e5a52a7958ebb635e736b2f95df1f9 (patch)
tree0f2c5026876d5ef822c93c1d6d95f26757735085 /cpp/src
parent6b509c871fae213fc2cf6b434e670c8d3bd953b4 (diff)
downloadqpid-python-47c3698db7e5a52a7958ebb635e736b2f95df1f9.tar.gz
Hack for no-local when used with jms topics
Fix for releasing of exclusive ownership of queues second time around git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@587525 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp4
-rw-r--r--cpp/src/qpid/broker/Consumer.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp103
-rw-r--r--cpp/src/qpid/broker/Queue.h13
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp8
-rw-r--r--cpp/src/qpid/broker/SemanticState.h1
6 files changed, 102 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index daa63f4b0c..99b585406e 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -202,8 +202,8 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
getConnection().exclusiveQueues.push_back(queue);
}
} else {
- if (exclusive && !queue->hasExclusiveOwner()) {
- queue->setExclusiveOwner(&getConnection());
+ if (exclusive && queue->setExclusiveOwner(&getConnection())) {
+ getConnection().exclusiveQueues.push_back(queue);
}
}
}
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index bf46ecbe1f..8c57d7d2b8 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -46,6 +46,7 @@ namespace qpid {
Consumer(bool preAcquires = true) : acquires(preAcquires) {}
bool preAcquires() const { return acquires; }
virtual bool deliver(QueuedMessage& msg) = 0;
+ virtual bool filter(Message::shared_ptr) { return true; }
virtual ~Consumer(){}
};
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index df34669dc2..2444684d7e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -151,15 +151,33 @@ void Queue::flush(DispatchCompletion& completion)
serializer.execute(f);
}
+/**
+ * Return true if the message can be excluded. This is currently the
+ * case if the queue has an exclusive consumer that will never want
+ * the message, or if the queue is exclusive to a single connection
+ * and has a single consumer (covers the JMS topic case).
+ */
+bool Queue::exclude(Message::shared_ptr msg)
+{
+ RWlock::ScopedWlock locker(consumerLock);
+ if (exclusive) {
+ return !exclusive->filter(msg);
+ } else if (hasExclusiveOwner() && acquirers.size() == 1) {
+ return !acquirers[0]->filter(msg);
+ } else {
+ return false;
+ }
+}
+
Consumer::ptr Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
- if(acquirers.empty()){
+ if (acquirers.empty()) {
return Consumer::ptr();
- }else if(exclusive){
+ } else if (exclusive){
return exclusive;
- }else{
+ } else {
next = next % acquirers.size();
return acquirers[next++];
}
@@ -171,9 +189,9 @@ bool Queue::dispatch(QueuedMessage& msg)
//request, so won't result in anyone being missed
uint counter = getAcquirerCount();
Consumer::ptr c = allocate();
- while(c && counter--){
- if(c->deliver(msg)) {
- return true;
+ while (c && counter--){
+ if (c->deliver(msg)) {
+ return true;
} else {
c = allocate();
}
@@ -181,22 +199,31 @@ bool Queue::dispatch(QueuedMessage& msg)
return false;
}
-void Queue::dispatch(){
+bool Queue::getNextMessage(QueuedMessage& msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ return false;
+ } else {
+ msg = messages.front();
+ return true;
+ }
+}
+
+void Queue::dispatch()
+{
QueuedMessage msg;
- while(true){
- {
- Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- break;
- }
- msg = messages.front();
- }
- if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
- pop();
- } else {
- break;
- }
+ while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
+ if (dispatch(msg)) {
+ pop();
+ } else if (exclude(msg.payload)) {
+ pop();
+ dequeue(0, msg.payload);
+ QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");
+ } else {
+ break;
+ }
}
serviceAllBrowsers();
}
@@ -479,3 +506,37 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
}
}
+
+bool Queue::isExclusiveOwner(const ConnectionToken* const o) const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return o == owner;
+}
+
+void Queue::releaseExclusiveOwnership()
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ owner = 0;
+}
+
+bool Queue::setExclusiveOwner(const ConnectionToken* const o)
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ if (owner) {
+ return false;
+ } else {
+ owner = o;
+ return true;
+ }
+}
+
+bool Queue::hasExclusiveOwner() const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return owner != 0;
+}
+
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
+}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 24a9959d14..5146024b6b 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -85,6 +85,7 @@ namespace qpid {
int next;
mutable qpid::sys::RWlock consumerLock;
mutable qpid::sys::Mutex messageLock;
+ mutable qpid::sys::Mutex ownershipLock;
Consumer::ptr exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
@@ -110,6 +111,8 @@ namespace qpid {
Consumer::ptr allocate();
bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
uint32_t getAcquirerCount() const;
+ bool getNextMessage(QueuedMessage& msg);
+ bool exclude(Message::shared_ptr msg);
protected:
/**
@@ -172,11 +175,11 @@ namespace qpid {
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
- inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
- inline void releaseExclusiveOwnership() { owner = 0; }
- inline void setExclusiveOwner(const ConnectionToken* const o) { owner = o; }
- inline bool hasExclusiveConsumer() const { return exclusive; }
- inline bool hasExclusiveOwner() const { return owner != 0; }
+ bool isExclusiveOwner(const ConnectionToken* const o) const;
+ void releaseExclusiveOwnership();
+ bool setExclusiveOwner(const ConnectionToken* const o);
+ bool hasExclusiveConsumer() const;
+ bool hasExclusiveOwner() const;
inline bool isDurable() const { return store != 0; }
inline const framing::FieldTable& getSettings() const { return settings; }
inline bool isAutoDelete() const { return autodelete; }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index f47726bcf8..1f7436da94 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -260,12 +260,20 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->deliveryAdapter.deliver(msg.payload, token);
if (windowing || ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+ } else if (!ackExpected) {
+ queue->dequeue(0, msg.payload);
}
}
return !blocked;
}
}
+bool SemanticState::ConsumerImpl::filter(Message::shared_ptr msg)
+{
+ return !(nolocal &&
+ &parent->getSession().getConnection() == msg->getPublisher());
+}
+
bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index d2c2d4b188..87ae937cfb 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -78,6 +78,7 @@ class SemanticState : public framing::FrameHandler::Chains,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
+ bool filter(Message::shared_ptr msg);
void setWindowMode();
void setCreditMode();