diff options
author | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
commit | d490fba74749bcde972e5a0d95f84b165f8ea05e (patch) | |
tree | ffc58006adb15ec8fa29955911f5f3a0f02dfa69 /cpp/src/tests | |
parent | e4aee82085958588458ba34d2bf7dd0db90a257d (diff) | |
download | qpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz |
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store
have the same headers and content-size for an updatee or a broker that
receives the publish directly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 28 | ||||
-rw-r--r-- | cpp/src/tests/test_store.cpp | 37 |
2 files changed, 60 insertions, 5 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index e57a826000..178271e977 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -57,6 +57,34 @@ class ShortTests(BrokerTest): self.assertEqual("y", m.content) s2.connection.close() + def test_store_direct_update_match(self): + """Verify that brokers stores an identical message whether they receive it + direct from clients or during an update, no header or other differences""" + cluster = self.cluster(0, args=["--load-module", self.test_store_lib]) + cluster.start(args=["--test-store-dump", "direct.dump"]) + # Try messages with various headers + cluster[0].send_message("q", Message(durable=True, content="foobar", + subject="subject", + reply_to="reply_to", + properties={"n":10})) + # Try messages of different sizes + for size in range(0,10000,100): + cluster[0].send_message("q", Message(content="x"*size, durable=True)) + # Try sending via named exchange + c = cluster[0].connect_old() + s = c.session(str(qpid.datatypes.uuid4())) + s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q") + props = s.delivery_properties(routing_key="foo", delivery_mode=2) + s.message_transfer( + destination="amq.direct", + message=qpid.datatypes.Message(props, "content")) + + # Now update a new member and compare their dumps. + cluster.start(args=["--test-store-dump", "updatee.dump"]) + assert file("direct.dump").read() == file("updatee.dump").read() + os.remove("direct.dump") + os.remove("updatee.dump") + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp index c675c6daa3..257e77b6b4 100644 --- a/cpp/src/tests/test_store.cpp +++ b/cpp/src/tests/test_store.cpp @@ -34,11 +34,14 @@ #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/Broker.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include <boost/cast.hpp> #include <boost/lexical_cast.hpp> +#include <memory> +#include <fstream> using namespace qpid; using namespace broker; @@ -51,10 +54,13 @@ namespace tests { struct TestStoreOptions : public Options { string name; + string dump; TestStoreOptions() : Options("Test Store Options") { addOptions() - ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance."); + ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") + ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ; } }; @@ -71,19 +77,38 @@ struct Completer : public Runnable { class TestStore : public NullMessageStore { public: - TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {} + TestStore(const TestStoreOptions& opts, Broker& broker_) + : options(opts), name(opts.name), broker(broker_) + { + QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); + if (!options.dump.empty()) + dump.reset(new ofstream(options.dump.c_str())); + } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } + virtual bool isNull() const { return false; } + void enqueue(TransactionContext* , - const boost::intrusive_ptr<PersistableMessage>& msg, + const boost::intrusive_ptr<PersistableMessage>& pmsg, const PersistableQueue& ) { - string data = boost::polymorphic_downcast<Message*>(msg.get())->getFrames().getContent(); + Message* msg = dynamic_cast<Message*>(pmsg.get()); + assert(msg); + + // Dump the message if there is a dump file. + if (dump.get()) { + msg->getFrames().getMethod()->print(*dump); + *dump << endl << " "; + msg->getFrames().getHeaders()->print(*dump); + *dump << endl << " "; + *dump << msg->getFrames().getContentSize() << endl; + } // Check the message for special instructions. + string data = msg->getFrames().getContent(); size_t i = string::npos; size_t j = string::npos; if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 @@ -119,9 +144,11 @@ class TestStore : public NullMessageStore { private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; + TestStoreOptions options; string name; Broker& broker; vector<Thread> threads; + std::auto_ptr<ofstream> dump; }; const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; @@ -139,7 +166,7 @@ struct TestStorePlugin : public Plugin { { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; - boost::shared_ptr<MessageStore> p(new TestStore(options.name, *broker)); + boost::shared_ptr<MessageStore> p(new TestStore(options, *broker)); broker->setStore (p); } |