summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp58
1 files changed, 40 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 51da5bef25..b225ba3568 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -34,29 +34,31 @@ using namespace framing;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp()
{}
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId)
+ const std::string& wrappedId, MemberId myId, bool isCatchUp)
: cluster(c), self(myId, this), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId),
+ catchUp(isCatchUp), exCatchUp()
{}
Connection::~Connection() {}
-bool Connection::doOutput() { return output.doOutput(); }
+bool Connection::doOutput() {
+ return output.doOutput();
+}
// Delivery of doOutput allows us to run the real connection doOutput()
// which stocks up the write buffers with data.
//
void Connection::deliverDoOutput(uint32_t requested) {
+ assert(!catchUp);
output.deliverDoOutput(requested);
}
-// Handle frames delivered from cluster.
void Connection::received(framing::AMQFrame& f) {
- QPID_LOG(trace, "DLVR [" << self << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
@@ -64,16 +66,28 @@ void Connection::received(framing::AMQFrame& f) {
void Connection::closed() {
try {
- // Called when the local network connection is closed. We still
- // need to process any outstanding cluster frames for this
- // connection to ensure our sessions are up-to-date. We defer
- // closing the Connection object till deliverClosed(), but replace
- // its output handler with a null handler since the network output
- // handler will be deleted.
- //
- connection.setOutputHandler(&discardHandler);
- cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
- ++mcastSeq;
+ // Local network connection has closed. We need to keep the
+ // connection around but replace the output handler with a
+ // no-op handler as the network output handler will be
+ // deleted.
+
+ // FIXME aconway 2008-09-18: output handler reset in right place?
+ // connection.setOutputHandler(&discardHandler);
+ output.setOutputHandler(discardHandler);
+ if (catchUp) {
+ // This was a catch-up connection, may be promoted to a
+ // shadow connection.
+ catchUp = false;
+ exCatchUp = true;
+ cluster.insert(boost::intrusive_ptr<Connection>(this));
+ }
+ else {
+ // This was a local replicated connection. Multicast a deliver closed
+ // and process any outstanding frames from the cluster until
+ // self-delivery of deliver-closed.
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
+ ++mcastSeq;
+ }
}
catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
@@ -81,17 +95,20 @@ void Connection::closed() {
}
void Connection::deliverClose () {
+ assert(!catchUp);
connection.closed();
cluster.erase(self);
}
size_t Connection::decode(const char* buffer, size_t size) {
+ assert(!catchUp);
++mcastSeq;
cluster.mcastBuffer(buffer, size, self);
return size;
}
void Connection::deliverBuffer(Buffer& buf) {
+ assert(!catchUp);
++deliverSeq;
while (decoder.decode(buf))
received(decoder.frame);
@@ -108,10 +125,15 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/,
// FIXME aconway 2008-09-10: TODO
}
-void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/)
-{
+void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) {
// FIXME aconway 2008-09-10: TODO
}
+void Connection::dumpComplete() {
+ // FIXME aconway 2008-09-18: use or remove.
+}
+
+bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
+
}} // namespace qpid::cluster