summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
committerAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
commitad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch)
tree9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src
parentd3f652de187cac449e1fae4e00fce59c204f020a (diff)
downloadqpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array. framing/Array: - added some std:::vector like functions & typedefs. - use TypeCode enums, human readable ostream << operator. rubygen - fixed error in generation of exceptions for bad codes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/broker/DtxAck.h1
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.h4
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.h5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.h9
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp2
-rw-r--r--cpp/src/qpid/broker/TxAccept.h10
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp5
-rw-r--r--cpp/src/qpid/broker/TxBuffer.h3
-rw-r--r--cpp/src/qpid/broker/TxOp.h10
-rw-r--r--cpp/src/qpid/broker/TxOpVisitor.h100
-rw-r--r--cpp/src/qpid/broker/TxPublish.h4
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp3
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp65
-rw-r--r--cpp/src/qpid/cluster/Connection.h10
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp77
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h6
-rw-r--r--cpp/src/qpid/framing/SequenceSet.h4
-rw-r--r--cpp/src/tests/TxMocks.h3
-rw-r--r--cpp/src/tests/cluster_test.cpp49
21 files changed, 298 insertions, 75 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 2b0becf836..8f77d4c3c6 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -506,6 +506,7 @@ nobase_include_HEADERS = \
qpid/broker/TxAccept.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
+ qpid/broker/TxOpVisitor.h \
qpid/broker/TxPublish.h \
qpid/broker/Vhost.h \
qpid/client/AckMode.h \
diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h
index 05c4499839..d43532906a 100644
--- a/cpp/src/qpid/broker/DtxAck.h
+++ b/cpp/src/qpid/broker/DtxAck.h
@@ -39,6 +39,7 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~DtxAck(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
};
}
}
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h
index 276e1f4c5c..ef6b8f1f74 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.h
+++ b/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -45,6 +45,10 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredDequeue(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ Queue::shared_ptr getQueue() const { return queue; }
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
};
}
}
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h
index 6525179769..2d97768e65 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.h
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -45,6 +45,11 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredEnqueue(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ Queue::shared_ptr getQueue() const { return queue; }
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+
};
}
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 22f6316974..73dfc8cde8 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -436,7 +436,7 @@ void SemanticState::recover(bool requeue)
if(requeue){
//take copy and clear unacked as requeue may result in redelivery to this session
//which will in turn result in additions to unacked
- std::list<DeliveryRecord> copy = unacked;
+ DeliveryRecords copy = unacked;
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index dbb3e1d3b6..340017ddf0 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -134,7 +134,7 @@ class SemanticState : public sys::OutputTask,
DeliveryAdapter& deliveryAdapter;
ConsumerImplMap consumers;
NameGenerator tagGenerator;
- std::list<DeliveryRecord> unacked;
+ DeliveryRecords unacked;
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
@@ -216,8 +216,11 @@ class SemanticState : public sys::OutputTask,
static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
- template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); }
-
+ DeliveryRecords& getUnacked() { return unacked; }
+ framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
+ TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+ void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+ void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
};
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 6d307bf735..594a466453 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -69,7 +69,7 @@ void TxAccept::RangeOps::commit()
}
}
-TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
+TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
acked(_acked), unacked(_unacked), ops(unacked)
{
//populate the ops
diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h
index 5474327f7c..0a5fdedb0a 100644
--- a/cpp/src/qpid/broker/TxAccept.h
+++ b/cpp/src/qpid/broker/TxAccept.h
@@ -56,8 +56,8 @@ namespace qpid {
void commit();
};
- framing::SequenceSet& acked;
- std::list<DeliveryRecord>& unacked;
+ framing::SequenceSet acked;
+ DeliveryRecords& unacked;
RangeOps ops;
public:
@@ -66,11 +66,15 @@ namespace qpid {
* acks received
* @param unacked the record of delivered messages
*/
- TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked);
+ TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~TxAccept(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ // Used by cluster replication.
+ const framing::SequenceSet& getAcked() const { return acked; }
};
}
}
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
index 8fe2c17bf0..ae18e0f318 100644
--- a/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -22,6 +22,7 @@
#include "qpid/log/Statement.h"
#include <boost/mem_fn.hpp>
+#include <boost/bind.hpp>
using boost::mem_fn;
using namespace qpid::broker;
@@ -73,3 +74,7 @@ bool TxBuffer::commitLocal(TransactionalStore* const store)
}
return false;
}
+
+void TxBuffer::accept(TxOpConstVisitor& v) const {
+ std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
+}
diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h
index 361c47e92c..aabb5ea0b1 100644
--- a/cpp/src/qpid/broker/TxBuffer.h
+++ b/cpp/src/qpid/broker/TxBuffer.h
@@ -107,6 +107,9 @@ namespace qpid {
* commit
*/
bool commitLocal(TransactionalStore* const store);
+
+ // Used by cluster to replicate transaction status.
+ void accept(TxOpConstVisitor& v) const;
};
}
}
diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h
index e687c437cc..5265478e36 100644
--- a/cpp/src/qpid/broker/TxOp.h
+++ b/cpp/src/qpid/broker/TxOp.h
@@ -21,11 +21,15 @@
#ifndef _TxOp_
#define _TxOp_
+#include "TxOpVisitor.h"
#include "TransactionalStore.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+
+class TxOpConstVisitor;
+
class TxOp{
public:
typedef boost::shared_ptr<TxOp> shared_ptr;
@@ -34,9 +38,11 @@ namespace qpid {
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
virtual ~TxOp(){}
+
+ virtual void accept(TxOpConstVisitor&) const = 0;
};
- }
-}
+
+}} // namespace qpid::broker
#endif
diff --git a/cpp/src/qpid/broker/TxOpVisitor.h b/cpp/src/qpid/broker/TxOpVisitor.h
new file mode 100644
index 0000000000..a5f2a018c9
--- /dev/null
+++ b/cpp/src/qpid/broker/TxOpVisitor.h
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp familly of classes.
+ */
+struct TxOpConstVisitor
+{
+ virtual ~TxOpConstVisitor() {}
+ virtual void operator()(const DtxAck&) = 0;
+ virtual void operator()(const RecoveredDequeue&) = 0;
+ virtual void operator()(const RecoveredEnqueue&) = 0;
+ virtual void operator()(const TxAccept&) = 0;
+ virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_TXOPVISITOR_H*/
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp familly of classes.
+ */
+struct TxOpConstVisitor
+{
+ virtual ~TxOpConstVisitor() {}
+ virtual void operator()(const DtxAck&) = 0;
+ virtual void operator()(const RecoveredDequeue&) = 0;
+ virtual void operator()(const RecoveredEnqueue&) = 0;
+ virtual void operator()(const TxAccept&) = 0;
+ virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_TXOPVISITOR_H*/
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 018437f1ed..1f73cb8767 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -75,8 +75,12 @@ namespace qpid {
virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
virtual ~TxPublish(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
uint64_t contentSize();
+
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+ const std::list<Queue::shared_ptr> getQueues() const { return queues; }
};
}
}
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 9526a33ac6..fad0563872 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -90,7 +90,4 @@ struct ClusterPlugin : public Plugin {
static ClusterPlugin instance; // Static initialization.
-// For test purposes.
-Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; }
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ada26ab2fb..513816735d 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,7 +24,11 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -36,7 +40,7 @@
#include <boost/current_function.hpp>
-// FIXME aconway 2008-11-03:
+// TODO aconway 2008-11-03:
//
// Disproportionate amount of code here is dedicated to receiving a
// brain-dump when joining a cluster and building initial
@@ -113,7 +117,6 @@ bool Connection::checkUnsupported(const AMQBody& body) {
std::string message;
if (body.getMethod()) {
switch (body.getMethod()->amqpClassId()) {
- case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break;
case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
}
}
@@ -122,13 +125,13 @@ bool Connection::checkUnsupported(const AMQBody& body) {
if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
}
if (!message.empty())
- connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+ connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0);
return !message.empty();
}
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+ QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
assert(!catchUp);
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
@@ -247,11 +250,15 @@ bool Connection::isDumped() const {
return self.first == cluster.getId() && self.second == 0;
}
+
+shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
+ shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname);
+ if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname));
+ return queue;
+}
+
broker::QueuedMessage Connection::getDumpMessage() {
- // Get a message from the DUMP queue.
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
- broker::QueuedMessage m = dumpQueue->get();
+ broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
return m;
}
@@ -267,14 +274,11 @@ void Connection::deliveryRecord(const string& qname,
bool ended,
bool windowing)
{
- broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
- if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
broker::QueuedMessage m;
+ broker::Queue::shared_ptr queue = findQueue(qname);
if (!ended) { // Has a message
- if (acquired) { // Message at front of dump queue
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- m = dumpQueue->get();
- }
+ if (acquired) // Message is on the dump queue
+ m = getDumpMessage();
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
@@ -286,8 +290,7 @@ void Connection::deliveryRecord(const string& qname,
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
-
- semanticState().record(dr);
+ semanticState().record(dr); // Part of the session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -304,6 +307,36 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
+void Connection::txStart() {
+ txBuffer = make_shared_ptr(new broker::TxBuffer());
+}
+void Connection::txAccept(const framing::SequenceSet& acked) {
+ txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked, semanticState().getUnacked())));
+}
+
+void Connection::txDequeue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txEnqueue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+ boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload));
+ for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
+ txPub->deliverTo(findQueue((*i)->get<std::string>()));
+ txPub->delivered = delivered;
+ txBuffer->enlist(txPub);
+}
+
+void Connection::txEnd() {
+ semanticState().setTxBuffer(txBuffer);
+}
+
+void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
+ semanticState().setAccumulatedAck(s);
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 331ac33ab0..2eafa90f32 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -125,12 +125,21 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void txStart();
+ void txAccept(const framing::SequenceSet&);
+ void txDequeue(const std::string&);
+ void txEnqueue(const std::string&);
+ void txPublish(const qpid::framing::Array&, bool);
+ void txEnd();
+ void accumulatedAck(const qpid::framing::SequenceSet&);
+
private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
+ boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
broker::SemanticState& semanticState();
broker::QueuedMessage getDumpMessage();
@@ -148,6 +157,7 @@ class Connection :
framing::SequenceNumber mcastSeq;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
+ boost::shared_ptr<broker::TxBuffer> txBuffer;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index a2860f6f32..bb3cfdfa56 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -32,6 +32,12 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/TxOpVisitor.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -43,7 +49,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
-
+#include <algorithm>
namespace qpid {
namespace cluster {
@@ -198,7 +204,7 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn
shadowConnection = catchUpConnection();
broker::Connection& bc = dumpConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to reconnect?
+ // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
@@ -227,7 +233,10 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
- ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+ broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
+ std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1));
+
+ dumpTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust for command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -283,22 +292,12 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
}
void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
- dumpDeliveryRecordMessage(dr);
- dumpDeliveryRecord(dr);
-}
-
-void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
- // Dump the message associated with a dr if need be.
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// dumpees queue, put it on the dump queue for dumpee to pick up.
//
MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
}
-}
-
-void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
- // Assumes the associated message has already been dumped (if needed)
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
@@ -312,4 +311,56 @@ void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
dr.isWindowing());
}
+class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
+ public:
+ TxOpDumper(DumpClient& dc, client::AsyncSession s)
+ : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {}
+
+ void operator()(const broker::DtxAck& ) {
+ throw InternalErrorException("DTX transactions not currently supported by cluster.");
+ }
+
+ void operator()(const broker::RecoveredDequeue& rdeq) {
+ dumpMessage(rdeq.getMessage());
+ proxy.txEnqueue(rdeq.getQueue()->getName());
+ }
+
+ void operator()(const broker::RecoveredEnqueue& renq) {
+ dumpMessage(renq.getMessage());
+ proxy.txEnqueue(renq.getQueue()->getName());
+ }
+
+ void operator()(const broker::TxAccept& txAccept) {
+ proxy.txAccept(txAccept.getAcked());
+ }
+
+ void operator()(const broker::TxPublish& txPub) {
+ dumpMessage(txPub.getMessage());
+ typedef std::list<Queue::shared_ptr> QueueList;
+ const QueueList& qlist = txPub.getQueues();
+ Array qarray(TYPE_CODE_STR8);
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ proxy.txPublish(qarray, txPub.delivered);
+ }
+
+ private:
+ DumpClient& parent;
+ client::AsyncSession session;
+ ClusterConnectionProxy proxy;
+};
+
+void DumpClient::dumpTxState(broker::SemanticState& s) {
+ QPID_LOG(debug, dumperId << " dumping TX transaction state.");
+ ClusterConnectionProxy proxy(shadowSession);
+ proxy.accumulatedAck(s.getAccumulatedAck());
+ broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+ if (txBuffer) {
+ proxy.txStart();
+ TxOpDumper dumper(*this, shadowSession);
+ txBuffer->accept(dumper);
+ proxy.txEnd();
+ }
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index 716e7dcc3a..23676e7646 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -71,6 +71,8 @@ class DumpClient : public sys::Runnable {
void dump();
void run(); // Will delete this when finished.
+ void dumpUnacked(const broker::DeliveryRecord&);
+
private:
void dumpQueue(const boost::shared_ptr<broker::Queue>&);
void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
@@ -79,10 +81,8 @@ class DumpClient : public sys::Runnable {
void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
void dumpSession(broker::SessionHandler& s);
+ void dumpTxState(broker::SemanticState& s);
void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
- void dumpUnacked(const broker::DeliveryRecord&);
- void dumpDeliveryRecord(const broker::DeliveryRecord&);
- void dumpDeliveryRecordMessage(const broker::DeliveryRecord&);
MemberId dumperId;
MemberId dumpeeId;
diff --git a/cpp/src/qpid/framing/SequenceSet.h b/cpp/src/qpid/framing/SequenceSet.h
index 8d97d03c4b..57b9c2c8e1 100644
--- a/cpp/src/qpid/framing/SequenceSet.h
+++ b/cpp/src/qpid/framing/SequenceSet.h
@@ -31,9 +31,9 @@ class Buffer;
class SequenceSet : public RangeSet<SequenceNumber> {
public:
SequenceSet() {}
- explicit SequenceSet(const RangeSet<SequenceNumber>& r)
+ SequenceSet(const RangeSet<SequenceNumber>& r)
: RangeSet<SequenceNumber>(r) {}
- explicit SequenceSet(const SequenceNumber& s) { add(s); }
+ SequenceSet(const SequenceNumber& s) { add(s); }
SequenceSet(const SequenceNumber& start, const SequenceNumber finish) { add(start,finish); }
diff --git a/cpp/src/tests/TxMocks.h b/cpp/src/tests/TxMocks.h
index 86864b987e..fe103c5fe5 100644
--- a/cpp/src/tests/TxMocks.h
+++ b/cpp/src/tests/TxMocks.h
@@ -114,6 +114,9 @@ public:
void check(){
assertEqualVector(expected, actual);
}
+
+ void accept(TxOpConstVisitor&) const {}
+
~MockTxOp(){}
};
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 623c033e4c..cf6c2e73de 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -45,13 +45,6 @@
#include <algorithm>
#include <iterator>
-namespace qpid {
-namespace cluster {
-// FIXME aconway 2008-11-04: remove.
-Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
-}} // namespace qpid::cluster
-
-
namespace std { // ostream operators in std:: namespace
template <class T>
ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); }
@@ -69,7 +62,6 @@ using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
using boost::shared_ptr;
using qpid::cluster::Cluster;
-using qpid::cluster::getGlobalCluster;
/** Parse broker & cluster options */
Broker::Options parseOpts(size_t argc, const char* argv[]) {
@@ -216,8 +208,19 @@ class Sender {
uint16_t channel;
};
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
- ClusterFixture cluster(1, 1); // FIXME aconway 2008-11-04: local broker at index 1
+QPID_AUTO_TEST_CASE(testUnsupported) {
+ ScopedSuppressLogging sl;
+ ClusterFixture cluster(1);
+ Client c1(cluster[0], "c1");
+ BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException);
+ Client c2(cluster[0], "c2");
+ Message m;
+ m.getDeliveryProperties().setTtl(1);
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+ ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare(arg::queue="q");
c0.session.messageTransfer(arg::content=Message("A", "q"));
@@ -236,7 +239,8 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
rollbackSession.messageTransfer(arg::content=Message("1", "q"));
- BOOST_CHECK_EQUAL(rollbackSubs.get("q", TIME_SEC).getData(), "B");
+ Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
+ BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
// Add new member mid transaction.
@@ -250,10 +254,14 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
rollbackSession.messageTransfer(arg::content=Message("3", "q"));
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
// Commit/roll back.
commitSession.txCommit();
rollbackSession.txRollback();
- // Verify queue status: just the comitted messages
+ rollbackSession.messageRelease(rollbackMessage.getId());
+
+
+ // Verify queue status: just the comitted messages and dequeues should remain.
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
@@ -261,20 +269,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
}
-QPID_AUTO_TEST_CASE(testUnsupported) {
- ScopedSuppressLogging sl;
- ClusterFixture cluster(1);
- Client c0(cluster[0], "c0");
- BOOST_CHECK_THROW(c0.session.txSelect(), Exception);
- BOOST_CHECK(!c0.connection.isOpen());
- Client c1(cluster[0], "c1");
- BOOST_CHECK_THROW(c1.session.dtxCommit(), Exception);
- Client c2(cluster[0], "c2");
- Message m;
- m.getDeliveryProperties().setTtl(1);
- BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
-}
-
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
ClusterFixture cluster(1);
@@ -388,8 +382,7 @@ QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "abcd");
-
- BOOST_CHECK_EQUAL(2u, getGlobalCluster().getUrls().size());
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
}
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {