summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-26 21:49:52 +0000
committerAlan Conway <aconway@apache.org>2008-09-26 21:49:52 +0000
commit9809badd6486af7697767b80f123885eb1892e4a (patch)
tree07de43c0e2dd873743b44159e6fd4b8a1b6008db /qpid/cpp
parent4ae784ffe9c552cf4f113b340c502860cfa3c391 (diff)
downloadqpid-python-9809badd6486af7697767b80f123885eb1892e4a.tar.gz
Clean up end-of-dump protocol for new cluster members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@699513 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp7
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h10
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterHandler.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp9
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp30
-rw-r--r--qpid/cpp/src/qpid/cluster/DumpClient.cpp15
-rw-r--r--qpid/cpp/src/qpid/cluster/JoiningHandler.cpp22
-rw-r--r--qpid/cpp/src/qpid/cluster/JoiningHandler.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/MemberHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/MemberHandler.h3
10 files changed, 38 insertions, 79 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 9cd8d1842c..19f4318a9c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -91,9 +91,12 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
Cluster::~Cluster() {}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); }
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
-void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); }
+void Cluster::dumpComplete() { handler->dumpComplete(); }
void Cluster::erase(ConnectionId id) {
Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index eaa91202dc..0b56540f9a 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -62,10 +62,10 @@ class Cluster : private Cpg::Handler, public management::Manageable
virtual ~Cluster();
- void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection
- void erase(ConnectionId); // Erase a connection.
-
- void catchUpClosed(const boost::intrusive_ptr<Connection>&); // Insert a local connection
+ // FIXME aconway 2008-09-26: thread safety
+ void insert(const boost::intrusive_ptr<Connection>&);
+ void erase(ConnectionId);
+ void dumpComplete();
/** Get the URLs of current cluster members. */
std::vector<Url> getUrls() const;
@@ -94,8 +94,6 @@ class Cluster : private Cpg::Handler, public management::Manageable
broker::Broker& getBroker();
- void setDumpComplete();
-
template <class F> void eachConnection(const F& f) {
for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i)
f(i->second);
diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.h b/qpid/cpp/src/qpid/cluster/ClusterHandler.h
index 37b0ed4d79..d8bcaa8fe8 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterHandler.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.h
@@ -59,8 +59,7 @@ class ClusterHandler
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined) = 0;
- virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0;
- virtual void catchUpClosed(const boost::intrusive_ptr<Connection>& c) = 0;
+ virtual void dumpComplete() = 0;
protected:
Cluster& cluster;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index c721749aba..b5b71cd397 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -45,7 +45,7 @@ bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) {
changed = members.erase(*a) || changed;
if (dumper && !isMember(dumper))
dumper = MemberId();
- QPID_LOG_IF(debug, changed, *this);
+ QPID_LOG_IF(debug, changed, "Members left. " << *this);
return changed;
}
@@ -66,7 +66,7 @@ bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_)
Url url(i->second->get<std::string>());
changed = members.insert(Members::value_type(id, url)).second || changed;
}
- QPID_LOG_IF(debug, changed, *this);
+ QPID_LOG_IF(debug, changed, "Update: " << *this);
return changed;
}
@@ -94,12 +94,11 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) {
bool changed = members.insert(Members::value_type(id,url)).second;
if (id == dumper) {
dumper = MemberId();
- QPID_LOG(info, id << " finished dump.");
+ QPID_LOG(info, id << " finished dump. " << *this);
}
else {
- QPID_LOG(info, id << " joined cluster, url=" << url);
+ QPID_LOG(info, id << " joined, url=" << url << ". " << *this);
}
- QPID_LOG_IF(debug, changed, *this);
return changed;
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 21eec0f86e..6aab31c177 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -76,10 +76,13 @@ void Connection::deliverDoOutput(uint32_t requested) {
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, "RECV " << *this << ": " << f);
if (isShadow()) {
- // Final close that completes catch-up for shadow connection.
+ // Intercept the close that completes catch-up for shadow a connection.
if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ catchUp = false;
AMQFrame ok(in_place<ConnectionCloseOkBody>());
+ cluster.insert(boost::intrusive_ptr<Connection>(this));
connection.getOutput().send(ok);
+ output.setOutputHandler(discardHandler);
}
else
QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
@@ -104,27 +107,13 @@ void Connection::delivered(framing::AMQFrame& f) {
void Connection::closed() {
try {
QPID_LOG(debug, "Connection closed " << *this);
-
- if (catchUp) {
- cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
- if (isShadow())
- catchUp = false;
- else {
- connection.closed();
- return;
- }
- }
-
- // Local network connection has closed. We need to keep the
- // connection around but replace the output handler with a
- // no-op handler as the network output handler will be
- // deleted.
- output.setOutputHandler(discardHandler);
-
- if (isLocal() && !catchUp) {
+ if (catchUp)
+ connection.closed();
+ else if (isLocal()) {
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
+ output.setOutputHandler(discardHandler);
cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
++mcastSeq;
}
@@ -188,11 +177,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
ConnectionId shadow = ConnectionId(memberId, connectionId);
QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow);
self = shadow;
- assert(isShadow());
}
void Connection::dumpComplete() {
- // FIXME aconway 2008-09-18: use or remove.
+ cluster.dumpComplete();
}
bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
index d2d3c9bb15..59542a2e95 100644
--- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
@@ -59,6 +59,12 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
+ ClusterConnectionProxy(client::Connection& c) :
+ AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
+};
+
+
// Create a connection with special version that marks it as a catch-up connection.
client::Connection catchUpConnection() {
client::Connection c;
@@ -101,7 +107,7 @@ void DumpClient::dump() {
session.sync();
session.close();
donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
- // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
+ ClusterConnectionProxy(connection).dumpComplete();
connection.close();
QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver);
}
@@ -160,10 +166,9 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn
// authentication etc. See ConnectionSettings.
shadowConnection.open(receiver, bc.getUserId());
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
- boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection);
- AMQP_AllProxy::ClusterConnection proxy(*impl);
- proxy.shadowReady(dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+ ClusterConnectionProxy(shadowConnection).shadowReady(
+ dumpConnection->getId().getMember(),
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
shadowConnection.close();
QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
}
diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
index 58c444ac62..6838313263 100644
--- a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -105,27 +105,8 @@ void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) {
checkDumpRequest();
}
-void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- if (c->isCatchUp()) {
- ++catchUpConnections;
- QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
- }
- cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
-}
-
-void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
- if (c->isShadow())
- cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
- if (--catchUpConnections == 0)
- dumpComplete();
-}
-
void JoiningHandler::dumpComplete() {
- // FIXME aconway 2008-09-18: need to detect incomplete dump.
- // Called with lock - volatile?
+ Mutex::ScopedLock l(cluster.lock);
if (state == STALLED) {
QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
cluster.ready();
@@ -135,6 +116,7 @@ void JoiningHandler::dumpComplete() {
assert(state == DUMP_REQUESTED);
state = DUMP_COMPLETE;
}
+ // FIXME aconway 2008-09-18: need to detect incomplete dump.
}
diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.h b/qpid/cpp/src/qpid/cluster/JoiningHandler.h
index 097d765e5e..cc47690ac5 100644
--- a/qpid/cpp/src/qpid/cluster/JoiningHandler.h
+++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.h
@@ -46,12 +46,10 @@ class JoiningHandler : public ClusterHandler
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
- void insert(const boost::intrusive_ptr<Connection>& c);
- void catchUpClosed(const boost::intrusive_ptr<Connection>& c);
+ void dumpComplete();
private:
void checkDumpRequest();
- void dumpComplete();
enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
size_t catchUpConnections;
diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
index 5f02754443..99e7b7d683 100644
--- a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -90,18 +90,6 @@ void MemberHandler::dumpError(const std::exception& e) {
dumpSent();
}
-void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- if (c->isCatchUp()) // Not allowed in member mode
- c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
- else
- cluster.connections[c->getId()] = c;
-}
-
-void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
- assert(0);
-}
+void MemberHandler::dumpComplete() { assert(0); }
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.h b/qpid/cpp/src/qpid/cluster/MemberHandler.h
index 7655034ed8..37cf653b7b 100644
--- a/qpid/cpp/src/qpid/cluster/MemberHandler.h
+++ b/qpid/cpp/src/qpid/cluster/MemberHandler.h
@@ -52,8 +52,7 @@ class MemberHandler : public ClusterHandler
void dumpSent();
void dumpError(const std::exception&);
- void insert(const boost::intrusive_ptr<Connection>& c);
- void catchUpClosed(const boost::intrusive_ptr<Connection>& );
+ void dumpComplete();
public:
sys::Thread dumpThread;