diff options
author | Alan Conway <aconway@apache.org> | 2011-09-21 14:55:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-21 14:55:49 +0000 |
commit | c84c26c8f03f6f37058dfbab2c13ba63ee06a2b7 (patch) | |
tree | 24d0ed2c99659d0a24922e24d35b4a67d02bef3c | |
parent | e253587dd57ffd1788d8adcb2133a6901bab995d (diff) | |
download | qpid-python-c84c26c8f03f6f37058dfbab2c13ba63ee06a2b7.tar.gz |
QPID-2920: Fixing hangs in qpid-cpp-benchmark with 2 brokers.
This test hangs: qpid-cpp-benchmark -b localhost:5556,localhost:5555 -r2 -m10000
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1173695 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Stoppable.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Timer.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/tests/BrokerClusterCalls.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 5 |
10 files changed, 60 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 1eec0c0b0a..3d7b27738f 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -227,7 +227,7 @@ void Queue::requeue(const QueuedMessage& msg){ } } } - if (broker) broker->getCluster().requeue(msg); // FIXME aconway 2011-09-12: review. rename requeue? + if (broker) broker->getCluster().requeue(msg); copy.notify(); } @@ -255,7 +255,6 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess ClusterAcquireScope acquireScope; // Outside lock Mutex::ScopedLock locker(messageLock); assertClusterSafe(); - QPID_LOG(debug, "Attempting to acquire message at " << position); if (messages->remove(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); acquireScope.qmsg = message; @@ -307,13 +306,13 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ while (true) { Stoppable::Scope consumeScope(consuming); if (!consumeScope) { - QPID_LOG(trace, "Queue is stopped: " << name); + QPID_LOG(trace, "Queue stopped, can't consume: " << name); listeners.addListener(c); return NO_MESSAGES; } ClusterAcquireScope acquireScope; // Outside the lock Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { // FIXME aconway 2011-06-07: ugly + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -914,10 +913,6 @@ void Queue::notifyDeleted() set.notifyAll(); } -void Queue::consumingStopped() { - if (broker) broker->getCluster().stopped(*this); -} - void Queue::bound(const string& exchange, const string& key, const FieldTable& args) { @@ -1287,12 +1282,19 @@ void Queue::UsageBarrier::destroy() } void Queue::stopConsumers() { - QPID_LOG(trace, "Queue stopped: " << getName()); + QPID_LOG(trace, "Stopping consumers on " << getName()); consuming.stop(); } void Queue::startConsumers() 
{ - QPID_LOG(trace, "Queue started: " << getName()); + QPID_LOG(trace, "Starting consumers on " << getName()); consuming.start(); notifyListener(); } + +// Called when all busy threads exitd due to stopConsumers() +void Queue::consumingStopped() { + QPID_LOG(trace, "Stopped consumers on " << getName()); + if (broker) broker->getCluster().stopped(*this); +} + diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h index 5d16ce6e10..b7ec2e4fb1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h +++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h @@ -23,6 +23,7 @@ */ #include "qpid/sys/Timer.h" +#include "qpid/log/Statement.h" // FIXME aconway 2011-09-19: remove #include <boost/function.hpp> namespace qpid { @@ -44,6 +45,7 @@ class CountdownTimer { /** Start the countdown if not already started. */ void start() { + QPID_LOG(debug, "FIXME CountdownTimer::start"); sys::Mutex::ScopedLock l(lock); if (!timerRunning) { timerRunning = true; @@ -54,6 +56,7 @@ class CountdownTimer { /** Stop the countdown if not already stopped. */ void stop() { + QPID_LOG(debug, "FIXME CountdownTimer::stop"); sys::Mutex::ScopedLock l(lock); if (timerRunning) { timerRunning = false; @@ -73,6 +76,7 @@ class CountdownTimer { // Called when countdown expires. 
void fire() { + QPID_LOG(debug, "FIXME CountdownTimer::fire"); bool doCallback = false; { sys::Mutex::ScopedLock l(lock); @@ -87,6 +91,7 @@ class CountdownTimer { bool timerRunning; boost::function<void()> callback; sys::Timer& timer; + sys::Duration duration; }; diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 14a39e1e61..dbe008e33e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -89,6 +89,9 @@ void MessageHandler::routed(RoutingId routingId) { // FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet // and scan queue once. void MessageHandler::acquire(const std::string& q, uint32_t position) { + // FIXME aconway 2011-09-15: systematic logging across cluster module. + QPID_LOG(trace, "cluster message " << q << "[" << position + << "] acquired by " << PrettyId(sender(), self())); // Note acquires from other members. My own acquires were executed in // the connection thread if (sender() != self()) { @@ -102,18 +105,20 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) { assert(qm.payload); // Save on context for possible requeue if released/rejected. QueueContext::get(*queue)->acquire(qm); + // FIXME aconway 2011-09-19: need to record by member-ID to requeue if member leaves. } +} + +void MessageHandler::dequeue(const std::string& q, uint32_t position) { // FIXME aconway 2011-09-15: systematic logging across cluster module. 
QPID_LOG(trace, "cluster message " << q << "[" << position - << "] acquired by " << PrettyId(sender(), self())); - } + << "] dequeued by " << PrettyId(sender(), self())); -void MessageHandler::dequeue(const std::string& q, uint32_t position) { - if (sender() == self()) { - // FIXME aconway 2010-10-28: we should complete the ack that initiated - // the dequeue at this point, see BrokerContext::dequeue - } - else { + // FIXME aconway 2010-10-28: for local dequeues, we should + // complete the ack that initiated the dequeue at this point, see + // BrokerContext::dequeue + + if (sender() != self()) { // FIXME aconway 2011-09-15: new cluster, inefficient looks up // message by position multiple times? boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 3d0ba40bce..de7109e131 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -48,36 +48,36 @@ QueueContext::QueueContext(broker::Queue& q, Multicaster& m) QueueContext::~QueueContext() {} -// Invariant for ownership: -// UNSUBSCRIBED, SUBSCRIBED => timer stopped, queue stopped -// SOLE_OWNER => timer stopped, queue started -// SHARED_OWNER => timer started, queue started - namespace { bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } } // Called by QueueReplica in CPG deliver thread when state changes. void QueueContext::replicaState(QueueOwnership newOwnership) { + + // Invariants for ownership: + // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped + // SOLE_OWNER <=> timer stopped, queue started + // SHARED_OWNER <=> timer started, queue started + sys::Mutex::ScopedLock l(lock); QueueOwnership before = ownership; QueueOwnership after = newOwnership; - ownership = after; - if (!isOwner(before) && !isOwner(after)) - ; // Nothing to do, now ownership change on this transition. 
- else if (isOwner(before) && !isOwner(after)) // Lost ownership - ; // Nothing to do, queue and timer were stopped before - // sending unsubscribe/resubscribe. - else if (!isOwner(before) && isOwner(after)) { // Took ownership + assert(before != after); + ownership = newOwnership; + + if (!isOwner(before) && isOwner(after)) { // Took ownership queue.startConsumers(); if (after == SHARED_OWNER) timer.start(); } else if (isOwner(before) && isOwner(after) && before != after) { + // Changed from shared to sole owner or vice versa if (after == SOLE_OWNER) timer.stop(); else timer.start(); } + // If we lost ownership then the queue and timer will already have + // been stopped by timeout() } - // FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue. // Called in connection threads when a consumer is added @@ -101,6 +101,7 @@ void QueueContext::cancel(size_t n) { // Called in timer thread. void QueueContext::timeout() { + QPID_LOG(debug, "FIXME QueueContext::timeout"); // When all threads have stopped, queue will call stopped() queue.stopConsumers(); } @@ -108,8 +109,8 @@ void QueueContext::timeout() { // Callback set up by queue.stopConsumers() called in connection thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { + QPID_LOG(debug, "FIXME QueueContext::stopped"); sys::Mutex::ScopedLock l(lock); - // FIXME aconway 2011-07-28: review thread safety of state. 
if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( framing::ProtocolVersion(), queue.getName())); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 0938498fa3..013e50a175 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -46,16 +46,10 @@ std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) { } std::ostream& operator<<(std::ostream& o, QueueOwnership s) { - static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" }; + static char* tags[] = { "unsubscribed", "subscribed", "sole_owner", "shared_owner" }; return o << tags[s]; } -std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) { - o << qr.queue->getName() << "(" << qr.getState() << "): " - << PrintSubscribers(qr.subscribers, qr.getSelf()); - return o; -} - void QueueReplica::subscribe(const MemberId& member) { QueueOwnership before = getState(); subscribers.push_back(member); @@ -81,15 +75,17 @@ void QueueReplica::resubscribe(const MemberId& member) { } void QueueReplica::update(QueueOwnership before) { - QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")"); QueueOwnership after = getState(); + QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": " + << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]"); if (before != after) context->replicaState(after); } QueueOwnership QueueReplica::getState() const { if (isOwner()) return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER; - return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED; + else + return (isSubscriber(self)) ? 
SUBSCRIBED : UNSUBSCRIBED; } bool QueueReplica::isOwner() const { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h index 20aef058fc..31faf4853a 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -72,7 +72,6 @@ class QueueReplica : public RefCounted friend struct PrintSubscribers; friend std::ostream& operator<<(std::ostream&, QueueOwnership); - friend std::ostream& operator<<(std::ostream&, const QueueReplica&); friend std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps); }; diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h index 6ddf926280..6f10935c27 100644 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -68,7 +68,6 @@ class Stoppable { */ void stop() { sys::Monitor::ScopedLock l(lock); - if (stopped) return; stopped = true; check(l); } diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp index 47752e4584..934add5673 100644 --- a/qpid/cpp/src/qpid/sys/Timer.cpp +++ b/qpid/cpp/src/qpid/sys/Timer.cpp @@ -68,7 +68,11 @@ void TimerTask::setupNextFire() { } // Only allow tasks to be delayed -void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); } +void TimerTask::restart() { + ScopedLock<Mutex> l(callbackLock); + nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); + cancelled = false; +} void TimerTask::cancel() { ScopedLock<Mutex> l(callbackLock); diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp index 7975210e4e..db2dd59579 100644 --- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp +++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp @@ -96,7 +96,6 @@ class DummyCluster : public broker::Cluster } virtual bool dequeue(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("dequeue", qm); - return false; } // Consumers diff --git 
a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 81f697dec0..689662c2ba 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -198,6 +198,10 @@ int main(int argc, char ** argv) std::map<std::string,Sender> replyTo; while (!done && receiver.fetch(msg, timeout)) { + // FIXME aconway 2011-09-19: +// std::ostringstream os; +// os << "qpid-receive(" << getpid() << ") seq=" << msg.getProperties()[SN] << endl; // FIXME aconway 2011-09-19: +// cerr << os.str() << flush; if (!started) { // Start the time on receipt of the first message to avoid counting // idle time at process startup. @@ -225,6 +229,7 @@ int main(int argc, char ** argv) if (opts.printContent) std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages if (opts.messages && count >= opts.messages) { + cerr << "qpid-receive(" << getpid() << ") DONE" << endl; done = true; } } |