diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-23 14:50:56 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-23 14:50:56 +0000 |
commit | 47c3698db7e5a52a7958ebb635e736b2f95df1f9 (patch) | |
tree | 0f2c5026876d5ef822c93c1d6d95f26757735085 /cpp/src | |
parent | 6b509c871fae213fc2cf6b434e670c8d3bd953b4 (diff) | |
download | qpid-python-47c3698db7e5a52a7958ebb635e736b2f95df1f9.tar.gz |
Hack for no-local when used with jms topics
Fix for releasing of exclusive ownership of queues second time around
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@587525 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 103 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 1 |
6 files changed, 102 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index daa63f4b0c..99b585406e 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -202,8 +202,8 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& getConnection().exclusiveQueues.push_back(queue); } } else { - if (exclusive && !queue->hasExclusiveOwner()) { - queue->setExclusiveOwner(&getConnection()); + if (exclusive && queue->setExclusiveOwner(&getConnection())) { + getConnection().exclusiveQueues.push_back(queue); } } } diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index bf46ecbe1f..8c57d7d2b8 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -46,6 +46,7 @@ namespace qpid { Consumer(bool preAcquires = true) : acquires(preAcquires) {} bool preAcquires() const { return acquires; } virtual bool deliver(QueuedMessage& msg) = 0; + virtual bool filter(Message::shared_ptr) { return true; } virtual ~Consumer(){} }; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index df34669dc2..2444684d7e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -151,15 +151,33 @@ void Queue::flush(DispatchCompletion& completion) serializer.execute(f); } +/** + * Return true if the message can be excluded. This is currently the + * case if the queue has an exclusive consumer that will never want + * the message, or if the queue is exclusive to a single connection + * and has a single consumer (covers the JMS topic case). + */ +bool Queue::exclude(Message::shared_ptr msg) +{ + RWlock::ScopedWlock locker(consumerLock); + if (exclusive) { + return !exclusive->filter(msg); + } else if (hasExclusiveOwner() && acquirers.size() == 1) { + return !acquirers[0]->filter(msg); + } else { + return false; + } +} + Consumer::ptr Queue::allocate() { RWlock::ScopedWlock locker(consumerLock); - if(acquirers.empty()){ + if (acquirers.empty()) { return Consumer::ptr(); - }else if(exclusive){ + } else if (exclusive){ return exclusive; - }else{ + } else { next = next % acquirers.size(); return acquirers[next++]; } @@ -171,9 +189,9 @@ bool Queue::dispatch(QueuedMessage& msg) //request, so won't result in anyone being missed uint counter = getAcquirerCount(); Consumer::ptr c = allocate(); - while(c && counter--){ - if(c->deliver(msg)) { - return true; + while (c && counter--){ + if (c->deliver(msg)) { + return true; } else { c = allocate(); } @@ -181,22 +199,31 @@ bool Queue::dispatch(QueuedMessage& msg) return false; } -void Queue::dispatch(){ +bool Queue::getNextMessage(QueuedMessage& msg) +{ + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + return false; + } else { + msg = messages.front(); + return true; + } +} + +void Queue::dispatch() +{ QueuedMessage msg; - while(true){ - { - Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - break; - } - msg = messages.front(); - } - if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { - pop(); - } else { - break; - } + while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ + if (dispatch(msg)) { + pop(); + } else if (exclude(msg.payload)) { + pop(); + dequeue(0, msg.payload); + QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]"); + } else { + break; + } } serviceAllBrowsers(); } @@ -479,3 +506,37 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) } } + +bool Queue::isExclusiveOwner(const ConnectionToken* const o) const +{ + Mutex::ScopedLock locker(ownershipLock); + return o == owner; +} + +void Queue::releaseExclusiveOwnership() +{ + Mutex::ScopedLock locker(ownershipLock); + owner = 0; +} + +bool Queue::setExclusiveOwner(const ConnectionToken* const o) +{ + Mutex::ScopedLock locker(ownershipLock); + if (owner) { + return false; + } else { + owner = o; + return true; + } +} + +bool Queue::hasExclusiveOwner() const +{ + Mutex::ScopedLock locker(ownershipLock); + return owner != 0; +} + +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 24a9959d14..5146024b6b 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -85,6 +85,7 @@ namespace qpid { int next; mutable qpid::sys::RWlock consumerLock; mutable qpid::sys::Mutex messageLock; + mutable qpid::sys::Mutex ownershipLock; Consumer::ptr exclusive; mutable uint64_t persistenceId; framing::FieldTable settings; @@ -110,6 +111,8 @@ namespace qpid { Consumer::ptr allocate(); bool seek(QueuedMessage& msg, const framing::SequenceNumber& position); uint32_t getAcquirerCount() const; + bool getNextMessage(QueuedMessage& msg); + bool exclude(Message::shared_ptr msg); protected: /** @@ -172,11 +175,11 @@ namespace qpid { uint32_t getMessageCount() const; uint32_t getConsumerCount() const; inline const string& getName() const { return name; } - inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } - inline void releaseExclusiveOwnership() { owner = 0; } - inline void setExclusiveOwner(const ConnectionToken* const o) { owner = o; } - inline bool hasExclusiveConsumer() const { return exclusive; } - inline bool hasExclusiveOwner() const { return owner != 0; } + bool isExclusiveOwner(const ConnectionToken* const o) const; + void releaseExclusiveOwnership(); + bool setExclusiveOwner(const ConnectionToken* const o); + bool hasExclusiveConsumer() const; + bool hasExclusiveOwner() const; inline bool isDurable() const { return store != 0; } inline const framing::FieldTable& getSettings() const { return settings; } inline bool isAutoDelete() const { return autodelete; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index f47726bcf8..1f7436da94 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -260,12 +260,20 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->deliveryAdapter.deliver(msg.payload, token); if (windowing || ackExpected) { parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + } else if (!ackExpected) { + queue->dequeue(0, msg.payload); } } return !blocked; } } +bool SemanticState::ConsumerImpl::filter(Message::shared_ptr msg) +{ + return !(nolocal && + &parent->getSession().getConnection() == msg->getPublisher()); +} + bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index d2c2d4b188..87ae937cfb 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -78,6 +78,7 @@ class SemanticState : public framing::FrameHandler::Chains, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); bool deliver(QueuedMessage& msg); + bool filter(Message::shared_ptr msg); void setWindowMode(); void setCreditMode(); |