summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-02-01 18:02:42 +0000
committerAlan Conway <aconway@apache.org>2008-02-01 18:02:42 +0000
commit4db96f7ad47c69982cdc6cf7b5e5c47b00f1144b (patch)
treed125438eb115c0b21171b27d17e6ca1f57622542 /cpp/src
parentae3201b50554b23f52132635f2e852a4fc78617e (diff)
downloadqpid-python-4db96f7ad47c69982cdc6cf7b5e5c47b00f1144b.tar.gz
Cluster code fixed for changes in codebase.
- Using SessionManager::Observer - Better ais test setup, only need to be member of ais group. - Update cluster_client - SessionState holds handler chains. - Cluster frames include next handler ptr. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@617582 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp3
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp33
-rw-r--r--cpp/src/qpid/broker/SessionState.h17
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp107
-rw-r--r--cpp/src/qpid/cluster/Cluster.h17
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--cpp/src/tests/Cluster.cpp111
-rw-r--r--cpp/src/tests/Cluster.h73
-rw-r--r--cpp/src/tests/Cluster_child.cpp53
-rw-r--r--cpp/src/tests/Cpg.cpp8
-rwxr-xr-xcpp/src/tests/ais_check28
-rwxr-xr-xcpp/src/tests/ais_run15
-rw-r--r--cpp/src/tests/ais_test.cpp23
-rw-r--r--cpp/src/tests/cluster.mk63
-rw-r--r--cpp/src/tests/cluster_client.cpp44
-rwxr-xr-xcpp/src/tests/start_cluster2
16 files changed, 210 insertions, 389 deletions
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index aadb2b9004..571d3365db 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -43,12 +43,13 @@ SessionManager::SessionManager(uint32_t a) : ack(a) {}
SessionManager::~SessionManager() {}
+// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
- new SessionState(*this, h, timeout_, ack));
+ new SessionState(this, &h, timeout_, ack));
active.insert(session->getId());
for_each(observers.begin(), observers.end(),
boost::bind(&Observer::opened, _1,boost::ref(*session)));
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index a75b32cbb5..1021cca1b1 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -36,23 +36,17 @@ using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
-
-void SessionState::handleOut(AMQFrame& f) {
- assert(handler);
- handler->out.handle(f);
-}
-
SessionState::SessionState(
- SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
- factory(f), handler(&h), id(true), timeout(timeout_),
- broker(h.getConnection().broker),
- version(h.getConnection().getVersion()),
+ factory(f), handler(h), id(true), timeout(timeout_),
+ broker(h->getConnection().broker),
+ version(h->getConnection().getVersion()),
semanticHandler(new SemanticHandler(*this))
{
- // TODO aconway 2007-09-20: SessionManager may add plugin
- // handlers to the chain.
+ in.next = semanticHandler.get();
+ out.next = &handler->out;
+
getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
Manageable* parent = broker.GetVhostObject ();
@@ -66,8 +60,8 @@ SessionState::SessionState(
mgmtObject = management::Session::shared_ptr
(new management::Session (this, parent, id.str ()));
mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h.getChannel());
+ mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h->getChannel());
mgmtObject->set_detachedLifespan (getTimeout());
agent->addObject (mgmtObject);
}
@@ -76,12 +70,10 @@ SessionState::SessionState(
SessionState::~SessionState() {
// Remove ID from active session list.
- factory.erase(getId());
-
+ if (factory)
+ factory->erase(getId());
if (mgmtObject.get () != 0)
- {
mgmtObject->resourceDestroy ();
- }
}
SessionHandler* SessionState::getHandler() {
@@ -101,7 +93,7 @@ Connection& SessionState::getConnection() {
void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
- handler = 0;
+ handler = 0; out.next = 0;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (0);
@@ -112,6 +104,7 @@ void SessionState::attach(SessionHandler& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
+ out.next = &handler->out;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index c8c32a046d..bc1b974eaa 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -58,7 +58,7 @@ class Connection;
* themselves have state.
*/
class SessionState : public framing::SessionState,
- public framing::FrameHandler::InOutHandler,
+ public framing::FrameHandler::Chains,
public sys::OutputControl,
public management::Manageable
{
@@ -90,18 +90,15 @@ class SessionState : public framing::SessionState,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- protected:
- void handleIn(framing::AMQFrame&);
- void handleOut(framing::AMQFrame&);
-
- private:
- // SessionManager creates sessions.
- SessionState(SessionManager&,
- SessionHandler& out,
+ // Normally SessionManager creates sessions.
+ SessionState(SessionManager*,
+ SessionHandler* out,
uint32_t timeout,
uint32_t ackInterval);
- SessionManager& factory;
+
+ private:
+ SessionManager* factory;
SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 49270bcfef..bca6c49c13 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,6 +17,7 @@
*/
#include "Cluster.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -31,7 +32,70 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+using broker::SessionState;
+namespace {
+
+// Beginning of inbound chain: send to cluster.
+struct ClusterSendHandler : public FrameHandler {
+ SessionState& session;
+ Cluster& cluster;
+ bool busy;
+ Monitor lock;
+
+ ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+
+ void handle(AMQFrame& f) {
+ Mutex::ScopedLock l(lock);
+ assert(!busy);
+ // FIXME aconway 2008-01-29: refcount Sessions.
+ // session.addRef(); // Keep the session till the message is self delivered.
+ cluster.send(f, next); // Indirectly send to next via cluster.
+
+ // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
+ // But cluster needs to agree on order of side-effects on the shared model.
+ // OK for wiring to block, for messages use queue tokens?
+ // Both in & out transfers must be orderd per queue.
+ // May need out-of-order completion.
+ busy=true;
+ while (busy) lock.wait();
+ }
+};
+
+// Next in inbound chain, self delivered from cluster.
+struct ClusterDeliverHandler : public FrameHandler {
+ Cluster& cluster;
+ ClusterSendHandler& sender;
+
+ ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
+
+ void handle(AMQFrame& f) {
+ next->handle(f);
+ Mutex::ScopedLock l(sender.lock);
+ sender.busy=false;
+ sender.lock.notify();
+ }
+};
+
+// FIXME aconway 2008-01-29: IList
+void insert(FrameHandler::Chain& c, FrameHandler* h) {
+ h->next = c.next;
+ c.next = h;
+}
+
+struct SessionObserver : public broker::SessionManager::Observer {
+ Cluster& cluster;
+ SessionObserver(Cluster& c) : cluster(c) {}
+
+ void opened(SessionState& s) {
+ // FIXME aconway 2008-01-29: IList for memory management.
+ ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
+ ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
+ insert(s.in, deliverer);
+ insert(s.in, sender);
+ }
+};
+}
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
@@ -48,10 +112,10 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
}
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
- FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer
cpg(*this),
name(name_),
- url(url_)
+ url(url_),
+ observer(new SessionObserver(*this))
{
QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg.join(name);
@@ -77,18 +141,19 @@ Cluster::~Cluster() {
}
}
-void Cluster::handle(AMQFrame& frame) {
+void Cluster::send(AMQFrame& frame, FrameHandler* next) {
QPID_LOG(trace, *this << " SEND: " << frame);
- boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling.
- Buffer buf(store.get());
+ char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
+ Buffer buf(data);
frame.encode(buf);
- iovec iov = { store.get(), frame.size() };
+ buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
+ iovec iov = { data, frame.size()+sizeof(next) };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- handle(frame);
+ send(frame, 0);
}
size_t Cluster::size() const {
@@ -112,15 +177,25 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
- Id from(nodeid, pid);
- Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
- frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
- handleClusterFrame(from, frame);
- else
- next->handle(frame);
+ try {
+ Id from(nodeid, pid);
+ Buffer buf(static_cast<char*>(msg), msg_len);
+ AMQFrame frame;
+ frame.decode(buf);
+ QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+ if (frame.getChannel() == 0)
+ handleClusterFrame(from, frame);
+ else if (from == self) {
+ FrameHandler* next;
+ buf.getRawData((uint8_t*)&next, sizeof(next));
+ next->handle(frame);
+ }
+ // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2008-01-30: exception handling.
+ QPID_LOG(error, "Error handling frame from cluster " << e.what());
+ }
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index e9809f2264..b62b2be5f1 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -21,7 +21,6 @@
#include "Cpg.h"
-#include "qpid/framing/FrameHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
@@ -39,15 +38,10 @@
namespace qpid { namespace cluster {
/**
- * Connection to the cluster. Maintains cluster membership
- * data.
- *
- * As FrameHandler, handles frames by sending them to the
- * cluster. Frames received from the cluster are sent to the next
- * FrameHandler in the chain.
+ * Connection to the cluster.
+ * Keeps cluster membership data.
*/
-class Cluster : public framing::FrameHandler,
- private sys::Runnable, private Cpg::Handler
+class Cluster : private sys::Runnable, private Cpg::Handler
{
public:
/** Details of a cluster member */
@@ -68,7 +62,7 @@ class Cluster : public framing::FrameHandler,
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- //framing::HandlerUpdater& getHandlerUpdater() { return sessions; }
+ intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -87,7 +81,7 @@ class Cluster : public framing::FrameHandler,
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void handle(framing::AMQFrame&);
+ void send(framing::AMQFrame&, framing::FrameHandler*);
private:
typedef Cpg::Id Id;
@@ -122,6 +116,7 @@ class Cluster : public framing::FrameHandler,
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
+ intrusive_ptr<broker::SessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index e6b5f1a0bd..ceafa389b0 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin {
cluster = boost::in_place(options.name,
options.getUrl(broker->getPort()),
boost::ref(*broker));
- // FIXME aconway 2008-02-01: Add observer.
+ broker->getSessionManager().add(cluster->getObserver());
}
}
};
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
deleted file mode 100644
index b448128620..0000000000
--- a/cpp/src/tests/Cluster.cpp
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "Cluster.h"
-#include "test_tools.h"
-
-#include "qpid/framing/SessionOpenBody.h"
-#include "qpid/framing/SessionAttachedBody.h"
-#include "qpid/framing/all_method_bodies.h"
-#include "qpid/cluster/ClassifierHandler.h"
-
-#include "unit_test.h"
-
-#include <sys/wait.h>
-
-QPID_AUTO_TEST_SUITE(ClusterTestSuite)
-
-static const ProtocolVersion VER;
-
-/** Verify membership in a cluster with one member. */
-BOOST_AUTO_TEST_CASE(testClusterOne) {
- TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame send(in_place<SessionOpenBody>(VER));
- send.setChannel(1);
- cluster.handle(send);
- AMQFrame received = cluster.received.pop();
- BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *received.getBody());
- BOOST_CHECK_EQUAL(1u, cluster.size());
- Cluster::MemberList members = cluster.getMembers();
- BOOST_CHECK_EQUAL(1u, members.size());
- Cluster::Member me=members.front();
- BOOST_REQUIRE_EQUAL(me.url, "amqp:one:1");
-}
-
-/** Fork a process to test a cluster with two members */
-BOOST_AUTO_TEST_CASE(testClusterTwo) {
- bool nofork=getenv("NOFORK") != 0;
- pid_t pid=0;
- if (!nofork) {
- pid = fork();
- BOOST_REQUIRE(pid >= 0);
- }
- if (pid || nofork) { // Parent
- BOOST_MESSAGE("Parent start");
- TestCluster cluster("clusterTwo", "amqp:parent:1");
- BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
-
- // Exchange frames with child.
- AMQFrame send(SessionOpenBody(VER));
- send.setChannel(1);
- cluster.handle(send);
- AMQFrame received = cluster.received.pop();
- BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *received.getBody());
-
- received=cluster.received.pop();
- BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *received.getBody());
-
- if (!nofork) {
- // Wait for child to exit.
- int status;
- BOOST_CHECK_EQUAL(::wait(&status), pid);
- BOOST_CHECK_EQUAL(0, status);
- BOOST_CHECK(cluster.waitFor(1));
- BOOST_CHECK_EQUAL(1u, cluster.size());
- }
- }
- else { // Child
- BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL));
- }
-}
-
-struct CountHandler : public FrameHandler {
- CountHandler() : count(0) {}
- void handle(AMQFrame&) { count++; }
- size_t count;
-};
-
-/** Test the ClassifierHandler */
-BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
- AMQFrame queueDecl(in_place<QueueDeclareBody>(VER));
- AMQFrame messageTrans(in_place<MessageTransferBody>(VER));
- CountHandler wiring;
- CountHandler other;
-
- ClassifierHandler classify(wiring, other);
-
- classify.handle(queueDecl);
- BOOST_CHECK_EQUAL(1u, wiring.count);
- BOOST_CHECK_EQUAL(0u, other.count);
-
- classify.handle(messageTrans);
- BOOST_CHECK_EQUAL(1u, wiring.count);
- BOOST_CHECK_EQUAL(1u, other.count);
-}
-
-QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
deleted file mode 100644
index 6ff5c21fdb..0000000000
--- a/cpp/src/tests/Cluster.h
+++ /dev/null
@@ -1,73 +0,0 @@
-#ifndef CLUSTER_H
-#define CLUSTER_H
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/cluster/Cluster.h"
-#include "qpid/sys/BlockingQueue.h"
-#include "qpid/framing/AMQFrame.h"
-
-#include <boost/bind.hpp>
-#include <boost/test/test_tools.hpp>
-
-#include <iostream>
-#include <functional>
-
-/**
- * Definitions for the Cluster.cpp and Cluster_child.cpp child program.
- */
-
-// using namespace in header file is bad manners, but this is strictly for
-// the tests.
-using namespace std;
-using namespace qpid;
-using namespace qpid::cluster;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace boost;
-
-void null_deleter(void*) {}
-
-template <class T>
-class TestHandler : public Handler<T&>, public BlockingQueue<T>
-{
- public:
- void handle(T& frame) { push(frame); }
-};
-
-typedef TestHandler<AMQFrame> TestFrameHandler;
-
-struct TestCluster : public Cluster
-{
- TestCluster(string name, string url)
- : Cluster(name, url, *(qpid::broker::Broker*)0) {}
-
- /** Wait for cluster to be of size n. */
- bool waitFor(size_t n) {
- BOOST_CHECKPOINT("About to call Cluster::wait");
- return wait(boost::bind(
- equal_to<size_t>(), bind(&Cluster::size,this), n));
- }
-
- TestFrameHandler received;
-};
-
-
-
-#endif /*!CLUSTER_H*/
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
deleted file mode 100644
index 75591bb68f..0000000000
--- a/cpp/src/tests/Cluster_child.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Child process for the Cluster test suite multi-process tests.
-
-#include "Cluster.h"
-#include "test_tools.h"
-#include "qpid/framing/SessionOpenBody.h"
-#include "qpid/framing/SessionAttachedBody.h"
-
-using namespace std;
-using namespace qpid;
-using namespace qpid::cluster;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace qpid::log;
-
-static const ProtocolVersion VER;
-
-/** Child part of Cluster::clusterTwo test */
-void clusterTwo() {
- TestCluster cluster("clusterTwo", "amqp:child:2");
- AMQFrame frame = cluster.received.pop(frame); // Frame from parent.
- BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody());
- BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
-
- AMQFrame send(in_place<SessionAttachedBody>(VER));
- send.setChannel(1);
- cluster.handle(send);
- BOOST_REQUIRE(cluster.received.waitPop(frame));
- BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody());
-}
-
-int test_main(int, char**) {
- clusterTwo();
- return 0;
-}
-
diff --git a/cpp/src/tests/Cpg.cpp b/cpp/src/tests/Cpg.cpp
index fddfe9ef05..e8339bf773 100644
--- a/cpp/src/tests/Cpg.cpp
+++ b/cpp/src/tests/Cpg.cpp
@@ -78,12 +78,16 @@ struct Callback : public Cpg::Handler {
cpg_handle_t /*handle*/,
struct cpg_name *grp,
struct cpg_address */*members*/, int nMembers,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
+ struct cpg_address */*left*/, int nLeft,
+ struct cpg_address */*joined*/, int nJoined
)
{
BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
configChanges.push_back(nMembers);
+ BOOST_MESSAGE("configChange: "<<
+ nLeft<<" left "<<
+ nJoined<<" joined "<<
+ nMembers<<" members.");
}
};
diff --git a/cpp/src/tests/ais_check b/cpp/src/tests/ais_check
index ce3bbe1b1c..ae0edf88c1 100755
--- a/cpp/src/tests/ais_check
+++ b/cpp/src/tests/ais_check
@@ -2,10 +2,12 @@
# Check for requirements, run AIS tests if found.
#
-test `id -ng` = "ais" || BADGROUP=yes
-{ ps -u root | grep aisexec >/dev/null; } || NOAISEXEC=yes
+id -nG | grep '\<ais\>' || \
+ NOGROUP="You are not a member of the ais group."
+ps -u root | grep aisexec >/dev/null || \
+ NOAISEXEC="The aisexec daemon is not running as root"
-if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then
+if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
cat <<EOF
=========== WARNING: NOT RUNNING AIS TESTS ==============
@@ -13,18 +15,8 @@ if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then
Tests that depend on the openais library (used for clustering)
will not be run because:
-EOF
- test -n "$BADGROUP" && cat <<EOF
- You do not appear to have you group ID set to "ais". Make ais your
- primary group, or run "newgrp ais" before running the tests.
-
-EOF
- test -n "$NOAISEXEC" && cat <<EOF
- The aisexec daemon is not running. Make sure /etc/ais/openais.conf
- is a valid configuration and aisexec is run by root.
-EOF
-
- cat <<EOF
+ $NOGROUP
+ $NOAISEXEC
==========================================================
@@ -32,8 +24,4 @@ EOF
exit 0; # A warning, not a failure.
fi
-FAILED=0
-for test in `cat ais_tests`; do
- ./$test || FAILED=`expr $FAILED + 1`
-done
-exit $FAILED
+echo ./ais_run | newgrp ais
diff --git a/cpp/src/tests/ais_run b/cpp/src/tests/ais_run
new file mode 100755
index 0000000000..0f45edc39c
--- /dev/null
+++ b/cpp/src/tests/ais_run
@@ -0,0 +1,15 @@
+#!/bin/sh
+#
+# Run AIS tests, assumes that ais_check has passed and we are
+# running with the ais group ID.
+#
+
+# FIXME aconway 2008-01-30: we should valgrind the cluster brokers.
+
+srcdir=`dirname $0`
+$srcdir/start_cluster 4
+./ais_test
+ret=$?
+$srcdir/stop_cluster
+exit $ret
+
diff --git a/cpp/src/tests/ais_test.cpp b/cpp/src/tests/ais_test.cpp
new file mode 100644
index 0000000000..00c61242e4
--- /dev/null
+++ b/cpp/src/tests/ais_test.cpp
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Defines test_main function to link with actual unit test code.
+#define BOOST_AUTO_TEST_MAIN // Boost 1.33
+#define BOOST_TEST_MAIN
+#include "unit_test.h"
+
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index a6f7fa90b4..3db0604e2c 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -1,55 +1,20 @@
-# FIXME aconway 2007-08-31: Disabled cluster compilation,
-# has not been kept up to date with recent commits.
+if CLUSTER
+#
+# Cluster tests makefile fragment, to be included in Makefile.am
#
-# if CLUSTER
-# # Cluster tests makefile fragment, to be included in Makefile.am
-# #
+lib_cluster = $(abs_builddir)/../libqpidcluster.la
-# lib_cluster = $(abs_builddir)/../libqpidcluster.la
-
-# # NOTE: Programs using the openais library must be run with gid=ais
-# # You should do "newgrp ais" before running the tests to run these.
-# #
-
-# #
-# # Cluster tests.
-# #
-
-# # ais_check runs ais if the conditions to run AIS tests
-# # are met, otherwise it prints a warning.
-# TESTS+=ais_check
-# EXTRA_DIST+=ais_check
-# AIS_TESTS=
-
-# ais_check: ais_tests
-# ais_tests:
-# echo $(AIS_TESTS)
-# echo "# AIS tests" >$@
-# for t in $(AIS_TESTS); do echo ./$$t >$@; done
-# chmod a+x $@
-
-# CLEANFILES+=ais_tests
-
-# AIS_TESTS+=Cpg
-# check_PROGRAMS+=Cpg
-# Cpg_SOURCES=Cpg.cpp
-# Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# # TODO aconway 2007-07-26: Fix this test.
-# #AIS_TESTS+=Cluster
-# # check_PROGRAMS+=Cluster
-# # Cluster_SOURCES=Cluster.cpp Cluster.h
-# # Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
+# NOTE: Programs using the openais library must be run with gid=ais
+# You should do "newgrp ais" before running the tests to run these.
+#
-# check_PROGRAMS+=Cluster_child
-# Cluster_child_SOURCES=Cluster_child.cpp Cluster.h
-# Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor
+# ais_check checks conditions for AIS tests and runs if ok.
+TESTS+=ais_check
+EXTRA_DIST+=ais_check ais_run
-# # TODO aconway 2007-07-03: In progress
-# #AIS_TESTS+=cluster_client
-# check_PROGRAMS+=cluster_client
-# cluster_client_SOURCES=cluster_client.cpp
-# cluster_client_LDADD=$(lib_client) -lboost_unit_test_framework
+check_PROGRAMS+=ais_test
+ais_test_SOURCES=ais_test.cpp Cpg.cpp
+ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
-# endif
+endif
diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp
index c74d7329f0..30b7e38801 100644
--- a/cpp/src/tests/cluster_client.cpp
+++ b/cpp/src/tests/cluster_client.cpp
@@ -16,21 +16,25 @@
*
*/
-#include "qpid/client/Connection.h"
-#include "qpid/shared_ptr.h"
-
#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/client/Session.h"
#include <fstream>
#include <vector>
#include <functional>
-
QPID_AUTO_TEST_SUITE(cluster_clientTestSuite)
-using namespace std;
using namespace qpid;
using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::client::arg;
+using framing::TransferContent;
+using std::vector;
+using std::string;
+using std::ifstream;
+using std::ws;
struct ClusterConnections : public vector<shared_ptr<Connection> > {
ClusterConnections() {
@@ -58,25 +62,23 @@ BOOST_AUTO_TEST_CASE(testWiringReplication) {
ClusterConnections cluster;
BOOST_REQUIRE(cluster.size() > 1);
- Exchange fooEx("FooEx", Exchange::TOPIC_EXCHANGE);
- Queue fooQ("FooQ");
-
- Channel broker0;
- cluster[0]->openChannel(broker0);
- broker0.declareExchange(fooEx);
- broker0.declareQueue(fooQ);
- broker0.bind(fooEx, fooQ, "FooKey");
+ Session broker0 = cluster[0]->newSession();
+ broker0.exchangeDeclare(exchange="ex");
+ broker0.queueDeclare(queue="q");
+ broker0.queueBind(exchange="ex", queue="q", routingKey="key");
broker0.close();
for (size_t i = 1; i < cluster.size(); ++i) {
- Channel ch;
- cluster[i]->openChannel(ch);
- ch.publish(Message("hello"), fooEx, "FooKey");
- Message m;
- BOOST_REQUIRE(ch.get(m, fooQ));
- BOOST_REQUIRE_EQUAL(m.getData(), "hello");
- ch.close();
- }
+ Session s = cluster[i]->newSession();
+ s.messageTransfer(content=TransferContent("data", "key", "ex"));
+ s.messageSubscribe(queue="q", destination="q");
+ s.messageFlow(destination="q", unit=0, value=1);//messages
+ FrameSet::shared_ptr msg = s.get();
+ BOOST_CHECK(msg->isA<MessageTransferBody>());
+ BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+ s.getExecution().completed(msg->getId(), true, true);
+ cluster[i]->close();
+ }
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index 5992a75d8d..03fb671bdf 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -12,7 +12,7 @@ shift
OPTS=$*
CLUSTER=`whoami` # Cluster name=user name, avoid clashes.
for (( i=0; i<SIZE; ++i )); do
- PORT=`../qpidd -dp0 --log-output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1
+ PORT=`../qpidd --load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name $CLUSTER $OPTS` || exit 1
echo $PORT >> cluster.ports
done