summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/DumpClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
committerAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
commita2a56cf9a7483e165fb579d0b519b284d02009e3 (patch)
tree11264fc87ea6e54c54b476e245ad4ee9c83faaeb /cpp/src/qpid/cluster/DumpClient.cpp
parent30be110b6914959a1eaee4803ff8c1c9938db7bb (diff)
downloadqpid-python-a2a56cf9a7483e165fb579d0b519b284d02009e3.tar.gz
Cluster replicates session command sequence state and consumers to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp79
1 files changed, 55 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 45ccec7166..ee87afb468 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -72,6 +72,8 @@ void send(client::Session& s, const AMQBody& body) {
sb.get()->send(body);
}
+// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
+
DumpClient::DumpClient(const Url& url, Cluster& c,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
@@ -79,9 +81,8 @@ DumpClient::DumpClient(const Url& url, Cluster& c,
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
- QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url);
connection.open(url);
- session = connection.newSession();
+ session = connection.newSession("dump_shared");
}
DumpClient::~DumpClient() {}
@@ -91,6 +92,7 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
+ QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver);
Broker& b = donor.getBroker();
b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
@@ -99,10 +101,9 @@ void DumpClient::dump() {
session.sync();
session.close();
donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
- QPID_LOG(debug, "Dump sent, closing catch_up connection.");
// FIXME aconway 2008-09-18: inidicate successful end-of-dump.
connection.close();
- QPID_LOG(debug, "Dump sent.");
+ QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver);
}
void DumpClient::run() {
@@ -153,49 +154,79 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi
}
void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
- QPID_LOG(debug, "Dump connection " << *dumpConnection);
-
shadowConnection = catchUpConnection();
- // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc.
- shadowConnection.open(receiver);
+ broker::Connection& bc = dumpConnection->getBrokerConnection();
+ // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size,
+ // authentication etc. See ConnectionSettings.
+ shadowConnection.open(receiver, bc.getUserId());
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection);
- // FIXME aconway 2008-09-19: use proxy for cluster commands?
- AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(),
- dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())));
- impl->handle(ready);
- // Will be closed from the other end.
- QPID_LOG(debug, "Dump done, connection " << *dumpConnection);
+ AMQP_AllProxy::ClusterConnection proxy(*impl);
+ proxy.shadowReady(dumpConnection->getId().getMember(),
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+ shadowConnection.close();
+ QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
}
void DumpClient::dumpSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] "
+ QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
-
broker::SessionState* s = sh.getSession();
if (!s) return; // no session.
+
// Re-create the session.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
- // FIXME aconway 2008-09-19: verify matching ID.
boost::shared_ptr<client::SessionImpl> simpl(
new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size));
cimpl->addSession(simpl);
- simpl->open(0);
- client::Session cs;
- client::SessionBase_0_10Access(cs).set(simpl);
- cs.sync();
+ simpl->open(sh.getSession()->getTimeout());
+ client::SessionBase_0_10Access(shadowSession).set(simpl);
+ AMQP_AllProxy::ClusterConnection proxy(simpl->out);
+ // Re-create session state on remote connection.
broker::SessionState* ss = sh.getSession();
+
ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1));
// FIXME aconway 2008-09-19: remaining session state.
- QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
+
+ // Reset command-sequence state.
+ proxy.sessionState(
+ ss->senderGetReplayPoint().command,
+ ss->senderGetCommandPoint().command,
+ ss->senderGetIncomplete(),
+ ss->receiverGetExpected().command,
+ ss->receiverGetReceived().command,
+ ss->receiverGetUnknownComplete(),
+ ss->receiverGetIncomplete()
+ );
+
+ // FIXME aconway 2008-09-23: session replay list.
+
+ QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId());
}
void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName());
+ using namespace message;
+ shadowSession.messageSubscribe(
+ arg::queue = ci->getQueue()->getName(),
+ arg::destination = ci->getName(),
+ arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
+ arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
+ arg::exclusive = false , // FIXME aconway 2008-09-23: how to read.
+
+ // TODO aconway 2008-09-23: remaining args not used by current broker.
+ // Update this code when they are.
+ arg::resumeId=std::string(),
+ arg::resumeTtl=0,
+ arg::arguments=FieldTable()
+ );
+ shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled?
+ QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
}
}} // namespace qpid::cluster