From 29e3b04915ef30f7e0f769cc1ee3994d99711fef Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 4 Nov 2011 20:27:13 +0000 Subject: QPID-2920: Batch acquire/dequeue messages in cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1197749 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/include/qpid/RangeSet.h | 2 + qpid/cpp/src/Makefile.am | 1 + qpid/cpp/src/qpid/broker/Context.h | 38 +++++ qpid/cpp/src/qpid/broker/Queue.h | 7 +- qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 12 +- qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp | 2 +- qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h | 2 + qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp | 10 +- qpid/cpp/src/qpid/cluster/exp/EventHandler.h | 8 + qpid/cpp/src/qpid/cluster/exp/Group.cpp | 3 + qpid/cpp/src/qpid/cluster/exp/Group.h | 2 + qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 30 +--- qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 180 +++++++++++++++-------- qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 47 ++++-- qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp | 31 ++-- qpid/cpp/src/qpid/cluster/exp/QueueHandler.h | 16 +- qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp | 28 ++-- qpid/cpp/src/qpid/cluster/exp/QueueReplica.h | 12 +- qpid/cpp/src/qpid/cluster/exp/Ticker.cpp | 8 +- qpid/cpp/src/qpid/cluster/exp/Ticker.h | 8 +- qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp | 2 +- qpid/cpp/src/tests/cluster2_tests.py | 31 ++-- qpid/cpp/src/tests/qpid-cpp-benchmark | 3 +- qpid/cpp/xml/cluster.xml | 17 ++- 24 files changed, 318 insertions(+), 182 deletions(-) create mode 100644 qpid/cpp/src/qpid/broker/Context.h diff --git a/qpid/cpp/include/qpid/RangeSet.h b/qpid/cpp/include/qpid/RangeSet.h index 36991fd784..b36030cbcd 100644 --- a/qpid/cpp/include/qpid/RangeSet.h +++ b/qpid/cpp/include/qpid/RangeSet.h @@ -133,6 +133,8 @@ class RangeSet explicit RangeSet(const Range& r) { *this += r; } RangeSet(const T& a, const T& b) { *this += Range(a,b); } + void swap(RangeSet& x) { ranges.swap(x.ranges); } + bool contiguous() const { return ranges.size() <= 1; } bool contains(const T& t) const; diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 250491c394..5e59331b5f 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -530,6 +530,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ConnectionHandler.h \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionToken.h \ + qpid/broker/Context .h \ qpid/broker/Consumer.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ diff --git a/qpid/cpp/src/qpid/broker/Context.h b/qpid/cpp/src/qpid/broker/Context.h new file mode 100644 index 0000000000..5770dd78ed --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Context.h @@ -0,0 +1,38 @@ +#ifndef QPID_BROKER_CONTEXT_H +#define QPID_BROKER_CONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace broker { + +/** + * Base class for context objects. + */ +class Context +{ + public: + virtual ~Context() {} +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index aae858f804..b57ec46adc 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -24,6 +24,7 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/OwnershipToken.h" #include "qpid/broker/Consumer.h" +#include "qpid/broker/Context.h" #include "qpid/broker/Message.h" #include "qpid/broker/Messages.h" #include "qpid/broker/PersistableQueue.h" @@ -130,7 +131,7 @@ class Queue : public boost::enable_shared_from_this, int autoDeleteTimeout; boost::intrusive_ptr autoDeleteTask; sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster - boost::intrusive_ptr clusterContext; // Used by cluster + std::auto_ptr clusterContext; // Clustering state. void push(boost::intrusive_ptr& msg, bool isRecovery=false); void setPolicy(std::auto_ptr policy); @@ -401,8 +402,8 @@ class Queue : public boost::enable_shared_from_this, bool isConsumingStopped(); /** Context information used in a cluster. */ - boost::intrusive_ptr getClusterContext() { return clusterContext; } - void setClusterContext(boost::intrusive_ptr context) { clusterContext = context; } + Context* getClusterContext() { return clusterContext.get(); } + void setClusterContext(std::auto_ptr context) { clusterContext = context; } }; }} // qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 9943b3d2b5..5871810a78 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -26,8 +26,6 @@ #include "QueueContext.h" #include "hash.h" #include "qpid/framing/ClusterMessageEnqueueBody.h" -#include "qpid/framing/ClusterMessageAcquireBody.h" -#include "qpid/framing/ClusterMessageDequeueBody.h" #include "qpid/framing/ClusterMessageRequeueBody.h" #include "qpid/framing/ClusterWiringCreateQueueBody.h" #include "qpid/framing/ClusterWiringCreateExchangeBody.h" @@ -113,14 +111,13 @@ void BrokerContext::routed(const boost::intrusive_ptr&) {} void BrokerContext::acquire(const broker::QueuedMessage& qm) { if (tssReplicate) { assert(!qm.queue->isConsumingStopped()); - mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); + QueueContext::get(*qm.queue)->localAcquire(qm.position); } } void BrokerContext::dequeue(const broker::QueuedMessage& qm) { if (tssReplicate) - mcaster(qm).mcast( - ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position)); + QueueContext::get(*qm.queue)->localDequeue(qm.position); } void BrokerContext::requeue(const broker::QueuedMessage& qm) { @@ -135,8 +132,7 @@ void BrokerContext::requeue(const broker::QueuedMessage& qm) { void BrokerContext::create(broker::Queue& q) { if (!tssReplicate) return; assert(!QueueContext::get(q)); - boost::intrusive_ptr context( - new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks)); + new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks); std::string data(q.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); q.encode(buf); @@ -188,7 +184,7 @@ void BrokerContext::cancel(broker::Queue& q, size_t n) { } void BrokerContext::stopped(broker::Queue& q) { - boost::intrusive_ptr qc = QueueContext::get(q); + QueueContext* qc = QueueContext::get(q); // Don't forward the stopped call if the queue does not yet have a // cluster context - this when the queue is first created locally. if (qc) qc->stopped(); diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp index 68b9d5075b..52ae3eeea3 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp @@ -36,7 +36,7 @@ struct Cluster2Plugin : public Plugin { addOptions() ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join") ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.") - ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in the cluster.") + ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in microseconds.") ("cluster2-consume-ticks", optValue(settings.consumeTicks, "N"), "Maximum number of ticks a broker can hold the consume lock on a shared queue."); // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h. // FIXME aconway 2011-10-05: rename to final option names. diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h index 2e86949dfd..144831dde0 100644 --- a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h +++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h @@ -61,6 +61,8 @@ class CountdownTimer { } } + bool isRunning() const { return timerRunning; } + private: class Task : public sys::TimerTask { diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp index dc10548f80..3c19967e06 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp @@ -71,11 +71,11 @@ void EventHandler::deliver( try { handle(frame); } catch (const std::exception& e) { - // FIXME aconway 2011-10-19: error handling. - QPID_LOG(error, "cluster: error in deliver on " << cpg.getName() - << " from " << PrettyId(sender, self) - << ": " << frame - << ": " << e.what()); + // FIXME aconway 2011-10-19: error handling. + QPID_LOG(error, "cluster event: " << e.what() + << " (sender=" << PrettyId(sender, self) << " group=" << cpg.getName() + << " " << frame << ")"); + } } } diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h index 9f9ae1a856..f3a678f026 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h @@ -78,6 +78,14 @@ class EventHandler : public Cpg::Handler MemberId getSelf() { return self; } Cpg& getCpg() { return cpg; } + template boost::intrusive_ptr getHandler() { + for (size_t i = 0; i < handlers.size(); ++i) { + boost::intrusive_ptr p(dynamic_cast(handlers[i].get())); + if (p) return p; + } + return 0; + } + private: void handle(const framing::AMQFrame&); diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp index c6d98856a1..0bb805da5b 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp @@ -54,4 +54,7 @@ Group::~Group() {} void Group::mcast(const framing::AMQBody& b) { multicaster->mcast(b); } void Group::mcast(const framing::AMQFrame& f) { multicaster->mcast(f); } + +MemberId Group::getSelf() const { return eventHandler->getSelf(); } + }} // namespace qpid::cluster::exp diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h index 49b33c6a70..cb216faf8a 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Group.h +++ b/qpid/cpp/src/qpid/cluster/exp/Group.h @@ -22,6 +22,7 @@ * */ +#include "qpid/cluster/types.h" #include "qpid/RefCounted.h" #include @@ -57,6 +58,7 @@ class Group : public RefCounted MessageHolder& getMessageHolder() { return *messageHolder; } MessageBuilders& getMessageBuilders() { return *messageBuilders; } Ticker& getTicker() { return *ticker; } + MemberId getSelf() const; void mcast(const framing::AMQBody&); void mcast(const framing::AMQFrame&); diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 21129b0fae..c8997877c5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -88,38 +88,26 @@ void MessageHandler::enqueue(const std::string& q, uint16_t channel) { // We only need to build message from other brokers, our own messages // are held by the MessageHolder. if (sender() != self()) { - boost::shared_ptr queue = findQueue(q, "cluster: enqueue"); + boost::shared_ptr queue = findQueue(q, "cluster enqueue"); messageBuilders.announce(sender(), channel, queue); } } -// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet -// and scan queue once. void MessageHandler::acquire(const std::string& q, uint32_t position) { // FIXME aconway 2011-09-15: systematic logging across cluster module. - QPID_LOG(trace, "cluster: message " << q << "[" << position + QPID_LOG(trace, "cluster message " << q << "[" << position << "] acquired by " << PrettyId(sender(), self())); // Note acquires from other members. My own acquires were executed in // the broker thread if (sender() != self()) { - boost::shared_ptr queue = findQueue(q, "cluster: acquire"); - QueuedMessage qm; - BrokerContext::ScopedSuppressReplication ssr; - if (!queue->acquireMessageAt(position, qm)) - throw Exception(QPID_MSG("cluster: acquire: message not found: " - << q << "[" << position << "]")); - assert(qm.position.getValue() == position); - assert(qm.payload); - // Save on context for possible requeue if released/rejected. - QueueContext::get(*queue)->acquire(qm); - // FIXME aconway 2011-09-19: need to record by member-ID to - // requeue if member leaves. + boost::shared_ptr queue = findQueue(q, "cluster acquire"); + QueueContext::get(*queue)->acquire(position); } } void MessageHandler::dequeue(const std::string& q, uint32_t position) { // FIXME aconway 2011-09-15: systematic logging across cluster module. - QPID_LOG(trace, "cluster: message " << q << "[" << position + QPID_LOG(trace, "cluster message " << q << "[" << position << "] dequeued by " << PrettyId(sender(), self())); // FIXME aconway 2010-10-28: for local dequeues, we should @@ -128,16 +116,14 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) { // My own dequeues were processed in the broker thread before multicasting. if (sender() != self()) { - boost::shared_ptr queue = findQueue(q, "cluster: dequeue"); - QueuedMessage qm = QueueContext::get(*queue)->dequeue(position); - BrokerContext::ScopedSuppressReplication ssr; - if (qm.queue) queue->dequeue(0, qm); + boost::shared_ptr queue = findQueue(q, "cluster dequeue"); + QueueContext::get(*queue)->dequeue(position); } } void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) { if (sender() != self()) { - boost::shared_ptr queue = findQueue(q, "cluster: requeue"); + boost::shared_ptr queue = findQueue(q, "cluster requeue"); QueueContext::get(*queue)->requeue(position, redelivered); } } diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index ba06ee82f2..ff9c050348 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -21,34 +21,45 @@ */ #include "BrokerContext.h" +#include "EventHandler.h" #include "Group.h" #include "Multicaster.h" #include "QueueContext.h" +#include "QueueHandler.h" #include "hash.h" -#include "qpid/cluster/types.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/ClusterQueueResubscribeBody.h" -#include "qpid/framing/ClusterQueueSubscribeBody.h" -#include "qpid/framing/ClusterQueueUnsubscribeBody.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/ClusterMessageAcquireBody.h" +#include "qpid/framing/ClusterMessageDequeueBody.h" +#include "qpid/framing/ClusterQueueConsumedBody.h" +#include "qpid/framing/ClusterQueueSubscribeBody.h" +#include "qpid/framing/ClusterQueueUnsubscribeBody.h" +#include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { +using framing::SequenceSet; +const framing::ProtocolVersion pv; // shorthand + QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_) - : consumers(0), consuming(true), ticks(0), + : ownership(UNSUBSCRIBED), consumers(0), consuming(false), ticks(0), queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())), - maxTicks(maxTicks_) + maxTicks(maxTicks_), group(g) { - q.setClusterContext(boost::intrusive_ptr(this)); + q.setClusterContext(std::auto_ptr(this)); q.stopConsumers(); // Stop queue initially. - g.getTicker().add(this); + group.getTicker().add(this); } -QueueContext::~QueueContext() {} +QueueContext::~QueueContext() { + // Lifecycle: must remove all references to this context before it is deleted. + // Must be sure that there can be no use of this context later. + group.getTicker().remove(this); + group.getEventHandler().getHandler()->remove(queue); +} namespace { bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } @@ -57,72 +68,66 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } // Called by QueueReplica in CPG deliver thread when state changes. void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) { + sys::Mutex::ScopedLock l(lock); // Interested in state changes which lead to ownership. // We voluntarily give up ownership before multicasting // the state change so we don't need to handle transitions // that lead to non-ownership. + if (before != after && isOwner(after)) { - bool start = false; - { - sys::Mutex::ScopedLock l(lock); - start = !consuming; - consuming = true; - ticks = 0; - } - if (start) queue.startConsumers(); + assert(before == ownership); + if (!consuming) queue.startConsumers(); + consuming = true; + ticks = 0; } + ownership = after; } // FIXME aconway 2011-07-27: Dont spin the token on an empty queue. // Called in broker threads when a consumer is added void QueueContext::consume(size_t n) { - { - sys::Mutex::ScopedLock l(lock); - consumers = n; - } - if (n == 1) mcast.mcast( - framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); + sys::Mutex::ScopedLock l(lock); + if (consumers == 0 && n > 0 && ownership == UNSUBSCRIBED) + mcast.mcast( + framing::ClusterQueueSubscribeBody(pv, queue.getName())); + consumers = n; } // Called in broker threads when a consumer is cancelled void QueueContext::cancel(size_t n) { - bool stop = false; - { - sys::Mutex::ScopedLock l(lock); - consumers = n; - stop = (n == 0 && consuming); - } - if (stop) queue.stopConsumers(); + sys::Mutex::ScopedLock l(lock); + consumers = n; + if (n == 0 && consuming) queue.stopConsumers(); } +// FIXME aconway 2011-11-03: review scope of locking around sendConsumed + // Called in Ticker thread. void QueueContext::tick() { - bool stop = false; - { - sys::Mutex::ScopedLock l(lock); - stop = (consuming && ++ticks >= maxTicks); - } - // When all threads have stopped, queue will call stopped() - if (stop) queue.stopConsumers(); + sys::Mutex::ScopedLock l(lock); + if (!consuming) return; // Nothing to do if we don't have the lock. + if (ownership == SHARED_OWNER && ++ticks >= maxTicks) queue.stopConsumers(); + else if (ownership == SOLE_OWNER) sendConsumed(l); // Status report on consumption } // Callback set up by queue.stopConsumers() called in connection or timer thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { - bool resubscribe = false; - { - sys::Mutex::ScopedLock l(lock); - assert(consuming); - consuming = false; - resubscribe = consumers; - } - if (resubscribe) - mcast.mcast(framing::ClusterQueueResubscribeBody( - framing::ProtocolVersion(), queue.getName())); - else - mcast.mcast(framing::ClusterQueueUnsubscribeBody( - framing::ProtocolVersion(), queue.getName())); + sys::Mutex::ScopedLock l(lock); + if (!consuming) return; // !consuming => initial stopConsumers in ctor. + sendConsumed(l); + mcast.mcast( + framing::ClusterQueueUnsubscribeBody(pv, queue.getName(), consumers)); + consuming = false; +} + +void QueueContext::sendConsumed(const sys::Mutex::ScopedLock&) { + if (acquired.empty() && dequeued.empty()) return; // Nothing to send + mcast.mcast( + framing::ClusterQueueConsumedBody(pv, queue.getName(), acquired,dequeued)); + acquired.clear(); + dequeued.clear(); } void QueueContext::requeue(uint32_t position, bool redelivered) { @@ -135,17 +140,76 @@ void QueueContext::requeue(uint32_t position, bool redelivered) { } } -void QueueContext::acquire(const broker::QueuedMessage& qm) { - unacked.put(qm.position, qm); +void QueueContext::localAcquire(uint32_t position) { + QPID_LOG(trace, "cluster queue " << queue.getName() << " acquired " << position); + sys::Mutex::ScopedLock l(lock); + assert(consuming); + acquired.add(position); +} + +void QueueContext::localDequeue(uint32_t position) { + QPID_LOG(trace, "cluster queue " << queue.getName() << " dequeued " << position); + // FIXME aconway 2010-10-28: for local dequeues, we should + // complete the ack that initiated the dequeue at this point. + sys::Mutex::ScopedLock l(lock); + + // FIXME aconway 2011-11-03: this assertion fails for explicit accept + // because it doesn't respect the consume lock. + // assert(consuming); + + dequeued.add(position); +} + +void QueueContext::consumed( + const MemberId& sender, + const SequenceSet& acquired, + const SequenceSet& dequeued) +{ + // No lock, doesn't touch any members. + + // FIXME aconway 2011-09-15: systematic logging across cluster module. + // FIXME aconway 2011-09-23: pretty printing for identifier. + QPID_LOG(trace, "cluster: " << sender << " acquired: " << acquired + << " dequeued: " << dequeued << " on queue: " << queue.getName()); + + // Note acquires from other members. My own acquires were executed in + // the connection thread + if (sender != group.getSelf()) { + // FIXME aconway 2011-09-23: avoid individual finds, scan queue once. + for (SequenceSet::iterator i = acquired.begin(); i != acquired.end(); ++i) + acquire(*i); + } + // Process deques from the queue owner. + // FIXME aconway 2011-09-23: avoid individual finds, scan queue once. + for (SequenceSet::iterator i = dequeued.begin(); i != dequeued.end(); ++i) + dequeue(*i); +} + +// Remote acquire +void QueueContext::acquire(uint32_t position) { + // No lock, doesn't touch any members. + broker::QueuedMessage qm; + BrokerContext::ScopedSuppressReplication ssr; + if (!queue.acquireMessageAt(position, qm)) + // FIXME aconway 2011-10-31: error handling + throw Exception(QPID_MSG("cluster: acquire: message not found: " + << queue.getName() << "[" << position << "]")); + assert(qm.position.getValue() == position); + assert(qm.payload); + unacked.put(qm.position, qm); // unacked has its own lock. } -broker::QueuedMessage QueueContext::dequeue(uint32_t position) { - return unacked.pop(position); +void QueueContext::dequeue(uint32_t position) { + // No lock, doesn't touch any members. unacked has its own lock. + broker::QueuedMessage qm = unacked.pop(position); + BrokerContext::ScopedSuppressReplication ssr; + if (qm.queue) queue.dequeue(0, qm); } -boost::intrusive_ptr QueueContext::get(broker::Queue& q) { - return boost::intrusive_ptr( - static_cast(q.getClusterContext().get())); +QueueContext* QueueContext::get(broker::Queue& q) { + return static_cast(q.getClusterContext()); } +// FIXME aconway 2011-09-23: make unacked a plain map, use lock. + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 20c2aabc1d..d7079ab8a5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -25,11 +25,12 @@ #include "LockedMap.h" #include "Ticker.h" #include "qpid/RefCounted.h" +#include "qpid/broker/Context.h" +#include "qpid/cluster/types.h" +#include "qpid/framing/SequenceSet.h" #include "qpid/sys/AtomicValue.h" -#include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" -#include "qpid/cluster/types.h" -#include +#include "qpid/sys/Time.h" namespace qpid { namespace broker { @@ -43,12 +44,13 @@ class Multicaster; class Group; /** - * Queue state that is not replicated to the cluster. - * Manages the local queue start/stop status. + * Local Queue state, manage start/stop consuming on the queue. + * Destroyed when the queue is destroyed, it must erase itself + * from any cluster data structures in its destructor. * -* THREAD SAFE: Called by connection threads and Ticker dispatch threads. + * THREAD SAFE: Called by connection threads and Ticker dispatch threads. */ -class QueueContext : public Ticker::Tickable { +class QueueContext : public broker::Context, Ticker::Tickable { public: QueueContext(broker::Queue&, Group&, size_t consumeTicks); ~QueueContext(); @@ -80,33 +82,50 @@ class QueueContext : public Ticker::Tickable { /** Called by MessageHandler to requeue a message. */ void requeue(uint32_t position, bool redelivered); + /** Called by BrokerContext when a mesages is acquired locally. */ + void localAcquire(uint32_t position); + /** Called by MessageHandler when a mesages is acquired. */ - void acquire(const broker::QueuedMessage& qm); + void acquire(uint32_t position); /** Called by MesageHandler when a message is dequeued. */ - broker::QueuedMessage dequeue(uint32_t position); + void dequeue(uint32_t position); - size_t getHash() const { return hash; } + /** Called by BrokerContext when a message is dequeued locally. */ + void localDequeue(uint32_t position); + + /** Called in deliver thread, take note of another brokers acquires/dequeues. */ + void consumed(const MemberId&, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued); + size_t getHash() const { return hash; } + broker::Queue& getQueue() { return queue; } + /** Get the cluster context for a broker queue. */ - static boost::intrusive_ptr get(broker::Queue&) ; + static QueueContext* get(broker::Queue&); private: + void sendConsumed(const sys::Mutex::ScopedLock&); + sys::Mutex lock; - size_t consumers; // Number of local consumers - bool consuming; // True if we have the lock & local consumers are active + QueueOwnership ownership; // Ownership status. + size_t consumers; // Number of local consumers. + bool consuming; // True if we have the lock. size_t ticks; // Ticks since we got the lock. // Following members are immutable - broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? + broker::Queue& queue; Multicaster& mcast; size_t hash; size_t maxTicks; // Max ticks we are allowed. + framing::SequenceSet acquired, dequeued; // Track local acquires/dequeues. // Following members are safe to use without holding a lock typedef LockedMap UnackedMap; UnackedMap unacked; + Group& group; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 0c96e9326d..f763841c20 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -45,25 +45,36 @@ bool QueueHandler::handle(const framing::AMQFrame& frame) { void QueueHandler::subscribe(const std::string& queue) { find(queue)->subscribe(sender()); } -void QueueHandler::unsubscribe(const std::string& queue) { - find(queue)->unsubscribe(sender()); + +void QueueHandler::unsubscribe(const std::string& queue, + bool resubscribe) { + find(queue)->unsubscribe(sender(), resubscribe); } -void QueueHandler::resubscribe(const std::string& queue) { - find(queue)->resubscribe(sender()); + +void QueueHandler::consumed(const std::string& queue, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued) +{ + find(queue)->consumed(sender(), acquired, dequeued); } void QueueHandler::left(const MemberId& member) { // Unsubscribe for members that leave. for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i) - i->second->unsubscribe(member); + i->second->unsubscribe(member, false); } -void QueueHandler::add(boost::shared_ptr q) { +void QueueHandler::add(broker::Queue& q) { // Local queues already have a context, remote queues need one. - if (!QueueContext::get(*q)) - new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue - queues[q->getName()] = boost::intrusive_ptr( - new QueueReplica(q, self())); + if (!QueueContext::get(q)) + new QueueContext(q, group, consumeTicks); // Context attaches to the Queue + assert(QueueContext::get(q)); + queues[q.getName()] = boost::intrusive_ptr( + new QueueReplica(*QueueContext::get(q), self())); +} + +void QueueHandler::remove(broker::Queue& q) { + queues.erase(q.getName()); } boost::intrusive_ptr QueueHandler::find(const std::string& queue) { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h index 84e8b75cfb..0f9937641b 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -60,15 +60,17 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, // Events void subscribe(const std::string& queue); - void unsubscribe(const std::string& queue); - void resubscribe(const std::string& queue); - void left(const MemberId&); - void add(boost::shared_ptr); + void unsubscribe(const std::string& queue, bool resubscribe); + + void consumed(const std::string& queue, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued); + + void left(const MemberId&); - // NB: These functions ar called in broker threads, not deliver threads. - void acquired(const broker::QueuedMessage& qm); - void empty(const broker::Queue& q); + void add(broker::Queue&); + void remove(broker::Queue&); private: typedef std::map > QueueMap; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 11a7496582..66a7a81f33 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -28,9 +28,8 @@ namespace qpid { namespace cluster { -QueueReplica::QueueReplica(boost::shared_ptr q, - const MemberId& self_) - : queue(q), self(self_), context(QueueContext::get(*q)) +QueueReplica::QueueReplica(QueueContext& qc, const MemberId& self_) + : self(self_), context(qc) {} struct PrintSubscribers { @@ -56,27 +55,28 @@ void QueueReplica::subscribe(const MemberId& member) { update(before); } -// FIXME aconway 2011-09-20: need to requeue. -void QueueReplica::unsubscribe(const MemberId& member) { +void QueueReplica::unsubscribe(const MemberId& member, bool resubscribe) +{ + assert(!resubscribe || member == subscribers.front()); QueueOwnership before = getState(); MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member); - if (i != subscribers.end()) subscribers.erase(i, subscribers.end()); + subscribers.erase(i, subscribers.end()); + if (resubscribe) subscribers.push_back(member); update(before); } -void QueueReplica::resubscribe(const MemberId& member) { - assert (member == subscribers.front()); - QueueOwnership before = getState(); - subscribers.pop_front(); - subscribers.push_back(member); - update(before); +void QueueReplica::consumed(const MemberId& member, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued) +{ + context.consumed(member, acquired, dequeued); } void QueueReplica::update(QueueOwnership before) { QueueOwnership after = getState(); - QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": " + QPID_LOG(trace, "cluster: queue replica: " << context.getQueue().getName() << ": " << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]"); - context->replicaState(before, after); + context.replicaState(before, after); } QueueOwnership QueueReplica::getState() const { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h index 31faf4853a..ca92de1e30 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -50,20 +50,22 @@ struct PrintSubscribers; class QueueReplica : public RefCounted { public: - QueueReplica(boost::shared_ptr , const MemberId& ); + QueueReplica(QueueContext&, const MemberId& ); void subscribe(const MemberId&); - void unsubscribe(const MemberId&); - void resubscribe(const MemberId&); + void unsubscribe(const MemberId&, bool resubscribe); + void consumed(const MemberId&, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued); MemberId getSelf() const { return self; } private: typedef std::deque MemberQueue; - boost::shared_ptr queue; + std::string name; MemberQueue subscribers; MemberId self; - boost::intrusive_ptr context; + QueueContext& context; QueueOwnership getState() const; bool isOwner() const; diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp index 9ff04f2f54..1210eb7055 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp @@ -34,21 +34,21 @@ Ticker::Ticker(sys::Duration tick, sys::Timer& timer_, timer.add(this); } -void Ticker::add(boost::intrusive_ptr t) { +void Ticker::add(Tickable* t) { sys::Mutex::ScopedLock l(lock); tickables.push_back(t); } -void Ticker::remove(boost::intrusive_ptr t) { +void Ticker::remove(Tickable* t) { sys::Mutex::ScopedLock l(lock); Tickables::iterator i = std::find(tickables.begin(), tickables.end(), t); if (i != tickables.end()) tickables.erase(i); } -// Called by timer thread, sets condition +// Called by timer thread void Ticker::fire() { condition.set(); - setupNextFire(); + setupNextFire(); // FIXME aconway 2011-11-03: restart()? timer.add(this); } diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.h b/qpid/cpp/src/qpid/cluster/exp/Ticker.h index 0a8d508a70..6910b7a0be 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Ticker.h +++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.h @@ -53,18 +53,18 @@ namespace cluster { class Ticker : public sys::TimerTask { public: - struct Tickable : public RefCounted { + struct Tickable { virtual ~Tickable(); virtual void tick() = 0; }; Ticker(sys::Duration tick, sys::Timer&, boost::shared_ptr); - void add(boost::intrusive_ptr); - void remove(boost::intrusive_ptr); + void add(Tickable*); + void remove(Tickable*); private: - typedef std::vector > Tickables; + typedef std::vector Tickables; void fire(); // Called in timer thread. void dispatch(sys::PollableCondition&); // Called in IO thread diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index da110fed8f..c2f52f2a27 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -72,7 +72,7 @@ void WiringHandler::createQueue(const std::string& data) { } boost::shared_ptr q = broker.getQueues().find(name); assert(q); // FIXME aconway 2011-05-10: error handling. - queueHandler->add(q); + queueHandler->add(*q); QPID_LOG(debug, "cluster: create queue " << q->getName()); } diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py index d5c9ffb61d..c5f6157f34 100755 --- a/qpid/cpp/src/tests/cluster2_tests.py +++ b/qpid/cpp/src/tests/cluster2_tests.py @@ -50,12 +50,6 @@ class Cluster2Tests(BrokerTest): if time.time() > timeout: fail("Time out in wait_for_queue(%s))"%queue) time.sleep(0.01) - # FIXME aconway 2011-05-17: remove, use assert_browse. - def verify_content(self, expect, receiver): - actual = [receiver.fetch(1).content for x in expect] - self.assertEqual(expect, actual) - self.assertRaises(Empty, receiver.fetch, 0) - def test_message_enqueue(self): """Test basic replication of enqueued messages. Verify that fanout messages are replicated correctly. @@ -64,13 +58,12 @@ class Cluster2Tests(BrokerTest): cluster = self.cluster(2, cluster2=True) sn0 = cluster[0].connect().session() - r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); - r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); s0 = sn0.sender("amq.fanout"); - sn1 = cluster[1].connect().session() - r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); - r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + + # Bind queues to amq.fanout + sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); # Send messages on member 0 @@ -78,10 +71,10 @@ class Cluster2Tests(BrokerTest): for m in content: s0.send(Message(m)) # Browse on both members. - self.verify_content(content, r0p) - self.verify_content(content, r0q) - self.verify_content(content, r1p) - self.verify_content(content, r1q) + self.assert_browse(sn0, "p", content) + self.assert_browse(sn0, "q", content) + self.assert_browse(sn1, "p", content) + self.assert_browse(sn1, "q", content) sn1.connection.close() sn0.connection.close() @@ -98,16 +91,16 @@ class Cluster2Tests(BrokerTest): content = ["a","b","c"] for m in content: s0.send(Message(m)) # Verify enqueued on members 0 and 1 - self.verify_content(content, sn0.receiver("q;{mode:browse}")) - self.verify_content(content, sn1.receiver("q;{mode:browse}")) + self.assert_browse(sn0, "q", content) + self.assert_browse(sn1, "q", content) # Dequeue on cluster[0] self.assertEqual(r0.fetch(1).content, "a") sn0.acknowledge(sync=True) # Verify dequeued on cluster[0] and cluster[1] - self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}")) - self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}")) + self.assert_browse(sn0, "q", ["b", "c"]) + self.assert_browse(sn1, "q", ["b", "c"]) def test_wiring(self): """Test replication of wiring""" diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 26283adb6c..9a0ee6b384 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -184,8 +184,7 @@ def recreate_queues(queues, brokers): # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while queue_exists(q,b): time.sleep(0.1); - for q in queues: - s.sender("%s;{create:always}"%q) + s.sender("%s;{create:always}"%q) # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while not queue_exists(q,b): time.sleep(0.1); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 0d325c4d12..18d4f9bacd 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -351,7 +351,6 @@ - @@ -387,6 +386,9 @@ + + + - + + + + - - + + + + - -- cgit v1.2.1