diff options
author | Alan Conway <aconway@apache.org> | 2011-08-25 20:41:28 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-08-25 20:41:28 +0000 |
commit | 2fdd2cc2ade41e213ae35818532574bbf40f4a00 (patch) | |
tree | 42fb45022ea08fee157abf50713b452acf5eda5d /cpp | |
parent | 7f99badd1c330b3a6032b15a13aca1cde81274d3 (diff) | |
download | qpid-python-2fdd2cc2ade41e213ae35818532574bbf40f4a00.tar.gz |
QPID-3384: Enable DTX transactions in a cluster.
- Replicate DTX state to new members joining.
- Use cluster timer for DTX timeouts.
- Incidental: quote nulls in qpid::Msg messages (XIDs often have null characters)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
30 files changed, 699 insertions, 284 deletions
diff --git a/cpp/include/qpid/Msg.h b/cpp/include/qpid/Msg.h index e1837c29e5..a4deb15c26 100644 --- a/cpp/include/qpid/Msg.h +++ b/cpp/include/qpid/Msg.h @@ -41,7 +41,7 @@ struct Msg { std::ostringstream os; Msg() {} Msg(const Msg& m) : os(m.str()) {} - std::string str() const { return os.str(); } + std::string str() const; operator std::string() const { return str(); } Msg& operator<<(long n) { os << n; return *this; } diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb index 20aac35194..88720cad5f 100755 --- a/cpp/rubygen/amqpgen.rb +++ b/cpp/rubygen/amqpgen.rb @@ -191,7 +191,8 @@ class AmqpElement "command-fragments" => "session.command-fragment", "in-doubt" => "dtx.xid", "tx-publish" => "str-8", - "queues" => "str-8" + "queues" => "str-8", + "prepared" => "str-8" } def array_type(name) diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4f9d7ab28a..e502000758 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -89,7 +89,7 @@ rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate . ../include $(specs) all $(rgen_srcs) $(srcdir)/rubygen.mk: rgen.timestamp rgen.timestamp: $(rgen_generator) $(specs) - $(rgen_cmd) $(srcdir)/rubygen.mk; touch $@ + $(rgen_cmd) $(srcdir)/rubygen.mk && touch $@ $(rgen_generator): # The CMake version is needed for dist @@ -435,6 +435,7 @@ libqpidcommon_la_SOURCES += \ qpid/log/OstreamOutput.h \ qpid/log/Selector.cpp \ qpid/log/Statement.cpp \ + qpid/Msg.cpp \ qpid/management/Buffer.cpp \ qpid/management/ConnectionSettings.cpp \ qpid/management/Manageable.cpp \ diff --git a/cpp/src/qpid/Msg.cpp b/cpp/src/qpid/Msg.cpp new file mode 100644 index 0000000000..d441cdd180 --- /dev/null +++ b/cpp/src/qpid/Msg.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Msg.h" +#include <string> + +namespace qpid { +using namespace std; + +struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } }; + +const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; + +std::string quote(const std::string& str) { + NonPrint nonPrint; + size_t n = std::count_if(str.begin(), str.end(), nonPrint); + if (n==0) return str; + std::string ret; + ret.reserve(str.size()+2*n); // Avoid extra allocations. + for (string::const_iterator i = str.begin(); i != str.end(); ++i) { + if (nonPrint(*i)) { + ret.push_back('\\'); + ret.push_back('x'); + ret.push_back(hex[((*i) >> 4)&0xf]); + ret.push_back(hex[(*i) & 0xf]); + } + else ret.push_back(*i); + } + return ret; +} + +// Quote the string so messages with null characters are preserved, e.g. messages with XIDs */ +std::string Msg::str() const { + return quote(os.str()); +} + +} // namespace qpid diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 240b9063f8..598c43b1d8 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -752,6 +752,7 @@ bool Broker::deferDeliveryImpl(const std::string& , void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { clusterTimer = t; queueCleaner.setTimer(clusterTimer.get()); + dtxManager.setTimer(*clusterTimer.get()); } const std::string Broker::TCP_TRANSPORT("tcp"); diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index bca3f90bbe..c558681d62 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -32,6 +32,10 @@ DtxAck::DtxAck(const qpid::framing::SequenceSet& acked, DeliveryRecords& unacked not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); } +DtxAck::DtxAck(DeliveryRecords& unacked) { + pending = unacked; +} + bool DtxAck::prepare(TransactionContext* ctxt) throw() { try{ diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index 166147e58d..16c3ff8ba0 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -1,3 +1,6 @@ +#ifndef QPID_BROKER_DTXACK_H +#define QPID_BROKER_DTXACK_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,9 +21,6 @@ * under the License. * */ -#ifndef _DtxAck_ -#define _DtxAck_ - #include <algorithm> #include <functional> #include <list> @@ -29,20 +29,21 @@ #include "qpid/broker/TxOp.h" namespace qpid { - namespace broker { - class DtxAck : public TxOp{ - DeliveryRecords pending; +namespace broker { +class DtxAck : public TxOp{ + DeliveryRecords pending; - public: - DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~DtxAck(){} - virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } - }; - } -} + public: + DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked); + DtxAck(DeliveryRecords& unacked); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~DtxAck(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + const DeliveryRecords& getPending() const { return pending; } +}; +}} // qpid::broker -#endif +#endif /*!QPID_BROKER_DTXACK_H*/ diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp index f1b8169cf7..13177d3b72 100644 --- a/cpp/src/qpid/broker/DtxBuffer.cpp +++ b/cpp/src/qpid/broker/DtxBuffer.cpp @@ -23,8 +23,11 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer(const std::string& _xid) - : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {} +DtxBuffer::DtxBuffer( + const std::string& _xid, + bool ended_, bool suspended_, bool failed_, bool expired_) + : xid(_xid), ended(ended_), suspended(suspended_), failed(failed_), expired(expired_) +{} DtxBuffer::~DtxBuffer() {} @@ -34,7 +37,7 @@ void DtxBuffer::markEnded() ended = true; } -bool DtxBuffer::isEnded() +bool DtxBuffer::isEnded() const { Mutex::ScopedLock locker(lock); return ended; @@ -45,7 +48,7 @@ void DtxBuffer::setSuspended(bool isSuspended) suspended = isSuspended; } -bool DtxBuffer::isSuspended() +bool DtxBuffer::isSuspended() const { return suspended; } @@ -58,13 +61,13 @@ void DtxBuffer::fail() ended = true; } -bool DtxBuffer::isRollbackOnly() +bool DtxBuffer::isRollbackOnly() const { Mutex::ScopedLock locker(lock); return failed; } -const std::string& DtxBuffer::getXid() +std::string DtxBuffer::getXid() const { return xid; } @@ -76,8 +79,13 @@ void DtxBuffer::timedout() fail(); } -bool DtxBuffer::isExpired() +bool DtxBuffer::isExpired() const { Mutex::ScopedLock locker(lock); return expired; } + +bool DtxBuffer::isFailed() const +{ + return failed; +} diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h index 1511cb032f..cabd37647a 100644 --- a/cpp/src/qpid/broker/DtxBuffer.h +++ b/cpp/src/qpid/broker/DtxBuffer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,31 +26,34 @@ #include "qpid/sys/Mutex.h" namespace qpid { - namespace broker { - class DtxBuffer : public TxBuffer{ - sys::Mutex lock; - const std::string xid; - bool ended; - bool suspended; - bool failed; - bool expired; +namespace broker { +class DtxBuffer : public TxBuffer{ + mutable sys::Mutex lock; + const std::string xid; + bool ended; + bool suspended; + bool failed; + bool expired; - public: - typedef boost::shared_ptr<DtxBuffer> shared_ptr; + public: + typedef boost::shared_ptr<DtxBuffer> shared_ptr; - QPID_BROKER_EXTERN DtxBuffer(const std::string& xid = ""); - QPID_BROKER_EXTERN ~DtxBuffer(); - QPID_BROKER_EXTERN void markEnded(); - bool isEnded(); - void setSuspended(bool suspended); - bool isSuspended(); - void fail(); - bool isRollbackOnly(); - void timedout(); - bool isExpired(); - const std::string& getXid(); - }; - } + QPID_BROKER_EXTERN DtxBuffer( + const std::string& xid = "", + bool ended=false, bool suspended=false, bool failed=false, bool expired=false); + QPID_BROKER_EXTERN ~DtxBuffer(); + QPID_BROKER_EXTERN void markEnded(); + bool isEnded() const; + void setSuspended(bool suspended); + bool isSuspended() const; + void fail(); + bool isRollbackOnly() const; + void timedout(); + bool isExpired() const; + bool isFailed() const; + std::string getXid() const; +}; +} } diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 3caa41c3f4..febd547478 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,7 +34,7 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {} +DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} DtxManager::~DtxManager() {} @@ -53,8 +53,8 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon createWork(xid)->recover(txn, ops); } -bool DtxManager::prepare(const std::string& xid) -{ +bool DtxManager::prepare(const std::string& xid) +{ QPID_LOG(debug, "preparing: " << xid); try { return getWork(xid)->prepare(); @@ -64,8 +64,8 @@ bool DtxManager::prepare(const std::string& xid) } } -bool DtxManager::commit(const std::string& xid, bool onePhase) -{ +bool DtxManager::commit(const std::string& xid, bool onePhase) +{ QPID_LOG(debug, "committing: " << xid); try { bool result = getWork(xid)->commit(onePhase); @@ -77,8 +77,8 @@ bool DtxManager::commit(const std::string& xid, bool onePhase) } } -void DtxManager::rollback(const std::string& xid) -{ +void DtxManager::rollback(const std::string& xid) +{ QPID_LOG(debug, "rolling back: " << xid); try { getWork(xid)->rollback(); @@ -91,7 +91,7 @@ void DtxManager::rollback(const std::string& xid) DtxWorkRecord* DtxManager::getWork(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); @@ -99,9 +99,14 @@ DtxWorkRecord* DtxManager::getWork(const std::string& xid) return ptr_map_ptr(i); } +bool DtxManager::exists(const std::string& xid) { + Mutex::ScopedLock locker(lock); + return work.find(xid) != work.end(); +} + void DtxManager::remove(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); @@ -110,14 +115,15 @@ void DtxManager::remove(const std::string& xid) } } -DtxWorkRecord* DtxManager::createWork(std::string xid) +DtxWorkRecord* DtxManager::createWork(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i != work.end()) { throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); } else { - return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first); + std::string ncxid = xid; // Work around const correctness problems in ptr_map. + return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first); } } @@ -131,7 +137,7 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs) } timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid)); record->setTimeout(timeout); - timer.add(timeout); + timer->add(timeout); } uint32_t DtxManager::getTimeout(const std::string& xid) @@ -142,7 +148,7 @@ uint32_t DtxManager::getTimeout(const std::string& xid) void DtxManager::timedout(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { QPID_LOG(warning, "Transaction timeout failed: no record for xid"); @@ -153,7 +159,7 @@ void DtxManager::timedout(const std::string& xid) } } -DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) +DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {} void DtxManager::DtxCleanup::fire() diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index 680b62eeb2..b85bcd7e76 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,7 +26,6 @@ #include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/framing/amqp_types.h" -#include "qpid/sys/Timer.h" #include "qpid/sys/Mutex.h" namespace qpid { @@ -39,22 +38,21 @@ class DtxManager{ { DtxManager& mgr; const std::string& xid; - - DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid); + + DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid); void fire(); }; WorkMap work; TransactionalStore* store; qpid::sys::Mutex lock; - qpid::sys::Timer& timer; + qpid::sys::Timer* timer; void remove(const std::string& xid); - DtxWorkRecord* getWork(const std::string& xid); - DtxWorkRecord* createWork(std::string xid); + DtxWorkRecord* createWork(const std::string& xid); public: - DtxManager(qpid::sys::Timer&); + DtxManager(sys::Timer&); ~DtxManager(); void start(const std::string& xid, DtxBuffer::shared_ptr work); void join(const std::string& xid, DtxBuffer::shared_ptr work); @@ -66,6 +64,15 @@ public: uint32_t getTimeout(const std::string& xid); void timedout(const std::string& xid); void setStore(TransactionalStore* store); + void setTimer(sys::Timer& t) { timer = &t; } + + // Used by cluster for replication. + template<class F> void each(F f) const { + for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i) + f(*i); + } + DtxWorkRecord* getWork(const std::string& xid); + bool exists(const std::string& xid); }; } diff --git a/cpp/src/qpid/broker/DtxTimeout.cpp b/cpp/src/qpid/broker/DtxTimeout.cpp index c4c52ec40a..58700846ef 100644 --- a/cpp/src/qpid/broker/DtxTimeout.cpp +++ b/cpp/src/qpid/broker/DtxTimeout.cpp @@ -25,7 +25,7 @@ using namespace qpid::broker; DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) - : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout"), timeout(_timeout), mgr(_mgr), xid(_xid) + : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout-"+_xid), timeout(_timeout), mgr(_mgr), xid(_xid) { } diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h index 680a210e4f..1fcb4cee2a 100644 --- a/cpp/src/qpid/broker/DtxTimeout.h +++ b/cpp/src/qpid/broker/DtxTimeout.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,7 +29,9 @@ namespace broker { class DtxManager; -struct DtxTimeoutException : public Exception {}; +struct DtxTimeoutException : public Exception { + DtxTimeoutException(const std::string& msg=std::string()) : Exception(msg) {} +}; struct DtxTimeout : public sys::TimerTask { @@ -37,7 +39,7 @@ struct DtxTimeout : public sys::TimerTask DtxManager& mgr; const std::string xid; - DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); + DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); void fire(); }; diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index 9f33e698db..a413fe418d 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,19 +28,19 @@ using qpid::sys::Mutex; using namespace qpid::broker; using namespace qpid::framing; -DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} -DtxWorkRecord::~DtxWorkRecord() +DtxWorkRecord::~DtxWorkRecord() { - if (timeout.get()) { + if (timeout.get()) { timeout->cancel(); } } bool DtxWorkRecord::prepare() { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); if (check()) { txn = store->begin(xid); if (prepare(txn.get())) { @@ -68,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn) bool DtxWorkRecord::commit(bool onePhase) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); if (check()) { if (prepared) { //already prepared i.e. 2pc @@ -78,13 +78,13 @@ bool DtxWorkRecord::commit(bool onePhase) store->commit(*txn); txn.reset(); - + std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); return true; } else { //1pc commit optimisation, don't need a 2pc transaction context: if (!onePhase) { - throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); } std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { @@ -107,16 +107,16 @@ bool DtxWorkRecord::commit(bool onePhase) void DtxWorkRecord::rollback() { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); check(); abort(); } void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); if (expired) { - throw DtxTimeoutException(); + throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out.")); } if (completed) { throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!")); @@ -163,7 +163,7 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer void DtxWorkRecord::timedout() { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); expired = true; rolledback = true; if (!completed) { @@ -175,3 +175,17 @@ void DtxWorkRecord::timedout() } abort(); } + +size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) { + Work::iterator i = std::find(work.begin(), work.end(), buf); + if (i == work.end()) throw NotFoundException( + QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid())); + return i - work.begin(); +} + +DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const { + if (i > work.size()) + throw NotFoundException( + QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid)); + return work[i]; +} diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index aec2d2aed4..331e42fefd 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -73,9 +73,19 @@ public: void timedout(); void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; } boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; } + std::string getXid() const { return xid; } + bool isCompleted() const { return completed; } + bool isRolledback() const { return rolledback; } + bool isPrepared() const { return prepared; } + bool isExpired() const { return expired; } + + // Used by cluster update; + size_t size() const { return work.size(); } + DtxBuffer::shared_ptr operator[](size_t i) const; + uint32_t getTimeout() const { return timeout? timeout->timeout : 0; } + size_t indexOf(const DtxBuffer::shared_ptr&); }; -} -} +}} // qpid::broker #endif diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index ab3868eda2..37d981d6c4 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -170,8 +170,8 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) if (!dtxSelected) { throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); } - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + dtxBuffer.reset(new DtxBuffer(xid)); + txBuffer = dtxBuffer; if (join) { mgr.join(xid, dtxBuffer); } else { @@ -239,7 +239,7 @@ void SemanticState::resumeDtx(const std::string& xid) checkDtxTimeout(); dtxBuffer->setSuspended(false); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + txBuffer = dtxBuffer; } void SemanticState::checkDtxTimeout() diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 8c69d6b89b..8de884c113 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -65,7 +65,7 @@ class SessionContext; * * Message delivery is driven by ConsumerImpl::doOutput(), which is * called when a client's socket is ready to write data. - * + * */ class SemanticState : private boost::noncopyable { public: @@ -99,15 +99,15 @@ class SemanticState : private boost::noncopyable { public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; - ConsumerImpl(SemanticState* parent, + ConsumerImpl(SemanticState* parent, const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); OwnershipToken* getSession(); - bool deliver(QueuedMessage& msg); - bool filter(boost::intrusive_ptr<Message> msg); - bool accept(boost::intrusive_ptr<Message> msg); + bool deliver(QueuedMessage& msg); + bool filter(boost::intrusive_ptr<Message> msg); + bool accept(boost::intrusive_ptr<Message> msg); void disableNotify(); void enableNotify(); @@ -122,7 +122,7 @@ class SemanticState : private boost::noncopyable { void addMessageCredit(uint32_t value); void flush(); void stop(); - void complete(DeliveryRecord&); + void complete(DeliveryRecord&); boost::shared_ptr<Queue> getQueue() const { return queue; } bool isBlocked() const { return blocked; } bool setBlocked(bool set) { std::swap(set, blocked); return set; } @@ -188,7 +188,7 @@ class SemanticState : private boost::noncopyable { const SessionContext& getSession() const { return session; } ConsumerImpl& find(const std::string& destination); - + /** * Get named queue, never returns 0. * @return: named queue @@ -196,11 +196,11 @@ class SemanticState : private boost::noncopyable { * @exception: ConnectionException if name="" and session has no default. */ boost::shared_ptr<Queue> getQueue(const std::string& name) const; - + bool exists(const std::string& consumerTag); - void consume(const std::string& destination, - boost::shared_ptr<Queue> queue, + void consume(const std::string& destination, + boost::shared_ptr<Queue> queue, bool ackRequired, bool acquire, bool exclusive, const std::string& resumeId=std::string(), uint64_t resumeTtl=0, const framing::FieldTable& = framing::FieldTable()); @@ -223,7 +223,7 @@ class SemanticState : private boost::noncopyable { void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); void recover(bool requeue); - void deliver(DeliveryRecord& message, bool sync); + void deliver(DeliveryRecord& message, bool sync); void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired); void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); @@ -244,7 +244,9 @@ class SemanticState : private boost::noncopyable { DeliveryRecords& getUnacked() { return unacked; } framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } + DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; } void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; } + void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; } void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); }; diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp index b509778e89..d92e6ace48 100644 --- a/cpp/src/qpid/broker/TxBuffer.cpp +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(TransactionalStore* const store) } void TxBuffer::accept(TxOpConstVisitor& v) const { - std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); + std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); } diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index f0b9c0a302..dba7878af2 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,57 +34,58 @@ #include <boost/intrusive_ptr.hpp> namespace qpid { - namespace broker { - /** - * Defines the behaviour for publish operations on a - * transactional channel. Messages are routed through - * exchanges when received but are not at that stage delivered - * to the matching queues, rather the queues are held in an - * instance of this class. On prepare() the message is marked - * enqueued to the relevant queues in the MessagesStore. On - * commit() the messages will be passed to the queue for - * dispatch or to be added to the in-memory queue. - */ - class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{ +namespace broker { +/** + * Defines the behaviour for publish operations on a + * transactional channel. Messages are routed through + * exchanges when received but are not at that stage delivered + * to the matching queues, rather the queues are held in an + * instance of this class. On prepare() the message is marked + * enqueued to the relevant queues in the MessagesStore. On + * commit() the messages will be passed to the queue for + * dispatch or to be added to the in-memory queue. + */ +class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{ - class Commit{ - boost::intrusive_ptr<Message>& msg; - public: - Commit(boost::intrusive_ptr<Message>& msg); - void operator()(const boost::shared_ptr<Queue>& queue); - }; - class Rollback{ - boost::intrusive_ptr<Message>& msg; - public: - Rollback(boost::intrusive_ptr<Message>& msg); - void operator()(const boost::shared_ptr<Queue>& queue); - }; + class Commit{ + boost::intrusive_ptr<Message>& msg; + public: + Commit(boost::intrusive_ptr<Message>& msg); + void operator()(const boost::shared_ptr<Queue>& queue); + }; + class Rollback{ + boost::intrusive_ptr<Message>& msg; + public: + Rollback(boost::intrusive_ptr<Message>& msg); + void operator()(const boost::shared_ptr<Queue>& queue); + }; - boost::intrusive_ptr<Message> msg; - std::list<boost::shared_ptr<Queue> > queues; - std::list<boost::shared_ptr<Queue> > prepared; + boost::intrusive_ptr<Message> msg; + std::list<boost::shared_ptr<Queue> > queues; + std::list<boost::shared_ptr<Queue> > prepared; - void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); + void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); - public: - QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg); - QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw(); - QPID_BROKER_EXTERN virtual void commit() throw(); - QPID_BROKER_EXTERN virtual void rollback() throw(); + public: + QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw(); + QPID_BROKER_EXTERN virtual void commit() throw(); + QPID_BROKER_EXTERN virtual void rollback() throw(); - virtual Message& getMessage() { return *msg; }; - - QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue); + virtual Message& getMessage() { return *msg; }; - virtual ~TxPublish(){} - virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue); - QPID_BROKER_EXTERN uint64_t contentSize(); + virtual ~TxPublish(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } - boost::intrusive_ptr<Message> getMessage() const { return msg; } - const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; } - }; - } + QPID_BROKER_EXTERN uint64_t contentSize(); + + boost::intrusive_ptr<Message> getMessage() const { return msg; } + const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; } + const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; } +}; +} } diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 82ed8bf8c9..1c398d63f4 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -57,12 +57,12 @@ * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. * - * - Dtx: not yet supported with cluster. - * - * cluster::ExpiryPolicy implements the strategy for message expiry. + * cluster::ExpiryPolicy uses cluster time. * * ClusterTimer implements periodic timed events in the cluster context. - * Used for periodic management events. + * Used for: + * - periodic management events. + * - DTX transaction timeouts. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> * @@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1128070; +const uint32_t Cluster::CLUSTER_VERSION = 1159329; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 030d6e34c1..0691aae711 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,6 +24,8 @@ #include "Cluster.h" #include "UpdateReceiver.h" #include "qpid/assert.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/TxBuffer.h" @@ -114,7 +116,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - } + } init(); QPID_LOG(debug, cluster << " local connection " << *this); } @@ -167,7 +169,7 @@ void Connection::announce( AMQFrame frame; while (frame.decode(buf)) connection->received(frame); - connection->setUserId(username); + connection->setUserId(username); } // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); @@ -214,16 +216,9 @@ void Connection::received(framing::AMQFrame& f) { } } -bool Connection::checkUnsupported(const AMQBody& body) { - std::string message; - if (body.getMethod()) { - switch (body.getMethod()->amqpClassId()) { - case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; - } - } - if (!message.empty()) - connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); - return !message.empty(); +bool Connection::checkUnsupported(const AMQBody&) { + // Throw an exception for unsupported commands. Currently all are supported. + return false; } struct GiveReadCreditOnExit { @@ -464,11 +459,21 @@ void Connection::shadowReady( output.setSendMax(sendMax); } +void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) { + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID + uint32_t index = v.first.second; // Index + v.second->setDtxBuffer((*record)[index]); +} + +// Marks the end of the update. void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); updateIn.consumerNumbering.clear(); + for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(), + boost::bind(&Connection::setDtxBuffer, this, _1)); closeUpdated(); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); } @@ -536,8 +541,16 @@ void Connection::deliveryRecord(const string& qname, } else { // Message at original position in original queue m = queue->find(position); } - if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no update message")); + // FIXME aconway 2011-08-19: removed: + // if (!m.payload) + // throw Exception(QPID_MSG("deliveryRecord no update message")); + // + // It seems this could happen legitimately in the case one + // session browses message M, then another session acquires + // it. In that case the browsers delivery record is !acquired + // but the message is not on its original Queue. In that case + // we'll get a deliveryRecord with no payload for the browser. + // } broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); @@ -545,7 +558,11 @@ void Connection::deliveryRecord(const string& qname, if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - semanticState().record(dr); // Part of the session's unacked list. + + if (dtxBuffer) // Record for next dtx-ack + dtxAckRecords.push_back(dr); + else + semanticState().record(dr); // Record on session's unacked list. } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -561,29 +578,29 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri namespace { - // find a StatefulQueueObserver that matches a given identifier - class ObserverFinder { - const std::string id; - boost::shared_ptr<broker::QueueObserver> target; - ObserverFinder(const ObserverFinder&) {} - public: - ObserverFinder(const std::string& _id) : id(_id) {} - broker::StatefulQueueObserver *getObserver() - { - if (target) - return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); - return 0; - } - void operator() (boost::shared_ptr<broker::QueueObserver> o) - { - if (!target) { - broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); - if (p && p->getId() == id) { - target = o; - } +// find a StatefulQueueObserver that matches a given identifier +class ObserverFinder { + const std::string id; + boost::shared_ptr<broker::QueueObserver> target; + ObserverFinder(const ObserverFinder&) {} + public: + ObserverFinder(const std::string& _id) : id(_id) {} + broker::StatefulQueueObserver *getObserver() + { + if (target) + return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); + return 0; + } + void operator() (boost::shared_ptr<broker::QueueObserver> o) + { + if (!target) { + broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (p && p->getId() == id) { + target = o; } } - }; + } +}; } @@ -615,6 +632,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { void Connection::txStart() { txBuffer.reset(new broker::TxBuffer()); } + void Connection::txAccept(const framing::SequenceSet& acked) { txBuffer->enlist(boost::shared_ptr<broker::TxAccept>( new broker::TxAccept(acked, semanticState().getUnacked()))); @@ -630,8 +648,10 @@ void Connection::txEnqueue(const std::string& queue) { new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); } -void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); +void Connection::txPublish(const framing::Array& queues, bool delivered) +{ + boost::shared_ptr<broker::TxPublish> txPub( + new broker::TxPublish(getUpdateMessage().payload)); for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; @@ -646,6 +666,50 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { semanticState().setAccumulatedAck(s); } +void Connection::dtxStart(const std::string& xid, + bool ended, + bool suspended, + bool failed, + bool expired) +{ + dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired)); + txBuffer = dtxBuffer; +} + +void Connection::dtxEnd() { + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + std::string xid = dtxBuffer->getXid(); + if (mgr.exists(xid)) + mgr.join(xid, dtxBuffer); + else + mgr.start(xid, dtxBuffer); + dtxBuffer.reset(); + txBuffer.reset(); +} + +// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords +void Connection::dtxAck() { + dtxBuffer->enlist( + boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords))); + dtxAckRecords.clear(); +} + +void Connection::dtxBufferRef(const std::string& xid, uint32_t index) { + // Save the association between DtxBuffer and session so we can + // set the DtxBuffer on the session at the end of the update + // when the DtxManager has been replicated. + updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState(); +} + +// Sent at end of work record. +void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout) +{ + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + if (timeout) mgr.setTimeout(xid, timeout); + if (prepared) mgr.prepare(xid); +} + + void Connection::exchange(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index a9740f97f8..5133e4641e 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -29,6 +29,7 @@ #include "qpid/RefCounted.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SecureConnection.h" #include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" @@ -164,6 +165,17 @@ class Connection : void txEnd(); void accumulatedAck(const framing::SequenceSet&); + // Dtx state + void dtxStart(const std::string& xid, + bool ended, + bool suspended, + bool failed, + bool expired); + void dtxEnd(); + void dtxAck(); + void dtxBufferRef(const std::string& xid, uint32_t index); + void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout); + // Encoded exchange replication. void exchange(const std::string& encoded); @@ -251,7 +263,7 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); void closeUpdated(); - + void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &); Cluster& cluster; ConnectionId self; bool catchUp; @@ -263,6 +275,9 @@ class Connection : framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; + boost::shared_ptr<broker::DtxBuffer> dtxBuffer; + broker::DeliveryRecords dtxAckRecords; + broker::DtxWorkRecord* dtxCurrent; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index fc104e8ca9..a5662bb2b3 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -45,6 +45,8 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/TxOpVisitor.h" #include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" +#include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" @@ -65,6 +67,7 @@ #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> +#include <iterator> #include <sstream> namespace qpid { @@ -177,9 +180,9 @@ void UpdateClient::update() { // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); + std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); // some Queue Observers need session state & msgs synced first, so sync observers now b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); @@ -189,6 +192,8 @@ void UpdateClient::update() { updateLinks(); updateManagementAgent(); + updateDtxManager(); + session.queueDelete(arg::queue=UPDATE); session.close(); @@ -356,7 +361,8 @@ class MessageUpdater { for (uint64_t offset = 0; morecontent; offset += maxContentSize) { AMQFrame frame((AMQContentBody())); - morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + morecontent = message.payload->getContentFrame( + *(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } @@ -479,9 +485,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), - boost::bind(&UpdateClient::updateUnacked, this, _1)); + boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession)); - updateTxState(ss->getSemanticState()); // Tx transaction state. + updateTransactionState(ss->getSemanticState()); // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); @@ -542,14 +548,18 @@ void UpdateClient::updateConsumer( << " on " << shadowSession.getId()); } -void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { - if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, + client::AsyncSession& updateSession) +{ + if (!dr.isEnded() && dr.isAcquired()) { + // FIXME aconway 2011-08-19: should this be assert or if? + assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage()); } - ClusterConnectionProxy(shadowSession).deliveryRecord( + ClusterConnectionProxy(updateSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, dr.getTag(), @@ -570,8 +580,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} - void operator()(const broker::DtxAck& ) { - throw InternalErrorException("DTX transactions not currently supported by cluster."); + void operator()(const broker::DtxAck& ack) { + std::for_each(ack.getPending().begin(), ack.getPending().end(), + boost::bind(&UpdateClient::updateUnacked, &parent, _1, session)); + proxy.dtxAck(); } void operator()(const broker::RecoveredDequeue& rdeq) { @@ -588,13 +600,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { proxy.txAccept(txAccept.getAcked()); } + typedef std::list<Queue::shared_ptr> QueueList; + + void copy(const QueueList& l, Array& a) { + for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i) + a.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + } + void operator()(const broker::TxPublish& txPub) { updateMessage(txPub.getMessage()); - typedef std::list<Queue::shared_ptr> QueueList; - const QueueList& qlist = txPub.getQueues(); + assert(txPub.getQueues().empty() || txPub.getPrepared().empty()); Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) - qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray); proxy.txPublish(qarray, txPub.delivered); } @@ -604,19 +621,33 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; -void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, *this << " updating TX transaction state."); +void UpdateClient::updateTransactionState(broker::SemanticState& s) { + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); - if (txBuffer) { + if (dtx) { + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx)); + } else if (tx) { + ClusterConnectionProxy proxy(shadowSession); proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); - txBuffer->accept(updater); + tx->accept(updater); proxy.txEnd(); } } +void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { + ClusterConnectionProxy proxy(session); + proxy.dtxStart( + dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired()); + TxOpUpdater updater(*this, session, expiry); + dtx->accept(updater); + proxy.dtxEnd(); +} + void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { queue->getListeners().eachListener( boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1)); @@ -667,5 +698,17 @@ void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, } } +void UpdateClient::updateDtxManager() { + broker::DtxManager& dtm = updaterBroker.getDtxManager(); + dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1)); +} + +void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) { + QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid()); + for (size_t i = 0; i < r.size(); ++i) + updateDtxBuffer(r[i]); + ClusterConnectionProxy(session).dtxWorkRecord( + r.getXid(), r.isPrepared(), r.getTimeout()); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 21bf6024e0..83d4cfac81 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -52,7 +52,7 @@ class Decoder; class Link; class Bridge; class QueueObserver; - +class DtxBuffer; } // namespace broker namespace cluster { @@ -88,7 +88,7 @@ class UpdateClient : public sys::Runnable { void update(); void run(); // Will delete this when finished. - void updateUnacked(const broker::DeliveryRecord&); + void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&); private: void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&); @@ -100,7 +100,7 @@ class UpdateClient : public sys::Runnable { void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding); void updateConnection(const boost::intrusive_ptr<Connection>& connection); void updateSession(broker::SessionHandler& s); - void updateTxState(broker::SemanticState& s); + void updateTransactionState(broker::SemanticState& s); void updateOutputTask(const sys::OutputTask* task); void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); @@ -112,6 +112,9 @@ class UpdateClient : public sys::Runnable { void updateBridge(const boost::shared_ptr<broker::Bridge>&); void updateQueueObservers(const boost::shared_ptr<broker::Queue>&); void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>); + void updateDtxManager(); + void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& ); + void updateDtxWorkRecord(const broker::DtxWorkRecord&); Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering; diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h index 7e8ce47662..512e59e5a1 100644 --- a/cpp/src/qpid/cluster/UpdateReceiver.h +++ b/cpp/src/qpid/cluster/UpdateReceiver.h @@ -39,6 +39,13 @@ class UpdateReceiver { /** Management-id for the next shadow connection */ std::string nextShadowMgmtId; + + /** Relationship between DtxBuffers, identified by xid, index in DtxManager, + * and sessions represented by their SemanticState. + */ + typedef std::pair<std::string, uint32_t> DtxBufferRef; + typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers; + DtxBuffers dtxBuffers; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp index 6a32b50096..85b4d1f155 100644 --- a/cpp/src/qpid/log/Statement.cpp +++ b/cpp/src/qpid/log/Statement.cpp @@ -24,35 +24,9 @@ #include <ctype.h> namespace qpid { +std::string quote(const std::string& str); // Defined in Msg.cpp namespace log { -namespace { -using namespace std; - -struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } }; - -const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; - -std::string quote(const std::string& str) { - NonPrint nonPrint; - size_t n = std::count_if(str.begin(), str.end(), nonPrint); - if (n==0) return str; - std::string ret; - ret.reserve(str.size()+2*n); // Avoid extra allocations. - for (string::const_iterator i = str.begin(); i != str.end(); ++i) { - if (nonPrint(*i)) { - ret.push_back('\\'); - ret.push_back('x'); - ret.push_back(hex[((*i) >> 4)&0xf]); - ret.push_back(hex[(*i) & 0xf]); - } - else ret.push_back(*i); - } - return ret; -} - -} - void Statement::log(const std::string& message) { Logger::instance().log(*this, quote(message)); } diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index fd972b4394..7888f44c30 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -493,9 +493,7 @@ class BrokerTest(TestCase): return cluster def browse(self, session, queue, timeout=0): - """Assert that the contents of messages on queue (as retrieved - using session and timeout) exactly match the strings in - expect_contents""" + """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) try: contents = [] diff --git a/cpp/src/tests/cluster_python_tests_failing.txt b/cpp/src/tests/cluster_python_tests_failing.txt index 7ba8089946..f8639d7b59 100644 --- a/cpp/src/tests/cluster_python_tests_failing.txt +++ b/cpp/src/tests/cluster_python_tests_failing.txt @@ -1,32 +1,4 @@ qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue qpid_tests.broker_0_10.management.ManagementTest.test_connection_close -qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_end -qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail -qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid -qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion -qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout -qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_recover -qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_select_required -qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback -qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known -qpid_tests.broker_0_10.dtx.DtxTests.test_start_join -qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume qpid_tests.broker_0_10.message.MessageTests.test_ttl qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 807e9508c3..e67a691283 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -713,6 +713,204 @@ acl allow all all cluster.start() fetch(cluster[2]) +# Some utility code for transaction tests +XA_RBROLLBACK = 1 +XA_RBTIMEOUT = 2 +XA_OK = 0 +dtx_branch_counter = 0 + +class DtxTestFixture: + """Bundle together some common requirements for dtx tests.""" + def __init__(self, test, broker, name, exclusive=False): + self.test = test + self.broker = broker + self.name = name + # Use old API. DTX is not supported in messaging API. + self.connection = broker.connect_old() + self.session = self.connection.session(name, 1) # 1 second timeout + self.queue = self.session.queue_declare(name, exclusive=exclusive) + self.xid = self.session.xid(format=0, global_id=name) + self.session.dtx_select() + self.consumer = None + + def start(self): + self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status) + + def end(self): + self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status) + + def prepare(self): + self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status) + + def commit(self, one_phase=True): + self.test.assertEqual( + XA_OK, self.session.dtx_commit(xid=self.xid, one_phase=one_phase).status) + + def rollback(self): + self.test.assertEqual(XA_OK, self.session.dtx_rollback(xid=self.xid).status) + + def send(self, messages): + for m in messages: + dp=self.session.delivery_properties(routing_key=self.name) + mp=self.session.message_properties() + self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m)) + + def accept(self): + """Accept 1 message from queue""" + consumer_tag="%s-consumer"%(self.name) + self.session.message_subscribe(queue=self.name, destination=consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + msg = self.session.incoming(consumer_tag).get(timeout=1) + self.session.message_cancel(destination=consumer_tag) + self.session.message_accept(qpid.datatypes.RangedSet(msg.id)) + return msg + + + def verify(self, cluster, messages): + for b in cluster: + self.test.assert_browse(b.connect().session(), self.name, messages) + + +class DtxTests(BrokerTest): + + def test_dtx_update(self): + """Verify that DTX transaction state is updated to a new broker. + Start a collection of transactions, then add a new cluster member, + then verify they commit/rollback correctly on the new broker.""" + + # Note: multiple test have been bundled into one to avoid the need to start/stop + # multiple brokers per test. + + cluster=self.cluster(1) + + # Transaction that will be open when new member joins, then committed. + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.start() + t1.send(["1", "2"]) + t1.verify(cluster, []) # Not visible outside of transaction + + # Transaction that will be open when new member joins, then rolled back. + t2 = DtxTestFixture(self, cluster[0], "t2") + t2.start() + t2.send(["1", "2"]) + + # Transaction that will be prepared when new member joins, then committed. + t3 = DtxTestFixture(self, cluster[0], "t3") + t3.start() + t3.send(["1", "2"]) + t3.end() + t3.prepare() + t1.verify(cluster, []) # Not visible outside of transaction + + # Transaction that will be prepared when new member joins, then rolled back. + t4 = DtxTestFixture(self, cluster[0], "t4") + t4.start() + t4.send(["1", "2"]) + t4.end() + t4.prepare() + + # Transaction using an exclusive queue + t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True) + t5.start() + t5.send(["1", "2"]) + + # Accept messages in a transaction before/after join then commit + t6 = DtxTestFixture(self, cluster[0], "t6") + t6.send(["a","b","c"]) + t6.start() + t6.verify(cluster, ["a","b","c"]) + self.assertEqual(t6.accept().body, "a"); + t6.verify(cluster, ["b","c"]) + + # Accept messages in a transaction before/after join then roll back + t7 = DtxTestFixture(self, cluster[0], "t7") + t7.send(["a","b","c"]) + t7.start() + t7.verify(cluster, ["a","b","c"]) + self.assertEqual(t7.accept().body, "a"); + t7.verify(cluster, ["b","c"]) + + # Start new member + cluster.start() + + # Commit t1 + t1.send(["3","4"]) + t1.verify(cluster, []) + t1.end() + t1.commit(one_phase=True) + t1.verify(cluster, ["1","2","3","4"]) + + # Rollback t2 + t2.send(["3","4"]) + t2.verify(cluster, []) + t2.end() + t2.rollback() + t2.verify(cluster, []) + + # Commit t3 + t3.verify(cluster, []) + t3.commit(one_phase=False) + t3.verify(cluster, ["1","2"]) + + # Rollback t4 + t4.verify(cluster, []) + t4.rollback() + t4.verify(cluster, []) + + # Commit t5 + t5.send(["3","4"]) + t5.verify(cluster, []) + t5.end() + t5.commit(one_phase=True) + t5.verify(cluster, ["1","2","3","4"]) + + # Commit t7 + t6.verify(cluster, ["b", "c"]) + self.assertEqual(t6.accept().body, "b"); + t6.verify(cluster, ["c"]) + t6.end() + t6.commit(one_phase=True) + t6.verify(cluster, ["c"]) + t6.session.close() # Make sure they're not requeued by the session. + t6.verify(cluster, ["c"]) + + # Rollback t7 + t7.verify(cluster, ["b", "c"]) + self.assertEqual(t7.accept().body, "b"); + t7.verify(cluster, ["c"]) + t7.end() + t7.rollback() + t7.verify(cluster, ["a", "b", "c"]) + + +class TxTests(BrokerTest): + + def test_tx_update(self): + """Verify that transaction state is updated to a new broker""" + + def make_message(session, body=None, key=None, id=None): + dp=session.delivery_properties(routing_key=key) + mp=session.message_properties(correlation_id=id) + return qpid.datatypes.Message(dp, mp, body) + + cluster=self.cluster(1) + # Use old API. TX is not supported in messaging API. + c = cluster[0].connect_old() + s = c.session("tx-session", 1) + s.queue_declare(queue="q") + # Start transaction + s.tx_select() + s.message_transfer(message=make_message(s, "1", "q")) + # Start new member mid-transaction + cluster.start() + # Do more work + s.message_transfer(message=make_message(s, "2", "q")) + # Commit the transaction and verify the results. + s.tx_commit() + for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"]) + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -1001,6 +1199,8 @@ class LongTests(BrokerTest): logger = logging.getLogger() log_level = logger.getEffectiveLevel() logger.setLevel(logging.ERROR) + sender = None + receiver = None try: # Start sender and receiver threads receiver = Receiver(cluster[0], "q;{create:always}") @@ -1031,8 +1231,8 @@ class LongTests(BrokerTest): finally: # Detach to avoid slow reconnect attempts during shut-down if test fails. - sender.connection.detach() - receiver.connection.detach() + if sender: sender.connection.detach() + if receiver: receiver.connection.detach() logger.setLevel(log_level) class StoreTests(BrokerTest): diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index c33f2e4852..f49f8dbfff 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -212,6 +212,29 @@ <field name="name" type="str8"/> </control> + <!-- Dtx transaction state. --> + <control name="dtx-start" code="0x1A"> + <field name="xid" type="str16"/> + <field name="ended" type="bit"/> + <field name="suspended" type="bit"/> + <field name="failed" type="bit"/> + <field name="expired" type="bit"/> + </control> + <control name="dtx-end" code="0x1B"/> + + <control name="dtx-ack" code="0x1C"/> + + <control name="dtx-buffer-ref" code="0x1D"> + <field name="xid" type="str16"/> + <field name="index" type="uint32"/> + </control> + + <control name="dtx-work-record" code="0x1E"> + <field name="xid" type="str16"/> + <field name="prepared" type="bit"/> + <field name="timeout" type="uint32"/> + </control> + <!-- Complete a session state update. --> <control name="session-state" code="0x1F"> <!-- Target session deduced from channel number. --> |