diff options
author | Alan Conway <aconway@apache.org> | 2008-09-21 05:04:04 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-21 05:04:04 +0000 |
commit | 558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (patch) | |
tree | 9b306597ee07b264fa18580546ed5645f0c3766d /cpp/src/qpid/cluster/Connection.cpp | |
parent | 7c70d21ca2d788d4432cfa89851c9b928c9f30aa (diff) | |
download | qpid-python-558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05.tar.gz |
DumpClient send connections & session IDs to new members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 42 |
1 files changed, 25 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index b225ba3568..a1ed5f34f5 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -31,18 +31,23 @@ namespace cluster { using namespace framing; +// Shadow connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp() -{} + : cluster(c), self(myId), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{ + QPID_LOG(debug, "New connection: " << *this); +} +// Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp) - : cluster(c), self(myId, this), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), - catchUp(isCatchUp), exCatchUp() -{} + : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{ + QPID_LOG(debug, "New connection: " << *this); +} Connection::~Connection() {} @@ -59,6 +64,7 @@ void Connection::deliverDoOutput(uint32_t requested) { } void Connection::received(framing::AMQFrame& f) { + QPID_LOG(trace, "EXEC [" << *this << "]: " << f); // Handle connection controls, deliver other frames to connection. if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); @@ -70,16 +76,10 @@ void Connection::closed() { // connection around but replace the output handler with a // no-op handler as the network output handler will be // deleted. - - // FIXME aconway 2008-09-18: output handler reset in right place? - // connection.setOutputHandler(&discardHandler); output.setOutputHandler(discardHandler); if (catchUp) { - // This was a catch-up connection, may be promoted to a - // shadow connection. catchUp = false; - exCatchUp = true; - cluster.insert(boost::intrusive_ptr<Connection>(this)); + cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); } else { // This was a local replicated connection. Multicast a deliver closed @@ -125,8 +125,11 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/, // FIXME aconway 2008-09-10: TODO } -void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) { - // FIXME aconway 2008-09-10: TODO +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { + ConnectionId shadow = ConnectionId(memberId, connectionId); + QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow); + self = shadow; + assert(isShadow()); } void Connection::dumpComplete() { @@ -134,6 +137,11 @@ void Connection::dumpComplete() { } bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } - + +std::ostream& operator<<(std::ostream& o, const Connection& c) { + return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow") + << (c.isCatchUp() ? ",catchup" : "") << ")"; +} + }} // namespace qpid::cluster |