diff options
author | Alan Conway <aconway@apache.org> | 2012-02-13 23:50:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-13 23:50:18 +0000 |
commit | 057a0635e081848ba59964c4a8e0923e521b7fe6 (patch) | |
tree | cdad9578140d5dbdb2e90525e2ba9cc870ca50b6 /qpid/cpp/src/qpid/broker/Connection.cpp | |
parent | cf61dbf9b313f9bd69b392eae8fd8d27d4e609a2 (diff) | |
download | qpid-python-qpid-3603-5.tar.gz |
Merge branch 'qpid-3603-4-rebase' into qpid-3603-5qpid-3603-5
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-5@1243748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 14e9abc0d1..1e6aab217c 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/Connection.h" +#include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Bridge.h" @@ -103,8 +104,7 @@ Connection::Connection(ConnectionOutputHandler* out_, outboundTracker(*this) { outboundTracker.wrap(out); - if (link) - links.notifyConnection(mgmtId, this); + broker.getConnectionObservers().connection(*this); // In a cluster, allow adding the management object to be delayed. if (!delayManagement) addManagementObject(); if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); @@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boost::function0<void> callback) { ScopedLock<Mutex> l(ioCallbackLock); ioCallbacks.push(callback); - out.activateOutput(); + if (isOpen()) out.activateOutput(); } Connection::~Connection() @@ -142,8 +142,7 @@ Connection::~Connection() if (!link && isClusterSafe()) agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId())); } - if (link) - links.notifyClosed(mgmtId); + broker.getConnectionObservers().closed(*this); if (heartbeatTimer) heartbeatTimer->cancel(); @@ -156,11 +155,16 @@ Connection::~Connection() void Connection::received(framing::AMQFrame& frame) { // Received frame on connection so delay timeout restartTimeout(); + bool wasOpen = isOpen(); adapter.handle(frame); if (link) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); + if (!wasOpen && isOpen()) { + doIoCallbacks(); // Do any callbacks registered before we opened. + broker.getConnectionObservers().opened(*this); + } } void Connection::sent(const framing::AMQFrame& frame) @@ -260,8 +264,7 @@ string Connection::getAuthCredentials() void Connection::notifyConnectionForced(const string& text) { - if (link) - links.notifyConnectionForced(mgmtId, text); + broker.getConnectionObservers().forced(*this, text); } void Connection::setUserId(const string& userId) @@ -329,17 +332,16 @@ void Connection::closed(){ // Physically closed, suspend open sessions. } void Connection::doIoCallbacks() { - { - ScopedLock<Mutex> l(ioCallbackLock); - // Although IO callbacks execute in the connection thread context, they are - // not cluster safe because they are queued for execution in non-IO threads. - ClusterUnsafeScope cus; - while (!ioCallbacks.empty()) { - boost::function0<void> cb = ioCallbacks.front(); - ioCallbacks.pop(); - ScopedUnlock<Mutex> ul(ioCallbackLock); - cb(); // Lend the IO thread for management processing - } + if (!isOpen()) return; // Don't process IO callbacks until we are open. + ScopedLock<Mutex> l(ioCallbackLock); + // Although IO callbacks execute in the connection thread context, they are + // not cluster safe because they are queued for execution in non-IO threads. + ClusterUnsafeScope cus; + while (!ioCallbacks.empty()) { + boost::function0<void> cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock<Mutex> ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing } } |