summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp47
1 files changed, 28 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index c402415fab..22e1db2036 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -101,19 +101,18 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (isLocalClient()) {
// Local clients are announced to the cluster
// and initialized when the announce is received.
- QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax); // Flow control
init();
}
else {
// Catch-up shadow connections initialized using nextShadow id.
assert(catchUp);
- QPID_LOG(info, "new catch-up connection " << *this);
- connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+ if (!updateIn.nextShadowMgmtId.empty())
+ connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
init();
}
-
+ QPID_LOG(info, "incoming connection " << *this);
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -123,8 +122,6 @@ void Connection::setSecureConnection(broker::SecureConnection* sc) {
void Connection::init() {
connection = connectionCtor.construct();
- QPID_LOG(debug, cluster << " initialized connection: " << *this
- << " ssf=" << connection->getExternalSecuritySettings().ssf);
if (isLocalClient()) {
if (secureConnection) connection->setSecureConnection(secureConnection);
// Actively send cluster-order frames from local node
@@ -171,7 +168,6 @@ void Connection::announce(
Connection::~Connection() {
if (connection.get()) connection->setErrorListener(0);
- QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
bool Connection::doOutput() {
@@ -250,16 +246,15 @@ void Connection::deliveredFrame(const EventFrame& f) {
// A local connection is closed by the network layer.
void Connection::closed() {
try {
- if (catchUp) {
+ if (isUpdated()) {
+ QPID_LOG(debug, cluster << " update connection closed " << *this);
+ close();
+ }
+ else if (catchUp) {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
cluster.leave();
}
- else if (isUpdated()) {
- QPID_LOG(debug, cluster << " closed update connection " << *this);
- if (connection.get()) connection->closed();
- }
else if (isLocal()) {
- QPID_LOG(debug, cluster << " local close of replicated connection " << *this);
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
@@ -275,15 +270,20 @@ void Connection::closed() {
// Self-delivery of close message, close the connection.
void Connection::deliverClose () {
assert(!catchUp);
+ close();
+ cluster.erase(self);
+}
+
+// Close the connection
+void Connection::close() {
if (connection.get()) {
connection->closed();
// Ensure we delete the broker::Connection in the deliver thread.
connection.reset();
}
- cluster.erase(self);
}
-// The connection has been killed for misbehaving
+// The connection has been killed for misbehaving, called in connection thread.
void Connection::abort() {
if (connection.get()) {
connection->abort();
@@ -424,7 +424,7 @@ void Connection::shadowReady(
uint64_t memberId, uint64_t connectionId, const string& mgmtId,
const string& username, const string& fragment, uint32_t sendMax)
{
- QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
+ QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId());
ConnectionId shadowId = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this
<< " becomes shadow " << shadowId);
@@ -442,13 +442,19 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
updateIn.consumerNumbering.clear();
- self.second = 0; // Mark this as completed update connection.
+ closeUpdated();
}
void Connection::retractOffer() {
QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
cluster.updateInRetracted();
- self.second = 0; // Mark this as completed update connection.
+ closeUpdated();
+}
+
+void Connection::closeUpdated() {
+ self.second = 0; // Mark this as completed update connection.
+ if (connection.get())
+ connection->close(connection::CLOSE_CODE_NORMAL, "OK");
}
bool Connection::isLocal() const {
@@ -527,7 +533,10 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
if (c.isLocal()) type = "local";
else if (c.isShadow()) type = "shadow";
else if (c.isUpdated()) type = "updated";
- return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
+ const broker::Connection* bc = c.getBrokerConnection();
+ if (bc) o << bc->getMgmtId();
+ else o << "<disconnected>";
+ return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")";
}
void Connection::txStart() {