diff options
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 82 |
1 files changed, 71 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 43c30d3b07..c78859cc39 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -19,6 +19,8 @@ * */ #include "DumpClient.h" +#include "Cluster.h" +#include "Connection.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -26,8 +28,11 @@ #include "qpid/broker/Message.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionDumpCompleteBody.h" +#include "qpid/framing/ClusterConnectionShadowReadyBody.h" #include "qpid/framing/enum.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" @@ -39,6 +44,7 @@ namespace qpid { namespace client { struct ConnectionAccess { static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; } + static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; } }; } // namespace client @@ -50,17 +56,30 @@ using broker::Queue; using broker::QueueBinding; using broker::Message; using namespace framing; -using namespace framing::message; -using namespace client; +namespace arg=client::arg; +using client::SessionBase_0_10Access; + +// Create a connection with special version that marks it as a catch-up connection. +client::Connection catchUpConnection() { + client::Connection c; + client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10)); + return c; +} +// Send a control body directly to the session. +void send(client::Session& s, const AMQBody& body) { + client::SessionBase_0_10Access sb(s); + sb.get()->send(body); +} -DumpClient::DumpClient(const Url& url, Broker& b, +DumpClient::DumpClient(const Url& url, Cluster& c, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) - : donor(b), done(ok), failed(fail) + : receiver(url), donor(c), + connection(catchUpConnection()), shadowConnection(catchUpConnection()), + done(ok), failed(fail) { - // Special version identifies this as a catch-up connectionn. - client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 0x80 + 10)); + QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url); connection.open(url); session = connection.newSession(); } @@ -72,15 +91,18 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); void DumpClient::dump() { - donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); + Broker& b = donor.getBroker(); + b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); - donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); - SessionBase_0_10Access sb(session); - // FIXME aconway 2008-09-18: inidicate successful end-of-dump. + b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); session.sync(); session.close(); + donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1)); + QPID_LOG(debug, "Dump sent, closing catch_up connection."); + // FIXME aconway 2008-09-18: inidicate successful end-of-dump. connection.close(); + QPID_LOG(debug, "Dump sent."); } void DumpClient::run() { @@ -121,7 +143,8 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { void DumpClient::dumpMessage(const broker::QueuedMessage& message) { SessionBase_0_10Access sb(session); - framing::MessageTransferBody transfer(framing::ProtocolVersion(), CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED); + framing::MessageTransferBody transfer( + framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames()); } @@ -129,5 +152,42 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi session.exchangeBind(queue, binding.exchange, binding.key, binding.args); } +void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { + QPID_LOG(debug, "Dump connection " << *dumpConnection); + + shadowConnection = catchUpConnection(); + // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc. + shadowConnection.open(receiver); + dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); + boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection); + // FIXME aconway 2008-09-19: use proxy for cluster commands? + AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(), + dumpConnection->getId().getMember(), + reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()))); + impl->handle(ready); + // Will be closed from the other end. + QPID_LOG(debug, "Dump done, connection " << *dumpConnection); +} + +void DumpClient::dumpSession(broker::SessionHandler& sh) { + QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] " + << sh.getSession()->getId()); + + broker::SessionState* s = sh.getSession(); + if (!s) return; // no session. + // Re-create the session. + boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); + size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize; + // FIXME aconway 2008-09-19: verify matching ID. + boost::shared_ptr<client::SessionImpl> simpl( + new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size)); + cimpl->addSession(simpl); + simpl->open(0); + client::Session cs; + client::SessionBase_0_10Access(cs).set(simpl); + cs.sync(); + // FIXME aconway 2008-09-19: remaining session state. + QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId()); +} }} // namespace qpid::cluster |