diff options
author | Alan Conway <aconway@apache.org> | 2008-09-24 17:34:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-24 17:34:08 +0000 |
commit | a2a56cf9a7483e165fb579d0b519b284d02009e3 (patch) | |
tree | 11264fc87ea6e54c54b476e245ad4ee9c83faaeb /cpp/src/qpid/cluster/DumpClient.cpp | |
parent | 30be110b6914959a1eaee4803ff8c1c9938db7bb (diff) | |
download | qpid-python-a2a56cf9a7483e165fb579d0b519b284d02009e3.tar.gz |
Cluster replicates session command sequence state and consumers to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 79 |
1 files changed, 55 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 45ccec7166..ee87afb468 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -72,6 +72,8 @@ void send(client::Session& s, const AMQBody& body) { sb.get()->send(body); } +// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel. + DumpClient::DumpClient(const Url& url, Cluster& c, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) @@ -79,9 +81,8 @@ DumpClient::DumpClient(const Url& url, Cluster& c, connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { - QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url); connection.open(url); - session = connection.newSession(); + session = connection.newSession("dump_shared"); } DumpClient::~DumpClient() {} @@ -91,6 +92,7 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); void DumpClient::dump() { + QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver); Broker& b = donor.getBroker(); b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. @@ -99,10 +101,9 @@ void DumpClient::dump() { session.sync(); session.close(); donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1)); - QPID_LOG(debug, "Dump sent, closing catch_up connection."); // FIXME aconway 2008-09-18: inidicate successful end-of-dump. connection.close(); - QPID_LOG(debug, "Dump sent."); + QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver); } void DumpClient::run() { @@ -153,49 +154,79 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi } void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { - QPID_LOG(debug, "Dump connection " << *dumpConnection); - shadowConnection = catchUpConnection(); - // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc. - shadowConnection.open(receiver); + broker::Connection& bc = dumpConnection->getBrokerConnection(); + // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size, + // authentication etc. See ConnectionSettings. + shadowConnection.open(receiver, bc.getUserId()); dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection); - // FIXME aconway 2008-09-19: use proxy for cluster commands? - AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(), - dumpConnection->getId().getMember(), - reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()))); - impl->handle(ready); - // Will be closed from the other end. - QPID_LOG(debug, "Dump done, connection " << *dumpConnection); + AMQP_AllProxy::ClusterConnection proxy(*impl); + proxy.shadowReady(dumpConnection->getId().getMember(), + reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())); + shadowConnection.close(); + QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection); } void DumpClient::dumpSession(broker::SessionHandler& sh) { - QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] " + QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); - broker::SessionState* s = sh.getSession(); if (!s) return; // no session. + // Re-create the session. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize; - // FIXME aconway 2008-09-19: verify matching ID. boost::shared_ptr<client::SessionImpl> simpl( new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size)); cimpl->addSession(simpl); - simpl->open(0); - client::Session cs; - client::SessionBase_0_10Access(cs).set(simpl); - cs.sync(); + simpl->open(sh.getSession()->getTimeout()); + client::SessionBase_0_10Access(shadowSession).set(simpl); + AMQP_AllProxy::ClusterConnection proxy(simpl->out); + // Re-create session state on remote connection. broker::SessionState* ss = sh.getSession(); + ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1)); // FIXME aconway 2008-09-19: remaining session state. - QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId()); + + // Reset command-sequence state. + proxy.sessionState( + ss->senderGetReplayPoint().command, + ss->senderGetCommandPoint().command, + ss->senderGetIncomplete(), + ss->receiverGetExpected().command, + ss->receiverGetReceived().command, + ss->receiverGetUnknownComplete(), + ss->receiverGetIncomplete() + ); + + // FIXME aconway 2008-09-23: session replay list. + + QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId()); } void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName()); + using namespace message; + shadowSession.messageSubscribe( + arg::queue = ci->getQueue()->getName(), + arg::destination = ci->getName(), + arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, + arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, + arg::exclusive = false , // FIXME aconway 2008-09-23: how to read. + + // TODO aconway 2008-09-23: remaining args not used by current broker. + // Update this code when they are. + arg::resumeId=std::string(), + arg::resumeTtl=0, + arg::arguments=FieldTable() + ); + shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled? + QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); } }} // namespace qpid::cluster |