summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-08-25 20:41:28 +0000
committerAlan Conway <aconway@apache.org>2011-08-25 20:41:28 +0000
commit2fdd2cc2ade41e213ae35818532574bbf40f4a00 (patch)
tree42fb45022ea08fee157abf50713b452acf5eda5d /cpp
parent7f99badd1c330b3a6032b15a13aca1cde81274d3 (diff)
downloadqpid-python-2fdd2cc2ade41e213ae35818532574bbf40f4a00.tar.gz
QPID-3384: Enable DTX transactions in a cluster.
- Replicate DTX state to new members joining. - Use cluster timer for DTX timeouts. - Incidental: quote nulls in qpid::Msg messages (XIDs often have null characters) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/Msg.h2
-rwxr-xr-xcpp/rubygen/amqpgen.rb3
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/qpid/Msg.cpp55
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/DtxAck.cpp4
-rw-r--r--cpp/src/qpid/broker/DtxAck.h35
-rw-r--r--cpp/src/qpid/broker/DtxBuffer.cpp22
-rw-r--r--cpp/src/qpid/broker/DtxBuffer.h53
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp40
-rw-r--r--cpp/src/qpid/broker/DtxManager.h25
-rw-r--r--cpp/src/qpid/broker/DtxTimeout.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxTimeout.h10
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp40
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.h14
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp6
-rw-r--r--cpp/src/qpid/broker/SemanticState.h24
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp2
-rw-r--r--cpp/src/qpid/broker/TxPublish.h91
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp10
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp140
-rw-r--r--cpp/src/qpid/cluster/Connection.h17
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp81
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h9
-rw-r--r--cpp/src/qpid/cluster/UpdateReceiver.h7
-rw-r--r--cpp/src/qpid/log/Statement.cpp28
-rw-r--r--cpp/src/tests/brokertest.py4
-rw-r--r--cpp/src/tests/cluster_python_tests_failing.txt28
-rwxr-xr-xcpp/src/tests/cluster_tests.py204
-rw-r--r--cpp/xml/cluster.xml23
30 files changed, 699 insertions, 284 deletions
diff --git a/cpp/include/qpid/Msg.h b/cpp/include/qpid/Msg.h
index e1837c29e5..a4deb15c26 100644
--- a/cpp/include/qpid/Msg.h
+++ b/cpp/include/qpid/Msg.h
@@ -41,7 +41,7 @@ struct Msg {
std::ostringstream os;
Msg() {}
Msg(const Msg& m) : os(m.str()) {}
- std::string str() const { return os.str(); }
+ std::string str() const;
operator std::string() const { return str(); }
Msg& operator<<(long n) { os << n; return *this; }
diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb
index 20aac35194..88720cad5f 100755
--- a/cpp/rubygen/amqpgen.rb
+++ b/cpp/rubygen/amqpgen.rb
@@ -191,7 +191,8 @@ class AmqpElement
"command-fragments" => "session.command-fragment",
"in-doubt" => "dtx.xid",
"tx-publish" => "str-8",
- "queues" => "str-8"
+ "queues" => "str-8",
+ "prepared" => "str-8"
}
def array_type(name)
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 4f9d7ab28a..e502000758 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -89,7 +89,7 @@ rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate . ../include $(specs) all
$(rgen_srcs) $(srcdir)/rubygen.mk: rgen.timestamp
rgen.timestamp: $(rgen_generator) $(specs)
- $(rgen_cmd) $(srcdir)/rubygen.mk; touch $@
+ $(rgen_cmd) $(srcdir)/rubygen.mk && touch $@
$(rgen_generator):
# The CMake version is needed for dist
@@ -435,6 +435,7 @@ libqpidcommon_la_SOURCES += \
qpid/log/OstreamOutput.h \
qpid/log/Selector.cpp \
qpid/log/Statement.cpp \
+ qpid/Msg.cpp \
qpid/management/Buffer.cpp \
qpid/management/ConnectionSettings.cpp \
qpid/management/Manageable.cpp \
diff --git a/cpp/src/qpid/Msg.cpp b/cpp/src/qpid/Msg.cpp
new file mode 100644
index 0000000000..d441cdd180
--- /dev/null
+++ b/cpp/src/qpid/Msg.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Msg.h"
+#include <string>
+
+namespace qpid {
+using namespace std;
+
+struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
+
+const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
+
+std::string quote(const std::string& str) {
+ NonPrint nonPrint;
+ size_t n = std::count_if(str.begin(), str.end(), nonPrint);
+ if (n==0) return str;
+ std::string ret;
+ ret.reserve(str.size()+2*n); // Avoid extra allocations.
+ for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
+ if (nonPrint(*i)) {
+ ret.push_back('\\');
+ ret.push_back('x');
+ ret.push_back(hex[((*i) >> 4)&0xf]);
+ ret.push_back(hex[(*i) & 0xf]);
+ }
+ else ret.push_back(*i);
+ }
+ return ret;
+}
+
+// Quote the string so messages with null characters are preserved, e.g. messages with XIDs */
+std::string Msg::str() const {
+ return quote(os.str());
+}
+
+} // namespace qpid
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 240b9063f8..598c43b1d8 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -752,6 +752,7 @@ bool Broker::deferDeliveryImpl(const std::string& ,
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
clusterTimer = t;
queueCleaner.setTimer(clusterTimer.get());
+ dtxManager.setTimer(*clusterTimer.get());
}
const std::string Broker::TCP_TRANSPORT("tcp");
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp
index bca3f90bbe..c558681d62 100644
--- a/cpp/src/qpid/broker/DtxAck.cpp
+++ b/cpp/src/qpid/broker/DtxAck.cpp
@@ -32,6 +32,10 @@ DtxAck::DtxAck(const qpid::framing::SequenceSet& acked, DeliveryRecords& unacked
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
}
+DtxAck::DtxAck(DeliveryRecords& unacked) {
+ pending = unacked;
+}
+
bool DtxAck::prepare(TransactionContext* ctxt) throw()
{
try{
diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h
index 166147e58d..16c3ff8ba0 100644
--- a/cpp/src/qpid/broker/DtxAck.h
+++ b/cpp/src/qpid/broker/DtxAck.h
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DTXACK_H
+#define QPID_BROKER_DTXACK_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,9 +21,6 @@
* under the License.
*
*/
-#ifndef _DtxAck_
-#define _DtxAck_
-
#include <algorithm>
#include <functional>
#include <list>
@@ -29,20 +29,21 @@
#include "qpid/broker/TxOp.h"
namespace qpid {
- namespace broker {
- class DtxAck : public TxOp{
- DeliveryRecords pending;
+namespace broker {
+class DtxAck : public TxOp{
+ DeliveryRecords pending;
- public:
- DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~DtxAck(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
- };
- }
-}
+ public:
+ DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+ DtxAck(DeliveryRecords& unacked);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~DtxAck(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+ const DeliveryRecords& getPending() const { return pending; }
+};
+}} // qpid::broker
-#endif
+#endif /*!QPID_BROKER_DTXACK_H*/
diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp
index f1b8169cf7..13177d3b72 100644
--- a/cpp/src/qpid/broker/DtxBuffer.cpp
+++ b/cpp/src/qpid/broker/DtxBuffer.cpp
@@ -23,8 +23,11 @@
using namespace qpid::broker;
using qpid::sys::Mutex;
-DtxBuffer::DtxBuffer(const std::string& _xid)
- : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {}
+DtxBuffer::DtxBuffer(
+ const std::string& _xid,
+ bool ended_, bool suspended_, bool failed_, bool expired_)
+ : xid(_xid), ended(ended_), suspended(suspended_), failed(failed_), expired(expired_)
+{}
DtxBuffer::~DtxBuffer() {}
@@ -34,7 +37,7 @@ void DtxBuffer::markEnded()
ended = true;
}
-bool DtxBuffer::isEnded()
+bool DtxBuffer::isEnded() const
{
Mutex::ScopedLock locker(lock);
return ended;
@@ -45,7 +48,7 @@ void DtxBuffer::setSuspended(bool isSuspended)
suspended = isSuspended;
}
-bool DtxBuffer::isSuspended()
+bool DtxBuffer::isSuspended() const
{
return suspended;
}
@@ -58,13 +61,13 @@ void DtxBuffer::fail()
ended = true;
}
-bool DtxBuffer::isRollbackOnly()
+bool DtxBuffer::isRollbackOnly() const
{
Mutex::ScopedLock locker(lock);
return failed;
}
-const std::string& DtxBuffer::getXid()
+std::string DtxBuffer::getXid() const
{
return xid;
}
@@ -76,8 +79,13 @@ void DtxBuffer::timedout()
fail();
}
-bool DtxBuffer::isExpired()
+bool DtxBuffer::isExpired() const
{
Mutex::ScopedLock locker(lock);
return expired;
}
+
+bool DtxBuffer::isFailed() const
+{
+ return failed;
+}
diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h
index 1511cb032f..cabd37647a 100644
--- a/cpp/src/qpid/broker/DtxBuffer.h
+++ b/cpp/src/qpid/broker/DtxBuffer.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,31 +26,34 @@
#include "qpid/sys/Mutex.h"
namespace qpid {
- namespace broker {
- class DtxBuffer : public TxBuffer{
- sys::Mutex lock;
- const std::string xid;
- bool ended;
- bool suspended;
- bool failed;
- bool expired;
+namespace broker {
+class DtxBuffer : public TxBuffer{
+ mutable sys::Mutex lock;
+ const std::string xid;
+ bool ended;
+ bool suspended;
+ bool failed;
+ bool expired;
- public:
- typedef boost::shared_ptr<DtxBuffer> shared_ptr;
+ public:
+ typedef boost::shared_ptr<DtxBuffer> shared_ptr;
- QPID_BROKER_EXTERN DtxBuffer(const std::string& xid = "");
- QPID_BROKER_EXTERN ~DtxBuffer();
- QPID_BROKER_EXTERN void markEnded();
- bool isEnded();
- void setSuspended(bool suspended);
- bool isSuspended();
- void fail();
- bool isRollbackOnly();
- void timedout();
- bool isExpired();
- const std::string& getXid();
- };
- }
+ QPID_BROKER_EXTERN DtxBuffer(
+ const std::string& xid = "",
+ bool ended=false, bool suspended=false, bool failed=false, bool expired=false);
+ QPID_BROKER_EXTERN ~DtxBuffer();
+ QPID_BROKER_EXTERN void markEnded();
+ bool isEnded() const;
+ void setSuspended(bool suspended);
+ bool isSuspended() const;
+ void fail();
+ bool isRollbackOnly() const;
+ void timedout();
+ bool isExpired() const;
+ bool isFailed() const;
+ std::string getXid() const;
+};
+}
}
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 3caa41c3f4..febd547478 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
DtxManager::~DtxManager() {}
@@ -53,8 +53,8 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon
createWork(xid)->recover(txn, ops);
}
-bool DtxManager::prepare(const std::string& xid)
-{
+bool DtxManager::prepare(const std::string& xid)
+{
QPID_LOG(debug, "preparing: " << xid);
try {
return getWork(xid)->prepare();
@@ -64,8 +64,8 @@ bool DtxManager::prepare(const std::string& xid)
}
}
-bool DtxManager::commit(const std::string& xid, bool onePhase)
-{
+bool DtxManager::commit(const std::string& xid, bool onePhase)
+{
QPID_LOG(debug, "committing: " << xid);
try {
bool result = getWork(xid)->commit(onePhase);
@@ -77,8 +77,8 @@ bool DtxManager::commit(const std::string& xid, bool onePhase)
}
}
-void DtxManager::rollback(const std::string& xid)
-{
+void DtxManager::rollback(const std::string& xid)
+{
QPID_LOG(debug, "rolling back: " << xid);
try {
getWork(xid)->rollback();
@@ -91,7 +91,7 @@ void DtxManager::rollback(const std::string& xid)
DtxWorkRecord* DtxManager::getWork(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -99,9 +99,14 @@ DtxWorkRecord* DtxManager::getWork(const std::string& xid)
return ptr_map_ptr(i);
}
+bool DtxManager::exists(const std::string& xid) {
+ Mutex::ScopedLock locker(lock);
+ return work.find(xid) != work.end();
+}
+
void DtxManager::remove(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -110,14 +115,15 @@ void DtxManager::remove(const std::string& xid)
}
}
-DtxWorkRecord* DtxManager::createWork(std::string xid)
+DtxWorkRecord* DtxManager::createWork(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i != work.end()) {
throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
} else {
- return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first);
+ std::string ncxid = xid; // Work around const correctness problems in ptr_map.
+ return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
}
}
@@ -131,7 +137,7 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs)
}
timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
record->setTimeout(timeout);
- timer.add(timeout);
+ timer->add(timeout);
}
uint32_t DtxManager::getTimeout(const std::string& xid)
@@ -142,7 +148,7 @@ uint32_t DtxManager::getTimeout(const std::string& xid)
void DtxManager::timedout(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
QPID_LOG(warning, "Transaction timeout failed: no record for xid");
@@ -153,7 +159,7 @@ void DtxManager::timedout(const std::string& xid)
}
}
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
: TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
void DtxManager::DtxCleanup::fire()
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index 680b62eeb2..b85bcd7e76 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,7 +26,6 @@
#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Timer.h"
#include "qpid/sys/Mutex.h"
namespace qpid {
@@ -39,22 +38,21 @@ class DtxManager{
{
DtxManager& mgr;
const std::string& xid;
-
- DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+
+ DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
void fire();
};
WorkMap work;
TransactionalStore* store;
qpid::sys::Mutex lock;
- qpid::sys::Timer& timer;
+ qpid::sys::Timer* timer;
void remove(const std::string& xid);
- DtxWorkRecord* getWork(const std::string& xid);
- DtxWorkRecord* createWork(std::string xid);
+ DtxWorkRecord* createWork(const std::string& xid);
public:
- DtxManager(qpid::sys::Timer&);
+ DtxManager(sys::Timer&);
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
@@ -66,6 +64,15 @@ public:
uint32_t getTimeout(const std::string& xid);
void timedout(const std::string& xid);
void setStore(TransactionalStore* store);
+ void setTimer(sys::Timer& t) { timer = &t; }
+
+ // Used by cluster for replication.
+ template<class F> void each(F f) const {
+ for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
+ f(*i);
+ }
+ DtxWorkRecord* getWork(const std::string& xid);
+ bool exists(const std::string& xid);
};
}
diff --git a/cpp/src/qpid/broker/DtxTimeout.cpp b/cpp/src/qpid/broker/DtxTimeout.cpp
index c4c52ec40a..58700846ef 100644
--- a/cpp/src/qpid/broker/DtxTimeout.cpp
+++ b/cpp/src/qpid/broker/DtxTimeout.cpp
@@ -25,7 +25,7 @@
using namespace qpid::broker;
DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
- : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout"), timeout(_timeout), mgr(_mgr), xid(_xid)
+ : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout-"+_xid), timeout(_timeout), mgr(_mgr), xid(_xid)
{
}
diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h
index 680a210e4f..1fcb4cee2a 100644
--- a/cpp/src/qpid/broker/DtxTimeout.h
+++ b/cpp/src/qpid/broker/DtxTimeout.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,7 +29,9 @@ namespace broker {
class DtxManager;
-struct DtxTimeoutException : public Exception {};
+struct DtxTimeoutException : public Exception {
+ DtxTimeoutException(const std::string& msg=std::string()) : Exception(msg) {}
+};
struct DtxTimeout : public sys::TimerTask
{
@@ -37,7 +39,7 @@ struct DtxTimeout : public sys::TimerTask
DtxManager& mgr;
const std::string xid;
- DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+ DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
void fire();
};
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index 9f33e698db..a413fe418d 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,19 +28,19 @@ using qpid::sys::Mutex;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
-DtxWorkRecord::~DtxWorkRecord()
+DtxWorkRecord::~DtxWorkRecord()
{
- if (timeout.get()) {
+ if (timeout.get()) {
timeout->cancel();
}
}
bool DtxWorkRecord::prepare()
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
if (check()) {
txn = store->begin(xid);
if (prepare(txn.get())) {
@@ -68,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn)
bool DtxWorkRecord::commit(bool onePhase)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
if (check()) {
if (prepared) {
//already prepared i.e. 2pc
@@ -78,13 +78,13 @@ bool DtxWorkRecord::commit(bool onePhase)
store->commit(*txn);
txn.reset();
-
+
std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
return true;
} else {
//1pc commit optimisation, don't need a 2pc transaction context:
if (!onePhase) {
- throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
}
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
@@ -107,16 +107,16 @@ bool DtxWorkRecord::commit(bool onePhase)
void DtxWorkRecord::rollback()
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
check();
abort();
}
void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
if (expired) {
- throw DtxTimeoutException();
+ throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out."));
}
if (completed) {
throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
@@ -163,7 +163,7 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer
void DtxWorkRecord::timedout()
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
expired = true;
rolledback = true;
if (!completed) {
@@ -175,3 +175,17 @@ void DtxWorkRecord::timedout()
}
abort();
}
+
+size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
+ Work::iterator i = std::find(work.begin(), work.end(), buf);
+ if (i == work.end()) throw NotFoundException(
+ QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
+ return i - work.begin();
+}
+
+DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {
+ if (i > work.size())
+ throw NotFoundException(
+ QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
+ return work[i];
+}
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h
index aec2d2aed4..331e42fefd 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -73,9 +73,19 @@ public:
void timedout();
void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
+ std::string getXid() const { return xid; }
+ bool isCompleted() const { return completed; }
+ bool isRolledback() const { return rolledback; }
+ bool isPrepared() const { return prepared; }
+ bool isExpired() const { return expired; }
+
+ // Used by cluster update;
+ size_t size() const { return work.size(); }
+ DtxBuffer::shared_ptr operator[](size_t i) const;
+ uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
+ size_t indexOf(const DtxBuffer::shared_ptr&);
};
-}
-}
+}} // qpid::broker
#endif
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index ab3868eda2..37d981d6c4 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -170,8 +170,8 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
if (!dtxSelected) {
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
}
- dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
- txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+ dtxBuffer.reset(new DtxBuffer(xid));
+ txBuffer = dtxBuffer;
if (join) {
mgr.join(xid, dtxBuffer);
} else {
@@ -239,7 +239,7 @@ void SemanticState::resumeDtx(const std::string& xid)
checkDtxTimeout();
dtxBuffer->setSuspended(false);
- txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+ txBuffer = dtxBuffer;
}
void SemanticState::checkDtxTimeout()
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 8c69d6b89b..8de884c113 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -65,7 +65,7 @@ class SessionContext;
*
* Message delivery is driven by ConsumerImpl::doOutput(), which is
* called when a client's socket is ready to write data.
- *
+ *
*/
class SemanticState : private boost::noncopyable {
public:
@@ -99,15 +99,15 @@ class SemanticState : private boost::noncopyable {
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
- ConsumerImpl(SemanticState* parent,
+ ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
- bool deliver(QueuedMessage& msg);
- bool filter(boost::intrusive_ptr<Message> msg);
- bool accept(boost::intrusive_ptr<Message> msg);
+ bool deliver(QueuedMessage& msg);
+ bool filter(boost::intrusive_ptr<Message> msg);
+ bool accept(boost::intrusive_ptr<Message> msg);
void disableNotify();
void enableNotify();
@@ -122,7 +122,7 @@ class SemanticState : private boost::noncopyable {
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void complete(DeliveryRecord&);
+ void complete(DeliveryRecord&);
boost::shared_ptr<Queue> getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
@@ -188,7 +188,7 @@ class SemanticState : private boost::noncopyable {
const SessionContext& getSession() const { return session; }
ConsumerImpl& find(const std::string& destination);
-
+
/**
* Get named queue, never returns 0.
* @return: named queue
@@ -196,11 +196,11 @@ class SemanticState : private boost::noncopyable {
* @exception: ConnectionException if name="" and session has no default.
*/
boost::shared_ptr<Queue> getQueue(const std::string& name) const;
-
+
bool exists(const std::string& consumerTag);
- void consume(const std::string& destination,
- boost::shared_ptr<Queue> queue,
+ void consume(const std::string& destination,
+ boost::shared_ptr<Queue> queue,
bool ackRequired, bool acquire, bool exclusive,
const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
const framing::FieldTable& = framing::FieldTable());
@@ -223,7 +223,7 @@ class SemanticState : private boost::noncopyable {
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
void recover(bool requeue);
- void deliver(DeliveryRecord& message, bool sync);
+ void deliver(DeliveryRecord& message, bool sync);
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
@@ -244,7 +244,9 @@ class SemanticState : private boost::noncopyable {
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+ DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+ void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
};
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
index b509778e89..d92e6ace48 100644
--- a/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(TransactionalStore* const store)
}
void TxBuffer::accept(TxOpConstVisitor& v) const {
- std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
+ std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
}
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index f0b9c0a302..dba7878af2 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,57 +34,58 @@
#include <boost/intrusive_ptr.hpp>
namespace qpid {
- namespace broker {
- /**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
- class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
+namespace broker {
+/**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues, rather the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessagesStore. On
+ * commit() the messages will be passed to the queue for
+ * dispatch or to be added to the in-memory queue.
+ */
+class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
- class Commit{
- boost::intrusive_ptr<Message>& msg;
- public:
- Commit(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
- class Rollback{
- boost::intrusive_ptr<Message>& msg;
- public:
- Rollback(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
+ class Commit{
+ boost::intrusive_ptr<Message>& msg;
+ public:
+ Commit(boost::intrusive_ptr<Message>& msg);
+ void operator()(const boost::shared_ptr<Queue>& queue);
+ };
+ class Rollback{
+ boost::intrusive_ptr<Message>& msg;
+ public:
+ Rollback(boost::intrusive_ptr<Message>& msg);
+ void operator()(const boost::shared_ptr<Queue>& queue);
+ };
- boost::intrusive_ptr<Message> msg;
- std::list<boost::shared_ptr<Queue> > queues;
- std::list<boost::shared_ptr<Queue> > prepared;
+ boost::intrusive_ptr<Message> msg;
+ std::list<boost::shared_ptr<Queue> > queues;
+ std::list<boost::shared_ptr<Queue> > prepared;
- void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
+ void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
- public:
- QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
- QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
- QPID_BROKER_EXTERN virtual void commit() throw();
- QPID_BROKER_EXTERN virtual void rollback() throw();
+ public:
+ QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
+ QPID_BROKER_EXTERN virtual void commit() throw();
+ QPID_BROKER_EXTERN virtual void rollback() throw();
- virtual Message& getMessage() { return *msg; };
-
- QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
+ virtual Message& getMessage() { return *msg; };
- virtual ~TxPublish(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+ QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
- QPID_BROKER_EXTERN uint64_t contentSize();
+ virtual ~TxPublish(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
- const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; }
- };
- }
+ QPID_BROKER_EXTERN uint64_t contentSize();
+
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+ const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
+ const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
+};
+}
}
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 82ed8bf8c9..1c398d63f4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -57,12 +57,12 @@
* - management::ManagementBroker: uses MessageHandler supplied by cluster
* to send messages to the broker via the cluster.
*
- * - Dtx: not yet supported with cluster.
- *
- * cluster::ExpiryPolicy implements the strategy for message expiry.
+ * cluster::ExpiryPolicy uses cluster time.
*
* ClusterTimer implements periodic timed events in the cluster context.
- * Used for periodic management events.
+ * Used for:
+ * - periodic management events.
+ * - DTX transaction timeouts.
*
* <h1>CLUSTER PROTOCOL OVERVIEW</h1>
*
@@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1128070;
+const uint32_t Cluster::CLUSTER_VERSION = 1159329;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 030d6e34c1..0691aae711 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,6 +24,8 @@
#include "Cluster.h"
#include "UpdateReceiver.h"
#include "qpid/assert.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/TxBuffer.h"
@@ -114,7 +116,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- }
+ }
init();
QPID_LOG(debug, cluster << " local connection " << *this);
}
@@ -167,7 +169,7 @@ void Connection::announce(
AMQFrame frame;
while (frame.decode(buf))
connection->received(frame);
- connection->setUserId(username);
+ connection->setUserId(username);
}
// Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
@@ -214,16 +216,9 @@ void Connection::received(framing::AMQFrame& f) {
}
}
-bool Connection::checkUnsupported(const AMQBody& body) {
- std::string message;
- if (body.getMethod()) {
- switch (body.getMethod()->amqpClassId()) {
- case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
- }
- }
- if (!message.empty())
- connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
- return !message.empty();
+bool Connection::checkUnsupported(const AMQBody&) {
+ // Throw an exception for unsupported commands. Currently all are supported.
+ return false;
}
struct GiveReadCreditOnExit {
@@ -464,11 +459,21 @@ void Connection::shadowReady(
output.setSendMax(sendMax);
}
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) {
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID
+ uint32_t index = v.first.second; // Index
+ v.second->setDtxBuffer((*record)[index]);
+}
+
+// Marks the end of the update.
void Connection::membership(const FieldTable& joiners, const FieldTable& members,
const framing::SequenceNumber& frameSeq)
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
updateIn.consumerNumbering.clear();
+ for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
+ boost::bind(&Connection::setDtxBuffer, this, _1));
closeUpdated();
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
}
@@ -536,8 +541,16 @@ void Connection::deliveryRecord(const string& qname,
} else { // Message at original position in original queue
m = queue->find(position);
}
- if (!m.payload)
- throw Exception(QPID_MSG("deliveryRecord no update message"));
+ // FIXME aconway 2011-08-19: removed:
+ // if (!m.payload)
+ // throw Exception(QPID_MSG("deliveryRecord no update message"));
+ //
+ // It seems this could happen legitimately in the case one
+ // session browses message M, then another session acquires
+ // it. In that case the browsers delivery record is !acquired
+ // but the message is not on its original Queue. In that case
+ // we'll get a deliveryRecord with no payload for the browser.
+ //
}
broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -545,7 +558,11 @@ void Connection::deliveryRecord(const string& qname,
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
- semanticState().record(dr); // Part of the session's unacked list.
+
+ if (dtxBuffer) // Record for next dtx-ack
+ dtxAckRecords.push_back(dr);
+ else
+ semanticState().record(dr); // Record on session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -561,29 +578,29 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
namespace {
- // find a StatefulQueueObserver that matches a given identifier
- class ObserverFinder {
- const std::string id;
- boost::shared_ptr<broker::QueueObserver> target;
- ObserverFinder(const ObserverFinder&) {}
- public:
- ObserverFinder(const std::string& _id) : id(_id) {}
- broker::StatefulQueueObserver *getObserver()
- {
- if (target)
- return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
- return 0;
- }
- void operator() (boost::shared_ptr<broker::QueueObserver> o)
- {
- if (!target) {
- broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
- if (p && p->getId() == id) {
- target = o;
- }
+// find a StatefulQueueObserver that matches a given identifier
+class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
}
}
- };
+ }
+};
}
@@ -615,6 +632,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
void Connection::txStart() {
txBuffer.reset(new broker::TxBuffer());
}
+
void Connection::txAccept(const framing::SequenceSet& acked) {
txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -630,8 +648,10 @@ void Connection::txEnqueue(const std::string& queue) {
new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
}
-void Connection::txPublish(const framing::Array& queues, bool delivered) {
- boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+void Connection::txPublish(const framing::Array& queues, bool delivered)
+{
+ boost::shared_ptr<broker::TxPublish> txPub(
+ new broker::TxPublish(getUpdateMessage().payload));
for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
txPub->deliverTo(findQueue((*i)->get<std::string>()));
txPub->delivered = delivered;
@@ -646,6 +666,50 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
semanticState().setAccumulatedAck(s);
}
+void Connection::dtxStart(const std::string& xid,
+ bool ended,
+ bool suspended,
+ bool failed,
+ bool expired)
+{
+ dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
+ txBuffer = dtxBuffer;
+}
+
+void Connection::dtxEnd() {
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ std::string xid = dtxBuffer->getXid();
+ if (mgr.exists(xid))
+ mgr.join(xid, dtxBuffer);
+ else
+ mgr.start(xid, dtxBuffer);
+ dtxBuffer.reset();
+ txBuffer.reset();
+}
+
+// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
+void Connection::dtxAck() {
+ dtxBuffer->enlist(
+ boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
+ dtxAckRecords.clear();
+}
+
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index) {
+ // Save the association between DtxBuffer and session so we can
+ // set the DtxBuffer on the session at the end of the update
+ // when the DtxManager has been replicated.
+ updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState();
+}
+
+// Sent at end of work record.
+void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
+{
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ if (timeout) mgr.setTimeout(xid, timeout);
+ if (prepared) mgr.prepare(xid);
+}
+
+
void Connection::exchange(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index a9740f97f8..5133e4641e 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -29,6 +29,7 @@
#include "qpid/RefCounted.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/amqp_0_10/Connection.h"
@@ -164,6 +165,17 @@ class Connection :
void txEnd();
void accumulatedAck(const framing::SequenceSet&);
+ // Dtx state
+ void dtxStart(const std::string& xid,
+ bool ended,
+ bool suspended,
+ bool failed,
+ bool expired);
+ void dtxEnd();
+ void dtxAck();
+ void dtxBufferRef(const std::string& xid, uint32_t index);
+ void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
+
// Encoded exchange replication.
void exchange(const std::string& encoded);
@@ -251,7 +263,7 @@ class Connection :
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
void closeUpdated();
-
+ void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &);
Cluster& cluster;
ConnectionId self;
bool catchUp;
@@ -263,6 +275,9 @@ class Connection :
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
+ boost::shared_ptr<broker::DtxBuffer> dtxBuffer;
+ broker::DeliveryRecords dtxAckRecords;
+ broker::DtxWorkRecord* dtxCurrent;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
UpdateReceiver& updateIn;
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index fc104e8ca9..a5662bb2b3 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -45,6 +45,8 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/TxOpVisitor.h"
#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
+#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
@@ -65,6 +67,7 @@
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
+#include <iterator>
#include <sstream>
namespace qpid {
@@ -177,9 +180,9 @@ void UpdateClient::update() {
// longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
+
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
- session.queueDelete(arg::queue=UPDATE);
// some Queue Observers need session state & msgs synced first, so sync observers now
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
@@ -189,6 +192,8 @@ void UpdateClient::update() {
updateLinks();
updateManagementAgent();
+ updateDtxManager();
+ session.queueDelete(arg::queue=UPDATE);
session.close();
@@ -356,7 +361,8 @@ class MessageUpdater {
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
{
AMQFrame frame((AMQContentBody()));
- morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+ morecontent = message.payload->getContentFrame(
+ *(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
@@ -479,9 +485,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
- boost::bind(&UpdateClient::updateUnacked, this, _1));
+ boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
- updateTxState(ss->getSemanticState()); // Tx transaction state.
+ updateTransactionState(ss->getSemanticState());
// Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -542,14 +548,18 @@ void UpdateClient::updateConsumer(
<< " on " << shadowSession.getId());
}
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
- if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
+ client::AsyncSession& updateSession)
+{
+ if (!dr.isEnded() && dr.isAcquired()) {
+ // FIXME aconway 2011-08-19: should this be assert or if?
+ assert(dr.getMessage().payload);
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
}
- ClusterConnectionProxy(shadowSession).deliveryRecord(
+ ClusterConnectionProxy(updateSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
dr.getTag(),
@@ -570,8 +580,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
: MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
- void operator()(const broker::DtxAck& ) {
- throw InternalErrorException("DTX transactions not currently supported by cluster.");
+ void operator()(const broker::DtxAck& ack) {
+ std::for_each(ack.getPending().begin(), ack.getPending().end(),
+ boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
+ proxy.dtxAck();
}
void operator()(const broker::RecoveredDequeue& rdeq) {
@@ -588,13 +600,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
proxy.txAccept(txAccept.getAcked());
}
+ typedef std::list<Queue::shared_ptr> QueueList;
+
+ void copy(const QueueList& l, Array& a) {
+ for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
+ a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ }
+
void operator()(const broker::TxPublish& txPub) {
updateMessage(txPub.getMessage());
- typedef std::list<Queue::shared_ptr> QueueList;
- const QueueList& qlist = txPub.getQueues();
+ assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
Array qarray(TYPE_CODE_STR8);
- for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
- qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
proxy.txPublish(qarray, txPub.delivered);
}
@@ -604,19 +621,33 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
ClusterConnectionProxy proxy;
};
-void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, *this << " updating TX transaction state.");
+void UpdateClient::updateTransactionState(broker::SemanticState& s) {
+ broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+ broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
- broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
- if (txBuffer) {
+ if (dtx) {
+ broker::DtxWorkRecord* record =
+ updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found
+ proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx));
+ } else if (tx) {
+ ClusterConnectionProxy proxy(shadowSession);
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
- txBuffer->accept(updater);
+ tx->accept(updater);
proxy.txEnd();
}
}
+void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
+ ClusterConnectionProxy proxy(session);
+ proxy.dtxStart(
+ dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
+ TxOpUpdater updater(*this, session, expiry);
+ dtx->accept(updater);
+ proxy.dtxEnd();
+}
+
void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
queue->getListeners().eachListener(
boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1));
@@ -667,5 +698,17 @@ void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
}
}
+void UpdateClient::updateDtxManager() {
+ broker::DtxManager& dtm = updaterBroker.getDtxManager();
+ dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
+}
+
+void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
+ QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
+ for (size_t i = 0; i < r.size(); ++i)
+ updateDtxBuffer(r[i]);
+ ClusterConnectionProxy(session).dtxWorkRecord(
+ r.getXid(), r.isPrepared(), r.getTimeout());
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 21bf6024e0..83d4cfac81 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -52,7 +52,7 @@ class Decoder;
class Link;
class Bridge;
class QueueObserver;
-
+class DtxBuffer;
} // namespace broker
namespace cluster {
@@ -88,7 +88,7 @@ class UpdateClient : public sys::Runnable {
void update();
void run(); // Will delete this when finished.
- void updateUnacked(const broker::DeliveryRecord&);
+ void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&);
private:
void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
@@ -100,7 +100,7 @@ class UpdateClient : public sys::Runnable {
void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
- void updateTxState(broker::SemanticState& s);
+ void updateTransactionState(broker::SemanticState& s);
void updateOutputTask(const sys::OutputTask* task);
void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
@@ -112,6 +112,9 @@ class UpdateClient : public sys::Runnable {
void updateBridge(const boost::shared_ptr<broker::Bridge>&);
void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
+ void updateDtxManager();
+ void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& );
+ void updateDtxWorkRecord(const broker::DtxWorkRecord&);
Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;
diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h
index 7e8ce47662..512e59e5a1 100644
--- a/cpp/src/qpid/cluster/UpdateReceiver.h
+++ b/cpp/src/qpid/cluster/UpdateReceiver.h
@@ -39,6 +39,13 @@ class UpdateReceiver {
/** Management-id for the next shadow connection */
std::string nextShadowMgmtId;
+
+ /** Relationship between DtxBuffers, identified by xid, index in DtxManager,
+ * and sessions represented by their SemanticState.
+ */
+ typedef std::pair<std::string, uint32_t> DtxBufferRef;
+ typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers;
+ DtxBuffers dtxBuffers;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp
index 6a32b50096..85b4d1f155 100644
--- a/cpp/src/qpid/log/Statement.cpp
+++ b/cpp/src/qpid/log/Statement.cpp
@@ -24,35 +24,9 @@
#include <ctype.h>
namespace qpid {
+std::string quote(const std::string& str); // Defined in Msg.cpp
namespace log {
-namespace {
-using namespace std;
-
-struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
-
-const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
-
-std::string quote(const std::string& str) {
- NonPrint nonPrint;
- size_t n = std::count_if(str.begin(), str.end(), nonPrint);
- if (n==0) return str;
- std::string ret;
- ret.reserve(str.size()+2*n); // Avoid extra allocations.
- for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
- if (nonPrint(*i)) {
- ret.push_back('\\');
- ret.push_back('x');
- ret.push_back(hex[((*i) >> 4)&0xf]);
- ret.push_back(hex[(*i) & 0xf]);
- }
- else ret.push_back(*i);
- }
- return ret;
-}
-
-}
-
void Statement::log(const std::string& message) {
Logger::instance().log(*this, quote(message));
}
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index fd972b4394..7888f44c30 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -493,9 +493,7 @@ class BrokerTest(TestCase):
return cluster
def browse(self, session, queue, timeout=0):
- """Assert that the contents of messages on queue (as retrieved
- using session and timeout) exactly match the strings in
- expect_contents"""
+ """Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
try:
contents = []
diff --git a/cpp/src/tests/cluster_python_tests_failing.txt b/cpp/src/tests/cluster_python_tests_failing.txt
index 7ba8089946..f8639d7b59 100644
--- a/cpp/src/tests/cluster_python_tests_failing.txt
+++ b/cpp/src/tests/cluster_python_tests_failing.txt
@@ -1,32 +1,4 @@
qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue
qpid_tests.broker_0_10.management.ManagementTest.test_connection_close
-qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid
-qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_recover
-qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_select_required
-qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume
qpid_tests.broker_0_10.message.MessageTests.test_ttl
qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 807e9508c3..e67a691283 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -713,6 +713,204 @@ acl allow all all
cluster.start()
fetch(cluster[2])
+# Some utility code for transaction tests
+XA_RBROLLBACK = 1
+XA_RBTIMEOUT = 2
+XA_OK = 0
+dtx_branch_counter = 0
+
+class DtxTestFixture:
+ """Bundle together some common requirements for dtx tests."""
+ def __init__(self, test, broker, name, exclusive=False):
+ self.test = test
+ self.broker = broker
+ self.name = name
+ # Use old API. DTX is not supported in messaging API.
+ self.connection = broker.connect_old()
+ self.session = self.connection.session(name, 1) # 1 second timeout
+ self.queue = self.session.queue_declare(name, exclusive=exclusive)
+ self.xid = self.session.xid(format=0, global_id=name)
+ self.session.dtx_select()
+ self.consumer = None
+
+ def start(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status)
+
+ def end(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status)
+
+ def prepare(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status)
+
+ def commit(self, one_phase=True):
+ self.test.assertEqual(
+ XA_OK, self.session.dtx_commit(xid=self.xid, one_phase=one_phase).status)
+
+ def rollback(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_rollback(xid=self.xid).status)
+
+ def send(self, messages):
+ for m in messages:
+ dp=self.session.delivery_properties(routing_key=self.name)
+ mp=self.session.message_properties()
+ self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
+
+ def accept(self):
+ """Accept 1 message from queue"""
+ consumer_tag="%s-consumer"%(self.name)
+ self.session.message_subscribe(queue=self.name, destination=consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ msg = self.session.incoming(consumer_tag).get(timeout=1)
+ self.session.message_cancel(destination=consumer_tag)
+ self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
+ return msg
+
+
+ def verify(self, cluster, messages):
+ for b in cluster:
+ self.test.assert_browse(b.connect().session(), self.name, messages)
+
+
+class DtxTests(BrokerTest):
+
+ def test_dtx_update(self):
+ """Verify that DTX transaction state is updated to a new broker.
+ Start a collection of transactions, then add a new cluster member,
+ then verify they commit/rollback correctly on the new broker."""
+
+ # Note: multiple test have been bundled into one to avoid the need to start/stop
+ # multiple brokers per test.
+
+ cluster=self.cluster(1)
+
+ # Transaction that will be open when new member joins, then committed.
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.send(["1", "2"])
+ t1.verify(cluster, []) # Not visible outside of transaction
+
+ # Transaction that will be open when new member joins, then rolled back.
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.start()
+ t2.send(["1", "2"])
+
+ # Transaction that will be prepared when new member joins, then committed.
+ t3 = DtxTestFixture(self, cluster[0], "t3")
+ t3.start()
+ t3.send(["1", "2"])
+ t3.end()
+ t3.prepare()
+ t1.verify(cluster, []) # Not visible outside of transaction
+
+ # Transaction that will be prepared when new member joins, then rolled back.
+ t4 = DtxTestFixture(self, cluster[0], "t4")
+ t4.start()
+ t4.send(["1", "2"])
+ t4.end()
+ t4.prepare()
+
+ # Transaction using an exclusive queue
+ t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
+ t5.start()
+ t5.send(["1", "2"])
+
+ # Accept messages in a transaction before/after join then commit
+ t6 = DtxTestFixture(self, cluster[0], "t6")
+ t6.send(["a","b","c"])
+ t6.start()
+ t6.verify(cluster, ["a","b","c"])
+ self.assertEqual(t6.accept().body, "a");
+ t6.verify(cluster, ["b","c"])
+
+ # Accept messages in a transaction before/after join then roll back
+ t7 = DtxTestFixture(self, cluster[0], "t7")
+ t7.send(["a","b","c"])
+ t7.start()
+ t7.verify(cluster, ["a","b","c"])
+ self.assertEqual(t7.accept().body, "a");
+ t7.verify(cluster, ["b","c"])
+
+ # Start new member
+ cluster.start()
+
+ # Commit t1
+ t1.send(["3","4"])
+ t1.verify(cluster, [])
+ t1.end()
+ t1.commit(one_phase=True)
+ t1.verify(cluster, ["1","2","3","4"])
+
+ # Rollback t2
+ t2.send(["3","4"])
+ t2.verify(cluster, [])
+ t2.end()
+ t2.rollback()
+ t2.verify(cluster, [])
+
+ # Commit t3
+ t3.verify(cluster, [])
+ t3.commit(one_phase=False)
+ t3.verify(cluster, ["1","2"])
+
+ # Rollback t4
+ t4.verify(cluster, [])
+ t4.rollback()
+ t4.verify(cluster, [])
+
+ # Commit t5
+ t5.send(["3","4"])
+ t5.verify(cluster, [])
+ t5.end()
+ t5.commit(one_phase=True)
+ t5.verify(cluster, ["1","2","3","4"])
+
+ # Commit t7
+ t6.verify(cluster, ["b", "c"])
+ self.assertEqual(t6.accept().body, "b");
+ t6.verify(cluster, ["c"])
+ t6.end()
+ t6.commit(one_phase=True)
+ t6.verify(cluster, ["c"])
+ t6.session.close() # Make sure they're not requeued by the session.
+ t6.verify(cluster, ["c"])
+
+ # Rollback t7
+ t7.verify(cluster, ["b", "c"])
+ self.assertEqual(t7.accept().body, "b");
+ t7.verify(cluster, ["c"])
+ t7.end()
+ t7.rollback()
+ t7.verify(cluster, ["a", "b", "c"])
+
+
+class TxTests(BrokerTest):
+
+ def test_tx_update(self):
+ """Verify that transaction state is updated to a new broker"""
+
+ def make_message(session, body=None, key=None, id=None):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ return qpid.datatypes.Message(dp, mp, body)
+
+ cluster=self.cluster(1)
+ # Use old API. TX is not supported in messaging API.
+ c = cluster[0].connect_old()
+ s = c.session("tx-session", 1)
+ s.queue_declare(queue="q")
+ # Start transaction
+ s.tx_select()
+ s.message_transfer(message=make_message(s, "1", "q"))
+ # Start new member mid-transaction
+ cluster.start()
+ # Do more work
+ s.message_transfer(message=make_message(s, "2", "q"))
+ # Commit the transaction and verify the results.
+ s.tx_commit()
+ for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
+
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -1001,6 +1199,8 @@ class LongTests(BrokerTest):
logger = logging.getLogger()
log_level = logger.getEffectiveLevel()
logger.setLevel(logging.ERROR)
+ sender = None
+ receiver = None
try:
# Start sender and receiver threads
receiver = Receiver(cluster[0], "q;{create:always}")
@@ -1031,8 +1231,8 @@ class LongTests(BrokerTest):
finally:
# Detach to avoid slow reconnect attempts during shut-down if test fails.
- sender.connection.detach()
- receiver.connection.detach()
+ if sender: sender.connection.detach()
+ if receiver: receiver.connection.detach()
logger.setLevel(log_level)
class StoreTests(BrokerTest):
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index c33f2e4852..f49f8dbfff 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -212,6 +212,29 @@
<field name="name" type="str8"/>
</control>
+ <!-- Dtx transaction state. -->
+ <control name="dtx-start" code="0x1A">
+ <field name="xid" type="str16"/>
+ <field name="ended" type="bit"/>
+ <field name="suspended" type="bit"/>
+ <field name="failed" type="bit"/>
+ <field name="expired" type="bit"/>
+ </control>
+ <control name="dtx-end" code="0x1B"/>
+
+ <control name="dtx-ack" code="0x1C"/>
+
+ <control name="dtx-buffer-ref" code="0x1D">
+ <field name="xid" type="str16"/>
+ <field name="index" type="uint32"/>
+ </control>
+
+ <control name="dtx-work-record" code="0x1E">
+ <field name="xid" type="str16"/>
+ <field name="prepared" type="bit"/>
+ <field name="timeout" type="uint32"/>
+ </control>
+
<!-- Complete a session state update. -->
<control name="session-state" code="0x1F">
<!-- Target session deduced from channel number. -->