diff options
author | Gordon Sim <gsim@apache.org> | 2007-06-08 15:24:12 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-06-08 15:24:12 +0000 |
commit | 2373874e23fb62e502963878f5e7cd68e100021e (patch) | |
tree | 822809efbbe20a6d21aac34641f69b548eb5fec5 | |
parent | 961e07e4d1ad8eebcc80d4d024f8efa3ab972e05 (diff) | |
download | qpid-python-2373874e23fb62e502963878f5e7cd68e100021e.tar.gz |
Timeout handling for dtx, plus tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@545531 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerChannel.cpp | 35 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerChannel.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxBuffer.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxBuffer.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp | 85 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.cpp | 82 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.h | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxTimeout.cpp | 35 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxTimeout.h | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxWorkRecord.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Timer.cpp | 100 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Timer.h | 76 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Time.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TimerTest.cpp | 128 | ||||
-rw-r--r-- | qpid/python/tests_0-9/dtx.py | 47 |
18 files changed, 672 insertions, 60 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 02044be01b..75cc670015 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -189,6 +189,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/DtxBuffer.cpp \ qpid/broker/DtxHandlerImpl.cpp \ qpid/broker/DtxManager.cpp \ + qpid/broker/DtxTimeout.cpp \ qpid/broker/DtxWorkRecord.cpp \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ @@ -206,6 +207,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ qpid/broker/Reference.cpp \ + qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ qpid/broker/TxBuffer.cpp \ @@ -243,6 +245,7 @@ nobase_include_HEADERS = \ qpid/broker/DtxBuffer.h \ qpid/broker/DtxHandlerImpl.h \ qpid/broker/DtxManager.h \ + qpid/broker/DtxTimeout.h \ qpid/broker/DtxWorkRecord.h \ qpid/broker/ExchangeRegistry.h \ qpid/broker/FanOutExchange.h \ @@ -285,6 +288,7 @@ nobase_include_HEADERS = \ qpid/broker/PersistableQueue.h \ qpid/broker/QueuePolicy.h \ qpid/broker/RecoveryManagerImpl.h \ + qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ qpid/broker/TxAck.h \ diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp index e256566d35..c1f0b44ed4 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp @@ -38,6 +38,7 @@ #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" +#include "DtxTimeout.h" #include "MessageStore.h" #include "TxAck.h" #include "TxPublish.h" @@ -154,18 +155,15 @@ void Channel::endDtx(const std::string& xid, bool fail){ % dtxBuffer->getXid() % xid); } + txBuffer.reset();//ops on this channel no longer transactional + + checkDtxTimeout(); if (fail) { - accumulatedAck.clear(); dtxBuffer->fail(); } else { - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); dtxBuffer->markEnded(); - } - + } dtxBuffer.reset(); - txBuffer.reset(); } void Channel::suspendDtx(const std::string& xid){ @@ -173,8 +171,10 @@ void Channel::suspendDtx(const std::string& xid){ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") % dtxBuffer->getXid() % xid); } + txBuffer.reset();//ops on this channel no longer transactional + + checkDtxTimeout(); dtxBuffer->setSuspended(true); - txBuffer.reset(); } void Channel::resumeDtx(const std::string& xid){ @@ -185,10 +185,20 @@ void Channel::resumeDtx(const std::string& xid){ if (!dtxBuffer->isSuspended()) { throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); } - dtxBuffer->setSuspended(true); + + checkDtxTimeout(); + dtxBuffer->setSuspended(false); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); } +void Channel::checkDtxTimeout() +{ + if (dtxBuffer->isExpired()) { + dtxBuffer.reset(); + throw DtxTimeoutException(); + } +} + void Channel::deliver( Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected) @@ -302,9 +312,14 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); - //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers + if (dtxBuffer.get()) { + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h index 1fbfc2063e..0529caed5f 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.h +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h @@ -106,6 +106,8 @@ class Channel : public framing::ChannelAdapter, void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); bool checkPrefetch(Message::shared_ptr& msg); + + void checkDtxTimeout(); public: Channel(Connection& parent, diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp index 7f816ebcf4..29a07ea6d9 100644 --- a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp +++ b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp @@ -23,7 +23,8 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {} +DtxBuffer::DtxBuffer(const std::string& _xid) + : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {} DtxBuffer::~DtxBuffer() {} @@ -68,3 +69,15 @@ const std::string& DtxBuffer::getXid() return xid; } +void DtxBuffer::timedout() +{ + Mutex::ScopedLock locker(lock); + expired = true; + fail(); +} + +bool DtxBuffer::isExpired() +{ + Mutex::ScopedLock locker(lock); + return expired; +} diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.h b/qpid/cpp/src/qpid/broker/DtxBuffer.h index 0d4e6ccf31..b302632037 100644 --- a/qpid/cpp/src/qpid/broker/DtxBuffer.h +++ b/qpid/cpp/src/qpid/broker/DtxBuffer.h @@ -32,6 +32,7 @@ namespace qpid { bool ended; bool suspended; bool failed; + bool expired; public: typedef boost::shared_ptr<DtxBuffer> shared_ptr; @@ -44,6 +45,8 @@ namespace qpid { bool isSuspended(); void fail(); bool isRollbackOnly(); + void timedout(); + bool isExpired(); const std::string& getXid(); }; } diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 1d7c2df5f4..c7ed95e5d3 100644 --- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -61,21 +61,24 @@ void DtxHandlerImpl::end(const MethodContext& context, bool fail, bool suspend) { - - if (fail) { - channel.endDtx(xid, true); - if (suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); - } else { - dClient.endOk(XA_RBROLLBACK, context.getRequestId()); - } - } else { - if (suspend) { - channel.suspendDtx(xid); + try { + if (fail) { + channel.endDtx(xid, true); + if (suspend) { + throw ConnectionException(503, "End and suspend cannot both be set."); + } else { + dClient.endOk(XA_RBROLLBACK, context.getRequestId()); + } } else { - channel.endDtx(xid, false); + if (suspend) { + channel.suspendDtx(xid); + } else { + channel.endDtx(xid, false); + } + dClient.endOk(XA_OK, context.getRequestId()); } - dClient.endOk(XA_OK, context.getRequestId()); + } catch (DtxTimeoutException e) { + dClient.endOk(XA_RBTIMEOUT, context.getRequestId()); } } @@ -88,12 +91,16 @@ void DtxHandlerImpl::start(const MethodContext& context, if (join && resume) { throw ConnectionException(503, "Join and resume cannot both be set."); } - if (resume) { - channel.resumeDtx(xid); - } else { - channel.startDtx(xid, broker.getDtxManager(), join); + try { + if (resume) { + channel.resumeDtx(xid); + } else { + channel.startDtx(xid, broker.getDtxManager(), join); + } + dClient.startOk(XA_OK, context.getRequestId()); + } catch (DtxTimeoutException e) { + dClient.startOk(XA_RBTIMEOUT, context.getRequestId()); } - dClient.startOk(XA_OK, context.getRequestId()); } // DtxCoordinationHandler: @@ -102,8 +109,12 @@ void DtxHandlerImpl::prepare(const MethodContext& context, u_int16_t /*ticket*/, const string& xid) { - bool ok = broker.getDtxManager().prepare(xid); - cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + try { + bool ok = broker.getDtxManager().prepare(xid); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + } catch (DtxTimeoutException e) { + cClient.prepareOk(XA_RBTIMEOUT, context.getRequestId()); + } } void DtxHandlerImpl::commit(const MethodContext& context, @@ -111,8 +122,12 @@ void DtxHandlerImpl::commit(const MethodContext& context, const string& xid, bool onePhase) { - bool ok = broker.getDtxManager().commit(xid, onePhase); - cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + try { + bool ok = broker.getDtxManager().commit(xid, onePhase); + cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + } catch (DtxTimeoutException e) { + cClient.commitOk(XA_RBTIMEOUT, context.getRequestId()); + } } @@ -120,8 +135,12 @@ void DtxHandlerImpl::rollback(const MethodContext& context, u_int16_t /*ticket*/, const string& xid ) { - broker.getDtxManager().rollback(xid); - cClient.rollbackOk(XA_OK, context.getRequestId()); + try { + broker.getDtxManager().rollback(xid); + cClient.rollbackOk(XA_OK, context.getRequestId()); + } catch (DtxTimeoutException e) { + cClient.rollbackOk(XA_RBTIMEOUT, context.getRequestId()); + } } void DtxHandlerImpl::recover(const MethodContext& context, @@ -129,8 +148,6 @@ void DtxHandlerImpl::recover(const MethodContext& context, bool /*startscan*/, u_int32_t /*endscan*/ ) { - //TODO - //TODO: what do startscan and endscan actually mean? // response should hold on key value pair with key = 'xids' and @@ -171,19 +188,21 @@ void DtxHandlerImpl::forget(const MethodContext& /*context*/, throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); } -void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/, - const string& /*xid*/ ) +void DtxHandlerImpl::getTimeout(const MethodContext& context, + const string& xid) { - //TODO + uint32_t timeout = broker.getDtxManager().getTimeout(xid); + cClient.getTimeoutOk(timeout, context.getRequestId()); } -void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/, +void DtxHandlerImpl::setTimeout(const MethodContext& context, u_int16_t /*ticket*/, - const string& /*xid*/, - u_int32_t /*timeout*/ ) + const string& xid, + u_int32_t timeout) { - //TODO + broker.getDtxManager().setTimeout(xid, timeout); + cClient.setTimeoutOk(context.getRequestId()); } diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index b05f7b9784..0d211017de 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -19,6 +19,8 @@ * */ #include "DtxManager.h" +#include "DtxTimeout.h" +#include "qpid/log/Statement.h" #include <boost/format.hpp> #include <iostream> using qpid::sys::Mutex; @@ -29,37 +31,52 @@ DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {} DtxManager::~DtxManager() {} -void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops) +void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops) { createWork(xid)->add(ops); } -void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops) +void DtxManager::join(const std::string& xid, DtxBuffer::shared_ptr ops) { getWork(xid)->add(ops); } -void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops) +void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops) { createWork(xid)->recover(txn, ops); } bool DtxManager::prepare(const std::string& xid) { - return getWork(xid)->prepare(); + try { + return getWork(xid)->prepare(); + } catch (DtxTimeoutException& e) { + remove(xid); + throw e; + } } bool DtxManager::commit(const std::string& xid, bool onePhase) { - bool result = getWork(xid)->commit(onePhase); - remove(xid); - return result; + try { + bool result = getWork(xid)->commit(onePhase); + remove(xid); + return result; + } catch (DtxTimeoutException& e) { + remove(xid); + throw e; + } } void DtxManager::rollback(const std::string& xid) { - getWork(xid)->rollback(); - remove(xid); + try { + getWork(xid)->rollback(); + remove(xid); + } catch (DtxTimeoutException& e) { + remove(xid); + throw e; + } } DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) @@ -83,7 +100,7 @@ void DtxManager::remove(const std::string& xid) } } -DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid) +DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid) { Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); @@ -93,3 +110,48 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid) return work.insert(xid, new DtxWorkRecord(xid, store)).first; } } + +void DtxManager::setTimeout(const std::string& xid, uint32_t secs) +{ + WorkMap::iterator record = getWork(xid); + DtxTimeout::shared_ptr timeout = record->getTimeout(); + if (timeout.get()) { + if (timeout->timeout == secs) return;//no need to do anything further if timeout hasn't changed + timeout->cancelled = true; + } + timeout = DtxTimeout::shared_ptr(new DtxTimeout(secs, *this, xid)); + record->setTimeout(timeout); + timer.add(boost::static_pointer_cast<TimerTask>(timeout)); + +} + +uint32_t DtxManager::getTimeout(const std::string& xid) +{ + DtxTimeout::shared_ptr timeout = getWork(xid)->getTimeout(); + return !timeout ? 0 : timeout->timeout; +} + +void DtxManager::timedout(const std::string& xid) +{ + Mutex::ScopedLock locker(lock); + WorkMap::iterator i = work.find(xid); + if (i == work.end()) { + QPID_LOG(warning, "Transaction timeout failed: no record for xid"); + } else { + i->timedout(); + //TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion? + //timer.add(TimerTask::shared_ptr(new DtxCleanup(60*30/*30 mins*/, *this, xid))); + } +} + +DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) + : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), mgr(_mgr), xid(_xid) {} + +void DtxManager::DtxCleanup::fire() +{ + try { + mgr.remove(xid); + } catch (ConnectionException& e) { + //assume it was explicitly cleaned up after a call to prepare, commit or rollback + } +} diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index ce33f77a0f..3db3f70685 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -24,6 +24,7 @@ #include <boost/ptr_container/ptr_map.hpp> #include "DtxBuffer.h" #include "DtxWorkRecord.h" +#include "Timer.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" @@ -34,23 +35,37 @@ namespace broker { class DtxManager{ typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap; + + struct DtxCleanup : public TimerTask + { + DtxManager& mgr; + const std::string& xid; + + DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid); + void fire(); + }; + WorkMap work; TransactionalStore* const store; qpid::sys::Mutex lock; + Timer timer; void remove(const std::string& xid); WorkMap::iterator getWork(const std::string& xid); - WorkMap::iterator createWork(std::string& xid); + WorkMap::iterator createWork(std::string xid); public: DtxManager(TransactionalStore* const store); ~DtxManager(); - void start(std::string xid, DtxBuffer::shared_ptr work); - void join(std::string xid, DtxBuffer::shared_ptr work); - void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work); + void start(const std::string& xid, DtxBuffer::shared_ptr work); + void join(const std::string& xid, DtxBuffer::shared_ptr work); + void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work); bool prepare(const std::string& xid); bool commit(const std::string& xid, bool onePhase); void rollback(const std::string& xid); + void setTimeout(const std::string& xid, uint32_t secs); + uint32_t getTimeout(const std::string& xid); + void timedout(const std::string& xid); }; } diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp new file mode 100644 index 0000000000..8e0a7741c4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DtxTimeout.h" +#include "DtxManager.h" +#include "qpid/sys/Time.h" + +using namespace qpid::broker; + +DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) + : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), timeout(_timeout), mgr(_mgr), xid(_xid) +{ +} + +void DtxTimeout::fire() +{ + mgr.timedout(xid); +} diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.h b/qpid/cpp/src/qpid/broker/DtxTimeout.h new file mode 100644 index 0000000000..33da62e7f4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/DtxTimeout.h @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DtxTimeout_ +#define _DtxTimeout_ + +#include "qpid/Exception.h" +#include "Timer.h" + +namespace qpid { +namespace broker { + +class DtxManager; + + +struct DtxTimeoutException : public Exception +{ + DtxTimeoutException() {} +}; + + +struct DtxTimeout : public TimerTask +{ + typedef boost::shared_ptr<DtxTimeout> shared_ptr; + const uint32_t timeout; + DtxManager& mgr; + const std::string xid; + + DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); + void fire(); +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp index 1eb4903672..f2f118c5e4 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -27,9 +27,14 @@ using qpid::sys::Mutex; using namespace qpid::broker; DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : - xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {} + xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} -DtxWorkRecord::~DtxWorkRecord() {} +DtxWorkRecord::~DtxWorkRecord() +{ + if (timeout.get()) { + timeout->cancelled = true; + } +} bool DtxWorkRecord::prepare() { @@ -110,6 +115,9 @@ void DtxWorkRecord::rollback() void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) { Mutex::ScopedLock locker(lock); + if (expired) { + throw DtxTimeoutException(); + } if (completed) { throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid); } @@ -118,6 +126,9 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) bool DtxWorkRecord::check() { + if (expired) { + throw DtxTimeoutException(); + } if (!completed) { //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { @@ -149,3 +160,18 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer completed = true; prepared = true; } + +void DtxWorkRecord::timedout() +{ + Mutex::ScopedLock locker(lock); + expired = true; + rolledback = true; + if (!completed) { + for (Work::iterator i = work.begin(); i != work.end(); i++) { + if (!(*i)->isEnded()) { + (*i)->timedout(); + } + } + } + abort(); +} diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h index 0c6e0ba6bc..1e26dbebcd 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h @@ -25,6 +25,7 @@ #include <functional> #include <vector> #include "DtxBuffer.h" +#include "DtxTimeout.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" @@ -46,6 +47,8 @@ class DtxWorkRecord bool completed; bool rolledback; bool prepared; + bool expired; + DtxTimeout::shared_ptr timeout; Work work; std::auto_ptr<TPCTransactionContext> txn; qpid::sys::Mutex lock; @@ -61,6 +64,9 @@ public: void rollback(); void add(DtxBuffer::shared_ptr ops); void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops); + void timedout(); + void setTimeout(DtxTimeout::shared_ptr t) { timeout = t; } + DtxTimeout::shared_ptr getTimeout() { return timeout; } }; } diff --git a/qpid/cpp/src/qpid/broker/Timer.cpp b/qpid/cpp/src/qpid/broker/Timer.cpp new file mode 100644 index 0000000000..be75346578 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Timer.cpp @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Timer.h" +#include <iostream> + +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::Monitor; +using qpid::sys::Thread; +using namespace qpid::broker; + +TimerTask::TimerTask(Duration timeout) : time(AbsTime::now(), timeout), cancelled(false) {} +TimerTask::TimerTask(AbsTime _time) : time(_time), cancelled(false) {} +TimerTask::~TimerTask(){} + +Timer::Timer() : active(false) +{ + start(); +} + +Timer::~Timer() +{ + stop(); +} + +void Timer::run() +{ + Monitor::ScopedLock l(monitor); + while(active){ + if (tasks.empty()) { + monitor.wait(); + } else { + TimerTask::shared_ptr t = tasks.top(); + if (t->cancelled) { + tasks.pop(); + } else if(t->time < AbsTime::now()) { + tasks.pop(); + t->fire(); + } else { + monitor.wait(t->time); + } + } + } +} + +void Timer::add(TimerTask::shared_ptr task) +{ + Monitor::ScopedLock l(monitor); + tasks.push(task); + monitor.notify(); +} + +void Timer::start() +{ + Monitor::ScopedLock l(monitor); + if (!active) { + active = true; + runner = std::auto_ptr<Thread>(new Thread(this)); + } +} + +void Timer::stop() +{ + signalStop(); + if (runner.get()) { + runner->join(); + runner.reset(); + } +} +void Timer::signalStop() +{ + Monitor::ScopedLock l(monitor); + if (active) { + active = false; + monitor.notifyAll(); + } +} + +bool Later::operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const +{ + return a.get() && b.get() && a->time > b->time; +} diff --git a/qpid/cpp/src/qpid/broker/Timer.h b/qpid/cpp/src/qpid/broker/Timer.h new file mode 100644 index 0000000000..c70ffeaedc --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Timer.h @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Timer_ +#define _Timer_ + +#include <memory> +#include <queue> +#include <boost/shared_ptr.hpp> +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" + +namespace qpid { +namespace broker { + +struct TimerTask +{ + typedef boost::shared_ptr<TimerTask> shared_ptr; + + const qpid::sys::AbsTime time; + volatile bool cancelled; + + TimerTask(qpid::sys::Duration timeout); + TimerTask(qpid::sys::AbsTime time); + virtual ~TimerTask(); + virtual void fire() = 0; +}; + + struct Later + { + bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const; + }; + +class Timer : private qpid::sys::Runnable +{ + qpid::sys::Monitor monitor; + std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks; + std::auto_ptr<qpid::sys::Thread> runner; + bool active; + + void run(); + void signalStop(); + +public: + Timer(); + ~Timer(); + + void add(TimerTask::shared_ptr task); + void start(); + void stop(); + +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/sys/Time.h b/qpid/cpp/src/qpid/sys/Time.h index 314f4b5bbb..4bb65e9f4a 100644 --- a/qpid/cpp/src/qpid/sys/Time.h +++ b/qpid/cpp/src/qpid/sys/Time.h @@ -44,6 +44,9 @@ public: static AbsTime now(); inline static AbsTime FarFuture(); + + friend bool operator<(const AbsTime& a, const AbsTime& b); + friend bool operator>(const AbsTime& a, const AbsTime& b); }; class Duration { @@ -67,6 +70,9 @@ AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int6 inline AbsTime now() { return AbsTime::now(); } +inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns < b.time_ns; } +inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns > b.time_ns; } + Duration::Duration(int64_t time0) : nanosecs(time0) {} diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 9649ce354f..a19b5098ce 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -32,6 +32,7 @@ broker_unit_tests = \ QueueRegistryTest \ QueueTest \ QueuePolicyTest \ + TimerTest \ TopicExchangeTest \ TxAckTest \ TxBufferTest \ diff --git a/qpid/cpp/src/tests/TimerTest.cpp b/qpid/cpp/src/tests/TimerTest.cpp new file mode 100644 index 0000000000..682699dbd3 --- /dev/null +++ b/qpid/cpp/src/tests/TimerTest.cpp @@ -0,0 +1,128 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Timer.h" +#include "qpid/sys/Monitor.h" +#include "qpid_test_plugin.h" +#include <math.h> +#include <iostream> +#include <memory> +#include <boost/format.hpp> + +using namespace qpid::broker; +using namespace qpid::sys; +using boost::dynamic_pointer_cast; + +class TimerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(TimerTest); + CPPUNIT_TEST(testGeneral); + CPPUNIT_TEST_SUITE_END(); + + class Counter + { + Mutex lock; + uint counter; + public: + Counter() : counter(0) {} + uint next() + { + Mutex::ScopedLock l(lock); + return ++counter; + } + }; + + class TestTask : public TimerTask + { + const AbsTime start; + const Duration expected; + AbsTime end; + bool fired; + uint position; + Monitor monitor; + Counter& counter; + + public: + TestTask(Duration timeout, Counter& _counter) + : TimerTask(timeout), start(now()), expected(timeout), end(start), fired(false), counter(_counter) {} + + void fire() + { + Monitor::ScopedLock l(monitor); + fired = true; + position = counter.next(); + end = now(); + monitor.notify(); + } + + void check(uint expected_position, uint64_t tolerance = 500 * TIME_MSEC) + { + Monitor::ScopedLock l(monitor); + CPPUNIT_ASSERT(fired); + CPPUNIT_ASSERT_EQUAL(expected_position, position); + Duration actual(start, end); + uint64_t difference = abs(expected - actual); + std::string msg(boost::lexical_cast<std::string>(boost::format("tolerance = %1%, difference = %2%") % tolerance % difference)); + CPPUNIT_ASSERT_MESSAGE(msg, difference < tolerance); + } + + void wait(Duration d) + { + Monitor::ScopedLock l(monitor); + monitor.wait(AbsTime(now(), d)); + } + }; + + class DummyRunner : public Runnable + { + public: + void run() {} + }; + +public: + + void testGeneral() + { + Counter counter; + Timer timer; + TestTask::shared_ptr task1(new TestTask(Duration(3 * TIME_SEC), counter)); + TestTask::shared_ptr task2(new TestTask(Duration(1 * TIME_SEC), counter)); + TestTask::shared_ptr task3(new TestTask(Duration(4 * TIME_SEC), counter)); + TestTask::shared_ptr task4(new TestTask(Duration(2 * TIME_SEC), counter)); + + timer.add(task1); + timer.add(task2); + timer.add(task3); + timer.add(task4); + + dynamic_pointer_cast<TestTask>(task3)->wait(Duration(6 * TIME_SEC)); + + dynamic_pointer_cast<TestTask>(task1)->check(3); + dynamic_pointer_cast<TestTask>(task2)->check(1); + dynamic_pointer_cast<TestTask>(task3)->check(4); + dynamic_pointer_cast<TestTask>(task4)->check(2); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TimerTest); + diff --git a/qpid/python/tests_0-9/dtx.py b/qpid/python/tests_0-9/dtx.py index ec82c72d49..bc268f4129 100644 --- a/qpid/python/tests_0-9/dtx.py +++ b/qpid/python/tests_0-9/dtx.py @@ -21,6 +21,7 @@ from qpid.queue import Empty from qpid.content import Content from qpid.testlib import testrunner, TestBase from struct import pack, unpack +from time import sleep class DtxTests(TestBase): """ @@ -37,6 +38,7 @@ class DtxTests(TestBase): """ XA_RBROLLBACK = 1 + XA_RBTIMEOUT = 2 XA_OK = 8 def test_simple_commit(self): @@ -451,6 +453,51 @@ class DtxTests(TestBase): self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags) channel1.dtx_coordination_rollback(xid=tx) + def test_get_timeout(self): + """ + Check that get-timeout returns the correct value, (and that a + transaction with a timeout can complete normally) + """ + channel = self.channel + tx = self.xid("dummy") + + channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) + channel.dtx_coordination_set_timeout(xid=tx, timeout=60) + self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) + self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags) + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) + + def test_set_timeout(self): + """ + Test the timeout of a transaction results in the expected + behaviour + """ + #open new channel to allow self.channel to be used in checking te queue + channel = self.client.channel(2) + channel.channel_open() + #setup: + tx = self.xid("dummy") + channel.queue_declare(queue="queue-a", exclusive=True) + channel.queue_declare(queue="queue-b", exclusive=True) + channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") + + channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "queue-a", "queue-b") + channel.dtx_coordination_set_timeout(xid=tx, timeout=2) + sleep(3) + #check that the work has been rolled back already + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("timeout", "queue-a") + #check the correct codes are returned when we try to complete the txn + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags) + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags) + + + def test_recover(self): """ Test basic recover behaviour |