summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/DumpClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
committerAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
commitad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch)
tree9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src/qpid/cluster/DumpClient.cpp
parentd3f652de187cac449e1fae4e00fce59c204f020a (diff)
downloadqpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array. framing/Array: - added some std:::vector like functions & typedefs. - use TypeCode enums, human readable ostream << operator. rubygen - fixed error in generation of exceptions for bad codes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp77
1 files changed, 64 insertions, 13 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index a2860f6f32..bb3cfdfa56 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -32,6 +32,12 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/TxOpVisitor.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -43,7 +49,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
-
+#include <algorithm>
namespace qpid {
namespace cluster {
@@ -198,7 +204,7 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn
shadowConnection = catchUpConnection();
broker::Connection& bc = dumpConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to reconnect?
+ // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
@@ -227,7 +233,10 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
- ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+ broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
+ std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1));
+
+ dumpTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust for command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -283,22 +292,12 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
}
void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
- dumpDeliveryRecordMessage(dr);
- dumpDeliveryRecord(dr);
-}
-
-void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
- // Dump the message associated with a dr if need be.
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// dumpees queue, put it on the dump queue for dumpee to pick up.
//
MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
}
-}
-
-void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
- // Assumes the associated message has already been dumped (if needed)
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
@@ -312,4 +311,56 @@ void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
dr.isWindowing());
}
+class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
+ public:
+ TxOpDumper(DumpClient& dc, client::AsyncSession s)
+ : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {}
+
+ void operator()(const broker::DtxAck& ) {
+ throw InternalErrorException("DTX transactions not currently supported by cluster.");
+ }
+
+ void operator()(const broker::RecoveredDequeue& rdeq) {
+ dumpMessage(rdeq.getMessage());
+ proxy.txEnqueue(rdeq.getQueue()->getName());
+ }
+
+ void operator()(const broker::RecoveredEnqueue& renq) {
+ dumpMessage(renq.getMessage());
+ proxy.txEnqueue(renq.getQueue()->getName());
+ }
+
+ void operator()(const broker::TxAccept& txAccept) {
+ proxy.txAccept(txAccept.getAcked());
+ }
+
+ void operator()(const broker::TxPublish& txPub) {
+ dumpMessage(txPub.getMessage());
+ typedef std::list<Queue::shared_ptr> QueueList;
+ const QueueList& qlist = txPub.getQueues();
+ Array qarray(TYPE_CODE_STR8);
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ proxy.txPublish(qarray, txPub.delivered);
+ }
+
+ private:
+ DumpClient& parent;
+ client::AsyncSession session;
+ ClusterConnectionProxy proxy;
+};
+
+void DumpClient::dumpTxState(broker::SemanticState& s) {
+ QPID_LOG(debug, dumperId << " dumping TX transaction state.");
+ ClusterConnectionProxy proxy(shadowSession);
+ proxy.accumulatedAck(s.getAccumulatedAck());
+ broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+ if (txBuffer) {
+ proxy.txStart();
+ TxOpDumper dumper(*this, shadowSession);
+ txBuffer->accept(dumper);
+ proxy.txEnd();
+ }
+}
+
}} // namespace qpid::cluster