diff options
author | Alan Conway <aconway@apache.org> | 2008-11-26 17:37:16 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-26 17:37:16 +0000 |
commit | 43e26ecb7cdb04cf7a9c7e87fa7902b7ebe3f5ce (patch) | |
tree | 604fadb84ec6df689ca25b7e4ae7f2edf78f71b8 /cpp | |
parent | 300063322dd80c0dee30475de494afdb6a846d6a (diff) | |
download | qpid-python-43e26ecb7cdb04cf7a9c7e87fa7902b7ebe3f5ce.tar.gz |
Cluster.cpp: Fixed last-node-standing logic, better logging.
Shlib.cpp: added file name to errors messages.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@720924 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 142 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Shlib.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 1 | ||||
-rwxr-xr-x | cpp/src/tests/perfdist | 8 |
6 files changed, 84 insertions, 76 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d08e06c863..7db4e1a3b9 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -102,7 +102,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b mcastId(0), mgmtObject(0), state(INIT), - lastSize(1) + lastSize(0), + lastBroker(false) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ @@ -115,7 +116,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b failoverExchange.reset(new FailoverExchange(this)); cpgDispatchHandle.startWatch(poller); deliverQueue.start(); - QPID_LOG(notice, *this << " joining cluster " << name.str()); + QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl); if (useQuorum) quorum.init(); cpg.join(name); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety. @@ -198,9 +199,8 @@ void Cluster::leave() { void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; - if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); QPID_LOG(notice, *this << " leaving cluster " << name.str()); - + if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); if (!deliverQueue.isStopped()) deliverQueue.stop(); try { cpg.leave(name); } catch (const std::exception& e) { @@ -258,47 +258,48 @@ void Cluster::deliver(const Event& e, Lock&) { deliverQueue.push(e); // Otherwise enqueue for processing. } +// Entry point: called when deliverQueue has events to process. void Cluster::delivered(const Event& e) { - Lock l(lock); - delivered(e,l); + try { + Lock l(lock); + delivered(e,l); + } catch (const std::exception& e) { + QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); + leave(); + } + } void Cluster::delivered(const Event& e, Lock& l) { - try { - Buffer buf(e); - AMQFrame frame; - if (e.isCluster()) { - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); - ClusterDispatcher dispatch(*this, e.getMemberId(), l); - if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } + Buffer buf(e); + AMQFrame frame; + if (e.isCluster()) { + while (frame.decode(buf)) { + QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); + ClusterDispatcher dispatch(*this, e.getMemberId(), l); + if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) + throw Exception(QPID_MSG("Invalid cluster control")); } - else { // e.isConnection() - if (state == NEWBIE) { - QPID_LOG(trace, *this << " DROP: " << e); - } - else { - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); - if (!connection) return; - if (e.getType() == CONTROL) { - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); - connection->delivered(frame); - } - } - else { - QPID_LOG(trace, *this << " DLVR: " << e); - connection->deliverBuffer(buf); + } + else { // e.isConnection() + if (state == NEWBIE) { + QPID_LOG(trace, *this << " DROP: " << e); + } + else { + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (!connection) return; + if (e.getType() == CONTROL) { + while (frame.decode(buf)) { + QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); + connection->delivered(frame); } } + else { + QPID_LOG(trace, *this << " DLVR: " << e); + connection->deliverBuffer(buf); + } } } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster delivered: " << e.what()); - leave(l); - } } struct AddrList { @@ -328,23 +329,24 @@ ostream& operator<<(ostream& o, const AddrList& a) { return o << a.suffix; } +// Entry point: called by IO to dispatch CPG events. void Cluster::dispatch(sys::DispatchHandle& h) { try { cpg.dispatchAll(); h.rewatch(); - } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster deliver: " << e.what()); + } catch (const std::exception& e) { + QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what()); leave(); } } +// Entry point: called if disconnected from CPG. void Cluster::disconnect(sys::DispatchHandle& ) { - QPID_LOG(critical, *this << " disconnected from cluster, shutting down"); + QPID_LOG(critical, *this << " error disconnected from cluster"); broker.shutdown(); } -void Cluster::configChange ( +void Cluster::configChange ( cpg_handle_t /*handle*/, cpg_name */*group*/, cpg_address *current, int nCurrent, @@ -372,16 +374,16 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { setClusterId(true); - QPID_LOG(info, *this << " first in cluster at " << myUrl); state = READY; + QPID_LOG(notice, *this << " first in cluster"); if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); map = ClusterMap(myId, myUrl, true); memberUpdate(l); } else { // Joining established group. state = NEWBIE; + QPID_LOG(info, *this << " request state dump"); mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l); - QPID_LOG(debug, *this << " send dump-request " << myUrl); } } else if (state >= READY && memberChange) @@ -394,7 +396,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { if (state == READY && map.isNewbie(id)) { state = OFFER; - QPID_LOG(debug, *this << " send dump-offer to " << id); + QPID_LOG(info, *this << " send dump-offer to " << id); mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l); } } @@ -424,8 +426,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); if (state == CATCHUP && id == myId) { - QPID_LOG(debug, *this << " caught-up, going to ready mode."); state = READY; + QPID_LOG(notice, *this << " caught up"); if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); mcastQueue.clear(); @@ -442,16 +444,16 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& dumpStart(myId, dumpee, url->str(), l); } else { // Another offer was first. - QPID_LOG(debug, *this << " cancel dump offer to " << dumpee); state = READY; + QPID_LOG(info, *this << " cancelled dump offer to " << dumpee); tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. } } else if (dumpee == myId && url) { assert(state == NEWBIE); - QPID_LOG(debug, *this << " accepted dump-offer from " << dumper); setClusterId(uuid); state = DUMPEE; + QPID_LOG(info, *this << " receiving dump from " << dumper); deliverQueue.stop(); checkDumpIn(l); } @@ -465,8 +467,8 @@ void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& Url url(urlStr); assert(state == OFFER); state = DUMPER; + QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr); deliverQueue.stop(); - QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr); if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread( new DumpClient(myId, dumpee, url, broker, map, getConnections(l), @@ -484,10 +486,10 @@ void Cluster::checkDumpIn(Lock& l) { if (state == LEFT) return; if (state == DUMPEE && dumpedMap) { map = *dumpedMap; - QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map); mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l); // Don't flush the mcast queue till we are READY, on self-deliver. state = CATCHUP; + QPID_LOG(info, *this << " received dump, starting catch-up"); deliverQueue.start(); } } @@ -498,16 +500,16 @@ void Cluster::dumpOutDone() { } void Cluster::dumpOutDone(Lock& l) { - QPID_LOG(debug, *this << " finished sending dump."); assert(state == DUMPER); state = READY; + QPID_LOG(info, *this << " sent dump"); deliverQueue.start(); tryMakeOffer(map.firstNewbie(), l); // Try another offer } void Cluster::dumpOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending state dump: " << e.what()); + QPID_LOG(error, *this << " error sending dump: " << e.what()); dumpOutDone(l); } @@ -529,9 +531,9 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string return Manageable::STATUS_OK; } -void Cluster::stopClusterNode(Lock&) { +void Cluster::stopClusterNode(Lock& l) { QPID_LOG(notice, *this << " stopped by admin"); - leave(); + leave(l); } void Cluster::stopFullCluster(Lock& l) { @@ -541,27 +543,27 @@ void Cluster::stopFullCluster(Lock& l) { void Cluster::memberUpdate(Lock& l) { QPID_LOG(debug, *this << " member update, map=" << map); - std::vector<Url> vectUrl = getUrls(l); - size_t size = vectUrl.size(); - - failoverExchange->setUrls(vectUrl); + std::vector<Url> urls = getUrls(l); + size_t size = urls.size(); + failoverExchange->setUrls(urls); + + if (size == 1 && lastSize > 1 && state >= READY) { + QPID_LOG(info, *this << " last broker standing, update queue policies"); + lastBroker = true; + broker.getQueues().updateQueueClusterState(true); + } + else if (size > 1 && lastBroker) { + QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); + lastBroker = false; + broker.getQueues().updateQueueClusterState(false); + } + lastSize = size; if (mgmtObject) { - - if (lastSize != size && size == 1){ - QPID_LOG(info, *this << " last node standing, updating queue policies."); - broker.getQueues().updateQueueClusterState(true); - } - else if (lastSize != size && size > 1) { - QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size); - broker.getQueues().updateQueueClusterState(false); - } - lastSize = size; - mgmtObject->set_clusterSize(size); string urlstr; - for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { - if (iter != vectUrl.begin()) urlstr += "\n"; + for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { + if (iter != urls.begin()) urlstr += "\n"; urlstr += iter->str(); } mgmtObject->set_members(urlstr); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 2a659be2f1..d8be3f101d 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -213,6 +213,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::optional<ClusterMap> dumpedMap; size_t lastSize; + bool lastBroker; boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp index 1552aa06b5..62dbfb3dd9 100644 --- a/cpp/src/qpid/sys/posix/Shlib.cpp +++ b/cpp/src/qpid/sys/posix/Shlib.cpp @@ -31,7 +31,7 @@ void Shlib::load(const char* name) { handle = ::dlopen(name, RTLD_NOW); const char* error = ::dlerror(); if (error) { - throw Exception(QPID_MSG(error)); + throw Exception(QPID_MSG(error << ": " << name)); } } @@ -52,7 +52,7 @@ void* Shlib::getSymbol(const char* name) { void* sym = ::dlsym(handle, name); const char* error = ::dlerror(); if (error) - throw Exception(QPID_MSG(error)); + throw Exception(QPID_MSG(error << ": " << name)); return sym; } diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 78aadf0da0..0883ea1ea7 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -197,6 +197,10 @@ libdlclose_noop_la_SOURCES = dlclose_noop.c CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) +check_PROGRAMS+=tsxtest +tsxtest_SOURCES=tsxtest.cpp +tsxtest_LDADD=$(lib_client) + # FIXME aconway 2008-05-23: Disabled interop_runner because it uses # the obsolete Channel class. Convert to Session and re-enable. # diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 862860693e..f4a38ae861 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -120,6 +120,7 @@ void ClusterFixture::add() { std::string prefix = os.str(); const char* argv[] = { "qpidd " __FILE__ , + "--no-module-dir", "--load-module=../.libs/cluster.so", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir", diff --git a/cpp/src/tests/perfdist b/cpp/src/tests/perfdist index 4edd9fa6ad..afbee99cd9 100755 --- a/cpp/src/tests/perfdist +++ b/cpp/src/tests/perfdist @@ -10,10 +10,10 @@ cat <<EOF usage: $0 <perftest-args> -- <client-hosts ...> [ --- <broker hosts...> ] Client & broker hosts can also be set in env vars CLIENTS and BROKERS. -Run perftest with clients running on the clients and brokers running -on the specified hosts. Clients are assigned to client hosts round -robin: publishers first, then subscribers. If there are multiple -brokers (for cluster tests) clients connect to them round robin. +Run perftest clients on the client hosts against brokers on the broker +hosts Clients are assigned to client hosts round robin: publishers +first, then subscribers. If there are multiple brokers (for cluster +tests) clients connect to them round robin. Broker hosts can be listed with -b in perftest-args or after --- at the end of the arguments. |