summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
committerAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
commitd490fba74749bcde972e5a0d95f84b165f8ea05e (patch)
treeffc58006adb15ec8fa29955911f5f3a0f02dfa69 /cpp/src/tests
parente4aee82085958588458ba34d2bf7dd0db90a257d (diff)
downloadqpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store have the same headers and content-size for an updatee or a broker that receives the publish directly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rwxr-xr-xcpp/src/tests/cluster_tests.py28
-rw-r--r--cpp/src/tests/test_store.cpp37
2 files changed, 60 insertions, 5 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index e57a826000..178271e977 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -57,6 +57,34 @@ class ShortTests(BrokerTest):
self.assertEqual("y", m.content)
s2.connection.close()
+ def test_store_direct_update_match(self):
+ """Verify that brokers stores an identical message whether they receive it
+ direct from clients or during an update, no header or other differences"""
+ cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
+ cluster.start(args=["--test-store-dump", "direct.dump"])
+ # Try messages with various headers
+ cluster[0].send_message("q", Message(durable=True, content="foobar",
+ subject="subject",
+ reply_to="reply_to",
+ properties={"n":10}))
+ # Try messages of different sizes
+ for size in range(0,10000,100):
+ cluster[0].send_message("q", Message(content="x"*size, durable=True))
+ # Try sending via named exchange
+ c = cluster[0].connect_old()
+ s = c.session(str(qpid.datatypes.uuid4()))
+ s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
+ props = s.delivery_properties(routing_key="foo", delivery_mode=2)
+ s.message_transfer(
+ destination="amq.direct",
+ message=qpid.datatypes.Message(props, "content"))
+
+ # Now update a new member and compare their dumps.
+ cluster.start(args=["--test-store-dump", "updatee.dump"])
+ assert file("direct.dump").read() == file("updatee.dump").read()
+ os.remove("direct.dump")
+ os.remove("updatee.dump")
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp
index c675c6daa3..257e77b6b4 100644
--- a/cpp/src/tests/test_store.cpp
+++ b/cpp/src/tests/test_store.cpp
@@ -34,11 +34,14 @@
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/Broker.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include <boost/cast.hpp>
#include <boost/lexical_cast.hpp>
+#include <memory>
+#include <fstream>
using namespace qpid;
using namespace broker;
@@ -51,10 +54,13 @@ namespace tests {
struct TestStoreOptions : public Options {
string name;
+ string dump;
TestStoreOptions() : Options("Test Store Options") {
addOptions()
- ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+ ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
+ ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+ ;
}
};
@@ -71,19 +77,38 @@ struct Completer : public Runnable {
class TestStore : public NullMessageStore {
public:
- TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
+ TestStore(const TestStoreOptions& opts, Broker& broker_)
+ : options(opts), name(opts.name), broker(broker_)
+ {
+ QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
+ if (!options.dump.empty())
+ dump.reset(new ofstream(options.dump.c_str()));
+ }
~TestStore() {
for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
}
+ virtual bool isNull() const { return false; }
+
void enqueue(TransactionContext* ,
- const boost::intrusive_ptr<PersistableMessage>& msg,
+ const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& )
{
- string data = boost::polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+ Message* msg = dynamic_cast<Message*>(pmsg.get());
+ assert(msg);
+
+ // Dump the message if there is a dump file.
+ if (dump.get()) {
+ msg->getFrames().getMethod()->print(*dump);
+ *dump << endl << " ";
+ msg->getFrames().getHeaders()->print(*dump);
+ *dump << endl << " ";
+ *dump << msg->getFrames().getContentSize() << endl;
+ }
// Check the message for special instructions.
+ string data = msg->getFrames().getContent();
size_t i = string::npos;
size_t j = string::npos;
if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
@@ -119,9 +144,11 @@ class TestStore : public NullMessageStore {
private:
static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+ TestStoreOptions options;
string name;
Broker& broker;
vector<Thread> threads;
+ std::auto_ptr<ofstream> dump;
};
const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
@@ -139,7 +166,7 @@ struct TestStorePlugin : public Plugin {
{
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- boost::shared_ptr<MessageStore> p(new TestStore(options.name, *broker));
+ boost::shared_ptr<MessageStore> p(new TestStore(options, *broker));
broker->setStore (p);
}