summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-12-19 20:22:03 +0000
committerTed Ross <tross@apache.org>2008-12-19 20:22:03 +0000
commit2660791c236676f28cdbf81da5c0d52110b36d17 (patch)
tree5a4166a17da10fcd64e05ecddec4c6f14dc0385f
parentd98c3cdf286d3ee16904d66337a20d94a8e95dc3 (diff)
downloadqpid-python-2660791c236676f28cdbf81da5c0d52110b36d17.tar.gz
QPID-1412 - c++ implementation of the QMF client API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@728132 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/configure.ac1
-rw-r--r--cpp/examples/Makefile.am2
-rw-r--r--cpp/examples/makedist.mk4
-rw-r--r--cpp/examples/qmf-console/Makefile.am14
-rw-r--r--cpp/examples/qmf-console/console.cpp154
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qmfc.mk50
-rw-r--r--cpp/src/qpid/console/Agent.cpp30
-rw-r--r--cpp/src/qpid/console/Agent.h57
-rw-r--r--cpp/src/qpid/console/Broker.cpp291
-rw-r--r--cpp/src/qpid/console/Broker.h129
-rw-r--r--cpp/src/qpid/console/ClassKey.cpp104
-rw-r--r--cpp/src/qpid/console/ClassKey.h65
-rw-r--r--cpp/src/qpid/console/ConsoleListener.h95
-rw-r--r--cpp/src/qpid/console/Event.h40
-rw-r--r--cpp/src/qpid/console/Object.cpp324
-rw-r--r--cpp/src/qpid/console/Object.h106
-rw-r--r--cpp/src/qpid/console/ObjectId.cpp52
-rw-r--r--cpp/src/qpid/console/ObjectId.h60
-rw-r--r--cpp/src/qpid/console/Package.cpp41
-rw-r--r--cpp/src/qpid/console/Package.h76
-rw-r--r--cpp/src/qpid/console/Schema.cpp155
-rw-r--r--cpp/src/qpid/console/Schema.h104
-rw-r--r--cpp/src/qpid/console/SequenceManager.cpp48
-rw-r--r--cpp/src/qpid/console/SequenceManager.h53
-rw-r--r--cpp/src/qpid/console/SessionManager.cpp446
-rw-r--r--cpp/src/qpid/console/SessionManager.h194
-rw-r--r--cpp/src/qpid/console/Value.cpp168
-rw-r--r--cpp/src/qpid/console/Value.h205
-rw-r--r--cpp/src/qpid/framing/Uuid.cpp2
-rw-r--r--cpp/src/qpid/framing/Uuid.h2
-rw-r--r--cpp/src/qpid/sys/SystemInfo.h11
-rwxr-xr-xcpp/src/qpid/sys/posix/SystemInfo.cpp12
-rw-r--r--cpp/src/tests/ConsoleTest.cpp43
-rw-r--r--cpp/src/tests/Makefile.am6
35 files changed, 3139 insertions, 6 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac
index 3070ce6687..24588b0950 100644
--- a/cpp/configure.ac
+++ b/cpp/configure.ac
@@ -379,6 +379,7 @@ AC_CONFIG_FILES([
examples/request-response/Makefile
examples/failover/Makefile
examples/xml-exchange/Makefile
+ examples/qmf-console/Makefile
managementgen/Makefile
etc/Makefile
src/Makefile
diff --git a/cpp/examples/Makefile.am b/cpp/examples/Makefile.am
index b870d67802..29a101425c 100644
--- a/cpp/examples/Makefile.am
+++ b/cpp/examples/Makefile.am
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-SUBDIRS = direct fanout pub-sub request-response failover
+SUBDIRS = direct fanout pub-sub request-response failover qmf-console
if HAVE_XML
SUBDIRS += xml-exchange
broker_args = "--no-module-dir --data-dir \"\" --auth no --load-module $(top_builddir)/src/.libs/xml.so"
diff --git a/cpp/examples/makedist.mk b/cpp/examples/makedist.mk
index f579dca1e3..4345378983 100644
--- a/cpp/examples/makedist.mk
+++ b/cpp/examples/makedist.mk
@@ -2,6 +2,8 @@
AM_CXXFLAGS = $(WARNING_CFLAGS)
INCLUDES = -I$(top_srcdir)/src -I$(top_srcdir)/src/gen -I$(top_builddir)/src -I$(top_builddir)/src/gen
CLIENT_LIB=$(top_builddir)/src/libqpidclient.la
+CONSOLE_LIB=$(top_builddir)/src/libqmfconsole.la
+MAKELDFLAG ?= qpidclient
# Generate a simple non-automake Makefile for distribution.
MAKEDIST=.libs/Makefile
@@ -10,7 +12,7 @@ $(MAKEDIST): Makefile
mkdir -p .libs
@$(ECHO) CXX=$(CXX) > $(MAKEDIST)
@$(ECHO) CXXFLAGS=$(CXXFLAGS) >> $(MAKEDIST)
- @$(ECHO) LDFLAGS=-lqpidclient >> $(MAKEDIST)
+ @$(ECHO) LDFLAGS=-l$(MAKELDFLAG) >> $(MAKEDIST)
@$(ECHO) >> $(MAKEDIST)
@$(ECHO) all: $(noinst_PROGRAMS) >> $(MAKEDIST)
@$(ECHO) >> $(MAKEDIST)
diff --git a/cpp/examples/qmf-console/Makefile.am b/cpp/examples/qmf-console/Makefile.am
new file mode 100644
index 0000000000..710d9601d6
--- /dev/null
+++ b/cpp/examples/qmf-console/Makefile.am
@@ -0,0 +1,14 @@
+examplesdir=$(pkgdatadir)/examples/qmf-console
+
+MAKELDFLAG = qmfconsole
+include $(top_srcdir)/examples/makedist.mk
+
+noinst_PROGRAMS=qmfc
+
+qmfc_SOURCES=console.cpp
+qmfc_LDADD=$(CONSOLE_LIB)
+
+examples_DATA= \
+ console.cpp \
+ $(MAKEDIST)
+
diff --git a/cpp/examples/qmf-console/console.cpp b/cpp/examples/qmf-console/console.cpp
new file mode 100644
index 0000000000..c98f1ace34
--- /dev/null
+++ b/cpp/examples/qmf-console/console.cpp
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/console/ConsoleListener.h"
+#include "qpid/console/SessionManager.h"
+#include "qpid/console/Value.h"
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace std;
+using namespace qpid::console;
+using std::cout;
+using std::endl;
+
+class Listener : public ConsoleListener {
+public:
+ ~Listener() {}
+
+ void brokerConnected(const Broker& broker) {
+ cout << "brokerConnected: " << broker << endl;
+ }
+
+ void brokerDisconnected(const Broker& broker) {
+ cout << "brokerDisconnected: " << broker << endl;
+ }
+
+ void newPackage(const std::string& name) {
+ cout << "newPackage: " << name << endl;
+ }
+
+ void newClass(const ClassKey& classKey) {
+ cout << "newClass: key=" << classKey << endl;
+ }
+
+ void newAgent(const Agent& agent) {
+ cout << "newAgent: " << agent << endl;
+ }
+
+ void delAgent(const Agent& agent) {
+ cout << "delAgent: " << agent << endl;
+ }
+
+ void objectProps(Broker& broker, Object& object) {
+ cout << "objectProps: broker=" << broker << " object=" << object << endl;
+ }
+
+ void objectStats(Broker& broker, Object& object) {
+ cout << "objectStats: broker=" << broker << " object=" << object << endl;
+ }
+};
+
+
+//==============================================================
+// Main program
+//==============================================================
+int main_int(int /*argc*/, char** /*argv*/)
+{
+ //Listener listener;
+ qpid::client::ConnectionSettings settings;
+
+ cout << "Creating SessionManager" << endl;
+ SessionManager sm;
+ cout << "Adding broker" << endl;
+ Broker* broker;
+
+ broker = sm.addBroker(settings);
+
+ cout << "Package List:" << endl;
+ vector<string> packages;
+ sm.getPackages(packages);
+ for (vector<string>::iterator iter = packages.begin(); iter != packages.end(); iter++) {
+ cout << " " << *iter << endl;
+ SessionManager::KeyVector classKeys;
+ sm.getClasses(classKeys, *iter);
+ for (SessionManager::KeyVector::iterator cIter = classKeys.begin();
+ cIter != classKeys.end(); cIter++)
+ cout << " " << *cIter << endl;
+ }
+
+ Object::Vector list;
+ cout << "getting exchanges..." << endl;
+ sm.getObjects(list, "exchange");
+ cout << " returned " << list.size() << " elements" << endl;
+
+ for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) {
+ cout << "exchange: " << *i << endl;
+ }
+
+ list.clear();
+ cout << "getting queues..." << endl;
+ sm.getObjects(list, "queue");
+ cout << " returned " << list.size() << " elements" << endl;
+
+ for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) {
+ cout << "queue: " << *i << endl;
+ cout << " bindingCount=" << i->attrUint("bindingCount") << endl;
+ cout << " arguments=" << i->attrMap("arguments") << endl;
+ }
+
+ list.clear();
+ sm.getObjects(list, "broker");
+ if (list.size() == 1) {
+ Object& broker = *list.begin();
+
+ cout << "Broker: " << broker << endl;
+
+ Object::AttributeMap args;
+ MethodResponse result;
+ args["sequence"] = new UintValue(1);
+ args["body"] = new StringValue("Testing...");
+
+ cout << "Call echo method..." << endl;
+ broker.invokeMethod("echo", args, result);
+ cout << "Result: code=" << result.code << " text=" << result.text << endl;
+ for (Object::AttributeMap::iterator aIter = result.arguments.begin();
+ aIter != result.arguments.end(); aIter++) {
+ cout << " Output Arg: " << aIter->first << " => " << aIter->second->str() << endl;
+ }
+ }
+
+ sm.delBroker(broker);
+ return 0;
+}
+
+int main(int argc, char** argv)
+{
+ try {
+ return main_int(argc, argv);
+ } catch(std::exception& e) {
+ cout << "Top Level Exception: " << e.what() << endl;
+ }
+}
+
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 396a9dd6a6..606fdb1a41 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -179,6 +179,7 @@ cmodule_LTLIBRARIES =
include cluster.mk
include acl.mk
include qmf.mk
+include qmfc.mk
if HAVE_XML
include xml.mk
endif
diff --git a/cpp/src/qmfc.mk b/cpp/src/qmfc.mk
new file mode 100644
index 0000000000..6fa7d4c988
--- /dev/null
+++ b/cpp/src/qmfc.mk
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http:#www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# qmf console library makefile fragment, to be included in Makefile.am
+#
+lib_LTLIBRARIES += libqmfconsole.la
+
+libqmfconsole_la_SOURCES = \
+ qpid/console/Agent.h \
+ qpid/console/Agent.cpp \
+ qpid/console/Broker.h \
+ qpid/console/Broker.cpp \
+ qpid/console/ClassKey.h \
+ qpid/console/ClassKey.cpp \
+ qpid/console/ConsoleListener.h \
+ qpid/console/Event.h \
+ qpid/console/Object.h \
+ qpid/console/Object.cpp \
+ qpid/console/ObjectId.h \
+ qpid/console/ObjectId.cpp \
+ qpid/console/Package.h \
+ qpid/console/Package.cpp \
+ qpid/console/Schema.h \
+ qpid/console/Schema.cpp \
+ qpid/console/SequenceManager.h \
+ qpid/console/SequenceManager.cpp \
+ qpid/console/SessionManager.h \
+ qpid/console/SessionManager.cpp \
+ qpid/console/Value.h \
+ qpid/console/Value.cpp
+
+libqmfconsole_la_LIBADD = libqpidclient.la
+
diff --git a/cpp/src/qpid/console/Agent.cpp b/cpp/src/qpid/console/Agent.cpp
new file mode 100644
index 0000000000..8b5a8adbb4
--- /dev/null
+++ b/cpp/src/qpid/console/Agent.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Agent.h"
+
+std::ostream& qpid::console::operator<<(std::ostream& o, const Agent& agent)
+{
+ o << "Agent at bank " << agent.getBrokerBank() << "." << agent.getAgentBank() <<
+ " (" << agent.getLabel() << ")";
+ return o;
+}
+
diff --git a/cpp/src/qpid/console/Agent.h b/cpp/src/qpid/console/Agent.h
new file mode 100644
index 0000000000..3307a1b44b
--- /dev/null
+++ b/cpp/src/qpid/console/Agent.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_AGENT_H_
+#define _QPID_CONSOLE_AGENT_H_
+
+#include "Broker.h"
+
+namespace qpid {
+namespace console {
+
+ /**
+ *
+ * \ingroup qmfconsoleapi
+ */
+ class Agent {
+ public:
+ typedef std::vector<Agent*> Vector;
+
+ Agent(Broker* _broker, uint32_t _bank, const std::string& _label) :
+ broker(_broker), brokerBank(broker->getBrokerBank()),
+ agentBank(_bank), label(_label) {}
+ Broker* getBroker() const { return broker; }
+ uint32_t getBrokerBank() const { return brokerBank; }
+ uint32_t getAgentBank() const { return agentBank; }
+ const std::string& getLabel() const { return label; }
+
+ private:
+ Broker* broker;
+ const uint32_t brokerBank;
+ const uint32_t agentBank;
+ const std::string label;
+ };
+
+ std::ostream& operator<<(std::ostream& o, const Agent& agent);
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/console/Broker.cpp b/cpp/src/qpid/console/Broker.cpp
new file mode 100644
index 0000000000..2e7ba95b1d
--- /dev/null
+++ b/cpp/src/qpid/console/Broker.cpp
@@ -0,0 +1,291 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Broker.h"
+#include "Object.h"
+#include "Value.h"
+#include "SessionManager.h"
+#include "ConsoleListener.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/SystemInfo.h"
+
+using namespace qpid::client;
+using namespace qpid::console;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace std;
+
+Broker::Broker(SessionManager& sm, ConnectionSettings& settings) :
+ sessionManager(sm), connected(false), connectionSettings(settings),
+ reqsOutstanding(1), syncInFlight(false), topicBound(false), methodObject(0),
+ connThreadBody(*this), connThread(connThreadBody)
+{
+ string osName;
+ string nodeName;
+ string release;
+ string version;
+ string machine;
+
+ sys::SystemInfo::getSystemId(osName, nodeName, release, version, machine);
+ uint32_t pid = sys::SystemInfo::getParentProcessId();
+
+ stringstream text;
+
+ text << "qmfc-cpp-" << nodeName << "-" << pid;
+ amqpSessionId = string(text.str());
+
+ QPID_LOG(debug, "Broker::Broker: constructed, amqpSessionId=" << amqpSessionId);
+}
+
+Broker::~Broker()
+{
+}
+
+void Broker::encodeHeader(framing::Buffer& buf, uint8_t opcode, uint32_t seq) const
+{
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('2');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
+}
+
+bool Broker::checkHeader(framing::Buffer& buf, uint8_t *opcode, uint32_t *seq) const
+{
+ if (buf.getSize() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
+
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
+}
+
+void Broker::received(client::Message& msg)
+{
+ string data = msg.getData();
+ Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
+ uint8_t opcode;
+ uint32_t sequence;
+
+ if (checkHeader(inBuffer, &opcode, &sequence)) {
+ QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence);
+
+ if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence);
+ else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence);
+ else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence);
+ else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence);
+ else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence);
+ else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence);
+ else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence);
+ else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence);
+ else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false);
+ else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true);
+ else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true);
+ }
+}
+
+void Broker::resetAgents()
+{
+ for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) {
+ if (sessionManager.listener != 0)
+ sessionManager.listener->delAgent(*(iter->second));
+ delete iter->second;
+ }
+
+ agents.clear();
+ agents[0x0000000100000000LL] = new Agent(this, 0, "BrokerAgent");
+}
+
+void Broker::updateAgent(const Object& object)
+{
+ uint32_t brokerBank = object.attrUint("brokerBank");
+ uint32_t agentBank = object.attrUint("agentBank");
+ uint64_t agentKey = ((uint64_t) brokerBank << 32) | (uint64_t) agentBank;
+ AgentMap::iterator iter = agents.find(agentKey);
+
+ if (object.isDeleted()) {
+ if (iter != agents.end()) {
+ if (sessionManager.listener != 0)
+ sessionManager.listener->delAgent(*(iter->second));
+ delete iter->second;
+ agents.erase(iter);
+ }
+ } else {
+ if (iter == agents.end()) {
+ Agent* agent = new Agent(this, agentBank, object.attrString("label"));
+ agents[agentKey] = agent;
+ if (sessionManager.listener != 0)
+ sessionManager.listener->newAgent(*agent);
+ }
+ }
+}
+
+void Broker::ConnectionThread::run()
+{
+ static const int delayMin(1);
+ static const int delayMax(128);
+ static const int delayFactor(2);
+ int delay(delayMin);
+ string dest("qmfc");
+
+ sessionId.generate();
+ queueName << "qmfc-" << sessionId;
+
+ while (true) {
+ try {
+ broker.topicBound = false;
+ broker.reqsOutstanding = 1;
+ connection.open(broker.connectionSettings);
+ session = connection.newSession(queueName.str());
+ subscriptions = new client::SubscriptionManager(session);
+
+ session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
+ arg::exclusive=true);
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str());
+
+ subscriptions->setAcceptMode(ACCEPT_MODE_NONE);
+ subscriptions->setAcquireMode(ACQUIRE_MODE_PRE_ACQUIRED);
+ subscriptions->subscribe(broker, queueName.str(), dest);
+ subscriptions->setFlowControl(dest, FlowControl::unlimited());
+ {
+ Mutex::ScopedLock _lock(connLock);
+ operational = true;
+ broker.resetAgents();
+ broker.connected = true;
+ broker.sessionManager.handleBrokerConnect(&broker);
+ broker.sessionManager.startProtocol(&broker);
+ try {
+ Mutex::ScopedUnlock _unlock(connLock);
+ subscriptions->run();
+ } catch (std::exception) {}
+
+ operational = false;
+ broker.connected = false;
+ broker.sessionManager.handleBrokerDisconnect(&broker);
+ }
+ delay = delayMin;
+ delete subscriptions;
+ subscriptions = 0;
+ session.close();
+ } catch (std::exception &e) {
+ QPID_LOG(debug, " outer exception: " << e.what());
+ if (delay < delayMax)
+ delay *= delayFactor;
+ }
+
+ ::sleep(delay);
+ }
+}
+
+Broker::ConnectionThread::~ConnectionThread()
+{
+ if (subscriptions != 0) {
+ delete subscriptions;
+ }
+}
+
+void Broker::ConnectionThread::sendBuffer(Buffer& buf, uint32_t length,
+ const string& exchange, const string& routingKey)
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ }
+
+ client::Message msg;
+ string data;
+
+ buf.getRawData(data, length);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.setData(data);
+ try {
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ } catch(std::exception&) {}
+}
+
+void Broker::ConnectionThread::bindExchange(const std::string& exchange, const std::string& key)
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ }
+
+ QPID_LOG(debug, "Broker::ConnectionThread::bindExchange: exchange=" << exchange << " key=" << key);
+ session.exchangeBind(arg::exchange=exchange, arg::queue=queueName.str(),
+ arg::bindingKey=key);
+}
+
+void Broker::waitForStable()
+{
+ Mutex::ScopedLock l(lock);
+ if (reqsOutstanding == 0)
+ return;
+ syncInFlight = true;
+ while (reqsOutstanding != 0) {
+ cond.wait(lock); // TODO: put timeout delay in here!
+ }
+}
+
+void Broker::incOutstanding()
+{
+ Mutex::ScopedLock l(lock);
+ reqsOutstanding++;
+}
+
+void Broker::decOutstanding()
+{
+ Mutex::ScopedLock l(lock);
+ reqsOutstanding--;
+ if (reqsOutstanding == 0) {
+ if (!topicBound) {
+ topicBound = true;
+ for (vector<string>::const_iterator iter = sessionManager.bindingKeyList.begin();
+ iter != sessionManager.bindingKeyList.end(); iter++)
+ connThreadBody.bindExchange("qpid.management", *iter);
+ }
+ if (syncInFlight) {
+ syncInFlight = false;
+ cond.notify();
+ }
+ }
+}
+
+void Broker::appendAgents(Agent::Vector& agentlist) const
+{
+ for (AgentMap::const_iterator iter = agents.begin(); iter != agents.end(); iter++) {
+ agentlist.push_back(iter->second);
+ }
+}
+
+ostream& qpid::console::operator<<(ostream& o, const Broker& k)
+{
+ o << "Broker: " << k.connectionSettings.host << ":" << k.connectionSettings.port;
+ return o;
+}
diff --git a/cpp/src/qpid/console/Broker.h b/cpp/src/qpid/console/Broker.h
new file mode 100644
index 0000000000..8cca99f764
--- /dev/null
+++ b/cpp/src/qpid/console/Broker.h
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_BROKER_H_
+#define _QPID_CONSOLE_BROKER_H_
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/Url.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include <string>
+#include <iostream>
+
+namespace qpid {
+namespace console {
+ class SessionManager;
+ class Agent;
+ class Object;
+
+ /**
+ *
+ * \ingroup qpidconsoleapi
+ */
+ class Broker : public client::MessageListener {
+ public:
+ Broker(SessionManager& sm, client::ConnectionSettings& settings);
+ ~Broker();
+
+ bool isConnected() const { return connected; }
+ const std::string& getError() const { return error; }
+ const std::string& getSessionId() const { return amqpSessionId; }
+ const framing::Uuid& getBrokerId() const { return brokerId; }
+ uint32_t getBrokerBank() const { return 1; }
+ void addBinding(const std::string& key) {
+ connThreadBody.bindExchange("qpid.management", key);
+ }
+
+ private:
+ friend class SessionManager;
+ friend class Object;
+ typedef std::map<uint64_t,Agent*> AgentMap;
+ static const int SYNC_TIME = 60;
+
+ SessionManager& sessionManager;
+ AgentMap agents;
+ client::SubscriptionManager* subscription;
+ bool connected;
+ std::string error;
+ std::string amqpSessionId;
+ client::ConnectionSettings connectionSettings;
+ sys::Mutex lock;
+ sys::Condition cond;
+ framing::Uuid brokerId;
+ uint32_t reqsOutstanding;
+ bool syncInFlight;
+ bool topicBound;
+ Object* methodObject;
+
+ friend class ConnectionThread;
+ class ConnectionThread : public sys::Runnable {
+ bool operational;
+ Broker& broker;
+ framing::Uuid sessionId;
+ client::Connection connection;
+ client::Session session;
+ client::SubscriptionManager* subscriptions;
+ std::stringstream queueName;
+ sys::Mutex connLock;
+ void run();
+ public:
+ ConnectionThread(Broker& _broker) :
+ operational(false), broker(_broker), subscriptions(0) {}
+ ~ConnectionThread();
+ void sendBuffer(qpid::framing::Buffer& buf,
+ uint32_t length,
+ const std::string& exchange = "qpid.management",
+ const std::string& routingKey = "broker");
+ void bindExchange(const std::string& exchange, const std::string& key);
+ };
+
+ ConnectionThread connThreadBody;
+ sys::Thread connThread;
+
+ void encodeHeader(framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0) const;
+ bool checkHeader(framing::Buffer& buf, uint8_t *opcode, uint32_t *seq) const;
+ void received(client::Message& msg);
+ void resetAgents();
+ void updateAgent(const Object& object);
+ void waitForStable();
+ void incOutstanding();
+ void decOutstanding();
+ void setBrokerId(const framing::Uuid& id) { brokerId = id; }
+ void appendAgents(std::vector<Agent*>& agents) const;
+
+ friend std::ostream& operator<<(std::ostream& o, const Broker& k);
+ };
+
+ std::ostream& operator<<(std::ostream& o, const Broker& k);
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/console/ClassKey.cpp b/cpp/src/qpid/console/ClassKey.cpp
new file mode 100644
index 0000000000..6aa2bcb117
--- /dev/null
+++ b/cpp/src/qpid/console/ClassKey.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ClassKey.h"
+#include <string.h>
+
+using namespace std;
+using namespace qpid::console;
+
+ClassKey::ClassKey(const string& _package, const string& _name, const uint8_t* _hash) :
+ package(_package), name(_name)
+{
+ ::memcpy(hash, _hash, HASH_SIZE);
+}
+
+string ClassKey::getHashString() const
+{
+ char cstr[36];
+ ::sprintf(cstr, "%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x",
+ hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7],
+ hash[8], hash[9], hash[10], hash[11], hash[12], hash[13], hash[14], hash[15]);
+ return string(cstr);
+}
+
+string ClassKey::str() const
+{
+ string result(package + ":" + name + "(" + getHashString() + ")");
+ return result;
+}
+
+bool ClassKey::operator==(const ClassKey& other) const
+{
+ return ::memcmp(hash, other.hash, HASH_SIZE) == 0 &&
+ name == other.name &&
+ package == other.package;
+}
+
+bool ClassKey::operator!=(const ClassKey& other) const
+{
+ return !(*this == other);
+}
+
+bool ClassKey::operator<(const ClassKey& other) const
+{
+ int cmp = ::memcmp(hash, other.hash, HASH_SIZE);
+ if (cmp != 0)
+ return cmp < 0;
+ cmp = name.compare(other.name);
+ if (cmp != 0)
+ return cmp < 0;
+ return package < other.package;
+}
+
+bool ClassKey::operator>(const ClassKey& other) const
+{
+ int cmp = ::memcmp(hash, other.hash, HASH_SIZE);
+ if (cmp != 0)
+ return cmp > 0;
+ cmp = name.compare(other.name);
+ if (cmp != 0)
+ return cmp > 0;
+ return package > other.package;
+}
+
+bool ClassKey::operator<=(const ClassKey& other) const
+{
+ return !(*this > other);
+}
+
+bool ClassKey::operator>=(const ClassKey& other) const
+{
+ return !(*this < other);
+}
+
+void ClassKey::encode(framing::Buffer& buffer) const
+{
+ buffer.putShortString(package);
+ buffer.putShortString(name);
+ buffer.putBin128(const_cast<uint8_t*>(hash));
+}
+
+ostream& qpid::console::operator<<(ostream& o, const ClassKey& k)
+{
+ o << k.str();
+ return o;
+}
diff --git a/cpp/src/qpid/console/ClassKey.h b/cpp/src/qpid/console/ClassKey.h
new file mode 100644
index 0000000000..f6617e22d5
--- /dev/null
+++ b/cpp/src/qpid/console/ClassKey.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_CLASSKEY_H_
+#define _QPID_CONSOLE_CLASSKEY_H_
+
+#include <string>
+#include "Package.h"
+#include "qpid/framing/Buffer.h"
+
+namespace qpid {
+namespace console {
+
+ /**
+ *
+ * \ingroup qmfconsoleapi
+ */
+ class ClassKey {
+ public:
+ static const int HASH_SIZE = 16;
+
+ ClassKey(const std::string& package, const std::string& name, const uint8_t* hash);
+
+ const std::string& getPackageName() const { return package; }
+ const std::string& getClassName() const { return name; }
+ const uint8_t* getHash() const { return hash; }
+ std::string getHashString() const;
+ std::string str() const;
+ bool operator==(const ClassKey& other) const;
+ bool operator!=(const ClassKey& other) const;
+ bool operator<(const ClassKey& other) const;
+ bool operator>(const ClassKey& other) const;
+ bool operator<=(const ClassKey& other) const;
+ bool operator>=(const ClassKey& other) const;
+ void encode(framing::Buffer& buffer) const;
+
+ private:
+ std::string package;
+ std::string name;
+ uint8_t hash[HASH_SIZE];
+ };
+
+ std::ostream& operator<<(std::ostream& o, const ClassKey& k);
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/console/ConsoleListener.h b/cpp/src/qpid/console/ConsoleListener.h
new file mode 100644
index 0000000000..97fdf158cd
--- /dev/null
+++ b/cpp/src/qpid/console/ConsoleListener.h
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_CONSOLE_LISTENER_H_
+#define _QPID_CONSOLE_CONSOLE_LISTENER_H_
+
+#include <string>
+#include "Broker.h"
+#include "ClassKey.h"
+#include "Object.h"
+
+namespace qpid {
+namespace console {
+
+ /**
+ * Implement a subclass of ConsoleListener and subscribe it using
+ * the SessionManager to receive indications.
+ *
+ * \ingroup qmfconsoleapi
+ */
+ class ConsoleListener{
+ public:
+ virtual ~ConsoleListener() {};
+
+ /** Invoked when a connection is established to a broker
+ */
+ virtual void brokerConnected(const Broker&) {}
+
+ /** Invoked when the connection to a broker is lost
+ */
+ virtual void brokerDisconnected(const Broker&) {}
+
+ /** Invoked when a QMF package is discovered.
+ */
+ virtual void newPackage(const std::string&) {}
+
+ /** Invoked when a new class is discovered. Session.getSchema can be
+ * used to obtain details about the class.
+ */
+ virtual void newClass(const ClassKey&) {}
+
+ /** Invoked when a QMF agent is discovered.
+ */
+ virtual void newAgent(const Agent&) {}
+
+ /** Invoked when a QMF agent disconects.
+ */
+ virtual void delAgent(const Agent&) {}
+
+ /** Invoked when an object is updated.
+ */
+ virtual void objectProps(Broker&, Object&) {}
+
+ /** Invoked when an object is updated.
+ */
+ virtual void objectStats(Broker&, Object&) {}
+
+ /** Invoked when an event is raised.
+ */
+ //virtual void event(Broker&, Event) {}
+
+ /**
+ */
+ //virtual void heartbeat(Agent&, uint64_t) {}
+
+ /**
+ */
+ virtual void brokerInfo(Broker&) {}
+
+ /**
+ */
+ //virtual void methodResponse(Broker&, uint32_t seq, MethodResponse&) {}
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/console/Event.h b/cpp/src/qpid/console/Event.h
new file mode 100644
index 0000000000..7627a4264d
--- /dev/null
+++ b/cpp/src/qpid/console/Event.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_EVENT_H_
+#define _QPID_CONSOLE_EVENT_H_
+
+#include <string>
+#include "Message.h"
+
+namespace qpid {
+namespace console {
+
+ /**
+ *
+ * \ingroup qpidconsoleapi
+ */
+ class Event {
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/console/Object.cpp b/cpp/src/qpid/console/Object.cpp
new file mode 100644
index 0000000000..1ca70a616a
--- /dev/null
+++ b/cpp/src/qpid/console/Object.cpp
@@ -0,0 +1,324 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionManager.h"
+#include "Broker.h"
+#include "Object.h"
+#include "Schema.h"
+#include "ClassKey.h"
+#include "Value.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Mutex.h"
+
+using namespace qpid::console;
+using namespace std;
+using qpid::framing::Uuid;
+using qpid::framing::FieldTable;
+using qpid::sys::Mutex;
+
+Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bool stat) :
+ broker(b), schema(s), pendingMethod(0)
+{
+ currentTime = buffer.getLongLong();
+ createTime = buffer.getLongLong();
+ deleteTime = buffer.getLongLong();
+ objectId.decode(buffer);
+
+ if (prop) {
+ set<string> excludes;
+ parsePresenceMasks(buffer, excludes);
+ for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin();
+ pIter != schema->properties.end(); pIter++) {
+ SchemaProperty* property = *pIter;
+ if (excludes.count(property->name) != 0) {
+ attributes[property->name] = new NullValue();
+ } else {
+ attributes[property->name] = property->decodeValue(buffer);
+ }
+ }
+ }
+
+ if (stat) {
+ for (vector<SchemaStatistic*>::const_iterator sIter = schema->statistics.begin();
+ sIter != schema->statistics.end(); sIter++) {
+ SchemaStatistic* statistic = *sIter;
+ attributes[statistic->name] = statistic->decodeValue(buffer);
+ }
+ }
+}
+
+Object::~Object()
+{
+ // for (AttributeMap::iterator iter = attributes.begin(); iter != attributes.end(); iter++)
+ // delete iter->second;
+ // attributes.clear();
+}
+
+const ClassKey& Object::getClassKey() const
+{
+ return schema->getClassKey();
+}
+
+std::string Object::getIndex() const
+{
+ string result;
+
+ for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin();
+ pIter != schema->properties.end(); pIter++) {
+ SchemaProperty* property = *pIter;
+ if (property->isIndex) {
+ AttributeMap::const_iterator vIter = attributes.find(property->name);
+ if (vIter != attributes.end()) {
+ if (!result.empty())
+ result += ":";
+ result += vIter->second->str();
+ }
+ }
+ }
+ return result;
+}
+
+void Object::mergeUpdate(const Object& /*updated*/)
+{
+ // TODO
+}
+
+void Object::invokeMethod(const string name, const AttributeMap& args, MethodResponse& result)
+{
+ for (vector<SchemaMethod*>::const_iterator iter = schema->methods.begin();
+ iter != schema->methods.end(); iter++) {
+ if ((*iter)->name == name) {
+ SchemaMethod* method = *iter;
+ char rawbuffer[65536];
+ framing::Buffer buffer(rawbuffer, 65536);
+ uint32_t sequence = broker->sessionManager.sequenceManager.reserve("method");
+ pendingMethod = method;
+ broker->methodObject = this;
+ broker->encodeHeader(buffer, 'M', sequence);
+ objectId.encode(buffer);
+ schema->key.encode(buffer);
+ buffer.putShortString(name);
+
+ for (vector<SchemaArgument*>::const_iterator aIter = method->arguments.begin();
+ aIter != method->arguments.end(); aIter++) {
+ SchemaArgument* arg = *aIter;
+ if (arg->dirInput) {
+ AttributeMap::const_iterator attr = args.find(arg->name);
+ if (attr != args.end()) {
+ ValueFactory::encodeValue(arg->typeCode, attr->second, buffer);
+ } else {
+ // TODO Use the default value instead of throwing
+ throw Exception("Missing arguments in method call");
+ }
+ }
+ }
+
+ uint32_t length = buffer.getPosition();
+ buffer.reset();
+ stringstream routingKey;
+ routingKey << "agent." << objectId.getBrokerBank() << "." << objectId.getAgentBank();
+ broker->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str());
+
+ {
+ Mutex::ScopedLock l(broker->lock);
+ while (pendingMethod != 0)
+ broker->cond.wait(broker->lock);
+ result = methodResponse;
+ }
+ }
+ }
+}
+
+void Object::handleMethodResp(framing::Buffer& buffer, uint32_t sequence)
+{
+ broker->sessionManager.sequenceManager.release(sequence);
+ methodResponse.code = buffer.getLong();
+ buffer.getMediumString(methodResponse.text);
+ methodResponse.arguments.clear();
+
+ for (vector<SchemaArgument*>::const_iterator aIter = pendingMethod->arguments.begin();
+ aIter != pendingMethod->arguments.end(); aIter++) {
+ SchemaArgument* arg = *aIter;
+ if (arg->dirOutput) {
+ methodResponse.arguments[arg->name] = arg->decodeValue(buffer);
+ }
+ }
+
+ {
+ Mutex::ScopedLock l(broker->lock);
+ pendingMethod = 0;
+ broker->cond.notify();
+ }
+}
+
+ObjectId Object::attrRef(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return ObjectId();
+ Value* val = iter->second;
+ if (!val->isObjectId())
+ return ObjectId();
+ return val->asObjectId();
+}
+
+uint32_t Object::attrUint(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value* val = iter->second;
+ if (!val->isUint())
+ return 0;
+ return val->asUint();
+}
+
+int32_t Object::attrInt(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value* val = iter->second;
+ if (!val->isInt())
+ return 0;
+ return val->asInt();
+}
+
+uint64_t Object::attrUint64(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value* val = iter->second;
+ if (!val->isUint64())
+ return 0;
+ return val->asUint64();
+}
+
+int64_t Object::attrInt64(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value* val = iter->second;
+ if (!val->isInt64())
+ return 0;
+ return val->asInt64();
+}
+
+string Object::attrString(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return string();
+ Value* val = iter->second;
+ if (!val->isString())
+ return string();
+ return val->asString();
+}
+
+bool Object::attrBool(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return false;
+ Value* val = iter->second;
+ if (!val->isBool())
+ return false;
+ return val->asBool();
+}
+
+float Object::attrFloat(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0.0;
+ Value* val = iter->second;
+ if (!val->isFloat())
+ return 0.0;
+ return val->asFloat();
+}
+
+double Object::attrDouble(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0.0;
+ Value* val = iter->second;
+ if (!val->isDouble())
+ return 0.0;
+ return val->asDouble();
+}
+
+Uuid Object::attrUuid(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return Uuid();
+ Value* val = iter->second;
+ if (!val->isUuid())
+ return Uuid();
+ return val->asUuid();
+}
+
+FieldTable Object::attrMap(const std::string& key) const
+{
+ AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return FieldTable();
+ Value* val = iter->second;
+ if (!val->isMap())
+ return FieldTable();
+ return val->asMap();
+}
+
+void Object::parsePresenceMasks(framing::Buffer& buffer, set<string>& excludeList)
+{
+ excludeList.clear();
+ uint8_t bit = 0;
+ uint8_t mask = 0;
+
+ for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin();
+ pIter != schema->properties.end(); pIter++) {
+ SchemaProperty* property = *pIter;
+ if (property->isOptional) {
+ if (bit == 0) {
+ mask = buffer.getOctet();
+ bit = 1;
+ }
+ if ((mask & bit) == 0)
+ excludeList.insert(property->name);
+ if (bit == 0x80)
+ bit = 0;
+ else
+ bit = bit << 1;
+ }
+ }
+}
+
+ostream& qpid::console::operator<<(ostream& o, const Object& object)
+{
+ const ClassKey& key = object.getClassKey();
+ o << key.getPackageName() << ":" << key.getClassName() << "[" << object.getObjectId() << "] " <<
+ object.getIndex();
+ return o;
+}
+
diff --git a/cpp/src/qpid/console/Object.h b/cpp/src/qpid/console/Object.h
new file mode 100644
index 0000000000..918bee8af1
--- /dev/null
+++ b/cpp/src/qpid/console/Object.h
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_OBJECT_H_
+#define _QPID_CONSOLE_OBJECT_H_
+
+#include "ObjectId.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FieldTable.h"
+#include <map>
+#include <set>
+#include <vector>
+
+namespace qpid {
+namespace framing {
+ class Buffer;
+}
+namespace console {
+
+ class Broker;
+ class SchemaClass;
+ class SchemaMethod;
+ class ObjectId;
+ class ClassKey;
+ class Value;
+
+ /**
+ * \ingroup qmfconsoleapi
+ */
+ struct MethodResponse {
+ uint32_t code;
+ std::string text;
+ std::map<std::string, Value*> arguments;
+ };
+
+ class Object {
+ public:
+ typedef std::vector<Object> Vector;
+ typedef std::map<std::string, Value*> AttributeMap;
+
+ Object(Broker* broker, SchemaClass* schemaClass, framing::Buffer& buffer, bool prop, bool stat);
+ ~Object();
+
+ Broker* getBroker() const { return broker; }
+ const ObjectId& getObjectId() const { return objectId; }
+ const ClassKey& getClassKey() const;
+ SchemaClass* getSchema() const { return schema; }
+ uint64_t getCurrentTime() const { return currentTime; }
+ uint64_t getCreateTime() const { return createTime; }
+ uint64_t getDeleteTime() const { return deleteTime; }
+ bool isDeleted() const { return deleteTime != 0; }
+ std::string getIndex() const;
+ void mergeUpdate(const Object& updated);
+ const AttributeMap& getAttributes() const { return attributes; }
+ void invokeMethod(const std::string name, const AttributeMap& args, MethodResponse& result);
+ void handleMethodResp(framing::Buffer& buffer, uint32_t sequence);
+
+ ObjectId attrRef(const std::string& key) const;
+ uint32_t attrUint(const std::string& key) const;
+ int32_t attrInt(const std::string& key) const;
+ uint64_t attrUint64(const std::string& key) const;
+ int64_t attrInt64(const std::string& key) const;
+ std::string attrString(const std::string& key) const;
+ bool attrBool(const std::string& key) const;
+ float attrFloat(const std::string& key) const;
+ double attrDouble(const std::string& key) const;
+ framing::Uuid attrUuid(const std::string& key) const;
+ framing::FieldTable attrMap(const std::string& key) const;
+
+ private:
+ Broker* broker;
+ SchemaClass* schema;
+ ObjectId objectId;
+ uint64_t currentTime;
+ uint64_t createTime;
+ uint64_t deleteTime;
+ AttributeMap attributes;
+ SchemaMethod* pendingMethod;
+ MethodResponse methodResponse;
+
+ void parsePresenceMasks(framing::Buffer& buffer, std::set<std::string>& excludeList);
+ };
+
+ std::ostream& operator<<(std::ostream& o, const Object& object);
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/console/ObjectId.cpp b/cpp/src/qpid/console/ObjectId.cpp
new file mode 100644
index 0000000000..535e59e88d
--- /dev/null
+++ b/cpp/src/qpid/console/ObjectId.cpp
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ObjectId.h"
+#include "qpid/framing/Buffer.h"
+
+using namespace qpid::console;
+using namespace std;
+
+ObjectId::ObjectId(framing::Buffer& buffer)
+{
+ decode(buffer);
+}
+
+void ObjectId::decode(framing::Buffer& buffer)
+{
+ first = buffer.getLongLong();
+ second = buffer.getLongLong();
+}
+
+void ObjectId::encode(framing::Buffer& buffer)
+{
+ buffer.putLongLong(first);
+ buffer.putLongLong(second);
+}
+
+ostream& qpid::console::operator<<(ostream& o, const ObjectId& id)
+{
+ o << (int) id.getFlags() << "-" << id.getSequence() << "-" << id.getBrokerBank() << "-" <<
+ id.getAgentBank() << "-" << id.getObject();
+ return o;
+}
+
+
diff --git a/cpp/src/qpid/console/ObjectId.h b/cpp/src/qpid/console/ObjectId.h
new file mode 100644
index 0000000000..c9c2fc852a
--- /dev/null
+++ b/cpp/src/qpid/console/ObjectId.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_OBJECTID_H
+#define _QPID_CONSOLE_OBJECTID_H
+
+#include <iostream>
+
+namespace qpid {
+namespace framing {
+ class Buffer;
+}
+namespace console {
+
+ /**
+ *
+ * \ingroup qmfconsoleapi
+ */
+ class ObjectId {
+ public:
+ ObjectId() : first(0), second(0) {}
+ ObjectId(framing::Buffer& buffer);
+
+ uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
+ uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
+ uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }
+ uint32_t getAgentBank() const { return first & 0x000000000FFFFFFFLL; }
+ uint64_t getObject() const { return second; }
+ bool isDurable() const { return getSequence() == 0; }
+ void decode(framing::Buffer& buffer);
+ void encode(framing::Buffer& buffer);
+ void setValue(uint64_t f, uint64_t s) { first = f; second = s; }
+
+ private:
+ uint64_t first;
+ uint64_t second;
+ };
+
+ std::ostream& operator<<(std::ostream& o, const ObjectId& id);
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/console/Package.cpp b/cpp/src/qpid/console/Package.cpp
new file mode 100644
index 0000000000..81a04445f2
--- /dev/null
+++ b/cpp/src/qpid/console/Package.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Package.h"
+
+using namespace qpid::console;
+
+SchemaClass* Package::getClass(const std::string& className, uint8_t* hash)
+{
+ NameHash key(className, hash);
+ ClassMap::iterator iter = classes.find(key);
+ if (iter != classes.end())
+ return iter->second;
+ return 0;
+}
+
+void Package::addClass(const std::string& className, uint8_t* hash, SchemaClass* schemaClass)
+{
+ NameHash key(className, hash);
+ ClassMap::iterator iter = classes.find(key);
+ if (iter == classes.end())
+ classes[key] = schemaClass;
+}
diff --git a/cpp/src/qpid/console/Package.h b/cpp/src/qpid/console/Package.h
new file mode 100644
index 0000000000..a8679dff19
--- /dev/null
+++ b/cpp/src/qpid/console/Package.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_PACKAGE_H_
+#define _QPID_CONSOLE_PACKAGE_H_
+
+#include <string>
+#include <map>
+
+namespace qpid {
+namespace console {
+ class SchemaClass;
+
+ /**
+ *
+ * \ingroup qmfconsoleapi
+ */
+ class Package {
+ public:
+ Package(const std::string& n) : name(n) {}
+ const std::string& getName() const { return name; }
+
+ private:
+ friend class SessionManager;
+ struct NameHash {
+ std::string name;
+ uint8_t hash[16];
+ NameHash(const std::string& n, const uint8_t* h) : name(n) {
+ for (int i = 0; i < 16; i++)
+ hash[i] = h[i];
+ }
+ };
+
+ struct NameHashComp {
+ bool operator() (const NameHash& lhs, const NameHash& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ typedef std::map<NameHash, SchemaClass*, NameHashComp> ClassMap;
+
+ const std::string name;
+ ClassMap classes;
+
+ SchemaClass* getClass(const std::string& className, uint8_t* hash);
+ void addClass(const std::string& className, uint8_t* hash,
+ SchemaClass* schemaClass);
+ };
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/console/Schema.cpp b/cpp/src/qpid/console/Schema.cpp
new file mode 100644
index 0000000000..1cc8b8ee02
--- /dev/null
+++ b/cpp/src/qpid/console/Schema.cpp
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Schema.h"
+#include "Value.h"
+#include "qpid/framing/FieldTable.h"
+
+using namespace qpid::console;
+using std::string;
+using std::vector;
+
+SchemaArgument::SchemaArgument(framing::Buffer& buffer, bool forMethod)
+{
+ framing::FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typeCode = map.getAsInt("type");
+ unit = map.getAsString("unit");
+ min = map.getAsInt("min");
+ max = map.getAsInt("max");
+ maxLen = map.getAsInt("maxlen");
+ desc = map.getAsString("desc");
+
+ dirInput = false;
+ dirOutput = false;
+ if (forMethod) {
+ string dir(map.getAsString("dir"));
+ if (dir.find('I') != dir.npos || dir.find('i') != dir.npos)
+ dirInput = true;
+ if (dir.find('O') != dir.npos || dir.find('o') != dir.npos)
+ dirOutput = true;
+ }
+}
+
+Value* SchemaArgument::decodeValue(framing::Buffer& buffer)
+{
+ return ValueFactory::newValue(typeCode, buffer);
+}
+
+SchemaProperty::SchemaProperty(framing::Buffer& buffer)
+{
+ framing::FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typeCode = map.getAsInt("type");
+ accessCode = map.getAsInt("access");
+ isIndex = map.getAsInt("index") != 0;
+ isOptional = map.getAsInt("optional") != 0;
+ unit = map.getAsString("unit");
+ min = map.getAsInt("min");
+ max = map.getAsInt("max");
+ maxLen = map.getAsInt("maxlen");
+ desc = map.getAsString("desc");
+}
+
+Value* SchemaProperty::decodeValue(framing::Buffer& buffer)
+{
+ return ValueFactory::newValue(typeCode, buffer);
+}
+
+SchemaStatistic::SchemaStatistic(framing::Buffer& buffer)
+{
+ framing::FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typeCode = map.getAsInt("type");
+ unit = map.getAsString("unit");
+ desc = map.getAsString("desc");
+}
+
+Value* SchemaStatistic::decodeValue(framing::Buffer& buffer)
+{
+ return ValueFactory::newValue(typeCode, buffer);
+}
+
+SchemaMethod::SchemaMethod(framing::Buffer& buffer)
+{
+ framing::FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ desc = map.getAsString("desc");
+ int argCount = map.getAsInt("argCount");
+
+ for (int i = 0; i < argCount; i++)
+ arguments.push_back(new SchemaArgument(buffer, true));
+}
+
+SchemaMethod::~SchemaMethod()
+{
+ for (vector<SchemaArgument*>::iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ delete *iter;
+}
+
+SchemaClass::SchemaClass(const uint8_t _kind, const ClassKey& _key, framing::Buffer& buffer) :
+ kind(_kind), key(_key)
+{
+ if (kind == KIND_TABLE) {
+ uint16_t propCount = buffer.getShort();
+ uint16_t statCount = buffer.getShort();
+ uint16_t methodCount = buffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount; idx++)
+ properties.push_back(new SchemaProperty(buffer));
+ for (uint16_t idx = 0; idx < statCount; idx++)
+ statistics.push_back(new SchemaStatistic(buffer));
+ for (uint16_t idx = 0; idx < methodCount; idx++)
+ methods.push_back(new SchemaMethod(buffer));
+
+ } else if (kind == KIND_EVENT) {
+ uint16_t argCount = buffer.getShort();
+
+ for (uint16_t idx = 0; idx < argCount; idx++)
+ arguments.push_back(new SchemaArgument(buffer));
+ }
+}
+
+SchemaClass::~SchemaClass()
+{
+ for (vector<SchemaProperty*>::iterator iter = properties.begin();
+ iter != properties.end(); iter++)
+ delete *iter;
+ for (vector<SchemaStatistic*>::iterator iter = statistics.begin();
+ iter != statistics.end(); iter++)
+ delete *iter;
+ for (vector<SchemaMethod*>::iterator iter = methods.begin();
+ iter != methods.end(); iter++)
+ delete *iter;
+ for (vector<SchemaArgument*>::iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ delete *iter;
+}
+
diff --git a/cpp/src/qpid/console/Schema.h b/cpp/src/qpid/console/Schema.h
new file mode 100644
index 0000000000..accd3a8b16
--- /dev/null
+++ b/cpp/src/qpid/console/Schema.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_SCHEMA_H_
+#define _QPID_CONSOLE_SCHEMA_H_
+
+#include "ClassKey.h"
+#include <vector>
+
+namespace qpid {
+namespace framing {
+ class Buffer;
+}
+namespace console {
+ class Value;
+
+ struct SchemaArgument {
+ SchemaArgument(framing::Buffer& buffer, bool forMethod = false);
+ Value* decodeValue(framing::Buffer& buffer);
+
+ std::string name;
+ uint8_t typeCode;
+ bool dirInput;
+ bool dirOutput;
+ std::string unit;
+ int min;
+ int max;
+ int maxLen;
+ std::string desc;
+ std::string defaultVal;
+ };
+
+ struct SchemaProperty {
+ SchemaProperty(framing::Buffer& buffer);
+ Value* decodeValue(framing::Buffer& buffer);
+
+ std::string name;
+ uint8_t typeCode;
+ uint8_t accessCode;
+ bool isIndex;
+ bool isOptional;
+ std::string unit;
+ int min;
+ int max;
+ int maxLen;
+ std::string desc;
+ };
+
+ struct SchemaStatistic {
+ SchemaStatistic(framing::Buffer& buffer);
+ Value* decodeValue(framing::Buffer& buffer);
+
+ std::string name;
+ uint8_t typeCode;
+ std::string unit;
+ std::string desc;
+ };
+
+ struct SchemaMethod {
+ SchemaMethod(framing::Buffer& buffer);
+ ~SchemaMethod();
+
+ std::string name;
+ std::string desc;
+ std::vector<SchemaArgument*> arguments;
+ };
+
+ struct SchemaClass {
+ static const uint8_t KIND_TABLE = 1;
+ static const uint8_t KIND_EVENT = 2;
+
+ SchemaClass(const uint8_t kind, const ClassKey& key, framing::Buffer& buffer);
+ ~SchemaClass();
+ const ClassKey& getClassKey() const { return key; }
+
+ const uint8_t kind;
+ const ClassKey key;
+ std::vector<SchemaProperty*> properties;
+ std::vector<SchemaStatistic*> statistics;
+ std::vector<SchemaMethod*> methods;
+ std::vector<SchemaArgument*> arguments;
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/console/SequenceManager.cpp b/cpp/src/qpid/console/SequenceManager.cpp
new file mode 100644
index 0000000000..ff777430c0
--- /dev/null
+++ b/cpp/src/qpid/console/SequenceManager.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SequenceManager.h"
+
+using namespace qpid::console;
+using namespace qpid::sys;
+using std::string;
+using std::cout;
+using std::endl;
+
+uint32_t SequenceManager::reserve(const std::string& context)
+{
+ Mutex::ScopedLock l(lock);
+ uint32_t result = sequence++;
+ pending[result] = context;
+ return result;
+}
+
+std::string SequenceManager::release(uint32_t seq)
+{
+ Mutex::ScopedLock l(lock);
+ std::map<uint32_t, string>::iterator iter = pending.find(seq);
+ if (iter == pending.end())
+ return string();
+ string result(iter->second);
+ pending.erase(iter);
+ return result;
+}
+
diff --git a/cpp/src/qpid/console/SequenceManager.h b/cpp/src/qpid/console/SequenceManager.h
new file mode 100644
index 0000000000..c7a8c20fe6
--- /dev/null
+++ b/cpp/src/qpid/console/SequenceManager.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_SEQUENCEMANAGER_H_
+#define _QPID_CONSOLE_SEQUENCEMANAGER_H_
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+#include <string>
+#include <set>
+
+namespace qpid {
+namespace console {
+
+ /**
+ *
+ * \ingroup qpidconsoleapi
+ */
+ class SequenceManager {
+ public:
+ typedef std::set<uint32_t> set;
+
+ SequenceManager() : sequence(0) {}
+ uint32_t reserve(const std::string& context = "");
+ std::string release(uint32_t seq);
+
+ private:
+ sys::Mutex lock;
+ uint32_t sequence;
+ std::map<uint32_t, std::string> pending;
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/console/SessionManager.cpp b/cpp/src/qpid/console/SessionManager.cpp
new file mode 100644
index 0000000000..bd06421445
--- /dev/null
+++ b/cpp/src/qpid/console/SessionManager.cpp
@@ -0,0 +1,446 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionManager.h"
+#include "Schema.h"
+#include "Agent.h"
+#include "qpid/console/ConsoleListener.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FieldTable.h"
+
+using namespace qpid::console;
+using namespace qpid::sys;
+using namespace std;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+
+SessionManager::SessionManager(ConsoleListener* _listener,
+ bool _rcvObjects,
+ bool _rcvEvents,
+ bool _rcvHeartbeats,
+ bool _manageConnections,
+ bool _userBindings) :
+ listener(_listener),
+ rcvObjects((listener != 0) && _rcvObjects),
+ rcvEvents((listener != 0) && _rcvEvents),
+ rcvHeartbeats((listener != 0) && _rcvHeartbeats),
+ userBindings(_userBindings),
+ manageConnections(_manageConnections)
+{
+ bindingKeys();
+}
+
+Broker* SessionManager::addBroker(client::ConnectionSettings& settings)
+{
+ Broker* broker(new Broker(*this, settings));
+ {
+ Mutex::ScopedLock l(brokerListLock);
+ brokers.push_back(broker);
+ }
+ return broker;
+}
+
+void SessionManager::delBroker(Broker* broker)
+{
+ Mutex::ScopedLock l(brokerListLock);
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ if (*iter == broker) {
+ brokers.erase(iter);
+ return;
+ }
+}
+
+void SessionManager::getPackages(NameVector& packageNames)
+{
+ allBrokersStable();
+ packageNames.clear();
+ {
+ Mutex::ScopedLock l(lock);
+ for (map<string, Package*>::iterator iter = packages.begin();
+ iter != packages.end(); iter++)
+ packageNames.push_back(iter->first);
+ }
+}
+
+void SessionManager::getClasses(KeyVector& classKeys, const std::string& packageName)
+{
+ allBrokersStable();
+ classKeys.clear();
+ map<string, Package*>::iterator iter = packages.find(packageName);
+ if (iter == packages.end())
+ return;
+
+ Package& package = *(iter->second);
+ for (Package::ClassMap::const_iterator piter = package.classes.begin();
+ piter != package.classes.end(); piter++) {
+ ClassKey key(piter->second->getClassKey());
+ classKeys.push_back(key);
+ }
+}
+
+SchemaClass& SessionManager::getSchema(const ClassKey& classKey)
+{
+ allBrokersStable();
+ map<string, Package*>::iterator iter = packages.find(classKey.getPackageName());
+ if (iter == packages.end())
+ throw Exception("Unknown package");
+
+ Package& package = *(iter->second);
+ Package::NameHash key(classKey.getClassName(), classKey.getHash());
+ Package::ClassMap::iterator cIter = package.classes.find(key);
+ if (cIter == package.classes.end())
+ throw Exception("Unknown class");
+
+ return *(cIter->second);
+}
+
+void SessionManager::bindPackage(const std::string& packageName)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ bindingKeyList.push_back(key.str());
+ for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++)
+ (*iter)->addBinding(key.str());
+}
+
+void SessionManager::bindClass(const ClassKey& classKey)
+{
+ bindClass(classKey.getPackageName(), classKey.getClassName());
+}
+
+void SessionManager::bindClass(const std::string& packageName, const std::string& className)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ bindingKeyList.push_back(key.str());
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ (*iter)->addBinding(key.str());
+}
+
+void SessionManager::getAgents(Agent::Vector& agents, Broker* broker)
+{
+ agents.clear();
+ if (broker != 0) {
+ broker->appendAgents(agents);
+ } else {
+ for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
+ (*iter)->appendAgents(agents);
+ }
+ }
+}
+
+void SessionManager::getObjects(Object::Vector& objects, const std::string& className,
+ Broker* _broker, Agent* _agent)
+{
+ Agent::Vector agentList;
+
+ if (_agent != 0) {
+ agentList.push_back(_agent);
+ } else {
+ if (_broker != 0) {
+ _broker->appendAgents(agentList);
+ } else {
+ Mutex::ScopedLock _lock(brokerListLock);
+ for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
+ (*iter)->appendAgents(agentList);
+ }
+ }
+ }
+
+ FieldTable ft;
+ uint32_t sequence;
+ ft.setString("_class", className);
+
+ getResult.clear();
+ syncSequenceList.clear();
+ error = string();
+
+ for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) {
+ Agent* agent = *iter;
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+ stringstream routingKey;
+ routingKey << "agent." << agent->getBrokerBank() << "." << agent->getAgentBank();
+ {
+ Mutex::ScopedLock _lock(lock);
+ sequence = sequenceManager.reserve("multiget");
+ syncSequenceList.insert(sequence);
+ }
+ agent->getBroker()->encodeHeader(buffer, 'G', sequence);
+ ft.encode(buffer);
+ uint32_t length = buffer.getPosition();
+ buffer.reset();
+ agent->getBroker()->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str());
+ }
+
+ {
+ Mutex::ScopedLock _lock(lock);
+ while (!syncSequenceList.empty() && error.empty()) {
+ cv.wait(lock); // TODO put timeout in
+ }
+ }
+
+ objects = getResult;
+}
+
+void SessionManager::bindingKeys()
+{
+ bindingKeyList.push_back("schema.#");
+ if (rcvObjects && rcvEvents && rcvHeartbeats && !userBindings) {
+ bindingKeyList.push_back("console.#");
+ } else {
+ if (rcvObjects && !userBindings)
+ bindingKeyList.push_back("console.obj.#");
+ else
+ bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent");
+ if (rcvEvents)
+ bindingKeyList.push_back("console.event.#");
+ if (rcvHeartbeats)
+ bindingKeyList.push_back("console.heartbeat");
+ }
+}
+
+void SessionManager::allBrokersStable()
+{
+ Mutex::ScopedLock l(brokerListLock);
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ (*iter)->waitForStable();
+}
+
+void SessionManager::startProtocol(Broker* broker)
+{
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ broker->encodeHeader(buffer, 'B');
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+}
+
+
+void SessionManager::handleBrokerResp(Broker* broker, Buffer& inBuffer, uint32_t)
+{
+ framing::Uuid brokerId;
+
+ brokerId.decode(inBuffer);
+ broker->setBrokerId(brokerId);
+
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ uint32_t sequence = sequenceManager.reserve("startup");
+ broker->encodeHeader(buffer, 'P', sequence);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+
+ if (listener != 0) {
+ listener->brokerInfo(*broker);
+ }
+}
+
+void SessionManager::handlePackageInd(Broker* broker, Buffer& inBuffer, uint32_t)
+{
+ string packageName;
+ inBuffer.getShortString(packageName);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator iter = packages.find(packageName);
+ if (iter == packages.end()) {
+ packages[packageName] = new Package(packageName);
+ if (listener != 0)
+ listener->newPackage(packageName);
+ }
+ }
+
+ broker->incOutstanding();
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ uint32_t sequence = sequenceManager.reserve("startup");
+ broker->encodeHeader(buffer, 'Q', sequence);
+ buffer.putShortString(packageName);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+}
+
+void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uint32_t sequence)
+{
+ Mutex::ScopedLock l(lock);
+ uint32_t resultCode = inBuffer.getLong();
+ string resultText;
+ inBuffer.getShortString(resultText);
+ string context = sequenceManager.release(sequence);
+ if (resultCode != 0)
+ QPID_LOG(debug, "Received error in completion: " << resultCode << " " << resultText);
+ if (context == "startup") {
+ broker->decOutstanding();
+ } else if (context == "multiget") {
+ if (syncSequenceList.count(sequence) == 1) {
+ syncSequenceList.erase(sequence);
+ if (syncSequenceList.empty()) {
+ cv.notify();
+ }
+ }
+ }
+ // TODO: Other context cases
+}
+
+void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
+{
+ uint8_t kind;
+ string packageName;
+ string className;
+ uint8_t hash[16];
+
+ kind = inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end() || pIter->second->getClass(className, hash))
+ return;
+ }
+
+ broker->incOutstanding();
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ uint32_t sequence = sequenceManager.reserve("startup");
+ broker->encodeHeader(buffer, 'S', sequence);
+ buffer.putShortString(packageName);
+ buffer.putShortString(className);
+ buffer.putBin128(hash);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+}
+
+void SessionManager::handleMethodResp(Broker* broker, Buffer& buffer, uint32_t sequence)
+{
+ if (broker->methodObject) {
+ broker->methodObject->handleMethodResp(buffer, sequence);
+ }
+}
+
+void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/)
+{
+}
+
+void SessionManager::handleEventInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/)
+{
+}
+
+void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence)
+{
+ uint8_t kind;
+ string packageName;
+ string className;
+ uint8_t hash[16];
+
+ kind = inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end() && !pIter->second->getClass(className, hash)) {
+ ClassKey key(packageName, className, hash);
+ SchemaClass* schemaClass(new SchemaClass(kind, key, inBuffer));
+ pIter->second->addClass(className, hash, schemaClass);
+ if (listener != 0) {
+ listener->newClass(schemaClass->getClassKey());
+ }
+ }
+ }
+
+ sequenceManager.release(sequence);
+ broker->decOutstanding();
+}
+
+void SessionManager::handleContentInd(Broker* broker, Buffer& buffer, uint32_t sequence, bool prop, bool stat)
+{
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ SchemaClass* schemaClass;
+
+ buffer.getShortString(packageName);
+ buffer.getShortString(className);
+ buffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return;
+ schemaClass = pIter->second->getClass(className, hash);
+ if (schemaClass == 0)
+ return;
+ }
+
+ Object object(broker, schemaClass, buffer, prop, stat);
+
+ if (prop && className == "agent" && packageName == "org.apache.qpid.broker")
+ broker->updateAgent(object);
+
+ {
+ Mutex::ScopedLock l(lock);
+ if (syncSequenceList.count(sequence) == 1) {
+ if (!object.isDeleted())
+ getResult.push_back(object);
+ }
+ return;
+ }
+
+ if (listener) {
+ if (prop)
+ listener->objectProps(*broker, object);
+ if (stat)
+ listener->objectStats(*broker, object);
+ }
+}
+
+void SessionManager::handleBrokerConnect(Broker* broker)
+{
+ if (listener != 0)
+ listener->brokerConnected(*broker);
+}
+
+void SessionManager::handleBrokerDisconnect(Broker* broker)
+{
+ if (listener != 0)
+ listener->brokerDisconnected(*broker);
+}
+
diff --git a/cpp/src/qpid/console/SessionManager.h b/cpp/src/qpid/console/SessionManager.h
new file mode 100644
index 0000000000..f860a4590e
--- /dev/null
+++ b/cpp/src/qpid/console/SessionManager.h
@@ -0,0 +1,194 @@
+#ifndef _QPID_CONSOLE_SESSION_MANAGER_H
+#define _QPID_CONSOLE_SESSION_MANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Broker.h"
+#include "Package.h"
+#include "SequenceManager.h"
+#include "ClassKey.h"
+#include "Schema.h"
+#include "Agent.h"
+#include "Object.h"
+#include "ObjectId.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/client/ConnectionSettings.h"
+#include <string>
+#include <vector>
+
+namespace qpid {
+namespace console {
+
+class ConsoleListener;
+
+/**
+ *
+ * \ingroup qmfconsoleapi
+ */
+class SessionManager
+{
+ public:
+ typedef std::vector<std::string> NameVector;
+ typedef std::vector<ClassKey> KeyVector;
+ ~SessionManager() {}
+
+ /** Create a new SessionManager
+ *
+ * Provide your own subclass of ConsoleListener to receive updates and indications
+ * asynchronously or leave it as its default and use only synchronous methods.
+ *
+ *@param listener Listener object to receive asynchronous indications.
+ *@param rcvObjects Listener wishes to receive managed object data.
+ *@param rcvEvents Listener wishes to receive events.
+ *@param rcvHeartbeats Listener wishes to receive agent heartbeats.
+ *@param manageConnections SessionManager should retry dropped connections.
+ *@param userBindings If rcvObjects is true, userBindings allows the console client
+ * to control which object classes are received. See the bindPackage and bindClass
+ * methods. If userBindings is false, the listener will receive updates for all
+ * object classes.
+ */
+ SessionManager(ConsoleListener* listener = 0,
+ bool rcvObjects = true,
+ bool rcvEvents = true,
+ bool rcvHeartbeats = true,
+ bool manageConnections = false,
+ bool userBindings = false);
+
+ /** Connect a broker to the console session
+ *
+ *@param settings Connection settings for client access
+ *@return broker object if operation is successful
+ *@exception If 'manageConnections' is disabled and the connection to the broker fails,
+ * an exception shall be thrown.
+ */
+ Broker* addBroker(client::ConnectionSettings& settings);
+
+ /** Disconnect a broker from the console session
+ *
+ *@param broker The broker object returned from an earlier call to addBroker.
+ */
+ void delBroker(Broker* broker);
+
+ /** Get a list of known management packages
+ *
+ *@param packages Vector of package names returned by the session manager.
+ */
+ void getPackages(NameVector& packages);
+
+ /** Get a list of class keys associated with a package
+ *
+ *@param classKeys List of class keys returned by the session manager.
+ *@param packageName Name of package being queried.
+ */
+ void getClasses(KeyVector& classKeys, const std::string& packageName);
+
+ /** Get the schema of a class given its class key
+ *
+ *@param classKey Class key of the desired schema.
+ */
+ SchemaClass& getSchema(const ClassKey& classKey);
+
+ /** Request that updates be received for all classes within a package
+ *
+ * Note that this method is only meaningful if a ConsoleListener was provided at session
+ * creation and if the 'userBindings' flag was set to true.
+ *
+ *@param packageName Name of the package to which to bind.
+ */
+ void bindPackage(const std::string& packageName);
+
+ /** Request update to be received for a particular class
+ *
+ * Note that this method is only meaningful if a ConsoleListener was provided at session
+ * creation and if the 'userBindings' flag was set to true.
+ *
+ *@param classKey Class key of class to which to bind.
+ */
+ void bindClass(const ClassKey& classKey);
+ void bindClass(const std::string& packageName, const std::string& className);
+
+ /** Get a list of qmf agents known to the session manager.
+ *
+ *@param agents Vector of Agent objects returned by the session manager.
+ *@param broker Return agents registered with this broker only. If NULL, return agents
+ * from all connected brokers.
+ */
+ void getAgents(Agent::Vector& agents, Broker* broker = 0);
+
+ /** Get objects from agents. There are four variants of this method with different ways of
+ * specifying from which class objects are being queried.
+ *
+ *@param objects List of objects received.
+ *@param classKey ClassKey object identifying class to be queried.
+ *@param className Class name identifying class to be queried.
+ *@param objectId Object Id of the single object to be queried.
+ *@param broker Restrict the query to this broker, or all brokers if NULL.
+ *@param agent Restrict the query to this agent, or all agents if NULL.
+ */
+ void getObjects(Object::Vector& objects, const std::string& className,
+ Broker* broker = 0, Agent* agent = 0);
+ //void getObjects(Object::Vector& objects, const ClassKey& classKey,
+ // Broker* broker = 0, Agent* agent = 0);
+ //void getObjects(Object::Vector& objects, const ObjectId& objectId,
+ // Broker* broker = 0, Agent* agent = 0);
+
+private:
+ friend class Broker;
+ friend class Object;
+ sys::Mutex lock;
+ sys::Mutex brokerListLock;
+ ConsoleListener* listener;
+ std::vector<Broker*> brokers;
+ std::map<std::string, Package*> packages;
+ SequenceManager sequenceManager;
+ sys::Condition cv;
+ SequenceManager::set syncSequenceList;
+ Object::Vector getResult;
+ std::string error;
+ bool rcvObjects;
+ bool rcvEvents;
+ bool rcvHeartbeats;
+ bool userBindings;
+ bool manageConnections;
+ NameVector bindingKeyList;
+
+ void bindingKeys();
+ void allBrokersStable();
+ void startProtocol(Broker* broker);
+ void handleBrokerResp(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handlePackageInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handleCommandComplete(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handleClassInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handleMethodResp(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handleHeartbeatInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handleEventInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handleSchemaResp(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence);
+ void handleContentInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence, bool prop, bool stat);
+ void handleBrokerConnect(Broker* broker);
+ void handleBrokerDisconnect(Broker* broker);
+
+};
+
+}} // namespace qpid::console
+
+#endif /*!_QPID_CONSOLE_SESSION_MANAGER_H*/
diff --git a/cpp/src/qpid/console/Value.cpp b/cpp/src/qpid/console/Value.cpp
new file mode 100644
index 0000000000..3ef5d01ec3
--- /dev/null
+++ b/cpp/src/qpid/console/Value.cpp
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Value.h"
+#include "qpid/framing/Buffer.h"
+
+using namespace qpid::console;
+using namespace std;
+
+string NullValue::str() const
+{
+ return "<Null>";
+}
+
+RefValue::RefValue(framing::Buffer& buffer)
+{
+ uint64_t first = buffer.getLongLong();
+ uint64_t second = buffer.getLongLong();
+ value.setValue(first, second);
+}
+
+string RefValue::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+string UintValue::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+string IntValue::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+string Uint64Value::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+string Int64Value::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+StringValue::StringValue(framing::Buffer& buffer, int tc)
+{
+ if (tc == 6)
+ buffer.getShortString(value);
+ else
+ buffer.getMediumString(value);
+}
+
+string BoolValue::str() const
+{
+ return value ? "T" : "F";
+}
+
+string FloatValue::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+string DoubleValue::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+UuidValue::UuidValue(framing::Buffer& buffer)
+{
+ value.decode(buffer);
+}
+
+string MapValue::str() const
+{
+ stringstream s;
+ s << value;
+ return s.str();
+}
+
+MapValue::MapValue(framing::Buffer& buffer)
+{
+ value.decode(buffer);
+}
+
+
+Value* ValueFactory::newValue(int typeCode, framing::Buffer& buffer)
+{
+ switch (typeCode) {
+ case 1: return static_cast<Value*>(new UintValue(buffer.getOctet())); // U8
+ case 2: return static_cast<Value*>(new UintValue(buffer.getShort())); // U16
+ case 3: return static_cast<Value*>(new UintValue(buffer.getLong())); // U32
+ case 4: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // U64
+ case 6: return static_cast<Value*>(new StringValue(buffer, 6)); // SSTR
+ case 7: return static_cast<Value*>(new StringValue(buffer, 7)); // LSTR
+ case 8: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // ABSTIME
+ case 9: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // DELTATIME
+ case 10: return static_cast<Value*>(new RefValue(buffer)); // REF
+ case 11: return static_cast<Value*>(new BoolValue(buffer.getOctet())); // BOOL
+ case 12: return static_cast<Value*>(new FloatValue(buffer.getFloat())); // FLOAT
+ case 13: return static_cast<Value*>(new DoubleValue(buffer.getDouble())); // DOUBLE
+ case 14: return static_cast<Value*>(new UuidValue(buffer)); // UUID
+ case 15: return static_cast<Value*>(new MapValue(buffer)); // MAP
+ case 16: return static_cast<Value*>(new IntValue(buffer.getOctet())); // S8
+ case 17: return static_cast<Value*>(new IntValue(buffer.getShort())); // S16
+ case 18: return static_cast<Value*>(new IntValue(buffer.getLong())); // S32
+ case 19: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // S64
+ }
+
+ return 0;
+}
+
+void ValueFactory::encodeValue(int typeCode, Value* value, framing::Buffer& buffer)
+{
+ switch (typeCode) {
+ case 1: buffer.putOctet(value->asUint()); return; // U8
+ case 2: buffer.putShort(value->asUint()); return; // U16
+ case 3: buffer.putLong(value->asUint()); return; // U32
+ case 4: buffer.putLongLong(value->asUint64()); return; // U64
+ case 6: buffer.putShortString(value->asString()); return; // SSTR
+ case 7: buffer.putMediumString(value->asString()); return; // LSTR
+ case 8: buffer.putLongLong(value->asInt64()); return; // ABSTIME
+ case 9: buffer.putLongLong(value->asUint64()); return; // DELTATIME
+ case 10: value->asObjectId().encode(buffer); return; // REF
+ case 11: buffer.putOctet(value->asBool() ? 1 : 0); return; // BOOL
+ case 12: buffer.putFloat(value->asFloat()); return; // FLOAT
+ case 13: buffer.putDouble(value->asDouble()); return; // DOUBLE
+ case 14: value->asUuid().encode(buffer); return; // UUID
+ case 15: value->asMap().encode(buffer); return; // MAP
+ case 16: buffer.putOctet(value->asInt()); return; // S8
+ case 17: buffer.putShort(value->asInt()); return; // S16
+ case 18: buffer.putLong(value->asInt()); return; // S32
+ case 19: buffer.putLongLong(value->asInt64()); return; // S64
+ }
+}
diff --git a/cpp/src/qpid/console/Value.h b/cpp/src/qpid/console/Value.h
new file mode 100644
index 0000000000..42ff5661a2
--- /dev/null
+++ b/cpp/src/qpid/console/Value.h
@@ -0,0 +1,205 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_CONSOLE_VALUE_H_
+#define _QPID_CONSOLE_VALUE_H_
+
+#include "qpid/Exception.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FieldTable.h"
+#include "ObjectId.h"
+
+namespace qpid {
+namespace framing {
+ class Buffer;
+}
+namespace console {
+
+ /**
+ * \ingroup qmfconsoleapi
+ */
+ class Value {
+
+ public:
+ virtual ~Value() {}
+ virtual std::string str() const = 0;
+
+ virtual bool isNull() const { return false; }
+ virtual bool isObjectId() const { return false; }
+ virtual bool isUint() const { return false; }
+ virtual bool isInt() const { return false; }
+ virtual bool isUint64() const { return false; }
+ virtual bool isInt64() const { return false; }
+ virtual bool isString() const { return false; }
+ virtual bool isBool() const { return false; }
+ virtual bool isFloat() const { return false; }
+ virtual bool isDouble() const { return false; }
+ virtual bool isUuid() const { return false; }
+ virtual bool isMap() const { return false; }
+
+ virtual ObjectId asObjectId() const { incompatible(); return ObjectId(); }
+ virtual uint32_t asUint() const { incompatible(); return 0; }
+ virtual int32_t asInt() const { incompatible(); return 0; }
+ virtual uint64_t asUint64() const { incompatible(); return 0; }
+ virtual int64_t asInt64() const { incompatible(); return 0; }
+ virtual std::string asString() const { incompatible(); return std::string(); }
+ virtual bool asBool() const { incompatible(); return false; }
+ virtual float asFloat() const { incompatible(); return 0.0; }
+ virtual double asDouble() const { incompatible(); return 0.0; }
+ virtual framing::Uuid asUuid() const { incompatible(); return framing::Uuid(); }
+ virtual framing::FieldTable asMap() const { incompatible(); return framing::FieldTable(); }
+
+ private:
+ void incompatible() const {
+ throw Exception("Incompatible Type");
+ }
+ };
+
+ class NullValue : public Value {
+ public:
+ NullValue() {}
+ std::string str() const;
+ bool isNull() const { return true; }
+ };
+
+ class RefValue : public Value {
+ public:
+ RefValue(ObjectId v) : value(v) {}
+ RefValue(framing::Buffer& buffer);
+ std::string str() const;
+ bool isObjectId() const { return true; }
+ ObjectId asObjectId() const { return value; }
+ private:
+ ObjectId value;
+ };
+
+ class UintValue : public Value {
+ public:
+ UintValue(uint32_t v) : value(v) {}
+ std::string str() const;
+ bool isUint() const { return true; }
+ uint32_t asUint() const { return value; }
+ private:
+ uint32_t value;
+ };
+
+ class IntValue : public Value {
+ public:
+ IntValue(int32_t v) : value(v) {}
+ std::string str() const;
+ bool isInt() const { return true; }
+ int32_t asInt() const { return value; }
+ private:
+ int32_t value;
+ };
+
+ class Uint64Value : public Value {
+ public:
+ Uint64Value(uint64_t v) : value(v) {}
+ std::string str() const;
+ bool isUint64() const { return true; }
+ uint64_t asUint64() const { return value; }
+ private:
+ uint64_t value;
+ };
+
+ class Int64Value : public Value {
+ public:
+ Int64Value(int64_t v) : value(v) {}
+ std::string str() const;
+ bool isInt64() const { return true; }
+ int64_t asInt64() const { return value; }
+ private:
+ int64_t value;
+ };
+
+ class StringValue : public Value {
+ public:
+ StringValue(const std::string& v) : value(v) {}
+ StringValue(framing::Buffer& buffer, int tc);
+ std::string str() const { return value; }
+ bool isString() const { return true; }
+ std::string asString() const { return value; }
+ private:
+ std::string value;
+ };
+
+ class BoolValue : public Value {
+ public:
+ BoolValue(bool v) : value(v) {}
+ BoolValue(uint8_t v) : value(v != 0) {}
+ std::string str() const;
+ bool isBool() const { return true; }
+ bool asBool() const { return value; }
+ private:
+ bool value;
+ };
+
+ class FloatValue : public Value {
+ public:
+ FloatValue(float v) : value(v) {}
+ std::string str() const;
+ bool isFloat() const { return true; }
+ float asFloat() const { return value; }
+ private:
+ float value;
+ };
+
+ class DoubleValue : public Value {
+ public:
+ DoubleValue(double v) : value(v) {}
+ std::string str() const;
+ bool isDouble() const { return true; }
+ double asDouble() const { return value; }
+ private:
+ double value;
+ };
+
+ class UuidValue : public Value {
+ public:
+ UuidValue(const framing::Uuid& v) : value(v) {}
+ UuidValue(framing::Buffer& buffer);
+ std::string str() const { return value.str(); }
+ bool isUuid() const { return true; }
+ framing::Uuid asUuid() const { return value; }
+ private:
+ framing::Uuid value;
+ };
+
+ class MapValue : public Value {
+ public:
+ MapValue(const framing::FieldTable& v) : value(v) {}
+ MapValue(framing::Buffer& buffer);
+ std::string str() const;
+ bool isMap() const { return true; }
+ framing::FieldTable asMap() const { return value; }
+ private:
+ framing::FieldTable value;
+ };
+
+ class ValueFactory {
+ public:
+ static Value* newValue(int typeCode, framing::Buffer& buffer);
+ static void encodeValue(int typeCode, Value* value, framing::Buffer& buffer);
+ };
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp
index e8d8d517dd..fbbaac1cf5 100644
--- a/cpp/src/qpid/framing/Uuid.cpp
+++ b/cpp/src/qpid/framing/Uuid.cpp
@@ -52,7 +52,7 @@ istream& operator>>(istream& in, Uuid& uuid) {
return in;
}
-std::string Uuid::str() {
+std::string Uuid::str() const {
std::ostringstream os;
os << *this;
return os.str();
diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h
index 4e5de9ce41..2fcbb5a261 100644
--- a/cpp/src/qpid/framing/Uuid.h
+++ b/cpp/src/qpid/framing/Uuid.h
@@ -68,7 +68,7 @@ struct Uuid : public boost::array<uint8_t, 16> {
uint32_t encodedSize() const { return size(); }
/** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
- std::string str();
+ std::string str() const;
template <class S> void serialize(S& s) {
s.raw(begin(), size());
diff --git a/cpp/src/qpid/sys/SystemInfo.h b/cpp/src/qpid/sys/SystemInfo.h
index 5a116cf8ee..d43fe34b04 100644
--- a/cpp/src/qpid/sys/SystemInfo.h
+++ b/cpp/src/qpid/sys/SystemInfo.h
@@ -63,6 +63,17 @@ namespace SystemInfo {
std::string &version,
std::string &machine);
+ /**
+ * Get the process ID of the current process.
+ */
+ uint32_t getProcessId();
+
+ /**
+ * Get the process ID of the parent of the current process.
+ */
+ uint32_t getParentProcessId();
+
+
}}} // namespace qpid::sys::SystemInfo
#endif /*!QPID_SYS_SYSTEMINFO_H*/
diff --git a/cpp/src/qpid/sys/posix/SystemInfo.cpp b/cpp/src/qpid/sys/posix/SystemInfo.cpp
index 9bbd023e29..938d4861c4 100755
--- a/cpp/src/qpid/sys/posix/SystemInfo.cpp
+++ b/cpp/src/qpid/sys/posix/SystemInfo.cpp
@@ -94,4 +94,16 @@ void SystemInfo::getSystemId (std::string &osName,
}
}
+uint32_t SystemInfo::getProcessId()
+{
+ return (uint32_t) ::getpid();
+}
+
+uint32_t SystemInfo::getParentProcessId()
+{
+ return (uint32_t) ::getppid();
+}
+
+
+
}} // namespace qpid::sys
diff --git a/cpp/src/tests/ConsoleTest.cpp b/cpp/src/tests/ConsoleTest.cpp
new file mode 100644
index 0000000000..1d55b13f3c
--- /dev/null
+++ b/cpp/src/tests/ConsoleTest.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/console/Package.h"
+#include "qpid/console/ClassKey.h"
+#include "unit_test.h"
+
+QPID_AUTO_TEST_SUITE(ConsoleTestSuite)
+
+using namespace qpid::framing;
+using namespace qpid::console;
+
+QPID_AUTO_TEST_CASE(testClassKey) {
+ uint8_t hash[16] = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
+ ClassKey k("com.redhat.test", "class", hash);
+
+ BOOST_CHECK_EQUAL(k.getPackageName(), "com.redhat.test");
+ BOOST_CHECK_EQUAL(k.getClassName(), "class");
+ BOOST_CHECK_EQUAL(k.getHashString(), "00010203-04050607-08090a0b-0c0d0e0f");
+ BOOST_CHECK_EQUAL(k.str(), "com.redhat.test:class(00010203-04050607-08090a0b-0c0d0e0f)");
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index cc39309dac..3a608b2bae 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -25,6 +25,7 @@ extra_libs =
lib_client = $(abs_builddir)/../libqpidclient.la
lib_common = $(abs_builddir)/../libqpidcommon.la
lib_broker = $(abs_builddir)/../libqpidbroker.la
+lib_console = $(abs_builddir)/../libqmfconsole.la
# lib_amqp_0_10 = $(abs_builddir)/../libqpidamqp_0_10.la
#
@@ -47,7 +48,7 @@ CLEANFILES=
TESTS+=unit_test
check_PROGRAMS+=unit_test
unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \
- $(lib_client) $(lib_broker)
+ $(lib_client) $(lib_broker) $(lib_console)
unit_test_SOURCES= unit_test.cpp unit_test.h \
BrokerFixture.h SocketProxy.h \
@@ -86,7 +87,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ConnectionOptions.h \
ForkedBroker.h \
ManagementTest.cpp \
- MessageReplayTracker.cpp
+ MessageReplayTracker.cpp \
+ ConsoleTest.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp