diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 58 |
1 files changed, 40 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 51da5bef25..b225ba3568 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -34,29 +34,31 @@ using namespace framing; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp() {} Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId) + const std::string& wrappedId, MemberId myId, bool isCatchUp) : cluster(c), self(myId, this), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), + catchUp(isCatchUp), exCatchUp() {} Connection::~Connection() {} -bool Connection::doOutput() { return output.doOutput(); } +bool Connection::doOutput() { + return output.doOutput(); +} // Delivery of doOutput allows us to run the real connection doOutput() // which stocks up the write buffers with data. // void Connection::deliverDoOutput(uint32_t requested) { + assert(!catchUp); output.deliverDoOutput(requested); } -// Handle frames delivered from cluster. void Connection::received(framing::AMQFrame& f) { - QPID_LOG(trace, "DLVR [" << self << "]: " << f); // Handle connection controls, deliver other frames to connection. if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); @@ -64,16 +66,28 @@ void Connection::received(framing::AMQFrame& f) { void Connection::closed() { try { - // Called when the local network connection is closed. We still - // need to process any outstanding cluster frames for this - // connection to ensure our sessions are up-to-date. We defer - // closing the Connection object till deliverClosed(), but replace - // its output handler with a null handler since the network output - // handler will be deleted. - // - connection.setOutputHandler(&discardHandler); - cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); - ++mcastSeq; + // Local network connection has closed. We need to keep the + // connection around but replace the output handler with a + // no-op handler as the network output handler will be + // deleted. + + // FIXME aconway 2008-09-18: output handler reset in right place? + // connection.setOutputHandler(&discardHandler); + output.setOutputHandler(discardHandler); + if (catchUp) { + // This was a catch-up connection, may be promoted to a + // shadow connection. + catchUp = false; + exCatchUp = true; + cluster.insert(boost::intrusive_ptr<Connection>(this)); + } + else { + // This was a local replicated connection. Multicast a deliver closed + // and process any outstanding frames from the cluster until + // self-delivery of deliver-closed. + cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); + ++mcastSeq; + } } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); @@ -81,17 +95,20 @@ void Connection::closed() { } void Connection::deliverClose () { + assert(!catchUp); connection.closed(); cluster.erase(self); } size_t Connection::decode(const char* buffer, size_t size) { + assert(!catchUp); ++mcastSeq; cluster.mcastBuffer(buffer, size, self); return size; } void Connection::deliverBuffer(Buffer& buf) { + assert(!catchUp); ++deliverSeq; while (decoder.decode(buf)) received(decoder.frame); @@ -108,10 +125,15 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/, // FIXME aconway 2008-09-10: TODO } -void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) -{ +void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) { // FIXME aconway 2008-09-10: TODO } +void Connection::dumpComplete() { + // FIXME aconway 2008-09-18: use or remove. +} + +bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } + }} // namespace qpid::cluster |