diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-07 22:51:45 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-07 22:51:45 +0000 |
commit | f31b86c7d7dc7e76a8008860d28049e609b4a7ab (patch) | |
tree | f736c33c134ea28459eac0d648177b4785960b14 | |
parent | c7ed111cf813eda770866d1b62fa4eb1f83c1f7c (diff) | |
download | qpid-python-f31b86c7d7dc7e76a8008860d28049e609b4a7ab.tar.gz |
QPID-2935: clean up SessionState changes, fix races
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1068199 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCompletion.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 87 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 26 |
5 files changed, 73 insertions, 78 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index 382e2f7565..c66609f8a6 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -92,9 +92,12 @@ namespace qpid { qpid::sys::Mutex::ScopedLock l(callbackLock); inCallback = true; if (handler) { - qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - (*handler)(sync); - handler.reset(); + boost::shared_ptr<CompletionHandler> tmp; + tmp.swap(handler); + { + qpid::sys::Mutex::ScopedUnlock ul(callbackLock); + (*tmp)(sync); + } } inCallback = false; callbackLock.notifyAll(); @@ -142,7 +145,6 @@ namespace qpid { /** called by initiator after all potential completers have called * startCompleter(). */ - //void end(CompletionHandler::shared_ptr& _handler) void end(boost::shared_ptr<CompletionHandler> _handler) { assert(completionsNeeded.get() > 0); // ensure begin() has been called! diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index ad7e8f9bb3..7cd7b99557 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -116,7 +116,7 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg) if (!msg.payload) return false; - sys::Mutex::ScopedLock l(pendingFlowLock); + sys::Mutex::ScopedLock l(indexLock); ++count; size += msg.payload->contentSize(); @@ -153,7 +153,7 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) if (!msg.payload) return false; - sys::Mutex::ScopedLock l(pendingFlowLock); + sys::Mutex::ScopedLock l(indexLock); if (count > 0) { --count; @@ -190,15 +190,6 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) if (itr != index.end()) { // this msg is flow controlled, release it: (*itr)->getReceiveCompletion().finishCompleter(); index.erase(itr); - //// stupid: (hopefully this is the next pending msg) - //std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(), - // pendingFlow.end(), - // msg.payload); - //if (itr2 == pendingFlow.end()) { - // QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position); - //} else { - // pendingFlow.erase(itr2); - //} } } } @@ -274,14 +265,3 @@ std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) } } -/** - * TBD: - * - Is there a direct way to determine if QM is on pendingFlow list? - * - Rate limit the granting of flow. - * - What about LVQ? A newer msg may replace the older one. - * - What about queueing during a recovery? - * - What about queue purge? - * - What about message move? - * - How do we treat orphaned messages? - * -- Xfer a message to an alternate exchange - do we ack? - */ diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 57d06c6bdc..2de214801f 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -86,9 +86,7 @@ class QueueFlowLimit protected: // msgs waiting for flow to become available. std::set< boost::intrusive_ptr<Message> > index; - // KAG: is this necessary? Not if we release all pending when level < low (?) - // std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front - qpid::sys::Mutex pendingFlowLock; + qpid::sys::Mutex indexLock; QueueFlowLimit(Queue *queue, uint32_t flowStopCount, uint32_t flowResumeCount, diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index ca98ee1437..d9af4b13c5 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -60,7 +60,8 @@ SessionState::SessionState( adapter(semanticState), msgBuilder(&broker.getStore()), mgmtObject(0), - rateFlowcontrol(0) + rateFlowcontrol(0), + scheduledRcvMsgs(new IncompleteRcvMsg::deque) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -95,16 +96,19 @@ SessionState::~SessionState() { flowControlTimer->cancel(); // clean up any outstanding incomplete receive messages - qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock); - while (!incompleteRcvMsgs.empty()) { - boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front()); - incompleteRcvMsgs.pop_front(); + std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> copy(incompleteRcvMsgs); + incompleteRcvMsgs.clear(); + while (!copy.empty()) { + boost::shared_ptr<IncompleteRcvMsg> ref(copy.begin()->second); + copy.erase(copy.begin()); { + // note: need to drop lock, as callback may attempt to take it. qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock); ref->cancel(); } } + scheduledRcvMsgs->clear(); // no need to lock - shared with IO thread. } AMQP_ClientProxy& SessionState::getProxy() { @@ -264,21 +268,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getReceiveCompletion().begin(); semanticState.handle(msg); msgBuilder.end(); - if (msg->getReceiveCompletion().getPendingCompleters() == 1) { - // There are no other pending receive completers (just this SessionState). - // Mark the message as completed. - completeRcvMsg( msg ); - } else { - // There are outstanding receive completers. Save the message until - // they are all done. - QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId()); - boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this, msg)); - { - qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock); - incompleteRcvMsgs.push_back(pendingMsg); - } - msg->getReceiveCompletion().end( pendingMsg ); // allows others to complete - } + msg->getReceiveCompletion().end( createPendingMsg(msg) ); // allows msg to complete } // Handle producer session flow control @@ -453,40 +443,45 @@ void SessionState::addPendingExecutionSync() void SessionState::IncompleteRcvMsg::operator() (bool sync) { QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync); - boost::shared_ptr<IncompleteRcvMsg> tmp; - { - qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock); - for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin(); - i != session->incompleteRcvMsgs.end(); ++i) { - if (i->get() == this) { - tmp.swap(*i); - session->incompleteRcvMsgs.remove(*i); - break; - } - } - } - if (session->isAttached()) { - if (sync) { - QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId()); - session->completeRcvMsg(msg); - } else { // potentially called from a different thread - QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId()); - session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp)); + qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock); + std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr>::iterator i = session->incompleteRcvMsgs.find(this); + if (i != session->incompleteRcvMsgs.end()) { + boost::shared_ptr<IncompleteRcvMsg> tmp(i->second); + session->incompleteRcvMsgs.erase(i); + + if (session->isAttached()) { + if (sync) { + qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteRcvMsgsLock); + QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId()); + session->completeRcvMsg(msg); + return; + } else { // potentially called from a different thread + QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId()); + session->scheduledRcvMsgs->push_back(tmp); + if (session->scheduledRcvMsgs->size() == 1) { + session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, + session->scheduledRcvMsgs)); + } + } } } } -/** Scheduled from IncompleteRcvMsg callback, completes the message receive - * asynchronously +/** Scheduled from IncompleteRcvMsg callback, completes all pending message + * receives asynchronously. */ -void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg> iMsg) +void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<deque> msgs) { - QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId()); - if (iMsg->session && iMsg->session->isAttached()) { - QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId()); - iMsg->session->completeRcvMsg(iMsg->msg); + while (!msgs->empty()) { + boost::shared_ptr<IncompleteRcvMsg> iMsg = msgs->front(); + msgs->pop_front(); + QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId()); + if (iMsg->session && iMsg->session->isAttached()) { + QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId()); + iMsg->session->completeRcvMsg(iMsg->msg); + } } } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index b33181eee4..fc9fec7871 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -131,6 +131,9 @@ class SessionState : public qpid::SessionState, void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); + + // indicate that the given ingress msg has been completely received by the + // broker, and the msg's message.transfer command can be considered completed. void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg); void handleIn(framing::AMQFrame& frame); @@ -169,21 +172,38 @@ class SessionState : public qpid::SessionState, std::queue<SequenceNumber> pendingExecutionSyncs; bool currentCommandComplete; + // A list of ingress messages whose message.transfer command is pending + // completion. These messages are awaiting some set of asynchronous + // operations to complete (eg: store, flow-control, etc). before + // the message.transfer can be completed. class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler { public: IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg) : session(&_session), msg(_msg) {} - virtual void operator() (bool sync); + virtual void operator() (bool sync); // invoked when msg is completed. void cancel(); // cancel pending incomplete callback [operator() above]. + typedef boost::shared_ptr<IncompleteRcvMsg> shared_ptr; + typedef std::deque<shared_ptr> deque; + private: SessionState *session; boost::intrusive_ptr<Message> msg; - static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg ); + + static void scheduledCompleter(boost::shared_ptr<deque>); }; - std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs; + std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> incompleteRcvMsgs; // msgs pending completion qpid::sys::Mutex incompleteRcvMsgsLock; + boost::shared_ptr<IncompleteRcvMsg> createPendingMsg(boost::intrusive_ptr<Message>& msg) { + boost::shared_ptr<IncompleteRcvMsg> pending(new IncompleteRcvMsg(*this, msg)); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(incompleteRcvMsgsLock); + incompleteRcvMsgs[pending.get()] = pending; + return pending; + } + + // holds msgs waiting for IO thread to run scheduledCompleter() + boost::shared_ptr<IncompleteRcvMsg::deque> scheduledRcvMsgs; friend class SessionManager; friend class IncompleteRcvMsg; |