From f31b86c7d7dc7e76a8008860d28049e609b4a7ab Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 7 Feb 2011 22:51:45 +0000 Subject: QPID-2935: clean up SessionState changes, fix races git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1068199 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/AsyncCompletion.h | 10 ++-- qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 24 +------- qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 4 +- qpid/cpp/src/qpid/broker/SessionState.cpp | 87 ++++++++++++++--------------- qpid/cpp/src/qpid/broker/SessionState.h | 26 ++++++++- 5 files changed, 73 insertions(+), 78 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index 382e2f7565..c66609f8a6 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -92,9 +92,12 @@ namespace qpid { qpid::sys::Mutex::ScopedLock l(callbackLock); inCallback = true; if (handler) { - qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - (*handler)(sync); - handler.reset(); + boost::shared_ptr tmp; + tmp.swap(handler); + { + qpid::sys::Mutex::ScopedUnlock ul(callbackLock); + (*tmp)(sync); + } } inCallback = false; callbackLock.notifyAll(); @@ -142,7 +145,6 @@ namespace qpid { /** called by initiator after all potential completers have called * startCompleter(). */ - //void end(CompletionHandler::shared_ptr& _handler) void end(boost::shared_ptr _handler) { assert(completionsNeeded.get() > 0); // ensure begin() has been called! diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index ad7e8f9bb3..7cd7b99557 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -116,7 +116,7 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg) if (!msg.payload) return false; - sys::Mutex::ScopedLock l(pendingFlowLock); + sys::Mutex::ScopedLock l(indexLock); ++count; size += msg.payload->contentSize(); @@ -153,7 +153,7 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) if (!msg.payload) return false; - sys::Mutex::ScopedLock l(pendingFlowLock); + sys::Mutex::ScopedLock l(indexLock); if (count > 0) { --count; @@ -190,15 +190,6 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) if (itr != index.end()) { // this msg is flow controlled, release it: (*itr)->getReceiveCompletion().finishCompleter(); index.erase(itr); - //// stupid: (hopefully this is the next pending msg) - //std::list< boost::intrusive_ptr >::iterator itr2 = find(pendingFlow.begin(), - // pendingFlow.end(), - // msg.payload); - //if (itr2 == pendingFlow.end()) { - // QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position); - //} else { - // pendingFlow.erase(itr2); - //} } } } @@ -274,14 +265,3 @@ std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) } } -/** - * TBD: - * - Is there a direct way to determine if QM is on pendingFlow list? - * - Rate limit the granting of flow. - * - What about LVQ? A newer msg may replace the older one. - * - What about queueing during a recovery? - * - What about queue purge? - * - What about message move? - * - How do we treat orphaned messages? - * -- Xfer a message to an alternate exchange - do we ack? - */ diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 57d06c6bdc..2de214801f 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -86,9 +86,7 @@ class QueueFlowLimit protected: // msgs waiting for flow to become available. std::set< boost::intrusive_ptr > index; - // KAG: is this necessary? Not if we release all pending when level < low (?) - // std::list< boost::intrusive_ptr > pendingFlow; // ordered, oldest @front - qpid::sys::Mutex pendingFlowLock; + qpid::sys::Mutex indexLock; QueueFlowLimit(Queue *queue, uint32_t flowStopCount, uint32_t flowResumeCount, diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index ca98ee1437..d9af4b13c5 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -60,7 +60,8 @@ SessionState::SessionState( adapter(semanticState), msgBuilder(&broker.getStore()), mgmtObject(0), - rateFlowcontrol(0) + rateFlowcontrol(0), + scheduledRcvMsgs(new IncompleteRcvMsg::deque) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -95,16 +96,19 @@ SessionState::~SessionState() { flowControlTimer->cancel(); // clean up any outstanding incomplete receive messages - qpid::sys::ScopedLock l(incompleteRcvMsgsLock); - while (!incompleteRcvMsgs.empty()) { - boost::shared_ptr ref(incompleteRcvMsgs.front()); - incompleteRcvMsgs.pop_front(); + std::map copy(incompleteRcvMsgs); + incompleteRcvMsgs.clear(); + while (!copy.empty()) { + boost::shared_ptr ref(copy.begin()->second); + copy.erase(copy.begin()); { + // note: need to drop lock, as callback may attempt to take it. qpid::sys::ScopedUnlock ul(incompleteRcvMsgsLock); ref->cancel(); } } + scheduledRcvMsgs->clear(); // no need to lock - shared with IO thread. } AMQP_ClientProxy& SessionState::getProxy() { @@ -264,21 +268,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getReceiveCompletion().begin(); semanticState.handle(msg); msgBuilder.end(); - if (msg->getReceiveCompletion().getPendingCompleters() == 1) { - // There are no other pending receive completers (just this SessionState). - // Mark the message as completed. - completeRcvMsg( msg ); - } else { - // There are outstanding receive completers. Save the message until - // they are all done. - QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId()); - boost::shared_ptr pendingMsg(new IncompleteRcvMsg(*this, msg)); - { - qpid::sys::ScopedLock l(incompleteRcvMsgsLock); - incompleteRcvMsgs.push_back(pendingMsg); - } - msg->getReceiveCompletion().end( pendingMsg ); // allows others to complete - } + msg->getReceiveCompletion().end( createPendingMsg(msg) ); // allows msg to complete } // Handle producer session flow control @@ -453,40 +443,45 @@ void SessionState::addPendingExecutionSync() void SessionState::IncompleteRcvMsg::operator() (bool sync) { QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync); - boost::shared_ptr tmp; - { - qpid::sys::ScopedLock l(session->incompleteRcvMsgsLock); - for (std::list< boost::shared_ptr >::iterator i = session->incompleteRcvMsgs.begin(); - i != session->incompleteRcvMsgs.end(); ++i) { - if (i->get() == this) { - tmp.swap(*i); - session->incompleteRcvMsgs.remove(*i); - break; - } - } - } - if (session->isAttached()) { - if (sync) { - QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId()); - session->completeRcvMsg(msg); - } else { // potentially called from a different thread - QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId()); - session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp)); + qpid::sys::ScopedLock l(session->incompleteRcvMsgsLock); + std::map::iterator i = session->incompleteRcvMsgs.find(this); + if (i != session->incompleteRcvMsgs.end()) { + boost::shared_ptr tmp(i->second); + session->incompleteRcvMsgs.erase(i); + + if (session->isAttached()) { + if (sync) { + qpid::sys::ScopedUnlock ul(session->incompleteRcvMsgsLock); + QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId()); + session->completeRcvMsg(msg); + return; + } else { // potentially called from a different thread + QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId()); + session->scheduledRcvMsgs->push_back(tmp); + if (session->scheduledRcvMsgs->size() == 1) { + session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, + session->scheduledRcvMsgs)); + } + } } } } -/** Scheduled from IncompleteRcvMsg callback, completes the message receive - * asynchronously +/** Scheduled from IncompleteRcvMsg callback, completes all pending message + * receives asynchronously. */ -void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr iMsg) +void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr msgs) { - QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId()); - if (iMsg->session && iMsg->session->isAttached()) { - QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId()); - iMsg->session->completeRcvMsg(iMsg->msg); + while (!msgs->empty()) { + boost::shared_ptr iMsg = msgs->front(); + msgs->pop_front(); + QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId()); + if (iMsg->session && iMsg->session->isAttached()) { + QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId()); + iMsg->session->completeRcvMsg(iMsg->msg); + } } } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index b33181eee4..fc9fec7871 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -131,6 +131,9 @@ class SessionState : public qpid::SessionState, void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); + + // indicate that the given ingress msg has been completely received by the + // broker, and the msg's message.transfer command can be considered completed. void completeRcvMsg(boost::intrusive_ptr msg); void handleIn(framing::AMQFrame& frame); @@ -169,21 +172,38 @@ class SessionState : public qpid::SessionState, std::queue pendingExecutionSyncs; bool currentCommandComplete; + // A list of ingress messages whose message.transfer command is pending + // completion. These messages are awaiting some set of asynchronous + // operations to complete (eg: store, flow-control, etc). before + // the message.transfer can be completed. class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler { public: IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr _msg) : session(&_session), msg(_msg) {} - virtual void operator() (bool sync); + virtual void operator() (bool sync); // invoked when msg is completed. void cancel(); // cancel pending incomplete callback [operator() above]. + typedef boost::shared_ptr shared_ptr; + typedef std::deque deque; + private: SessionState *session; boost::intrusive_ptr msg; - static void scheduledCompleter( boost::shared_ptr incompleteMsg ); + + static void scheduledCompleter(boost::shared_ptr); }; - std::list< boost::shared_ptr > incompleteRcvMsgs; + std::map incompleteRcvMsgs; // msgs pending completion qpid::sys::Mutex incompleteRcvMsgsLock; + boost::shared_ptr createPendingMsg(boost::intrusive_ptr& msg) { + boost::shared_ptr pending(new IncompleteRcvMsg(*this, msg)); + qpid::sys::ScopedLock l(incompleteRcvMsgsLock); + incompleteRcvMsgs[pending.get()] = pending; + return pending; + } + + // holds msgs waiting for IO thread to run scheduledCompleter() + boost::shared_ptr scheduledRcvMsgs; friend class SessionManager; friend class IncompleteRcvMsg; -- cgit v1.2.1