diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-18 15:11:17 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-18 15:11:17 +0000 |
commit | 26aefed2e694000d408ed5c03ab8e70f2e92a249 (patch) | |
tree | 1414d86a56b3e1a2d8fa2460bdd709d9f44d2d5d | |
parent | e0f84a182936fb0c1e01db9c9339c864e02525b9 (diff) | |
download | qpid-python-26aefed2e694000d408ed5c03ab8e70f2e92a249.tar.gz |
QPID-2935: clean up race between session destructor and scheduled callback.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1072018 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 103 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 28 |
2 files changed, 89 insertions, 42 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 2e69102537..6a4db874d4 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -62,7 +62,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - scheduledCmds(new std::list<SequenceNumber>) + scheduledCompleterContext(new ScheduledCompleterContext(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -103,21 +103,24 @@ SessionState::~SessionState() { flowControlTimer->cancel(); // clean up any outstanding incomplete commands - qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); - incompleteCmds.clear(); - while (!copy.empty()) { - boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); - copy.erase(copy.begin()); - { - // note: need to drop lock, as callback may attempt to take it. - qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); - ref->cancel(); + { + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); + incompleteCmds.clear(); + while (!copy.empty()) { + boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); + copy.erase(copy.begin()); + { + // note: need to drop lock, as callback may attempt to take it. + qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); + ref->cancel(); + } } } + // At this point, we are guaranteed no further completion callbacks will be - // made. - scheduledCmds->clear(); // keeps IO thread from running more completions. + // made. Cancel any outstanding scheduledCompleter calls... + scheduledCompleterContext->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -469,18 +472,18 @@ SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg) */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { - qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); if (!sync) { - // note well: this path may execute in any thread. + /** note well: this path may execute in any thread. It is safe to access + * the session, as the SessionState destructor will cancel all outstanding + * callbacks before getting destroyed (so we'll never get here). + */ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - session->scheduledCmds->push_back(id); - if (session->scheduledCmds->size() == 1) { + if (session->scheduledCompleterContext->scheduleCompletion(id)) session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, - session->scheduledCmds, - session)); - } + session->scheduledCompleterContext)); } else { // command is being completed in IO thread. // this path runs only on the IO thread. + qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; cmd = session->incompleteCmds.find(id); if (cmd != session->incompleteCmds.end()) { @@ -502,29 +505,57 @@ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) * completed commands in the IO Thread. Guaranteed not to be running at the same * time as the message receive code. */ -void SessionState::scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > completedCmds, - SessionState *session) +void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt) { - // when session is destroyed, it clears the list below. If the list is empty, - // the passed session pointer is not valid - do nothing. - if (completedCmds->empty()) return; + ctxt->completeCommands(); +} - qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); - std::list<SequenceNumber> cmds(*completedCmds); // make copy so we can drop lock - completedCmds->clear(); - while (!cmds.empty()) { - SequenceNumber id = cmds.front(); - cmds.pop_front(); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; +/** mark a command (sequence) as completed, return True if caller should + * schedule a call to completeCommands() + */ +bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); - cmd = session->incompleteCmds.find(id); - if (cmd != session->incompleteCmds.end()) { - qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); - cmd->second->do_completion(); // retakes lock + completedCmds.push_back(cmd); + return (completedCmds.size() == 1); +} + + +/** Cause the session to complete all completed commands */ +void SessionState::ScheduledCompleterContext::completeCommands() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + // when session is destroyed, it clears the session pointer via cancel(). + if (!session) return; + + while (!completedCmds.empty()) { + SequenceNumber id = completedCmds.front(); + completedCmds.pop_front(); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock); + + cmd = session->incompleteCmds.find(id); + if (cmd !=session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + { + qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock); + tmp->do_completion(); // retakes incompleteCmdslock + } + } } } } +/** cancel any pending calls to scheduleComplete */ +void SessionState::ScheduledCompleterContext::cancel() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + session = 0; +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 568e8593fa..5e162e6475 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -226,13 +226,29 @@ class SessionState : public qpid::SessionState, * flow-control, etc). before the command can be completed to the client */ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds; - // identifies those commands in incompleteCmds that are waiting for IO thread to run in order to be completed. - boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds; - qpid::sys::Mutex incompleteCmdsLock; // locks both above containers + qpid::sys::Mutex incompleteCmdsLock; // locks above container - /** runs in IO thread, completes commands that where finished asynchronously. */ - static void scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds, - SessionState *session); + /** This context is shared between the SessionState and scheduledCompleter, + * holds the sequence numbers of all commands that have completed asynchronously. + */ + class ScheduledCompleterContext { + private: + std::list<SequenceNumber> completedCmds; + // ordering: take this lock first, then incompleteCmdsLock + qpid::sys::Mutex completedCmdsLock; + SessionState *session; + public: + ScheduledCompleterContext(SessionState *s) : session(s) {}; + bool scheduleCompletion(SequenceNumber cmd); + void completeCommands(); + void cancel(); + }; + boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext; + + /** The following method runs the in IO thread and completes commands that + * where finished asynchronously. + */ + static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>); friend class SessionManager; }; |