diff options
author | Alan Conway <aconway@apache.org> | 2013-08-01 20:26:58 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-01 20:26:58 +0000 |
commit | 72cd95abd9ab2f59c164d60e8f5b0c43cb0b2c0c (patch) | |
tree | 5d5412d110e34b2d62981db162da6a22f0e938a0 | |
parent | 5683a5220e8bd4f0dc1cede8f6d430c1d670f71b (diff) | |
download | qpid-python-72cd95abd9ab2f59c164d60e8f5b0c43cb0b2c0c.tar.gz |
QPID-4327: Added TransactionObserver interface.
Added TransactionObserver interface, called at each point in a transaction's
lifecycle. Currently only a single observer can be associated with a
transaction.
Added startTx, startDtx to BrokerObserver so plugins can observe transactions
starting and set a TransactionObserver.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509421 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerObserver.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerObservers.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxAck.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredDequeue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredEnqueue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TransactionObserver.h | 82 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.h | 131 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxOp.h | 20 | ||||
-rw-r--r-- | cpp/src/tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | cpp/src/tests/TransactionObserverTest.cpp | 143 | ||||
-rw-r--r-- | cpp/src/tests/TxMocks.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/brokertest.py | 4 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 45 | ||||
-rw-r--r-- | cpp/src/tests/test_tools.h | 1 |
22 files changed, 412 insertions, 74 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 3e2cd87b77..d420e28d95 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -38,6 +38,7 @@ #include "qpid/broker/PersistableObject.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" +#include "qpid/broker/TransactionObserver.h" #include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 79c74a1c66..f0b0c83f61 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -65,6 +65,7 @@ class AclModule; class ExpiryPolicy; class Message; struct QueueSettings; + static const uint16_t DEFAULT_PORT=5672; struct NoSuchTransportException : qpid::Exception diff --git a/cpp/src/qpid/broker/BrokerObserver.h b/cpp/src/qpid/broker/BrokerObserver.h index 8b503309d2..2249d33e64 100644 --- a/cpp/src/qpid/broker/BrokerObserver.h +++ b/cpp/src/qpid/broker/BrokerObserver.h @@ -34,7 +34,8 @@ class FieldTable; namespace broker { class Queue; class Exchange; - +class TxBuffer; +class DtxBuffer; /** * Observer for changes to configuration (aka wiring) @@ -59,6 +60,8 @@ class BrokerObserver const boost::shared_ptr<Queue>& , const std::string& /*key*/, const framing::FieldTable& /*args*/) {} + virtual void startTx(const boost::shared_ptr<TxBuffer>&) {} + virtual void startDtx(const boost::shared_ptr<DtxBuffer>&) {} }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerObservers.h b/cpp/src/qpid/broker/BrokerObservers.h index 9624c25421..5ba7bac890 100644 --- a/cpp/src/qpid/broker/BrokerObservers.h +++ b/cpp/src/qpid/broker/BrokerObservers.h @@ -64,6 +64,12 @@ class BrokerObservers : public BrokerObserver, each(boost::bind( &BrokerObserver::unbind, _1, exchange, queue, key, args)); } + void startTx(const boost::shared_ptr<TxBuffer>& tx) { + each(boost::bind(&BrokerObserver::startTx, _1, tx)); + } + void startDtx(const boost::shared_ptr<DtxBuffer>& dtx) { + each(boost::bind(&BrokerObserver::startDtx, _1, dtx)); + } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index 10d63f5b0c..5775edf5de 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -39,6 +39,9 @@ class DtxAck : public TxOp{ virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); + // TODO aconway 2013-07-08: + virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) {} + virtual ~DtxAck(){} const DeliveryRecords& getPending() const { return pending; } }; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 339cdb7f9d..49aff3ea97 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -34,6 +34,7 @@ #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Selector.h" +#include "qpid/broker/TransactionObserver.h" //TODO: get rid of this #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -167,6 +168,12 @@ void Queue::TxPublish::rollback() throw() } } +void Queue::TxPublish::callObserver( + const boost::shared_ptr<TransactionObserver>& observer) +{ + observer->enqueue(queue, message); +} + Queue::Queue(const string& _name, const QueueSettings& _settings, MessageStore* const _store, Manageable* parent, diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e66a2171e9..a832b95feb 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -118,6 +118,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool prepare(TransactionContext* ctxt) throw(); void commit() throw(); void rollback() throw(); + void callObserver(const boost::shared_ptr<TransactionObserver>&); }; /** diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 87f768eefd..b85919975c 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -41,6 +41,8 @@ namespace qpid { virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); + // TODO aconway 2013-07-08: revisit + virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) {} virtual ~RecoveredDequeue(){} boost::shared_ptr<Queue> getQueue() const { return queue; } diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index d1f8e1106c..01c350af92 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -41,6 +41,8 @@ class RecoveredEnqueue : public TxOp{ virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); + // TODO aconway 2013-07-08: revisit + virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) {} virtual ~RecoveredEnqueue(){} boost::shared_ptr<Queue> getQueue() const { return queue; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 4570e3bd87..c1a68923d7 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/Selector.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" +#include "qpid/broker/TransactionObserver.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/reply_exceptions.h" @@ -65,6 +66,7 @@ namespace broker { using namespace std; using boost::intrusive_ptr; +using boost::shared_ptr; using boost::bind; using namespace qpid::broker; using namespace qpid::framing; @@ -165,13 +167,13 @@ bool SemanticState::cancel(const string& tag) void SemanticState::startTx() { txBuffer = TxBuffer::shared_ptr(new TxBuffer()); + session.getBroker().getBrokerObservers().startTx(txBuffer); } void SemanticState::commit(MessageStore* const store) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { @@ -185,7 +187,6 @@ void SemanticState::rollback() { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - txBuffer->rollback(); accumulatedAck.clear(); } @@ -202,6 +203,7 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) } dtxBuffer.reset(new DtxBuffer(xid)); txBuffer = dtxBuffer; + session.getBroker().getBrokerObservers().startDtx(dtxBuffer); if (join) { mgr.join(xid, dtxBuffer); } else { @@ -767,7 +769,6 @@ void SemanticState::accepted(const SequenceSet& commands) { TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); - //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be //removed from the record diff --git a/cpp/src/qpid/broker/TransactionObserver.h b/cpp/src/qpid/broker/TransactionObserver.h new file mode 100644 index 0000000000..5333d7b8d6 --- /dev/null +++ b/cpp/src/qpid/broker/TransactionObserver.h @@ -0,0 +1,82 @@ +#ifndef QPID_BROKER_TRANSACTIONOBSERVER_H +#define QPID_BROKER_TRANSACTIONOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "DeliveryRecord.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace framing { +class SequenceSet; +} + +namespace broker { +class Queue; +class Message; +class TxBuffer; +class DtxBuffer; + +/** + * Interface for intercepting events in a transaction. + */ +class TransactionObserver { + public: + typedef boost::shared_ptr<Queue> QueuePtr; + typedef framing::SequenceNumber SequenceNumber; + + virtual ~TransactionObserver() {} + + /** Message enqueued in the transaction. */ + virtual void enqueue(const QueuePtr&, const Message&) = 0; + + /** + * Message is dequeued in the transaction (it was accepted by a consumer.) + *@param queuePosition: Sequence number of message on queue. + *@param replicationId: Replication sequence number, may be different. + */ + virtual void dequeue(const QueuePtr& queue, + SequenceNumber queueSeq, + SequenceNumber replicationSeq) = 0; + + virtual bool prepare() = 0; + virtual void commit() = 0; + virtual void rollback() = 0; +}; + +/** + * No-op TransactionObserver. + */ +class NullTransactionObserver : public TransactionObserver { + public: + void enqueue(const QueuePtr&, const Message&) {} + void dequeue(const QueuePtr&, SequenceNumber, SequenceNumber) {} + bool prepare() { return true; } + void commit() {} + void rollback() {} +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_TRANSACTIONOBSERVER_H*/ diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index fc0c96f467..0343cc00ae 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -19,8 +19,10 @@ * */ #include "qpid/broker/TxAccept.h" +#include "qpid/broker/TransactionObserver.h" #include "qpid/log/Statement.h" + using std::bind1st; using std::bind2nd; using std::mem_fun_ref; @@ -76,3 +78,13 @@ void TxAccept::commit() throw() } void TxAccept::rollback() throw() {} + +namespace { +void callObserverDR(boost::shared_ptr<TransactionObserver> observer, DeliveryRecord& dr) { + observer->dequeue(dr.getQueue(), dr.getMessageId(), dr.getReplicationId()); +} +} // namespace + +void TxAccept::callObserver(const ObserverPtr& observer) { + each(boost::bind(&callObserverDR, observer, _1)); +} diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h index 3a8b663039..7b01fc26dc 100644 --- a/cpp/src/qpid/broker/TxAccept.h +++ b/cpp/src/qpid/broker/TxAccept.h @@ -37,6 +37,7 @@ namespace broker { */ class TxAccept : public TxOp { typedef std::vector<AckRange> AckRanges; + typedef boost::shared_ptr<TransactionObserver> ObserverPtr; void each(boost::function<void(DeliveryRecord&)>); @@ -54,6 +55,7 @@ class TxAccept : public TxOp { virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); + virtual void callObserver(const ObserverPtr&); virtual ~TxAccept(){} }; } diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp index 7663cc525f..a8df4fb214 100644 --- a/cpp/src/qpid/broker/TxBuffer.cpp +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/TxBuffer.h" +#include "qpid/broker/TransactionObserver.h" #include "qpid/log/Statement.h" #include <boost/mem_fn.hpp> @@ -26,8 +27,11 @@ using boost::mem_fn; using namespace qpid::broker; +TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {} + bool TxBuffer::prepare(TransactionContext* const ctxt) { + if (!observer->prepare()) return false; for(op_iterator i = ops.begin(); i != ops.end(); i++){ if(!(*i)->prepare(ctxt)){ return false; @@ -38,18 +42,21 @@ bool TxBuffer::prepare(TransactionContext* const ctxt) void TxBuffer::commit() { + observer->commit(); std::for_each(ops.begin(), ops.end(), mem_fn(&TxOp::commit)); ops.clear(); } void TxBuffer::rollback() { + observer->rollback(); std::for_each(ops.begin(), ops.end(), mem_fn(&TxOp::rollback)); ops.clear(); } void TxBuffer::enlist(TxOp::shared_ptr op) { + op->callObserver(observer); ops.push_back(op); } diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h index 22e2f06be1..47de080fad 100644 --- a/cpp/src/qpid/broker/TxBuffer.h +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,21 +34,21 @@ * transaction. This work can be committed or rolled back. Committing * is a two-stage process: first all the operations should be * prepared, then if that succeeds they can be committed. - * + * * In the 2pc case, a successful prepare may be followed by either a * commit or a rollback. - * + * * Atomicity of prepare is ensured by using a lower level * transactional facility. This saves explicitly rolling back all the * successfully prepared ops when one of them fails. i.e. we do not * use 2pc internally, we instead ensure that prepare is atomic at a * lower level. This makes individual prepare operations easier to * code. - * + * * Transactions on a messaging broker effect three types of 'action': * (1) updates to persistent storage (2) updates to transient storage * or cached data (3) network writes. - * + * * Of these, (1) should always occur atomically during prepare to * ensure that if the broker crashes while a transaction is being * completed the persistent state (which is all that then remains) is @@ -58,59 +58,74 @@ * TransactionalStore in use. */ namespace qpid { - namespace broker { - class TxBuffer{ - typedef std::vector<TxOp::shared_ptr>::iterator op_iterator; - std::vector<TxOp::shared_ptr> ops; - protected: - - public: - typedef boost::shared_ptr<TxBuffer> shared_ptr; - /** - * Adds an operation to the transaction. - */ - QPID_BROKER_EXTERN void enlist(TxOp::shared_ptr op); - - /** - * Requests that all ops are prepared. This should - * primarily involve making sure that a persistent record - * of the operations is stored where necessary. - * - * Once prepared, a transaction can be committed (or in - * the 2pc case, rolled back). - * - * @returns true if all the operations prepared - * successfully, false if not. - */ - QPID_BROKER_EXTERN bool prepare(TransactionContext* const ctxt); - - /** - * Signals that the ops all prepared successfully and can - * now commit, i.e. the operation can now be fully carried - * out. - * - * Should only be called after a call to prepare() returns - * true. - */ - QPID_BROKER_EXTERN void commit(); - - /** - * Signals that all ops can be rolled back. - * - * Should only be called either after a call to prepare() - * returns true (2pc) or instead of a prepare call - * ('server-local') - */ - QPID_BROKER_EXTERN void rollback(); - - /** - * Helper method for managing the process of server local - * commit - */ - QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store); - }; + +namespace broker { +class TransactionObserver; + +class TxBuffer { + private: + typedef std::vector<TxOp::shared_ptr>::iterator op_iterator; + std::vector<TxOp::shared_ptr> ops; + boost::shared_ptr<TransactionObserver> observer; + + public: + typedef boost::shared_ptr<TxBuffer> shared_ptr; + + TxBuffer(); + + /** + * Adds an operation to the transaction. + */ + QPID_BROKER_EXTERN void enlist(TxOp::shared_ptr op); + + /** + * Requests that all ops are prepared. This should + * primarily involve making sure that a persistent record + * of the operations is stored where necessary. + * + * Once prepared, a transaction can be committed (or in + * the 2pc case, rolled back). + * + * @returns true if all the operations prepared + * successfully, false if not. + */ + QPID_BROKER_EXTERN bool prepare(TransactionContext* const ctxt); + + /** + * Signals that the ops all prepared successfully and can + * now commit, i.e. the operation can now be fully carried + * out. + * + * Should only be called after a call to prepare() returns + * true. + */ + QPID_BROKER_EXTERN void commit(); + + /** + * Signals that all ops can be rolled back. + * + * Should only be called either after a call to prepare() + * returns true (2pc) or instead of a prepare call + * ('server-local') + */ + QPID_BROKER_EXTERN void rollback(); + + /** + * Helper method for managing the process of server local + * commit + */ + QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store); + + + QPID_BROKER_EXTERN void setObserver(boost::shared_ptr<TransactionObserver> o) { + observer = o; + } + + QPID_BROKER_EXTERN boost::shared_ptr<TransactionObserver> getObserver() const { + return observer; } -} +}; +}} // namespace qpid::broker #endif diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h index 775efc92f7..00303eb27c 100644 --- a/cpp/src/qpid/broker/TxOp.h +++ b/cpp/src/qpid/broker/TxOp.h @@ -25,17 +25,19 @@ #include <boost/shared_ptr.hpp> namespace qpid { - namespace broker { +namespace broker { +class TransactionObserver; - class TxOp{ - public: - typedef boost::shared_ptr<TxOp> shared_ptr; +class TxOp{ + public: + typedef boost::shared_ptr<TxOp> shared_ptr; - virtual bool prepare(TransactionContext*) throw() = 0; - virtual void commit() throw() = 0; - virtual void rollback() throw() = 0; - virtual ~TxOp(){} - }; + virtual bool prepare(TransactionContext*) throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; + virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) = 0; + virtual ~TxOp(){} +}; }} // namespace qpid::broker diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 646374d692..a1d29c23a3 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -146,6 +146,7 @@ set(all_unit_tests TimerTest TopicExchangeTest TxBufferTest + TransactionObserverTest Url Uuid Variant diff --git a/cpp/src/tests/TransactionObserverTest.cpp b/cpp/src/tests/TransactionObserverTest.cpp new file mode 100644 index 0000000000..c284417e25 --- /dev/null +++ b/cpp/src/tests/TransactionObserverTest.cpp @@ -0,0 +1,143 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" +#include "test_tools.h" +#include "MessagingFixture.h" +#include "qpid/broker/BrokerObserver.h" +#include "qpid/broker/TransactionObserver.h" +#include "qpid/broker/TxBuffer.h" +#include "qpid/broker/Queue.h" +#include "qpid/ha/types.h" + +#include <boost/bind.hpp> +#include <boost/function.hpp> +#include <boost/make_shared.hpp> +#include <boost/lexical_cast.hpp> +#include <iostream> +#include <vector> + +namespace qpid { +namespace tests { + +using framing::SequenceSet; +using messaging::Message; + +using namespace boost::assign; +using namespace boost; +using namespace broker; +using namespace std; +using namespace messaging; +using namespace types; + +QPID_AUTO_TEST_SUITE(TransactionalObserverTest) + +Message msg(string content) { return Message(content); } + +struct MockTransactionObserver : public TransactionObserver { + bool prep; + vector<string> events; + + MockTransactionObserver(bool prep_=true) : prep(prep_) {} + + void record(const string& e) { events.push_back(e); } + + void enqueue(const shared_ptr<Queue>& q, const broker::Message& m) { + record("enqueue "+q->getName()+" "+m.getContent()); + } + void dequeue(const Queue::shared_ptr& q, SequenceNumber p, SequenceNumber r) { + record("dequeue "+q->getName()+" "+ + lexical_cast<string>(p)+" "+lexical_cast<string>(r)); + } + bool prepare() { record("prepare"); return prep; } + void commit() { record("commit"); } + void rollback() {record("rollback"); } +}; + +struct MockBrokerObserver : public BrokerObserver { + bool prep; + shared_ptr<MockTransactionObserver> tx; + + MockBrokerObserver(bool prep_=true) : prep(prep_) {} + + void startTx(const shared_ptr<TxBuffer>& buffer) { + tx = make_shared<MockTransactionObserver>(prep); + buffer->setObserver(tx); + } +}; + +Session simpleTxTransaction(MessagingFixture& fix) { + fix.session.createSender("q1;{create:always}").send(msg("foo")); // Not in TX + // Transaction with 1 enqueue and 1 dequeue. + Session txSession = fix.connection.createTransactionalSession(); + BOOST_CHECK_EQUAL("foo", txSession.createReceiver("q1").fetch().getContent()); + txSession.createSender("q2;{create:always}").send(msg("bar")); + return txSession; +} + +QPID_AUTO_TEST_CASE(tesTxtCommit) { + MessagingFixture fix; + shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver); + fix.broker->getBrokerObservers().add(brokerObserver); + Session txSession = simpleTxTransaction(fix); + txSession.commit(); + // Note on ordering: observers see enqueues as they happen, but dequeues just + // before prepare. + BOOST_CHECK_EQUAL( + list_of<string>("enqueue q2 bar")("dequeue q1 1 0")("prepare")("commit"), + brokerObserver->tx->events + ); +} + +QPID_AUTO_TEST_CASE(testTxFail) { + MessagingFixture fix; + shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver(false)); + fix.broker->getBrokerObservers().add(brokerObserver); + Session txSession = simpleTxTransaction(fix); + try { + txSession.commit(); + BOOST_FAIL("Expected exception"); + } catch(...) {} + + BOOST_CHECK_EQUAL( + list_of<string>("enqueue q2 bar")("dequeue q1 1 0")("prepare")("rollback"), + brokerObserver->tx->events + ); +} + +QPID_AUTO_TEST_CASE(testTxRollback) { + MessagingFixture fix; + shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver(false)); + fix.broker->getBrokerObservers().add(brokerObserver); + Session txSession = simpleTxTransaction(fix); + txSession.rollback(); + // Note: The dequeue does not appear here. This is because TxAccepts + // (i.e. dequeues) are not enlisted until SemanticState::commit and are + // never enlisted if the transaction is rolled back. + BOOST_CHECK_EQUAL( + list_of<string>("enqueue q2 bar")("rollback"), + brokerObserver->tx->events + ); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/cpp/src/tests/TxMocks.h b/cpp/src/tests/TxMocks.h index bf21104f70..8b54e7484b 100644 --- a/cpp/src/tests/TxMocks.h +++ b/cpp/src/tests/TxMocks.h @@ -103,6 +103,9 @@ public: if(!debugName.empty()) std::cout << std::endl << "MockTxOp[" << debugName << "]::rollback()" << std::endl; actual.push_back(ROLLBACK); } + + void callObserver(const boost::shared_ptr<TransactionObserver>&) {} + MockTxOp& expectPrepare(){ expected.push_back(PREPARE); return *this; diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 03defddb58..286beb0258 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -78,7 +78,7 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=10, delay=.01, max_delay=1): +def retry(function, timeout=10, delay=.001, max_delay=1): """Call function until it returns a true value or timeout expires. Double the delay for each retry up to max_delay. Returns what function returns if true, None if timeout expires.""" @@ -398,7 +398,7 @@ def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) assert expect_contents == actual_contents, msg -def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"): +def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.001, transform=lambda m:m.content, msg="browse failed"): """Wait up to timeout for contents of queue to match expect_contents""" test = lambda: browse(session, queue, 0, transform=transform) == expect_contents retry(test, timeout, delay) diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 293712fe80..de5dfb4b10 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -1287,6 +1287,51 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q2", ["hello", "end"]) cluster[1].assert_browse_backup("q2", ["hello", "end"]) +class TransactionTests(BrokerTest): + + def tx_simple_setup(self, broker): + """Start a transaction: receive 'foo' from 'a' and send 'bar' to 'b'""" + c = broker.connect() + c.session().sender("a;{create:always}").send("foo") + tx = c.session(transactional=True) + self.assertEqual("foo", tx.receiver("a").fetch(1).content) + tx.acknowledge(); + tx.sender("b;{create:always}").send("bar") + return tx + + def test_tx_simple_commit(self): + cluster = HaCluster(self, 2, args=["--log-enable=trace+:ha::"]) + tx = self.tx_simple_setup(cluster[0]) + tx.commit() + for b in cluster: + b.assert_browse_backup("a", [], msg=b) + b.assert_browse_backup("b", ["bar"], msg=b) + + def test_tx_simple_rollback(self): + cluster = HaCluster(self, 2) + tx = self.tx_simple_setup(cluster[0]) + tx.rollback() + for b in cluster: + b.assert_browse_backup("a", ["foo"], msg=b) + b.assert_browse_backup("b", [], msg=b) + + def test_tx_simple_failover(self): + cluster = HaCluster(self, 2) + tx = self.tx_simple_setup(cluster[0]) + cluster.bounce(0) # Should cause roll-back + for b in cluster: + b.assert_browse_backup("a", ["foo"], msg=b) + b.assert_browse_backup("b", [], msg=b) + + def test_tx_simple_join(self): + cluster = HaCluster(self, 2) + tx = self.tx_simple_setup(cluster[0]) + cluster.bounce(1) # Should catch up with tx + tx.commit() + for b in cluster: + b.assert_browse_backup("a", [], msg=b) + b.assert_browse_backup("b", ["bar"], msg=b) + if __name__ == "__main__": outdir = "ha_tests.tmp" shutil.rmtree(outdir, True) diff --git a/cpp/src/tests/test_tools.h b/cpp/src/tests/test_tools.h index de672f938a..7950a36913 100644 --- a/cpp/src/tests/test_tools.h +++ b/cpp/src/tests/test_tools.h @@ -23,7 +23,6 @@ #include <limits.h> // Include before boost/test headers. #include <boost/test/test_tools.hpp> #include <boost/assign/list_of.hpp> -#include <boost/assign/list_of.hpp> #include <vector> #include <set> #include <ostream> |