summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-11 13:26:10 +0000
committerAlan Conway <aconway@apache.org>2008-09-11 13:26:10 +0000
commit6c24486bbc4ffcf8c70e7d0fbac846512c83b440 (patch)
treebf7543286e7b3ff90810b01b77dacc18d3dfb638 /cpp/src
parentb55c7f2d8fa39b358b8a7fbf400db5fbc71335dd (diff)
downloadqpid-python-6c24486bbc4ffcf8c70e7d0fbac846512c83b440.tar.gz
Moved PollableCondition, PollableQueue and to sys. Fixed cluster shutdown issues.
sys/PollableCondition: is a generic mechansim to poll for non-IO events in the Poller. sys/PollableQueue: is a thread-safe queue template that can be dispatched from the Poller when there are items on the queue. It uses PollableCondition. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694243 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/cluster.mk3
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp66
-rw-r--r--cpp/src/qpid/cluster/Cluster.h19
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp9
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp16
-rw-r--r--cpp/src/qpid/cluster/Connection.h19
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp8
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h3
-rw-r--r--cpp/src/qpid/sys/PollableCondition.cpp (renamed from cpp/src/qpid/cluster/PollableCondition.cpp)12
-rw-r--r--cpp/src/qpid/sys/PollableCondition.h (renamed from cpp/src/qpid/cluster/PollableCondition.h)4
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h (renamed from cpp/src/qpid/cluster/PollableQueue.h)12
-rw-r--r--cpp/src/tests/BrokerFixture.h3
-rw-r--r--cpp/src/tests/cluster_test.cpp73
16 files changed, 151 insertions, 103 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 7c02516575..98dec2b12d 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -264,6 +264,9 @@ libqpidcommon_la_SOURCES = \
qpid/sys/AggregateOutput.cpp \
qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
+ qpid/sys/PollableCondition.h \
+ qpid/sys/PollableCondition.cpp \
+ qpid/sys/PollableQueue.h \
qpid/sys/Runnable.cpp \
qpid/sys/SystemInfo.cpp \
qpid/sys/Shlib.cpp \
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index bb9546f387..f02e5e1644 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -18,9 +18,6 @@ cluster_la_SOURCES = \
qpid/cluster/Connection.h \
qpid/cluster/Connection.cpp \
qpid/cluster/NoOpConnectionOutputHandler.h \
- qpid/cluster/PollableCondition.h \
- qpid/cluster/PollableCondition.cpp \
- qpid/cluster/PollableQueue.h \
qpid/cluster/WriteEstimate.h \
qpid/cluster/WriteEstimate.cpp \
qpid/cluster/OutputInterceptor.h \
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 913188845f..027f8a212d 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -70,8 +70,6 @@ SessionState::SessionState(
}
SessionState::~SessionState() {
- // Remove ID from active session list.
- broker.getSessionManager().forget(getId());
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index b736d116e1..7b1cacb640 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -649,7 +649,7 @@ void SessionImpl::checkOpen() const //call with lock held.
{
check();
if (state != ATTACHED) {
- throw NotAttachedException("Session isn't attached");
+ throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't attached"));
}
}
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ce156e85e4..07ed4596e0 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- broker(&b),
+ broker(b),
poller(b.getPoller()),
cpg(*this),
name(name_),
@@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
),
deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
{
- broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
+ QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
+ broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpg.join(name);
deliverQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ QPID_LOG(debug, "~Cluster()");
+}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
@@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
+// FIXME aconway 2008-09-10: leave is currently not called,
+// It should be called if we are shut down by a cluster admin command.
+// Any other type of exit is caught in disconnect().
+//
void Cluster::leave() {
- Mutex::ScopedLock l(lock);
- if (!broker) return; // Already left.
- // Leave is called by from Broker destructor after the poller has
- // been shut down. No dispatches can occur.
-
- QPID_LOG(notice, "Leaving cluster " << name.str());
+ QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
cpg.leave(name);
- // broker= is set to 0 when the final config-change is delivered.
- while(broker) {
- Mutex::ScopedUnlock u(lock);
- cpg.dispatchAll();
- }
- cpg.shutdown();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -177,6 +172,7 @@ void Cluster::deliver(
{
try {
MemberId from(nodeid, pid);
+ QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10:
deliverQueue.push(Event::delivered(from, msg, msg_len));
}
catch (const std::exception& e) {
@@ -238,7 +234,7 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined)
{
- QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
+ QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
<< AddrList(joined, nJoined) << AddrList(left, nLeft));
if (nJoined) // Notfiy new members of my URL.
@@ -246,13 +242,14 @@ void Cluster::configChange(
AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())),
ConnectionId(self,0));
-
+ if (find(left, left+nLeft, self) != left+nLeft) {
+ // We have left the group, this is the final config change.
+ QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str());
+ broker.shutdown();
+ }
Mutex::ScopedLock l(lock);
for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
// Add new members when their URL notice arraives.
-
- if (find(left, left+nLeft, self) != left+nLeft)
- broker = 0; // We have left the group, this is the final config change.
lock.notifyAll(); // Threads waiting for membership changes.
}
@@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
h.rewatch();
}
-void Cluster::disconnect(sys::DispatchHandle& h) {
- h.stopWatch();
- QPID_LOG(critical, "Disconnected from cluster, shutting down");
- broker->shutdown();
+void Cluster::disconnect(sys::DispatchHandle& ) {
+ // FIXME aconway 2008-09-11: this should be logged as critical,
+ // when we provide admin option to shut down cluster and let
+ // members leave cleanly.
+ QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str());
+ broker.shutdown();
}
void Cluster::joining(const MemberId& m, const string& url) {
- QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
+ QPID_LOG(info, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}
void Cluster::ready(const MemberId& ) {
// FIXME aconway 2008-09-08: TODO
}
-
-}} // namespace qpid::cluster
+// Called from Broker::~Broker when broker is shut down. At this
+// point we know the poller has stopped so no poller callbacks will be
+// invoked. We must ensure that CPG has also shut down so no CPG
+// callbacks will be invoked.
+//
+void Cluster::shutdown() {
+ QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
+ delete this;
+}
+broker::Broker& Cluster::getBroker(){ return broker; }
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index a25b62ea12..3a254684ad 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -21,7 +21,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/Event.h"
-#include "qpid/cluster/PollableQueue.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/cluster/NoOpConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
@@ -43,7 +43,7 @@ class Connection;
* Connection to the cluster.
* Keeps cluster membership data.
*/
-class Cluster : public RefCounted, private Cpg::Handler
+class Cluster : private Cpg::Handler
{
public:
@@ -78,17 +78,16 @@ class Cluster : public RefCounted, private Cpg::Handler
void joining(const MemberId&, const std::string& url);
void ready(const MemberId&);
- broker::Broker& getBroker() { assert(broker); return *broker; }
-
MemberId getSelf() const { return self; }
+ void shutdown();
+
+ broker::Broker& getBroker();
+
private:
typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
-
- /** Message sent over the cluster. */
- typedef std::pair<framing::AMQFrame, ConnectionId> Message;
- typedef PollableQueue<Event> EventQueue;
+ typedef sys::PollableQueue<Event> EventQueue;
boost::function<void()> shutdownNext;
@@ -127,7 +126,7 @@ class Cluster : public RefCounted, private Cpg::Handler
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
mutable sys::Monitor lock; // Protect access to members.
- broker::Broker* broker;
+ broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
Cpg::Name name;
@@ -137,7 +136,7 @@ class Cluster : public RefCounted, private Cpg::Handler
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableQueue<Event> deliverQueue;
+ EventQueue deliverQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 31447f2fd0..f4128634a6 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -66,10 +66,10 @@ struct ClusterPlugin : public Plugin {
ClusterValues values;
ClusterOptions options;
- boost::intrusive_ptr<Cluster> cluster;
+ Cluster* cluster;
boost::scoped_ptr<ConnectionCodec::Factory> factory;
- ClusterPlugin() : options(values) {}
+ ClusterPlugin() : options(values), cluster(0) {}
Options* getOptions() { return &options; }
@@ -78,20 +78,17 @@ struct ClusterPlugin : public Plugin {
if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin.");
cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
- broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
}
void earlyInitialize(Plugin::Target&) {}
-
- void shutdown() { cluster = 0; }
};
static ClusterPlugin instance; // Static initialization.
// For test purposes.
-boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; }
+Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; }
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 506e982ffd..68d1b16dfa 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -106,5 +106,21 @@ void Connection::deliverBuffer(Buffer& buf) {
deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
}
+
+void Connection::sessionState(const SequenceNumber& /*replayStart*/,
+ const SequenceSet& /*sentIncomplete*/,
+ const SequenceNumber& /*expected*/,
+ const SequenceNumber& /*received*/,
+ const SequenceSet& /*unknownCompleted*/,
+ const SequenceSet& /*receivedIncomplete*/)
+{
+ // FIXME aconway 2008-09-10: TODO
+}
+
+void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/)
+{
+ // FIXME aconway 2008-09-10: TODO
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index b3e151ce51..a30350585f 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -40,9 +40,7 @@ namespace framing { class AMQFrame; }
namespace cluster {
-/**
- * Plug-in associated with broker::Connections, both local and shadow.
- */
+/** Intercept broker::Connection calls for shadow and local cluster connections. */
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
@@ -90,16 +88,13 @@ class Connection :
sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
// State dump methods.
- virtual void sessionState(const framing::SequenceNumber& /*replayId*/,
- const framing::SequenceNumber& /*sendId*/,
- const framing::SequenceSet& /*sentIncomplete*/,
- const framing::SequenceNumber& /*expectedId*/,
- const framing::SequenceNumber& /*receivedId*/,
- const framing::SequenceSet& /*unknownCompleted*/,
- const framing::SequenceSet& /*receivedIncomplete*/) {}
+ virtual void sessionState(const SequenceNumber& replayStart,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- virtual void shadowReady(uint64_t /*clusterId*/,
- const std::string& /*userId*/) {}
+ virtual void shadowReady(uint64_t memberId, uint64_t connectionId);
private:
void sendDoOutput();
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index f093a0cc1c..6179eab724 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -30,16 +30,16 @@ namespace cluster {
sys::ConnectionCodec*
ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == framing::ProtocolVersion(0, 10))
+ if (v == framing::ProtocolVersion(0, 10))
return new ConnectionCodec(out, id, cluster);
return 0;
}
+// FIXME aconway 2008-08-27: outbound connections need to be made
+// with proper qpid::client code for failover, get rid of this
+// broker-side hack.
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
- // FIXME aconway 2008-08-27: outbound connections need to be made
- // with proper qpid::client code for failover, get rid of this
- // broker-side hack.
return next->create(out, id);
}
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index 59ce20d821..22d752d174 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -50,7 +50,8 @@ class ConnectionCodec : public sys::ConnectionCodec {
struct Factory : public sys::ConnectionCodec::Factory {
boost::shared_ptr<sys::ConnectionCodec::Factory> next;
Cluster& cluster;
- Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {}
+ Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c)
+ : next(f), cluster(c) {}
sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
diff --git a/cpp/src/qpid/cluster/PollableCondition.cpp b/cpp/src/qpid/sys/PollableCondition.cpp
index eecf95ff8d..5a3bd583cf 100644
--- a/cpp/src/qpid/cluster/PollableCondition.cpp
+++ b/cpp/src/qpid/sys/PollableCondition.cpp
@@ -27,14 +27,14 @@
//
#include "qpid/sys/posix/PrivatePosix.h"
-#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/PollableCondition.h"
#include "qpid/Exception.h"
#include <unistd.h>
#include <fcntl.h>
namespace qpid {
-namespace cluster {
+namespace sys {
PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
int fds[2];
@@ -67,13 +67,13 @@ void PollableCondition::set() {
#if 0
// FIXME aconway 2008-08-12: More efficient Linux implementation using
-// eventfd system call. Do a configure.ac test to enable this when
-// eventfd is available.
+// eventfd system call. Move to separate file & do configure.ac test
+// to enable this when ::eventfd() is available.
#include <sys/eventfd.h>
namespace qpid {
-namespace cluster {
+namespace sys {
PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
impl->fd = ::eventfd(0, 0);
@@ -95,6 +95,6 @@ void PollableCondition::set() {
#endif
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/
diff --git a/cpp/src/qpid/cluster/PollableCondition.h b/cpp/src/qpid/sys/PollableCondition.h
index 6bfca6cabe..6f0e12a474 100644
--- a/cpp/src/qpid/cluster/PollableCondition.h
+++ b/cpp/src/qpid/sys/PollableCondition.h
@@ -29,7 +29,7 @@
//
namespace qpid {
-namespace cluster {
+namespace sys {
/**
* A pollable condition to integrate in-process conditions with IO
@@ -55,6 +55,6 @@ class PollableCondition : public sys::IOHandle {
private:
int writeFd;
};
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
#endif /*!QPID_SYS_POLLABLECONDITION_H*/
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 1c7720f5c6..2e5d3a0d3d 100644
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
-#define QPID_CLUSTER_POLLABLEQUEUE_H
+#ifndef QPID_SYS_POLLABLEQUEUE_H
+#define QPID_SYS_POLLABLEQUEUE_H
/*
*
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/PollableCondition.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
@@ -34,7 +34,7 @@ namespace qpid {
namespace sys { class Poller; }
-namespace cluster {
+namespace sys {
// FIXME aconway 2008-08-11: this could be of more general interest,
// move to common lib.
@@ -108,6 +108,6 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
batch.clear();
}
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
-#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/
+#endif /*!QPID_SYS_POLLABLEQUEUE_H*/
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 09cca066ef..4e10f82809 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -92,7 +92,8 @@ struct ClientT {
SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
- ClientT(uint16_t port) : connection(port), session(connection.newSession()), subs(session) {}
+ ClientT(uint16_t port, const std::string& name=std::string())
+ : connection(port), session(connection.newSession(name)), subs(session) {}
~ClientT() { connection.close(); }
};
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index d082d74367..871aa0c657 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -21,13 +21,14 @@
#include "ForkedBroker.h"
#include "BrokerFixture.h"
-#include "qpid/cluster/Cpg.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
@@ -41,7 +42,7 @@
namespace qpid {
namespace cluster {
-boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
+Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
}} // namespace qpid::cluster
@@ -81,11 +82,11 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
add(n);
// Wait for all n members to join the cluster
int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
- while (retry && getGlobalCluster()->size() != n) {
+ while (retry && getGlobalCluster().size() != n) {
::sleep(1);
--retry;
}
- BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
+ BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
}
void ClusterFixture::add() {
@@ -135,7 +136,37 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-QPID_AUTO_TEST_CASE(testDumpClient) {
+#if 0 // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) {
+ ClusterFixture cluster(1);
+ Client a(cluster[0]);
+ a.session.queueDeclare("q");
+ a.subs.subscribe(a.lq, "q");
+
+ cluster.add();
+ Client b(cluster[1]);
+ try {
+ b.connection.newSession(a.session.getId().getName());
+ BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
+ } catch (const SessionBusyException&) {}
+
+ // Transfer some messages to the subscription by client a.
+ Message m;
+ a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+ b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+ // Verify that the queue has been drained on both brokers.
+ // This proves that the consumer was replicated when the second broker joined.
+ BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+#endif
+
+QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
BrokerFixture donor, receiver;
{
Client c(donor.getPort());
@@ -146,13 +177,13 @@ QPID_AUTO_TEST_CASE(testDumpClient) {
c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args);
c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo");
- c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo"));
+ c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo"));
c.session.exchangeDeclare("ext", arg::type="topic");
c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar");
c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
- c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar"));
- c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar"));
c.session.close();
c.connection.close();
@@ -202,11 +233,11 @@ QPID_AUTO_TEST_CASE(testDumpClient) {
BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
// Verify bindings
- r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo"));
+ r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo"));
BOOST_CHECK(r.subs.get(m, "qa"));
BOOST_CHECK_EQUAL(m.getData(), "xxx");
- r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar"));
+ r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar"));
BOOST_CHECK(r.subs.get(m, "qb"));
BOOST_CHECK_EQUAL(m.getData(), "yyy");
@@ -254,8 +285,8 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
ClusterFixture cluster(2);
Client c0(cluster[0]);
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
c0.session.close();
Client c1(cluster[1]);
Message msg;
@@ -268,19 +299,19 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
ClusterFixture cluster (3);
- Client c0(cluster[0]);
+ Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
Message msg;
// Dequeue on 2 others, ensure correct order.
- Client c1(cluster[1]);
+ Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("foo", msg.getData());
- Client c2(cluster[2]);
+ Client c2(cluster[2], "c2");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("bar", msg.getData());
@@ -298,8 +329,8 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
// Now send messages
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c1.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c1.session.messageTransfer(arg::content=Message("foo", "q"));
+ c1.session.messageTransfer(arg::content=Message("bar", "q"));
// Check they arrived
Message m;