diff options
author | Alan Conway <aconway@apache.org> | 2010-10-27 18:01:27 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-10-27 18:01:27 +0000 |
commit | 326dddd0d0d48401d14ca93044b3fc0e35ad87d9 (patch) | |
tree | 019a45480d8cdf832f62d7176b7a10a5d0971535 /cpp/src/tests/BrokerClusterCalls.cpp | |
parent | aae11121cfcf891b2365241141f9ab9cb47d3024 (diff) | |
download | qpid-python-326dddd0d0d48401d14ca93044b3fc0e35ad87d9.tar.gz |
Revert experimental cluster code, too close to 0.8 release.
Reverts revisions:
r1023966 "Introduce broker::Cluster interface."
r1024275 "Fix compile error: outline set/getCluster fucntions on Broker."
r1027210 "New cluster: core framework and initial implementation of enqueue logic."
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1028055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/BrokerClusterCalls.cpp')
-rw-r--r-- | cpp/src/tests/BrokerClusterCalls.cpp | 435 |
1 files changed, 0 insertions, 435 deletions
diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp deleted file mode 100644 index f659702387..0000000000 --- a/cpp/src/tests/BrokerClusterCalls.cpp +++ /dev/null @@ -1,435 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -///@file -// Tests using a dummy broker::Cluster implementation to verify the expected -// Cluster functions are called for various actions on the broker. -// - -#include "unit_test.h" -#include "test_tools.h" -#include "qpid/broker/Cluster.h" -#include "qpid/broker/Queue.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Session.h" -#include "qpid/messaging/Connection.h" -#include "qpid/messaging/Session.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Receiver.h" -#include "qpid/messaging/Message.h" -#include "qpid/messaging/Duration.h" -#include "BrokerFixture.h" -#include <boost/assign.hpp> -#include <boost/format.hpp> - -using namespace std; -using namespace boost; -using namespace boost::assign; -using namespace qpid::messaging; -using boost::format; -using boost::intrusive_ptr; - -namespace qpid { -namespace tests { - -class DummyCluster : public broker::Cluster -{ - private: - /** Flag used to ignore events other than enqueues while routing, - * e.g. acquires and accepts generated in a ring queue to replace an element.. - * In real impl would be a thread-local variable. - */ - bool isRouting; - - void recordQm(const string& op, const broker::QueuedMessage& qm) { - history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() - % qm.position % qm.payload->getFrames().getContent()).str(); - } - void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { - history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); - } - void recordStr(const string& op, const string& name) { - history += (format("%s(%s)") % op % name).str(); - } - public: - // Messages - - virtual void routing(const boost::intrusive_ptr<broker::Message>& m) { - isRouting = true; - history += (format("routing(%s)") % m->getFrames().getContent()).str(); - } - - virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) { - recordMsg("enqueue", q, msg); - return true; - } - - virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { - history += (format("routed(%s)") % m->getFrames().getContent()).str(); - isRouting = false; - } - virtual void acquire(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("acquire", qm); - } - virtual void accept(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("accept", qm); - } - virtual void reject(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("reject", qm); - } - virtual void rejected(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("rejected", qm); - } - virtual void release(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("release", qm); - } - virtual void drop(const broker::QueuedMessage& qm) { - if (!isRouting) recordQm("dequeue", qm); - } - - // Consumers - - virtual void consume(const broker::Queue& q, size_t n) { - history += (format("consume(%s, %d)") % q.getName() % n).str(); - } - virtual void cancel(const broker::Queue& q, size_t n) { - history += (format("cancel(%s, %d)") % q.getName() % n).str(); - } - - // Wiring - - virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); } - virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); } - virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); } - virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); } - virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { - history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str(); - } - vector<string> history; -}; - -QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite) - -// Broker fixture with DummyCluster set up and some new API client bits. -struct DummyClusterFixture: public BrokerFixture { - Connection c; - Session s; - DummyCluster*dc; - DummyClusterFixture() { - broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster)); - dc = &static_cast<DummyCluster&>(broker->getCluster()); - c = Connection("localhost:"+lexical_cast<string>(getPort())); - c.open(); - s = c.createSession(); - } - ~DummyClusterFixture() { - c.close(); - } -}; - -QPID_AUTO_TEST_CASE(testSimplePubSub) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - // Queue creation - Sender sender = f.s.createSender("q;{create:always,delete:always}"); - size_t i = 0; - BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. - BOOST_CHECK_EQUAL(h.size(), i); - - // Consumer - Receiver receiver = f.s.createReceiver("q"); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Send message - sender.send(Message("a")); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - // Don't check size here as it is uncertain whether acquire has happened yet. - - // Acquire message - Message m = receiver.fetch(Duration::SECOND); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Acknowledge message - f.s.acknowledge(true); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Close a consumer - receiver.close(); - BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Destroy the queue - f.c.close(); - BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testReleaseReject) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}"); - sender.send(Message("a")); - Receiver receiver = f.s.createReceiver("q"); - Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}"); - Message m = receiver.fetch(Duration::SECOND); - h.clear(); - - // Explicit release - f.s.release(m); - f.s.sync(); - size_t i = 0; - BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Implicit release on closing connection. - Connection c("localhost:"+lexical_cast<string>(f.getPort())); - c.open(); - Session s = c.createSession(); - Receiver r = s.createReceiver("q"); - m = r.fetch(Duration::SECOND); - h.clear(); - i = 0; - c.close(); - BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Reject message, goes to alternate exchange. - m = receiver.fetch(Duration::SECOND); - h.clear(); - i = 0; - f.s.reject(m); - BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); - m = altReceiver.fetch(Duration::SECOND); - BOOST_CHECK_EQUAL(m.getContent(), "a"); - - // Timed out message - h.clear(); - i = 0; - m = Message("t"); - m.setTtl(Duration(1)); // Timeout 1ms - sender.send(m); - usleep(2000); // Sleep 2ms - bool received = receiver.fetch(m, Duration::IMMEDIATE); - BOOST_CHECK(!received); // Timed out - BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Message replaced on LVQ - sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}"); - m = Message("a"); - m.getProperties()["qpid.LVQ_key"] = "foo"; - sender.send(m); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - m = Message("b"); - m.getProperties()["qpid.LVQ_key"] = "foo"; - sender.send(m); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); - BOOST_CHECK_EQUAL(h.size(), i); - - receiver = f.s.createReceiver("lvq"); - BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); - f.s.acknowledge(true); - BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testFanout) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}"); - Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}"); - Sender sender = f.s.createSender("amq.fanout"); - r1.setCapacity(0); // Don't receive immediately. - r2.setCapacity(0); - h.clear(); - size_t i = 0; - - // Send message - sender.send(Message("a")); - f.s.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); - BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); - BOOST_CHECK(h.at(i-1) != h.at(i-2)); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.size(), i); - - // Receive messages - Message m1 = r1.fetch(Duration::SECOND); - f.s.acknowledge(m1, true); - Message m2 = r2.fetch(Duration::SECOND); - f.s.acknowledge(m2, true); - - BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testRingQueue) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - - // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working, - // so we can't do this: - // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}"); - // Must use old API to declare ring queue: - qpid::client::Connection c; - f.open(c); - qpid::client::Session s = c.newSession(); - qpid::framing::FieldTable args; - args.setInt("qpid.max_size", 3); - args.setString("qpid.policy_type","ring"); - s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args); - c.close(); - Sender sender = f.s.createSender("ring"); - - size_t i = 0; - // Send message - sender.send(Message("a")); - sender.send(Message("b")); - sender.send(Message("c")); - sender.send(Message("d")); - f.s.sync(); - - BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); - - BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); - - Receiver receiver = f.s.createReceiver("ring"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d"); - f.s.acknowledge(true); - - BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)"); - BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)"); - - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_CASE(testTransactions) { - DummyClusterFixture f; - vector<string>& h = f.dc->history; - Session ts = f.c.createTransactionalSession(); - Sender sender = ts.createSender("q;{create:always,delete:always}"); - size_t i = 0; - BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. - BOOST_CHECK_EQUAL(h.size(), i); - - sender.send(Message("a")); - sender.send(Message("b")); - ts.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); - BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit - ts.commit(); - // FIXME aconway 2010-10-18: As things stand the cluster is not - // compatible with transactions - // - enqueues occur after routing is complete - // - no call to Cluster::enqueue, should be in Queue::process? - // - no transaction context associated with messages in the Cluster interface. - // - no call to Cluster::accept in Queue::dequeueCommitted - // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); - // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)"); - BOOST_CHECK_EQUAL(h.size(), i); - - - Receiver receiver = ts.createReceiver("q"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); - ts.acknowledge(); - ts.sync(); - BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)"); - BOOST_CHECK_EQUAL(h.size(), i); - ts.commit(); - ts.sync(); - // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); - // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)"); - BOOST_CHECK_EQUAL(h.size(), i); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests - |