summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/DumpClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
committerAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
commit558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (patch)
tree9b306597ee07b264fa18580546ed5645f0c3766d /cpp/src/qpid/cluster/DumpClient.cpp
parent7c70d21ca2d788d4432cfa89851c9b928c9f30aa (diff)
downloadqpid-python-558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05.tar.gz
DumpClient send connections & session IDs to new members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp82
1 files changed, 71 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 43c30d3b07..c78859cc39 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -19,6 +19,8 @@
*
*/
#include "DumpClient.h"
+#include "Cluster.h"
+#include "Connection.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -26,8 +28,11 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionDumpCompleteBody.h"
+#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
@@ -39,6 +44,7 @@ namespace qpid {
namespace client {
struct ConnectionAccess {
static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; }
+ static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; }
};
} // namespace client
@@ -50,17 +56,30 @@ using broker::Queue;
using broker::QueueBinding;
using broker::Message;
using namespace framing;
-using namespace framing::message;
-using namespace client;
+namespace arg=client::arg;
+using client::SessionBase_0_10Access;
+
+// Create a connection with special version that marks it as a catch-up connection.
+client::Connection catchUpConnection() {
+ client::Connection c;
+ client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
+ return c;
+}
+// Send a control body directly to the session.
+void send(client::Session& s, const AMQBody& body) {
+ client::SessionBase_0_10Access sb(s);
+ sb.get()->send(body);
+}
-DumpClient::DumpClient(const Url& url, Broker& b,
+DumpClient::DumpClient(const Url& url, Cluster& c,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
- : donor(b), done(ok), failed(fail)
+ : receiver(url), donor(c),
+ connection(catchUpConnection()), shadowConnection(catchUpConnection()),
+ done(ok), failed(fail)
{
- // Special version identifies this as a catch-up connectionn.
- client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 0x80 + 10));
+ QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url);
connection.open(url);
session = connection.newSession();
}
@@ -72,15 +91,18 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
- donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
+ Broker& b = donor.getBroker();
+ b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
- donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
- SessionBase_0_10Access sb(session);
- // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
+ b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
session.sync();
session.close();
+ donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
+ QPID_LOG(debug, "Dump sent, closing catch_up connection.");
+ // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
connection.close();
+ QPID_LOG(debug, "Dump sent.");
}
void DumpClient::run() {
@@ -121,7 +143,8 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
SessionBase_0_10Access sb(session);
- framing::MessageTransferBody transfer(framing::ProtocolVersion(), CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED);
+ framing::MessageTransferBody transfer(
+ framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames());
}
@@ -129,5 +152,42 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi
session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
+void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
+ QPID_LOG(debug, "Dump connection " << *dumpConnection);
+
+ shadowConnection = catchUpConnection();
+ // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc.
+ shadowConnection.open(receiver);
+ dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
+ boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection);
+ // FIXME aconway 2008-09-19: use proxy for cluster commands?
+ AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(),
+ dumpConnection->getId().getMember(),
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())));
+ impl->handle(ready);
+ // Will be closed from the other end.
+ QPID_LOG(debug, "Dump done, connection " << *dumpConnection);
+}
+
+void DumpClient::dumpSession(broker::SessionHandler& sh) {
+ QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] "
+ << sh.getSession()->getId());
+
+ broker::SessionState* s = sh.getSession();
+ if (!s) return; // no session.
+ // Re-create the session.
+ boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
+ size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
+ // FIXME aconway 2008-09-19: verify matching ID.
+ boost::shared_ptr<client::SessionImpl> simpl(
+ new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size));
+ cimpl->addSession(simpl);
+ simpl->open(0);
+ client::Session cs;
+ client::SessionBase_0_10Access(cs).set(simpl);
+ cs.sync();
+ // FIXME aconway 2008-09-19: remaining session state.
+ QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
+}
}} // namespace qpid::cluster