diff options
author | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
commit | ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch) | |
tree | 9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src/qpid/cluster/DumpClient.cpp | |
parent | d3f652de187cac449e1fae4e00fce59c204f020a (diff) | |
download | qpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz |
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array:
- added some std:::vector like functions & typedefs.
- use TypeCode enums, human readable ostream << operator.
rubygen - fixed error in generation of exceptions for bad codes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 77 |
1 files changed, 64 insertions, 13 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index a2860f6f32..bb3cfdfa56 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -32,6 +32,12 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionState.h" +#include "qpid/broker/TxOpVisitor.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/TxPublish.h" +#include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/RecoveredEnqueue.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -43,7 +49,7 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> - +#include <algorithm> namespace qpid { namespace cluster { @@ -198,7 +204,7 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn shadowConnection = catchUpConnection(); broker::Connection& bc = dumpConnection->getBrokerConnection(); - // FIXME aconway 2008-10-20: What authentication info to reconnect? + // FIXME aconway 2008-10-20: What authentication info to use on reconnect? shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( @@ -227,7 +233,10 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); QPID_LOG(debug, dumperId << " dumping unacknowledged messages."); - ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); + broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); + std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1)); + + dumpTxState(ss->getSemanticState()); // Tx transaction state. // Adjust for command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); @@ -283,22 +292,12 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { } void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { - dumpDeliveryRecordMessage(dr); - dumpDeliveryRecord(dr); -} - -void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) { - // Dump the message associated with a dr if need be. if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the // dumpees queue, put it on the dump queue for dumpee to pick up. // MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage()); } -} - -void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { - // Assumes the associated message has already been dumped (if needed) ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, @@ -312,4 +311,56 @@ void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { dr.isWindowing()); } +class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { + public: + TxOpDumper(DumpClient& dc, client::AsyncSession s) + : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {} + + void operator()(const broker::DtxAck& ) { + throw InternalErrorException("DTX transactions not currently supported by cluster."); + } + + void operator()(const broker::RecoveredDequeue& rdeq) { + dumpMessage(rdeq.getMessage()); + proxy.txEnqueue(rdeq.getQueue()->getName()); + } + + void operator()(const broker::RecoveredEnqueue& renq) { + dumpMessage(renq.getMessage()); + proxy.txEnqueue(renq.getQueue()->getName()); + } + + void operator()(const broker::TxAccept& txAccept) { + proxy.txAccept(txAccept.getAcked()); + } + + void operator()(const broker::TxPublish& txPub) { + dumpMessage(txPub.getMessage()); + typedef std::list<Queue::shared_ptr> QueueList; + const QueueList& qlist = txPub.getQueues(); + Array qarray(TYPE_CODE_STR8); + for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) + qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + proxy.txPublish(qarray, txPub.delivered); + } + + private: + DumpClient& parent; + client::AsyncSession session; + ClusterConnectionProxy proxy; +}; + +void DumpClient::dumpTxState(broker::SemanticState& s) { + QPID_LOG(debug, dumperId << " dumping TX transaction state."); + ClusterConnectionProxy proxy(shadowSession); + proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); + if (txBuffer) { + proxy.txStart(); + TxOpDumper dumper(*this, shadowSession); + txBuffer->accept(dumper); + proxy.txEnd(); + } +} + }} // namespace qpid::cluster |