summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp38
1 files changed, 20 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 14e9abc0d1..1e6aab217c 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Bridge.h"
@@ -103,8 +104,7 @@ Connection::Connection(ConnectionOutputHandler* out_,
outboundTracker(*this)
{
outboundTracker.wrap(out);
- if (link)
- links.notifyConnection(mgmtId, this);
+ broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
@@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
{
ScopedLock<Mutex> l(ioCallbackLock);
ioCallbacks.push(callback);
- out.activateOutput();
+ if (isOpen()) out.activateOutput();
}
Connection::~Connection()
@@ -142,8 +142,7 @@ Connection::~Connection()
if (!link && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
}
- if (link)
- links.notifyClosed(mgmtId);
+ broker.getConnectionObservers().closed(*this);
if (heartbeatTimer)
heartbeatTimer->cancel();
@@ -156,11 +155,16 @@ Connection::~Connection()
void Connection::received(framing::AMQFrame& frame) {
// Received frame on connection so delay timeout
restartTimeout();
+ bool wasOpen = isOpen();
adapter.handle(frame);
if (link) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
+ if (!wasOpen && isOpen()) {
+ doIoCallbacks(); // Do any callbacks registered before we opened.
+ broker.getConnectionObservers().opened(*this);
+ }
}
void Connection::sent(const framing::AMQFrame& frame)
@@ -260,8 +264,7 @@ string Connection::getAuthCredentials()
void Connection::notifyConnectionForced(const string& text)
{
- if (link)
- links.notifyConnectionForced(mgmtId, text);
+ broker.getConnectionObservers().forced(*this, text);
}
void Connection::setUserId(const string& userId)
@@ -329,17 +332,16 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
}
void Connection::doIoCallbacks() {
- {
- ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context, they are
- // not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
- while (!ioCallbacks.empty()) {
- boost::function0<void> cb = ioCallbacks.front();
- ioCallbacks.pop();
- ScopedUnlock<Mutex> ul(ioCallbackLock);
- cb(); // Lend the IO thread for management processing
- }
+ if (!isOpen()) return; // Don't process IO callbacks until we are open.
+ ScopedLock<Mutex> l(ioCallbackLock);
+ // Although IO callbacks execute in the connection thread context, they are
+ // not cluster safe because they are queued for execution in non-IO threads.
+ ClusterUnsafeScope cus;
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
}
}