summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-18 15:11:17 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-18 15:11:17 +0000
commit26aefed2e694000d408ed5c03ab8e70f2e92a249 (patch)
tree1414d86a56b3e1a2d8fa2460bdd709d9f44d2d5d
parente0f84a182936fb0c1e01db9c9339c864e02525b9 (diff)
downloadqpid-python-26aefed2e694000d408ed5c03ab8e70f2e92a249.tar.gz
QPID-2935: clean up race between session destructor and scheduled callback.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1072018 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp103
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h28
2 files changed, 89 insertions, 42 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 2e69102537..6a4db874d4 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -62,7 +62,7 @@ SessionState::SessionState(
msgBuilder(&broker.getStore()),
mgmtObject(0),
rateFlowcontrol(0),
- scheduledCmds(new std::list<SequenceNumber>)
+ scheduledCompleterContext(new ScheduledCompleterContext(this))
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -103,21 +103,24 @@ SessionState::~SessionState() {
flowControlTimer->cancel();
// clean up any outstanding incomplete commands
- qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
- incompleteCmds.clear();
- while (!copy.empty()) {
- boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
- copy.erase(copy.begin());
- {
- // note: need to drop lock, as callback may attempt to take it.
- qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
- ref->cancel();
+ {
+ qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
+ incompleteCmds.clear();
+ while (!copy.empty()) {
+ boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
+ copy.erase(copy.begin());
+ {
+ // note: need to drop lock, as callback may attempt to take it.
+ qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
+ ref->cancel();
+ }
}
}
+
// At this point, we are guaranteed no further completion callbacks will be
- // made.
- scheduledCmds->clear(); // keeps IO thread from running more completions.
+ // made. Cancel any outstanding scheduledCompleter calls...
+ scheduledCompleterContext->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -469,18 +472,18 @@ SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
- qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
if (!sync) {
- // note well: this path may execute in any thread.
+ /** note well: this path may execute in any thread. It is safe to access
+ * the session, as the SessionState destructor will cancel all outstanding
+ * callbacks before getting destroyed (so we'll never get here).
+ */
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- session->scheduledCmds->push_back(id);
- if (session->scheduledCmds->size() == 1) {
+ if (session->scheduledCompleterContext->scheduleCompletion(id))
session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
- session->scheduledCmds,
- session));
- }
+ session->scheduledCompleterContext));
} else { // command is being completed in IO thread.
// this path runs only on the IO thread.
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
cmd = session->incompleteCmds.find(id);
if (cmd != session->incompleteCmds.end()) {
@@ -502,29 +505,57 @@ void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
* completed commands in the IO Thread. Guaranteed not to be running at the same
* time as the message receive code.
*/
-void SessionState::scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > completedCmds,
- SessionState *session)
+void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
{
- // when session is destroyed, it clears the list below. If the list is empty,
- // the passed session pointer is not valid - do nothing.
- if (completedCmds->empty()) return;
+ ctxt->completeCommands();
+}
- qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
- std::list<SequenceNumber> cmds(*completedCmds); // make copy so we can drop lock
- completedCmds->clear();
- while (!cmds.empty()) {
- SequenceNumber id = cmds.front();
- cmds.pop_front();
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+/** mark a command (sequence) as completed, return True if caller should
+ * schedule a call to completeCommands()
+ */
+bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
- cmd = session->incompleteCmds.find(id);
- if (cmd != session->incompleteCmds.end()) {
- qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
- cmd->second->do_completion(); // retakes lock
+ completedCmds.push_back(cmd);
+ return (completedCmds.size() == 1);
+}
+
+
+/** Cause the session to complete all completed commands */
+void SessionState::ScheduledCompleterContext::completeCommands()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+ // when session is destroyed, it clears the session pointer via cancel().
+ if (!session) return;
+
+ while (!completedCmds.empty()) {
+ SequenceNumber id = completedCmds.front();
+ completedCmds.pop_front();
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
+
+ cmd = session->incompleteCmds.find(id);
+ if (cmd !=session->incompleteCmds.end()) {
+ boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+ {
+ qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
+ tmp->do_completion(); // retakes incompleteCmdslock
+ }
+ }
}
}
}
+/** cancel any pending calls to scheduleComplete */
+void SessionState::ScheduledCompleterContext::cancel()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+ session = 0;
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 568e8593fa..5e162e6475 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -226,13 +226,29 @@ class SessionState : public qpid::SessionState,
* flow-control, etc). before the command can be completed to the client
*/
std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
- // identifies those commands in incompleteCmds that are waiting for IO thread to run in order to be completed.
- boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds;
- qpid::sys::Mutex incompleteCmdsLock; // locks both above containers
+ qpid::sys::Mutex incompleteCmdsLock; // locks above container
- /** runs in IO thread, completes commands that where finished asynchronously. */
- static void scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds,
- SessionState *session);
+ /** This context is shared between the SessionState and scheduledCompleter,
+ * holds the sequence numbers of all commands that have completed asynchronously.
+ */
+ class ScheduledCompleterContext {
+ private:
+ std::list<SequenceNumber> completedCmds;
+ // ordering: take this lock first, then incompleteCmdsLock
+ qpid::sys::Mutex completedCmdsLock;
+ SessionState *session;
+ public:
+ ScheduledCompleterContext(SessionState *s) : session(s) {};
+ bool scheduleCompletion(SequenceNumber cmd);
+ void completeCommands();
+ void cancel();
+ };
+ boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext;
+
+ /** The following method runs the in IO thread and completes commands that
+ * where finished asynchronously.
+ */
+ static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>);
friend class SessionManager;
};