diff options
author | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
commit | ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch) | |
tree | d8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/cluster/DumpClient.cpp | |
parent | 2141967346b884e592a42353ae596d37eb90fe7b (diff) | |
download | qpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz |
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions.
client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang.
src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 59542a2e95..ed339b2f85 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -20,6 +20,7 @@ */ #include "DumpClient.h" #include "Cluster.h" +#include "ClusterMap.h" #include "Connection.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/broker/Broker.h" @@ -31,7 +32,7 @@ #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionState.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/ClusterConnectionDumpCompleteBody.h" +#include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" #include "qpid/framing/enum.h" #include "qpid/framing/ProtocolVersion.h" @@ -63,6 +64,10 @@ struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { ClusterConnectionProxy(client::Connection& c) : AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} }; +struct ClusterProxy : public AMQP_AllProxy::Cluster { + ClusterProxy(client::Connection& c) : + AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {} +}; // Create a connection with special version that marks it as a catch-up connection. @@ -80,10 +85,11 @@ void send(client::Session& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel. -DumpClient::DumpClient(const Url& url, Cluster& c, +DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url, + broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) - : receiver(url), donor(c), + : dumperId(to), dumpeeId(from), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { @@ -98,18 +104,19 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); void DumpClient::dump() { - QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver); - Broker& b = donor.getBroker(); + QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl); + Broker& b = dumperBroker; b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); session.sync(); session.close(); - donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1)); - ClusterConnectionProxy(connection).dumpComplete(); + std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1)); + AMQFrame frame(map.asMethodBody()); + client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); - QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver); + QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl); } void DumpClient::run() { @@ -160,25 +167,22 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi } void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { + QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection); shadowConnection = catchUpConnection(); broker::Connection& bc = dumpConnection->getBrokerConnection(); // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size, // authentication etc. See ConnectionSettings. - shadowConnection.open(receiver, bc.getUserId()); + shadowConnection.open(dumpeeUrl, bc.getUserId()); dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( dumpConnection->getId().getMember(), - reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())); + reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer())); shadowConnection.close(); - QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection); + QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection); } -// FIXME aconway 2008-09-26: REMOVE -void foo(broker::SemanticState::ConsumerImpl*) {} - - void DumpClient::dumpSession(broker::SessionHandler& sh) { - QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " + QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); broker::SessionState* s = sh.getSession(); if (!s) return; // no session. @@ -214,17 +218,18 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // FIXME aconway 2008-09-23: session replay list. - QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId()); + QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); } void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { + QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), arg::destination = ci->getName(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, - arg::exclusive = false , // FIXME aconway 2008-09-23: how to read. + arg::exclusive = false , // FIXME aconway 2008-09-23: duplicate from consumer // TODO aconway 2008-09-23: remaining args not used by current broker. // Update this code when they are. @@ -236,7 +241,7 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled? - QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); } }} // namespace qpid::cluster |