diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 107 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.cpp | 111 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.h | 73 | ||||
-rw-r--r-- | cpp/src/tests/Cluster_child.cpp | 53 | ||||
-rw-r--r-- | cpp/src/tests/Cpg.cpp | 8 | ||||
-rwxr-xr-x | cpp/src/tests/ais_check | 28 | ||||
-rwxr-xr-x | cpp/src/tests/ais_run | 15 | ||||
-rw-r--r-- | cpp/src/tests/ais_test.cpp | 23 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 63 | ||||
-rw-r--r-- | cpp/src/tests/cluster_client.cpp | 44 | ||||
-rwxr-xr-x | cpp/src/tests/start_cluster | 2 |
16 files changed, 210 insertions, 389 deletions
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index aadb2b9004..571d3365db 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -43,12 +43,13 @@ SessionManager::SessionManager(uint32_t a) : ack(a) {} SessionManager::~SessionManager() {} +// FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( - new SessionState(*this, h, timeout_, ack)); + new SessionState(this, &h, timeout_, ack)); active.insert(session->getId()); for_each(observers.begin(), observers.end(), boost::bind(&Observer::opened, _1,boost::ref(*session))); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index a75b32cbb5..1021cca1b1 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -36,23 +36,17 @@ using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; -void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } - -void SessionState::handleOut(AMQFrame& f) { - assert(handler); - handler->out.handle(f); -} - SessionState::SessionState( - SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), - factory(f), handler(&h), id(true), timeout(timeout_), - broker(h.getConnection().broker), - version(h.getConnection().getVersion()), + factory(f), handler(h), id(true), timeout(timeout_), + broker(h->getConnection().broker), + version(h->getConnection().getVersion()), semanticHandler(new SemanticHandler(*this)) { - // TODO aconway 2007-09-20: SessionManager may add plugin - // handlers to the chain. + in.next = semanticHandler.get(); + out.next = &handler->out; + getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); Manageable* parent = broker.GetVhostObject (); @@ -66,8 +60,8 @@ SessionState::SessionState( mgmtObject = management::Session::shared_ptr (new management::Session (this, parent, id.str ())); mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h.getChannel()); + mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h->getChannel()); mgmtObject->set_detachedLifespan (getTimeout()); agent->addObject (mgmtObject); } @@ -76,12 +70,10 @@ SessionState::SessionState( SessionState::~SessionState() { // Remove ID from active session list. - factory.erase(getId()); - + if (factory) + factory->erase(getId()); if (mgmtObject.get () != 0) - { mgmtObject->resourceDestroy (); - } } SessionHandler* SessionState::getHandler() { @@ -101,7 +93,7 @@ Connection& SessionState::getConnection() { void SessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); Mutex::ScopedLock l(lock); - handler = 0; + handler = 0; out.next = 0; if (mgmtObject.get() != 0) { mgmtObject->set_attached (0); @@ -112,6 +104,7 @@ void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; + out.next = &handler->out; if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index c8c32a046d..bc1b974eaa 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -58,7 +58,7 @@ class Connection; * themselves have state. */ class SessionState : public framing::SessionState, - public framing::FrameHandler::InOutHandler, + public framing::FrameHandler::Chains, public sys::OutputControl, public management::Manageable { @@ -90,18 +90,15 @@ class SessionState : public framing::SessionState, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - protected: - void handleIn(framing::AMQFrame&); - void handleOut(framing::AMQFrame&); - - private: - // SessionManager creates sessions. - SessionState(SessionManager&, - SessionHandler& out, + // Normally SessionManager creates sessions. + SessionState(SessionManager*, + SessionHandler* out, uint32_t timeout, uint32_t ackInterval); - SessionManager& factory; + + private: + SessionManager* factory; SessionHandler* handler; framing::Uuid id; uint32_t timeout; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 49270bcfef..bca6c49c13 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,6 +17,7 @@ */ #include "Cluster.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -31,7 +32,70 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; +using broker::SessionState; +namespace { + +// Beginning of inbound chain: send to cluster. +struct ClusterSendHandler : public FrameHandler { + SessionState& session; + Cluster& cluster; + bool busy; + Monitor lock; + + ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + + void handle(AMQFrame& f) { + Mutex::ScopedLock l(lock); + assert(!busy); + // FIXME aconway 2008-01-29: refcount Sessions. + // session.addRef(); // Keep the session till the message is self delivered. + cluster.send(f, next); // Indirectly send to next via cluster. + + // FIXME aconway 2008-01-29: need to get this blocking out of the loop. + // But cluster needs to agree on order of side-effects on the shared model. + // OK for wiring to block, for messages use queue tokens? + // Both in & out transfers must be orderd per queue. + // May need out-of-order completion. + busy=true; + while (busy) lock.wait(); + } +}; + +// Next in inbound chain, self delivered from cluster. +struct ClusterDeliverHandler : public FrameHandler { + Cluster& cluster; + ClusterSendHandler& sender; + + ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {} + + void handle(AMQFrame& f) { + next->handle(f); + Mutex::ScopedLock l(sender.lock); + sender.busy=false; + sender.lock.notify(); + } +}; + +// FIXME aconway 2008-01-29: IList +void insert(FrameHandler::Chain& c, FrameHandler* h) { + h->next = c.next; + c.next = h; +} + +struct SessionObserver : public broker::SessionManager::Observer { + Cluster& cluster; + SessionObserver(Cluster& c) : cluster(c) {} + + void opened(SessionState& s) { + // FIXME aconway 2008-01-29: IList for memory management. + ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); + ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); + insert(s.in, deliverer); + insert(s.in, sender); + } +}; +} ostream& operator <<(ostream& out, const Cluster& cluster) { return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; @@ -48,10 +112,10 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { } Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) : - FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer cpg(*this), name(name_), - url(url_) + url(url_), + observer(new SessionObserver(*this)) { QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg.join(name); @@ -77,18 +141,19 @@ Cluster::~Cluster() { } } -void Cluster::handle(AMQFrame& frame) { +void Cluster::send(AMQFrame& frame, FrameHandler* next) { QPID_LOG(trace, *this << " SEND: " << frame); - boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling. - Buffer buf(store.get()); + char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling. + Buffer buf(data); frame.encode(buf); - iovec iov = { store.get(), frame.size() }; + buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer. + iovec iov = { data, frame.size()+sizeof(next) }; cpg.mcast(name, &iov, 1); } void Cluster::notify() { AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())); - handle(frame); + send(frame, 0); } size_t Cluster::size() const { @@ -112,15 +177,25 @@ void Cluster::deliver( void* msg, int msg_len) { - Id from(nodeid, pid); - Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; - frame.decode(buf); - QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (frame.getChannel() == 0) - handleClusterFrame(from, frame); - else - next->handle(frame); + try { + Id from(nodeid, pid); + Buffer buf(static_cast<char*>(msg), msg_len); + AMQFrame frame; + frame.decode(buf); + QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); + if (frame.getChannel() == 0) + handleClusterFrame(from, frame); + else if (from == self) { + FrameHandler* next; + buf.getRawData((uint8_t*)&next, sizeof(next)); + next->handle(frame); + } + // FIXME aconway 2008-01-30: apply frames from foreign sessions. + } + catch (const std::exception& e) { + // FIXME aconway 2008-01-30: exception handling. + QPID_LOG(error, "Error handling frame from cluster " << e.what()); + } } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index e9809f2264..b62b2be5f1 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,7 +21,6 @@ #include "Cpg.h" -#include "qpid/framing/FrameHandler.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" @@ -39,15 +38,10 @@ namespace qpid { namespace cluster { /** - * Connection to the cluster. Maintains cluster membership - * data. - * - * As FrameHandler, handles frames by sending them to the - * cluster. Frames received from the cluster are sent to the next - * FrameHandler in the chain. + * Connection to the cluster. + * Keeps cluster membership data. */ -class Cluster : public framing::FrameHandler, - private sys::Runnable, private Cpg::Handler +class Cluster : private sys::Runnable, private Cpg::Handler { public: /** Details of a cluster member */ @@ -68,7 +62,7 @@ class Cluster : public framing::FrameHandler, virtual ~Cluster(); // FIXME aconway 2008-01-29: - //framing::HandlerUpdater& getHandlerUpdater() { return sessions; } + intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -87,7 +81,7 @@ class Cluster : public framing::FrameHandler, sys::Duration timeout=sys::TIME_INFINITE) const; /** Send frame to the cluster */ - void handle(framing::AMQFrame&); + void send(framing::AMQFrame&, framing::FrameHandler*); private: typedef Cpg::Id Id; @@ -122,6 +116,7 @@ class Cluster : public framing::FrameHandler, MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; + intrusive_ptr<broker::SessionManager::Observer> observer; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index e6b5f1a0bd..ceafa389b0 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin { cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - // FIXME aconway 2008-02-01: Add observer. + broker->getSessionManager().add(cluster->getObserver()); } } }; diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp deleted file mode 100644 index b448128620..0000000000 --- a/cpp/src/tests/Cluster.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Cluster.h" -#include "test_tools.h" - -#include "qpid/framing/SessionOpenBody.h" -#include "qpid/framing/SessionAttachedBody.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/cluster/ClassifierHandler.h" - -#include "unit_test.h" - -#include <sys/wait.h> - -QPID_AUTO_TEST_SUITE(ClusterTestSuite) - -static const ProtocolVersion VER; - -/** Verify membership in a cluster with one member. */ -BOOST_AUTO_TEST_CASE(testClusterOne) { - TestCluster cluster("clusterOne", "amqp:one:1"); - AMQFrame send(in_place<SessionOpenBody>(VER)); - send.setChannel(1); - cluster.handle(send); - AMQFrame received = cluster.received.pop(); - BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *received.getBody()); - BOOST_CHECK_EQUAL(1u, cluster.size()); - Cluster::MemberList members = cluster.getMembers(); - BOOST_CHECK_EQUAL(1u, members.size()); - Cluster::Member me=members.front(); - BOOST_REQUIRE_EQUAL(me.url, "amqp:one:1"); -} - -/** Fork a process to test a cluster with two members */ -BOOST_AUTO_TEST_CASE(testClusterTwo) { - bool nofork=getenv("NOFORK") != 0; - pid_t pid=0; - if (!nofork) { - pid = fork(); - BOOST_REQUIRE(pid >= 0); - } - if (pid || nofork) { // Parent - BOOST_MESSAGE("Parent start"); - TestCluster cluster("clusterTwo", "amqp:parent:1"); - BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. - - // Exchange frames with child. - AMQFrame send(SessionOpenBody(VER)); - send.setChannel(1); - cluster.handle(send); - AMQFrame received = cluster.received.pop(); - BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *received.getBody()); - - received=cluster.received.pop(); - BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *received.getBody()); - - if (!nofork) { - // Wait for child to exit. - int status; - BOOST_CHECK_EQUAL(::wait(&status), pid); - BOOST_CHECK_EQUAL(0, status); - BOOST_CHECK(cluster.waitFor(1)); - BOOST_CHECK_EQUAL(1u, cluster.size()); - } - } - else { // Child - BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL)); - } -} - -struct CountHandler : public FrameHandler { - CountHandler() : count(0) {} - void handle(AMQFrame&) { count++; } - size_t count; -}; - -/** Test the ClassifierHandler */ -BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) { - AMQFrame queueDecl(in_place<QueueDeclareBody>(VER)); - AMQFrame messageTrans(in_place<MessageTransferBody>(VER)); - CountHandler wiring; - CountHandler other; - - ClassifierHandler classify(wiring, other); - - classify.handle(queueDecl); - BOOST_CHECK_EQUAL(1u, wiring.count); - BOOST_CHECK_EQUAL(0u, other.count); - - classify.handle(messageTrans); - BOOST_CHECK_EQUAL(1u, wiring.count); - BOOST_CHECK_EQUAL(1u, other.count); -} - -QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h deleted file mode 100644 index 6ff5c21fdb..0000000000 --- a/cpp/src/tests/Cluster.h +++ /dev/null @@ -1,73 +0,0 @@ -#ifndef CLUSTER_H -#define CLUSTER_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/cluster/Cluster.h" -#include "qpid/sys/BlockingQueue.h" -#include "qpid/framing/AMQFrame.h" - -#include <boost/bind.hpp> -#include <boost/test/test_tools.hpp> - -#include <iostream> -#include <functional> - -/** - * Definitions for the Cluster.cpp and Cluster_child.cpp child program. - */ - -// using namespace in header file is bad manners, but this is strictly for -// the tests. -using namespace std; -using namespace qpid; -using namespace qpid::cluster; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace boost; - -void null_deleter(void*) {} - -template <class T> -class TestHandler : public Handler<T&>, public BlockingQueue<T> -{ - public: - void handle(T& frame) { push(frame); } -}; - -typedef TestHandler<AMQFrame> TestFrameHandler; - -struct TestCluster : public Cluster -{ - TestCluster(string name, string url) - : Cluster(name, url, *(qpid::broker::Broker*)0) {} - - /** Wait for cluster to be of size n. */ - bool waitFor(size_t n) { - BOOST_CHECKPOINT("About to call Cluster::wait"); - return wait(boost::bind( - equal_to<size_t>(), bind(&Cluster::size,this), n)); - } - - TestFrameHandler received; -}; - - - -#endif /*!CLUSTER_H*/ diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp deleted file mode 100644 index 75591bb68f..0000000000 --- a/cpp/src/tests/Cluster_child.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Child process for the Cluster test suite multi-process tests. - -#include "Cluster.h" -#include "test_tools.h" -#include "qpid/framing/SessionOpenBody.h" -#include "qpid/framing/SessionAttachedBody.h" - -using namespace std; -using namespace qpid; -using namespace qpid::cluster; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace qpid::log; - -static const ProtocolVersion VER; - -/** Child part of Cluster::clusterTwo test */ -void clusterTwo() { - TestCluster cluster("clusterTwo", "amqp:child:2"); - AMQFrame frame = cluster.received.pop(frame); // Frame from parent. - BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody()); - BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent - - AMQFrame send(in_place<SessionAttachedBody>(VER)); - send.setChannel(1); - cluster.handle(send); - BOOST_REQUIRE(cluster.received.waitPop(frame)); - BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody()); -} - -int test_main(int, char**) { - clusterTwo(); - return 0; -} - diff --git a/cpp/src/tests/Cpg.cpp b/cpp/src/tests/Cpg.cpp index fddfe9ef05..e8339bf773 100644 --- a/cpp/src/tests/Cpg.cpp +++ b/cpp/src/tests/Cpg.cpp @@ -78,12 +78,16 @@ struct Callback : public Cpg::Handler { cpg_handle_t /*handle*/, struct cpg_name *grp, struct cpg_address */*members*/, int nMembers, - struct cpg_address */*left*/, int /*nLeft*/, - struct cpg_address */*joined*/, int /*nJoined*/ + struct cpg_address */*left*/, int nLeft, + struct cpg_address */*joined*/, int nJoined ) { BOOST_CHECK_EQUAL(group, Cpg::str(*grp)); configChanges.push_back(nMembers); + BOOST_MESSAGE("configChange: "<< + nLeft<<" left "<< + nJoined<<" joined "<< + nMembers<<" members."); } }; diff --git a/cpp/src/tests/ais_check b/cpp/src/tests/ais_check index ce3bbe1b1c..ae0edf88c1 100755 --- a/cpp/src/tests/ais_check +++ b/cpp/src/tests/ais_check @@ -2,10 +2,12 @@ # Check for requirements, run AIS tests if found. # -test `id -ng` = "ais" || BADGROUP=yes -{ ps -u root | grep aisexec >/dev/null; } || NOAISEXEC=yes +id -nG | grep '\<ais\>' || \ + NOGROUP="You are not a member of the ais group." +ps -u root | grep aisexec >/dev/null || \ + NOAISEXEC="The aisexec daemon is not running as root" -if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then +if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then cat <<EOF =========== WARNING: NOT RUNNING AIS TESTS ============== @@ -13,18 +15,8 @@ if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then Tests that depend on the openais library (used for clustering) will not be run because: -EOF - test -n "$BADGROUP" && cat <<EOF - You do not appear to have you group ID set to "ais". Make ais your - primary group, or run "newgrp ais" before running the tests. - -EOF - test -n "$NOAISEXEC" && cat <<EOF - The aisexec daemon is not running. Make sure /etc/ais/openais.conf - is a valid configuration and aisexec is run by root. -EOF - - cat <<EOF + $NOGROUP + $NOAISEXEC ========================================================== @@ -32,8 +24,4 @@ EOF exit 0; # A warning, not a failure. fi -FAILED=0 -for test in `cat ais_tests`; do - ./$test || FAILED=`expr $FAILED + 1` -done -exit $FAILED +echo ./ais_run | newgrp ais diff --git a/cpp/src/tests/ais_run b/cpp/src/tests/ais_run new file mode 100755 index 0000000000..0f45edc39c --- /dev/null +++ b/cpp/src/tests/ais_run @@ -0,0 +1,15 @@ +#!/bin/sh +# +# Run AIS tests, assumes that ais_check has passed and we are +# running with the ais group ID. +# + +# FIXME aconway 2008-01-30: we should valgrind the cluster brokers. + +srcdir=`dirname $0` +$srcdir/start_cluster 4 +./ais_test +ret=$? +$srcdir/stop_cluster +exit $ret + diff --git a/cpp/src/tests/ais_test.cpp b/cpp/src/tests/ais_test.cpp new file mode 100644 index 0000000000..00c61242e4 --- /dev/null +++ b/cpp/src/tests/ais_test.cpp @@ -0,0 +1,23 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Defines test_main function to link with actual unit test code. +#define BOOST_AUTO_TEST_MAIN // Boost 1.33 +#define BOOST_TEST_MAIN +#include "unit_test.h" + diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index a6f7fa90b4..3db0604e2c 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -1,55 +1,20 @@ -# FIXME aconway 2007-08-31: Disabled cluster compilation, -# has not been kept up to date with recent commits. +if CLUSTER +# +# Cluster tests makefile fragment, to be included in Makefile.am # -# if CLUSTER -# # Cluster tests makefile fragment, to be included in Makefile.am -# # +lib_cluster = $(abs_builddir)/../libqpidcluster.la -# lib_cluster = $(abs_builddir)/../libqpidcluster.la - -# # NOTE: Programs using the openais library must be run with gid=ais -# # You should do "newgrp ais" before running the tests to run these. -# # - -# # -# # Cluster tests. -# # - -# # ais_check runs ais if the conditions to run AIS tests -# # are met, otherwise it prints a warning. -# TESTS+=ais_check -# EXTRA_DIST+=ais_check -# AIS_TESTS= - -# ais_check: ais_tests -# ais_tests: -# echo $(AIS_TESTS) -# echo "# AIS tests" >$@ -# for t in $(AIS_TESTS); do echo ./$$t >$@; done -# chmod a+x $@ - -# CLEANFILES+=ais_tests - -# AIS_TESTS+=Cpg -# check_PROGRAMS+=Cpg -# Cpg_SOURCES=Cpg.cpp -# Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework - -# # TODO aconway 2007-07-26: Fix this test. -# #AIS_TESTS+=Cluster -# # check_PROGRAMS+=Cluster -# # Cluster_SOURCES=Cluster.cpp Cluster.h -# # Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework +# NOTE: Programs using the openais library must be run with gid=ais +# You should do "newgrp ais" before running the tests to run these. +# -# check_PROGRAMS+=Cluster_child -# Cluster_child_SOURCES=Cluster_child.cpp Cluster.h -# Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor +# ais_check checks conditions for AIS tests and runs if ok. +TESTS+=ais_check +EXTRA_DIST+=ais_check ais_run -# # TODO aconway 2007-07-03: In progress -# #AIS_TESTS+=cluster_client -# check_PROGRAMS+=cluster_client -# cluster_client_SOURCES=cluster_client.cpp -# cluster_client_LDADD=$(lib_client) -lboost_unit_test_framework +check_PROGRAMS+=ais_test +ais_test_SOURCES=ais_test.cpp Cpg.cpp +ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework -# endif +endif diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp index c74d7329f0..30b7e38801 100644 --- a/cpp/src/tests/cluster_client.cpp +++ b/cpp/src/tests/cluster_client.cpp @@ -16,21 +16,25 @@ * */ -#include "qpid/client/Connection.h" -#include "qpid/shared_ptr.h" - #include "unit_test.h" +#include "BrokerFixture.h" +#include "qpid/client/Session.h" #include <fstream> #include <vector> #include <functional> - QPID_AUTO_TEST_SUITE(cluster_clientTestSuite) -using namespace std; using namespace qpid; using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::client::arg; +using framing::TransferContent; +using std::vector; +using std::string; +using std::ifstream; +using std::ws; struct ClusterConnections : public vector<shared_ptr<Connection> > { ClusterConnections() { @@ -58,25 +62,23 @@ BOOST_AUTO_TEST_CASE(testWiringReplication) { ClusterConnections cluster; BOOST_REQUIRE(cluster.size() > 1); - Exchange fooEx("FooEx", Exchange::TOPIC_EXCHANGE); - Queue fooQ("FooQ"); - - Channel broker0; - cluster[0]->openChannel(broker0); - broker0.declareExchange(fooEx); - broker0.declareQueue(fooQ); - broker0.bind(fooEx, fooQ, "FooKey"); + Session broker0 = cluster[0]->newSession(); + broker0.exchangeDeclare(exchange="ex"); + broker0.queueDeclare(queue="q"); + broker0.queueBind(exchange="ex", queue="q", routingKey="key"); broker0.close(); for (size_t i = 1; i < cluster.size(); ++i) { - Channel ch; - cluster[i]->openChannel(ch); - ch.publish(Message("hello"), fooEx, "FooKey"); - Message m; - BOOST_REQUIRE(ch.get(m, fooQ)); - BOOST_REQUIRE_EQUAL(m.getData(), "hello"); - ch.close(); - } + Session s = cluster[i]->newSession(); + s.messageTransfer(content=TransferContent("data", "key", "ex")); + s.messageSubscribe(queue="q", destination="q"); + s.messageFlow(destination="q", unit=0, value=1);//messages + FrameSet::shared_ptr msg = s.get(); + BOOST_CHECK(msg->isA<MessageTransferBody>()); + BOOST_CHECK_EQUAL(string("data"), msg->getContent()); + s.getExecution().completed(msg->getId(), true, true); + cluster[i]->close(); + } } QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster index 5992a75d8d..03fb671bdf 100755 --- a/cpp/src/tests/start_cluster +++ b/cpp/src/tests/start_cluster @@ -12,7 +12,7 @@ shift OPTS=$* CLUSTER=`whoami` # Cluster name=user name, avoid clashes. for (( i=0; i<SIZE; ++i )); do - PORT=`../qpidd -dp0 --log-output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1 + PORT=`../qpidd --load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name $CLUSTER $OPTS` || exit 1 echo $PORT >> cluster.ports done |