summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcpp/rubygen/99-0/Proxy.rb1
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp12
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h3
-rw-r--r--cpp/src/qpid/broker/DtxAck.cpp1
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp3
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp23
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp128
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h34
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp1
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp61
-rw-r--r--cpp/src/qpid/broker/TxAccept.h57
-rw-r--r--cpp/xml/extra.xml101
-rw-r--r--python/cpp_failing_0-10.txt20
-rw-r--r--python/qpid/datatypes.py3
-rw-r--r--python/qpid/session.py5
-rw-r--r--python/qpid/spec010.py2
-rw-r--r--python/tests_0-10/dtx.py505
21 files changed, 691 insertions, 276 deletions
diff --git a/cpp/rubygen/99-0/Proxy.rb b/cpp/rubygen/99-0/Proxy.rb
index 85db52da5b..87d809d4ad 100755
--- a/cpp/rubygen/99-0/Proxy.rb
+++ b/cpp/rubygen/99-0/Proxy.rb
@@ -43,6 +43,7 @@ EOS
include "qpid/framing/Proxy.h"
include "qpid/framing/Array.h"
include "qpid/framing/amqp_types.h"
+ include "qpid/framing/amqp_structs.h"
namespace("qpid::framing") {
cpp_class(@classname, "public Proxy") {
public
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index d40bbf699e..b71bd754f7 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -229,6 +229,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/System.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
+ qpid/broker/TxAccept.cpp \
qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
qpid/broker/TxPublish.cpp \
@@ -352,6 +353,7 @@ nobase_include_HEADERS = \
qpid/broker/Timer.h \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
+ qpid/broker/TxAccept.h \
qpid/broker/TxAck.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 526b58cb14..d7fd20db1d 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -340,7 +340,7 @@ void BrokerAdapter::TxHandlerImpl::select()
void BrokerAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore(), true);
}
void BrokerAdapter::TxHandlerImpl::rollback()
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 0e69a03465..b28c4ebdcc 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -84,6 +84,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index ca90f32a5d..c5bf325afc 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -67,12 +67,8 @@ void DeliveryRecord::setEnded()
ended = true;
//reset msg pointer, don't need to hold on to it anymore
msg.payload = boost::intrusive_ptr<Message>();
-}
-void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
- }
+ QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
}
bool DeliveryRecord::matches(DeliveryId tag) const{
@@ -136,6 +132,12 @@ void DeliveryRecord::accept(TransactionContext* ctxt) {
}
}
+void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
+ if (acquired && !ended) {
+ queue->dequeue(ctxt, msg.payload);
+ }
+}
+
void DeliveryRecord::reject()
{
Exchange::shared_ptr alternate = queue->getAlternateExchange();
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index b2672345b4..eed7b81748 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -55,8 +55,6 @@ class DeliveryRecord{
bool completed;
bool ended;
- void setEnded();
-
public:
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
const DeliveryId id, bool acquired, bool confirmed = false);
@@ -76,6 +74,7 @@ class DeliveryRecord{
void acquire(DeliveryIds& results);
void complete();
void accept(TransactionContext* ctxt);
+ void setEnded();
bool isAcquired() const { return acquired; }
bool isComplete() const { return completed; }
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp
index 25186b4102..0e6f94d4e0 100644
--- a/cpp/src/qpid/broker/DtxAck.cpp
+++ b/cpp/src/qpid/broker/DtxAck.cpp
@@ -30,7 +30,6 @@ DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>&
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
- unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
}
bool DtxAck::prepare(TransactionContext* ctxt) throw()
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index cef3a4b02b..fb6b3f019e 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -54,6 +54,7 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon
bool DtxManager::prepare(const std::string& xid)
{
+ QPID_LOG(debug, "preparing: " << xid);
try {
return getWork(xid)->prepare();
} catch (DtxTimeoutException& e) {
@@ -64,6 +65,7 @@ bool DtxManager::prepare(const std::string& xid)
bool DtxManager::commit(const std::string& xid, bool onePhase)
{
+ QPID_LOG(debug, "committing: " << xid);
try {
bool result = getWork(xid)->commit(onePhase);
remove(xid);
@@ -76,6 +78,7 @@ bool DtxManager::commit(const std::string& xid, bool onePhase)
void DtxManager::rollback(const std::string& xid)
{
+ QPID_LOG(debug, "rolling back: " << xid);
try {
getWork(xid)->rollback();
remove(xid);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1b5bcc2f7e..6f5577de5a 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -29,6 +29,7 @@
#include "Message.h"
#include "SemanticHandler.h"
#include "SessionHandler.h"
+#include "TxAccept.h"
#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
@@ -119,12 +120,14 @@ void SemanticState::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store)
+void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+ TxOp::shared_ptr txAck(completeOnCommit ?
+ static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) :
+ static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
@@ -402,9 +405,11 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
+ //if enlisted in a dtx, copy the relevant slice from
+ //unacked and record it against that transaction:
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ //then remove that slice from the unacked record:
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &accumulatedAck));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
}
@@ -660,11 +665,19 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
+ //if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
+
+ //mark the relevant messages as 'ended' in unacked
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
+
+ //if the messages are already completed, they can be
+ //removed from the record
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+
}
} else {
for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index f9f343de38..4bcd0dddb3 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -167,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains,
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void startTx();
- void commit(MessageStore* const store);
+ void commit(MessageStore* const store, bool completeOnCommit);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 5d33e68fab..2091e97584 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -22,6 +22,7 @@
#include "Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -40,7 +41,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
queueImpl(s),
messageImpl(s),
executionImpl(s),
- txImpl(s)
+ txImpl(s),
+ dtxImpl(s)
{}
@@ -431,7 +433,7 @@ void SessionAdapter::TxHandlerImpl::select()
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore(), false);
}
void SessionAdapter::TxHandlerImpl::rollback()
@@ -439,6 +441,128 @@ void SessionAdapter::TxHandlerImpl::rollback()
state.rollback();
}
+std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid)
+{
+ std::stringstream out;
+ out << xid.getFormat() << xid.getGlobalId() << xid.getBranchId();
+ return out.str();
+}
+
+
+void SessionAdapter::DtxHandlerImpl::select()
+{
+ state.selectDtx();
+}
+
+Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
+ bool fail,
+ bool suspend)
+{
+ try {
+ if (fail) {
+ state.endDtx(convert(xid), true);
+ if (suspend) {
+ throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
+ } else {
+ return Dtx010EndResult(XA_RBROLLBACK);
+ }
+ } else {
+ if (suspend) {
+ state.suspendDtx(convert(xid));
+ } else {
+ state.endDtx(convert(xid), false);
+ }
+ return Dtx010EndResult(XA_OK);
+ }
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010EndResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
+ bool join,
+ bool resume)
+{
+ if (join && resume) {
+ throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
+ }
+ try {
+ if (resume) {
+ state.resumeDtx(convert(xid));
+ } else {
+ state.startDtx(convert(xid), getBroker().getDtxManager(), join);
+ }
+ return Dtx010StartResult(XA_OK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010StartResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid)
+{
+ try {
+ bool ok = getBroker().getDtxManager().prepare(convert(xid));
+ return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010PrepareResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid,
+ bool onePhase)
+{
+ try {
+ bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
+ return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010CommitResult(XA_RBTIMEOUT);
+ }
+}
+
+
+Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid)
+{
+ try {
+ getBroker().getDtxManager().rollback(convert(xid));
+ return Dtx010RollbackResult(XA_OK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010RollbackResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
+{
+ std::set<std::string> xids;
+ getBroker().getStore().collectPreparedXids(xids);
+
+ //TODO: remove the need to copy from one container type to another
+ std::vector<std::string> data;
+ for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+ data.push_back(*i);
+ }
+ Array indoubt(data);
+ return Dtx010RecoverResult(indoubt);
+}
+
+void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid)
+{
+ //Currently no heuristic completion is supported, so this should never be used.
+ throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
+}
+
+Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid)
+{
+ uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
+ return Dtx010GetTimeoutResult(timeout);
+}
+
+
+void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid,
+ u_int32_t timeout)
+{
+ getBroker().getDtxManager().setTimeout(convert(xid), timeout);
+}
+
Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
Queue::shared_ptr queue;
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index dad89cd123..c8aa9008cc 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -61,6 +61,7 @@ class Queue;
Queue010Handler* getQueue010Handler(){ return &queueImpl; }
Execution010Handler* getExecution010Handler(){ return &executionImpl; }
Tx010Handler* getTx010Handler(){ return &txImpl; }
+ Dtx010Handler* getDtx010Handler(){ return &dtxImpl; }
BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); }
@@ -218,12 +219,45 @@ class Queue;
void rollback();
};
+ class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper
+ {
+ std::string convert(const framing::Xid010& xid);
+
+ public:
+ DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
+
+ void select();
+
+ framing::Dtx010StartResult start(const framing::Xid010& xid,
+ bool join,
+ bool resume);
+
+ framing::Dtx010EndResult end(const framing::Xid010& xid,
+ bool fail,
+ bool suspend);
+
+ framing::Dtx010CommitResult commit(const framing::Xid010& xid,
+ bool onePhase);
+
+ void forget(const framing::Xid010& xid);
+
+ framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid);
+
+ framing::Dtx010PrepareResult prepare(const framing::Xid010& xid);
+
+ framing::Dtx010RecoverResult recover();
+
+ framing::Dtx010RollbackResult rollback(const framing::Xid010& xid);
+
+ void setTimeout(const framing::Xid010& xid, uint32_t timeout);
+ };
ExchangeHandlerImpl exchangeImpl;
QueueHandlerImpl queueImpl;
MessageHandlerImpl messageImpl;
ExecutionHandlerImpl executionImpl;
TxHandlerImpl txImpl;
+ DtxHandlerImpl dtxImpl;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 105f946dcb..35ad562a22 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -178,6 +178,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
} else if (invocation.hasResult()) {
+ nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
new file mode 100644
index 0000000000..634d066ecc
--- /dev/null
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "TxAccept.h"
+#include "qpid/log/Statement.h"
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+using qpid::framing::AccumulatedAck;
+
+TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
+ acked(_acked), unacked(_unacked) {}
+
+bool TxAccept::prepare(TransactionContext* ctxt) throw()
+{
+ try{
+ //dequeue messages from their respective queues:
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) {
+ i->dequeue(ctxt);
+ }
+ }
+ return true;
+ }catch(const std::exception& e){
+ QPID_LOG(error, "Failed to prepare: " << e.what());
+ return false;
+ }catch(...){
+ QPID_LOG(error, "Failed to prepare");
+ return false;
+ }
+}
+
+void TxAccept::commit() throw()
+{
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) i->setEnded();
+ }
+
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+}
+
+void TxAccept::rollback() throw() {}
diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h
new file mode 100644
index 0000000000..011acf5d9e
--- /dev/null
+++ b/cpp/src/qpid/broker/TxAccept.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxAccept_
+#define _TxAccept_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/framing/AccumulatedAck.h"
+#include "DeliveryRecord.h"
+#include "TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Defines the transactional behaviour for accepts received by
+ * a transactional channel.
+ */
+ class TxAccept : public TxOp{
+ framing::AccumulatedAck& acked;
+ std::list<DeliveryRecord>& unacked;
+
+ public:
+ /**
+ * @param acked a representation of the accumulation of
+ * acks received
+ * @param unacked the record of delivered messages
+ */
+ TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~TxAccept(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml
index 0acc2260ad..6080800f60 100644
--- a/cpp/xml/extra.xml
+++ b/cpp/xml/extra.xml
@@ -33,6 +33,14 @@
</doc>
</domain>
+ <domain name="xid010">
+ <struct size="short" pack="short">
+ <field name="format" domain="long" />
+ <field name="global-id" domain="shortstr" />
+ <field name="branch-id" domain="shortstr" />
+ </struct>
+ </domain>
+
<domain name="delivery-properties-010">
<struct size="long" pack="short" type="1025">
<field name="discard-unroutable" domain="bit" label="controls discard of unroutable messages"/>
@@ -739,6 +747,99 @@
</method>
</class>
+<class name="dtx010" index="6">
+ <doc>blah, blah</doc>
+ <method name = "select" index="1">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ </method>
+ <method name = "start" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="join" domain="bit"/>
+ <field name="resume" domain="bit"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "end" index="3">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="fail" domain="bit"/>
+ <field name="suspend" domain="bit"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "commit" index="4">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="one-phase" domain="bit"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "forget" index="5">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ </method>
+ <method name = "get-timeout" index="6">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <result>
+ <struct size="long" pack="short" type="2">
+ <field name="timeout" domain="long" />
+ </struct>
+ </result>
+ </method>
+ <method name = "prepare" index="7">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "recover" index="8">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <result>
+ <struct size="long" pack="short" type="3">
+ <field name="in-doubt" domain="array" />
+ </struct>
+ </result>
+ </method>
+ <method name = "rollback" index="9">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "set-timeout" index="10">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="timeout" domain="long"/>
+ </method>
+</class>
+
<class name="exchange010" index="7">
<doc>blah, blah</doc>
<method name = "declare" index="1">
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 70cd3d19ed..58cad3344b 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -2,27 +2,7 @@ tests.codec.FieldTableTestCase.test_field_table_decode
tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
tests.codec.FieldTableTestCase.test_field_table_name_value_pair
tests_0-10.execution.ExecutionTests.test_flush
-tests_0-10.dtx.DtxTests.test_bad_resume
-tests_0-10.dtx.DtxTests.test_end
-tests_0-10.dtx.DtxTests.test_end_suspend_and_fail
-tests_0-10.dtx.DtxTests.test_end_unknown_xid
-tests_0-10.dtx.DtxTests.test_forget_xid_on_completion
-tests_0-10.dtx.DtxTests.test_get_timeout
-tests_0-10.dtx.DtxTests.test_implicit_end
-tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_false
-tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_true
tests_0-10.dtx.DtxTests.test_recover
-tests_0-10.dtx.DtxTests.test_select_required
-tests_0-10.dtx.DtxTests.test_set_timeout
-tests_0-10.dtx.DtxTests.test_simple_commit
-tests_0-10.dtx.DtxTests.test_simple_prepare_commit
-tests_0-10.dtx.DtxTests.test_simple_prepare_rollback
-tests_0-10.dtx.DtxTests.test_simple_rollback
-tests_0-10.dtx.DtxTests.test_start_already_known
-tests_0-10.dtx.DtxTests.test_start_join
-tests_0-10.dtx.DtxTests.test_start_join_and_resume
-tests_0-10.dtx.DtxTests.test_suspend_resume
-tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
tests_0-10.message.MessageTests.test_consume_no_local
tests_0-10.message.MessageTests.test_consume_no_local_awkward
tests_0-10.message.MessageTests.test_no_size
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index 59299b6e04..0893269174 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -154,6 +154,7 @@ class Future:
self.value = initial
self._error = None
self._set = threading.Event()
+ self.exception = exception
def error(self, error):
self._error = error
@@ -166,7 +167,7 @@ class Future:
def get(self, timeout=None):
self._set.wait(timeout)
if self._error != None:
- raise exception(self._error)
+ raise self.exception(self._error)
return self.value
def is_set(self):
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 4da6f883b4..bbe2b326d6 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -110,6 +110,8 @@ class Session(Invoker):
for st in self.spec.structs.values():
if st.name == name:
return st
+ if self.spec.structs_by_name.has_key(name):
+ return self.spec.structs_by_name[name]
return None
def invoke(self, type, args, kwargs):
@@ -289,8 +291,9 @@ class Delegate:
self.session.exceptions.append(ex)
error = self.session.error()
for id in self.session.results:
- f = self.session.results.pop(id)
+ f = self.session.results[id]
f.error(error)
+ self.session.results.clear()
for q in self.session._incoming.values():
q.close(error)
diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py
index 815c1d064a..4eb03008d0 100644
--- a/python/qpid/spec010.py
+++ b/python/qpid/spec010.py
@@ -257,6 +257,7 @@ class Struct(Composite):
Composite.register(self, node)
if self.code is not None:
self.spec.structs[self.code] = self
+ self.spec.structs_by_name[self.name] = self
def __str__(self):
fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname)
@@ -443,6 +444,7 @@ class Spec(Node):
self.controls = {}
self.commands = {}
self.structs = {}
+ self.structs_by_name = {}
def encoding(self, klass):
if Spec.ENCODINGS.has_key(klass):
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index f84f91c75a..2483c6f16d 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -18,12 +18,13 @@
#
from qpid.client import Client, Closed
from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message, RangedSet
+from qpid.session import SessionException
+from qpid.testlib import TestBase010
from struct import pack, unpack
from time import sleep
-class DtxTests(TestBase):
+class DtxTests(TestBase010):
"""
Tests for the amqp dtx related classes.
@@ -43,15 +44,15 @@ class DtxTests(TestBase):
tx_counter = 0
def reset_channel(self):
- self.channel.session_close()
- self.channel = self.client.channel(self.channel.id + 1)
- self.channel.session_open()
+ self.session.close()
+ self.session = self.conn.session("dtx-session", 1)
def test_simple_commit(self):
"""
Test basic one-phase commit behaviour.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "commit")
@@ -60,9 +61,9 @@ class DtxTests(TestBase):
self.assertMessageCount(0, "queue-b")
#commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status)
- #should close and reopen channel to ensure no unacked messages are held
+ #should close and reopen session to ensure no unacked messages are held
self.reset_channel()
#check result
@@ -74,19 +75,20 @@ class DtxTests(TestBase):
"""
Test basic two-phase commit behaviour.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "prepare-commit")
#prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
#neither queue should have any messages accessible
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(0, "queue-b")
#commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status)
self.reset_channel()
@@ -100,7 +102,8 @@ class DtxTests(TestBase):
"""
Test basic rollback behaviour.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "rollback")
@@ -109,7 +112,7 @@ class DtxTests(TestBase):
self.assertMessageCount(0, "queue-b")
#rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
self.reset_channel()
@@ -122,19 +125,20 @@ class DtxTests(TestBase):
"""
Test basic rollback behaviour after the transaction has been prepared.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "prepare-rollback")
#prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
#neither queue should have any messages accessible
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(0, "queue-b")
#rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
self.reset_channel()
@@ -148,17 +152,17 @@ class DtxTests(TestBase):
check that an error is flagged if select is not issued before
start or end
"""
- channel = self.channel
+ session = self.session
tx = self.xid("dummy")
try:
- channel.dtx_demarcation_start(xid=tx)
+ session.dtx_start(xid=tx)
#if we get here we have failed, but need to do some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Channel not selected for use with dtx, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ session.dtx_end(xid=tx)
+ session.dtx_rollback(xid=tx)
+ self.fail("Session not selected for use with dtx, expected exception!")
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
def test_start_already_known(self):
"""
@@ -166,36 +170,35 @@ class DtxTests(TestBase):
transaction that is already known is not allowed (unless the
join flag is set).
"""
- #create two channels on different connection & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
+ #create two sessions on different connection & select them for use with dtx:
+ session1 = self.session
+ session1.dtx_select()
other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
- channel2.dtx_demarcation_select()
+ session2 = other.session("other", 0)
+ session2.dtx_select()
#create a xid
tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
+ #start work on one session under that xid:
+ session1.dtx_start(xid=tx)
#then start on the other without the join set
failed = False
try:
- channel2.dtx_demarcation_start(xid=tx)
- except Closed, e:
+ session2.dtx_start(xid=tx)
+ except SessionException, e:
failed = True
error = e
#cleanup:
if not failed:
- channel2.dtx_demarcation_end(xid=tx)
+ session2.dtx_end(xid=tx)
other.close()
- channel1.dtx_demarcation_end(xid=tx)
- channel1.dtx_coordination_rollback(xid=tx)
+ session1.dtx_end(xid=tx)
+ session1.dtx_rollback(xid=tx)
#verification:
- if failed: self.assertConnectionException(503, e.args[0])
+ if failed: self.assertEquals(503, error.args[0].error_code)
else: self.fail("Xid already known, expected exception!")
def test_forget_xid_on_completion(self):
@@ -205,69 +208,69 @@ class DtxTests(TestBase):
"""
#do some transactional work & complete the transaction
self.test_simple_commit()
- # channel has been reset, so reselect for use with dtx
- self.channel.dtx_demarcation_select()
+ # session has been reset, so reselect for use with dtx
+ self.session.dtx_select()
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
- self.channel.dtx_demarcation_start(xid=tx)
- self.channel.dtx_demarcation_end(xid=tx)
- self.channel.dtx_coordination_rollback(xid=tx)
+ self.session.dtx_start(xid=tx)
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
def test_start_join_and_resume(self):
"""
Ensure the correct error is signalled when both the join and
resume flags are set on starting an association between a
- channel and a transcation.
+ session and a transcation.
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
tx = self.xid("dummy")
try:
- channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
+ session.dtx_start(xid=tx, join=True, resume=True)
#failed, but need some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
+ session.dtx_end(xid=tx)
+ session.dtx_rollback(xid=tx)
self.fail("Join and resume both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
def test_start_join(self):
"""
- Verify 'join' behaviour, where a channel is associated with a
- transaction that is already associated with another channel.
+ Verify 'join' behaviour, where a session is associated with a
+ transaction that is already associated with another session.
"""
- #create two channels & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
+ guard = self.keepQueuesAlive(["one", "two"])
+ #create two sessions & select them for use with dtx:
+ session1 = self.session
+ session1.dtx_select()
- channel2 = self.client.channel(2)
- channel2.session_open()
- channel2.dtx_demarcation_select()
+ session2 = self.conn.session("second", 2)
+ session2.dtx_select()
#setup
- channel1.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel1.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+ session1.queue_declare(queue="one", auto_delete=True)
+ session1.queue_declare(queue="two", auto_delete=True)
+ session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage"))
+ session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage"))
#create a xid
tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
+ #start work on one session under that xid:
+ session1.dtx_start(xid=tx)
#then start on the other with the join flag set
- channel2.dtx_demarcation_start(xid=tx, join=True)
+ session2.dtx_start(xid=tx, join=True)
- #do work through each channel
- self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
- self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
+ #do work through each session
+ self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two'
+ self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one'
- #mark end on both channels
- channel1.dtx_demarcation_end(xid=tx)
- channel2.dtx_demarcation_end(xid=tx)
+ #mark end on both sessions
+ session1.dtx_end(xid=tx)
+ session2.dtx_end(xid=tx)
#commit and check
- channel1.dtx_coordination_commit(xid=tx, one_phase=True)
+ session1.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
@@ -278,27 +281,27 @@ class DtxTests(TestBase):
"""
Test suspension and resumption of an association
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
#setup
- channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+ session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+ session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
+ session.dtx_start(xid=tx)
+ self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+ session.dtx_end(xid=tx, suspend=True)
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx, resume=True)
+ self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+ session.dtx_end(xid=tx)
#commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
@@ -310,27 +313,27 @@ class DtxTests(TestBase):
done on another transaction when the first transaction is
suspended
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
#setup
- channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+ session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+ session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
+ session.dtx_start(xid=tx)
+ self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+ session.dtx_end(xid=tx, suspend=True)
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx, resume=True)
+ self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+ session.dtx_end(xid=tx)
#commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
@@ -340,24 +343,23 @@ class DtxTests(TestBase):
"""
Verify that the correct error is signalled if the suspend and
fail flag are both set when disassociating a transaction from
- the channel
+ the session
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
tx = self.xid("suspend_and_fail")
- channel.dtx_demarcation_start(xid=tx)
+ session.dtx_start(xid=tx)
try:
- channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
+ session.dtx_end(xid=tx, suspend=True, fail=True)
self.fail("Suspend and fail both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
#cleanup
other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.dtx_coordination_rollback(xid=tx)
- channel.session_close()
+ session = other.session("cleanup", 1)
+ session.dtx_rollback(xid=tx)
+ session.close()
other.close()
@@ -365,51 +367,51 @@ class DtxTests(TestBase):
"""
Verifies that the correct exception is thrown when an attempt
is made to end the association for a xid not previously
- associated with the channel
+ associated with the session
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
tx = self.xid("unknown-xid")
try:
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_end(xid=tx)
self.fail("Attempted to end association with unknown xid, expected exception!")
- except Closed, e:
+ except SessionException, e:
#FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
- self.assertConnectionException(503, e.args[0])
+ self.assertEquals(503, e.args[0].error_code)
def test_end(self):
"""
Verify that the association is terminated by end and subsequent
operations are non-transactional
"""
- channel = self.client.channel(2)
- channel.session_open()
- channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
+ guard = self.keepQueuesAlive(["tx-queue"])
+ session = self.conn.session("alternate", 1)
+ session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
#publish a message under a transaction
- channel.dtx_demarcation_select()
+ session.dtx_select()
tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage"))
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx)
+ session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage"))
+ session.dtx_end(xid=tx)
#now that association with txn is ended, publish another message
- channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage"))
+ session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage"))
#check the second message is available, but not the first
self.assertMessageCount(1, "tx-queue")
- self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
- msg = self.client.queue("results").get(timeout=1)
- self.assertEqual("two", msg.content['message_id'])
- channel.message_cancel(destination="results")
- #ack the message then close the channel
- msg.complete()
- channel.session_close()
-
- channel = self.channel
+ self.subscribe(session, queue="tx-queue", destination="results")
+ msg = session.incoming("results").get(timeout=1)
+ self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id'))
+ session.message_cancel(destination="results")
+ #ack the message then close the session
+ session.message_accept(RangedSet(msg.id))
+ session.close()
+
+ session = self.session
#commit the transaction and check that the first message (and
#only the first message) is then delivered
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "tx-queue")
self.assertMessageId("one", "tx-queue")
@@ -419,27 +421,26 @@ class DtxTests(TestBase):
transaction in question has already been prepared.
"""
other = self.connect()
- tester = other.channel(1)
- tester.session_open()
+ tester = other.session("tester", 1)
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- tester.dtx_demarcation_select()
+ tester.dtx_select()
tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- tester.dtx_demarcation_end(xid=tx)
- tester.dtx_coordination_prepare(xid=tx)
+ tester.dtx_start(xid=tx)
+ tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+ tester.dtx_end(xid=tx)
+ tester.dtx_prepare(xid=tx)
failed = False
try:
- tester.dtx_coordination_commit(xid=tx, one_phase=True)
- except Closed, e:
+ tester.dtx_commit(xid=tx, one_phase=True)
+ except SessionException, e:
failed = True
error = e
if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(503, error.args[0].error_code)
else:
- tester.session_close()
+ tester.close()
other.close()
self.fail("Invalid use of one_phase=True, expected exception!")
@@ -453,99 +454,99 @@ class DtxTests(TestBase):
transaction in question has already been prepared.
"""
other = self.connect()
- tester = other.channel(1)
- tester.session_open()
+ tester = other.session("tester", 1)
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- tester.dtx_demarcation_select()
+ tester.dtx_select()
tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- tester.dtx_demarcation_end(xid=tx)
+ tester.dtx_start(xid=tx)
+ tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+ tester.dtx_end(xid=tx)
failed = False
try:
- tester.dtx_coordination_commit(xid=tx, one_phase=False)
- except Closed, e:
+ tester.dtx_commit(xid=tx, one_phase=False)
+ except SessionException, e:
failed = True
error = e
if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(503, error.args[0].error_code)
else:
- tester.session_close()
+ tester.close()
other.close()
self.fail("Invalid use of one_phase=False, expected exception!")
def test_implicit_end(self):
"""
- Test that an association is implicitly ended when the channel
+ Test that an association is implicitly ended when the session
is closed (whether by exception or explicit client request)
and the transaction in question is marked as rollback only.
"""
- channel1 = self.channel
- channel2 = self.client.channel(2)
- channel2.session_open()
+ session1 = self.session
+ session2 = self.conn.session("other", 2)
#setup:
- channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+ session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever"))
tx = self.xid("dummy")
- channel2.dtx_demarcation_select()
- channel2.dtx_demarcation_start(xid=tx)
- channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1)
- channel2.message_flow(destination="dummy", unit=0, value=1)
- channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
- self.client.queue("dummy").get(timeout=1).complete()
- channel2.message_cancel(destination="dummy")
- channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- channel2.session_close()
+ session2.dtx_select()
+ session2.dtx_start(xid=tx)
+ session2.message_subscribe(queue="dummy", destination="dummy")
+ session2.message_flow(destination="dummy", unit=0, value=1)
+ session2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
+ msg = session2.incoming("dummy").get(timeout=1)
+ session2.message_accept(RangedSet(msg.id))
+ session2.message_cancel(destination="dummy")
+ session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever"))
+ session2.close()
- self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status)
- channel1.dtx_coordination_rollback(xid=tx)
+ self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status)
+ session1.dtx_rollback(xid=tx)
def test_get_timeout(self):
"""
Check that get-timeout returns the correct value, (and that a
transaction with a timeout can complete normally)
"""
- channel = self.channel
+ session = self.session
tx = self.xid("dummy")
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
- self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ session.dtx_select()
+ session.dtx_start(xid=tx)
+ self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout)
+ session.dtx_set_timeout(xid=tx, timeout=60)
+ self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
+ self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
def test_set_timeout(self):
"""
Test the timeout of a transaction results in the expected
behaviour
"""
- #open new channel to allow self.channel to be used in checking te queue
- channel = self.client.channel(2)
- channel.session_open()
+
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ #open new session to allow self.session to be used in checking the queue
+ session = self.conn.session("worker", 1)
#setup:
tx = self.xid("dummy")
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage"))
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "queue-a", "queue-b")
- channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+ session.queue_declare(queue="queue-a", auto_delete=True)
+ session.queue_declare(queue="queue-b", auto_delete=True)
+ session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage"))
+
+ session.dtx_select()
+ session.dtx_start(xid=tx)
+ self.swap(session, "queue-a", "queue-b")
+ session.dtx_set_timeout(xid=tx, timeout=2)
sleep(3)
#check that the work has been rolled back already
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
self.assertMessageId("timeout", "queue-a")
#check the correct codes are returned when we try to complete the txn
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status)
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
@@ -553,28 +554,29 @@ class DtxTests(TestBase):
"""
Test basic recover behaviour
"""
- channel = self.channel
+ session = self.session
- channel.dtx_demarcation_select()
- channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ session.dtx_select()
+ session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
prepared = []
for i in range(1, 10):
tx = self.xid("tx%s" % (i))
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i)))
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx)
+ session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i)))
+ session.dtx_end(xid=tx)
if i in [2, 5, 6, 8]:
- channel.dtx_coordination_prepare(xid=tx)
+ session.dtx_prepare(xid=tx)
prepared.append(tx)
else:
- channel.dtx_coordination_rollback(xid=tx)
+ session.dtx_rollback(xid=tx)
- xids = channel.dtx_coordination_recover().in_doubt
+ xids = session.dtx_recover().in_doubt
+ print "xids=%s" % xids
#rollback the prepared transactions returned by recover
for x in xids:
- channel.dtx_coordination_rollback(xid=x)
+ session.dtx_rollback(xid=x)
#validate against the expected list of prepared transactions
actual = set(xids)
@@ -585,61 +587,90 @@ class DtxTests(TestBase):
missing = expected.difference(actual)
extra = actual.difference(expected)
for x in missing:
- channel.dtx_coordination_rollback(xid=x)
+ session.dtx_rollback(xid=x)
self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
def test_bad_resume(self):
"""
Test that a resume on a session not selected for use with dtx fails
"""
- channel = self.channel
+ session = self.session
try:
- channel.dtx_demarcation_start(resume=True)
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ session.dtx_start(resume=True)
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
def xid(self, txid):
DtxTests.tx_counter += 1
branchqual = "v%s" % DtxTests.tx_counter
- return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
-
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
+
def txswap(self, tx, id):
- channel = self.channel
+ session = self.session
#declare two queues:
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-a", auto_delete=True)
+ session.queue_declare(queue="queue-b", auto_delete=True)
+
#put message with specified id on one queue:
- channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage"))
+ dp=session.delivery_properties(routing_key="queue-a")
+ mp=session.message_properties(correlation_id=id)
+ session.message_transfer(message=Message(dp, mp, "DtxMessage"))
#start the transaction:
- channel.dtx_demarcation_select()
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
+ session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
#'swap' the message from one queue to the other, under that transaction:
- self.swap(self.channel, "queue-a", "queue-b")
+ self.swap(self.session, "queue-a", "queue-b")
#mark the end of the transactional work:
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
- def swap(self, channel, src, dest):
+ def swap(self, session, src, dest):
#consume from src:
- channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1)
- channel.message_flow(destination="temp-swap", unit=0, value=1)
- channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
- msg = self.client.queue("temp-swap").get(timeout=1)
- channel.message_cancel(destination="temp-swap")
- msg.complete();
-
- #re-publish to dest
- channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']},
- body=msg.content.body))
+ session.message_subscribe(destination="temp-swap", queue=src)
+ session.message_flow(destination="temp-swap", unit=0, value=1)
+ session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
+ msg = session.incoming("temp-swap").get(timeout=1)
+ session.message_cancel(destination="temp-swap")
+ session.message_accept(RangedSet(msg.id))
+ #todo: also complete at this point?
+
+ #re-publish to dest:
+ dp=session.delivery_properties(routing_key=dest)
+ mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id'))
+ session.message_transfer(message=Message(dp, mp, msg.body))
def assertMessageCount(self, expected, queue):
- self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count)
+ self.assertEqual(expected, self.session.queue_query(queue=queue).message_count)
def assertMessageId(self, expected, queue):
- self.channel.message_subscribe(queue=queue, destination="results")
- self.channel.message_flow(destination="results", unit=0, value=1)
- self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
- self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
- self.channel.message_cancel(destination="results")
+ self.session.message_subscribe(queue=queue, destination="results")
+ self.session.message_flow(destination="results", unit=0, value=1)
+ self.session.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
+ self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id'))
+ self.session.message_cancel(destination="results")
+
+ def getMessageProperty(self, msg, prop):
+ for h in msg.headers:
+ if hasattr(h, prop): return getattr(h, prop)
+ return None
+
+ def keepQueuesAlive(self, names):
+ session = self.conn.session("nasty", 99)
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+ session.message_subscribe(destination=n, queue=n)
+ return session
+
+ def createMessage(self, session, key, id, body):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ session.message_transfer(message=Message(dp, mp, body))
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)