diff options
author | Ted Ross <tross@apache.org> | 2008-12-19 20:22:03 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-12-19 20:22:03 +0000 |
commit | 2660791c236676f28cdbf81da5c0d52110b36d17 (patch) | |
tree | 5a4166a17da10fcd64e05ecddec4c6f14dc0385f | |
parent | d98c3cdf286d3ee16904d66337a20d94a8e95dc3 (diff) | |
download | qpid-python-2660791c236676f28cdbf81da5c0d52110b36d17.tar.gz |
QPID-1412 - c++ implementation of the QMF client API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@728132 13f79535-47bb-0310-9956-ffa450edef68
35 files changed, 3139 insertions, 6 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac index 3070ce6687..24588b0950 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -379,6 +379,7 @@ AC_CONFIG_FILES([ examples/request-response/Makefile examples/failover/Makefile examples/xml-exchange/Makefile + examples/qmf-console/Makefile managementgen/Makefile etc/Makefile src/Makefile diff --git a/cpp/examples/Makefile.am b/cpp/examples/Makefile.am index b870d67802..29a101425c 100644 --- a/cpp/examples/Makefile.am +++ b/cpp/examples/Makefile.am @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. # -SUBDIRS = direct fanout pub-sub request-response failover +SUBDIRS = direct fanout pub-sub request-response failover qmf-console if HAVE_XML SUBDIRS += xml-exchange broker_args = "--no-module-dir --data-dir \"\" --auth no --load-module $(top_builddir)/src/.libs/xml.so" diff --git a/cpp/examples/makedist.mk b/cpp/examples/makedist.mk index f579dca1e3..4345378983 100644 --- a/cpp/examples/makedist.mk +++ b/cpp/examples/makedist.mk @@ -2,6 +2,8 @@ AM_CXXFLAGS = $(WARNING_CFLAGS) INCLUDES = -I$(top_srcdir)/src -I$(top_srcdir)/src/gen -I$(top_builddir)/src -I$(top_builddir)/src/gen CLIENT_LIB=$(top_builddir)/src/libqpidclient.la +CONSOLE_LIB=$(top_builddir)/src/libqmfconsole.la +MAKELDFLAG ?= qpidclient # Generate a simple non-automake Makefile for distribution. MAKEDIST=.libs/Makefile @@ -10,7 +12,7 @@ $(MAKEDIST): Makefile mkdir -p .libs @$(ECHO) CXX=$(CXX) > $(MAKEDIST) @$(ECHO) CXXFLAGS=$(CXXFLAGS) >> $(MAKEDIST) - @$(ECHO) LDFLAGS=-lqpidclient >> $(MAKEDIST) + @$(ECHO) LDFLAGS=-l$(MAKELDFLAG) >> $(MAKEDIST) @$(ECHO) >> $(MAKEDIST) @$(ECHO) all: $(noinst_PROGRAMS) >> $(MAKEDIST) @$(ECHO) >> $(MAKEDIST) diff --git a/cpp/examples/qmf-console/Makefile.am b/cpp/examples/qmf-console/Makefile.am new file mode 100644 index 0000000000..710d9601d6 --- /dev/null +++ b/cpp/examples/qmf-console/Makefile.am @@ -0,0 +1,14 @@ +examplesdir=$(pkgdatadir)/examples/qmf-console + +MAKELDFLAG = qmfconsole +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=qmfc + +qmfc_SOURCES=console.cpp +qmfc_LDADD=$(CONSOLE_LIB) + +examples_DATA= \ + console.cpp \ + $(MAKEDIST) + diff --git a/cpp/examples/qmf-console/console.cpp b/cpp/examples/qmf-console/console.cpp new file mode 100644 index 0000000000..c98f1ace34 --- /dev/null +++ b/cpp/examples/qmf-console/console.cpp @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/ConsoleListener.h" +#include "qpid/console/SessionManager.h" +#include "qpid/console/Value.h" +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace std; +using namespace qpid::console; +using std::cout; +using std::endl; + +class Listener : public ConsoleListener { +public: + ~Listener() {} + + void brokerConnected(const Broker& broker) { + cout << "brokerConnected: " << broker << endl; + } + + void brokerDisconnected(const Broker& broker) { + cout << "brokerDisconnected: " << broker << endl; + } + + void newPackage(const std::string& name) { + cout << "newPackage: " << name << endl; + } + + void newClass(const ClassKey& classKey) { + cout << "newClass: key=" << classKey << endl; + } + + void newAgent(const Agent& agent) { + cout << "newAgent: " << agent << endl; + } + + void delAgent(const Agent& agent) { + cout << "delAgent: " << agent << endl; + } + + void objectProps(Broker& broker, Object& object) { + cout << "objectProps: broker=" << broker << " object=" << object << endl; + } + + void objectStats(Broker& broker, Object& object) { + cout << "objectStats: broker=" << broker << " object=" << object << endl; + } +}; + + +//============================================================== +// Main program +//============================================================== +int main_int(int /*argc*/, char** /*argv*/) +{ + //Listener listener; + qpid::client::ConnectionSettings settings; + + cout << "Creating SessionManager" << endl; + SessionManager sm; + cout << "Adding broker" << endl; + Broker* broker; + + broker = sm.addBroker(settings); + + cout << "Package List:" << endl; + vector<string> packages; + sm.getPackages(packages); + for (vector<string>::iterator iter = packages.begin(); iter != packages.end(); iter++) { + cout << " " << *iter << endl; + SessionManager::KeyVector classKeys; + sm.getClasses(classKeys, *iter); + for (SessionManager::KeyVector::iterator cIter = classKeys.begin(); + cIter != classKeys.end(); cIter++) + cout << " " << *cIter << endl; + } + + Object::Vector list; + cout << "getting exchanges..." << endl; + sm.getObjects(list, "exchange"); + cout << " returned " << list.size() << " elements" << endl; + + for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) { + cout << "exchange: " << *i << endl; + } + + list.clear(); + cout << "getting queues..." << endl; + sm.getObjects(list, "queue"); + cout << " returned " << list.size() << " elements" << endl; + + for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) { + cout << "queue: " << *i << endl; + cout << " bindingCount=" << i->attrUint("bindingCount") << endl; + cout << " arguments=" << i->attrMap("arguments") << endl; + } + + list.clear(); + sm.getObjects(list, "broker"); + if (list.size() == 1) { + Object& broker = *list.begin(); + + cout << "Broker: " << broker << endl; + + Object::AttributeMap args; + MethodResponse result; + args["sequence"] = new UintValue(1); + args["body"] = new StringValue("Testing..."); + + cout << "Call echo method..." << endl; + broker.invokeMethod("echo", args, result); + cout << "Result: code=" << result.code << " text=" << result.text << endl; + for (Object::AttributeMap::iterator aIter = result.arguments.begin(); + aIter != result.arguments.end(); aIter++) { + cout << " Output Arg: " << aIter->first << " => " << aIter->second->str() << endl; + } + } + + sm.delBroker(broker); + return 0; +} + +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 396a9dd6a6..606fdb1a41 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -179,6 +179,7 @@ cmodule_LTLIBRARIES = include cluster.mk include acl.mk include qmf.mk +include qmfc.mk if HAVE_XML include xml.mk endif diff --git a/cpp/src/qmfc.mk b/cpp/src/qmfc.mk new file mode 100644 index 0000000000..6fa7d4c988 --- /dev/null +++ b/cpp/src/qmfc.mk @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# qmf console library makefile fragment, to be included in Makefile.am +# +lib_LTLIBRARIES += libqmfconsole.la + +libqmfconsole_la_SOURCES = \ + qpid/console/Agent.h \ + qpid/console/Agent.cpp \ + qpid/console/Broker.h \ + qpid/console/Broker.cpp \ + qpid/console/ClassKey.h \ + qpid/console/ClassKey.cpp \ + qpid/console/ConsoleListener.h \ + qpid/console/Event.h \ + qpid/console/Object.h \ + qpid/console/Object.cpp \ + qpid/console/ObjectId.h \ + qpid/console/ObjectId.cpp \ + qpid/console/Package.h \ + qpid/console/Package.cpp \ + qpid/console/Schema.h \ + qpid/console/Schema.cpp \ + qpid/console/SequenceManager.h \ + qpid/console/SequenceManager.cpp \ + qpid/console/SessionManager.h \ + qpid/console/SessionManager.cpp \ + qpid/console/Value.h \ + qpid/console/Value.cpp + +libqmfconsole_la_LIBADD = libqpidclient.la + diff --git a/cpp/src/qpid/console/Agent.cpp b/cpp/src/qpid/console/Agent.cpp new file mode 100644 index 0000000000..8b5a8adbb4 --- /dev/null +++ b/cpp/src/qpid/console/Agent.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Agent.h" + +std::ostream& qpid::console::operator<<(std::ostream& o, const Agent& agent) +{ + o << "Agent at bank " << agent.getBrokerBank() << "." << agent.getAgentBank() << + " (" << agent.getLabel() << ")"; + return o; +} + diff --git a/cpp/src/qpid/console/Agent.h b/cpp/src/qpid/console/Agent.h new file mode 100644 index 0000000000..3307a1b44b --- /dev/null +++ b/cpp/src/qpid/console/Agent.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_AGENT_H_ +#define _QPID_CONSOLE_AGENT_H_ + +#include "Broker.h" + +namespace qpid { +namespace console { + + /** + * + * \ingroup qmfconsoleapi + */ + class Agent { + public: + typedef std::vector<Agent*> Vector; + + Agent(Broker* _broker, uint32_t _bank, const std::string& _label) : + broker(_broker), brokerBank(broker->getBrokerBank()), + agentBank(_bank), label(_label) {} + Broker* getBroker() const { return broker; } + uint32_t getBrokerBank() const { return brokerBank; } + uint32_t getAgentBank() const { return agentBank; } + const std::string& getLabel() const { return label; } + + private: + Broker* broker; + const uint32_t brokerBank; + const uint32_t agentBank; + const std::string label; + }; + + std::ostream& operator<<(std::ostream& o, const Agent& agent); +} +} + + +#endif diff --git a/cpp/src/qpid/console/Broker.cpp b/cpp/src/qpid/console/Broker.cpp new file mode 100644 index 0000000000..2e7ba95b1d --- /dev/null +++ b/cpp/src/qpid/console/Broker.cpp @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Broker.h" +#include "Object.h" +#include "Value.h" +#include "SessionManager.h" +#include "ConsoleListener.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/SystemInfo.h" + +using namespace qpid::client; +using namespace qpid::console; +using namespace qpid::framing; +using namespace qpid::sys; +using namespace std; + +Broker::Broker(SessionManager& sm, ConnectionSettings& settings) : + sessionManager(sm), connected(false), connectionSettings(settings), + reqsOutstanding(1), syncInFlight(false), topicBound(false), methodObject(0), + connThreadBody(*this), connThread(connThreadBody) +{ + string osName; + string nodeName; + string release; + string version; + string machine; + + sys::SystemInfo::getSystemId(osName, nodeName, release, version, machine); + uint32_t pid = sys::SystemInfo::getParentProcessId(); + + stringstream text; + + text << "qmfc-cpp-" << nodeName << "-" << pid; + amqpSessionId = string(text.str()); + + QPID_LOG(debug, "Broker::Broker: constructed, amqpSessionId=" << amqpSessionId); +} + +Broker::~Broker() +{ +} + +void Broker::encodeHeader(framing::Buffer& buf, uint8_t opcode, uint32_t seq) const +{ + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('2'); + buf.putOctet(opcode); + buf.putLong (seq); +} + +bool Broker::checkHeader(framing::Buffer& buf, uint8_t *opcode, uint32_t *seq) const +{ + if (buf.getSize() < 8) + return false; + + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '2'; +} + +void Broker::received(client::Message& msg) +{ + string data = msg.getData(); + Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); + uint8_t opcode; + uint32_t sequence; + + if (checkHeader(inBuffer, &opcode, &sequence)) { + QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence); + + if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence); + else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence); + else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence); + else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence); + else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence); + else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence); + else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence); + else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence); + else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false); + else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true); + else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true); + } +} + +void Broker::resetAgents() +{ + for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) { + if (sessionManager.listener != 0) + sessionManager.listener->delAgent(*(iter->second)); + delete iter->second; + } + + agents.clear(); + agents[0x0000000100000000LL] = new Agent(this, 0, "BrokerAgent"); +} + +void Broker::updateAgent(const Object& object) +{ + uint32_t brokerBank = object.attrUint("brokerBank"); + uint32_t agentBank = object.attrUint("agentBank"); + uint64_t agentKey = ((uint64_t) brokerBank << 32) | (uint64_t) agentBank; + AgentMap::iterator iter = agents.find(agentKey); + + if (object.isDeleted()) { + if (iter != agents.end()) { + if (sessionManager.listener != 0) + sessionManager.listener->delAgent(*(iter->second)); + delete iter->second; + agents.erase(iter); + } + } else { + if (iter == agents.end()) { + Agent* agent = new Agent(this, agentBank, object.attrString("label")); + agents[agentKey] = agent; + if (sessionManager.listener != 0) + sessionManager.listener->newAgent(*agent); + } + } +} + +void Broker::ConnectionThread::run() +{ + static const int delayMin(1); + static const int delayMax(128); + static const int delayFactor(2); + int delay(delayMin); + string dest("qmfc"); + + sessionId.generate(); + queueName << "qmfc-" << sessionId; + + while (true) { + try { + broker.topicBound = false; + broker.reqsOutstanding = 1; + connection.open(broker.connectionSettings); + session = connection.newSession(queueName.str()); + subscriptions = new client::SubscriptionManager(session); + + session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true, + arg::exclusive=true); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str()); + + subscriptions->setAcceptMode(ACCEPT_MODE_NONE); + subscriptions->setAcquireMode(ACQUIRE_MODE_PRE_ACQUIRED); + subscriptions->subscribe(broker, queueName.str(), dest); + subscriptions->setFlowControl(dest, FlowControl::unlimited()); + { + Mutex::ScopedLock _lock(connLock); + operational = true; + broker.resetAgents(); + broker.connected = true; + broker.sessionManager.handleBrokerConnect(&broker); + broker.sessionManager.startProtocol(&broker); + try { + Mutex::ScopedUnlock _unlock(connLock); + subscriptions->run(); + } catch (std::exception) {} + + operational = false; + broker.connected = false; + broker.sessionManager.handleBrokerDisconnect(&broker); + } + delay = delayMin; + delete subscriptions; + subscriptions = 0; + session.close(); + } catch (std::exception &e) { + QPID_LOG(debug, " outer exception: " << e.what()); + if (delay < delayMax) + delay *= delayFactor; + } + + ::sleep(delay); + } +} + +Broker::ConnectionThread::~ConnectionThread() +{ + if (subscriptions != 0) { + delete subscriptions; + } +} + +void Broker::ConnectionThread::sendBuffer(Buffer& buf, uint32_t length, + const string& exchange, const string& routingKey) +{ + { + Mutex::ScopedLock _lock(connLock); + if (!operational) + return; + } + + client::Message msg; + string data; + + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData(data); + try { + session.messageTransfer(arg::content=msg, arg::destination=exchange); + } catch(std::exception&) {} +} + +void Broker::ConnectionThread::bindExchange(const std::string& exchange, const std::string& key) +{ + { + Mutex::ScopedLock _lock(connLock); + if (!operational) + return; + } + + QPID_LOG(debug, "Broker::ConnectionThread::bindExchange: exchange=" << exchange << " key=" << key); + session.exchangeBind(arg::exchange=exchange, arg::queue=queueName.str(), + arg::bindingKey=key); +} + +void Broker::waitForStable() +{ + Mutex::ScopedLock l(lock); + if (reqsOutstanding == 0) + return; + syncInFlight = true; + while (reqsOutstanding != 0) { + cond.wait(lock); // TODO: put timeout delay in here! + } +} + +void Broker::incOutstanding() +{ + Mutex::ScopedLock l(lock); + reqsOutstanding++; +} + +void Broker::decOutstanding() +{ + Mutex::ScopedLock l(lock); + reqsOutstanding--; + if (reqsOutstanding == 0) { + if (!topicBound) { + topicBound = true; + for (vector<string>::const_iterator iter = sessionManager.bindingKeyList.begin(); + iter != sessionManager.bindingKeyList.end(); iter++) + connThreadBody.bindExchange("qpid.management", *iter); + } + if (syncInFlight) { + syncInFlight = false; + cond.notify(); + } + } +} + +void Broker::appendAgents(Agent::Vector& agentlist) const +{ + for (AgentMap::const_iterator iter = agents.begin(); iter != agents.end(); iter++) { + agentlist.push_back(iter->second); + } +} + +ostream& qpid::console::operator<<(ostream& o, const Broker& k) +{ + o << "Broker: " << k.connectionSettings.host << ":" << k.connectionSettings.port; + return o; +} diff --git a/cpp/src/qpid/console/Broker.h b/cpp/src/qpid/console/Broker.h new file mode 100644 index 0000000000..8cca99f764 --- /dev/null +++ b/cpp/src/qpid/console/Broker.h @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_BROKER_H_ +#define _QPID_CONSOLE_BROKER_H_ + +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/Url.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/Uuid.h" +#include <string> +#include <iostream> + +namespace qpid { +namespace console { + class SessionManager; + class Agent; + class Object; + + /** + * + * \ingroup qpidconsoleapi + */ + class Broker : public client::MessageListener { + public: + Broker(SessionManager& sm, client::ConnectionSettings& settings); + ~Broker(); + + bool isConnected() const { return connected; } + const std::string& getError() const { return error; } + const std::string& getSessionId() const { return amqpSessionId; } + const framing::Uuid& getBrokerId() const { return brokerId; } + uint32_t getBrokerBank() const { return 1; } + void addBinding(const std::string& key) { + connThreadBody.bindExchange("qpid.management", key); + } + + private: + friend class SessionManager; + friend class Object; + typedef std::map<uint64_t,Agent*> AgentMap; + static const int SYNC_TIME = 60; + + SessionManager& sessionManager; + AgentMap agents; + client::SubscriptionManager* subscription; + bool connected; + std::string error; + std::string amqpSessionId; + client::ConnectionSettings connectionSettings; + sys::Mutex lock; + sys::Condition cond; + framing::Uuid brokerId; + uint32_t reqsOutstanding; + bool syncInFlight; + bool topicBound; + Object* methodObject; + + friend class ConnectionThread; + class ConnectionThread : public sys::Runnable { + bool operational; + Broker& broker; + framing::Uuid sessionId; + client::Connection connection; + client::Session session; + client::SubscriptionManager* subscriptions; + std::stringstream queueName; + sys::Mutex connLock; + void run(); + public: + ConnectionThread(Broker& _broker) : + operational(false), broker(_broker), subscriptions(0) {} + ~ConnectionThread(); + void sendBuffer(qpid::framing::Buffer& buf, + uint32_t length, + const std::string& exchange = "qpid.management", + const std::string& routingKey = "broker"); + void bindExchange(const std::string& exchange, const std::string& key); + }; + + ConnectionThread connThreadBody; + sys::Thread connThread; + + void encodeHeader(framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0) const; + bool checkHeader(framing::Buffer& buf, uint8_t *opcode, uint32_t *seq) const; + void received(client::Message& msg); + void resetAgents(); + void updateAgent(const Object& object); + void waitForStable(); + void incOutstanding(); + void decOutstanding(); + void setBrokerId(const framing::Uuid& id) { brokerId = id; } + void appendAgents(std::vector<Agent*>& agents) const; + + friend std::ostream& operator<<(std::ostream& o, const Broker& k); + }; + + std::ostream& operator<<(std::ostream& o, const Broker& k); +} +} + +#endif diff --git a/cpp/src/qpid/console/ClassKey.cpp b/cpp/src/qpid/console/ClassKey.cpp new file mode 100644 index 0000000000..6aa2bcb117 --- /dev/null +++ b/cpp/src/qpid/console/ClassKey.cpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ClassKey.h" +#include <string.h> + +using namespace std; +using namespace qpid::console; + +ClassKey::ClassKey(const string& _package, const string& _name, const uint8_t* _hash) : + package(_package), name(_name) +{ + ::memcpy(hash, _hash, HASH_SIZE); +} + +string ClassKey::getHashString() const +{ + char cstr[36]; + ::sprintf(cstr, "%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x", + hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], + hash[8], hash[9], hash[10], hash[11], hash[12], hash[13], hash[14], hash[15]); + return string(cstr); +} + +string ClassKey::str() const +{ + string result(package + ":" + name + "(" + getHashString() + ")"); + return result; +} + +bool ClassKey::operator==(const ClassKey& other) const +{ + return ::memcmp(hash, other.hash, HASH_SIZE) == 0 && + name == other.name && + package == other.package; +} + +bool ClassKey::operator!=(const ClassKey& other) const +{ + return !(*this == other); +} + +bool ClassKey::operator<(const ClassKey& other) const +{ + int cmp = ::memcmp(hash, other.hash, HASH_SIZE); + if (cmp != 0) + return cmp < 0; + cmp = name.compare(other.name); + if (cmp != 0) + return cmp < 0; + return package < other.package; +} + +bool ClassKey::operator>(const ClassKey& other) const +{ + int cmp = ::memcmp(hash, other.hash, HASH_SIZE); + if (cmp != 0) + return cmp > 0; + cmp = name.compare(other.name); + if (cmp != 0) + return cmp > 0; + return package > other.package; +} + +bool ClassKey::operator<=(const ClassKey& other) const +{ + return !(*this > other); +} + +bool ClassKey::operator>=(const ClassKey& other) const +{ + return !(*this < other); +} + +void ClassKey::encode(framing::Buffer& buffer) const +{ + buffer.putShortString(package); + buffer.putShortString(name); + buffer.putBin128(const_cast<uint8_t*>(hash)); +} + +ostream& qpid::console::operator<<(ostream& o, const ClassKey& k) +{ + o << k.str(); + return o; +} diff --git a/cpp/src/qpid/console/ClassKey.h b/cpp/src/qpid/console/ClassKey.h new file mode 100644 index 0000000000..f6617e22d5 --- /dev/null +++ b/cpp/src/qpid/console/ClassKey.h @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_CLASSKEY_H_ +#define _QPID_CONSOLE_CLASSKEY_H_ + +#include <string> +#include "Package.h" +#include "qpid/framing/Buffer.h" + +namespace qpid { +namespace console { + + /** + * + * \ingroup qmfconsoleapi + */ + class ClassKey { + public: + static const int HASH_SIZE = 16; + + ClassKey(const std::string& package, const std::string& name, const uint8_t* hash); + + const std::string& getPackageName() const { return package; } + const std::string& getClassName() const { return name; } + const uint8_t* getHash() const { return hash; } + std::string getHashString() const; + std::string str() const; + bool operator==(const ClassKey& other) const; + bool operator!=(const ClassKey& other) const; + bool operator<(const ClassKey& other) const; + bool operator>(const ClassKey& other) const; + bool operator<=(const ClassKey& other) const; + bool operator>=(const ClassKey& other) const; + void encode(framing::Buffer& buffer) const; + + private: + std::string package; + std::string name; + uint8_t hash[HASH_SIZE]; + }; + + std::ostream& operator<<(std::ostream& o, const ClassKey& k); +} +} + + +#endif diff --git a/cpp/src/qpid/console/ConsoleListener.h b/cpp/src/qpid/console/ConsoleListener.h new file mode 100644 index 0000000000..97fdf158cd --- /dev/null +++ b/cpp/src/qpid/console/ConsoleListener.h @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_CONSOLE_LISTENER_H_ +#define _QPID_CONSOLE_CONSOLE_LISTENER_H_ + +#include <string> +#include "Broker.h" +#include "ClassKey.h" +#include "Object.h" + +namespace qpid { +namespace console { + + /** + * Implement a subclass of ConsoleListener and subscribe it using + * the SessionManager to receive indications. + * + * \ingroup qmfconsoleapi + */ + class ConsoleListener{ + public: + virtual ~ConsoleListener() {}; + + /** Invoked when a connection is established to a broker + */ + virtual void brokerConnected(const Broker&) {} + + /** Invoked when the connection to a broker is lost + */ + virtual void brokerDisconnected(const Broker&) {} + + /** Invoked when a QMF package is discovered. + */ + virtual void newPackage(const std::string&) {} + + /** Invoked when a new class is discovered. Session.getSchema can be + * used to obtain details about the class. + */ + virtual void newClass(const ClassKey&) {} + + /** Invoked when a QMF agent is discovered. + */ + virtual void newAgent(const Agent&) {} + + /** Invoked when a QMF agent disconects. + */ + virtual void delAgent(const Agent&) {} + + /** Invoked when an object is updated. + */ + virtual void objectProps(Broker&, Object&) {} + + /** Invoked when an object is updated. + */ + virtual void objectStats(Broker&, Object&) {} + + /** Invoked when an event is raised. + */ + //virtual void event(Broker&, Event) {} + + /** + */ + //virtual void heartbeat(Agent&, uint64_t) {} + + /** + */ + virtual void brokerInfo(Broker&) {} + + /** + */ + //virtual void methodResponse(Broker&, uint32_t seq, MethodResponse&) {} + }; +} +} + + +#endif diff --git a/cpp/src/qpid/console/Event.h b/cpp/src/qpid/console/Event.h new file mode 100644 index 0000000000..7627a4264d --- /dev/null +++ b/cpp/src/qpid/console/Event.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_EVENT_H_ +#define _QPID_CONSOLE_EVENT_H_ + +#include <string> +#include "Message.h" + +namespace qpid { +namespace console { + + /** + * + * \ingroup qpidconsoleapi + */ + class Event { + }; +} +} + + +#endif diff --git a/cpp/src/qpid/console/Object.cpp b/cpp/src/qpid/console/Object.cpp new file mode 100644 index 0000000000..1ca70a616a --- /dev/null +++ b/cpp/src/qpid/console/Object.cpp @@ -0,0 +1,324 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionManager.h" +#include "Broker.h" +#include "Object.h" +#include "Schema.h" +#include "ClassKey.h" +#include "Value.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/Mutex.h" + +using namespace qpid::console; +using namespace std; +using qpid::framing::Uuid; +using qpid::framing::FieldTable; +using qpid::sys::Mutex; + +Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bool stat) : + broker(b), schema(s), pendingMethod(0) +{ + currentTime = buffer.getLongLong(); + createTime = buffer.getLongLong(); + deleteTime = buffer.getLongLong(); + objectId.decode(buffer); + + if (prop) { + set<string> excludes; + parsePresenceMasks(buffer, excludes); + for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin(); + pIter != schema->properties.end(); pIter++) { + SchemaProperty* property = *pIter; + if (excludes.count(property->name) != 0) { + attributes[property->name] = new NullValue(); + } else { + attributes[property->name] = property->decodeValue(buffer); + } + } + } + + if (stat) { + for (vector<SchemaStatistic*>::const_iterator sIter = schema->statistics.begin(); + sIter != schema->statistics.end(); sIter++) { + SchemaStatistic* statistic = *sIter; + attributes[statistic->name] = statistic->decodeValue(buffer); + } + } +} + +Object::~Object() +{ + // for (AttributeMap::iterator iter = attributes.begin(); iter != attributes.end(); iter++) + // delete iter->second; + // attributes.clear(); +} + +const ClassKey& Object::getClassKey() const +{ + return schema->getClassKey(); +} + +std::string Object::getIndex() const +{ + string result; + + for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin(); + pIter != schema->properties.end(); pIter++) { + SchemaProperty* property = *pIter; + if (property->isIndex) { + AttributeMap::const_iterator vIter = attributes.find(property->name); + if (vIter != attributes.end()) { + if (!result.empty()) + result += ":"; + result += vIter->second->str(); + } + } + } + return result; +} + +void Object::mergeUpdate(const Object& /*updated*/) +{ + // TODO +} + +void Object::invokeMethod(const string name, const AttributeMap& args, MethodResponse& result) +{ + for (vector<SchemaMethod*>::const_iterator iter = schema->methods.begin(); + iter != schema->methods.end(); iter++) { + if ((*iter)->name == name) { + SchemaMethod* method = *iter; + char rawbuffer[65536]; + framing::Buffer buffer(rawbuffer, 65536); + uint32_t sequence = broker->sessionManager.sequenceManager.reserve("method"); + pendingMethod = method; + broker->methodObject = this; + broker->encodeHeader(buffer, 'M', sequence); + objectId.encode(buffer); + schema->key.encode(buffer); + buffer.putShortString(name); + + for (vector<SchemaArgument*>::const_iterator aIter = method->arguments.begin(); + aIter != method->arguments.end(); aIter++) { + SchemaArgument* arg = *aIter; + if (arg->dirInput) { + AttributeMap::const_iterator attr = args.find(arg->name); + if (attr != args.end()) { + ValueFactory::encodeValue(arg->typeCode, attr->second, buffer); + } else { + // TODO Use the default value instead of throwing + throw Exception("Missing arguments in method call"); + } + } + } + + uint32_t length = buffer.getPosition(); + buffer.reset(); + stringstream routingKey; + routingKey << "agent." << objectId.getBrokerBank() << "." << objectId.getAgentBank(); + broker->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str()); + + { + Mutex::ScopedLock l(broker->lock); + while (pendingMethod != 0) + broker->cond.wait(broker->lock); + result = methodResponse; + } + } + } +} + +void Object::handleMethodResp(framing::Buffer& buffer, uint32_t sequence) +{ + broker->sessionManager.sequenceManager.release(sequence); + methodResponse.code = buffer.getLong(); + buffer.getMediumString(methodResponse.text); + methodResponse.arguments.clear(); + + for (vector<SchemaArgument*>::const_iterator aIter = pendingMethod->arguments.begin(); + aIter != pendingMethod->arguments.end(); aIter++) { + SchemaArgument* arg = *aIter; + if (arg->dirOutput) { + methodResponse.arguments[arg->name] = arg->decodeValue(buffer); + } + } + + { + Mutex::ScopedLock l(broker->lock); + pendingMethod = 0; + broker->cond.notify(); + } +} + +ObjectId Object::attrRef(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return ObjectId(); + Value* val = iter->second; + if (!val->isObjectId()) + return ObjectId(); + return val->asObjectId(); +} + +uint32_t Object::attrUint(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value* val = iter->second; + if (!val->isUint()) + return 0; + return val->asUint(); +} + +int32_t Object::attrInt(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value* val = iter->second; + if (!val->isInt()) + return 0; + return val->asInt(); +} + +uint64_t Object::attrUint64(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value* val = iter->second; + if (!val->isUint64()) + return 0; + return val->asUint64(); +} + +int64_t Object::attrInt64(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value* val = iter->second; + if (!val->isInt64()) + return 0; + return val->asInt64(); +} + +string Object::attrString(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return string(); + Value* val = iter->second; + if (!val->isString()) + return string(); + return val->asString(); +} + +bool Object::attrBool(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return false; + Value* val = iter->second; + if (!val->isBool()) + return false; + return val->asBool(); +} + +float Object::attrFloat(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0.0; + Value* val = iter->second; + if (!val->isFloat()) + return 0.0; + return val->asFloat(); +} + +double Object::attrDouble(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0.0; + Value* val = iter->second; + if (!val->isDouble()) + return 0.0; + return val->asDouble(); +} + +Uuid Object::attrUuid(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return Uuid(); + Value* val = iter->second; + if (!val->isUuid()) + return Uuid(); + return val->asUuid(); +} + +FieldTable Object::attrMap(const std::string& key) const +{ + AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return FieldTable(); + Value* val = iter->second; + if (!val->isMap()) + return FieldTable(); + return val->asMap(); +} + +void Object::parsePresenceMasks(framing::Buffer& buffer, set<string>& excludeList) +{ + excludeList.clear(); + uint8_t bit = 0; + uint8_t mask = 0; + + for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin(); + pIter != schema->properties.end(); pIter++) { + SchemaProperty* property = *pIter; + if (property->isOptional) { + if (bit == 0) { + mask = buffer.getOctet(); + bit = 1; + } + if ((mask & bit) == 0) + excludeList.insert(property->name); + if (bit == 0x80) + bit = 0; + else + bit = bit << 1; + } + } +} + +ostream& qpid::console::operator<<(ostream& o, const Object& object) +{ + const ClassKey& key = object.getClassKey(); + o << key.getPackageName() << ":" << key.getClassName() << "[" << object.getObjectId() << "] " << + object.getIndex(); + return o; +} + diff --git a/cpp/src/qpid/console/Object.h b/cpp/src/qpid/console/Object.h new file mode 100644 index 0000000000..918bee8af1 --- /dev/null +++ b/cpp/src/qpid/console/Object.h @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_OBJECT_H_ +#define _QPID_CONSOLE_OBJECT_H_ + +#include "ObjectId.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FieldTable.h" +#include <map> +#include <set> +#include <vector> + +namespace qpid { +namespace framing { + class Buffer; +} +namespace console { + + class Broker; + class SchemaClass; + class SchemaMethod; + class ObjectId; + class ClassKey; + class Value; + + /** + * \ingroup qmfconsoleapi + */ + struct MethodResponse { + uint32_t code; + std::string text; + std::map<std::string, Value*> arguments; + }; + + class Object { + public: + typedef std::vector<Object> Vector; + typedef std::map<std::string, Value*> AttributeMap; + + Object(Broker* broker, SchemaClass* schemaClass, framing::Buffer& buffer, bool prop, bool stat); + ~Object(); + + Broker* getBroker() const { return broker; } + const ObjectId& getObjectId() const { return objectId; } + const ClassKey& getClassKey() const; + SchemaClass* getSchema() const { return schema; } + uint64_t getCurrentTime() const { return currentTime; } + uint64_t getCreateTime() const { return createTime; } + uint64_t getDeleteTime() const { return deleteTime; } + bool isDeleted() const { return deleteTime != 0; } + std::string getIndex() const; + void mergeUpdate(const Object& updated); + const AttributeMap& getAttributes() const { return attributes; } + void invokeMethod(const std::string name, const AttributeMap& args, MethodResponse& result); + void handleMethodResp(framing::Buffer& buffer, uint32_t sequence); + + ObjectId attrRef(const std::string& key) const; + uint32_t attrUint(const std::string& key) const; + int32_t attrInt(const std::string& key) const; + uint64_t attrUint64(const std::string& key) const; + int64_t attrInt64(const std::string& key) const; + std::string attrString(const std::string& key) const; + bool attrBool(const std::string& key) const; + float attrFloat(const std::string& key) const; + double attrDouble(const std::string& key) const; + framing::Uuid attrUuid(const std::string& key) const; + framing::FieldTable attrMap(const std::string& key) const; + + private: + Broker* broker; + SchemaClass* schema; + ObjectId objectId; + uint64_t currentTime; + uint64_t createTime; + uint64_t deleteTime; + AttributeMap attributes; + SchemaMethod* pendingMethod; + MethodResponse methodResponse; + + void parsePresenceMasks(framing::Buffer& buffer, std::set<std::string>& excludeList); + }; + + std::ostream& operator<<(std::ostream& o, const Object& object); +} +} + + +#endif diff --git a/cpp/src/qpid/console/ObjectId.cpp b/cpp/src/qpid/console/ObjectId.cpp new file mode 100644 index 0000000000..535e59e88d --- /dev/null +++ b/cpp/src/qpid/console/ObjectId.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ObjectId.h" +#include "qpid/framing/Buffer.h" + +using namespace qpid::console; +using namespace std; + +ObjectId::ObjectId(framing::Buffer& buffer) +{ + decode(buffer); +} + +void ObjectId::decode(framing::Buffer& buffer) +{ + first = buffer.getLongLong(); + second = buffer.getLongLong(); +} + +void ObjectId::encode(framing::Buffer& buffer) +{ + buffer.putLongLong(first); + buffer.putLongLong(second); +} + +ostream& qpid::console::operator<<(ostream& o, const ObjectId& id) +{ + o << (int) id.getFlags() << "-" << id.getSequence() << "-" << id.getBrokerBank() << "-" << + id.getAgentBank() << "-" << id.getObject(); + return o; +} + + diff --git a/cpp/src/qpid/console/ObjectId.h b/cpp/src/qpid/console/ObjectId.h new file mode 100644 index 0000000000..c9c2fc852a --- /dev/null +++ b/cpp/src/qpid/console/ObjectId.h @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_OBJECTID_H +#define _QPID_CONSOLE_OBJECTID_H + +#include <iostream> + +namespace qpid { +namespace framing { + class Buffer; +} +namespace console { + + /** + * + * \ingroup qmfconsoleapi + */ + class ObjectId { + public: + ObjectId() : first(0), second(0) {} + ObjectId(framing::Buffer& buffer); + + uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; } + uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; } + uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; } + uint32_t getAgentBank() const { return first & 0x000000000FFFFFFFLL; } + uint64_t getObject() const { return second; } + bool isDurable() const { return getSequence() == 0; } + void decode(framing::Buffer& buffer); + void encode(framing::Buffer& buffer); + void setValue(uint64_t f, uint64_t s) { first = f; second = s; } + + private: + uint64_t first; + uint64_t second; + }; + + std::ostream& operator<<(std::ostream& o, const ObjectId& id); +} +} + +#endif diff --git a/cpp/src/qpid/console/Package.cpp b/cpp/src/qpid/console/Package.cpp new file mode 100644 index 0000000000..81a04445f2 --- /dev/null +++ b/cpp/src/qpid/console/Package.cpp @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Package.h" + +using namespace qpid::console; + +SchemaClass* Package::getClass(const std::string& className, uint8_t* hash) +{ + NameHash key(className, hash); + ClassMap::iterator iter = classes.find(key); + if (iter != classes.end()) + return iter->second; + return 0; +} + +void Package::addClass(const std::string& className, uint8_t* hash, SchemaClass* schemaClass) +{ + NameHash key(className, hash); + ClassMap::iterator iter = classes.find(key); + if (iter == classes.end()) + classes[key] = schemaClass; +} diff --git a/cpp/src/qpid/console/Package.h b/cpp/src/qpid/console/Package.h new file mode 100644 index 0000000000..a8679dff19 --- /dev/null +++ b/cpp/src/qpid/console/Package.h @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_PACKAGE_H_ +#define _QPID_CONSOLE_PACKAGE_H_ + +#include <string> +#include <map> + +namespace qpid { +namespace console { + class SchemaClass; + + /** + * + * \ingroup qmfconsoleapi + */ + class Package { + public: + Package(const std::string& n) : name(n) {} + const std::string& getName() const { return name; } + + private: + friend class SessionManager; + struct NameHash { + std::string name; + uint8_t hash[16]; + NameHash(const std::string& n, const uint8_t* h) : name(n) { + for (int i = 0; i < 16; i++) + hash[i] = h[i]; + } + }; + + struct NameHashComp { + bool operator() (const NameHash& lhs, const NameHash& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + typedef std::map<NameHash, SchemaClass*, NameHashComp> ClassMap; + + const std::string name; + ClassMap classes; + + SchemaClass* getClass(const std::string& className, uint8_t* hash); + void addClass(const std::string& className, uint8_t* hash, + SchemaClass* schemaClass); + }; +} +} + +#endif diff --git a/cpp/src/qpid/console/Schema.cpp b/cpp/src/qpid/console/Schema.cpp new file mode 100644 index 0000000000..1cc8b8ee02 --- /dev/null +++ b/cpp/src/qpid/console/Schema.cpp @@ -0,0 +1,155 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Schema.h" +#include "Value.h" +#include "qpid/framing/FieldTable.h" + +using namespace qpid::console; +using std::string; +using std::vector; + +SchemaArgument::SchemaArgument(framing::Buffer& buffer, bool forMethod) +{ + framing::FieldTable map; + map.decode(buffer); + + name = map.getAsString("name"); + typeCode = map.getAsInt("type"); + unit = map.getAsString("unit"); + min = map.getAsInt("min"); + max = map.getAsInt("max"); + maxLen = map.getAsInt("maxlen"); + desc = map.getAsString("desc"); + + dirInput = false; + dirOutput = false; + if (forMethod) { + string dir(map.getAsString("dir")); + if (dir.find('I') != dir.npos || dir.find('i') != dir.npos) + dirInput = true; + if (dir.find('O') != dir.npos || dir.find('o') != dir.npos) + dirOutput = true; + } +} + +Value* SchemaArgument::decodeValue(framing::Buffer& buffer) +{ + return ValueFactory::newValue(typeCode, buffer); +} + +SchemaProperty::SchemaProperty(framing::Buffer& buffer) +{ + framing::FieldTable map; + map.decode(buffer); + + name = map.getAsString("name"); + typeCode = map.getAsInt("type"); + accessCode = map.getAsInt("access"); + isIndex = map.getAsInt("index") != 0; + isOptional = map.getAsInt("optional") != 0; + unit = map.getAsString("unit"); + min = map.getAsInt("min"); + max = map.getAsInt("max"); + maxLen = map.getAsInt("maxlen"); + desc = map.getAsString("desc"); +} + +Value* SchemaProperty::decodeValue(framing::Buffer& buffer) +{ + return ValueFactory::newValue(typeCode, buffer); +} + +SchemaStatistic::SchemaStatistic(framing::Buffer& buffer) +{ + framing::FieldTable map; + map.decode(buffer); + + name = map.getAsString("name"); + typeCode = map.getAsInt("type"); + unit = map.getAsString("unit"); + desc = map.getAsString("desc"); +} + +Value* SchemaStatistic::decodeValue(framing::Buffer& buffer) +{ + return ValueFactory::newValue(typeCode, buffer); +} + +SchemaMethod::SchemaMethod(framing::Buffer& buffer) +{ + framing::FieldTable map; + map.decode(buffer); + + name = map.getAsString("name"); + desc = map.getAsString("desc"); + int argCount = map.getAsInt("argCount"); + + for (int i = 0; i < argCount; i++) + arguments.push_back(new SchemaArgument(buffer, true)); +} + +SchemaMethod::~SchemaMethod() +{ + for (vector<SchemaArgument*>::iterator iter = arguments.begin(); + iter != arguments.end(); iter++) + delete *iter; +} + +SchemaClass::SchemaClass(const uint8_t _kind, const ClassKey& _key, framing::Buffer& buffer) : + kind(_kind), key(_key) +{ + if (kind == KIND_TABLE) { + uint16_t propCount = buffer.getShort(); + uint16_t statCount = buffer.getShort(); + uint16_t methodCount = buffer.getShort(); + + for (uint16_t idx = 0; idx < propCount; idx++) + properties.push_back(new SchemaProperty(buffer)); + for (uint16_t idx = 0; idx < statCount; idx++) + statistics.push_back(new SchemaStatistic(buffer)); + for (uint16_t idx = 0; idx < methodCount; idx++) + methods.push_back(new SchemaMethod(buffer)); + + } else if (kind == KIND_EVENT) { + uint16_t argCount = buffer.getShort(); + + for (uint16_t idx = 0; idx < argCount; idx++) + arguments.push_back(new SchemaArgument(buffer)); + } +} + +SchemaClass::~SchemaClass() +{ + for (vector<SchemaProperty*>::iterator iter = properties.begin(); + iter != properties.end(); iter++) + delete *iter; + for (vector<SchemaStatistic*>::iterator iter = statistics.begin(); + iter != statistics.end(); iter++) + delete *iter; + for (vector<SchemaMethod*>::iterator iter = methods.begin(); + iter != methods.end(); iter++) + delete *iter; + for (vector<SchemaArgument*>::iterator iter = arguments.begin(); + iter != arguments.end(); iter++) + delete *iter; +} + diff --git a/cpp/src/qpid/console/Schema.h b/cpp/src/qpid/console/Schema.h new file mode 100644 index 0000000000..accd3a8b16 --- /dev/null +++ b/cpp/src/qpid/console/Schema.h @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_SCHEMA_H_ +#define _QPID_CONSOLE_SCHEMA_H_ + +#include "ClassKey.h" +#include <vector> + +namespace qpid { +namespace framing { + class Buffer; +} +namespace console { + class Value; + + struct SchemaArgument { + SchemaArgument(framing::Buffer& buffer, bool forMethod = false); + Value* decodeValue(framing::Buffer& buffer); + + std::string name; + uint8_t typeCode; + bool dirInput; + bool dirOutput; + std::string unit; + int min; + int max; + int maxLen; + std::string desc; + std::string defaultVal; + }; + + struct SchemaProperty { + SchemaProperty(framing::Buffer& buffer); + Value* decodeValue(framing::Buffer& buffer); + + std::string name; + uint8_t typeCode; + uint8_t accessCode; + bool isIndex; + bool isOptional; + std::string unit; + int min; + int max; + int maxLen; + std::string desc; + }; + + struct SchemaStatistic { + SchemaStatistic(framing::Buffer& buffer); + Value* decodeValue(framing::Buffer& buffer); + + std::string name; + uint8_t typeCode; + std::string unit; + std::string desc; + }; + + struct SchemaMethod { + SchemaMethod(framing::Buffer& buffer); + ~SchemaMethod(); + + std::string name; + std::string desc; + std::vector<SchemaArgument*> arguments; + }; + + struct SchemaClass { + static const uint8_t KIND_TABLE = 1; + static const uint8_t KIND_EVENT = 2; + + SchemaClass(const uint8_t kind, const ClassKey& key, framing::Buffer& buffer); + ~SchemaClass(); + const ClassKey& getClassKey() const { return key; } + + const uint8_t kind; + const ClassKey key; + std::vector<SchemaProperty*> properties; + std::vector<SchemaStatistic*> statistics; + std::vector<SchemaMethod*> methods; + std::vector<SchemaArgument*> arguments; + }; +} +} + + +#endif diff --git a/cpp/src/qpid/console/SequenceManager.cpp b/cpp/src/qpid/console/SequenceManager.cpp new file mode 100644 index 0000000000..ff777430c0 --- /dev/null +++ b/cpp/src/qpid/console/SequenceManager.cpp @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SequenceManager.h" + +using namespace qpid::console; +using namespace qpid::sys; +using std::string; +using std::cout; +using std::endl; + +uint32_t SequenceManager::reserve(const std::string& context) +{ + Mutex::ScopedLock l(lock); + uint32_t result = sequence++; + pending[result] = context; + return result; +} + +std::string SequenceManager::release(uint32_t seq) +{ + Mutex::ScopedLock l(lock); + std::map<uint32_t, string>::iterator iter = pending.find(seq); + if (iter == pending.end()) + return string(); + string result(iter->second); + pending.erase(iter); + return result; +} + diff --git a/cpp/src/qpid/console/SequenceManager.h b/cpp/src/qpid/console/SequenceManager.h new file mode 100644 index 0000000000..c7a8c20fe6 --- /dev/null +++ b/cpp/src/qpid/console/SequenceManager.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_SEQUENCEMANAGER_H_ +#define _QPID_CONSOLE_SEQUENCEMANAGER_H_ + +#include "qpid/sys/Mutex.h" +#include <map> +#include <string> +#include <set> + +namespace qpid { +namespace console { + + /** + * + * \ingroup qpidconsoleapi + */ + class SequenceManager { + public: + typedef std::set<uint32_t> set; + + SequenceManager() : sequence(0) {} + uint32_t reserve(const std::string& context = ""); + std::string release(uint32_t seq); + + private: + sys::Mutex lock; + uint32_t sequence; + std::map<uint32_t, std::string> pending; + }; +} +} + + +#endif diff --git a/cpp/src/qpid/console/SessionManager.cpp b/cpp/src/qpid/console/SessionManager.cpp new file mode 100644 index 0000000000..bd06421445 --- /dev/null +++ b/cpp/src/qpid/console/SessionManager.cpp @@ -0,0 +1,446 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionManager.h" +#include "Schema.h" +#include "Agent.h" +#include "qpid/console/ConsoleListener.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FieldTable.h" + +using namespace qpid::console; +using namespace qpid::sys; +using namespace std; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; + +SessionManager::SessionManager(ConsoleListener* _listener, + bool _rcvObjects, + bool _rcvEvents, + bool _rcvHeartbeats, + bool _manageConnections, + bool _userBindings) : + listener(_listener), + rcvObjects((listener != 0) && _rcvObjects), + rcvEvents((listener != 0) && _rcvEvents), + rcvHeartbeats((listener != 0) && _rcvHeartbeats), + userBindings(_userBindings), + manageConnections(_manageConnections) +{ + bindingKeys(); +} + +Broker* SessionManager::addBroker(client::ConnectionSettings& settings) +{ + Broker* broker(new Broker(*this, settings)); + { + Mutex::ScopedLock l(brokerListLock); + brokers.push_back(broker); + } + return broker; +} + +void SessionManager::delBroker(Broker* broker) +{ + Mutex::ScopedLock l(brokerListLock); + for (vector<Broker*>::iterator iter = brokers.begin(); + iter != brokers.end(); iter++) + if (*iter == broker) { + brokers.erase(iter); + return; + } +} + +void SessionManager::getPackages(NameVector& packageNames) +{ + allBrokersStable(); + packageNames.clear(); + { + Mutex::ScopedLock l(lock); + for (map<string, Package*>::iterator iter = packages.begin(); + iter != packages.end(); iter++) + packageNames.push_back(iter->first); + } +} + +void SessionManager::getClasses(KeyVector& classKeys, const std::string& packageName) +{ + allBrokersStable(); + classKeys.clear(); + map<string, Package*>::iterator iter = packages.find(packageName); + if (iter == packages.end()) + return; + + Package& package = *(iter->second); + for (Package::ClassMap::const_iterator piter = package.classes.begin(); + piter != package.classes.end(); piter++) { + ClassKey key(piter->second->getClassKey()); + classKeys.push_back(key); + } +} + +SchemaClass& SessionManager::getSchema(const ClassKey& classKey) +{ + allBrokersStable(); + map<string, Package*>::iterator iter = packages.find(classKey.getPackageName()); + if (iter == packages.end()) + throw Exception("Unknown package"); + + Package& package = *(iter->second); + Package::NameHash key(classKey.getClassName(), classKey.getHash()); + Package::ClassMap::iterator cIter = package.classes.find(key); + if (cIter == package.classes.end()) + throw Exception("Unknown class"); + + return *(cIter->second); +} + +void SessionManager::bindPackage(const std::string& packageName) +{ + stringstream key; + key << "console.obj.*.*." << packageName << ".#"; + bindingKeyList.push_back(key.str()); + for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) + (*iter)->addBinding(key.str()); +} + +void SessionManager::bindClass(const ClassKey& classKey) +{ + bindClass(classKey.getPackageName(), classKey.getClassName()); +} + +void SessionManager::bindClass(const std::string& packageName, const std::string& className) +{ + stringstream key; + key << "console.obj.*.*." << packageName << "." << className << ".#"; + bindingKeyList.push_back(key.str()); + for (vector<Broker*>::iterator iter = brokers.begin(); + iter != brokers.end(); iter++) + (*iter)->addBinding(key.str()); +} + +void SessionManager::getAgents(Agent::Vector& agents, Broker* broker) +{ + agents.clear(); + if (broker != 0) { + broker->appendAgents(agents); + } else { + for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { + (*iter)->appendAgents(agents); + } + } +} + +void SessionManager::getObjects(Object::Vector& objects, const std::string& className, + Broker* _broker, Agent* _agent) +{ + Agent::Vector agentList; + + if (_agent != 0) { + agentList.push_back(_agent); + } else { + if (_broker != 0) { + _broker->appendAgents(agentList); + } else { + Mutex::ScopedLock _lock(brokerListLock); + for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { + (*iter)->appendAgents(agentList); + } + } + } + + FieldTable ft; + uint32_t sequence; + ft.setString("_class", className); + + getResult.clear(); + syncSequenceList.clear(); + error = string(); + + for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) { + Agent* agent = *iter; + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + stringstream routingKey; + routingKey << "agent." << agent->getBrokerBank() << "." << agent->getAgentBank(); + { + Mutex::ScopedLock _lock(lock); + sequence = sequenceManager.reserve("multiget"); + syncSequenceList.insert(sequence); + } + agent->getBroker()->encodeHeader(buffer, 'G', sequence); + ft.encode(buffer); + uint32_t length = buffer.getPosition(); + buffer.reset(); + agent->getBroker()->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str()); + } + + { + Mutex::ScopedLock _lock(lock); + while (!syncSequenceList.empty() && error.empty()) { + cv.wait(lock); // TODO put timeout in + } + } + + objects = getResult; +} + +void SessionManager::bindingKeys() +{ + bindingKeyList.push_back("schema.#"); + if (rcvObjects && rcvEvents && rcvHeartbeats && !userBindings) { + bindingKeyList.push_back("console.#"); + } else { + if (rcvObjects && !userBindings) + bindingKeyList.push_back("console.obj.#"); + else + bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent"); + if (rcvEvents) + bindingKeyList.push_back("console.event.#"); + if (rcvHeartbeats) + bindingKeyList.push_back("console.heartbeat"); + } +} + +void SessionManager::allBrokersStable() +{ + Mutex::ScopedLock l(brokerListLock); + for (vector<Broker*>::iterator iter = brokers.begin(); + iter != brokers.end(); iter++) + (*iter)->waitForStable(); +} + +void SessionManager::startProtocol(Broker* broker) +{ + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + broker->encodeHeader(buffer, 'B'); + uint32_t length = 512 - buffer.available(); + buffer.reset(); + broker->connThreadBody.sendBuffer(buffer, length); +} + + +void SessionManager::handleBrokerResp(Broker* broker, Buffer& inBuffer, uint32_t) +{ + framing::Uuid brokerId; + + brokerId.decode(inBuffer); + broker->setBrokerId(brokerId); + + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + uint32_t sequence = sequenceManager.reserve("startup"); + broker->encodeHeader(buffer, 'P', sequence); + uint32_t length = 512 - buffer.available(); + buffer.reset(); + broker->connThreadBody.sendBuffer(buffer, length); + + if (listener != 0) { + listener->brokerInfo(*broker); + } +} + +void SessionManager::handlePackageInd(Broker* broker, Buffer& inBuffer, uint32_t) +{ + string packageName; + inBuffer.getShortString(packageName); + + { + Mutex::ScopedLock l(lock); + map<string, Package*>::iterator iter = packages.find(packageName); + if (iter == packages.end()) { + packages[packageName] = new Package(packageName); + if (listener != 0) + listener->newPackage(packageName); + } + } + + broker->incOutstanding(); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + uint32_t sequence = sequenceManager.reserve("startup"); + broker->encodeHeader(buffer, 'Q', sequence); + buffer.putShortString(packageName); + uint32_t length = 512 - buffer.available(); + buffer.reset(); + broker->connThreadBody.sendBuffer(buffer, length); +} + +void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uint32_t sequence) +{ + Mutex::ScopedLock l(lock); + uint32_t resultCode = inBuffer.getLong(); + string resultText; + inBuffer.getShortString(resultText); + string context = sequenceManager.release(sequence); + if (resultCode != 0) + QPID_LOG(debug, "Received error in completion: " << resultCode << " " << resultText); + if (context == "startup") { + broker->decOutstanding(); + } else if (context == "multiget") { + if (syncSequenceList.count(sequence) == 1) { + syncSequenceList.erase(sequence); + if (syncSequenceList.empty()) { + cv.notify(); + } + } + } + // TODO: Other context cases +} + +void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t) +{ + uint8_t kind; + string packageName; + string className; + uint8_t hash[16]; + + kind = inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + + { + Mutex::ScopedLock l(lock); + map<string, Package*>::iterator pIter = packages.find(packageName); + if (pIter == packages.end() || pIter->second->getClass(className, hash)) + return; + } + + broker->incOutstanding(); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + uint32_t sequence = sequenceManager.reserve("startup"); + broker->encodeHeader(buffer, 'S', sequence); + buffer.putShortString(packageName); + buffer.putShortString(className); + buffer.putBin128(hash); + uint32_t length = 512 - buffer.available(); + buffer.reset(); + broker->connThreadBody.sendBuffer(buffer, length); +} + +void SessionManager::handleMethodResp(Broker* broker, Buffer& buffer, uint32_t sequence) +{ + if (broker->methodObject) { + broker->methodObject->handleMethodResp(buffer, sequence); + } +} + +void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/) +{ +} + +void SessionManager::handleEventInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/) +{ +} + +void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence) +{ + uint8_t kind; + string packageName; + string className; + uint8_t hash[16]; + + kind = inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + + { + Mutex::ScopedLock l(lock); + map<string, Package*>::iterator pIter = packages.find(packageName); + if (pIter != packages.end() && !pIter->second->getClass(className, hash)) { + ClassKey key(packageName, className, hash); + SchemaClass* schemaClass(new SchemaClass(kind, key, inBuffer)); + pIter->second->addClass(className, hash, schemaClass); + if (listener != 0) { + listener->newClass(schemaClass->getClassKey()); + } + } + } + + sequenceManager.release(sequence); + broker->decOutstanding(); +} + +void SessionManager::handleContentInd(Broker* broker, Buffer& buffer, uint32_t sequence, bool prop, bool stat) +{ + string packageName; + string className; + uint8_t hash[16]; + SchemaClass* schemaClass; + + buffer.getShortString(packageName); + buffer.getShortString(className); + buffer.getBin128(hash); + + { + Mutex::ScopedLock l(lock); + map<string, Package*>::iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return; + schemaClass = pIter->second->getClass(className, hash); + if (schemaClass == 0) + return; + } + + Object object(broker, schemaClass, buffer, prop, stat); + + if (prop && className == "agent" && packageName == "org.apache.qpid.broker") + broker->updateAgent(object); + + { + Mutex::ScopedLock l(lock); + if (syncSequenceList.count(sequence) == 1) { + if (!object.isDeleted()) + getResult.push_back(object); + } + return; + } + + if (listener) { + if (prop) + listener->objectProps(*broker, object); + if (stat) + listener->objectStats(*broker, object); + } +} + +void SessionManager::handleBrokerConnect(Broker* broker) +{ + if (listener != 0) + listener->brokerConnected(*broker); +} + +void SessionManager::handleBrokerDisconnect(Broker* broker) +{ + if (listener != 0) + listener->brokerDisconnected(*broker); +} + diff --git a/cpp/src/qpid/console/SessionManager.h b/cpp/src/qpid/console/SessionManager.h new file mode 100644 index 0000000000..f860a4590e --- /dev/null +++ b/cpp/src/qpid/console/SessionManager.h @@ -0,0 +1,194 @@ +#ifndef _QPID_CONSOLE_SESSION_MANAGER_H +#define _QPID_CONSOLE_SESSION_MANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Broker.h" +#include "Package.h" +#include "SequenceManager.h" +#include "ClassKey.h" +#include "Schema.h" +#include "Agent.h" +#include "Object.h" +#include "ObjectId.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/client/ConnectionSettings.h" +#include <string> +#include <vector> + +namespace qpid { +namespace console { + +class ConsoleListener; + +/** + * + * \ingroup qmfconsoleapi + */ +class SessionManager +{ + public: + typedef std::vector<std::string> NameVector; + typedef std::vector<ClassKey> KeyVector; + ~SessionManager() {} + + /** Create a new SessionManager + * + * Provide your own subclass of ConsoleListener to receive updates and indications + * asynchronously or leave it as its default and use only synchronous methods. + * + *@param listener Listener object to receive asynchronous indications. + *@param rcvObjects Listener wishes to receive managed object data. + *@param rcvEvents Listener wishes to receive events. + *@param rcvHeartbeats Listener wishes to receive agent heartbeats. + *@param manageConnections SessionManager should retry dropped connections. + *@param userBindings If rcvObjects is true, userBindings allows the console client + * to control which object classes are received. See the bindPackage and bindClass + * methods. If userBindings is false, the listener will receive updates for all + * object classes. + */ + SessionManager(ConsoleListener* listener = 0, + bool rcvObjects = true, + bool rcvEvents = true, + bool rcvHeartbeats = true, + bool manageConnections = false, + bool userBindings = false); + + /** Connect a broker to the console session + * + *@param settings Connection settings for client access + *@return broker object if operation is successful + *@exception If 'manageConnections' is disabled and the connection to the broker fails, + * an exception shall be thrown. + */ + Broker* addBroker(client::ConnectionSettings& settings); + + /** Disconnect a broker from the console session + * + *@param broker The broker object returned from an earlier call to addBroker. + */ + void delBroker(Broker* broker); + + /** Get a list of known management packages + * + *@param packages Vector of package names returned by the session manager. + */ + void getPackages(NameVector& packages); + + /** Get a list of class keys associated with a package + * + *@param classKeys List of class keys returned by the session manager. + *@param packageName Name of package being queried. + */ + void getClasses(KeyVector& classKeys, const std::string& packageName); + + /** Get the schema of a class given its class key + * + *@param classKey Class key of the desired schema. + */ + SchemaClass& getSchema(const ClassKey& classKey); + + /** Request that updates be received for all classes within a package + * + * Note that this method is only meaningful if a ConsoleListener was provided at session + * creation and if the 'userBindings' flag was set to true. + * + *@param packageName Name of the package to which to bind. + */ + void bindPackage(const std::string& packageName); + + /** Request update to be received for a particular class + * + * Note that this method is only meaningful if a ConsoleListener was provided at session + * creation and if the 'userBindings' flag was set to true. + * + *@param classKey Class key of class to which to bind. + */ + void bindClass(const ClassKey& classKey); + void bindClass(const std::string& packageName, const std::string& className); + + /** Get a list of qmf agents known to the session manager. + * + *@param agents Vector of Agent objects returned by the session manager. + *@param broker Return agents registered with this broker only. If NULL, return agents + * from all connected brokers. + */ + void getAgents(Agent::Vector& agents, Broker* broker = 0); + + /** Get objects from agents. There are four variants of this method with different ways of + * specifying from which class objects are being queried. + * + *@param objects List of objects received. + *@param classKey ClassKey object identifying class to be queried. + *@param className Class name identifying class to be queried. + *@param objectId Object Id of the single object to be queried. + *@param broker Restrict the query to this broker, or all brokers if NULL. + *@param agent Restrict the query to this agent, or all agents if NULL. + */ + void getObjects(Object::Vector& objects, const std::string& className, + Broker* broker = 0, Agent* agent = 0); + //void getObjects(Object::Vector& objects, const ClassKey& classKey, + // Broker* broker = 0, Agent* agent = 0); + //void getObjects(Object::Vector& objects, const ObjectId& objectId, + // Broker* broker = 0, Agent* agent = 0); + +private: + friend class Broker; + friend class Object; + sys::Mutex lock; + sys::Mutex brokerListLock; + ConsoleListener* listener; + std::vector<Broker*> brokers; + std::map<std::string, Package*> packages; + SequenceManager sequenceManager; + sys::Condition cv; + SequenceManager::set syncSequenceList; + Object::Vector getResult; + std::string error; + bool rcvObjects; + bool rcvEvents; + bool rcvHeartbeats; + bool userBindings; + bool manageConnections; + NameVector bindingKeyList; + + void bindingKeys(); + void allBrokersStable(); + void startProtocol(Broker* broker); + void handleBrokerResp(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handlePackageInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handleCommandComplete(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handleClassInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handleMethodResp(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handleHeartbeatInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handleEventInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handleSchemaResp(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence); + void handleContentInd(Broker* broker, framing::Buffer& inBuffer, uint32_t sequence, bool prop, bool stat); + void handleBrokerConnect(Broker* broker); + void handleBrokerDisconnect(Broker* broker); + +}; + +}} // namespace qpid::console + +#endif /*!_QPID_CONSOLE_SESSION_MANAGER_H*/ diff --git a/cpp/src/qpid/console/Value.cpp b/cpp/src/qpid/console/Value.cpp new file mode 100644 index 0000000000..3ef5d01ec3 --- /dev/null +++ b/cpp/src/qpid/console/Value.cpp @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Value.h" +#include "qpid/framing/Buffer.h" + +using namespace qpid::console; +using namespace std; + +string NullValue::str() const +{ + return "<Null>"; +} + +RefValue::RefValue(framing::Buffer& buffer) +{ + uint64_t first = buffer.getLongLong(); + uint64_t second = buffer.getLongLong(); + value.setValue(first, second); +} + +string RefValue::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +string UintValue::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +string IntValue::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +string Uint64Value::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +string Int64Value::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +StringValue::StringValue(framing::Buffer& buffer, int tc) +{ + if (tc == 6) + buffer.getShortString(value); + else + buffer.getMediumString(value); +} + +string BoolValue::str() const +{ + return value ? "T" : "F"; +} + +string FloatValue::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +string DoubleValue::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +UuidValue::UuidValue(framing::Buffer& buffer) +{ + value.decode(buffer); +} + +string MapValue::str() const +{ + stringstream s; + s << value; + return s.str(); +} + +MapValue::MapValue(framing::Buffer& buffer) +{ + value.decode(buffer); +} + + +Value* ValueFactory::newValue(int typeCode, framing::Buffer& buffer) +{ + switch (typeCode) { + case 1: return static_cast<Value*>(new UintValue(buffer.getOctet())); // U8 + case 2: return static_cast<Value*>(new UintValue(buffer.getShort())); // U16 + case 3: return static_cast<Value*>(new UintValue(buffer.getLong())); // U32 + case 4: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // U64 + case 6: return static_cast<Value*>(new StringValue(buffer, 6)); // SSTR + case 7: return static_cast<Value*>(new StringValue(buffer, 7)); // LSTR + case 8: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // ABSTIME + case 9: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // DELTATIME + case 10: return static_cast<Value*>(new RefValue(buffer)); // REF + case 11: return static_cast<Value*>(new BoolValue(buffer.getOctet())); // BOOL + case 12: return static_cast<Value*>(new FloatValue(buffer.getFloat())); // FLOAT + case 13: return static_cast<Value*>(new DoubleValue(buffer.getDouble())); // DOUBLE + case 14: return static_cast<Value*>(new UuidValue(buffer)); // UUID + case 15: return static_cast<Value*>(new MapValue(buffer)); // MAP + case 16: return static_cast<Value*>(new IntValue(buffer.getOctet())); // S8 + case 17: return static_cast<Value*>(new IntValue(buffer.getShort())); // S16 + case 18: return static_cast<Value*>(new IntValue(buffer.getLong())); // S32 + case 19: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // S64 + } + + return 0; +} + +void ValueFactory::encodeValue(int typeCode, Value* value, framing::Buffer& buffer) +{ + switch (typeCode) { + case 1: buffer.putOctet(value->asUint()); return; // U8 + case 2: buffer.putShort(value->asUint()); return; // U16 + case 3: buffer.putLong(value->asUint()); return; // U32 + case 4: buffer.putLongLong(value->asUint64()); return; // U64 + case 6: buffer.putShortString(value->asString()); return; // SSTR + case 7: buffer.putMediumString(value->asString()); return; // LSTR + case 8: buffer.putLongLong(value->asInt64()); return; // ABSTIME + case 9: buffer.putLongLong(value->asUint64()); return; // DELTATIME + case 10: value->asObjectId().encode(buffer); return; // REF + case 11: buffer.putOctet(value->asBool() ? 1 : 0); return; // BOOL + case 12: buffer.putFloat(value->asFloat()); return; // FLOAT + case 13: buffer.putDouble(value->asDouble()); return; // DOUBLE + case 14: value->asUuid().encode(buffer); return; // UUID + case 15: value->asMap().encode(buffer); return; // MAP + case 16: buffer.putOctet(value->asInt()); return; // S8 + case 17: buffer.putShort(value->asInt()); return; // S16 + case 18: buffer.putLong(value->asInt()); return; // S32 + case 19: buffer.putLongLong(value->asInt64()); return; // S64 + } +} diff --git a/cpp/src/qpid/console/Value.h b/cpp/src/qpid/console/Value.h new file mode 100644 index 0000000000..42ff5661a2 --- /dev/null +++ b/cpp/src/qpid/console/Value.h @@ -0,0 +1,205 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_CONSOLE_VALUE_H_ +#define _QPID_CONSOLE_VALUE_H_ + +#include "qpid/Exception.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FieldTable.h" +#include "ObjectId.h" + +namespace qpid { +namespace framing { + class Buffer; +} +namespace console { + + /** + * \ingroup qmfconsoleapi + */ + class Value { + + public: + virtual ~Value() {} + virtual std::string str() const = 0; + + virtual bool isNull() const { return false; } + virtual bool isObjectId() const { return false; } + virtual bool isUint() const { return false; } + virtual bool isInt() const { return false; } + virtual bool isUint64() const { return false; } + virtual bool isInt64() const { return false; } + virtual bool isString() const { return false; } + virtual bool isBool() const { return false; } + virtual bool isFloat() const { return false; } + virtual bool isDouble() const { return false; } + virtual bool isUuid() const { return false; } + virtual bool isMap() const { return false; } + + virtual ObjectId asObjectId() const { incompatible(); return ObjectId(); } + virtual uint32_t asUint() const { incompatible(); return 0; } + virtual int32_t asInt() const { incompatible(); return 0; } + virtual uint64_t asUint64() const { incompatible(); return 0; } + virtual int64_t asInt64() const { incompatible(); return 0; } + virtual std::string asString() const { incompatible(); return std::string(); } + virtual bool asBool() const { incompatible(); return false; } + virtual float asFloat() const { incompatible(); return 0.0; } + virtual double asDouble() const { incompatible(); return 0.0; } + virtual framing::Uuid asUuid() const { incompatible(); return framing::Uuid(); } + virtual framing::FieldTable asMap() const { incompatible(); return framing::FieldTable(); } + + private: + void incompatible() const { + throw Exception("Incompatible Type"); + } + }; + + class NullValue : public Value { + public: + NullValue() {} + std::string str() const; + bool isNull() const { return true; } + }; + + class RefValue : public Value { + public: + RefValue(ObjectId v) : value(v) {} + RefValue(framing::Buffer& buffer); + std::string str() const; + bool isObjectId() const { return true; } + ObjectId asObjectId() const { return value; } + private: + ObjectId value; + }; + + class UintValue : public Value { + public: + UintValue(uint32_t v) : value(v) {} + std::string str() const; + bool isUint() const { return true; } + uint32_t asUint() const { return value; } + private: + uint32_t value; + }; + + class IntValue : public Value { + public: + IntValue(int32_t v) : value(v) {} + std::string str() const; + bool isInt() const { return true; } + int32_t asInt() const { return value; } + private: + int32_t value; + }; + + class Uint64Value : public Value { + public: + Uint64Value(uint64_t v) : value(v) {} + std::string str() const; + bool isUint64() const { return true; } + uint64_t asUint64() const { return value; } + private: + uint64_t value; + }; + + class Int64Value : public Value { + public: + Int64Value(int64_t v) : value(v) {} + std::string str() const; + bool isInt64() const { return true; } + int64_t asInt64() const { return value; } + private: + int64_t value; + }; + + class StringValue : public Value { + public: + StringValue(const std::string& v) : value(v) {} + StringValue(framing::Buffer& buffer, int tc); + std::string str() const { return value; } + bool isString() const { return true; } + std::string asString() const { return value; } + private: + std::string value; + }; + + class BoolValue : public Value { + public: + BoolValue(bool v) : value(v) {} + BoolValue(uint8_t v) : value(v != 0) {} + std::string str() const; + bool isBool() const { return true; } + bool asBool() const { return value; } + private: + bool value; + }; + + class FloatValue : public Value { + public: + FloatValue(float v) : value(v) {} + std::string str() const; + bool isFloat() const { return true; } + float asFloat() const { return value; } + private: + float value; + }; + + class DoubleValue : public Value { + public: + DoubleValue(double v) : value(v) {} + std::string str() const; + bool isDouble() const { return true; } + double asDouble() const { return value; } + private: + double value; + }; + + class UuidValue : public Value { + public: + UuidValue(const framing::Uuid& v) : value(v) {} + UuidValue(framing::Buffer& buffer); + std::string str() const { return value.str(); } + bool isUuid() const { return true; } + framing::Uuid asUuid() const { return value; } + private: + framing::Uuid value; + }; + + class MapValue : public Value { + public: + MapValue(const framing::FieldTable& v) : value(v) {} + MapValue(framing::Buffer& buffer); + std::string str() const; + bool isMap() const { return true; } + framing::FieldTable asMap() const { return value; } + private: + framing::FieldTable value; + }; + + class ValueFactory { + public: + static Value* newValue(int typeCode, framing::Buffer& buffer); + static void encodeValue(int typeCode, Value* value, framing::Buffer& buffer); + }; +} +} + +#endif diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp index e8d8d517dd..fbbaac1cf5 100644 --- a/cpp/src/qpid/framing/Uuid.cpp +++ b/cpp/src/qpid/framing/Uuid.cpp @@ -52,7 +52,7 @@ istream& operator>>(istream& in, Uuid& uuid) { return in; } -std::string Uuid::str() { +std::string Uuid::str() const { std::ostringstream os; os << *this; return os.str(); diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h index 4e5de9ce41..2fcbb5a261 100644 --- a/cpp/src/qpid/framing/Uuid.h +++ b/cpp/src/qpid/framing/Uuid.h @@ -68,7 +68,7 @@ struct Uuid : public boost::array<uint8_t, 16> { uint32_t encodedSize() const { return size(); } /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */ - std::string str(); + std::string str() const; template <class S> void serialize(S& s) { s.raw(begin(), size()); diff --git a/cpp/src/qpid/sys/SystemInfo.h b/cpp/src/qpid/sys/SystemInfo.h index 5a116cf8ee..d43fe34b04 100644 --- a/cpp/src/qpid/sys/SystemInfo.h +++ b/cpp/src/qpid/sys/SystemInfo.h @@ -63,6 +63,17 @@ namespace SystemInfo { std::string &version, std::string &machine); + /** + * Get the process ID of the current process. + */ + uint32_t getProcessId(); + + /** + * Get the process ID of the parent of the current process. + */ + uint32_t getParentProcessId(); + + }}} // namespace qpid::sys::SystemInfo #endif /*!QPID_SYS_SYSTEMINFO_H*/ diff --git a/cpp/src/qpid/sys/posix/SystemInfo.cpp b/cpp/src/qpid/sys/posix/SystemInfo.cpp index 9bbd023e29..938d4861c4 100755 --- a/cpp/src/qpid/sys/posix/SystemInfo.cpp +++ b/cpp/src/qpid/sys/posix/SystemInfo.cpp @@ -94,4 +94,16 @@ void SystemInfo::getSystemId (std::string &osName, } } +uint32_t SystemInfo::getProcessId() +{ + return (uint32_t) ::getpid(); +} + +uint32_t SystemInfo::getParentProcessId() +{ + return (uint32_t) ::getppid(); +} + + + }} // namespace qpid::sys diff --git a/cpp/src/tests/ConsoleTest.cpp b/cpp/src/tests/ConsoleTest.cpp new file mode 100644 index 0000000000..1d55b13f3c --- /dev/null +++ b/cpp/src/tests/ConsoleTest.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/Package.h" +#include "qpid/console/ClassKey.h" +#include "unit_test.h" + +QPID_AUTO_TEST_SUITE(ConsoleTestSuite) + +using namespace qpid::framing; +using namespace qpid::console; + +QPID_AUTO_TEST_CASE(testClassKey) { + uint8_t hash[16] = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15}; + ClassKey k("com.redhat.test", "class", hash); + + BOOST_CHECK_EQUAL(k.getPackageName(), "com.redhat.test"); + BOOST_CHECK_EQUAL(k.getClassName(), "class"); + BOOST_CHECK_EQUAL(k.getHashString(), "00010203-04050607-08090a0b-0c0d0e0f"); + BOOST_CHECK_EQUAL(k.str(), "com.redhat.test:class(00010203-04050607-08090a0b-0c0d0e0f)"); +} + +QPID_AUTO_TEST_SUITE_END() + + diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index cc39309dac..3a608b2bae 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -25,6 +25,7 @@ extra_libs = lib_client = $(abs_builddir)/../libqpidclient.la lib_common = $(abs_builddir)/../libqpidcommon.la lib_broker = $(abs_builddir)/../libqpidbroker.la +lib_console = $(abs_builddir)/../libqmfconsole.la # lib_amqp_0_10 = $(abs_builddir)/../libqpidamqp_0_10.la # @@ -47,7 +48,7 @@ CLEANFILES= TESTS+=unit_test check_PROGRAMS+=unit_test unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ - $(lib_client) $(lib_broker) + $(lib_client) $(lib_broker) $(lib_console) unit_test_SOURCES= unit_test.cpp unit_test.h \ BrokerFixture.h SocketProxy.h \ @@ -86,7 +87,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ConnectionOptions.h \ ForkedBroker.h \ ManagementTest.cpp \ - MessageReplayTracker.cpp + MessageReplayTracker.cpp \ + ConsoleTest.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp |