summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-07-17 00:03:50 +0000
committerAlan Conway <aconway@apache.org>2008-07-17 00:03:50 +0000
commite65b0086a2924ff04640b1350393a816249d01b3 (patch)
treeb372c5386cc44e3ad16c4ae585088ed038a629e4 /cpp/src/qpid/broker/Connection.cpp
parente596837411d54a16dd3cb1e5de717664496c2bd0 (diff)
downloadqpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd. - Plugin::addFinalizer - more flexible way to shutdown plugins. - Reworked cluster extension points using boost::function. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp23
1 files changed, 13 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 5e85d3c89c..e77911bd10 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -49,14 +49,14 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
+ receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
+ closedFn(boost::bind(&Connection::closedImpl, this)),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
mgmtObject(0),
- links(broker_.getLinks()),
- lastInHandler(*this),
- inChain(lastInHandler)
+ links(broker_.getLinks())
{
Manageable* parent = broker.GetVhostObject();
@@ -71,6 +71,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
agent->addObject(mgmtObject);
}
+
+ Plugin::initializeAll(*this); // Let plug-ins update extension points.
}
void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -79,7 +81,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
out->activateOutput();
}
-
Connection::~Connection()
{
if (mgmtObject != 0)
@@ -88,9 +89,9 @@ Connection::~Connection()
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); }
-
-void Connection::receivedLast(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
+
+void Connection::receivedImpl(framing::AMQFrame& frame){
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
@@ -170,10 +171,13 @@ void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed(){ // Physically closed, suspend open sessions.
+void Connection::closed() { closedFn(); }
+
+void Connection::closedImpl(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
+ // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -183,8 +187,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
exclusiveQueues.erase(exclusiveQueues.begin());
}
} catch(std::exception& e) {
- QPID_LOG(error, " Unhandled exception while closing session: " <<
- e.what());
+ QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
assert(0);
}
}