summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-04-12 19:50:07 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-04-12 19:50:07 +0000
commit35f3c1f5fd60abe6088c365255d7e5e5e721b242 (patch)
tree17289c66010dfe7fcb42c963d548a30e4cd65ee6
parent20ea1aa2543fe81775c3a11f17d07c22ded44bc1 (diff)
downloadqpid-python-35f3c1f5fd60abe6088c365255d7e5e5e721b242.tar.gz
QPID-3197: prevent threads from scheduling async completions when session is detached.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.10@1091560 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h7
2 files changed, 26 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 18dbf63487..eca1883bd9 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -127,6 +127,7 @@ bool SessionState::isLocal(const ConnectionToken* t) const
void SessionState::detach() {
QPID_LOG(debug, getId() << ": detached on broker.");
+ asyncCommandCompleter->detached();
disableOutput();
handler = 0;
if (mgmtObject != 0)
@@ -147,6 +148,7 @@ void SessionState::attach(SessionHandler& h) {
mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
+ asyncCommandCompleter->attached();
}
void SessionState::abort() {
@@ -486,7 +488,7 @@ void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber c
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- if (session) {
+ if (session && isAttached) {
MessageInfo msg(cmd, requiresAccept, requiresSync);
completedMsgs.push_back(msg);
if (completedMsgs.size() == 1) {
@@ -522,4 +524,22 @@ void SessionState::AsyncCommandCompleter::cancel()
session = 0;
}
+
+/** inform the completer that the session has attached,
+ * allows command completion scheduling from any thread */
+void SessionState::AsyncCommandCompleter::attached()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ isAttached = true;
+}
+
+
+/** inform the completer that the session has detached,
+ * disables command completion scheduling from any thread */
+void SessionState::AsyncCommandCompleter::detached()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ isAttached = false;
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 2250940102..e847b3fa04 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -187,6 +187,7 @@ class SessionState : public qpid::SessionState,
class AsyncCommandCompleter : public RefCounted {
private:
SessionState *session;
+ bool isAttached;
qpid::sys::Mutex completerLock;
// special-case message.transfer commands for optimization
@@ -205,8 +206,8 @@ class SessionState : public qpid::SessionState,
/** for scheduling a run of "completeCommands()" on the IO thread */
static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
- public:
- AsyncCommandCompleter(SessionState *s) : session(s) {};
+ public:
+ AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
~AsyncCommandCompleter() {};
/** schedule the completion of an ingress message.transfer command */
@@ -214,6 +215,8 @@ class SessionState : public qpid::SessionState,
bool requiresAccept,
bool requiresSync);
void cancel(); // called by SessionState destructor.
+ void attached(); // called by SessionState on attach()
+ void detached(); // called by SessionState on detach()
};
boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;