diff options
22 files changed, 238 insertions, 205 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 8040d74a75..db18c48d82 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -134,6 +134,9 @@ class Connection : public sys::ConnectionInputHandler, /** Called by cluster to mark shadow connections */ void setShadow() { shadow = true; } + // Used by cluster to update connection status + sys::AggregateOutput& getOutputTasks() { return outputTasks; } + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index e5bcf9ef57..c96b1af6f8 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1025,3 +1025,5 @@ bool Queue::isEnqueued(const QueuedMessage& msg) { return !policy.get() || policy->isEnqueued(msg); } + +QueueListeners& Queue::getListeners() { return listeners; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index a8b775cba7..7890e46b03 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -325,6 +325,9 @@ namespace qpid { * Notify queue that recovery has completed. */ void recoveryComplete(); + + // For cluster update + QueueListeners& getListeners(); }; } } diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp index 7baca7d0f4..6b3d90906c 100644 --- a/qpid/cpp/src/qpid/broker/QueueListeners.cpp +++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp @@ -46,9 +46,11 @@ void QueueListeners::populate(NotificationSet& set) { if (consumers.size()) { set.consumer = consumers.front(); - consumers.pop_front(); + consumers.erase(consumers.begin()); } else { - browsers.swap(set.browsers); + // Don't swap the vectors, hang on to the memory allocated. + set.browsers = browsers; + browsers.clear(); } } @@ -70,4 +72,10 @@ void QueueListeners::NotificationSet::notify() else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify)); } +bool QueueListeners::contains(Consumer::shared_ptr c) const { + return + find(browsers.begin(), browsers.end(), c) != browsers.end() || + find(consumers.begin(), consumers.end(), c) != consumers.end(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h index 53ed6a17e4..32260dd736 100644 --- a/qpid/cpp/src/qpid/broker/QueueListeners.h +++ b/qpid/cpp/src/qpid/broker/QueueListeners.h @@ -22,7 +22,7 @@ * */ #include "Consumer.h" -#include <list> +#include <vector> namespace qpid { namespace broker { @@ -40,7 +40,7 @@ namespace broker { class QueueListeners { public: - typedef std::list<Consumer::shared_ptr> Listeners; + typedef std::vector<Consumer::shared_ptr> Listeners; class NotificationSet { @@ -55,6 +55,8 @@ class QueueListeners void addListener(Consumer::shared_ptr); void removeListener(Consumer::shared_ptr); void populate(NotificationSet&); + bool contains(Consumer::shared_ptr c) const; + private: Listeners consumers; Listeners browsers; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 8f918ff40f..40c9bf296e 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -61,7 +61,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), - outputTasks(ss), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))) { @@ -90,7 +89,6 @@ void SemanticState::consume(const string& tag, { ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception - outputTasks.addOutputTask(c.get()); consumers[tag] = c; } @@ -98,7 +96,7 @@ void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); - consumers.erase(i); + consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); @@ -257,9 +255,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, msgCredit(0), byteCredit(0), notifyEnabled(true), - queueHasMessages(1), syncFrequency(_arguments.getAsInt("qpid.sync_frequency")), - deliveryCount(0) {} + deliveryCount(0) +{} OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { + // FIXME aconway 2009-06-08: if we have byte & message credit but + // checkCredit fails because the message is to big, we should + // remain on queue's listener list for possible smaller messages + // in future. + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -328,7 +331,8 @@ SemanticState::ConsumerImpl::~ConsumerImpl() {} void SemanticState::cancel(ConsumerImpl::shared_ptr c) { c->disableNotify(); - outputTasks.removeOutputTask(c.get()); + if (session.isAttached()) + session.getConnection().outputTasks.removeOutputTask(c.get()); Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -397,16 +401,18 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { } void SemanticState::requestDispatch() -{ - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(*(i->second)); - } +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) + i->second->requestDispatch(); } -void SemanticState::requestDispatch(ConsumerImpl& c) -{ - if(c.isBlocked()) - outputTasks.activateOutput(); +void SemanticState::ConsumerImpl::requestDispatch() +{ + if (blocked) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + blocked = false; + } } bool SemanticState::complete(DeliveryRecord& delivery) @@ -475,7 +481,7 @@ void SemanticState::addByteCredit(const std::string& destination, uint32_t value { ConsumerImpl& c = find(destination); c.addByteCredit(value); - requestDispatch(c); + c.requestDispatch(); } @@ -483,7 +489,7 @@ void SemanticState::addMessageCredit(const std::string& destination, uint32_t va { ConsumerImpl& c = find(destination); c.addMessageCredit(value); - requestDispatch(c); + c.requestDispatch(); } void SemanticState::flush(const std::string& destination) @@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::hasOutput() { bool SemanticState::ConsumerImpl::doOutput() { - if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0)) - return false; - if (queue->dispatch(shared_from_this())) - queueHasMessages.boolCompareAndSwap(0, 1); - return queueHasMessages.get(); + return haveCredit() && queue->dispatch(shared_from_this()); } void SemanticState::ConsumerImpl::enableNotify() @@ -619,14 +621,11 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const { void SemanticState::ConsumerImpl::notify() { - queueHasMessages.boolCompareAndSwap(0, 1); - - //TODO: alter this, don't want to hold locks across external - //calls; for now its is required to protect the notify() from - //having part of the object chain of the invocation being - //concurrently deleted Mutex::ScopedLock l(lock); - if (notifyEnabled) parent->outputTasks.activateOutput(); + if (notifyEnabled) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + } } @@ -670,13 +669,16 @@ void SemanticState::attached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); + session.getConnection().outputTasks.addOutputTask(i->second.get()); } + session.getConnection().outputTasks.activateOutput(); } void SemanticState::detached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); + session.getConnection().outputTasks.removeOutputTask(i->second.get()); } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index a69962c083..0f2e08cb3c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -55,9 +55,7 @@ class SessionContext; * SemanticState holds the L3 and L4 state of an open session, whether * attached to a channel or suspended. */ -class SemanticState : public sys::OutputTask, - private boost::noncopyable -{ +class SemanticState : private boost::noncopyable { public: class ConsumerImpl : public Consumer, public sys::OutputTask, public boost::enable_shared_from_this<ConsumerImpl> @@ -77,9 +75,6 @@ class SemanticState : public sys::OutputTask, uint32_t msgCredit; uint32_t byteCredit; bool notifyEnabled; - // queueHasMessages is boolean but valgrind has trouble with - // AtomicValue<bool> so use an int with 1 or 0. - sys:: AtomicValue<int> queueHasMessages; const int syncFrequency; int deliveryCount; @@ -105,6 +100,8 @@ class SemanticState : public sys::OutputTask, void notify(); bool isNotifyEnabled() const; + void requestDispatch(); + void setWindowMode(); void setCreditMode(); void addByteCredit(uint32_t value); @@ -130,6 +127,8 @@ class SemanticState : public sys::OutputTask, std::string getResumeId() const { return resumeId; }; uint64_t getResumeTtl() const { return resumeTtl; } const framing::FieldTable& getArguments() const { return arguments; } + + SemanticState& getParent() { return *parent; } }; private: @@ -147,7 +146,6 @@ class SemanticState : public sys::OutputTask, DtxBufferMap suspendedXids; framing::SequenceSet accumulatedAck; boost::shared_ptr<Exchange> cacheExchange; - sys::AggregateOutput outputTasks; AclModule* acl; const bool authMsg; const string userID; @@ -158,7 +156,6 @@ class SemanticState : public sys::OutputTask, bool complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); - void requestDispatch(ConsumerImpl&); void cancel(ConsumerImpl::shared_ptr); public: @@ -208,8 +205,6 @@ class SemanticState : public sys::OutputTask, void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); void handle(boost::intrusive_ptr<Message> msg); - bool hasOutput() { return outputTasks.hasOutput(); } - bool doOutput() { return outputTasks.doOutput(); } //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); @@ -218,10 +213,11 @@ class SemanticState : public sys::OutputTask, void attached(); void detached(); - // Used by cluster to re-create replica sessions - static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); } - - template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); } + // Used by cluster to re-create sessions + template <class F> void eachConsumer(F f) { + for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i) + f(i->second); + } DeliveryRecords& getUnacked() { return unacked; } framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index 7a277964ab..7bc14daf5d 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -40,9 +40,11 @@ class SessionContext : public OwnershipToken, public sys::OutputControl public: virtual ~SessionContext(){} virtual bool isLocal(const ConnectionToken* t) const = 0; + virtual bool isAttached() const = 0; virtual ConnectionState& getConnection() = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; + virtual uint16_t getChannel() const = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index f5e9139a76..b465a65bd3 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -99,6 +99,11 @@ AMQP_ClientProxy& SessionState::getProxy() { return handler->getProxy(); } +uint16_t SessionState::getChannel() const { + assert(isAttached()); + return handler->getChannel(); +} + ConnectionState& SessionState::getConnection() { assert(isAttached()); return handler->getConnection(); @@ -119,8 +124,7 @@ void SessionState::detach() { void SessionState::disableOutput() { - semanticState.detached();//prevents further activateOutput calls until reattached - getConnection().outputTasks.removeOutputTask(&semanticState); + semanticState.detached(); //prevents further activateOutput calls until reattached } void SessionState::attach(SessionHandler& h) { @@ -362,10 +366,6 @@ void SessionState::readyToSend() { QPID_LOG(debug, getId() << ": ready to send, activating output."); assert(handler); semanticState.attached(); - sys::AggregateOutput& tasks = handler->getConnection().outputTasks; - tasks.addOutputTask(&semanticState); - tasks.activateOutput(); - if (rateFlowcontrol) { qpid::sys::ScopedLock<Mutex> l(rateLock); // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index ef6c56ddbe..f9d35e2aac 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -81,6 +81,9 @@ class SessionState : public qpid::SessionState, framing::AMQP_ClientProxy& getProxy(); /** @pre isAttached() */ + uint16_t getChannel() const; + + /** @pre isAttached() */ ConnectionState& getConnection(); bool isLocal(const ConnectionToken* t) const; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 37562ce46c..fe6958244f 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -755,13 +755,16 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } -void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { +void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) { // If we receive an errorCheck here, it's because we have processed past the point // of the error so respond with ERROR_TYPE_NONE assert(map.getFrameSeq() >= frameSeq); - if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE. + if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE. + QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally"); mcast.mcastControl( - ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); + ClusterErrorCheckBody(ProtocolVersion(), + framing::cluster::ERROR_TYPE_NONE, frameSeq), self); + } } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index b857c8a913..c6b5f8499c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -113,7 +113,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { Decoder& getDecoder() { return decoder; } ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } - + private: typedef sys::Monitor::ScopedLock Lock; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index afecbd50e5..e7dac82159 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -245,10 +245,13 @@ broker::SemanticState& Connection::semanticState() { return sessionState().getSemanticState(); } -void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { +void Connection::consumerState( + const string& name, bool blocked, bool notifyEnabled, bool isInListener) +{ broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); + if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this()); } void Connection::sessionState( @@ -270,6 +273,17 @@ void Connection::sessionState( unknownCompleted, receivedIncomplete); QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); + // The output tasks will be added later in the update process. + connection.getOutputTasks().removeAll(); +} + +void Connection::outputTask(uint16_t channel, const std::string& name) { + broker::SessionState* session = connection.getChannel(channel).getSession(); + if (!session) + throw Exception(QPID_MSG(cluster << " channel not attached " << *this + << "[" << channel << "] ")); + OutputTask* task = &session->getSemanticState().find(name); + connection.getOutputTasks().addOutputTask(task); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 8e3b0ad337..51aab92bfc 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -103,7 +103,7 @@ class Connection : // Called for data delivered from the cluster. void deliveredFrame(const EventFrame&); - void consumerState(const std::string& name, bool blocked, bool notifyEnabled); + void consumerState(const std::string& name, bool blocked, bool notifyEnabled, bool isInListener); // ==== Used in catch-up mode to build initial state. // @@ -115,6 +115,8 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); + void outputTask(uint16_t channel, const std::string& name); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index ef99058471..3c3c330787 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -48,8 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) { LATENCY_TRACK(doOutputTracker.finish(f.getBody())); parent.getCluster().checkQuorum(); { - // FIXME aconway 2009-04-28: locking around next-> may be redundant - // with the fixes to read-credit in the IO layer. Review. sys::Mutex::ScopedLock l(lock); next->send(f); } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 332e74c512..7c305a2e92 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -54,6 +54,7 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> +#include <boost/cast.hpp> #include <algorithm> namespace qpid { @@ -64,6 +65,8 @@ using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; +using broker::SemanticState; + using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; @@ -125,7 +128,8 @@ void UpdateClient::update() { Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); - // Update queue is used to transfer acquired messages that are no longer on their original queue. + // Update queue is used to transfer acquired messages that are no + // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); @@ -256,6 +260,16 @@ void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& que s.exchangeBind(queue, binding.exchange, binding.key, binding.args); } +void UpdateClient::updateOutputTask(const sys::OutputTask* task) { + const SemanticState::ConsumerImpl* cci = + boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); + SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); + uint16_t channel = ci->getParent().getSession().getChannel(); + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); + QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + << " channel=" << channel); +} + void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); shadowConnection = catchUpConnection(); @@ -266,6 +280,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); // Safe to use decoder here because we are stalled for update. std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment(); + bc.getOutputTasks().eachOutput( + boost::bind(&UpdateClient::updateOutputTask, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), @@ -294,9 +310,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, updaterId << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33. QPID_LOG(debug, updaterId << " updating consumers."); - ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this)); + ss->getSemanticState().eachConsumer( + boost::bind(&UpdateClient::updateConsumer, this, _1)); QPID_LOG(debug, updaterId << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); @@ -304,7 +320,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { updateTxState(ss->getSemanticState()); // Tx transaction state. - // Adjust for command counter for message in progress, will be sent after state update. + // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) @@ -328,8 +344,11 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } -void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); +void UpdateClient::updateConsumer( + const broker::SemanticState::ConsumerImpl::shared_ptr& ci) +{ + QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -344,13 +363,12 @@ void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); - ClusterConnectionConsumerStateBody state( - ProtocolVersion(), + ClusterConnectionProxy(shadowSession).consumerState( ci->getName(), ci->isBlocked(), - ci->isNotifyEnabled() + ci->isNotifyEnabled(), + ci->getQueue()->getListeners().contains(ci) ); - client::SessionBase_0_10Access(shadowSession).get()->send(state); QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 030566b52d..ba5bdd1d75 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -91,7 +91,8 @@ class UpdateClient : public sys::Runnable { void updateConnection(const boost::intrusive_ptr<Connection>& connection); void updateSession(broker::SessionHandler& s); void updateTxState(broker::SemanticState& s); - void updateConsumer(const broker::SemanticState::ConsumerImpl*); + void updateOutputTask(const sys::OutputTask* task); + void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); MemberId updaterId; MemberId updateeId; diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp index cac4e6681e..e61e3f2edf 100644 --- a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp @@ -26,60 +26,6 @@ using qpid::framing::SequenceNumber; using qpid::framing::Buffer; -SequenceNumber::SequenceNumber() : value(0) {} - -SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {} - -bool SequenceNumber::operator==(const SequenceNumber& other) const -{ - return value == other.value; -} - -bool SequenceNumber::operator!=(const SequenceNumber& other) const -{ - return !(value == other.value); -} - - -SequenceNumber& SequenceNumber::operator++() -{ - value = value + 1; - return *this; -} - -const SequenceNumber SequenceNumber::operator++(int) -{ - SequenceNumber old(value); - value = value + 1; - return old; -} - -SequenceNumber& SequenceNumber::operator--() -{ - value = value - 1; - return *this; -} - -bool SequenceNumber::operator<(const SequenceNumber& other) const -{ - return (value - other.value) < 0; -} - -bool SequenceNumber::operator>(const SequenceNumber& other) const -{ - return other < *this; -} - -bool SequenceNumber::operator<=(const SequenceNumber& other) const -{ - return *this == other || *this < other; -} - -bool SequenceNumber::operator>=(const SequenceNumber& other) const -{ - return *this == other || *this > other; -} - void SequenceNumber::encode(Buffer& buffer) const { buffer.putLong(value); @@ -97,12 +43,6 @@ uint32_t SequenceNumber::encodedSize() const { namespace qpid { namespace framing { -int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) -{ - int32_t result = a.value - b.value; - return result; -} - std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) { return o << n.getValue(); } diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.h b/qpid/cpp/src/qpid/framing/SequenceNumber.h index 3b18ce1360..c208739cdd 100644 --- a/qpid/cpp/src/qpid/framing/SequenceNumber.h +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.h @@ -22,6 +22,7 @@ #define _framing_SequenceNumber_h #include "amqp_types.h" +#include <boost/operators.hpp> #include <iosfwd> #include "qpid/CommonImportExport.h" @@ -33,35 +34,37 @@ class Buffer; /** * 4-byte sequence number that 'wraps around'. */ -class SequenceNumber +class SequenceNumber : public +boost::equality_comparable< + SequenceNumber, boost::less_than_comparable< + SequenceNumber, boost::incrementable< + SequenceNumber, boost::decrementable<SequenceNumber> > > > { int32_t value; - public: - QPID_COMMON_EXTERN SequenceNumber(); - QPID_COMMON_EXTERN SequenceNumber(uint32_t v); - - QPID_COMMON_EXTERN SequenceNumber& operator++();//prefix ++ - QPID_COMMON_EXTERN const SequenceNumber operator++(int);//postfix ++ - QPID_COMMON_EXTERN SequenceNumber& operator--();//prefix ++ - QPID_COMMON_EXTERN bool operator==(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator!=(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator<(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator>(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator<=(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator>=(const SequenceNumber& other) const; - uint32_t getValue() const { return (uint32_t) value; } - operator uint32_t() const { return (uint32_t) value; } - - QPID_COMMON_EXTERN friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); + public: + SequenceNumber(uint32_t v=0) : value(v) {} + + SequenceNumber& operator++() { ++value; return *this; } + SequenceNumber& operator--() { --value; return *this; } + bool operator==(const SequenceNumber& other) const { return value == other.value; } + bool operator<(const SequenceNumber& other) const { return (value - other.value) < 0; } + uint32_t getValue() const { return uint32_t(value); } + operator uint32_t() const { return uint32_t(value); } void encode(Buffer& buffer) const; void decode(Buffer& buffer); uint32_t encodedSize() const; template <class S> void serialize(S& s) { s(value); } + + friend inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); }; +inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) { + return int32_t(a.value - b.value); +} + struct Window { SequenceNumber hwm; diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp index 74bf6d0f85..d46fccc208 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp @@ -26,50 +26,66 @@ namespace qpid { namespace sys { +AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {} + void AggregateOutput::abort() { control.abort(); } void AggregateOutput::activateOutput() { control.activateOutput(); } void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } -bool AggregateOutput::hasOutput() { - for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) - if ((*i)->hasOutput()) return true; - return false; +bool AggregateOutput::AggregateOutput::hasOutput() { + Mutex::ScopedLock l(lock); + return !tasks.empty(); } -bool AggregateOutput::doOutput() -{ - bool result = false; - if (!tasks.empty()) { - if (next >= tasks.size()) next = next % tasks.size(); +// Clear the busy flag and notify waiting threads in destructor. +struct ScopedBusy { + bool& flag; + Monitor& monitor; + ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } + ~ScopedBusy() { flag = false; monitor.notifyAll(); } +}; + +bool AggregateOutput::doOutput() { + Mutex::ScopedLock l(lock); + ScopedBusy sb(busy, lock); - size_t start = next; - //loop until a task generated some output - while (!result) { - result = tasks[next++]->doOutput(); - if (tasks.empty()) break; - if (next >= tasks.size()) next = next % tasks.size(); - if (start == next) break; + while (!tasks.empty()) { + OutputTask* t=tasks.front(); + tasks.pop_front(); + bool didOutput; + { + // Allow concurrent call to addOutputTask. + // removeOutputTask will wait till !busy before removing a task. + Mutex::ScopedUnlock u(lock); + didOutput = t->doOutput(); + } + if (didOutput) { + tasks.push_back(t); + return true; } } - return result; + return false; } - -void AggregateOutput::addOutputTask(OutputTask* t) -{ - tasks.push_back(t); + +void AggregateOutput::addOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + tasks.push_back(task); } -void AggregateOutput::removeOutputTask(OutputTask* t) -{ - TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t); - if (i != tasks.end()) tasks.erase(i); +void AggregateOutput::removeOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } - + void AggregateOutput::removeAll() { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); tasks.clear(); } + }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h index b33113796c..4e3190a093 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.h +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h @@ -21,47 +21,58 @@ #ifndef _AggregateOutput_ #define _AggregateOutput_ -#include "Mutex.h" +#include "Monitor.h" #include "OutputControl.h" #include "OutputTask.h" #include "qpid/CommonImportExport.h" #include <algorithm> -#include <vector> +#include <deque> namespace qpid { namespace sys { - class AggregateOutput : public OutputTask, public OutputControl - { - typedef std::vector<OutputTask*> TaskList; +/** + * Holds a collection of output tasks, doOutput picks the next one to execute. + * + * Tasks are automatically removed if their doOutput() or hasOutput() returns false. + * + * Thread safe. addOutputTask may be called in one connection thread while + * doOutput is called in another. + */ + +class AggregateOutput : public OutputTask, public OutputControl +{ + typedef std::deque<OutputTask*> TaskList; + + Monitor lock; + TaskList tasks; + bool busy; + OutputControl& control; - TaskList tasks; - size_t next; - OutputControl& control; + public: + QPID_COMMON_EXTERN AggregateOutput(OutputControl& c); - public: - AggregateOutput(OutputControl& c) : next(0), control(c) {}; - //this may be called on any thread - QPID_COMMON_EXTERN void abort(); - QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t); + // These may be called concurrently with any function. + QPID_COMMON_EXTERN void abort(); + QPID_COMMON_EXTERN void activateOutput(); + QPID_COMMON_EXTERN void giveReadCredit(int32_t); + QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); - //all the following will be called on the same thread - QPID_COMMON_EXTERN bool doOutput(); - QPID_COMMON_EXTERN bool hasOutput(); - QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeAll(); + // These functions must not be called concurrently with each other. + QPID_COMMON_EXTERN bool doOutput(); + QPID_COMMON_EXTERN bool hasOutput(); + QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); + QPID_COMMON_EXTERN void removeAll(); - /** Apply f to each OutputTask* in the tasks list */ - template <class F> void eachOutput(F f) { - std::for_each(tasks.begin(), tasks.end(), f); - } - }; + /** Apply f to each OutputTask* in the tasks list */ + template <class F> void eachOutput(F f) { + Mutex::ScopedLock l(lock); + std::for_each(tasks.begin(), tasks.end(), f); + } +}; -} -} +}} // namespace qpid::sys #endif diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 58b067a3db..8b9cbfed1e 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -65,8 +65,6 @@ </class> - <!-- TODO aconway 2008-09-10: support for un-attached connections. --> - <!-- Controls associated with a specific connection. --> <class name="cluster-connection" code="0x81" label="Qpid clustering extensions."> @@ -91,6 +89,8 @@ <field name="name" type="str8"/> <field name="blocked" type="bit"/> <field name="notifyEnabled" type="bit"/> + <!-- Flag set if the consumer is in its queue's listener set. --> + <field name="is-in-listener" type="bit"/> </control> <!-- Delivery-record for outgoing messages sent but not yet accepted. --> @@ -121,8 +121,14 @@ <control name="tx-end" code="0x17"/> <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control> + <!-- Consumers in the connection's output task --> + <control name="output-task" code="0x19"> + <field name="channel" type="uint16"/> + <field name="name" type="str8"/> + </control> + <!-- Complete a session state update. --> - <control name="session-state" code="0x1F" label="Set session state during a brain update."> + <control name="session-state" code="0x1F"> <!-- Target session deduced from channel number. --> <field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.--> <field name="command-point" type="sequence-no"/> <!-- Id of next command sent --> |