diff options
| author | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:26 +0000 |
| commit | e6598e9f95d55b80f96dbcb1e12bc1fc38c66af1 (patch) | |
| tree | 7179cb6fa40a59d1390f295a613de64cc242814a /cpp/src/tests/test_store.cpp | |
| parent | 0ffcd71ac9c9f3742aae6e251eafe031068bda31 (diff) | |
| download | qpid-python-e6598e9f95d55b80f96dbcb1e12bc1fc38c66af1.tar.gz | |
QPID-4327: HA TX transactions: basic replication.
On primary a PrimaryTxObserver observes a transaction's TxBuffer and generates
transaction events on a tx-replication-queue. On the backup a TxReplicator
receives the events and constructs a TxBuffer equivalent to the one in the
primary.
Unfinished:
- Primary does not wait for backups to prepare() before committing.
- All connected backups are assumed to be in the transaction, there are race
conditions around brokers joining/leavinv where this assumption is invalid.
- Need more tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/test_store.cpp')
| -rw-r--r-- | cpp/src/tests/test_store.cpp | 101 |
1 files changed, 93 insertions, 8 deletions
diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp index eac4deda2d..fc44889f33 100644 --- a/cpp/src/tests/test_store.cpp +++ b/cpp/src/tests/test_store.cpp @@ -40,14 +40,19 @@ #include "qpid/sys/Thread.h" #include "qpid/Plugin.h" #include "qpid/Options.h" +#include "qpid/RefCounted.h" +#include "qpid/Msg.h" #include <boost/cast.hpp> #include <boost/lexical_cast.hpp> #include <memory> +#include <ostream> #include <fstream> +#include <sstream> -using namespace qpid; -using namespace broker; using namespace std; +using namespace boost; +using namespace qpid; +using namespace qpid::broker; using namespace qpid::sys; namespace qpid { @@ -57,11 +62,13 @@ struct TestStoreOptions : public Options { string name; string dump; + string events; TestStoreOptions() : Options("Test Store Options") { addOptions() ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.") ; } }; @@ -82,24 +89,74 @@ class TestStore : public NullMessageStore { TestStore(const TestStoreOptions& opts, Broker& broker_) : options(opts), name(opts.name), broker(broker_) { - QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); - if (!options.dump.empty()) + QPID_LOG(info, "TestStore name=" << name + << " dump=" << options.dump + << " events=" << options.events); + + if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); + if (!options.events.empty()) + events.reset(new ofstream(options.events.c_str())); } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } - virtual bool isNull() const { return false; } - - void enqueue(TransactionContext* , + // Dummy transaction context. + struct TxContext : public TPCTransactionContext { + static int nextId; + string id; + TxContext() : id(lexical_cast<string>(nextId++)) {} + TxContext(string xid) : id(xid) {} + }; + + static string getId(const TransactionContext& tx) { + const TxContext* tc = dynamic_cast<const TxContext*>(&tx); + assert(tc); + return tc->id; + } + + + bool isNull() const { return false; } + + void log(const string& msg) { + QPID_LOG(info, "test_store: " << msg); + if (events.get()) *events << msg << endl << std::flush; + } + + auto_ptr<TransactionContext> begin() { + auto_ptr<TxContext> tx(new TxContext()); + log(Msg() << "<begin tx " << tx->id << ">"); + return auto_ptr<TransactionContext>(tx); + } + + auto_ptr<TPCTransactionContext> begin(const std::string& xid) { + auto_ptr<TxContext> tx(new TxContext(xid)); + log(Msg() << "<begin tx " << tx->id << ">"); + return auto_ptr<TPCTransactionContext>(tx); + } + + string getContent(const intrusive_ptr<PersistableMessage>& msg) { + intrusive_ptr<broker::Message::Encoding> enc( + dynamic_pointer_cast<broker::Message::Encoding>(msg)); + return enc->getContent(); + } + + void enqueue(TransactionContext* tx, const boost::intrusive_ptr<PersistableMessage>& pmsg, - const PersistableQueue& ) + const PersistableQueue& queue) { + QPID_LOG(debug, "TestStore enqueue " << queue.getName()); qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); assert(msg); + ostringstream o; + o << "<enqueue " << queue.getName() << " " << getContent(msg); + if (tx) o << " tx=" << getId(*tx); + o << ">"; + log(o.str()); + // Dump the message if there is a dump file. if (dump.get()) { msg->getFrames().getMethod()->print(*dump); @@ -144,6 +201,31 @@ class TestStore : public NullMessageStore { msg->enqueueComplete(); } + void dequeue(TransactionContext* tx, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) + { + QPID_LOG(debug, "TestStore dequeue " << queue.getName()); + ostringstream o; + o<< "<dequeue " << queue.getName() << " " << getContent(msg); + if (tx) o << " tx=" << getId(*tx); + o << ">"; + log(o.str()); + } + + void prepare(TPCTransactionContext& txn) { + log(Msg() << "<prepare tx=" << getId(txn) << ">"); + } + + void commit(TransactionContext& txn) { + log(Msg() << "<commit tx=" << getId(txn) << ">"); + } + + void abort(TransactionContext& txn) { + log(Msg() << "<abort tx=" << getId(txn) << ">"); + } + + private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; TestStoreOptions options; @@ -151,8 +233,11 @@ class TestStore : public NullMessageStore { Broker& broker; vector<Thread> threads; std::auto_ptr<ofstream> dump; + std::auto_ptr<ofstream> events; }; +int TestStore::TxContext::nextId(1); + const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; const string TestStore::EXCEPTION = "exception"; const string TestStore::EXIT_PROCESS = "exit_process"; |
