summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
committerAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
commit558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (patch)
tree9b306597ee07b264fa18580546ed5645f0c3766d /cpp/src/qpid/cluster/Connection.cpp
parent7c70d21ca2d788d4432cfa89851c9b928c9f30aa (diff)
downloadqpid-python-558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05.tar.gz
DumpClient send connections & session IDs to new members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp42
1 files changed, 25 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index b225ba3568..a1ed5f34f5 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -31,18 +31,23 @@ namespace cluster {
using namespace framing;
+// Shadow connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
- : cluster(c), self(myId), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp()
-{}
+ : cluster(c), self(myId), catchUp(false), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{
+ QPID_LOG(debug, "New connection: " << *this);
+}
+// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp)
- : cluster(c), self(myId, this), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId),
- catchUp(isCatchUp), exCatchUp()
-{}
+ : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{
+ QPID_LOG(debug, "New connection: " << *this);
+}
Connection::~Connection() {}
@@ -59,6 +64,7 @@ void Connection::deliverDoOutput(uint32_t requested) {
}
void Connection::received(framing::AMQFrame& f) {
+ QPID_LOG(trace, "EXEC [" << *this << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
@@ -70,16 +76,10 @@ void Connection::closed() {
// connection around but replace the output handler with a
// no-op handler as the network output handler will be
// deleted.
-
- // FIXME aconway 2008-09-18: output handler reset in right place?
- // connection.setOutputHandler(&discardHandler);
output.setOutputHandler(discardHandler);
if (catchUp) {
- // This was a catch-up connection, may be promoted to a
- // shadow connection.
catchUp = false;
- exCatchUp = true;
- cluster.insert(boost::intrusive_ptr<Connection>(this));
+ cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
}
else {
// This was a local replicated connection. Multicast a deliver closed
@@ -125,8 +125,11 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/,
// FIXME aconway 2008-09-10: TODO
}
-void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) {
- // FIXME aconway 2008-09-10: TODO
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
+ ConnectionId shadow = ConnectionId(memberId, connectionId);
+ QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow);
+ self = shadow;
+ assert(isShadow());
}
void Connection::dumpComplete() {
@@ -134,6 +137,11 @@ void Connection::dumpComplete() {
}
bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
-
+
+std::ostream& operator<<(std::ostream& o, const Connection& c) {
+ return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")
+ << (c.isCatchUp() ? ",catchup" : "") << ")";
+}
+
}} // namespace qpid::cluster