From 539cf5c3713be45fbba15a1ca5932f86c4c6fdbe Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 30 Aug 2011 19:35:36 +0000 Subject: QPID-3384: DTX transactions - replicate suspended transactions. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1163347 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/SemanticState.h | 6 +- qpid/cpp/src/qpid/cluster/Connection.cpp | 29 +++++---- qpid/cpp/src/qpid/cluster/Connection.h | 5 +- qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 28 ++++++--- qpid/cpp/src/qpid/cluster/UpdateClient.h | 1 + qpid/cpp/src/qpid/cluster/UpdateReceiver.h | 15 +++-- qpid/cpp/src/tests/brokertest.py | 4 ++ qpid/cpp/src/tests/cluster_tests.py | 95 +++++++++++++++++++----------- qpid/cpp/xml/cluster.xml | 2 + 9 files changed, 126 insertions(+), 59 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 8de884c113..22bc272c50 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -148,9 +148,10 @@ class SemanticState : private boost::noncopyable { management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; + typedef std::map DtxBufferMap; + private: typedef std::map ConsumerImplMap; - typedef std::map DtxBufferMap; SessionContext& session; DeliveryAdapter& deliveryAdapter; @@ -181,6 +182,7 @@ class SemanticState : private boost::noncopyable { void disable(ConsumerImpl::shared_ptr); public: + SemanticState(DeliveryAdapter&, SessionContext&); ~SemanticState(); @@ -218,6 +220,7 @@ class SemanticState : private boost::noncopyable { void commit(MessageStore* const store); void rollback(); void selectDtx(); + bool getDtxSelected() const { return dtxSelected; } void startDtx(const std::string& xid, DtxManager& mgr, bool join); void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); @@ -249,6 +252,7 @@ class SemanticState : private boost::noncopyable { void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; } void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); + DtxBufferMap& getSuspendedXids() { return suspendedXids; } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 0691aae711..e0ce8cf9e0 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -419,7 +419,8 @@ void Connection::sessionState( const SequenceNumber& expected, const SequenceNumber& received, const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete) + const SequenceSet& receivedIncomplete, + bool dtxSelected) { sessionState().setState( replayStart, @@ -429,7 +430,9 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); + if (dtxSelected) semanticState().selectDtx(); + QPID_LOG(debug, cluster << " received session state update for " + << sessionState().getId()); // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -459,11 +462,14 @@ void Connection::shadowReady( output.setSendMax(sendMax); } -void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) { +void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) { broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); - broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID - uint32_t index = v.first.second; // Index - v.second->setDtxBuffer((*record)[index]); + broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid); + broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index]; + if (bufRef.suspended) + bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer; + else + bufRef.semanticState->setDtxBuffer(buffer); } // Marks the end of the update. @@ -694,11 +700,12 @@ void Connection::dtxAck() { dtxAckRecords.clear(); } -void Connection::dtxBufferRef(const std::string& xid, uint32_t index) { - // Save the association between DtxBuffer and session so we can - // set the DtxBuffer on the session at the end of the update - // when the DtxManager has been replicated. - updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState(); +void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) { + // Save the association between DtxBuffers and the session so we + // can set the DtxBuffers at the end of the update when the + // DtxManager has been replicated. + updateIn.dtxBuffers.push_back( + UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState())); } // Sent at end of work record. diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 5133e4641e..fe66b77238 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -124,7 +124,8 @@ class Connection : const framing::SequenceNumber& expected, const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete); + const SequenceSet& receivedIncomplete, + bool dtxSelected); void outputTask(uint16_t channel, const std::string& name); @@ -173,7 +174,7 @@ class Connection : bool expired); void dtxEnd(); void dtxAck(); - void dtxBufferRef(const std::string& xid, uint32_t index); + void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended); void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout); // Encoded exchange replication. diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index a5662bb2b3..3142b44d71 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, client::AsyncSession& updateSession) { if (!dr.isEnded() && dr.isAcquired()) { - // FIXME aconway 2011-08-19: should this be assert or if? assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. @@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + void UpdateClient::updateTransactionState(broker::SemanticState& s) { - broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); - broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); if (dtx) { - broker::DtxWorkRecord* record = - updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found - proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx)); + updateBufferRef(dtx, false); // Current transaction. } else if (tx) { - ClusterConnectionProxy proxy(shadowSession); proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } } void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 83d4cfac81..481ee357c7 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -100,6 +100,7 @@ class UpdateClient : public sys::Runnable { void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding); void updateConnection(const boost::intrusive_ptr& connection); void updateSession(broker::SessionHandler& s); + void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended); void updateTransactionState(broker::SemanticState& s); void updateOutputTask(const sys::OutputTask* task); void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); diff --git a/qpid/cpp/src/qpid/cluster/UpdateReceiver.h b/qpid/cpp/src/qpid/cluster/UpdateReceiver.h index 512e59e5a1..81ee3a5ffe 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateReceiver.h +++ b/qpid/cpp/src/qpid/cluster/UpdateReceiver.h @@ -40,11 +40,18 @@ class UpdateReceiver { /** Management-id for the next shadow connection */ std::string nextShadowMgmtId; - /** Relationship between DtxBuffers, identified by xid, index in DtxManager, - * and sessions represented by their SemanticState. + /** Record the position of a DtxBuffer in the DtxManager (xid + index) + * and the association with a session, either suspended or current. */ - typedef std::pair DtxBufferRef; - typedef std::map DtxBuffers; + struct DtxBufferRef { + std::string xid; + uint32_t index; // Index in WorkRecord in DtxManager + bool suspended; // Is this a suspended or current transaction? + broker::SemanticState* semanticState; // Associated session + DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss) + : xid(x), index(i), suspended(s), semanticState(ss) {} + }; + typedef std::vector DtxBuffers; DtxBuffers dtxBuffers; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 7888f44c30..705857ee46 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -420,6 +420,9 @@ class Cluster: self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) return self._brokers[-1] + def ready(self): + for b in self: b.ready() + def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) @@ -495,6 +498,7 @@ class BrokerTest(TestCase): def browse(self, session, queue, timeout=0): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) + r.capacity = 100 try: contents = [] try: diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index e67a691283..db08f118da 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -733,11 +733,11 @@ class DtxTestFixture: self.session.dtx_select() self.consumer = None - def start(self): - self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status) + def start(self, resume=False): + self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid, resume=resume).status) - def end(self): - self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status) + def end(self, suspend=False): + self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid, suspend=suspend).status) def prepare(self): self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status) @@ -767,10 +767,9 @@ class DtxTestFixture: return msg - def verify(self, cluster, messages): - for b in cluster: - self.test.assert_browse(b.connect().session(), self.name, messages) - + def verify(self, sessions, messages): + for s in sessions: + self.test.assert_browse(s, self.name, messages) class DtxTests(BrokerTest): @@ -783,12 +782,13 @@ class DtxTests(BrokerTest): # multiple brokers per test. cluster=self.cluster(1) + sessions = [cluster[0].connect().session()] # For verify # Transaction that will be open when new member joins, then committed. t1 = DtxTestFixture(self, cluster[0], "t1") t1.start() t1.send(["1", "2"]) - t1.verify(cluster, []) # Not visible outside of transaction + t1.verify(sessions, []) # Not visible outside of transaction # Transaction that will be open when new member joins, then rolled back. t2 = DtxTestFixture(self, cluster[0], "t2") @@ -801,7 +801,7 @@ class DtxTests(BrokerTest): t3.send(["1", "2"]) t3.end() t3.prepare() - t1.verify(cluster, []) # Not visible outside of transaction + t1.verify(sessions, []) # Not visible outside of transaction # Transaction that will be prepared when new member joins, then rolled back. t4 = DtxTestFixture(self, cluster[0], "t4") @@ -819,70 +819,99 @@ class DtxTests(BrokerTest): t6 = DtxTestFixture(self, cluster[0], "t6") t6.send(["a","b","c"]) t6.start() - t6.verify(cluster, ["a","b","c"]) self.assertEqual(t6.accept().body, "a"); - t6.verify(cluster, ["b","c"]) # Accept messages in a transaction before/after join then roll back t7 = DtxTestFixture(self, cluster[0], "t7") t7.send(["a","b","c"]) t7.start() - t7.verify(cluster, ["a","b","c"]) self.assertEqual(t7.accept().body, "a"); - t7.verify(cluster, ["b","c"]) + + # Suspended transaction across join. + t8 = DtxTestFixture(self, cluster[0], "t8") + t8.start() + t8.send(["x"]) + t8.end(suspend=True) # Start new member cluster.start() + sessions.append(cluster[1].connect().session()) # Commit t1 t1.send(["3","4"]) - t1.verify(cluster, []) + t1.verify(sessions, []) t1.end() t1.commit(one_phase=True) - t1.verify(cluster, ["1","2","3","4"]) + t1.verify(sessions, ["1","2","3","4"]) # Rollback t2 t2.send(["3","4"]) - t2.verify(cluster, []) t2.end() t2.rollback() - t2.verify(cluster, []) + t2.verify(sessions, []) # Commit t3 - t3.verify(cluster, []) t3.commit(one_phase=False) - t3.verify(cluster, ["1","2"]) + t3.verify(sessions, ["1","2"]) # Rollback t4 - t4.verify(cluster, []) t4.rollback() - t4.verify(cluster, []) + t4.verify(sessions, []) # Commit t5 t5.send(["3","4"]) - t5.verify(cluster, []) + t5.verify(sessions, []) t5.end() t5.commit(one_phase=True) - t5.verify(cluster, ["1","2","3","4"]) + t5.verify(sessions, ["1","2","3","4"]) - # Commit t7 - t6.verify(cluster, ["b", "c"]) + # Commit t6 self.assertEqual(t6.accept().body, "b"); - t6.verify(cluster, ["c"]) + t6.verify(sessions, ["c"]) t6.end() t6.commit(one_phase=True) - t6.verify(cluster, ["c"]) t6.session.close() # Make sure they're not requeued by the session. - t6.verify(cluster, ["c"]) + t6.verify(sessions, ["c"]) # Rollback t7 - t7.verify(cluster, ["b", "c"]) self.assertEqual(t7.accept().body, "b"); - t7.verify(cluster, ["c"]) t7.end() t7.rollback() - t7.verify(cluster, ["a", "b", "c"]) - + t7.verify(sessions, ["a", "b", "c"]) + + # Resume t8 + t8.start(resume=True) + t8.send(["y"]) + t8.end() + t8.commit(one_phase=True) + t8.verify(sessions, ["x","y"]) + + def test_dtx_failover_rollback(self): + """Kill a broker during a transaction, verify we roll back correctly""" + cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL) + cluster.start(expect=EXPECT_RUNNING) + + # Test unprepared at crash + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.send(["a"]) # Not in transaction + t1.start() + t1.send(["b"]) # In transaction + + # Test prepared at crash + t2 = DtxTestFixture(self, cluster[0], "t2") + t2.send(["a"]) # Not in transaction + t2.start() + t2.send(["b"]) # In transaction + t2.end() + t2.prepare() + + # Crash the broker + cluster[0].kill() + + # Transactional changes should not appear + s = cluster[1].connect().session(); + self.assert_browse(s, "t1", ["a"]) + self.assert_browse(s, "t2", ["a"]) class TxTests(BrokerTest): diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index f49f8dbfff..899625f5ec 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -227,6 +227,7 @@ + @@ -246,6 +247,7 @@ + -- cgit v1.2.1