summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp6
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp6
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--python/tests_0-10/dtx.py123
4 files changed, 123 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 47f389a57d..942dbdcbc6 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -93,7 +93,7 @@ DtxWorkRecord* DtxManager::getWork(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
+ throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
}
return ptr_map_ptr(i);
}
@@ -103,7 +103,7 @@ void DtxManager::remove(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
+ throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
} else {
work.erase(i);
}
@@ -114,7 +114,7 @@ DtxWorkRecord* DtxManager::createWork(std::string xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i != work.end()) {
- throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
+ throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
} else {
return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first);
}
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index fe9e42ca32..2f3febed1e 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -73,7 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase)
if (prepared) {
//already prepared i.e. 2pc
if (onePhase) {
- throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
}
store->commit(*txn);
@@ -84,7 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase)
} else {
//1pc commit optimisation, don't need a 2pc transaction context:
if (!onePhase) {
- throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
}
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
@@ -133,7 +133,7 @@ bool DtxWorkRecord::check()
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
- throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " not completed!"));
} else if ((*i)->isRollbackOnly()) {
rolledback = true;
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 2d0edde27b..1830b2b94c 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -575,7 +575,7 @@ DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
- throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
+ throw NotImplementedException(QPID_MSG("Forget not implemented. Branch with xid " << xid << " not heuristically completed!"));
}
DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index d676b5806c..cea742b81c 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -199,7 +199,7 @@ class DtxTests(TestBase010):
session1.dtx_rollback(xid=tx)
#verification:
- if failed: self.assertEquals(503, error.args[0].error_code)
+ if failed: self.assertEquals(530, error.args[0].error_code)
else: self.fail("Xid already known, expected exception!")
def test_forget_xid_on_completion(self):
@@ -439,7 +439,7 @@ class DtxTests(TestBase010):
if failed:
self.session.dtx_rollback(xid=tx)
- self.assertEquals(503, error.args[0].error_code)
+ self.assertEquals(409, error.args[0].error_code)
else:
tester.close()
other.close()
@@ -450,10 +450,6 @@ class DtxTests(TestBase010):
Test that a commit with one_phase = False is rejected if the
transaction in question has not yet been prepared.
"""
- """
- Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
- """
other = self.connect()
tester = other.session("tester", 1)
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
@@ -471,12 +467,97 @@ class DtxTests(TestBase010):
if failed:
self.session.dtx_rollback(xid=tx)
- self.assertEquals(503, error.args[0].error_code)
+ self.assertEquals(409, error.args[0].error_code)
else:
tester.close()
other.close()
self.fail("Invalid use of one_phase=False, expected exception!")
+ def test_invalid_commit_not_ended(self):
+ """
+ Test that a commit fails if the xid is still associated with a session.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ self.session.dtx_select()
+ tx = self.xid("dummy")
+ self.session.dtx_start(xid=tx)
+ self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+ failed = False
+ try:
+ tester.dtx_commit(xid=tx, one_phase=False)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Commit should fail as xid is still associated!")
+
+ def test_invalid_rollback_not_ended(self):
+ """
+ Test that a rollback fails if the xid is still associated with a session.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ self.session.dtx_select()
+ tx = self.xid("dummy")
+ self.session.dtx_start(xid=tx)
+ self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+ failed = False
+ try:
+ tester.dtx_rollback(xid=tx)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Rollback should fail as xid is still associated!")
+
+
+ def test_invalid_prepare_not_ended(self):
+ """
+ Test that a prepare fails if the xid is still associated with a session.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ self.session.dtx_select()
+ tx = self.xid("dummy")
+ self.session.dtx_start(xid=tx)
+ self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+ failed = False
+ try:
+ tester.dtx_prepare(xid=tx)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Rollback should fail as xid is still associated!")
+
def test_implicit_end(self):
"""
Test that an association is implicitly ended when the session
@@ -601,6 +682,34 @@ class DtxTests(TestBase010):
except SessionException, e:
self.assertEquals(503, e.args[0].error_code)
+ def test_prepare_unknown(self):
+ session = self.session
+ try:
+ session.dtx_prepare(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_commit_unknown(self):
+ session = self.session
+ try:
+ session.dtx_commit(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_rollback_unknown(self):
+ session = self.session
+ try:
+ session.dtx_rollback(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_get_timeout_unknown(self):
+ session = self.session
+ try:
+ session.dtx_get_timeout(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
def xid(self, txid):
DtxTests.tx_counter += 1
branchqual = "v%s" % DtxTests.tx_counter