diff options
author | Alan Conway <aconway@apache.org> | 2011-09-06 21:47:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-06 21:47:25 +0000 |
commit | d1f0390856db98c9b9072e934aa4211469ad63d3 (patch) | |
tree | 1de8534fa047fa6c7d75c2f0c98c73eac0aea6f0 | |
parent | 394c0db301fbc79bfb056a02d8d1de108a804f87 (diff) | |
download | qpid-python-d1f0390856db98c9b9072e934aa4211469ad63d3.tar.gz |
QPID-2920: Initial stab at time-based queue sharing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165887 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 113 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/types.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/tests/BrokerClusterCalls.cpp | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ais_check | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster2_tests.py | 11 |
11 files changed, 133 insertions, 118 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 7c4b0a71f1..8f807cd0fe 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1293,12 +1293,13 @@ void Queue::UsageBarrier::destroy() // FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? void Queue::stop() { + QPID_LOG(critical, "FIXME Queue stopped " << getName()); // FIXME aconway 2011-05-25: rename dispatching - acquiring? dispatching.stop(); } void Queue::start() { - QPID_LOG(critical, "FIXME start context=" << clusterContext); + QPID_LOG(critical, "FIXME Queue started " << getName()); assert(clusterContext); // FIXME aconway 2011-06-08: XXX dispatching.start(); notifyListener(); diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 465a5de021..fa247ae8f5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -109,7 +109,6 @@ void BrokerContext::routed(const boost::intrusive_ptr<Message>&) { void BrokerContext::acquire(const broker::QueuedMessage& qm) { if (tssNoReplicate) return; - QueueContext::get(*qm.queue)->acquire(); core.mcast(ClusterMessageAcquireBody( ProtocolVersion(), qm.queue->getName(), qm.position)); } @@ -177,15 +176,12 @@ void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, // n is the number of consumers including the one just added. // FIXME aconway 2011-06-27: rename, conflicting terms. void BrokerContext::consume(broker::Queue& q, size_t n) { - if (n == 1) { - // FIXME aconway 2011-06-27: should be on QueueContext for symmetry? - core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName())); - } + QueueContext::get(q)->consume(n); } // n is the number of consumers after the cancel. void BrokerContext::cancel(broker::Queue& q, size_t n) { - if (n == 0) QueueContext::get(q)->unsubscribed(); + QueueContext::get(q)->cancel(n); } void BrokerContext::empty(broker::Queue& ) { @@ -196,10 +192,7 @@ void BrokerContext::stopped(broker::Queue& q) { boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q); // Don't forward the stopped call if the queue does not yet have a cluster context // this when the queue is first created locally. - if (qc){ - QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName()); - qc->stopped(); - } + if (qc) qc->stopped(); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 6c97c906e8..122163ee7e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -21,79 +21,101 @@ #include "QueueContext.h" #include "Multicaster.h" +#include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/ClusterQueueResubscribeBody.h" +#include "qpid/framing/ClusterQueueSubscribeBody.h" #include "qpid/framing/ClusterQueueUnsubscribeBody.h" +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" - +#include "qpid/sys/Timer.h" namespace qpid { namespace cluster { + +class OwnershipTimeout : public sys::TimerTask { + QueueContext& queueContext; + + public: + OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) : + TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {} + + // FIXME aconway 2011-07-27: thread safety on deletion? + void fire() { queueContext.timeout(); } +}; + QueueContext::QueueContext(broker::Queue& q, Multicaster& m) - : owner(NOT_OWNER), count(0), queue(q), mcast(m) + : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0) { - QPID_LOG(debug, "Assign cluster context to queue " << q.getName()); - q.stop(); // Initially stopped. Must all before setClusterContext q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); + q.stop(); // Initially stopped. +} +QueueContext::~QueueContext() { + // FIXME aconway 2011-07-27: revisit shutdown logic. + // timeout() could be called concurrently with destructor. + sys::Mutex::ScopedLock l(lock); + timerTask->cancel(); } -// Called by QueueReplica in deliver thread. -void QueueContext::sharedOwner(size_t limit) { - QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get()); +void QueueContext::replicaState(QueueOwnership state) { sys::Mutex::ScopedLock l(lock); - count = limit; - if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? - owner = SHARED_OWNER; + switch (state) { + case UNSUBSCRIBED: + case SUBSCRIBED: + break; + case SOLE_OWNER: + queue.start(); + if (timerTask) { // no need for timeout. + timerTask->cancel(); + timerTask = 0; + } + break; + case SHARED_OWNER: + queue.start(); + if (timerTask) timerTask->cancel(); + // FIXME aconway 2011-07-28: configurable interval. + timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC); + timer.add(timerTask); + break; + } } -// Called by QueueReplica in deliver thread. -void QueueContext::soleOwner() { - QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get()); +// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer. + +void QueueContext::consume(size_t n) { sys::Mutex::ScopedLock l(lock); - count = 0; - if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? - owner = SOLE_OWNER; + consumers = n; + if (n == 1) mcast.mcast( + framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); } -// Called by BrokerContext in connection thread(s) on acquiring a message -void QueueContext::acquire() { - bool stop = false; - { - sys::Mutex::ScopedLock l(lock); - assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own. - QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName() - << " owner=" << owner << " count=" << count); - if (owner == SHARED_OWNER) { - // Note count could be 0 if there are concurrent calls to acquire. - if (count && --count == 0) { - stop = true; - } - } - } - // FIXME aconway 2011-06-28: could have multiple stop() threads... - if (stop) queue.stop(); +void QueueContext::cancel(size_t n) { + sys::Mutex::ScopedLock l(lock); + consumers = n; + if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock? +} + +void QueueContext::timeout() { + QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName()); + queue.stop(); } -// Callback set up by queue.stop() + +// Callback set up by queue.stop(), called when no threads are dispatching from the queue. +// Release the queue. void QueueContext::stopped() { sys::Mutex::ScopedLock l(lock); - if (owner == NOT_OWNER) { + // FIXME aconway 2011-07-28: review thread safety of state. + // Deffered call to stopped doesn't sit well. + // queueActive is invaled while stop is in progress? + if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( framing::ProtocolVersion(), queue.getName())); - } else { - owner = NOT_OWNER; + else mcast.mcast(framing::ClusterQueueResubscribeBody( framing::ProtocolVersion(), queue.getName())); - } -} - -void QueueContext::unsubscribed() { - QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName()); - queue.stop(); - sys::Mutex::ScopedLock l(lock); - owner = NOT_OWNER; } boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { @@ -102,4 +124,5 @@ boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { } + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 5bafb5eb0f..c244b57a2e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -24,10 +24,11 @@ #include <qpid/RefCounted.h> +#include "qpid/sys/Time.h" #include <qpid/sys/Mutex.h> +#include "qpid/cluster/types.h" #include <boost/intrusive_ptr.hpp> - // FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on // class broker::Cluster::Queue. This becomes the cluster context. @@ -35,55 +36,58 @@ namespace qpid { namespace broker { class Queue; } +namespace sys { +class Timer; +class TimerTask; +} + namespace cluster { class Multicaster; /** * Queue state that is not replicated to the cluster. - * Manages the local queue start/stop status + * Manages the local queue start/stop status. * - * Thread safe: Called by connection and dispatch threads. + * Thread safe: Called by connection, dispatch and timer threads. */ class QueueContext : public RefCounted { - // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr? public: QueueContext(broker::Queue& q, Multicaster& m); + ~QueueContext(); - /** Sharing ownership of queue, can acquire up to limit before releasing. - * Called in deliver thread. - */ - void sharedOwner(size_t limit); - - /** Sole owner of queue, no limits to acquiring */ - void soleOwner(); + /** Replica state has changed, called in deliver thread. */ + void replicaState(QueueOwnership); - /** - * Count an acquired message against the limit. - * Called from connection threads while consuming messages + /** Called when queue is stopped, no threads are dispatching. + * Connection or deliver thread. */ - void acquire(); - - /** Called if the queue becomes empty, from connection thread. */ - void empty(); - - /** Called when queue is stopped, connection or deliver thread. */ void stopped(); - /** Called when the last subscription to a queue is cancelled */ - void unsubscribed(); + /** Called when a consumer is added to the queue. + *@param n: nubmer of consumers after new one is added. + */ + void consume(size_t n); + + /** Called when a consumer is cancelled on the queue. + *@param n: nubmer of consumers after the cancel. + */ + void cancel(size_t n); /** Get the context for a broker queue. */ static boost::intrusive_ptr<QueueContext> get(broker::Queue&); + /** Called when the timer runs out: stop the queue. */ + void timeout(); + private: - void release(); + sys::Timer& timer; sys::Mutex lock; - enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner; - size_t count; // Count of dequeues remaining, 0 means no limit. broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? Multicaster& mcast; + boost::intrusive_ptr<sys::TimerTask> timerTask; + size_t consumers; // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing. }; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 551477a920..7bbd6e1422 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -44,7 +44,7 @@ std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) { return o; } -std::ostream& operator<<(std::ostream& o, QueueReplica::State s) { +std::ostream& operator<<(std::ostream& o, QueueOwnership s) { static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" }; return o << tags[s]; } @@ -58,13 +58,13 @@ std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) { // FIXME aconway 2011-05-17: error handling for asserts. void QueueReplica::subscribe(const MemberId& member) { - State before = getState(); + QueueOwnership before = getState(); subscribers.push_back(member); update(before); } void QueueReplica::unsubscribe(const MemberId& member) { - State before = getState(); + QueueOwnership before = getState(); MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member); if (i != subscribers.end()) { subscribers.erase(i, subscribers.end()); @@ -74,30 +74,20 @@ void QueueReplica::unsubscribe(const MemberId& member) { void QueueReplica::resubscribe(const MemberId& member) { assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling - State before = getState(); + QueueOwnership before = getState(); subscribers.pop_front(); subscribers.push_back(member); update(before); } -void QueueReplica::update(State before) { - const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable - State after = getState(); - if (before == after) return; +void QueueReplica::update(QueueOwnership before) { QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")"); - switch (after) { - case UNSUBSCRIBED: break; - case SUBSCRIBED: break; - case SOLE_OWNER: - context->soleOwner(); - break; - case SHARED_OWNER: - context->sharedOwner(acquireLimit); - break; - } + QueueOwnership after = getState(); + if (before == after) return; + context->replicaState(after); } -QueueReplica::State QueueReplica::getState() const { +QueueOwnership QueueReplica::getState() const { if (isOwner()) return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER; return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h index a322a8b9c0..4ebbc84ef0 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -35,7 +35,6 @@ class Queue; } namespace cluster { -class QueueHandler; class QueueContext; /** @@ -56,15 +55,9 @@ class QueueReplica : public RefCounted void resubscribe(const MemberId&); private: - enum State { - UNSUBSCRIBED, - SUBSCRIBED, - SOLE_OWNER, - SHARED_OWNER - }; friend class PrintSubscribers; - friend std::ostream& operator<<(std::ostream&, State); + friend std::ostream& operator<<(std::ostream&, QueueOwnership); friend std::ostream& operator<<(std::ostream&, const QueueReplica&); typedef std::deque<MemberId> MemberQueue; @@ -74,10 +67,10 @@ class QueueReplica : public RefCounted MemberId self; boost::intrusive_ptr<QueueContext> context; - State getState() const; + QueueOwnership getState() const; bool isOwner() const; bool isSubscriber(const MemberId&) const; - void update(State before); + void update(QueueOwnership before); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index 1b3286792f..ef4df3cf97 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -72,7 +72,6 @@ void WiringHandler::createQueue(const std::string& data) { assert(q); // FIXME aconway 2011-05-10: error handling. // TODO aconway 2011-05-10: if we implement multi-group for queues then // this call is a problem: comes from wiring delivery thread, not queues. - // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers.. queueHandler->add(q); QPID_LOG(debug, "cluster: create queue " << q->getName()); } diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index dec377b173..667d9b89fa 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -82,6 +82,16 @@ std::ostream& operator<<(std::ostream&, EventType); /** Number to identify a message being routed. */ typedef uint32_t RoutingId; +// FIXME aconway 2011-07-28: can we put these 2 back in the +// QueueReplica & QueueContext? +/** State of a queue with respect to a cluster member. */ +enum QueueOwnership { + UNSUBSCRIBED, + SUBSCRIBED, + SOLE_OWNER, + SHARED_OWNER +}; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_TYPES_H*/ diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp index 4311cf51cf..b2c07f7469 100644 --- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp +++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp @@ -249,8 +249,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); - // Note: empty is called once for each receiver. - BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); + // FIXME aconway 2011-07-25: empty called once per receiver? BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); BOOST_CHECK_EQUAL(h.size(), i); diff --git a/qpid/cpp/src/tests/ais_check b/qpid/cpp/src/tests/ais_check index 92eaa9dd39..277e3b3f7e 100755 --- a/qpid/cpp/src/tests/ais_check +++ b/qpid/cpp/src/tests/ais_check @@ -18,8 +18,6 @@ # under the License. # -srcdir=`dirname $0` - # Check AIS requirements and run tests if found. ps -u root | grep 'aisexec\|corosync' >/dev/null || { echo WARNING: Skipping cluster tests, the aisexec or corosync daemon is not running. diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py index 1cf749cdb4..ad13986ad3 100755 --- a/qpid/cpp/src/tests/cluster2_tests.py +++ b/qpid/cpp/src/tests/cluster2_tests.py @@ -137,6 +137,12 @@ class Cluster2Tests(BrokerTest): # FIXME aconway 2010-10-29: test unbind, may need to use old API. + def duration(self): + d = self.config.defines.get("DURATION") + if d: return float(d)*60 + else: return 3 # Default is to be quick + + def test_dequeue_mutex(self): """Ensure that one and only one consumer receives each dequeued message.""" class Receiver(Thread): @@ -163,13 +169,12 @@ class Cluster2Tests(BrokerTest): for r in receivers: r.start() n = 0 - t = time.time() + 1 # Send for 1 second. + t = time.time() + self.duration() while time.time() < t: sender.send(str(n)) n += 1 for r in receivers: r.join(); - print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17: - for r in receivers: assert len(r.messages) # At least one message to each + for r in receivers: len(r.messages) > n/6 # Fairness test. messages = [int(m.content) for r in receivers for m in r.messages ] messages.sort() self.assertEqual(range(n), messages) |