From 8d4fa85054553e158cc1536506c6328031a7c643 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Tue, 16 Jun 2009 14:57:06 +0000 Subject: Added a new qmf-console example program. This new program illustrates how individual nodes of a broker cluster may be monitored. It connects to all nodes, tracks their connectivity, and queries only one of the connected nodes. This mechanism is used in lieu of cluster-failover which is not appropriate for broker management. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@785243 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/examples/qmf-console/Makefile.am | 6 +- qpid/cpp/examples/qmf-console/cluster-qmon.cpp | 170 +++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 qpid/cpp/examples/qmf-console/cluster-qmon.cpp diff --git a/qpid/cpp/examples/qmf-console/Makefile.am b/qpid/cpp/examples/qmf-console/Makefile.am index 320883e528..94211ec187 100644 --- a/qpid/cpp/examples/qmf-console/Makefile.am +++ b/qpid/cpp/examples/qmf-console/Makefile.am @@ -22,7 +22,7 @@ examplesdir=$(pkgdatadir)/examples/qmf-console MAKELDFLAGS=$(CONSOLEFLAGS) include $(top_srcdir)/examples/makedist.mk -noinst_PROGRAMS=console printevents ping queuestats +noinst_PROGRAMS=console printevents ping queuestats cluster-qmon console_SOURCES=console.cpp console_LDADD=$(CONSOLE_LIB) @@ -36,11 +36,15 @@ ping_LDADD=$(CONSOLE_LIB) queuestats_SOURCES=queuestats.cpp queuestats_LDADD=$(CONSOLE_LIB) +cluster_qmon_SOURCES=cluster-qmon.cpp +cluster_qmon_LDADD=$(CONSOLE_LIB) + examples_DATA= \ console.cpp \ printevents.cpp \ ping.cpp \ queuestats.cpp \ + cluster-qmon.cpp \ $(MAKEDIST) EXTRA_DIST= \ diff --git a/qpid/cpp/examples/qmf-console/cluster-qmon.cpp b/qpid/cpp/examples/qmf-console/cluster-qmon.cpp new file mode 100644 index 0000000000..4ee2ce1edd --- /dev/null +++ b/qpid/cpp/examples/qmf-console/cluster-qmon.cpp @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/ConsoleListener.h" +#include "qpid/console/SessionManager.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Mutex.h" +#include +#include + +using namespace std; +using namespace qpid::console; +using qpid::sys::Mutex; + +// +// This example maintains connections to a number of brokers (assumed +// to be running on localhost and at ports listed in the command line +// arguments). +// +// The program then periodically polls queue information from a +// single operational broker. This is a useful illustration of how +// one might monitor statistics on a cluster of brokers. +// + +//============================================================== +// Main program +//============================================================== +class Main : public ConsoleListener { + bool stopping; // Used to tell the program to exit + Mutex lock; // Mutex to protect the broker-map + map brokerMap; // Map of broker-pointers to boolean "operational" status + +public: + Main() : stopping(false) {} + + /** Invoked when a connection is established to a broker + */ + void brokerConnected(const Broker& broker) + { + Mutex::ScopedLock l(lock); + brokerMap[const_cast(&broker)] = true; + } + + /** Invoked when the connection to a broker is lost + */ + void brokerDisconnected(const Broker& broker) + { + Mutex::ScopedLock l(lock); + brokerMap[const_cast(&broker)] = false; + } + + int run(int argc, char** argv) + { + // + // Tune the settings for this application: We will operate synchronously only, we don't + // wish to use the bandwidth needed to aysnchronously receive objects or events. + // + SessionManager::Settings sessionSettings; + sessionSettings.rcvObjects = false; + sessionSettings.rcvEvents = false; + sessionSettings.rcvHeartbeats = false; + + SessionManager sm(this, sessionSettings); + + // + // Connect to the brokers. + // + for (int idx = 1; idx < argc; idx++) { + Mutex::ScopedLock l(lock); + qpid::client::ConnectionSettings connSettings; + connSettings.host = "localhost"; + connSettings.port = atoi(argv[idx]); + Broker* broker = sm.addBroker(connSettings); + brokerMap[broker] = false; // initially assume broker is disconnected + } + + // + // Periodically poll the first connected broker. + // + while (!stopping) { + // + // Find an operational broker + // + Broker* operationalBroker = 0; + for (map::iterator iter = brokerMap.begin(); + iter != brokerMap.end(); iter++) { + if (iter->second) { + operationalBroker = iter->first; + break; + } + } + + if (operationalBroker != 0) { + Object::Vector list; + sm.getObjects(list, "queue", operationalBroker); + for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) { + cout << "queue: " << i->attrString("name"); + cout << " bindingCount=" << i->attrUint64("bindingCount") << endl; + } + } else { + cout << "No operational brokers" << endl; + } + + qpid::sys::sleep(10); + if (stopping) + break; + } + + { + // + // The following code structure uses the mutex to protect the broker map while + // ensuring that sm.delBroker is called without the mutex held (which leads to + // a deadlock). + // + Mutex::ScopedLock l(lock); + map::iterator iter = brokerMap.begin(); + while (iter != brokerMap.end()) { + Broker* broker = iter->first; + brokerMap.erase(iter); + { + Mutex::ScopedUnlock ul(lock); + sm.delBroker(broker); + } + iter = brokerMap.begin(); + } + } + + return 0; + } + + void stop() { + stopping = true; + } +}; + +Main main_program; + +void signal_handler(int) +{ + main_program.stop(); +} + +int main(int argc, char** argv) +{ + signal(SIGINT, signal_handler); + try { + return main_program.run(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + -- cgit v1.2.1