summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-07-08 14:53:51 +0000
committerGordon Sim <gsim@apache.org>2008-07-08 14:53:51 +0000
commit50d4782e2f3676723a2df5bf6a420d45fb55467d (patch)
tree8ef8248cf98c3e0ecfeffd78031768b5b812c579 /cpp
parent2c2c79d8d6321cf3cf05018f6ee92a1cf9627934 (diff)
downloadqpid-python-50d4782e2f3676723a2df5bf6a420d45fb55467d.tar.gz
* release message lock when notifying queue listeners
* take copy of listeners * remove unused functionality git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@674848 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp66
-rw-r--r--cpp/src/qpid/broker/Queue.h16
2 files changed, 53 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 40f249bc11..ad06b6ecaa 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -202,19 +202,6 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-/**
- * Return true if the message can be excluded. This is currently the
- * case if the queue is exclusive and has an exclusive consumer that
- * doesn't want the message or has a single consumer that doesn't want
- * the message (covers the JMS topic case).
- */
-bool Queue::canExcludeUnwanted()
-{
- Mutex::ScopedLock locker(consumerLock);
- return hasExclusiveOwner() && (exclusive || consumerCount == 1);
-}
-
-
bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
{
if (c.preAcquires()) {
@@ -252,15 +239,8 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
}
} else {
//consumer will never want this message
- if (canExcludeUnwanted()) {
- //hack for no-local on JMS topics; get rid of this message
- QPID_LOG(debug, "Excluding message from '" << name << "'");
- pop();
- } else {
- //leave it for another consumer
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- return false;
- }
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ return false;
}
}
}
@@ -291,22 +271,35 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
return false;
}
+/**
+ * notify listeners that there may be messages to process
+ */
void Queue::notify()
{
- //notify listeners that there may be messages to process
- for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify));
+ if (listeners.empty()) return;
+
+ Listeners copy(listeners);
listeners.clear();
+
+ sys::ScopedLock<Guard> g(notifierLock);//prevent consumers being deleted while held in copy
+ {
+ Mutex::ScopedUnlock u(messageLock);
+ for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify));
+ }
}
void Queue::removeListener(Consumer& c)
{
Mutex::ScopedLock locker(messageLock);
- listeners.erase(&c);
+ notifierLock.wait(messageLock);//wait until no notifies are in progress
+ Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+ if (i != listeners.end()) listeners.erase(i);
}
void Queue::addListener(Consumer& c)
{
- listeners.insert(&c);
+ Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+ if (i == listeners.end()) listeners.push_back(&c);
}
bool Queue::dispatch(Consumer& c)
@@ -682,6 +675,27 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+/*
+ * Use of Guard requires an external lock to be held before calling
+ * any of its methods
+ */
+Queue::Guard::Guard() : count(0) {}
+
+void Queue::Guard::lock()
+{
+ count++;
+}
+
+void Queue::Guard::unlock()
+{
+ if (--count == 0) condition.notifyAll();
+}
+
+void Queue::Guard::wait(sys::Mutex& m)
+{
+ while (count) condition.wait(m);
+}
+
ManagementObject::shared_ptr Queue::GetManagementObject (void) const
{
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index f56cee0f22..29c6005d60 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -38,7 +38,6 @@
#include <vector>
#include <memory>
#include <deque>
-#include <set>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -62,9 +61,20 @@ namespace qpid {
*/
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
- typedef std::set<Consumer*> Listeners;
+ typedef qpid::InlineVector<Consumer*, 5> Listeners;
typedef std::deque<QueuedMessage> Messages;
+ class Guard
+ {
+ qpid::sys::Condition condition;
+ size_t count;
+ public:
+ Guard();
+ void lock();
+ void unlock();
+ void wait(sys::Mutex&);
+ };
+
const string name;
const bool autodelete;
MessageStore* store;
@@ -79,6 +89,7 @@ namespace qpid {
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
+ Guard notifierLock;
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
@@ -95,7 +106,6 @@ namespace qpid {
bool getNextMessage(QueuedMessage& msg, Consumer& c);
bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
bool browseNextMessage(QueuedMessage& msg, Consumer& c);
- bool canExcludeUnwanted();
void notify();
void removeListener(Consumer&);