diff options
author | Pavel Moravec <pmoravec@apache.org> | 2014-02-05 09:58:15 +0000 |
---|---|---|
committer | Pavel Moravec <pmoravec@apache.org> | 2014-02-05 09:58:15 +0000 |
commit | cf3259bf2b915af138c746cb1b1ccfb743670dc5 (patch) | |
tree | 8e683a55537eccc6e7062abf2d2c2146087ad073 | |
parent | 393032c64fc33199bb70c3352bd3280ce02410d9 (diff) | |
download | qpid-python-cf3259bf2b915af138c746cb1b1ccfb743670dc5.tar.gz |
QPID-5531: [C++ broker] Set timeout for every DTX transaction
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564694 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py | 20 |
6 files changed, 41 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 4017fdbfe3..7f076e92cd 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -148,6 +148,8 @@ Broker::Options::Options(const std::string& name) : timestampRcvMsgs(false), // set the 0.10 timestamp delivery property linkMaintenanceInterval(2*sys::TIME_SEC), linkHeartbeatInterval(120*sys::TIME_SEC), + dtxDefaultTimeout(60), // 60s + dtxMaxTimeout(3600), // 3600s maxNegotiateTime(10000) // 10s { int c = sys::SystemInfo::concurrency(); @@ -192,6 +194,8 @@ Broker::Options::Options(const std::string& name) : "Interval to check link health and re-connect if need be") ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"), "Heartbeat interval for a federation link") + ("dtx-default-timeout", optValue(dtxDefaultTimeout, "SECONDS"), "Default timeout for DTX transaction before aborting it") + ("dtx-max-timeout", optValue(dtxMaxTimeout, "SECONDS"), "Maximum allowed timeout for DTX transaction. A value of zero disables maximum timeout limit checks and allows arbitrarily large timeout settings.") ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation") ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag") ; @@ -224,7 +228,7 @@ Broker::Broker(const Broker::Options& conf) : exchanges(this), links(this), factory(new SecureConnectionFactory(*this)), - dtxManager(*timer.get()), + dtxManager(*timer.get(), getOptions().dtxDefaultTimeout), sessionManager( qpid::SessionState::Configuration( conf.replayFlushLimit*1024, // convert kb to bytes. diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 4bad8f2960..5d1e241be9 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -121,6 +121,8 @@ class Broker : public sys::Runnable, public Plugin::Target, bool timestampRcvMsgs; sys::Duration linkMaintenanceInterval; sys::Duration linkHeartbeatInterval; + uint32_t dtxDefaultTimeout; // Default timeout of a DTX transaction + uint32_t dtxMaxTimeout; // Maximal timeout of a DTX transaction uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation std::string fedTag; diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index 5ba1ce4dac..4fb82bb41b 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -62,7 +62,12 @@ namespace { } -DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} +DtxManager::DtxManager(qpid::sys::Timer& t, uint32_t _dtxDefaultTimeout) : + store(0), + timer(&t), + dtxDefaultTimeout(_dtxDefaultTimeout) +{ +} DtxManager::~DtxManager() {} @@ -150,8 +155,12 @@ DtxWorkRecord* DtxManager::createWork(const std::string& xid) if (i != work.end()) { throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)")); } else { - std::string ncxid = xid; // Work around const correctness problems in ptr_map. - return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first); + std::string ncxid = xid; // Work around const correctness problems with work.insert + DtxWorkRecord* dtxWorkRecord = new DtxWorkRecord(xid, store); + work.insert(ncxid, dtxWorkRecord); + if (dtxDefaultTimeout>0) + setTimeout(xid, dtxDefaultTimeout); + return dtxWorkRecord; } } diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index 000fc7b4b8..ad30ed61a0 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -44,12 +44,13 @@ class DtxManager{ TransactionalStore* store; qpid::sys::Mutex lock; qpid::sys::Timer* timer; + uint32_t dtxDefaultTimeout; void remove(const std::string& xid); DtxWorkRecord* createWork(const std::string& xid); public: - DtxManager(sys::Timer&); + DtxManager(sys::Timer&, uint32_t _dtxDefaultTimeout=0); ~DtxManager(); void start(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work); void join(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 5b1a6aa267..7c2d1cf9f5 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -678,6 +678,8 @@ DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, uint32_t timeout) { + if ((timeout > getBroker().getOptions().dtxMaxTimeout) && (getBroker().getOptions().dtxMaxTimeout > 0)) + throw InvalidArgumentException(QPID_MSG("xid " << xid << " has timeout " << timeout << " bigger than maximum allowed " << getBroker().getOptions().dtxMaxTimeout)); getBroker().getDtxManager().setTimeout(DtxManager::convert(xid), timeout); } diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py index f8d6533a78..a9619bcdb8 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py @@ -594,9 +594,10 @@ class DtxTests(TestBase010): session.dtx_select() session.dtx_start(xid=tx) - self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout) - session.dtx_set_timeout(xid=tx, timeout=60) + # below test checks for default value of dtx-default-timeout broker option self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) + session.dtx_set_timeout(xid=tx, timeout=200) + self.assertEqual(200, session.dtx_get_timeout(xid=tx).timeout) self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) @@ -628,6 +629,21 @@ class DtxTests(TestBase010): self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) + def test_set_timeout_too_high(self): + """ + Test the timeout can't be more than --dtx-max-timeout + broker option + """ + session = self.session + tx = self.xid("dummy") + + session.dtx_select() + session.dtx_start(xid=tx) + try: + session.dtx_set_timeout(xid=tx, timeout=3601) + except SessionException, e: + self.assertEquals(542, e.args[0].error_code) + def test_recover(self): |