diff options
author | Alan Conway <aconway@apache.org> | 2014-08-22 14:13:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-08-22 14:13:09 +0000 |
commit | dee3a10e026896da06ff178335c0b3f86a7e6604 (patch) | |
tree | a38eb717a568285748cb8c7176d154a6d44615c8 /qpid/cpp/src/tests/test_store.cpp | |
parent | 3f16bac6a8e6ae88fe6620a21d4af690fc58cd0a (diff) | |
download | qpid-python-dee3a10e026896da06ff178335c0b3f86a7e6604.tar.gz |
NO-JIRA: Clean up test_store.cpp async functionality.
Clean up test_store.cpp to allow control over async completion of messsages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619814 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/test_store.cpp')
-rw-r--r-- | qpid/cpp/src/tests/test_store.cpp | 199 |
1 files changed, 135 insertions, 64 deletions
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index e299161c68..ee04dddd6a 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -21,15 +21,18 @@ /**@file - * Plug-in message store for tests. * - * Add functionality as required, build up a comprehensive set of - * features to support persistent behavior tests. + * Message store for tests, with two roles: + * + * 1. Dump store events to a text file that can be compared to expected event + * sequence + * + * 2. Emulate hard-to-recreate conditions such as asynchronous completion delays + * or store errors. + * + * Messages with specially formatted contents trigger various actions. + * See class Action below for available actions and message format.. * - * Current features special "action" messages can: - * - raise exception from enqueue. - * - force host process to exit. - * - do async completion after a delay. */ #include "qpid/broker/NullMessageStore.h" @@ -58,12 +61,91 @@ using namespace qpid::sys; namespace qpid { namespace tests { +namespace { + +bool startswith(const string& s, const string& prefix) { + return s.compare(0, prefix.size(), prefix) == 0; +} + +void split(const string& s, vector<string>& result, const char* sep=" \t\n") { + size_t i = s.find_first_not_of(sep); + while (i != string::npos) { + size_t j = s.find_first_of(sep, i); + if (j == string::npos) { + result.push_back(s.substr(i)); + break; + } + result.push_back(s.substr(i, j-i)); + i = s.find_first_not_of(sep, j); + } +} + +} + +/** + * Action message format is TEST_STORE_DO [<name>...]:<action> [<args>...] + * + * A list of store <name> can be included so the action only executes on one of + * the named stores. This is useful in a cluster setting where the same message + * is replicated to all broker's stores but should only trigger an action on + * specific ones. If no <name> is given, execute on any store. + * + */ +class Action { + public: + /** Available actions */ + enum ActionEnum { + NONE, + THROW, ///< Throw an exception from enqueue + DELAY, ///< Delay completion, takes an ID string to complete. + COMPLETE, ///< Complete a previously delayed message, takes ID + + N_ACTIONS // Count of actions, must be last + }; + + string name; + ActionEnum index; + vector<string> storeNames, args; + + Action(const string& s) { + index = NONE; + if (!startswith(s, PREFIX)) return; + size_t colon = s.find_first_of(":"); + if (colon == string::npos) return; + assert(colon >= PREFIX.size()); + split(s.substr(PREFIX.size(), colon-PREFIX.size()), storeNames); + split(s.substr(colon+1), args); + if (args.empty()) return; + for (size_t i = 0; i < N_ACTIONS; ++i) { + if (args[0] == ACTION_NAMES[i]) { + name = args[0]; + index = ActionEnum(i); + args.erase(args.begin()); + break; + } + } + } + + bool executeIn(const string& storeName) { + return storeNames.empty() || + find(storeNames.begin(), storeNames.end(), storeName) !=storeNames.end(); + } + + private: + static string PREFIX; + static const char* ACTION_NAMES[N_ACTIONS]; +}; + +string Action::PREFIX("TEST_STORE_DO"); + +const char* Action::ACTION_NAMES[] = { "none", "throw", "delay", "complete" }; + + struct TestStoreOptions : public Options { string name; string dump; string events; - vector<string> throwMsg; // Throw exception if message content matches. TestStoreOptions() : Options("Test Store Options") { addOptions() @@ -73,22 +155,10 @@ struct TestStoreOptions : public Options { "File to dump enqueued messages.") ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.") - ("test-store-throw", optValue(throwMsg, "CONTENT"), - "Throw exception if message content matches.") ; } }; -struct Completer : public Runnable { - boost::intrusive_ptr<PersistableMessage> message; - int usecs; - Completer(boost::intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {} - void run() { - qpid::sys::usleep(usecs); - message->enqueueComplete(); - delete this; - } -}; class TestStore : public NullMessageStore { public: @@ -97,8 +167,7 @@ class TestStore : public NullMessageStore { { QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump - << " events=" << options.events - << " throw messages =" << options.throwMsg.size()); + << " events=" << options.events) if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); @@ -154,7 +223,6 @@ class TestStore : public NullMessageStore { const boost::intrusive_ptr<PersistableMessage>& pmsg, const PersistableQueue& queue) { - QPID_LOG(debug, "TestStore enqueue " << queue.getName()); qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); assert(msg); @@ -173,44 +241,50 @@ class TestStore : public NullMessageStore { *dump << endl << " "; *dump << msg->getFrames().getContentSize() << endl; } - - // Check the message for special instructions. + string logPrefix = "TestStore "+name+": "; + // Check the message for special instructions for this store. string data = msg->getFrames().getContent(); - size_t i = string::npos; - size_t j = string::npos; - const vector<string>& throwMsg(options.throwMsg); - if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) { - throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); - } - else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 - && (i = data.find(name+"[")) != string::npos - && (j = data.find("]", i)) != string::npos) - { - size_t start = i+name.size()+1; - string action = data.substr(start, j-start); - - if (action == EXCEPTION) { - throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); - } - else if (action == EXIT_PROCESS) { - // FIXME aconway 2009-04-10: this is a dubious way to - // close the process at best, it can cause assertions or seg faults - // rather than clean exit. - QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data); - exit(0); - } - else if (strncmp(action.c_str(), ASYNC.c_str(), strlen(ASYNC.c_str())) == 0) { - std::string delayStr(action.substr(ASYNC.size())); - int delay = boost::lexical_cast<int>(delayStr); - threads.push_back(Thread(*new Completer(msg, delay))); - } - else { - QPID_LOG(error, "TestStore " << name << " unknown action " << action); - msg->enqueueComplete(); + Action action(data); + bool doComplete = true; + if (action.index && action.executeIn(name)) { + switch (action.index) { + + case Action::THROW: + throw Exception(logPrefix + data); + break; + + case Action::DELAY: { + if (action.args.empty()) { + QPID_LOG(error, logPrefix << "async-id needs argument: " << data); + break; + } + asyncIds[action.args[0]] = msg; + QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]); + doComplete = false; + break; + } + + case Action::COMPLETE: { + if (action.args.empty()) { + QPID_LOG(error, logPrefix << "complete-id needs argument: " << data); + break; + } + AsyncIds::iterator i = asyncIds.find(action.args[0]); + if (i != asyncIds.end()) { + i->second->enqueueComplete(); + QPID_LOG(debug, logPrefix << "completed " << action.args[0]); + asyncIds.erase(i); + } else { + QPID_LOG(info, logPrefix << "not found for completion " << action.args[0]); + } + break; + } + + default: + QPID_LOG(error, logPrefix << "unknown action: " << data); } } - else - msg->enqueueComplete(); + if (doComplete) msg->enqueueComplete(); } void dequeue(TransactionContext* tx, @@ -239,22 +313,19 @@ class TestStore : public NullMessageStore { private: - static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; + typedef map<string, boost::intrusive_ptr<PersistableMessage> > AsyncIds; + TestStoreOptions options; string name; Broker& broker; vector<Thread> threads; std::auto_ptr<ofstream> dump; std::auto_ptr<ofstream> events; + AsyncIds asyncIds; }; int TestStore::TxContext::nextId(1); -const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; -const string TestStore::EXCEPTION = "exception"; -const string TestStore::EXIT_PROCESS = "exit_process"; -const string TestStore::ASYNC="async "; - struct TestStorePlugin : public Plugin { TestStoreOptions options; |