diff options
-rwxr-xr-x | cpp/rubygen/99-0/Proxy.rb | 1 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxAck.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 128 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 34 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 57 | ||||
-rw-r--r-- | cpp/xml/extra.xml | 101 | ||||
-rw-r--r-- | python/cpp_failing_0-10.txt | 20 | ||||
-rw-r--r-- | python/qpid/datatypes.py | 3 | ||||
-rw-r--r-- | python/qpid/session.py | 5 | ||||
-rw-r--r-- | python/qpid/spec010.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 505 |
21 files changed, 691 insertions, 276 deletions
diff --git a/cpp/rubygen/99-0/Proxy.rb b/cpp/rubygen/99-0/Proxy.rb index 85db52da5b..87d809d4ad 100755 --- a/cpp/rubygen/99-0/Proxy.rb +++ b/cpp/rubygen/99-0/Proxy.rb @@ -43,6 +43,7 @@ EOS include "qpid/framing/Proxy.h" include "qpid/framing/Array.h" include "qpid/framing/amqp_types.h" + include "qpid/framing/amqp_structs.h" namespace("qpid::framing") { cpp_class(@classname, "public Proxy") { public diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d40bbf699e..b71bd754f7 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -229,6 +229,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/System.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ + qpid/broker/TxAccept.cpp \ qpid/broker/TxAck.cpp \ qpid/broker/TxBuffer.cpp \ qpid/broker/TxPublish.cpp \ @@ -352,6 +353,7 @@ nobase_include_HEADERS = \ qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ + qpid/broker/TxAccept.h \ qpid/broker/TxAck.h \ qpid/broker/TxBuffer.h \ qpid/broker/TxOp.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 526b58cb14..d7fd20db1d 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -340,7 +340,7 @@ void BrokerAdapter::TxHandlerImpl::select() void BrokerAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore()); + state.commit(&getBroker().getStore(), true); } void BrokerAdapter::TxHandlerImpl::rollback() diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 0e69a03465..b28c4ebdcc 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -84,6 +84,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); } Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); } Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); } + Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); } Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); } // Handlers no longer implemented in BrokerAdapter: diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index ca90f32a5d..c5bf325afc 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -67,12 +67,8 @@ void DeliveryRecord::setEnded() ended = true; //reset msg pointer, don't need to hold on to it anymore msg.payload = boost::intrusive_ptr<Message>(); -} -void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - if (acquired && !ended) { - queue->dequeue(ctxt, msg.payload); - } + QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id); } bool DeliveryRecord::matches(DeliveryId tag) const{ @@ -136,6 +132,12 @@ void DeliveryRecord::accept(TransactionContext* ctxt) { } } +void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ + if (acquired && !ended) { + queue->dequeue(ctxt, msg.payload); + } +} + void DeliveryRecord::reject() { Exchange::shared_ptr alternate = queue->getAlternateExchange(); diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index b2672345b4..eed7b81748 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -55,8 +55,6 @@ class DeliveryRecord{ bool completed; bool ended; - void setEnded(); - public: DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, const DeliveryId id, bool acquired, bool confirmed = false); @@ -76,6 +74,7 @@ class DeliveryRecord{ void acquire(DeliveryIds& results); void complete(); void accept(TransactionContext* ctxt); + void setEnded(); bool isAcquired() const { return acquired; } bool isComplete() const { return completed; } diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index 25186b4102..0e6f94d4e0 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -30,7 +30,6 @@ DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& { remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); - unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); } bool DtxAck::prepare(TransactionContext* ctxt) throw() diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index cef3a4b02b..fb6b3f019e 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -54,6 +54,7 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon bool DtxManager::prepare(const std::string& xid) { + QPID_LOG(debug, "preparing: " << xid); try { return getWork(xid)->prepare(); } catch (DtxTimeoutException& e) { @@ -64,6 +65,7 @@ bool DtxManager::prepare(const std::string& xid) bool DtxManager::commit(const std::string& xid, bool onePhase) { + QPID_LOG(debug, "committing: " << xid); try { bool result = getWork(xid)->commit(onePhase); remove(xid); @@ -76,6 +78,7 @@ bool DtxManager::commit(const std::string& xid, bool onePhase) void DtxManager::rollback(const std::string& xid) { + QPID_LOG(debug, "rolling back: " << xid); try { getWork(xid)->rollback(); remove(xid); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1b5bcc2f7e..6f5577de5a 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -29,6 +29,7 @@ #include "Message.h" #include "SemanticHandler.h" #include "SessionHandler.h" +#include "TxAccept.h" #include "TxAck.h" #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" @@ -119,12 +120,14 @@ void SemanticState::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void SemanticState::commit(MessageStore* const store) +void SemanticState::commit(MessageStore* const store, bool completeOnCommit) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); + TxOp::shared_ptr txAck(completeOnCommit ? + static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) : + static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { accumulatedAck.clear(); @@ -402,9 +405,11 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from - //unacked and record it against that transaction + //if enlisted in a dtx, copy the relevant slice from + //unacked and record it against that transaction: TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + //then remove that slice from the unacked record: + unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &accumulatedAck)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); } @@ -660,11 +665,19 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from + //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); + + //mark the relevant messages as 'ended' in unacked + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded)); + + //if the messages are already completed, they can be + //removed from the record + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + } } else { for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0)); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index f9f343de38..4bcd0dddb3 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -167,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains, bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void startTx(); - void commit(MessageStore* const store); + void commit(MessageStore* const store, bool completeOnCommit); void rollback(); void selectDtx(); void startDtx(const std::string& xid, DtxManager& mgr, bool join); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 5d33e68fab..2091e97584 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -22,6 +22,7 @@ #include "Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -40,7 +41,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : queueImpl(s), messageImpl(s), executionImpl(s), - txImpl(s) + txImpl(s), + dtxImpl(s) {} @@ -431,7 +433,7 @@ void SessionAdapter::TxHandlerImpl::select() void SessionAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore()); + state.commit(&getBroker().getStore(), false); } void SessionAdapter::TxHandlerImpl::rollback() @@ -439,6 +441,128 @@ void SessionAdapter::TxHandlerImpl::rollback() state.rollback(); } +std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid) +{ + std::stringstream out; + out << xid.getFormat() << xid.getGlobalId() << xid.getBranchId(); + return out.str(); +} + + +void SessionAdapter::DtxHandlerImpl::select() +{ + state.selectDtx(); +} + +Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, + bool fail, + bool suspend) +{ + try { + if (fail) { + state.endDtx(convert(xid), true); + if (suspend) { + throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); + } else { + return Dtx010EndResult(XA_RBROLLBACK); + } + } else { + if (suspend) { + state.suspendDtx(convert(xid)); + } else { + state.endDtx(convert(xid), false); + } + return Dtx010EndResult(XA_OK); + } + } catch (const DtxTimeoutException& e) { + return Dtx010EndResult(XA_RBTIMEOUT); + } +} + +Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, + bool join, + bool resume) +{ + if (join && resume) { + throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); + } + try { + if (resume) { + state.resumeDtx(convert(xid)); + } else { + state.startDtx(convert(xid), getBroker().getDtxManager(), join); + } + return Dtx010StartResult(XA_OK); + } catch (const DtxTimeoutException& e) { + return Dtx010StartResult(XA_RBTIMEOUT); + } +} + +Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid) +{ + try { + bool ok = getBroker().getDtxManager().prepare(convert(xid)); + return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK); + } catch (const DtxTimeoutException& e) { + return Dtx010PrepareResult(XA_RBTIMEOUT); + } +} + +Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid, + bool onePhase) +{ + try { + bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); + return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK); + } catch (const DtxTimeoutException& e) { + return Dtx010CommitResult(XA_RBTIMEOUT); + } +} + + +Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid) +{ + try { + getBroker().getDtxManager().rollback(convert(xid)); + return Dtx010RollbackResult(XA_OK); + } catch (const DtxTimeoutException& e) { + return Dtx010RollbackResult(XA_RBTIMEOUT); + } +} + +Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() +{ + std::set<std::string> xids; + getBroker().getStore().collectPreparedXids(xids); + + //TODO: remove the need to copy from one container type to another + std::vector<std::string> data; + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + data.push_back(*i); + } + Array indoubt(data); + return Dtx010RecoverResult(indoubt); +} + +void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid) +{ + //Currently no heuristic completion is supported, so this should never be used. + throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); +} + +Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid) +{ + uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); + return Dtx010GetTimeoutResult(timeout); +} + + +void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid, + u_int32_t timeout) +{ + getBroker().getDtxManager().setTimeout(convert(xid), timeout); +} + Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { Queue::shared_ptr queue; diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index dad89cd123..c8aa9008cc 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -61,6 +61,7 @@ class Queue; Queue010Handler* getQueue010Handler(){ return &queueImpl; } Execution010Handler* getExecution010Handler(){ return &executionImpl; } Tx010Handler* getTx010Handler(){ return &txImpl; } + Dtx010Handler* getDtx010Handler(){ return &dtxImpl; } BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); } ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); } @@ -218,12 +219,45 @@ class Queue; void rollback(); }; + class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper + { + std::string convert(const framing::Xid010& xid); + + public: + DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} + + void select(); + + framing::Dtx010StartResult start(const framing::Xid010& xid, + bool join, + bool resume); + + framing::Dtx010EndResult end(const framing::Xid010& xid, + bool fail, + bool suspend); + + framing::Dtx010CommitResult commit(const framing::Xid010& xid, + bool onePhase); + + void forget(const framing::Xid010& xid); + + framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid); + + framing::Dtx010PrepareResult prepare(const framing::Xid010& xid); + + framing::Dtx010RecoverResult recover(); + + framing::Dtx010RollbackResult rollback(const framing::Xid010& xid); + + void setTimeout(const framing::Xid010& xid, uint32_t timeout); + }; ExchangeHandlerImpl exchangeImpl; QueueHandlerImpl queueImpl; MessageHandlerImpl messageImpl; ExecutionHandlerImpl executionImpl; TxHandlerImpl txImpl; + DtxHandlerImpl dtxImpl; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 105f946dcb..35ad562a22 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -178,6 +178,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); } else if (invocation.hasResult()) { + nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp new file mode 100644 index 0000000000..634d066ecc --- /dev/null +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "TxAccept.h" +#include "qpid/log/Statement.h" + +using std::bind1st; +using std::bind2nd; +using std::mem_fun_ref; +using namespace qpid::broker; +using qpid::framing::AccumulatedAck; + +TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : + acked(_acked), unacked(_unacked) {} + +bool TxAccept::prepare(TransactionContext* ctxt) throw() +{ + try{ + //dequeue messages from their respective queues: + for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { + if (i->coveredBy(&acked)) { + i->dequeue(ctxt); + } + } + return true; + }catch(const std::exception& e){ + QPID_LOG(error, "Failed to prepare: " << e.what()); + return false; + }catch(...){ + QPID_LOG(error, "Failed to prepare"); + return false; + } +} + +void TxAccept::commit() throw() +{ + for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { + if (i->coveredBy(&acked)) i->setEnded(); + } + + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); +} + +void TxAccept::rollback() throw() {} diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h new file mode 100644 index 0000000000..011acf5d9e --- /dev/null +++ b/cpp/src/qpid/broker/TxAccept.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxAccept_ +#define _TxAccept_ + +#include <algorithm> +#include <functional> +#include <list> +#include "qpid/framing/AccumulatedAck.h" +#include "DeliveryRecord.h" +#include "TxOp.h" + +namespace qpid { + namespace broker { + /** + * Defines the transactional behaviour for accepts received by + * a transactional channel. + */ + class TxAccept : public TxOp{ + framing::AccumulatedAck& acked; + std::list<DeliveryRecord>& unacked; + + public: + /** + * @param acked a representation of the accumulation of + * acks received + * @param unacked the record of delivered messages + */ + TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~TxAccept(){} + }; + } +} + + +#endif diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml index 0acc2260ad..6080800f60 100644 --- a/cpp/xml/extra.xml +++ b/cpp/xml/extra.xml @@ -33,6 +33,14 @@ </doc> </domain> + <domain name="xid010"> + <struct size="short" pack="short"> + <field name="format" domain="long" /> + <field name="global-id" domain="shortstr" /> + <field name="branch-id" domain="shortstr" /> + </struct> + </domain> + <domain name="delivery-properties-010"> <struct size="long" pack="short" type="1025"> <field name="discard-unroutable" domain="bit" label="controls discard of unroutable messages"/> @@ -739,6 +747,99 @@ </method> </class> +<class name="dtx010" index="6"> + <doc>blah, blah</doc> + <method name = "select" index="1"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + </method> + <method name = "start" index="2"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="join" domain="bit"/> + <field name="resume" domain="bit"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "end" index="3"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="fail" domain="bit"/> + <field name="suspend" domain="bit"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "commit" index="4"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="one-phase" domain="bit"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "forget" index="5"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + </method> + <method name = "get-timeout" index="6"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <result> + <struct size="long" pack="short" type="2"> + <field name="timeout" domain="long" /> + </struct> + </result> + </method> + <method name = "prepare" index="7"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "recover" index="8"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <result> + <struct size="long" pack="short" type="3"> + <field name="in-doubt" domain="array" /> + </struct> + </result> + </method> + <method name = "rollback" index="9"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "set-timeout" index="10"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="timeout" domain="long"/> + </method> +</class> + <class name="exchange010" index="7"> <doc>blah, blah</doc> <method name = "declare" index="1"> diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 70cd3d19ed..58cad3344b 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -2,27 +2,7 @@ tests.codec.FieldTableTestCase.test_field_table_decode tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair tests.codec.FieldTableTestCase.test_field_table_name_value_pair tests_0-10.execution.ExecutionTests.test_flush -tests_0-10.dtx.DtxTests.test_bad_resume -tests_0-10.dtx.DtxTests.test_end -tests_0-10.dtx.DtxTests.test_end_suspend_and_fail -tests_0-10.dtx.DtxTests.test_end_unknown_xid -tests_0-10.dtx.DtxTests.test_forget_xid_on_completion -tests_0-10.dtx.DtxTests.test_get_timeout -tests_0-10.dtx.DtxTests.test_implicit_end -tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_false -tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_true tests_0-10.dtx.DtxTests.test_recover -tests_0-10.dtx.DtxTests.test_select_required -tests_0-10.dtx.DtxTests.test_set_timeout -tests_0-10.dtx.DtxTests.test_simple_commit -tests_0-10.dtx.DtxTests.test_simple_prepare_commit -tests_0-10.dtx.DtxTests.test_simple_prepare_rollback -tests_0-10.dtx.DtxTests.test_simple_rollback -tests_0-10.dtx.DtxTests.test_start_already_known -tests_0-10.dtx.DtxTests.test_start_join -tests_0-10.dtx.DtxTests.test_start_join_and_resume -tests_0-10.dtx.DtxTests.test_suspend_resume -tests_0-10.dtx.DtxTests.test_suspend_start_end_resume tests_0-10.message.MessageTests.test_consume_no_local tests_0-10.message.MessageTests.test_consume_no_local_awkward tests_0-10.message.MessageTests.test_no_size diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py index 59299b6e04..0893269174 100644 --- a/python/qpid/datatypes.py +++ b/python/qpid/datatypes.py @@ -154,6 +154,7 @@ class Future: self.value = initial self._error = None self._set = threading.Event() + self.exception = exception def error(self, error): self._error = error @@ -166,7 +167,7 @@ class Future: def get(self, timeout=None): self._set.wait(timeout) if self._error != None: - raise exception(self._error) + raise self.exception(self._error) return self.value def is_set(self): diff --git a/python/qpid/session.py b/python/qpid/session.py index 4da6f883b4..bbe2b326d6 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -110,6 +110,8 @@ class Session(Invoker): for st in self.spec.structs.values(): if st.name == name: return st + if self.spec.structs_by_name.has_key(name): + return self.spec.structs_by_name[name] return None def invoke(self, type, args, kwargs): @@ -289,8 +291,9 @@ class Delegate: self.session.exceptions.append(ex) error = self.session.error() for id in self.session.results: - f = self.session.results.pop(id) + f = self.session.results[id] f.error(error) + self.session.results.clear() for q in self.session._incoming.values(): q.close(error) diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py index 815c1d064a..4eb03008d0 100644 --- a/python/qpid/spec010.py +++ b/python/qpid/spec010.py @@ -257,6 +257,7 @@ class Struct(Composite): Composite.register(self, node) if self.code is not None: self.spec.structs[self.code] = self + self.spec.structs_by_name[self.name] = self def __str__(self): fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname) @@ -443,6 +444,7 @@ class Spec(Node): self.controls = {} self.commands = {} self.structs = {} + self.structs_by_name = {} def encoding(self, klass): if Spec.ENCODINGS.has_key(klass): diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index f84f91c75a..2483c6f16d 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -18,12 +18,13 @@ # from qpid.client import Client, Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.datatypes import Message, RangedSet +from qpid.session import SessionException +from qpid.testlib import TestBase010 from struct import pack, unpack from time import sleep -class DtxTests(TestBase): +class DtxTests(TestBase010): """ Tests for the amqp dtx related classes. @@ -43,15 +44,15 @@ class DtxTests(TestBase): tx_counter = 0 def reset_channel(self): - self.channel.session_close() - self.channel = self.client.channel(self.channel.id + 1) - self.channel.session_open() + self.session.close() + self.session = self.conn.session("dtx-session", 1) def test_simple_commit(self): """ Test basic one-phase commit behaviour. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "commit") @@ -60,9 +61,9 @@ class DtxTests(TestBase): self.assertMessageCount(0, "queue-b") #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status) - #should close and reopen channel to ensure no unacked messages are held + #should close and reopen session to ensure no unacked messages are held self.reset_channel() #check result @@ -74,19 +75,20 @@ class DtxTests(TestBase): """ Test basic two-phase commit behaviour. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "prepare-commit") #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status) self.reset_channel() @@ -100,7 +102,8 @@ class DtxTests(TestBase): """ Test basic rollback behaviour. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "rollback") @@ -109,7 +112,7 @@ class DtxTests(TestBase): self.assertMessageCount(0, "queue-b") #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) self.reset_channel() @@ -122,19 +125,20 @@ class DtxTests(TestBase): """ Test basic rollback behaviour after the transaction has been prepared. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "prepare-rollback") #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) self.reset_channel() @@ -148,17 +152,17 @@ class DtxTests(TestBase): check that an error is flagged if select is not issued before start or end """ - channel = self.channel + session = self.session tx = self.xid("dummy") try: - channel.dtx_demarcation_start(xid=tx) + session.dtx_start(xid=tx) #if we get here we have failed, but need to do some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Channel not selected for use with dtx, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) + session.dtx_end(xid=tx) + session.dtx_rollback(xid=tx) + self.fail("Session not selected for use with dtx, expected exception!") + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) def test_start_already_known(self): """ @@ -166,36 +170,35 @@ class DtxTests(TestBase): transaction that is already known is not allowed (unless the join flag is set). """ - #create two channels on different connection & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() + #create two sessions on different connection & select them for use with dtx: + session1 = self.session + session1.dtx_select() other = self.connect() - channel2 = other.channel(1) - channel2.session_open() - channel2.dtx_demarcation_select() + session2 = other.session("other", 0) + session2.dtx_select() #create a xid tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) + #start work on one session under that xid: + session1.dtx_start(xid=tx) #then start on the other without the join set failed = False try: - channel2.dtx_demarcation_start(xid=tx) - except Closed, e: + session2.dtx_start(xid=tx) + except SessionException, e: failed = True error = e #cleanup: if not failed: - channel2.dtx_demarcation_end(xid=tx) + session2.dtx_end(xid=tx) other.close() - channel1.dtx_demarcation_end(xid=tx) - channel1.dtx_coordination_rollback(xid=tx) + session1.dtx_end(xid=tx) + session1.dtx_rollback(xid=tx) #verification: - if failed: self.assertConnectionException(503, e.args[0]) + if failed: self.assertEquals(503, error.args[0].error_code) else: self.fail("Xid already known, expected exception!") def test_forget_xid_on_completion(self): @@ -205,69 +208,69 @@ class DtxTests(TestBase): """ #do some transactional work & complete the transaction self.test_simple_commit() - # channel has been reset, so reselect for use with dtx - self.channel.dtx_demarcation_select() + # session has been reset, so reselect for use with dtx + self.session.dtx_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") - self.channel.dtx_demarcation_start(xid=tx) - self.channel.dtx_demarcation_end(xid=tx) - self.channel.dtx_coordination_rollback(xid=tx) + self.session.dtx_start(xid=tx) + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) def test_start_join_and_resume(self): """ Ensure the correct error is signalled when both the join and resume flags are set on starting an association between a - channel and a transcation. + session and a transcation. """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() tx = self.xid("dummy") try: - channel.dtx_demarcation_start(xid=tx, join=True, resume=True) + session.dtx_start(xid=tx, join=True, resume=True) #failed, but need some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) + session.dtx_end(xid=tx) + session.dtx_rollback(xid=tx) self.fail("Join and resume both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) def test_start_join(self): """ - Verify 'join' behaviour, where a channel is associated with a - transaction that is already associated with another channel. + Verify 'join' behaviour, where a session is associated with a + transaction that is already associated with another session. """ - #create two channels & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() + guard = self.keepQueuesAlive(["one", "two"]) + #create two sessions & select them for use with dtx: + session1 = self.session + session1.dtx_select() - channel2 = self.client.channel(2) - channel2.session_open() - channel2.dtx_demarcation_select() + session2 = self.conn.session("second", 2) + session2.dtx_select() #setup - channel1.queue_declare(queue="one", exclusive=True, auto_delete=True) - channel1.queue_declare(queue="two", exclusive=True, auto_delete=True) - channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) - channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + session1.queue_declare(queue="one", auto_delete=True) + session1.queue_declare(queue="two", auto_delete=True) + session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage")) + session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage")) #create a xid tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) + #start work on one session under that xid: + session1.dtx_start(xid=tx) #then start on the other with the join flag set - channel2.dtx_demarcation_start(xid=tx, join=True) + session2.dtx_start(xid=tx, join=True) - #do work through each channel - self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' - self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' + #do work through each session + self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two' + self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one' - #mark end on both channels - channel1.dtx_demarcation_end(xid=tx) - channel2.dtx_demarcation_end(xid=tx) + #mark end on both sessions + session1.dtx_end(xid=tx) + session2.dtx_end(xid=tx) #commit and check - channel1.dtx_coordination_commit(xid=tx, one_phase=True) + session1.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") @@ -278,27 +281,27 @@ class DtxTests(TestBase): """ Test suspension and resumption of an association """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() #setup - channel.queue_declare(queue="one", exclusive=True, auto_delete=True) - channel.queue_declare(queue="two", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) - channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + session.queue_declare(queue="one", exclusive=True, auto_delete=True) + session.queue_declare(queue="two", exclusive=True, auto_delete=True) + session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) + session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' - channel.dtx_demarcation_end(xid=tx, suspend=True) + session.dtx_start(xid=tx) + self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' + session.dtx_end(xid=tx, suspend=True) - channel.dtx_demarcation_start(xid=tx, resume=True) - self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx, resume=True) + self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' + session.dtx_end(xid=tx) #commit and check - channel.dtx_coordination_commit(xid=tx, one_phase=True) + session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") @@ -310,27 +313,27 @@ class DtxTests(TestBase): done on another transaction when the first transaction is suspended """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() #setup - channel.queue_declare(queue="one", exclusive=True, auto_delete=True) - channel.queue_declare(queue="two", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) - channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + session.queue_declare(queue="one", exclusive=True, auto_delete=True) + session.queue_declare(queue="two", exclusive=True, auto_delete=True) + session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) + session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' - channel.dtx_demarcation_end(xid=tx, suspend=True) + session.dtx_start(xid=tx) + self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' + session.dtx_end(xid=tx, suspend=True) - channel.dtx_demarcation_start(xid=tx, resume=True) - self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx, resume=True) + self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' + session.dtx_end(xid=tx) #commit and check - channel.dtx_coordination_commit(xid=tx, one_phase=True) + session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") @@ -340,24 +343,23 @@ class DtxTests(TestBase): """ Verify that the correct error is signalled if the suspend and fail flag are both set when disassociating a transaction from - the channel + the session """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() tx = self.xid("suspend_and_fail") - channel.dtx_demarcation_start(xid=tx) + session.dtx_start(xid=tx) try: - channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) + session.dtx_end(xid=tx, suspend=True, fail=True) self.fail("Suspend and fail both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) #cleanup other = self.connect() - channel = other.channel(1) - channel.session_open() - channel.dtx_coordination_rollback(xid=tx) - channel.session_close() + session = other.session("cleanup", 1) + session.dtx_rollback(xid=tx) + session.close() other.close() @@ -365,51 +367,51 @@ class DtxTests(TestBase): """ Verifies that the correct exception is thrown when an attempt is made to end the association for a xid not previously - associated with the channel + associated with the session """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() tx = self.xid("unknown-xid") try: - channel.dtx_demarcation_end(xid=tx) + session.dtx_end(xid=tx) self.fail("Attempted to end association with unknown xid, expected exception!") - except Closed, e: + except SessionException, e: #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... - self.assertConnectionException(503, e.args[0]) + self.assertEquals(503, e.args[0].error_code) def test_end(self): """ Verify that the association is terminated by end and subsequent operations are non-transactional """ - channel = self.client.channel(2) - channel.session_open() - channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True) + guard = self.keepQueuesAlive(["tx-queue"]) + session = self.conn.session("alternate", 1) + session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True) #publish a message under a transaction - channel.dtx_demarcation_select() + session.dtx_select() tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage")) - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx) + session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage")) + session.dtx_end(xid=tx) #now that association with txn is ended, publish another message - channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage")) + session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage")) #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") - self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1) - msg = self.client.queue("results").get(timeout=1) - self.assertEqual("two", msg.content['message_id']) - channel.message_cancel(destination="results") - #ack the message then close the channel - msg.complete() - channel.session_close() - - channel = self.channel + self.subscribe(session, queue="tx-queue", destination="results") + msg = session.incoming("results").get(timeout=1) + self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id')) + session.message_cancel(destination="results") + #ack the message then close the session + session.message_accept(RangedSet(msg.id)) + session.close() + + session = self.session #commit the transaction and check that the first message (and #only the first message) is then delivered - channel.dtx_coordination_commit(xid=tx, one_phase=True) + session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "tx-queue") self.assertMessageId("one", "tx-queue") @@ -419,27 +421,26 @@ class DtxTests(TestBase): transaction in question has already been prepared. """ other = self.connect() - tester = other.channel(1) - tester.session_open() + tester = other.session("tester", 1) tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - tester.dtx_demarcation_select() + tester.dtx_select() tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) - tester.dtx_demarcation_end(xid=tx) - tester.dtx_coordination_prepare(xid=tx) + tester.dtx_start(xid=tx) + tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + tester.dtx_end(xid=tx) + tester.dtx_prepare(xid=tx) failed = False try: - tester.dtx_coordination_commit(xid=tx, one_phase=True) - except Closed, e: + tester.dtx_commit(xid=tx, one_phase=True) + except SessionException, e: failed = True error = e if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) + self.session.dtx_rollback(xid=tx) + self.assertEquals(503, error.args[0].error_code) else: - tester.session_close() + tester.close() other.close() self.fail("Invalid use of one_phase=True, expected exception!") @@ -453,99 +454,99 @@ class DtxTests(TestBase): transaction in question has already been prepared. """ other = self.connect() - tester = other.channel(1) - tester.session_open() + tester = other.session("tester", 1) tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - tester.dtx_demarcation_select() + tester.dtx_select() tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) - tester.dtx_demarcation_end(xid=tx) + tester.dtx_start(xid=tx) + tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + tester.dtx_end(xid=tx) failed = False try: - tester.dtx_coordination_commit(xid=tx, one_phase=False) - except Closed, e: + tester.dtx_commit(xid=tx, one_phase=False) + except SessionException, e: failed = True error = e if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) + self.session.dtx_rollback(xid=tx) + self.assertEquals(503, error.args[0].error_code) else: - tester.session_close() + tester.close() other.close() self.fail("Invalid use of one_phase=False, expected exception!") def test_implicit_end(self): """ - Test that an association is implicitly ended when the channel + Test that an association is implicitly ended when the session is closed (whether by exception or explicit client request) and the transaction in question is marked as rollback only. """ - channel1 = self.channel - channel2 = self.client.channel(2) - channel2.session_open() + session1 = self.session + session2 = self.conn.session("other", 2) #setup: - channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) + session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever")) tx = self.xid("dummy") - channel2.dtx_demarcation_select() - channel2.dtx_demarcation_start(xid=tx) - channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1) - channel2.message_flow(destination="dummy", unit=0, value=1) - channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF) - self.client.queue("dummy").get(timeout=1).complete() - channel2.message_cancel(destination="dummy") - channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) - channel2.session_close() + session2.dtx_select() + session2.dtx_start(xid=tx) + session2.message_subscribe(queue="dummy", destination="dummy") + session2.message_flow(destination="dummy", unit=0, value=1) + session2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF) + msg = session2.incoming("dummy").get(timeout=1) + session2.message_accept(RangedSet(msg.id)) + session2.message_cancel(destination="dummy") + session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever")) + session2.close() - self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status) - channel1.dtx_coordination_rollback(xid=tx) + self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status) + session1.dtx_rollback(xid=tx) def test_get_timeout(self): """ Check that get-timeout returns the correct value, (and that a transaction with a timeout can complete normally) """ - channel = self.channel + session = self.session tx = self.xid("dummy") - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) - channel.dtx_coordination_set_timeout(xid=tx, timeout=60) - self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) - self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status) - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + session.dtx_select() + session.dtx_start(xid=tx) + self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout) + session.dtx_set_timeout(xid=tx, timeout=60) + self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) + self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) def test_set_timeout(self): """ Test the timeout of a transaction results in the expected behaviour """ - #open new channel to allow self.channel to be used in checking te queue - channel = self.client.channel(2) - channel.session_open() + + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + #open new session to allow self.session to be used in checking the queue + session = self.conn.session("worker", 1) #setup: tx = self.xid("dummy") - channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) - channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage")) - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "queue-a", "queue-b") - channel.dtx_coordination_set_timeout(xid=tx, timeout=2) + session.queue_declare(queue="queue-a", auto_delete=True) + session.queue_declare(queue="queue-b", auto_delete=True) + session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage")) + + session.dtx_select() + session.dtx_start(xid=tx) + self.swap(session, "queue-a", "queue-b") + session.dtx_set_timeout(xid=tx, timeout=2) sleep(3) #check that the work has been rolled back already self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") self.assertMessageId("timeout", "queue-a") #check the correct codes are returned when we try to complete the txn - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status) - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) @@ -553,28 +554,29 @@ class DtxTests(TestBase): """ Test basic recover behaviour """ - channel = self.channel + session = self.session - channel.dtx_demarcation_select() - channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + session.dtx_select() + session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) prepared = [] for i in range(1, 10): tx = self.xid("tx%s" % (i)) - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i))) - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx) + session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i))) + session.dtx_end(xid=tx) if i in [2, 5, 6, 8]: - channel.dtx_coordination_prepare(xid=tx) + session.dtx_prepare(xid=tx) prepared.append(tx) else: - channel.dtx_coordination_rollback(xid=tx) + session.dtx_rollback(xid=tx) - xids = channel.dtx_coordination_recover().in_doubt + xids = session.dtx_recover().in_doubt + print "xids=%s" % xids #rollback the prepared transactions returned by recover for x in xids: - channel.dtx_coordination_rollback(xid=x) + session.dtx_rollback(xid=x) #validate against the expected list of prepared transactions actual = set(xids) @@ -585,61 +587,90 @@ class DtxTests(TestBase): missing = expected.difference(actual) extra = actual.difference(expected) for x in missing: - channel.dtx_coordination_rollback(xid=x) + session.dtx_rollback(xid=x) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) def test_bad_resume(self): """ Test that a resume on a session not selected for use with dtx fails """ - channel = self.channel + session = self.session try: - channel.dtx_demarcation_start(resume=True) - except Closed, e: - self.assertConnectionException(503, e.args[0]) + session.dtx_start(resume=True) + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) def xid(self, txid): DtxTests.tx_counter += 1 branchqual = "v%s" % DtxTests.tx_counter - return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual - + return self.session.xid(format=0, global_id=txid, branch_id=branchqual) + def txswap(self, tx, id): - channel = self.channel + session = self.session #declare two queues: - channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) - channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-a", auto_delete=True) + session.queue_declare(queue="queue-b", auto_delete=True) + #put message with specified id on one queue: - channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage")) + dp=session.delivery_properties(routing_key="queue-a") + mp=session.message_properties(correlation_id=id) + session.message_transfer(message=Message(dp, mp, "DtxMessage")) #start the transaction: - channel.dtx_demarcation_select() - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status) + session.dtx_select() + self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) #'swap' the message from one queue to the other, under that transaction: - self.swap(self.channel, "queue-a", "queue-b") + self.swap(self.session, "queue-a", "queue-b") #mark the end of the transactional work: - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status) + self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status) - def swap(self, channel, src, dest): + def swap(self, session, src, dest): #consume from src: - channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1) - channel.message_flow(destination="temp-swap", unit=0, value=1) - channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) - msg = self.client.queue("temp-swap").get(timeout=1) - channel.message_cancel(destination="temp-swap") - msg.complete(); - - #re-publish to dest - channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']}, - body=msg.content.body)) + session.message_subscribe(destination="temp-swap", queue=src) + session.message_flow(destination="temp-swap", unit=0, value=1) + session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) + msg = session.incoming("temp-swap").get(timeout=1) + session.message_cancel(destination="temp-swap") + session.message_accept(RangedSet(msg.id)) + #todo: also complete at this point? + + #re-publish to dest: + dp=session.delivery_properties(routing_key=dest) + mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id')) + session.message_transfer(message=Message(dp, mp, msg.body)) def assertMessageCount(self, expected, queue): - self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count) + self.assertEqual(expected, self.session.queue_query(queue=queue).message_count) def assertMessageId(self, expected, queue): - self.channel.message_subscribe(queue=queue, destination="results") - self.channel.message_flow(destination="results", unit=0, value=1) - self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF) - self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id']) - self.channel.message_cancel(destination="results") + self.session.message_subscribe(queue=queue, destination="results") + self.session.message_flow(destination="results", unit=0, value=1) + self.session.message_flow(destination="results", unit=1, value=0xFFFFFFFF) + self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id')) + self.session.message_cancel(destination="results") + + def getMessageProperty(self, msg, prop): + for h in msg.headers: + if hasattr(h, prop): return getattr(h, prop) + return None + + def keepQueuesAlive(self, names): + session = self.conn.session("nasty", 99) + for n in names: + session.queue_declare(queue=n, auto_delete=True) + session.message_subscribe(destination=n, queue=n) + return session + + def createMessage(self, session, key, id, body): + dp=session.delivery_properties(routing_key=key) + mp=session.message_properties(correlation_id=id) + session.message_transfer(message=Message(dp, mp, body)) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) |