summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-31 18:47:28 +0000
committerGordon Sim <gsim@apache.org>2007-10-31 18:47:28 +0000
commit7990138bb3eb014c85bfb806c91e23def530ef37 (patch)
tree50179d4837d87f9e7f079ca44088c383561c9a97 /cpp/src
parentff6f9f47dbbaaf84dde2a86dd8f854fcf3060378 (diff)
downloadqpid-python-7990138bb3eb014c85bfb806c91e23def530ef37.tar.gz
Simple fix to prevent concurrent disconnection and sending of frames causing seg faults.
A more complete solution may follow. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@590786 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h3
-rw-r--r--cpp/src/qpid/sys/Mutex.h12
-rw-r--r--cpp/src/qpid/sys/Semaphore.h67
4 files changed, 92 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 1d0e67f6ab..0dafcba7bd 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -81,12 +81,14 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
void SessionHandler::handleOut(AMQFrame& f) {
- if (!session.get())
- throw InternalErrorException(
- QPID_MSG("attempt to send frame on detached channel."));
- channel.handle(f); // Send it.
- if (session->sent(f))
- peerSession.solicitAck();
+ ConditionalScopedLock<Semaphore> s(suspension);
+ if (s.lockAcquired() && session.get() && session->isAttached()) {
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
+ } else {
+ QPID_LOG(warning, "Dropping frame as session is no longer attached to a channel: " << f);
+ }
}
void SessionHandler::assertAttached(const char* method) const {
@@ -150,7 +152,8 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
}
void SessionHandler::localSuspend() {
- if (session.get()) {
+ ScopedLock<Semaphore> s(suspension);
+ if (session.get() && session->isAttached()) {
session->detach();
connection.broker.getSessionManager().suspend(session);
}
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 52f64779d4..800b886bbf 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -27,6 +27,8 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Semaphore.h"
#include <boost/noncopyable.hpp>
@@ -94,6 +96,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
bool ignoring;
bool resuming;
std::auto_ptr<SessionState> session;
+ sys::Semaphore suspension;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h
index b8545d4449..b4bd3a9b4a 100644
--- a/cpp/src/qpid/sys/Mutex.h
+++ b/cpp/src/qpid/sys/Mutex.h
@@ -66,6 +66,18 @@ class ScopedWlock
L& mutex;
};
+template <class L>
+class ConditionalScopedLock
+{
+ public:
+ ConditionalScopedLock(L& l) : mutex(l) { acquired = l.trylock(); }
+ ~ConditionalScopedLock() { if (acquired) mutex.unlock(); }
+ bool lockAcquired() { return acquired; }
+ private:
+ L& mutex;
+ bool acquired;
+};
+
}}
#ifdef USE_APR_PLATFORM
diff --git a/cpp/src/qpid/sys/Semaphore.h b/cpp/src/qpid/sys/Semaphore.h
new file mode 100644
index 0000000000..3efb7ce2df
--- /dev/null
+++ b/cpp/src/qpid/sys/Semaphore.h
@@ -0,0 +1,67 @@
+#ifndef _sys_Semaphore_h
+#define _sys_Semaphore_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+class Semaphore
+{
+public:
+ Semaphore(uint c = 1) : count(c) {}
+
+ void lock() { acquire(); }
+ void unlock() { release(); }
+ bool trylock() { return tryAcquire(); }
+
+ bool tryAcquire()
+ {
+ Monitor::ScopedLock l(monitor);
+ if (count) {
+ count--;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void acquire()
+ {
+ Monitor::ScopedLock l(monitor);
+ while (count == 0) monitor.wait();
+ count--;
+ }
+
+ void release()
+ {
+ Monitor::ScopedLock l(monitor);
+ if (!count++) monitor.notifyAll();
+ }
+
+private:
+ Monitor monitor;
+ uint count;
+};
+
+}}
+
+#endif /*!_sys_Semaphore_h*/