summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-06-08 15:24:12 +0000
committerGordon Sim <gsim@apache.org>2007-06-08 15:24:12 +0000
commit2373874e23fb62e502963878f5e7cd68e100021e (patch)
tree822809efbbe20a6d21aac34641f69b548eb5fec5
parent961e07e4d1ad8eebcc80d4d024f8efa3ab972e05 (diff)
downloadqpid-python-2373874e23fb62e502963878f5e7cd68e100021e.tar.gz
Timeout handling for dtx, plus tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@545531 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am4
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerChannel.cpp35
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerChannel.h2
-rw-r--r--qpid/cpp/src/qpid/broker/DtxBuffer.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/DtxBuffer.h3
-rw-r--r--qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp85
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.cpp82
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.h23
-rw-r--r--qpid/cpp/src/qpid/broker/DtxTimeout.cpp35
-rw-r--r--qpid/cpp/src/qpid/broker/DtxTimeout.h54
-rw-r--r--qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp30
-rw-r--r--qpid/cpp/src/qpid/broker/DtxWorkRecord.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Timer.cpp100
-rw-r--r--qpid/cpp/src/qpid/broker/Timer.h76
-rw-r--r--qpid/cpp/src/qpid/sys/Time.h6
-rw-r--r--qpid/cpp/src/tests/Makefile.am1
-rw-r--r--qpid/cpp/src/tests/TimerTest.cpp128
-rw-r--r--qpid/python/tests_0-9/dtx.py47
18 files changed, 672 insertions, 60 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 02044be01b..75cc670015 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -189,6 +189,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/DtxBuffer.cpp \
qpid/broker/DtxHandlerImpl.cpp \
qpid/broker/DtxManager.cpp \
+ qpid/broker/DtxTimeout.cpp \
qpid/broker/DtxWorkRecord.cpp \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
@@ -206,6 +207,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
qpid/broker/Reference.cpp \
+ qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
@@ -243,6 +245,7 @@ nobase_include_HEADERS = \
qpid/broker/DtxBuffer.h \
qpid/broker/DtxHandlerImpl.h \
qpid/broker/DtxManager.h \
+ qpid/broker/DtxTimeout.h \
qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
@@ -285,6 +288,7 @@ nobase_include_HEADERS = \
qpid/broker/PersistableQueue.h \
qpid/broker/QueuePolicy.h \
qpid/broker/RecoveryManagerImpl.h \
+ qpid/broker/Timer.h \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
qpid/broker/TxAck.h \
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
index e256566d35..c1f0b44ed4 100644
--- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -38,6 +38,7 @@
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
+#include "DtxTimeout.h"
#include "MessageStore.h"
#include "TxAck.h"
#include "TxPublish.h"
@@ -154,18 +155,15 @@ void Channel::endDtx(const std::string& xid, bool fail){
% dtxBuffer->getXid() % xid);
}
+ txBuffer.reset();//ops on this channel no longer transactional
+
+ checkDtxTimeout();
if (fail) {
- accumulatedAck.clear();
dtxBuffer->fail();
} else {
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
dtxBuffer->markEnded();
- }
-
+ }
dtxBuffer.reset();
- txBuffer.reset();
}
void Channel::suspendDtx(const std::string& xid){
@@ -173,8 +171,10 @@ void Channel::suspendDtx(const std::string& xid){
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
% dtxBuffer->getXid() % xid);
}
+ txBuffer.reset();//ops on this channel no longer transactional
+
+ checkDtxTimeout();
dtxBuffer->setSuspended(true);
- txBuffer.reset();
}
void Channel::resumeDtx(const std::string& xid){
@@ -185,10 +185,20 @@ void Channel::resumeDtx(const std::string& xid){
if (!dtxBuffer->isSuspended()) {
throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
}
- dtxBuffer->setSuspended(true);
+
+ checkDtxTimeout();
+ dtxBuffer->setSuspended(false);
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
+void Channel::checkDtxTimeout()
+{
+ if (dtxBuffer->isExpired()) {
+ dtxBuffer.reset();
+ throw DtxTimeoutException();
+ }
+}
+
void Channel::deliver(
Message::shared_ptr& msg, const string& consumerTag,
Queue::shared_ptr& queue, bool ackExpected)
@@ -302,9 +312,14 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
-
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
+ if (dtxBuffer.get()) {
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+
} else {
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h
index 1fbfc2063e..0529caed5f 100644
--- a/qpid/cpp/src/qpid/broker/BrokerChannel.h
+++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h
@@ -106,6 +106,8 @@ class Channel : public framing::ChannelAdapter,
void deliver(Message::shared_ptr& msg, const string& tag,
Queue::shared_ptr& queue, bool ackExpected);
bool checkPrefetch(Message::shared_ptr& msg);
+
+ void checkDtxTimeout();
public:
Channel(Connection& parent,
diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
index 7f816ebcf4..29a07ea6d9 100644
--- a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
@@ -23,7 +23,8 @@
using namespace qpid::broker;
using qpid::sys::Mutex;
-DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid)
+ : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {}
DtxBuffer::~DtxBuffer() {}
@@ -68,3 +69,15 @@ const std::string& DtxBuffer::getXid()
return xid;
}
+void DtxBuffer::timedout()
+{
+ Mutex::ScopedLock locker(lock);
+ expired = true;
+ fail();
+}
+
+bool DtxBuffer::isExpired()
+{
+ Mutex::ScopedLock locker(lock);
+ return expired;
+}
diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.h b/qpid/cpp/src/qpid/broker/DtxBuffer.h
index 0d4e6ccf31..b302632037 100644
--- a/qpid/cpp/src/qpid/broker/DtxBuffer.h
+++ b/qpid/cpp/src/qpid/broker/DtxBuffer.h
@@ -32,6 +32,7 @@ namespace qpid {
bool ended;
bool suspended;
bool failed;
+ bool expired;
public:
typedef boost::shared_ptr<DtxBuffer> shared_ptr;
@@ -44,6 +45,8 @@ namespace qpid {
bool isSuspended();
void fail();
bool isRollbackOnly();
+ void timedout();
+ bool isExpired();
const std::string& getXid();
};
}
diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 1d7c2df5f4..c7ed95e5d3 100644
--- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -61,21 +61,24 @@ void DtxHandlerImpl::end(const MethodContext& context,
bool fail,
bool suspend)
{
-
- if (fail) {
- channel.endDtx(xid, true);
- if (suspend) {
- throw ConnectionException(503, "End and suspend cannot both be set.");
- } else {
- dClient.endOk(XA_RBROLLBACK, context.getRequestId());
- }
- } else {
- if (suspend) {
- channel.suspendDtx(xid);
+ try {
+ if (fail) {
+ channel.endDtx(xid, true);
+ if (suspend) {
+ throw ConnectionException(503, "End and suspend cannot both be set.");
+ } else {
+ dClient.endOk(XA_RBROLLBACK, context.getRequestId());
+ }
} else {
- channel.endDtx(xid, false);
+ if (suspend) {
+ channel.suspendDtx(xid);
+ } else {
+ channel.endDtx(xid, false);
+ }
+ dClient.endOk(XA_OK, context.getRequestId());
}
- dClient.endOk(XA_OK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ dClient.endOk(XA_RBTIMEOUT, context.getRequestId());
}
}
@@ -88,12 +91,16 @@ void DtxHandlerImpl::start(const MethodContext& context,
if (join && resume) {
throw ConnectionException(503, "Join and resume cannot both be set.");
}
- if (resume) {
- channel.resumeDtx(xid);
- } else {
- channel.startDtx(xid, broker.getDtxManager(), join);
+ try {
+ if (resume) {
+ channel.resumeDtx(xid);
+ } else {
+ channel.startDtx(xid, broker.getDtxManager(), join);
+ }
+ dClient.startOk(XA_OK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ dClient.startOk(XA_RBTIMEOUT, context.getRequestId());
}
- dClient.startOk(XA_OK, context.getRequestId());
}
// DtxCoordinationHandler:
@@ -102,8 +109,12 @@ void DtxHandlerImpl::prepare(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid)
{
- bool ok = broker.getDtxManager().prepare(xid);
- cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ try {
+ bool ok = broker.getDtxManager().prepare(xid);
+ cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ cClient.prepareOk(XA_RBTIMEOUT, context.getRequestId());
+ }
}
void DtxHandlerImpl::commit(const MethodContext& context,
@@ -111,8 +122,12 @@ void DtxHandlerImpl::commit(const MethodContext& context,
const string& xid,
bool onePhase)
{
- bool ok = broker.getDtxManager().commit(xid, onePhase);
- cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ try {
+ bool ok = broker.getDtxManager().commit(xid, onePhase);
+ cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ cClient.commitOk(XA_RBTIMEOUT, context.getRequestId());
+ }
}
@@ -120,8 +135,12 @@ void DtxHandlerImpl::rollback(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid )
{
- broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(XA_OK, context.getRequestId());
+ try {
+ broker.getDtxManager().rollback(xid);
+ cClient.rollbackOk(XA_OK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ cClient.rollbackOk(XA_RBTIMEOUT, context.getRequestId());
+ }
}
void DtxHandlerImpl::recover(const MethodContext& context,
@@ -129,8 +148,6 @@ void DtxHandlerImpl::recover(const MethodContext& context,
bool /*startscan*/,
u_int32_t /*endscan*/ )
{
- //TODO
-
//TODO: what do startscan and endscan actually mean?
// response should hold on key value pair with key = 'xids' and
@@ -171,19 +188,21 @@ void DtxHandlerImpl::forget(const MethodContext& /*context*/,
throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
}
-void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,
- const string& /*xid*/ )
+void DtxHandlerImpl::getTimeout(const MethodContext& context,
+ const string& xid)
{
- //TODO
+ uint32_t timeout = broker.getDtxManager().getTimeout(xid);
+ cClient.getTimeoutOk(timeout, context.getRequestId());
}
-void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/,
+void DtxHandlerImpl::setTimeout(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& /*xid*/,
- u_int32_t /*timeout*/ )
+ const string& xid,
+ u_int32_t timeout)
{
- //TODO
+ broker.getDtxManager().setTimeout(xid, timeout);
+ cClient.setTimeoutOk(context.getRequestId());
}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp
index b05f7b9784..0d211017de 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp
@@ -19,6 +19,8 @@
*
*/
#include "DtxManager.h"
+#include "DtxTimeout.h"
+#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <iostream>
using qpid::sys::Mutex;
@@ -29,37 +31,52 @@ DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
DtxManager::~DtxManager() {}
-void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
+void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
{
createWork(xid)->add(ops);
}
-void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops)
+void DtxManager::join(const std::string& xid, DtxBuffer::shared_ptr ops)
{
getWork(xid)->add(ops);
}
-void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
+void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
{
createWork(xid)->recover(txn, ops);
}
bool DtxManager::prepare(const std::string& xid)
{
- return getWork(xid)->prepare();
+ try {
+ return getWork(xid)->prepare();
+ } catch (DtxTimeoutException& e) {
+ remove(xid);
+ throw e;
+ }
}
bool DtxManager::commit(const std::string& xid, bool onePhase)
{
- bool result = getWork(xid)->commit(onePhase);
- remove(xid);
- return result;
+ try {
+ bool result = getWork(xid)->commit(onePhase);
+ remove(xid);
+ return result;
+ } catch (DtxTimeoutException& e) {
+ remove(xid);
+ throw e;
+ }
}
void DtxManager::rollback(const std::string& xid)
{
- getWork(xid)->rollback();
- remove(xid);
+ try {
+ getWork(xid)->rollback();
+ remove(xid);
+ } catch (DtxTimeoutException& e) {
+ remove(xid);
+ throw e;
+ }
}
DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
@@ -83,7 +100,7 @@ void DtxManager::remove(const std::string& xid)
}
}
-DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid)
+DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid)
{
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
@@ -93,3 +110,48 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid)
return work.insert(xid, new DtxWorkRecord(xid, store)).first;
}
}
+
+void DtxManager::setTimeout(const std::string& xid, uint32_t secs)
+{
+ WorkMap::iterator record = getWork(xid);
+ DtxTimeout::shared_ptr timeout = record->getTimeout();
+ if (timeout.get()) {
+ if (timeout->timeout == secs) return;//no need to do anything further if timeout hasn't changed
+ timeout->cancelled = true;
+ }
+ timeout = DtxTimeout::shared_ptr(new DtxTimeout(secs, *this, xid));
+ record->setTimeout(timeout);
+ timer.add(boost::static_pointer_cast<TimerTask>(timeout));
+
+}
+
+uint32_t DtxManager::getTimeout(const std::string& xid)
+{
+ DtxTimeout::shared_ptr timeout = getWork(xid)->getTimeout();
+ return !timeout ? 0 : timeout->timeout;
+}
+
+void DtxManager::timedout(const std::string& xid)
+{
+ Mutex::ScopedLock locker(lock);
+ WorkMap::iterator i = work.find(xid);
+ if (i == work.end()) {
+ QPID_LOG(warning, "Transaction timeout failed: no record for xid");
+ } else {
+ i->timedout();
+ //TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion?
+ //timer.add(TimerTask::shared_ptr(new DtxCleanup(60*30/*30 mins*/, *this, xid)));
+ }
+}
+
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
+ : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), mgr(_mgr), xid(_xid) {}
+
+void DtxManager::DtxCleanup::fire()
+{
+ try {
+ mgr.remove(xid);
+ } catch (ConnectionException& e) {
+ //assume it was explicitly cleaned up after a call to prepare, commit or rollback
+ }
+}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h
index ce33f77a0f..3db3f70685 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.h
+++ b/qpid/cpp/src/qpid/broker/DtxManager.h
@@ -24,6 +24,7 @@
#include <boost/ptr_container/ptr_map.hpp>
#include "DtxBuffer.h"
#include "DtxWorkRecord.h"
+#include "Timer.h"
#include "TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
@@ -34,23 +35,37 @@ namespace broker {
class DtxManager{
typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
+
+ struct DtxCleanup : public TimerTask
+ {
+ DtxManager& mgr;
+ const std::string& xid;
+
+ DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+ void fire();
+ };
+
WorkMap work;
TransactionalStore* const store;
qpid::sys::Mutex lock;
+ Timer timer;
void remove(const std::string& xid);
WorkMap::iterator getWork(const std::string& xid);
- WorkMap::iterator createWork(std::string& xid);
+ WorkMap::iterator createWork(std::string xid);
public:
DtxManager(TransactionalStore* const store);
~DtxManager();
- void start(std::string xid, DtxBuffer::shared_ptr work);
- void join(std::string xid, DtxBuffer::shared_ptr work);
- void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
+ void start(const std::string& xid, DtxBuffer::shared_ptr work);
+ void join(const std::string& xid, DtxBuffer::shared_ptr work);
+ void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
bool prepare(const std::string& xid);
bool commit(const std::string& xid, bool onePhase);
void rollback(const std::string& xid);
+ void setTimeout(const std::string& xid, uint32_t secs);
+ uint32_t getTimeout(const std::string& xid);
+ void timedout(const std::string& xid);
};
}
diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
new file mode 100644
index 0000000000..8e0a7741c4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxTimeout.h"
+#include "DtxManager.h"
+#include "qpid/sys/Time.h"
+
+using namespace qpid::broker;
+
+DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
+ : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), timeout(_timeout), mgr(_mgr), xid(_xid)
+{
+}
+
+void DtxTimeout::fire()
+{
+ mgr.timedout(xid);
+}
diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.h b/qpid/cpp/src/qpid/broker/DtxTimeout.h
new file mode 100644
index 0000000000..33da62e7f4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/DtxTimeout.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxTimeout_
+#define _DtxTimeout_
+
+#include "qpid/Exception.h"
+#include "Timer.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxManager;
+
+
+struct DtxTimeoutException : public Exception
+{
+ DtxTimeoutException() {}
+};
+
+
+struct DtxTimeout : public TimerTask
+{
+ typedef boost::shared_ptr<DtxTimeout> shared_ptr;
+ const uint32_t timeout;
+ DtxManager& mgr;
+ const std::string xid;
+
+ DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+ void fire();
+};
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
index 1eb4903672..f2f118c5e4 100644
--- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -27,9 +27,14 @@ using qpid::sys::Mutex;
using namespace qpid::broker;
DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
- xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {}
+ xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
-DtxWorkRecord::~DtxWorkRecord() {}
+DtxWorkRecord::~DtxWorkRecord()
+{
+ if (timeout.get()) {
+ timeout->cancelled = true;
+ }
+}
bool DtxWorkRecord::prepare()
{
@@ -110,6 +115,9 @@ void DtxWorkRecord::rollback()
void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
{
Mutex::ScopedLock locker(lock);
+ if (expired) {
+ throw DtxTimeoutException();
+ }
if (completed) {
throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
}
@@ -118,6 +126,9 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
bool DtxWorkRecord::check()
{
+ if (expired) {
+ throw DtxTimeoutException();
+ }
if (!completed) {
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
@@ -149,3 +160,18 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer
completed = true;
prepared = true;
}
+
+void DtxWorkRecord::timedout()
+{
+ Mutex::ScopedLock locker(lock);
+ expired = true;
+ rolledback = true;
+ if (!completed) {
+ for (Work::iterator i = work.begin(); i != work.end(); i++) {
+ if (!(*i)->isEnded()) {
+ (*i)->timedout();
+ }
+ }
+ }
+ abort();
+}
diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
index 0c6e0ba6bc..1e26dbebcd 100644
--- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -25,6 +25,7 @@
#include <functional>
#include <vector>
#include "DtxBuffer.h"
+#include "DtxTimeout.h"
#include "TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
@@ -46,6 +47,8 @@ class DtxWorkRecord
bool completed;
bool rolledback;
bool prepared;
+ bool expired;
+ DtxTimeout::shared_ptr timeout;
Work work;
std::auto_ptr<TPCTransactionContext> txn;
qpid::sys::Mutex lock;
@@ -61,6 +64,9 @@ public:
void rollback();
void add(DtxBuffer::shared_ptr ops);
void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
+ void timedout();
+ void setTimeout(DtxTimeout::shared_ptr t) { timeout = t; }
+ DtxTimeout::shared_ptr getTimeout() { return timeout; }
};
}
diff --git a/qpid/cpp/src/qpid/broker/Timer.cpp b/qpid/cpp/src/qpid/broker/Timer.cpp
new file mode 100644
index 0000000000..be75346578
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Timer.cpp
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Timer.h"
+#include <iostream>
+
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::Monitor;
+using qpid::sys::Thread;
+using namespace qpid::broker;
+
+TimerTask::TimerTask(Duration timeout) : time(AbsTime::now(), timeout), cancelled(false) {}
+TimerTask::TimerTask(AbsTime _time) : time(_time), cancelled(false) {}
+TimerTask::~TimerTask(){}
+
+Timer::Timer() : active(false)
+{
+ start();
+}
+
+Timer::~Timer()
+{
+ stop();
+}
+
+void Timer::run()
+{
+ Monitor::ScopedLock l(monitor);
+ while(active){
+ if (tasks.empty()) {
+ monitor.wait();
+ } else {
+ TimerTask::shared_ptr t = tasks.top();
+ if (t->cancelled) {
+ tasks.pop();
+ } else if(t->time < AbsTime::now()) {
+ tasks.pop();
+ t->fire();
+ } else {
+ monitor.wait(t->time);
+ }
+ }
+ }
+}
+
+void Timer::add(TimerTask::shared_ptr task)
+{
+ Monitor::ScopedLock l(monitor);
+ tasks.push(task);
+ monitor.notify();
+}
+
+void Timer::start()
+{
+ Monitor::ScopedLock l(monitor);
+ if (!active) {
+ active = true;
+ runner = std::auto_ptr<Thread>(new Thread(this));
+ }
+}
+
+void Timer::stop()
+{
+ signalStop();
+ if (runner.get()) {
+ runner->join();
+ runner.reset();
+ }
+}
+void Timer::signalStop()
+{
+ Monitor::ScopedLock l(monitor);
+ if (active) {
+ active = false;
+ monitor.notifyAll();
+ }
+}
+
+bool Later::operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const
+{
+ return a.get() && b.get() && a->time > b->time;
+}
diff --git a/qpid/cpp/src/qpid/broker/Timer.h b/qpid/cpp/src/qpid/broker/Timer.h
new file mode 100644
index 0000000000..c70ffeaedc
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Timer.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Timer_
+#define _Timer_
+
+#include <memory>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+
+namespace qpid {
+namespace broker {
+
+struct TimerTask
+{
+ typedef boost::shared_ptr<TimerTask> shared_ptr;
+
+ const qpid::sys::AbsTime time;
+ volatile bool cancelled;
+
+ TimerTask(qpid::sys::Duration timeout);
+ TimerTask(qpid::sys::AbsTime time);
+ virtual ~TimerTask();
+ virtual void fire() = 0;
+};
+
+ struct Later
+ {
+ bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const;
+ };
+
+class Timer : private qpid::sys::Runnable
+{
+ qpid::sys::Monitor monitor;
+ std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks;
+ std::auto_ptr<qpid::sys::Thread> runner;
+ bool active;
+
+ void run();
+ void signalStop();
+
+public:
+ Timer();
+ ~Timer();
+
+ void add(TimerTask::shared_ptr task);
+ void start();
+ void stop();
+
+};
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/Time.h b/qpid/cpp/src/qpid/sys/Time.h
index 314f4b5bbb..4bb65e9f4a 100644
--- a/qpid/cpp/src/qpid/sys/Time.h
+++ b/qpid/cpp/src/qpid/sys/Time.h
@@ -44,6 +44,9 @@ public:
static AbsTime now();
inline static AbsTime FarFuture();
+
+ friend bool operator<(const AbsTime& a, const AbsTime& b);
+ friend bool operator>(const AbsTime& a, const AbsTime& b);
};
class Duration {
@@ -67,6 +70,9 @@ AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int6
inline AbsTime now() { return AbsTime::now(); }
+inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns < b.time_ns; }
+inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns > b.time_ns; }
+
Duration::Duration(int64_t time0) :
nanosecs(time0)
{}
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 9649ce354f..a19b5098ce 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -32,6 +32,7 @@ broker_unit_tests = \
QueueRegistryTest \
QueueTest \
QueuePolicyTest \
+ TimerTest \
TopicExchangeTest \
TxAckTest \
TxBufferTest \
diff --git a/qpid/cpp/src/tests/TimerTest.cpp b/qpid/cpp/src/tests/TimerTest.cpp
new file mode 100644
index 0000000000..682699dbd3
--- /dev/null
+++ b/qpid/cpp/src/tests/TimerTest.cpp
@@ -0,0 +1,128 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Timer.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid_test_plugin.h"
+#include <math.h>
+#include <iostream>
+#include <memory>
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using boost::dynamic_pointer_cast;
+
+class TimerTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(TimerTest);
+ CPPUNIT_TEST(testGeneral);
+ CPPUNIT_TEST_SUITE_END();
+
+ class Counter
+ {
+ Mutex lock;
+ uint counter;
+ public:
+ Counter() : counter(0) {}
+ uint next()
+ {
+ Mutex::ScopedLock l(lock);
+ return ++counter;
+ }
+ };
+
+ class TestTask : public TimerTask
+ {
+ const AbsTime start;
+ const Duration expected;
+ AbsTime end;
+ bool fired;
+ uint position;
+ Monitor monitor;
+ Counter& counter;
+
+ public:
+ TestTask(Duration timeout, Counter& _counter)
+ : TimerTask(timeout), start(now()), expected(timeout), end(start), fired(false), counter(_counter) {}
+
+ void fire()
+ {
+ Monitor::ScopedLock l(monitor);
+ fired = true;
+ position = counter.next();
+ end = now();
+ monitor.notify();
+ }
+
+ void check(uint expected_position, uint64_t tolerance = 500 * TIME_MSEC)
+ {
+ Monitor::ScopedLock l(monitor);
+ CPPUNIT_ASSERT(fired);
+ CPPUNIT_ASSERT_EQUAL(expected_position, position);
+ Duration actual(start, end);
+ uint64_t difference = abs(expected - actual);
+ std::string msg(boost::lexical_cast<std::string>(boost::format("tolerance = %1%, difference = %2%") % tolerance % difference));
+ CPPUNIT_ASSERT_MESSAGE(msg, difference < tolerance);
+ }
+
+ void wait(Duration d)
+ {
+ Monitor::ScopedLock l(monitor);
+ monitor.wait(AbsTime(now(), d));
+ }
+ };
+
+ class DummyRunner : public Runnable
+ {
+ public:
+ void run() {}
+ };
+
+public:
+
+ void testGeneral()
+ {
+ Counter counter;
+ Timer timer;
+ TestTask::shared_ptr task1(new TestTask(Duration(3 * TIME_SEC), counter));
+ TestTask::shared_ptr task2(new TestTask(Duration(1 * TIME_SEC), counter));
+ TestTask::shared_ptr task3(new TestTask(Duration(4 * TIME_SEC), counter));
+ TestTask::shared_ptr task4(new TestTask(Duration(2 * TIME_SEC), counter));
+
+ timer.add(task1);
+ timer.add(task2);
+ timer.add(task3);
+ timer.add(task4);
+
+ dynamic_pointer_cast<TestTask>(task3)->wait(Duration(6 * TIME_SEC));
+
+ dynamic_pointer_cast<TestTask>(task1)->check(3);
+ dynamic_pointer_cast<TestTask>(task2)->check(1);
+ dynamic_pointer_cast<TestTask>(task3)->check(4);
+ dynamic_pointer_cast<TestTask>(task4)->check(2);
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TimerTest);
+
diff --git a/qpid/python/tests_0-9/dtx.py b/qpid/python/tests_0-9/dtx.py
index ec82c72d49..bc268f4129 100644
--- a/qpid/python/tests_0-9/dtx.py
+++ b/qpid/python/tests_0-9/dtx.py
@@ -21,6 +21,7 @@ from qpid.queue import Empty
from qpid.content import Content
from qpid.testlib import testrunner, TestBase
from struct import pack, unpack
+from time import sleep
class DtxTests(TestBase):
"""
@@ -37,6 +38,7 @@ class DtxTests(TestBase):
"""
XA_RBROLLBACK = 1
+ XA_RBTIMEOUT = 2
XA_OK = 8
def test_simple_commit(self):
@@ -451,6 +453,51 @@ class DtxTests(TestBase):
self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags)
channel1.dtx_coordination_rollback(xid=tx)
+ def test_get_timeout(self):
+ """
+ Check that get-timeout returns the correct value, (and that a
+ transaction with a timeout can complete normally)
+ """
+ channel = self.channel
+ tx = self.xid("dummy")
+
+ channel.dtx_demarcation_select()
+ channel.dtx_demarcation_start(xid=tx)
+ self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+ channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
+ self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+ self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags)
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
+
+ def test_set_timeout(self):
+ """
+ Test the timeout of a transaction results in the expected
+ behaviour
+ """
+ #open new channel to allow self.channel to be used in checking te queue
+ channel = self.client.channel(2)
+ channel.channel_open()
+ #setup:
+ tx = self.xid("dummy")
+ channel.queue_declare(queue="queue-a", exclusive=True)
+ channel.queue_declare(queue="queue-b", exclusive=True)
+ channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage")
+
+ channel.dtx_demarcation_select()
+ channel.dtx_demarcation_start(xid=tx)
+ self.swap(channel, "queue-a", "queue-b")
+ channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+ sleep(3)
+ #check that the work has been rolled back already
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("timeout", "queue-a")
+ #check the correct codes are returned when we try to complete the txn
+ self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags)
+ self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags)
+
+
+
def test_recover(self):
"""
Test basic recover behaviour