summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/DumpClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-01-27 02:08:25 +0000
committerAlan Conway <aconway@apache.org>2009-01-27 02:08:25 +0000
commit306114207d6ff6c3ec6d63f5ab6b4ff9e1dd7d4e (patch)
tree04c1f8f85b0cf469c7c7e526f436e09cd12e350a /cpp/src/qpid/cluster/DumpClient.cpp
parent57acf95c94d52b15b2ad6e6038bf3390d9063282 (diff)
downloadqpid-python-306114207d6ff6c3ec6d63f5ab6b4ff9e1dd7d4e.tar.gz
Cluster rename: dump -> update, newbie -> joiner
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp369
1 files changed, 0 insertions, 369 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
deleted file mode 100644
index 00328eb310..0000000000
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "DumpClient.h"
-#include "Cluster.h"
-#include "ClusterMap.h"
-#include "Connection.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/ConnectionAccess.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/SessionHandler.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/TxOpVisitor.h"
-#include "qpid/broker/DtxAck.h"
-#include "qpid/broker/TxAccept.h"
-#include "qpid/broker/TxPublish.h"
-#include "qpid/broker/RecoveredDequeue.h"
-#include "qpid/broker/RecoveredEnqueue.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/ClusterConnectionMembershipBody.h"
-#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
-#include "qpid/framing/ClusterConnectionSessionStateBody.h"
-#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
-#include "qpid/framing/enum.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/TypeCode.h"
-#include "qpid/log/Statement.h"
-#include "qpid/Url.h"
-#include <boost/bind.hpp>
-#include <algorithm>
-
-namespace qpid {
-namespace cluster {
-
-using broker::Broker;
-using broker::Exchange;
-using broker::Queue;
-using broker::QueueBinding;
-using broker::Message;
-using namespace framing;
-namespace arg=client::arg;
-using client::SessionBase_0_10Access;
-
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
- ClusterConnectionProxy(client::Connection c) :
- AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
- ClusterConnectionProxy(client::AsyncSession s) :
- AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {}
-};
-
-// Create a connection with special version that marks it as a catch-up connection.
-client::Connection catchUpConnection() {
- client::Connection c;
- client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
- return c;
-}
-
-// Send a control body directly to the session.
-void send(client::AsyncSession& s, const AMQBody& body) {
- client::SessionBase_0_10Access sb(s);
- sb.get()->send(body);
-}
-
-// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
-
-DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
- const boost::function<void()>& ok,
- const boost::function<void(const std::exception&)>& fail)
- : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
- connection(catchUpConnection()), shadowConnection(catchUpConnection()),
- done(ok), failed(fail)
-{
- connection.open(url);
- session = connection.newSession(DUMP);
-}
-
-DumpClient::~DumpClient() {}
-
-// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-static const char DUMP_CHARS[] = "\000qpid-dump";
-const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS));
-
-void DumpClient::dump() {
- QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
- Broker& b = dumperBroker;
- b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
-
- // Dump exchange is used to route messages to the proper queue without modifying routing key.
- session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
- b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
- // Dump queue is used to transfer acquired messages that are no longer on their original queue.
- session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
- session.sync();
- session.close();
-
- std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
- AMQFrame frame(map.asMethodBody());
- client::ConnectionAccess::getImpl(connection)->handle(frame);
- connection.close();
- QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl);
-}
-
-void DumpClient::run() {
- try {
- dump();
- done();
- } catch (const std::exception& e) {
- failed(e);
- }
- delete this;
-}
-
-namespace {
-template <class T> std::string encode(const T& t) {
- std::string encoded;
- encoded.resize(t.encodedSize());
- framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
- t.encode(buf);
- return encoded;
-}
-} // namespace
-
-void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName());
- ClusterConnectionProxy proxy(session);
- proxy.exchange(encode(*ex));
-}
-
-/** Bind a queue to the dump exchange and dump messges to it
- * setting the message possition as needed.
- */
-class MessageDumper {
- std::string queue;
- bool haveLastPos;
- framing::SequenceNumber lastPos;
- client::AsyncSession session;
-
- public:
-
- MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
- session.exchangeBind(queue, DumpClient::DUMP);
- }
-
- ~MessageDumper() {
- session.exchangeUnbind(queue, DumpClient::DUMP);
- }
-
- void dumpQueuedMessage(const broker::QueuedMessage& message) {
- if (!haveLastPos || message.position - lastPos != 1) {
- ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
- haveLastPos = true;
- }
- lastPos = message.position;
- SessionBase_0_10Access sb(session);
- framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
- sb.get()->send(transfer, message.payload->getFrames());
- }
-
- void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) {
- dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
- }
-};
-
-
-void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
- QPID_LOG(debug, dumperId << " dumping queue " << q->getName());
- ClusterConnectionProxy proxy(session);
- proxy.queue(encode(*q));
- MessageDumper dumper(q->getName(), session);
- q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
- q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
-}
-
-
-void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) {
- session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
-}
-
-void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
- QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection);
- shadowConnection = catchUpConnection();
-
- broker::Connection& bc = dumpConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
- shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
- bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
- ClusterConnectionProxy(shadowConnection).shadowReady(
- dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer()));
- shadowConnection.close();
- QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection);
-}
-
-void DumpClient::dumpSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
- << sh.getSession()->getId());
- broker::SessionState* ss = sh.getSession();
- if (!ss) return; // no session.
-
- // Create a client session to dump session state.
- boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
- boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
- client::SessionBase_0_10Access(shadowSession).set(simpl);
- AMQP_AllProxy::ClusterConnection proxy(simpl->out);
-
- // Re-create session state on remote connection.
-
- // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
- QPID_LOG(debug, dumperId << " dumping consumers.");
- ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
-
- QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
- broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
- std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1));
-
- dumpTxState(ss->getSemanticState()); // Tx transaction state.
-
- // Adjust for command counter for message in progress, will be sent after state update.
- boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
- SequenceNumber received = ss->receiverGetReceived().command;
- if (inProgress)
- --received;
-
- // Reset command-sequence state.
- proxy.sessionState(
- ss->senderGetReplayPoint().command,
- ss->senderGetCommandPoint().command,
- ss->senderGetIncomplete(),
- std::max(received, ss->receiverGetExpected().command),
- received,
- ss->receiverGetUnknownComplete(),
- ss->receiverGetIncomplete()
- );
-
- // Send frames for partial message in progress.
- if (inProgress) {
- inProgress->getFrames().map(simpl->out);
- }
-
- // FIXME aconway 2008-09-23: update session replay list.
-
- QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
-}
-
-void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
- using namespace message;
- shadowSession.messageSubscribe(
- arg::queue = ci->getQueue()->getName(),
- arg::destination = ci->getName(),
- arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
- arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
- arg::exclusive = ci->isExclusive(),
- arg::resumeId = ci->getResumeId(),
- arg::resumeTtl = ci->getResumeTtl(),
- arg::arguments = ci->getArguments()
- );
- shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
- ClusterConnectionConsumerStateBody state(
- ProtocolVersion(),
- ci->getName(),
- ci->isBlocked(),
- ci->isNotifyEnabled()
- );
- client::SessionBase_0_10Access(shadowSession).get()->send(state);
- QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
-}
-
-void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
- if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
- // If the message is acquired then it is no longer on the
- // dumpees queue, put it on the dump queue for dumpee to pick up.
- //
- MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
- }
- ClusterConnectionProxy(shadowSession).deliveryRecord(
- dr.getQueue()->getName(),
- dr.getMessage().position,
- dr.getTag(),
- dr.getId(),
- dr.isAcquired(),
- dr.isAccepted(),
- dr.isCancelled(),
- dr.isComplete(),
- dr.isEnded(),
- dr.isWindowing(),
- dr.getCredit()
- );
-}
-
-class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
- public:
- TxOpDumper(DumpClient& dc, client::AsyncSession s)
- : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {}
-
- void operator()(const broker::DtxAck& ) {
- throw InternalErrorException("DTX transactions not currently supported by cluster.");
- }
-
- void operator()(const broker::RecoveredDequeue& rdeq) {
- dumpMessage(rdeq.getMessage());
- proxy.txEnqueue(rdeq.getQueue()->getName());
- }
-
- void operator()(const broker::RecoveredEnqueue& renq) {
- dumpMessage(renq.getMessage());
- proxy.txEnqueue(renq.getQueue()->getName());
- }
-
- void operator()(const broker::TxAccept& txAccept) {
- proxy.txAccept(txAccept.getAcked());
- }
-
- void operator()(const broker::TxPublish& txPub) {
- dumpMessage(txPub.getMessage());
- typedef std::list<Queue::shared_ptr> QueueList;
- const QueueList& qlist = txPub.getQueues();
- Array qarray(TYPE_CODE_STR8);
- for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
- qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
- proxy.txPublish(qarray, txPub.delivered);
- }
-
- private:
- DumpClient& parent;
- client::AsyncSession session;
- ClusterConnectionProxy proxy;
-};
-
-void DumpClient::dumpTxState(broker::SemanticState& s) {
- QPID_LOG(debug, dumperId << " dumping TX transaction state.");
- ClusterConnectionProxy proxy(shadowSession);
- proxy.accumulatedAck(s.getAccumulatedAck());
- broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
- if (txBuffer) {
- proxy.txStart();
- TxOpDumper dumper(*this, shadowSession);
- txBuffer->accept(dumper);
- proxy.txEnd();
- }
-}
-
-}} // namespace qpid::cluster