summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/DumpClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
committerAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
commitff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch)
treed8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/cluster/DumpClient.cpp
parent2141967346b884e592a42353ae596d37eb90fe7b (diff)
downloadqpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions. client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang. src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp43
1 files changed, 24 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 59542a2e95..ed339b2f85 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -20,6 +20,7 @@
*/
#include "DumpClient.h"
#include "Cluster.h"
+#include "ClusterMap.h"
#include "Connection.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/broker/Broker.h"
@@ -31,7 +32,7 @@
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/ClusterConnectionDumpCompleteBody.h"
+#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -63,6 +64,10 @@ struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
ClusterConnectionProxy(client::Connection& c) :
AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
};
+struct ClusterProxy : public AMQP_AllProxy::Cluster {
+ ClusterProxy(client::Connection& c) :
+ AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {}
+};
// Create a connection with special version that marks it as a catch-up connection.
@@ -80,10 +85,11 @@ void send(client::Session& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
-DumpClient::DumpClient(const Url& url, Cluster& c,
+DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url,
+ broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
- : receiver(url), donor(c),
+ : dumperId(to), dumpeeId(from), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
@@ -98,18 +104,19 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
- QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver);
- Broker& b = donor.getBroker();
+ QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
+ Broker& b = dumperBroker;
b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
session.sync();
session.close();
- donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
- ClusterConnectionProxy(connection).dumpComplete();
+ std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
+ AMQFrame frame(map.asMethodBody());
+ client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
- QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver);
+ QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl);
}
void DumpClient::run() {
@@ -160,25 +167,22 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi
}
void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
+ QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection);
shadowConnection = catchUpConnection();
broker::Connection& bc = dumpConnection->getBrokerConnection();
// FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size,
// authentication etc. See ConnectionSettings.
- shadowConnection.open(receiver, bc.getUserId());
+ shadowConnection.open(dumpeeUrl, bc.getUserId());
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer()));
shadowConnection.close();
- QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
+ QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection);
}
-// FIXME aconway 2008-09-26: REMOVE
-void foo(broker::SemanticState::ConsumerImpl*) {}
-
-
void DumpClient::dumpSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
+ QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
broker::SessionState* s = sh.getSession();
if (!s) return; // no session.
@@ -214,17 +218,18 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
// FIXME aconway 2008-09-23: session replay list.
- QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId());
+ QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
}
void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+ QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
arg::destination = ci->getName(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
- arg::exclusive = false , // FIXME aconway 2008-09-23: how to read.
+ arg::exclusive = false , // FIXME aconway 2008-09-23: duplicate from consumer
// TODO aconway 2008-09-23: remaining args not used by current broker.
// Update this code when they are.
@@ -236,7 +241,7 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
// FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled?
- QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
}
}} // namespace qpid::cluster