diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 8451f35cb0..0e20719252 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boost::function0<void> callback) { ScopedLock<Mutex> l(ioCallbackLock); ioCallbacks.push(callback); - out.activateOutput(); + if (isOpen()) out.activateOutput(); } Connection::~Connection() @@ -156,11 +156,14 @@ Connection::~Connection() void Connection::received(framing::AMQFrame& frame) { // Received frame on connection so delay timeout restartTimeout(); + bool wasOpen = isOpen(); adapter.handle(frame); if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); + if (!wasOpen && isOpen()) + doIoCallbacks(); // Do any callbacks registered before we opened. } void Connection::sent(const framing::AMQFrame& frame) @@ -329,17 +332,16 @@ void Connection::closed(){ // Physically closed, suspend open sessions. } void Connection::doIoCallbacks() { - { - ScopedLock<Mutex> l(ioCallbackLock); - // Although IO callbacks execute in the connection thread context, they are - // not cluster safe because they are queued for execution in non-IO threads. - ClusterUnsafeScope cus; - while (!ioCallbacks.empty()) { - boost::function0<void> cb = ioCallbacks.front(); - ioCallbacks.pop(); - ScopedUnlock<Mutex> ul(ioCallbackLock); - cb(); // Lend the IO thread for management processing - } + if (!isOpen()) return; // Don't process IO callbacks until we are open. + ScopedLock<Mutex> l(ioCallbackLock); + // Although IO callbacks execute in the connection thread context, they are + // not cluster safe because they are queued for execution in non-IO threads. + ClusterUnsafeScope cus; + while (!ioCallbacks.empty()) { + boost::function0<void> cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock<Mutex> ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing } } |