summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp26
1 files changed, 14 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 8451f35cb0..0e20719252 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
{
ScopedLock<Mutex> l(ioCallbackLock);
ioCallbacks.push(callback);
- out.activateOutput();
+ if (isOpen()) out.activateOutput();
}
Connection::~Connection()
@@ -156,11 +156,14 @@ Connection::~Connection()
void Connection::received(framing::AMQFrame& frame) {
// Received frame on connection so delay timeout
restartTimeout();
+ bool wasOpen = isOpen();
adapter.handle(frame);
if (isLink) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
+ if (!wasOpen && isOpen())
+ doIoCallbacks(); // Do any callbacks registered before we opened.
}
void Connection::sent(const framing::AMQFrame& frame)
@@ -329,17 +332,16 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
}
void Connection::doIoCallbacks() {
- {
- ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context, they are
- // not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
- while (!ioCallbacks.empty()) {
- boost::function0<void> cb = ioCallbacks.front();
- ioCallbacks.pop();
- ScopedUnlock<Mutex> ul(ioCallbackLock);
- cb(); // Lend the IO thread for management processing
- }
+ if (!isOpen()) return; // Don't process IO callbacks until we are open.
+ ScopedLock<Mutex> l(ioCallbackLock);
+ // Although IO callbacks execute in the connection thread context, they are
+ // not cluster safe because they are queued for execution in non-IO threads.
+ ClusterUnsafeScope cus;
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
}
}