diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 42 |
1 files changed, 25 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index b225ba3568..a1ed5f34f5 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -31,18 +31,23 @@ namespace cluster { using namespace framing; +// Shadow connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp() -{} + : cluster(c), self(myId), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{ + QPID_LOG(debug, "New connection: " << *this); +} +// Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp) - : cluster(c), self(myId, this), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), - catchUp(isCatchUp), exCatchUp() -{} + : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{ + QPID_LOG(debug, "New connection: " << *this); +} Connection::~Connection() {} @@ -59,6 +64,7 @@ void Connection::deliverDoOutput(uint32_t requested) { } void Connection::received(framing::AMQFrame& f) { + QPID_LOG(trace, "EXEC [" << *this << "]: " << f); // Handle connection controls, deliver other frames to connection. if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); @@ -70,16 +76,10 @@ void Connection::closed() { // connection around but replace the output handler with a // no-op handler as the network output handler will be // deleted. - - // FIXME aconway 2008-09-18: output handler reset in right place? - // connection.setOutputHandler(&discardHandler); output.setOutputHandler(discardHandler); if (catchUp) { - // This was a catch-up connection, may be promoted to a - // shadow connection. catchUp = false; - exCatchUp = true; - cluster.insert(boost::intrusive_ptr<Connection>(this)); + cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); } else { // This was a local replicated connection. Multicast a deliver closed @@ -125,8 +125,11 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/, // FIXME aconway 2008-09-10: TODO } -void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) { - // FIXME aconway 2008-09-10: TODO +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { + ConnectionId shadow = ConnectionId(memberId, connectionId); + QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow); + self = shadow; + assert(isShadow()); } void Connection::dumpComplete() { @@ -134,6 +137,11 @@ void Connection::dumpComplete() { } bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } - + +std::ostream& operator<<(std::ostream& o, const Connection& c) { + return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow") + << (c.isCatchUp() ? ",catchup" : "") << ")"; +} + }} // namespace qpid::cluster |