summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-26 17:37:16 +0000
committerAlan Conway <aconway@apache.org>2008-11-26 17:37:16 +0000
commit43e26ecb7cdb04cf7a9c7e87fa7902b7ebe3f5ce (patch)
tree604fadb84ec6df689ca25b7e4ae7f2edf78f71b8 /cpp
parent300063322dd80c0dee30475de494afdb6a846d6a (diff)
downloadqpid-python-43e26ecb7cdb04cf7a9c7e87fa7902b7ebe3f5ce.tar.gz
Cluster.cpp: Fixed last-node-standing logic, better logging.
Shlib.cpp: added file name to errors messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@720924 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp142
-rw-r--r--cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--cpp/src/qpid/sys/posix/Shlib.cpp4
-rw-r--r--cpp/src/tests/Makefile.am4
-rw-r--r--cpp/src/tests/cluster_test.cpp1
-rwxr-xr-xcpp/src/tests/perfdist8
6 files changed, 84 insertions, 76 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d08e06c863..7db4e1a3b9 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -102,7 +102,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
mcastId(0),
mgmtObject(0),
state(INIT),
- lastSize(1)
+ lastSize(0),
+ lastBroker(false)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -115,7 +116,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name.str());
+ QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -198,9 +199,8 @@ void Cluster::leave() {
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
QPID_LOG(notice, *this << " leaving cluster " << name.str());
-
+ if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
if (!deliverQueue.isStopped()) deliverQueue.stop();
try { cpg.leave(name); }
catch (const std::exception& e) {
@@ -258,47 +258,48 @@ void Cluster::deliver(const Event& e, Lock&) {
deliverQueue.push(e); // Otherwise enqueue for processing.
}
+// Entry point: called when deliverQueue has events to process.
void Cluster::delivered(const Event& e) {
- Lock l(lock);
- delivered(e,l);
+ try {
+ Lock l(lock);
+ delivered(e,l);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
+ leave();
+ }
+
}
void Cluster::delivered(const Event& e, Lock& l) {
- try {
- Buffer buf(e);
- AMQFrame frame;
- if (e.isCluster()) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- ClusterDispatcher dispatch(*this, e.getMemberId(), l);
- if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
+ Buffer buf(e);
+ AMQFrame frame;
+ if (e.isCluster()) {
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ ClusterDispatcher dispatch(*this, e.getMemberId(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
}
- else { // e.isConnection()
- if (state == NEWBIE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- }
- else {
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
- if (!connection) return;
- if (e.getType() == CONTROL) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- connection->delivered(frame);
- }
- }
- else {
- QPID_LOG(trace, *this << " DLVR: " << e);
- connection->deliverBuffer(buf);
+ }
+ else { // e.isConnection()
+ if (state == NEWBIE) {
+ QPID_LOG(trace, *this << " DROP: " << e);
+ }
+ else {
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (!connection) return;
+ if (e.getType() == CONTROL) {
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ connection->delivered(frame);
}
}
+ else {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ connection->deliverBuffer(buf);
+ }
}
}
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster delivered: " << e.what());
- leave(l);
- }
}
struct AddrList {
@@ -328,23 +329,24 @@ ostream& operator<<(ostream& o, const AddrList& a) {
return o << a.suffix;
}
+// Entry point: called by IO to dispatch CPG events.
void Cluster::dispatch(sys::DispatchHandle& h) {
try {
cpg.dispatchAll();
h.rewatch();
- }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster deliver: " << e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
leave();
}
}
+// Entry point: called if disconnected from CPG.
void Cluster::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, *this << " disconnected from cluster, shutting down");
+ QPID_LOG(critical, *this << " error disconnected from cluster");
broker.shutdown();
}
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
cpg_name */*group*/,
cpg_address *current, int nCurrent,
@@ -372,16 +374,16 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
setClusterId(true);
- QPID_LOG(info, *this << " first in cluster at " << myUrl);
state = READY;
+ QPID_LOG(notice, *this << " first in cluster");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
map = ClusterMap(myId, myUrl, true);
memberUpdate(l);
}
else { // Joining established group.
state = NEWBIE;
+ QPID_LOG(info, *this << " request state dump");
mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
- QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
else if (state >= READY && memberChange)
@@ -394,7 +396,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
- QPID_LOG(debug, *this << " send dump-offer to " << id);
+ QPID_LOG(info, *this << " send dump-offer to " << id);
mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l);
}
}
@@ -424,8 +426,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
if (state == CATCHUP && id == myId) {
- QPID_LOG(debug, *this << " caught-up, going to ready mode.");
state = READY;
+ QPID_LOG(notice, *this << " caught up");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
mcastQueue.clear();
@@ -442,16 +444,16 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
dumpStart(myId, dumpee, url->str(), l);
}
else { // Another offer was first.
- QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
state = READY;
+ QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
}
}
else if (dumpee == myId && url) {
assert(state == NEWBIE);
- QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
setClusterId(uuid);
state = DUMPEE;
+ QPID_LOG(info, *this << " receiving dump from " << dumper);
deliverQueue.stop();
checkDumpIn(l);
}
@@ -465,8 +467,8 @@ void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string&
Url url(urlStr);
assert(state == OFFER);
state = DUMPER;
+ QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
deliverQueue.stop();
- QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr);
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
new DumpClient(myId, dumpee, url, broker, map, getConnections(l),
@@ -484,10 +486,10 @@ void Cluster::checkDumpIn(Lock& l) {
if (state == LEFT) return;
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map);
mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
// Don't flush the mcast queue till we are READY, on self-deliver.
state = CATCHUP;
+ QPID_LOG(info, *this << " received dump, starting catch-up");
deliverQueue.start();
}
}
@@ -498,16 +500,16 @@ void Cluster::dumpOutDone() {
}
void Cluster::dumpOutDone(Lock& l) {
- QPID_LOG(debug, *this << " finished sending dump.");
assert(state == DUMPER);
state = READY;
+ QPID_LOG(info, *this << " sent dump");
deliverQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
}
void Cluster::dumpOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending state dump: " << e.what());
+ QPID_LOG(error, *this << " error sending dump: " << e.what());
dumpOutDone(l);
}
@@ -529,9 +531,9 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(Lock&) {
+void Cluster::stopClusterNode(Lock& l) {
QPID_LOG(notice, *this << " stopped by admin");
- leave();
+ leave(l);
}
void Cluster::stopFullCluster(Lock& l) {
@@ -541,27 +543,27 @@ void Cluster::stopFullCluster(Lock& l) {
void Cluster::memberUpdate(Lock& l) {
QPID_LOG(debug, *this << " member update, map=" << map);
- std::vector<Url> vectUrl = getUrls(l);
- size_t size = vectUrl.size();
-
- failoverExchange->setUrls(vectUrl);
+ std::vector<Url> urls = getUrls(l);
+ size_t size = urls.size();
+ failoverExchange->setUrls(urls);
+
+ if (size == 1 && lastSize > 1 && state >= READY) {
+ QPID_LOG(info, *this << " last broker standing, update queue policies");
+ lastBroker = true;
+ broker.getQueues().updateQueueClusterState(true);
+ }
+ else if (size > 1 && lastBroker) {
+ QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ lastBroker = false;
+ broker.getQueues().updateQueueClusterState(false);
+ }
+ lastSize = size;
if (mgmtObject) {
-
- if (lastSize != size && size == 1){
- QPID_LOG(info, *this << " last node standing, updating queue policies.");
- broker.getQueues().updateQueueClusterState(true);
- }
- else if (lastSize != size && size > 1) {
- QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size);
- broker.getQueues().updateQueueClusterState(false);
- }
- lastSize = size;
-
mgmtObject->set_clusterSize(size);
string urlstr;
- for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
- if (iter != vectUrl.begin()) urlstr += "\n";
+ for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
+ if (iter != urls.begin()) urlstr += "\n";
urlstr += iter->str();
}
mgmtObject->set_members(urlstr);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 2a659be2f1..d8be3f101d 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -213,6 +213,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::optional<ClusterMap> dumpedMap;
size_t lastSize;
+ bool lastBroker;
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp
index 1552aa06b5..62dbfb3dd9 100644
--- a/cpp/src/qpid/sys/posix/Shlib.cpp
+++ b/cpp/src/qpid/sys/posix/Shlib.cpp
@@ -31,7 +31,7 @@ void Shlib::load(const char* name) {
handle = ::dlopen(name, RTLD_NOW);
const char* error = ::dlerror();
if (error) {
- throw Exception(QPID_MSG(error));
+ throw Exception(QPID_MSG(error << ": " << name));
}
}
@@ -52,7 +52,7 @@ void* Shlib::getSymbol(const char* name) {
void* sym = ::dlsym(handle, name);
const char* error = ::dlerror();
if (error)
- throw Exception(QPID_MSG(error));
+ throw Exception(QPID_MSG(error << ": " << name));
return sym;
}
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 78aadf0da0..0883ea1ea7 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -197,6 +197,10 @@ libdlclose_noop_la_SOURCES = dlclose_noop.c
CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
+check_PROGRAMS+=tsxtest
+tsxtest_SOURCES=tsxtest.cpp
+tsxtest_LDADD=$(lib_client)
+
# FIXME aconway 2008-05-23: Disabled interop_runner because it uses
# the obsolete Channel class. Convert to Session and re-enable.
#
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 862860693e..f4a38ae861 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -120,6 +120,7 @@ void ClusterFixture::add() {
std::string prefix = os.str();
const char* argv[] = {
"qpidd " __FILE__ ,
+ "--no-module-dir",
"--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir",
diff --git a/cpp/src/tests/perfdist b/cpp/src/tests/perfdist
index 4edd9fa6ad..afbee99cd9 100755
--- a/cpp/src/tests/perfdist
+++ b/cpp/src/tests/perfdist
@@ -10,10 +10,10 @@ cat <<EOF
usage: $0 <perftest-args> -- <client-hosts ...> [ --- <broker hosts...> ]
Client & broker hosts can also be set in env vars CLIENTS and BROKERS.
-Run perftest with clients running on the clients and brokers running
-on the specified hosts. Clients are assigned to client hosts round
-robin: publishers first, then subscribers. If there are multiple
-brokers (for cluster tests) clients connect to them round robin.
+Run perftest clients on the client hosts against brokers on the broker
+hosts Clients are assigned to client hosts round robin: publishers
+first, then subscribers. If there are multiple brokers (for cluster
+tests) clients connect to them round robin.
Broker hosts can be listed with -b in perftest-args or after ---
at the end of the arguments.