summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-07-28 21:38:42 +0000
committerAlan Conway <aconway@apache.org>2011-07-28 21:38:42 +0000
commit6d882c19e689954c5da2a41173334216d1ce5a76 (patch)
tree6a84040c8f020833cc831a10d97ce87c13395e40
parentc42d9df9b8af5dc7d5decdcb5818a100ee8df0a3 (diff)
downloadqpid-python-6d882c19e689954c5da2a41173334216d1ce5a76.tar.gz
QPID-2920: Initial stab at time-based queue sharing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1152009 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp113
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h54
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp28
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h13
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h10
-rw-r--r--qpid/cpp/src/tests/BrokerClusterCalls.cpp3
-rwxr-xr-xqpid/cpp/src/tests/ais_check2
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py11
11 files changed, 133 insertions, 118 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 84f025824c..03a288e924 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1285,12 +1285,13 @@ void Queue::UsageBarrier::destroy()
// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
void Queue::stop() {
+ QPID_LOG(critical, "FIXME Queue stopped " << getName());
// FIXME aconway 2011-05-25: rename dispatching - acquiring?
dispatching.stop();
}
void Queue::start() {
- QPID_LOG(critical, "FIXME start context=" << clusterContext);
+ QPID_LOG(critical, "FIXME Queue started " << getName());
assert(clusterContext); // FIXME aconway 2011-06-08: XXX
dispatching.start();
notifyListener();
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 465a5de021..fa247ae8f5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -109,7 +109,6 @@ void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
void BrokerContext::acquire(const broker::QueuedMessage& qm) {
if (tssNoReplicate) return;
- QueueContext::get(*qm.queue)->acquire();
core.mcast(ClusterMessageAcquireBody(
ProtocolVersion(), qm.queue->getName(), qm.position));
}
@@ -177,15 +176,12 @@ void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
// n is the number of consumers including the one just added.
// FIXME aconway 2011-06-27: rename, conflicting terms.
void BrokerContext::consume(broker::Queue& q, size_t n) {
- if (n == 1) {
- // FIXME aconway 2011-06-27: should be on QueueContext for symmetry?
- core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName()));
- }
+ QueueContext::get(q)->consume(n);
}
// n is the number of consumers after the cancel.
void BrokerContext::cancel(broker::Queue& q, size_t n) {
- if (n == 0) QueueContext::get(q)->unsubscribed();
+ QueueContext::get(q)->cancel(n);
}
void BrokerContext::empty(broker::Queue& ) {
@@ -196,10 +192,7 @@ void BrokerContext::stopped(broker::Queue& q) {
boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
// Don't forward the stopped call if the queue does not yet have a cluster context
// this when the queue is first created locally.
- if (qc){
- QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName());
- qc->stopped();
- }
+ if (qc) qc->stopped();
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 6c97c906e8..122163ee7e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -21,79 +21,101 @@
#include "QueueContext.h"
#include "Multicaster.h"
+#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/ClusterQueueResubscribeBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
-
+#include "qpid/sys/Timer.h"
namespace qpid {
namespace cluster {
+
+class OwnershipTimeout : public sys::TimerTask {
+ QueueContext& queueContext;
+
+ public:
+ OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) :
+ TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {}
+
+ // FIXME aconway 2011-07-27: thread safety on deletion?
+ void fire() { queueContext.timeout(); }
+};
+
QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
- : owner(NOT_OWNER), count(0), queue(q), mcast(m)
+ : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0)
{
- QPID_LOG(debug, "Assign cluster context to queue " << q.getName());
- q.stop(); // Initially stopped. Must all before setClusterContext
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+ q.stop(); // Initially stopped.
+}
+QueueContext::~QueueContext() {
+ // FIXME aconway 2011-07-27: revisit shutdown logic.
+ // timeout() could be called concurrently with destructor.
+ sys::Mutex::ScopedLock l(lock);
+ timerTask->cancel();
}
-// Called by QueueReplica in deliver thread.
-void QueueContext::sharedOwner(size_t limit) {
- QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get());
+void QueueContext::replicaState(QueueOwnership state) {
sys::Mutex::ScopedLock l(lock);
- count = limit;
- if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
- owner = SHARED_OWNER;
+ switch (state) {
+ case UNSUBSCRIBED:
+ case SUBSCRIBED:
+ break;
+ case SOLE_OWNER:
+ queue.start();
+ if (timerTask) { // no need for timeout.
+ timerTask->cancel();
+ timerTask = 0;
+ }
+ break;
+ case SHARED_OWNER:
+ queue.start();
+ if (timerTask) timerTask->cancel();
+ // FIXME aconway 2011-07-28: configurable interval.
+ timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC);
+ timer.add(timerTask);
+ break;
+ }
}
-// Called by QueueReplica in deliver thread.
-void QueueContext::soleOwner() {
- QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get());
+// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer.
+
+void QueueContext::consume(size_t n) {
sys::Mutex::ScopedLock l(lock);
- count = 0;
- if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
- owner = SOLE_OWNER;
+ consumers = n;
+ if (n == 1) mcast.mcast(
+ framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
}
-// Called by BrokerContext in connection thread(s) on acquiring a message
-void QueueContext::acquire() {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own.
- QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName()
- << " owner=" << owner << " count=" << count);
- if (owner == SHARED_OWNER) {
- // Note count could be 0 if there are concurrent calls to acquire.
- if (count && --count == 0) {
- stop = true;
- }
- }
- }
- // FIXME aconway 2011-06-28: could have multiple stop() threads...
- if (stop) queue.stop();
+void QueueContext::cancel(size_t n) {
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock?
+}
+
+void QueueContext::timeout() {
+ QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName());
+ queue.stop();
}
-// Callback set up by queue.stop()
+
+// Callback set up by queue.stop(), called when no threads are dispatching from the queue.
+// Release the queue.
void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
- if (owner == NOT_OWNER) {
+ // FIXME aconway 2011-07-28: review thread safety of state.
+ // Deffered call to stopped doesn't sit well.
+ // queueActive is invaled while stop is in progress?
+ if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
- } else {
- owner = NOT_OWNER;
+ else
mcast.mcast(framing::ClusterQueueResubscribeBody(
framing::ProtocolVersion(), queue.getName()));
- }
-}
-
-void QueueContext::unsubscribed() {
- QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName());
- queue.stop();
- sys::Mutex::ScopedLock l(lock);
- owner = NOT_OWNER;
}
boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
@@ -102,4 +124,5 @@ boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 5bafb5eb0f..c244b57a2e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -24,10 +24,11 @@
#include <qpid/RefCounted.h>
+#include "qpid/sys/Time.h"
#include <qpid/sys/Mutex.h>
+#include "qpid/cluster/types.h"
#include <boost/intrusive_ptr.hpp>
-
// FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on
// class broker::Cluster::Queue. This becomes the cluster context.
@@ -35,55 +36,58 @@ namespace qpid {
namespace broker {
class Queue;
}
+namespace sys {
+class Timer;
+class TimerTask;
+}
+
namespace cluster {
class Multicaster;
/**
* Queue state that is not replicated to the cluster.
- * Manages the local queue start/stop status
+ * Manages the local queue start/stop status.
*
- * Thread safe: Called by connection and dispatch threads.
+ * Thread safe: Called by connection, dispatch and timer threads.
*/
class QueueContext : public RefCounted {
- // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr?
public:
QueueContext(broker::Queue& q, Multicaster& m);
+ ~QueueContext();
- /** Sharing ownership of queue, can acquire up to limit before releasing.
- * Called in deliver thread.
- */
- void sharedOwner(size_t limit);
-
- /** Sole owner of queue, no limits to acquiring */
- void soleOwner();
+ /** Replica state has changed, called in deliver thread. */
+ void replicaState(QueueOwnership);
- /**
- * Count an acquired message against the limit.
- * Called from connection threads while consuming messages
+ /** Called when queue is stopped, no threads are dispatching.
+ * Connection or deliver thread.
*/
- void acquire();
-
- /** Called if the queue becomes empty, from connection thread. */
- void empty();
-
- /** Called when queue is stopped, connection or deliver thread. */
void stopped();
- /** Called when the last subscription to a queue is cancelled */
- void unsubscribed();
+ /** Called when a consumer is added to the queue.
+ *@param n: nubmer of consumers after new one is added.
+ */
+ void consume(size_t n);
+
+ /** Called when a consumer is cancelled on the queue.
+ *@param n: nubmer of consumers after the cancel.
+ */
+ void cancel(size_t n);
/** Get the context for a broker queue. */
static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
+ /** Called when the timer runs out: stop the queue. */
+ void timeout();
+
private:
- void release();
+ sys::Timer& timer;
sys::Mutex lock;
- enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner;
- size_t count; // Count of dequeues remaining, 0 means no limit.
broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
Multicaster& mcast;
+ boost::intrusive_ptr<sys::TimerTask> timerTask;
+ size_t consumers;
// FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing.
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 551477a920..7bbd6e1422 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -44,7 +44,7 @@ std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) {
return o;
}
-std::ostream& operator<<(std::ostream& o, QueueReplica::State s) {
+std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" };
return o << tags[s];
}
@@ -58,13 +58,13 @@ std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
// FIXME aconway 2011-05-17: error handling for asserts.
void QueueReplica::subscribe(const MemberId& member) {
- State before = getState();
+ QueueOwnership before = getState();
subscribers.push_back(member);
update(before);
}
void QueueReplica::unsubscribe(const MemberId& member) {
- State before = getState();
+ QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
if (i != subscribers.end()) {
subscribers.erase(i, subscribers.end());
@@ -74,30 +74,20 @@ void QueueReplica::unsubscribe(const MemberId& member) {
void QueueReplica::resubscribe(const MemberId& member) {
assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling
- State before = getState();
+ QueueOwnership before = getState();
subscribers.pop_front();
subscribers.push_back(member);
update(before);
}
-void QueueReplica::update(State before) {
- const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable
- State after = getState();
- if (before == after) return;
+void QueueReplica::update(QueueOwnership before) {
QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")");
- switch (after) {
- case UNSUBSCRIBED: break;
- case SUBSCRIBED: break;
- case SOLE_OWNER:
- context->soleOwner();
- break;
- case SHARED_OWNER:
- context->sharedOwner(acquireLimit);
- break;
- }
+ QueueOwnership after = getState();
+ if (before == after) return;
+ context->replicaState(after);
}
-QueueReplica::State QueueReplica::getState() const {
+QueueOwnership QueueReplica::getState() const {
if (isOwner())
return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER;
return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index a322a8b9c0..4ebbc84ef0 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -35,7 +35,6 @@ class Queue;
}
namespace cluster {
-class QueueHandler;
class QueueContext;
/**
@@ -56,15 +55,9 @@ class QueueReplica : public RefCounted
void resubscribe(const MemberId&);
private:
- enum State {
- UNSUBSCRIBED,
- SUBSCRIBED,
- SOLE_OWNER,
- SHARED_OWNER
- };
friend class PrintSubscribers;
- friend std::ostream& operator<<(std::ostream&, State);
+ friend std::ostream& operator<<(std::ostream&, QueueOwnership);
friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
typedef std::deque<MemberId> MemberQueue;
@@ -74,10 +67,10 @@ class QueueReplica : public RefCounted
MemberId self;
boost::intrusive_ptr<QueueContext> context;
- State getState() const;
+ QueueOwnership getState() const;
bool isOwner() const;
bool isSubscriber(const MemberId&) const;
- void update(State before);
+ void update(QueueOwnership before);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index 1b3286792f..ef4df3cf97 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -72,7 +72,6 @@ void WiringHandler::createQueue(const std::string& data) {
assert(q); // FIXME aconway 2011-05-10: error handling.
// TODO aconway 2011-05-10: if we implement multi-group for queues then
// this call is a problem: comes from wiring delivery thread, not queues.
- // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers..
queueHandler->add(q);
QPID_LOG(debug, "cluster: create queue " << q->getName());
}
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index 5225576579..6f88d171be 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -81,6 +81,16 @@ std::ostream& operator<<(std::ostream&, EventType);
/** Number to identify a message being routed. */
typedef uint32_t RoutingId;
+// FIXME aconway 2011-07-28: can we put these 2 back in the
+// QueueReplica & QueueContext?
+/** State of a queue with respect to a cluster member. */
+enum QueueOwnership {
+ UNSUBSCRIBED,
+ SUBSCRIBED,
+ SOLE_OWNER,
+ SHARED_OWNER
+};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_TYPES_H*/
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
index 4311cf51cf..b2c07f7469 100644
--- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -249,8 +249,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
- // Note: empty is called once for each receiver.
- BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+ // FIXME aconway 2011-07-25: empty called once per receiver?
BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
BOOST_CHECK_EQUAL(h.size(), i);
diff --git a/qpid/cpp/src/tests/ais_check b/qpid/cpp/src/tests/ais_check
index 92eaa9dd39..277e3b3f7e 100755
--- a/qpid/cpp/src/tests/ais_check
+++ b/qpid/cpp/src/tests/ais_check
@@ -18,8 +18,6 @@
# under the License.
#
-srcdir=`dirname $0`
-
# Check AIS requirements and run tests if found.
ps -u root | grep 'aisexec\|corosync' >/dev/null || {
echo WARNING: Skipping cluster tests, the aisexec or corosync daemon is not running.
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index 1cf749cdb4..ad13986ad3 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -137,6 +137,12 @@ class Cluster2Tests(BrokerTest):
# FIXME aconway 2010-10-29: test unbind, may need to use old API.
+ def duration(self):
+ d = self.config.defines.get("DURATION")
+ if d: return float(d)*60
+ else: return 3 # Default is to be quick
+
+
def test_dequeue_mutex(self):
"""Ensure that one and only one consumer receives each dequeued message."""
class Receiver(Thread):
@@ -163,13 +169,12 @@ class Cluster2Tests(BrokerTest):
for r in receivers: r.start()
n = 0
- t = time.time() + 1 # Send for 1 second.
+ t = time.time() + self.duration()
while time.time() < t:
sender.send(str(n))
n += 1
for r in receivers: r.join();
- print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17:
- for r in receivers: assert len(r.messages) # At least one message to each
+ for r in receivers: len(r.messages) > n/6 # Fairness test.
messages = [int(m.content) for r in receivers for m in r.messages ]
messages.sort()
self.assertEqual(range(n), messages)