diff options
author | Alan Conway <aconway@apache.org> | 2008-10-30 21:07:28 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-30 21:07:28 +0000 |
commit | 364b62744a35f7e48332af00217a1848345cd39a (patch) | |
tree | d8cab9a5c824133e4429919a9c129beaec0ce456 /cpp/src/qpid/cluster/DumpClient.cpp | |
parent | 5e79599484a675baabf45e6b2c50635dbd6b1119 (diff) | |
download | qpid-python-364b62744a35f7e48332af00217a1848345cd39a.tar.gz |
Replicate session state for un-acknowledged messages to new cluster members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@709242 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 98 |
1 files changed, 73 insertions, 25 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 802019feb1..40852a0411 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -56,14 +56,11 @@ namespace arg=client::arg; using client::SessionBase_0_10Access; struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { - ClusterConnectionProxy(client::Connection& c) : + ClusterConnectionProxy(client::Connection c) : AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} + ClusterConnectionProxy(client::AsyncSession s) : + AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {} }; -struct ClusterProxy : public AMQP_AllProxy::Cluster { - ClusterProxy(client::Connection& c) : - AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {} -}; - // Create a connection with special version that marks it as a catch-up connection. client::Connection catchUpConnection() { @@ -73,7 +70,7 @@ client::Connection catchUpConnection() { } // Send a control body directly to the session. -void send(client::Session& s, const AMQBody& body) { +void send(client::AsyncSession& s, const AMQBody& body) { client::SessionBase_0_10Access sb(s); sb.get()->send(body); } @@ -94,19 +91,23 @@ DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url, DumpClient::~DumpClient() {} -// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes. -static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; -static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); +// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +static const char DUMP_CHARS[] = "\000qpid-dump"; +const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS)); void DumpClient::dump() { QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl); Broker& b = dumperBroker; b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); - // Catch-up exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); + + // Dump exchange is used to route messages to the proper queue without modifying routing key. + session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); +// Dump queue is used to transfer acquired messages that are no longer on their original queue. + session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true); session.sync(); session.close(); + std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1)); AMQFrame frame(map.asMethodBody()); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -134,6 +135,39 @@ void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { arg::arguments=ex->getArgs()); } +/** Bind a queue to the dump exchange and dump messges to it + * setting the message possition as needed. + */ +class MessageDumper { + std::string queue; + bool haveLastPos; + framing::SequenceNumber lastPos; + client::AsyncSession session; + + public: + + MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { + session.exchangeBind(queue, DumpClient::DUMP); + } + + ~MessageDumper() { + session.exchangeUnbind(queue, DumpClient::DUMP); + } + + void dump(const broker::QueuedMessage& message) { + if (!haveLastPos || message.position - lastPos != 1) { + ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); + haveLastPos = true; + } + lastPos = message.position; + SessionBase_0_10Access sb(session); + framing::MessageTransferBody transfer( + framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); + } +}; + + void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { session.queueDeclare( q->getName(), @@ -143,19 +177,11 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { arg::exclusive=q->hasExclusiveConsumer(), arg::autoDelete=q->isAutoDelete(), arg::arguments=q->getSettings()); - - session.exchangeBind(q->getName(), CATCH_UP, std::string()); - q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1)); - session.exchangeUnbind(q->getName(), CATCH_UP, std::string()); + MessageDumper dumper(q->getName(), session); + q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1)); q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); } -void DumpClient::dumpMessage(const broker::QueuedMessage& message) { - SessionBase_0_10Access sb(session); - framing::MessageTransferBody transfer( - framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); - sb.get()->send(transfer, message.payload->getFrames()); -} void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) { session.exchangeBind(queue, binding.exchange, binding.key, binding.args); @@ -190,11 +216,11 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - // For reasons unknown, boost::bind does not work here with boost 1.33. + // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33. ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); + ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); - // Adjust for message in progress, will be sent after state update. SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) @@ -221,7 +247,7 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); } -void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { +void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( @@ -246,5 +272,27 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { client::SessionBase_0_10Access(shadowSession).get()->send(state); QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); } + +void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { + assert(dr.isEnded() || dr.getMessage().payload); + + if (!dr.isEnded() && dr.isAcquired()) { + // If the message is acquired then it is no longer on the + // dumpees queue, put it on the dump queue for dumpee to pick up. + // + MessageDumper(DUMP, shadowSession).dump(dr.getMessage()); + } + ClusterConnectionProxy(shadowSession).deliveryRecord( + dr.getQueue()->getName(), + dr.getMessage().position, + dr.getTag(), + dr.getId(), + dr.isAcquired(), + dr.isAccepted(), + dr.isCancelled(), + dr.isComplete(), + dr.isEnded(), + dr.isWindowing()); +} }} // namespace qpid::cluster |