summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-08-30 19:35:36 +0000
committerAlan Conway <aconway@apache.org>2011-08-30 19:35:36 +0000
commit539cf5c3713be45fbba15a1ca5932f86c4c6fdbe (patch)
tree86bbe4b4941f272a58d33c1e67ed1c475f560a0e
parent9e5da3bbd090a7757a39e19bc78c1c11f2c7045d (diff)
downloadqpid-python-539cf5c3713be45fbba15a1ca5932f86c4c6fdbe.tar.gz
QPID-3384: DTX transactions - replicate suspended transactions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1163347 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp29
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp28
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateReceiver.h15
-rw-r--r--qpid/cpp/src/tests/brokertest.py4
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py95
-rw-r--r--qpid/cpp/xml/cluster.xml2
9 files changed, 126 insertions, 59 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 8de884c113..22bc272c50 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -148,9 +148,10 @@ class SemanticState : private boost::noncopyable {
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
+ typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
+
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
- typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionContext& session;
DeliveryAdapter& deliveryAdapter;
@@ -181,6 +182,7 @@ class SemanticState : private boost::noncopyable {
void disable(ConsumerImpl::shared_ptr);
public:
+
SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
@@ -218,6 +220,7 @@ class SemanticState : private boost::noncopyable {
void commit(MessageStore* const store);
void rollback();
void selectDtx();
+ bool getDtxSelected() const { return dtxSelected; }
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
@@ -249,6 +252,7 @@ class SemanticState : private boost::noncopyable {
void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
+ DtxBufferMap& getSuspendedXids() { return suspendedXids; }
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 0691aae711..e0ce8cf9e0 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -419,7 +419,8 @@ void Connection::sessionState(
const SequenceNumber& expected,
const SequenceNumber& received,
const SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete)
+ const SequenceSet& receivedIncomplete,
+ bool dtxSelected)
{
sessionState().setState(
replayStart,
@@ -429,7 +430,9 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ if (dtxSelected) semanticState().selectDtx();
+ QPID_LOG(debug, cluster << " received session state update for "
+ << sessionState().getId());
// The output tasks will be added later in the update process.
connection->getOutputTasks().removeAll();
}
@@ -459,11 +462,14 @@ void Connection::shadowReady(
output.setSendMax(sendMax);
}
-void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) {
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
- broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID
- uint32_t index = v.first.second; // Index
- v.second->setDtxBuffer((*record)[index]);
+ broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
+ broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
+ if (bufRef.suspended)
+ bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
+ else
+ bufRef.semanticState->setDtxBuffer(buffer);
}
// Marks the end of the update.
@@ -694,11 +700,12 @@ void Connection::dtxAck() {
dtxAckRecords.clear();
}
-void Connection::dtxBufferRef(const std::string& xid, uint32_t index) {
- // Save the association between DtxBuffer and session so we can
- // set the DtxBuffer on the session at the end of the update
- // when the DtxManager has been replicated.
- updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState();
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
+ // Save the association between DtxBuffers and the session so we
+ // can set the DtxBuffers at the end of the update when the
+ // DtxManager has been replicated.
+ updateIn.dtxBuffers.push_back(
+ UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
}
// Sent at end of work record.
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 5133e4641e..fe66b77238 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -124,7 +124,8 @@ class Connection :
const framing::SequenceNumber& expected,
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete);
+ const SequenceSet& receivedIncomplete,
+ bool dtxSelected);
void outputTask(uint16_t channel, const std::string& name);
@@ -173,7 +174,7 @@ class Connection :
bool expired);
void dtxEnd();
void dtxAck();
- void dtxBufferRef(const std::string& xid, uint32_t index);
+ void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
// Encoded exchange replication.
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index a5662bb2b3..3142b44d71 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
std::max(received, ss->receiverGetExpected().command),
received,
ss->receiverGetUnknownComplete(),
- ss->receiverGetIncomplete()
+ ss->receiverGetIncomplete(),
+ ss->getSemanticState().getDtxSelected()
);
// Send frames for partial message in progress.
@@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
client::AsyncSession& updateSession)
{
if (!dr.isEnded() && dr.isAcquired()) {
- // FIXME aconway 2011-08-19: should this be assert or if?
assert(dr.getMessage().payload);
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
@@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
ClusterConnectionProxy proxy;
};
+void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
+{
+ ClusterConnectionProxy proxy(shadowSession);
+ broker::DtxWorkRecord* record =
+ updaterBroker.getDtxManager().getWork(dtx->getXid());
+ proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
+
+}
+
void UpdateClient::updateTransactionState(broker::SemanticState& s) {
- broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
- broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
+ broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+ broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
if (dtx) {
- broker::DtxWorkRecord* record =
- updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found
- proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx));
+ updateBufferRef(dtx, false); // Current transaction.
} else if (tx) {
- ClusterConnectionProxy proxy(shadowSession);
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
tx->accept(updater);
proxy.txEnd();
}
+ for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
+ i != s.getSuspendedXids().end();
+ ++i)
+ {
+ updateBufferRef(i->second, true);
+ }
}
void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 83d4cfac81..481ee357c7 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -100,6 +100,7 @@ class UpdateClient : public sys::Runnable {
void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
+ void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended);
void updateTransactionState(broker::SemanticState& s);
void updateOutputTask(const sys::OutputTask* task);
void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateReceiver.h b/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
index 512e59e5a1..81ee3a5ffe 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
@@ -40,11 +40,18 @@ class UpdateReceiver {
/** Management-id for the next shadow connection */
std::string nextShadowMgmtId;
- /** Relationship between DtxBuffers, identified by xid, index in DtxManager,
- * and sessions represented by their SemanticState.
+ /** Record the position of a DtxBuffer in the DtxManager (xid + index)
+ * and the association with a session, either suspended or current.
*/
- typedef std::pair<std::string, uint32_t> DtxBufferRef;
- typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers;
+ struct DtxBufferRef {
+ std::string xid;
+ uint32_t index; // Index in WorkRecord in DtxManager
+ bool suspended; // Is this a suspended or current transaction?
+ broker::SemanticState* semanticState; // Associated session
+ DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss)
+ : xid(x), index(i), suspended(s), semanticState(ss) {}
+ };
+ typedef std::vector<DtxBufferRef> DtxBuffers;
DtxBuffers dtxBuffers;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 7888f44c30..705857ee46 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -420,6 +420,9 @@ class Cluster:
self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
return self._brokers[-1]
+ def ready(self):
+ for b in self: b.ready()
+
def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
@@ -495,6 +498,7 @@ class BrokerTest(TestCase):
def browse(self, session, queue, timeout=0):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
+ r.capacity = 100
try:
contents = []
try:
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index e67a691283..db08f118da 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -733,11 +733,11 @@ class DtxTestFixture:
self.session.dtx_select()
self.consumer = None
- def start(self):
- self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status)
+ def start(self, resume=False):
+ self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid, resume=resume).status)
- def end(self):
- self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status)
+ def end(self, suspend=False):
+ self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid, suspend=suspend).status)
def prepare(self):
self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status)
@@ -767,10 +767,9 @@ class DtxTestFixture:
return msg
- def verify(self, cluster, messages):
- for b in cluster:
- self.test.assert_browse(b.connect().session(), self.name, messages)
-
+ def verify(self, sessions, messages):
+ for s in sessions:
+ self.test.assert_browse(s, self.name, messages)
class DtxTests(BrokerTest):
@@ -783,12 +782,13 @@ class DtxTests(BrokerTest):
# multiple brokers per test.
cluster=self.cluster(1)
+ sessions = [cluster[0].connect().session()] # For verify
# Transaction that will be open when new member joins, then committed.
t1 = DtxTestFixture(self, cluster[0], "t1")
t1.start()
t1.send(["1", "2"])
- t1.verify(cluster, []) # Not visible outside of transaction
+ t1.verify(sessions, []) # Not visible outside of transaction
# Transaction that will be open when new member joins, then rolled back.
t2 = DtxTestFixture(self, cluster[0], "t2")
@@ -801,7 +801,7 @@ class DtxTests(BrokerTest):
t3.send(["1", "2"])
t3.end()
t3.prepare()
- t1.verify(cluster, []) # Not visible outside of transaction
+ t1.verify(sessions, []) # Not visible outside of transaction
# Transaction that will be prepared when new member joins, then rolled back.
t4 = DtxTestFixture(self, cluster[0], "t4")
@@ -819,70 +819,99 @@ class DtxTests(BrokerTest):
t6 = DtxTestFixture(self, cluster[0], "t6")
t6.send(["a","b","c"])
t6.start()
- t6.verify(cluster, ["a","b","c"])
self.assertEqual(t6.accept().body, "a");
- t6.verify(cluster, ["b","c"])
# Accept messages in a transaction before/after join then roll back
t7 = DtxTestFixture(self, cluster[0], "t7")
t7.send(["a","b","c"])
t7.start()
- t7.verify(cluster, ["a","b","c"])
self.assertEqual(t7.accept().body, "a");
- t7.verify(cluster, ["b","c"])
+
+ # Suspended transaction across join.
+ t8 = DtxTestFixture(self, cluster[0], "t8")
+ t8.start()
+ t8.send(["x"])
+ t8.end(suspend=True)
# Start new member
cluster.start()
+ sessions.append(cluster[1].connect().session())
# Commit t1
t1.send(["3","4"])
- t1.verify(cluster, [])
+ t1.verify(sessions, [])
t1.end()
t1.commit(one_phase=True)
- t1.verify(cluster, ["1","2","3","4"])
+ t1.verify(sessions, ["1","2","3","4"])
# Rollback t2
t2.send(["3","4"])
- t2.verify(cluster, [])
t2.end()
t2.rollback()
- t2.verify(cluster, [])
+ t2.verify(sessions, [])
# Commit t3
- t3.verify(cluster, [])
t3.commit(one_phase=False)
- t3.verify(cluster, ["1","2"])
+ t3.verify(sessions, ["1","2"])
# Rollback t4
- t4.verify(cluster, [])
t4.rollback()
- t4.verify(cluster, [])
+ t4.verify(sessions, [])
# Commit t5
t5.send(["3","4"])
- t5.verify(cluster, [])
+ t5.verify(sessions, [])
t5.end()
t5.commit(one_phase=True)
- t5.verify(cluster, ["1","2","3","4"])
+ t5.verify(sessions, ["1","2","3","4"])
- # Commit t7
- t6.verify(cluster, ["b", "c"])
+ # Commit t6
self.assertEqual(t6.accept().body, "b");
- t6.verify(cluster, ["c"])
+ t6.verify(sessions, ["c"])
t6.end()
t6.commit(one_phase=True)
- t6.verify(cluster, ["c"])
t6.session.close() # Make sure they're not requeued by the session.
- t6.verify(cluster, ["c"])
+ t6.verify(sessions, ["c"])
# Rollback t7
- t7.verify(cluster, ["b", "c"])
self.assertEqual(t7.accept().body, "b");
- t7.verify(cluster, ["c"])
t7.end()
t7.rollback()
- t7.verify(cluster, ["a", "b", "c"])
-
+ t7.verify(sessions, ["a", "b", "c"])
+
+ # Resume t8
+ t8.start(resume=True)
+ t8.send(["y"])
+ t8.end()
+ t8.commit(one_phase=True)
+ t8.verify(sessions, ["x","y"])
+
+ def test_dtx_failover_rollback(self):
+ """Kill a broker during a transaction, verify we roll back correctly"""
+ cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ cluster.start(expect=EXPECT_RUNNING)
+
+ # Test unprepared at crash
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.send(["a"]) # Not in transaction
+ t1.start()
+ t1.send(["b"]) # In transaction
+
+ # Test prepared at crash
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.send(["a"]) # Not in transaction
+ t2.start()
+ t2.send(["b"]) # In transaction
+ t2.end()
+ t2.prepare()
+
+ # Crash the broker
+ cluster[0].kill()
+
+ # Transactional changes should not appear
+ s = cluster[1].connect().session();
+ self.assert_browse(s, "t1", ["a"])
+ self.assert_browse(s, "t2", ["a"])
class TxTests(BrokerTest):
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index f49f8dbfff..899625f5ec 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -227,6 +227,7 @@
<control name="dtx-buffer-ref" code="0x1D">
<field name="xid" type="str16"/>
<field name="index" type="uint32"/>
+ <field name="suspended" type="bit"/>
</control>
<control name="dtx-work-record" code="0x1E">
@@ -246,6 +247,7 @@
<field name="received" type="sequence-no"/> <!-- Received up to here (>= expected) -->
<field name="unknown-completed" type="sequence-set"/> <!-- Completed but not known to peer. -->
<field name="received-incomplete" type="sequence-set"/> <!-- Received and incomplete -->
+ <field name="dtx-selected" type="bit"/>
</control>
<!-- Complete a shadow connection update. -->