summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Moravec <pmoravec@apache.org>2014-02-05 09:58:15 +0000
committerPavel Moravec <pmoravec@apache.org>2014-02-05 09:58:15 +0000
commitcf3259bf2b915af138c746cb1b1ccfb743670dc5 (patch)
tree8e683a55537eccc6e7062abf2d2c2146087ad073
parent393032c64fc33199bb70c3352bd3280ce02410d9 (diff)
downloadqpid-python-cf3259bf2b915af138c746cb1b1ccfb743670dc5.tar.gz
QPID-5531: [C++ broker] Set timeout for every DTX transaction
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564694 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h2
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py20
6 files changed, 41 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 4017fdbfe3..7f076e92cd 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -148,6 +148,8 @@ Broker::Options::Options(const std::string& name) :
timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
linkMaintenanceInterval(2*sys::TIME_SEC),
linkHeartbeatInterval(120*sys::TIME_SEC),
+ dtxDefaultTimeout(60), // 60s
+ dtxMaxTimeout(3600), // 3600s
maxNegotiateTime(10000) // 10s
{
int c = sys::SystemInfo::concurrency();
@@ -192,6 +194,8 @@ Broker::Options::Options(const std::string& name) :
"Interval to check link health and re-connect if need be")
("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"),
"Heartbeat interval for a federation link")
+ ("dtx-default-timeout", optValue(dtxDefaultTimeout, "SECONDS"), "Default timeout for DTX transaction before aborting it")
+ ("dtx-max-timeout", optValue(dtxMaxTimeout, "SECONDS"), "Maximum allowed timeout for DTX transaction. A value of zero disables maximum timeout limit checks and allows arbitrarily large timeout settings.")
("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation")
("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
;
@@ -224,7 +228,7 @@ Broker::Broker(const Broker::Options& conf) :
exchanges(this),
links(this),
factory(new SecureConnectionFactory(*this)),
- dtxManager(*timer.get()),
+ dtxManager(*timer.get(), getOptions().dtxDefaultTimeout),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 4bad8f2960..5d1e241be9 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -121,6 +121,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
bool timestampRcvMsgs;
sys::Duration linkMaintenanceInterval;
sys::Duration linkHeartbeatInterval;
+ uint32_t dtxDefaultTimeout; // Default timeout of a DTX transaction
+ uint32_t dtxMaxTimeout; // Maximal timeout of a DTX transaction
uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation
std::string fedTag;
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp
index 5ba1ce4dac..4fb82bb41b 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp
@@ -62,7 +62,12 @@ namespace {
}
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t, uint32_t _dtxDefaultTimeout) :
+ store(0),
+ timer(&t),
+ dtxDefaultTimeout(_dtxDefaultTimeout)
+{
+}
DtxManager::~DtxManager() {}
@@ -150,8 +155,12 @@ DtxWorkRecord* DtxManager::createWork(const std::string& xid)
if (i != work.end()) {
throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)"));
} else {
- std::string ncxid = xid; // Work around const correctness problems in ptr_map.
- return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
+ std::string ncxid = xid; // Work around const correctness problems with work.insert
+ DtxWorkRecord* dtxWorkRecord = new DtxWorkRecord(xid, store);
+ work.insert(ncxid, dtxWorkRecord);
+ if (dtxDefaultTimeout>0)
+ setTimeout(xid, dtxDefaultTimeout);
+ return dtxWorkRecord;
}
}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h
index 000fc7b4b8..ad30ed61a0 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.h
+++ b/qpid/cpp/src/qpid/broker/DtxManager.h
@@ -44,12 +44,13 @@ class DtxManager{
TransactionalStore* store;
qpid::sys::Mutex lock;
qpid::sys::Timer* timer;
+ uint32_t dtxDefaultTimeout;
void remove(const std::string& xid);
DtxWorkRecord* createWork(const std::string& xid);
public:
- DtxManager(sys::Timer&);
+ DtxManager(sys::Timer&, uint32_t _dtxDefaultTimeout=0);
~DtxManager();
void start(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work);
void join(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 5b1a6aa267..7c2d1cf9f5 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -678,6 +678,8 @@ DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
uint32_t timeout)
{
+ if ((timeout > getBroker().getOptions().dtxMaxTimeout) && (getBroker().getOptions().dtxMaxTimeout > 0))
+ throw InvalidArgumentException(QPID_MSG("xid " << xid << " has timeout " << timeout << " bigger than maximum allowed " << getBroker().getOptions().dtxMaxTimeout));
getBroker().getDtxManager().setTimeout(DtxManager::convert(xid), timeout);
}
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py
index f8d6533a78..a9619bcdb8 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py
@@ -594,9 +594,10 @@ class DtxTests(TestBase010):
session.dtx_select()
session.dtx_start(xid=tx)
- self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout)
- session.dtx_set_timeout(xid=tx, timeout=60)
+ # below test checks for default value of dtx-default-timeout broker option
self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
+ session.dtx_set_timeout(xid=tx, timeout=200)
+ self.assertEqual(200, session.dtx_get_timeout(xid=tx).timeout)
self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
@@ -628,6 +629,21 @@ class DtxTests(TestBase010):
self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
+ def test_set_timeout_too_high(self):
+ """
+ Test the timeout can't be more than --dtx-max-timeout
+ broker option
+ """
+ session = self.session
+ tx = self.xid("dummy")
+
+ session.dtx_select()
+ session.dtx_start(xid=tx)
+ try:
+ session.dtx_set_timeout(xid=tx, timeout=3601)
+ except SessionException, e:
+ self.assertEquals(542, e.args[0].error_code)
+
def test_recover(self):