summaryrefslogtreecommitdiff
path: root/qpid/cpp/examples/qmf-console/cluster-qmon.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/examples/qmf-console/cluster-qmon.cpp')
-rw-r--r--qpid/cpp/examples/qmf-console/cluster-qmon.cpp179
1 files changed, 179 insertions, 0 deletions
diff --git a/qpid/cpp/examples/qmf-console/cluster-qmon.cpp b/qpid/cpp/examples/qmf-console/cluster-qmon.cpp
new file mode 100644
index 0000000000..fe92f8a8ae
--- /dev/null
+++ b/qpid/cpp/examples/qmf-console/cluster-qmon.cpp
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/console/ConsoleListener.h"
+#include "qpid/console/SessionManager.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Mutex.h"
+#include <signal.h>
+#include <map>
+
+using namespace std;
+using namespace qpid::console;
+using qpid::sys::Mutex;
+
+//
+// This example maintains connections to a number of brokers (assumed
+// to be running on localhost and at ports listed in the command line
+// arguments).
+//
+// The program then periodically polls queue information from a
+// single operational broker. This is a useful illustration of how
+// one might monitor statistics on a cluster of brokers.
+//
+
+//==============================================================
+// Main program
+//==============================================================
+
+//
+// The Main class extends ConsoleListener so it can receive broker connected/disconnected
+// notifications.
+//
+class Main : public ConsoleListener {
+ bool stopping; // Used to tell the program to exit
+ Mutex lock; // Mutex to protect the broker-map
+ map<Broker*, bool> brokerMap; // Map of broker-pointers to boolean "operational" status
+
+public:
+ Main() : stopping(false) {}
+
+ /** Invoked when a connection is established to a broker
+ */
+ void brokerConnected(const Broker& broker)
+ {
+ Mutex::ScopedLock l(lock);
+ brokerMap[const_cast<Broker*>(&broker)] = true;
+ }
+
+ /** Invoked when the connection to a broker is lost
+ */
+ void brokerDisconnected(const Broker& broker)
+ {
+ Mutex::ScopedLock l(lock);
+ brokerMap[const_cast<Broker*>(&broker)] = false;
+ }
+
+ int run(int argc, char** argv)
+ {
+ //
+ // Tune the settings for this application: We will operate synchronously only, we don't
+ // wish to use the bandwidth needed to aysnchronously receive objects or events.
+ //
+ SessionManager::Settings sessionSettings;
+ sessionSettings.rcvObjects = false;
+ sessionSettings.rcvEvents = false;
+ sessionSettings.rcvHeartbeats = false;
+
+ SessionManager sm(this, sessionSettings);
+
+ //
+ // Connect to the brokers.
+ //
+ for (int idx = 1; idx < argc; idx++) {
+ qpid::client::ConnectionSettings connSettings;
+ connSettings.host = "localhost";
+ connSettings.port = atoi(argv[idx]);
+ Broker* broker = sm.addBroker(connSettings);
+
+ Mutex::ScopedLock l(lock);
+ brokerMap[broker] = false; // initially assume broker is disconnected
+ }
+
+ //
+ // Periodically poll the first connected broker.
+ //
+ while (!stopping) {
+ //
+ // Find an operational broker
+ //
+ Broker* operationalBroker = 0;
+ {
+ Mutex::ScopedLock l(lock);
+ for (map<Broker*, bool>::iterator iter = brokerMap.begin();
+ iter != brokerMap.end(); iter++) {
+ if (iter->second) {
+ operationalBroker = iter->first;
+ break;
+ }
+ }
+ }
+
+ if (operationalBroker != 0) {
+ Object::Vector list;
+ sm.getObjects(list, "queue", operationalBroker);
+ for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) {
+ cout << "queue: " << i->attrString("name");
+ cout << " bindingCount=" << i->attrUint64("bindingCount") << endl;
+ }
+ } else {
+ cout << "No operational brokers" << endl;
+ }
+
+ qpid::sys::sleep(10);
+ if (stopping)
+ break;
+ }
+
+ {
+ //
+ // The following code structure uses the mutex to protect the broker map while
+ // ensuring that sm.delBroker is called without the mutex held (which leads to
+ // a deadlock).
+ //
+ Mutex::ScopedLock l(lock);
+ map<Broker*, bool>::iterator iter = brokerMap.begin();
+ while (iter != brokerMap.end()) {
+ Broker* broker = iter->first;
+ brokerMap.erase(iter);
+ {
+ Mutex::ScopedUnlock ul(lock);
+ sm.delBroker(broker);
+ }
+ iter = brokerMap.begin();
+ }
+ }
+
+ return 0;
+ }
+
+ void stop() {
+ stopping = true;
+ }
+};
+
+Main main_program;
+
+void signal_handler(int)
+{
+ main_program.stop();
+}
+
+int main(int argc, char** argv)
+{
+ signal(SIGINT, signal_handler);
+ try {
+ return main_program.run(argc, argv);
+ } catch(std::exception& e) {
+ cout << "Top Level Exception: " << e.what() << endl;
+ }
+}
+