diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 47 |
1 files changed, 28 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index c402415fab..22e1db2036 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -101,19 +101,18 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (isLocalClient()) { // Local clients are announced to the cluster // and initialized when the announce is received. - QPID_LOG(info, "new client connection " << *this); giveReadCredit(cluster.getSettings().readMax); // Flow control init(); } else { // Catch-up shadow connections initialized using nextShadow id. assert(catchUp); - QPID_LOG(info, "new catch-up connection " << *this); - connectionCtor.mgmtId = updateIn.nextShadowMgmtId; + if (!updateIn.nextShadowMgmtId.empty()) + connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); init(); } - + QPID_LOG(info, "incoming connection " << *this); } void Connection::setSecureConnection(broker::SecureConnection* sc) { @@ -123,8 +122,6 @@ void Connection::setSecureConnection(broker::SecureConnection* sc) { void Connection::init() { connection = connectionCtor.construct(); - QPID_LOG(debug, cluster << " initialized connection: " << *this - << " ssf=" << connection->getExternalSecuritySettings().ssf); if (isLocalClient()) { if (secureConnection) connection->setSecureConnection(secureConnection); // Actively send cluster-order frames from local node @@ -171,7 +168,6 @@ void Connection::announce( Connection::~Connection() { if (connection.get()) connection->setErrorListener(0); - QPID_LOG(debug, cluster << " deleted connection: " << *this); } bool Connection::doOutput() { @@ -250,16 +246,15 @@ void Connection::deliveredFrame(const EventFrame& f) { // A local connection is closed by the network layer. void Connection::closed() { try { - if (catchUp) { + if (isUpdated()) { + QPID_LOG(debug, cluster << " update connection closed " << *this); + close(); + } + else if (catchUp) { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); cluster.leave(); } - else if (isUpdated()) { - QPID_LOG(debug, cluster << " closed update connection " << *this); - if (connection.get()) connection->closed(); - } else if (isLocal()) { - QPID_LOG(debug, cluster << " local close of replicated connection " << *this); // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. @@ -275,15 +270,20 @@ void Connection::closed() { // Self-delivery of close message, close the connection. void Connection::deliverClose () { assert(!catchUp); + close(); + cluster.erase(self); +} + +// Close the connection +void Connection::close() { if (connection.get()) { connection->closed(); // Ensure we delete the broker::Connection in the deliver thread. connection.reset(); } - cluster.erase(self); } -// The connection has been killed for misbehaving +// The connection has been killed for misbehaving, called in connection thread. void Connection::abort() { if (connection.get()) { connection->abort(); @@ -424,7 +424,7 @@ void Connection::shadowReady( uint64_t memberId, uint64_t connectionId, const string& mgmtId, const string& username, const string& fragment, uint32_t sendMax) { - QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId()); + QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId()); ConnectionId shadowId = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); @@ -442,13 +442,19 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); updateIn.consumerNumbering.clear(); - self.second = 0; // Mark this as completed update connection. + closeUpdated(); } void Connection::retractOffer() { QPID_LOG(info, cluster << " incoming update retracted on connection " << *this); cluster.updateInRetracted(); - self.second = 0; // Mark this as completed update connection. + closeUpdated(); +} + +void Connection::closeUpdated() { + self.second = 0; // Mark this as completed update connection. + if (connection.get()) + connection->close(connection::CLOSE_CODE_NORMAL, "OK"); } bool Connection::isLocal() const { @@ -527,7 +533,10 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { if (c.isLocal()) type = "local"; else if (c.isShadow()) type = "shadow"; else if (c.isUpdated()) type = "updated"; - return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; + const broker::Connection* bc = c.getBrokerConnection(); + if (bc) o << bc->getMgmtId(); + else o << "<disconnected>"; + return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")"; } void Connection::txStart() { |