diff options
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 123 |
4 files changed, 123 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 47f389a57d..942dbdcbc6 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -93,7 +93,7 @@ DtxWorkRecord* DtxManager::getWork(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); + throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); } return ptr_map_ptr(i); } @@ -103,7 +103,7 @@ void DtxManager::remove(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); + throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); } else { work.erase(i); } @@ -114,7 +114,7 @@ DtxWorkRecord* DtxManager::createWork(std::string xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i != work.end()) { - throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); + throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); } else { return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first); } diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index fe9e42ca32..2f3febed1e 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -73,7 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase) if (prepared) { //already prepared i.e. 2pc if (onePhase) { - throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!")); } store->commit(*txn); @@ -84,7 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase) } else { //1pc commit optimisation, don't need a 2pc transaction context: if (!onePhase) { - throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); } std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { @@ -133,7 +133,7 @@ bool DtxWorkRecord::check() //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { if (!(*i)->isEnded()) { - throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " not completed!")); } else if ((*i)->isRollbackOnly()) { rolledback = true; } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 2d0edde27b..1830b2b94c 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -575,7 +575,7 @@ DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) { //Currently no heuristic completion is supported, so this should never be used. - throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); + throw NotImplementedException(QPID_MSG("Forget not implemented. Branch with xid " << xid << " not heuristically completed!")); } DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index d676b5806c..cea742b81c 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -199,7 +199,7 @@ class DtxTests(TestBase010): session1.dtx_rollback(xid=tx) #verification: - if failed: self.assertEquals(503, error.args[0].error_code) + if failed: self.assertEquals(530, error.args[0].error_code) else: self.fail("Xid already known, expected exception!") def test_forget_xid_on_completion(self): @@ -439,7 +439,7 @@ class DtxTests(TestBase010): if failed: self.session.dtx_rollback(xid=tx) - self.assertEquals(503, error.args[0].error_code) + self.assertEquals(409, error.args[0].error_code) else: tester.close() other.close() @@ -450,10 +450,6 @@ class DtxTests(TestBase010): Test that a commit with one_phase = False is rejected if the transaction in question has not yet been prepared. """ - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ other = self.connect() tester = other.session("tester", 1) tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) @@ -471,12 +467,97 @@ class DtxTests(TestBase010): if failed: self.session.dtx_rollback(xid=tx) - self.assertEquals(503, error.args[0].error_code) + self.assertEquals(409, error.args[0].error_code) else: tester.close() other.close() self.fail("Invalid use of one_phase=False, expected exception!") + def test_invalid_commit_not_ended(self): + """ + Test that a commit fails if the xid is still associated with a session. + """ + other = self.connect() + tester = other.session("tester", 1) + self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + self.session.dtx_select() + tx = self.xid("dummy") + self.session.dtx_start(xid=tx) + self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + + failed = False + try: + tester.dtx_commit(xid=tx, one_phase=False) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Commit should fail as xid is still associated!") + + def test_invalid_rollback_not_ended(self): + """ + Test that a rollback fails if the xid is still associated with a session. + """ + other = self.connect() + tester = other.session("tester", 1) + self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + self.session.dtx_select() + tx = self.xid("dummy") + self.session.dtx_start(xid=tx) + self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + + failed = False + try: + tester.dtx_rollback(xid=tx) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Rollback should fail as xid is still associated!") + + + def test_invalid_prepare_not_ended(self): + """ + Test that a prepare fails if the xid is still associated with a session. + """ + other = self.connect() + tester = other.session("tester", 1) + self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + self.session.dtx_select() + tx = self.xid("dummy") + self.session.dtx_start(xid=tx) + self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + + failed = False + try: + tester.dtx_prepare(xid=tx) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Rollback should fail as xid is still associated!") + def test_implicit_end(self): """ Test that an association is implicitly ended when the session @@ -601,6 +682,34 @@ class DtxTests(TestBase010): except SessionException, e: self.assertEquals(503, e.args[0].error_code) + def test_prepare_unknown(self): + session = self.session + try: + session.dtx_prepare(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_commit_unknown(self): + session = self.session + try: + session.dtx_commit(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_rollback_unknown(self): + session = self.session + try: + session.dtx_rollback(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_get_timeout_unknown(self): + session = self.session + try: + session.dtx_get_timeout(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + def xid(self, txid): DtxTests.tx_counter += 1 branchqual = "v%s" % DtxTests.tx_counter |