summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-21 14:55:49 +0000
committerAlan Conway <aconway@apache.org>2011-09-21 14:55:49 +0000
commitc84c26c8f03f6f37058dfbab2c13ba63ee06a2b7 (patch)
tree24d0ed2c99659d0a24922e24d35b4a67d02bef3c
parente253587dd57ffd1788d8adcb2133a6901bab995d (diff)
downloadqpid-python-c84c26c8f03f6f37058dfbab2c13ba63ee06a2b7.tar.gz
QPID-2920: Fixing hangs in qid-cpp-benchmark with 2 brokers.
This test hangs: qpid-cpp-benchmark -b localhost:5556,localhost:5555 -r2 -m10000 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1173695 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp22
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp21
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp29
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h1
-rw-r--r--qpid/cpp/src/qpid/sys/Stoppable.h1
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.cpp6
-rw-r--r--qpid/cpp/src/tests/BrokerClusterCalls.cpp1
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp5
10 files changed, 60 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 1eec0c0b0a..3d7b27738f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -227,7 +227,7 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
- if (broker) broker->getCluster().requeue(msg); // FIXME aconway 2011-09-12: review. rename requeue?
+ if (broker) broker->getCluster().requeue(msg);
copy.notify();
}
@@ -255,7 +255,6 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
ClusterAcquireScope acquireScope; // Outside lock
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
- QPID_LOG(debug, "Attempting to acquire message at " << position);
if (messages->remove(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
acquireScope.qmsg = message;
@@ -307,13 +306,13 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
while (true) {
Stoppable::Scope consumeScope(consuming);
if (!consumeScope) {
- QPID_LOG(trace, "Queue is stopped: " << name);
+ QPID_LOG(trace, "Queue stopped, can't consume: " << name);
listeners.addListener(c);
return NO_MESSAGES;
}
ClusterAcquireScope acquireScope; // Outside the lock
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) { // FIXME aconway 2011-06-07: ugly
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -914,10 +913,6 @@ void Queue::notifyDeleted()
set.notifyAll();
}
-void Queue::consumingStopped() {
- if (broker) broker->getCluster().stopped(*this);
-}
-
void Queue::bound(const string& exchange, const string& key,
const FieldTable& args)
{
@@ -1287,12 +1282,19 @@ void Queue::UsageBarrier::destroy()
}
void Queue::stopConsumers() {
- QPID_LOG(trace, "Queue stopped: " << getName());
+ QPID_LOG(trace, "Stopping consumers on " << getName());
consuming.stop();
}
void Queue::startConsumers() {
- QPID_LOG(trace, "Queue started: " << getName());
+ QPID_LOG(trace, "Starting consumers on " << getName());
consuming.start();
notifyListener();
}
+
+// Called when all busy threads exitd due to stopConsumers()
+void Queue::consumingStopped() {
+ QPID_LOG(trace, "Stopped consumers on " << getName());
+ if (broker) broker->getCluster().stopped(*this);
+}
+
diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
index 5d16ce6e10..b7ec2e4fb1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
+++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/Timer.h"
+#include "qpid/log/Statement.h" // FIXME aconway 2011-09-19: remove
#include <boost/function.hpp>
namespace qpid {
@@ -44,6 +45,7 @@ class CountdownTimer {
/** Start the countdown if not already started. */
void start() {
+ QPID_LOG(debug, "FIXME CountdownTimer::start");
sys::Mutex::ScopedLock l(lock);
if (!timerRunning) {
timerRunning = true;
@@ -54,6 +56,7 @@ class CountdownTimer {
/** Stop the countdown if not already stopped. */
void stop() {
+ QPID_LOG(debug, "FIXME CountdownTimer::stop");
sys::Mutex::ScopedLock l(lock);
if (timerRunning) {
timerRunning = false;
@@ -73,6 +76,7 @@ class CountdownTimer {
// Called when countdown expires.
void fire() {
+ QPID_LOG(debug, "FIXME CountdownTimer::fire");
bool doCallback = false;
{
sys::Mutex::ScopedLock l(lock);
@@ -87,6 +91,7 @@ class CountdownTimer {
bool timerRunning;
boost::function<void()> callback;
sys::Timer& timer;
+ sys::Duration duration;
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 14a39e1e61..dbe008e33e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -89,6 +89,9 @@ void MessageHandler::routed(RoutingId routingId) {
// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
// and scan queue once.
void MessageHandler::acquire(const std::string& q, uint32_t position) {
+ // FIXME aconway 2011-09-15: systematic logging across cluster module.
+ QPID_LOG(trace, "cluster message " << q << "[" << position
+ << "] acquired by " << PrettyId(sender(), self()));
// Note acquires from other members. My own acquires were executed in
// the connection thread
if (sender() != self()) {
@@ -102,18 +105,20 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) {
assert(qm.payload);
// Save on context for possible requeue if released/rejected.
QueueContext::get(*queue)->acquire(qm);
+ // FIXME aconway 2011-09-19: need to record by member-ID to requeue if member leaves.
}
+}
+
+void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
QPID_LOG(trace, "cluster message " << q << "[" << position
- << "] acquired by " << PrettyId(sender(), self()));
- }
+ << "] dequeued by " << PrettyId(sender(), self()));
-void MessageHandler::dequeue(const std::string& q, uint32_t position) {
- if (sender() == self()) {
- // FIXME aconway 2010-10-28: we should complete the ack that initiated
- // the dequeue at this point, see BrokerContext::dequeue
- }
- else {
+ // FIXME aconway 2010-10-28: for local dequeues, we should
+ // complete the ack that initiated the dequeue at this point, see
+ // BrokerContext::dequeue
+
+ if (sender() != self()) {
// FIXME aconway 2011-09-15: new cluster, inefficient looks up
// message by position multiple times?
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 3d0ba40bce..de7109e131 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -48,36 +48,36 @@ QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
QueueContext::~QueueContext() {}
-// Invariant for ownership:
-// UNSUBSCRIBED, SUBSCRIBED => timer stopped, queue stopped
-// SOLE_OWNER => timer stopped, queue started
-// SHARED_OWNER => timer started, queue started
-
namespace {
bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
}
// Called by QueueReplica in CPG deliver thread when state changes.
void QueueContext::replicaState(QueueOwnership newOwnership) {
+
+ // Invariants for ownership:
+ // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
+ // SOLE_OWNER <=> timer stopped, queue started
+ // SHARED_OWNER <=> timer started, queue started
+
sys::Mutex::ScopedLock l(lock);
QueueOwnership before = ownership;
QueueOwnership after = newOwnership;
- ownership = after;
- if (!isOwner(before) && !isOwner(after))
- ; // Nothing to do, now ownership change on this transition.
- else if (isOwner(before) && !isOwner(after)) // Lost ownership
- ; // Nothing to do, queue and timer were stopped before
- // sending unsubscribe/resubscribe.
- else if (!isOwner(before) && isOwner(after)) { // Took ownership
+ assert(before != after);
+ ownership = newOwnership;
+
+ if (!isOwner(before) && isOwner(after)) { // Took ownership
queue.startConsumers();
if (after == SHARED_OWNER) timer.start();
}
else if (isOwner(before) && isOwner(after) && before != after) {
+ // Changed from shared to sole owner or vice versa
if (after == SOLE_OWNER) timer.stop();
else timer.start();
}
+ // If we lost ownership then the queue and timer will already have
+ // been stopped by timeout()
}
-
// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue.
// Called in connection threads when a consumer is added
@@ -101,6 +101,7 @@ void QueueContext::cancel(size_t n) {
// Called in timer thread.
void QueueContext::timeout() {
+ QPID_LOG(debug, "FIXME QueueContext::timeout");
// When all threads have stopped, queue will call stopped()
queue.stopConsumers();
}
@@ -108,8 +109,8 @@ void QueueContext::timeout() {
// Callback set up by queue.stopConsumers() called in connection thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
+ QPID_LOG(debug, "FIXME QueueContext::stopped");
sys::Mutex::ScopedLock l(lock);
- // FIXME aconway 2011-07-28: review thread safety of state.
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 0938498fa3..013e50a175 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -46,16 +46,10 @@ std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) {
}
std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
- static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" };
+ static char* tags[] = { "unsubscribed", "subscribed", "sole_owner", "shared_owner" };
return o << tags[s];
}
-std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
- o << qr.queue->getName() << "(" << qr.getState() << "): "
- << PrintSubscribers(qr.subscribers, qr.getSelf());
- return o;
-}
-
void QueueReplica::subscribe(const MemberId& member) {
QueueOwnership before = getState();
subscribers.push_back(member);
@@ -81,15 +75,17 @@ void QueueReplica::resubscribe(const MemberId& member) {
}
void QueueReplica::update(QueueOwnership before) {
- QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")");
QueueOwnership after = getState();
+ QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
+ << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
if (before != after) context->replicaState(after);
}
QueueOwnership QueueReplica::getState() const {
if (isOwner())
return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER;
- return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;
+ else
+ return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;
}
bool QueueReplica::isOwner() const {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index 20aef058fc..31faf4853a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -72,7 +72,6 @@ class QueueReplica : public RefCounted
friend struct PrintSubscribers;
friend std::ostream& operator<<(std::ostream&, QueueOwnership);
- friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
friend std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps);
};
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h
index 6ddf926280..6f10935c27 100644
--- a/qpid/cpp/src/qpid/sys/Stoppable.h
+++ b/qpid/cpp/src/qpid/sys/Stoppable.h
@@ -68,7 +68,6 @@ class Stoppable {
*/
void stop() {
sys::Monitor::ScopedLock l(lock);
- if (stopped) return;
stopped = true;
check(l);
}
diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp
index 47752e4584..934add5673 100644
--- a/qpid/cpp/src/qpid/sys/Timer.cpp
+++ b/qpid/cpp/src/qpid/sys/Timer.cpp
@@ -68,7 +68,11 @@ void TimerTask::setupNextFire() {
}
// Only allow tasks to be delayed
-void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
+void TimerTask::restart() {
+ ScopedLock<Mutex> l(callbackLock);
+ nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period));
+ cancelled = false;
+}
void TimerTask::cancel() {
ScopedLock<Mutex> l(callbackLock);
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
index 7975210e4e..db2dd59579 100644
--- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -96,7 +96,6 @@ class DummyCluster : public broker::Cluster
}
virtual bool dequeue(const broker::QueuedMessage& qm) {
if (!isRouting) recordQm("dequeue", qm);
- return false;
}
// Consumers
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 81f697dec0..689662c2ba 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -198,6 +198,10 @@ int main(int argc, char ** argv)
std::map<std::string,Sender> replyTo;
while (!done && receiver.fetch(msg, timeout)) {
+ // FIXME aconway 2011-09-19:
+// std::ostringstream os;
+// os << "qpid-receive(" << getpid() << ") seq=" << msg.getProperties()[SN] << endl; // FIXME aconway 2011-09-19:
+// cerr << os.str() << flush;
if (!started) {
// Start the time on receipt of the first message to avoid counting
// idle time at process startup.
@@ -225,6 +229,7 @@ int main(int argc, char ** argv)
if (opts.printContent)
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
if (opts.messages && count >= opts.messages) {
+ cerr << "qpid-receive(" << getpid() << ") DONE" << endl;
done = true;
}
}