diff options
author | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
commit | e65b0086a2924ff04640b1350393a816249d01b3 (patch) | |
tree | b372c5386cc44e3ad16c4ae585088ed038a629e4 /cpp/src/qpid/broker/Connection.cpp | |
parent | e596837411d54a16dd3cb1e5de717664496c2bd0 (diff) | |
download | qpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz |
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd.
- Plugin::addFinalizer - more flexible way to shutdown plugins.
- Reworked cluster extension points using boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 5e85d3c89c..e77911bd10 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -49,14 +49,14 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), + receivedFn(boost::bind(&Connection::receivedImpl, this, _1)), + closedFn(boost::bind(&Connection::closedImpl, this)), adapter(*this, isLink_), isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), mgmtObject(0), - links(broker_.getLinks()), - lastInHandler(*this), - inChain(lastInHandler) + links(broker_.getLinks()) { Manageable* parent = broker.GetVhostObject(); @@ -71,6 +71,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); agent->addObject(mgmtObject); } + + Plugin::initializeAll(*this); // Let plug-ins update extension points. } void Connection::requestIOProcessing(boost::function0<void> callback) @@ -79,7 +81,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback) out->activateOutput(); } - Connection::~Connection() { if (mgmtObject != 0) @@ -88,9 +89,9 @@ Connection::~Connection() links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); } - -void Connection::receivedLast(framing::AMQFrame& frame){ +void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); } + +void Connection::receivedImpl(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { @@ -170,10 +171,13 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed(){ // Physically closed, suspend open sessions. +void Connection::closed() { closedFn(); } + +void Connection::closedImpl(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); + // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10. while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -183,8 +187,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. exclusiveQueues.erase(exclusiveQueues.begin()); } } catch(std::exception& e) { - QPID_LOG(error, " Unhandled exception while closing session: " << - e.what()); + QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); assert(0); } } |