summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/console/SessionManager.cpp')
-rw-r--r--trunk/qpid/cpp/src/qpid/console/SessionManager.cpp481
1 files changed, 481 insertions, 0 deletions
diff --git a/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp b/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp
new file mode 100644
index 0000000000..0285c5f34a
--- /dev/null
+++ b/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp
@@ -0,0 +1,481 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/console/SessionManager.h"
+#include "qpid/console/Schema.h"
+#include "qpid/console/Agent.h"
+#include "qpid/console/ConsoleListener.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FieldTable.h"
+
+using namespace qpid::console;
+using namespace qpid::sys;
+using namespace qpid;
+using namespace std;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+
+SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) :
+ listener(_listener), settings(_settings)
+{
+ bindingKeys();
+}
+
+SessionManager::~SessionManager()
+{
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ delete *iter;
+}
+
+Broker* SessionManager::addBroker(client::ConnectionSettings& settings)
+{
+ Broker* broker(new Broker(*this, settings));
+ {
+ Mutex::ScopedLock l(brokerListLock);
+ brokers.push_back(broker);
+ }
+ return broker;
+}
+
+void SessionManager::delBroker(Broker* broker)
+{
+ Mutex::ScopedLock l(brokerListLock);
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ if (*iter == broker) {
+ brokers.erase(iter);
+ delete broker;
+ return;
+ }
+}
+
+void SessionManager::getPackages(NameVector& packageNames)
+{
+ allBrokersStable();
+ packageNames.clear();
+ {
+ Mutex::ScopedLock l(lock);
+ for (map<string, Package*>::iterator iter = packages.begin();
+ iter != packages.end(); iter++)
+ packageNames.push_back(iter->first);
+ }
+}
+
+void SessionManager::getClasses(KeyVector& classKeys, const std::string& packageName)
+{
+ allBrokersStable();
+ classKeys.clear();
+ map<string, Package*>::iterator iter = packages.find(packageName);
+ if (iter == packages.end())
+ return;
+
+ Package& package = *(iter->second);
+ for (Package::ClassMap::const_iterator piter = package.classes.begin();
+ piter != package.classes.end(); piter++) {
+ ClassKey key(piter->second->getClassKey());
+ classKeys.push_back(key);
+ }
+}
+
+SchemaClass& SessionManager::getSchema(const ClassKey& classKey)
+{
+ allBrokersStable();
+ map<string, Package*>::iterator iter = packages.find(classKey.getPackageName());
+ if (iter == packages.end())
+ throw Exception("Unknown package");
+
+ Package& package = *(iter->second);
+ Package::NameHash key(classKey.getClassName(), classKey.getHash());
+ Package::ClassMap::iterator cIter = package.classes.find(key);
+ if (cIter == package.classes.end())
+ throw Exception("Unknown class");
+
+ return *(cIter->second);
+}
+
+void SessionManager::bindPackage(const std::string& packageName)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ bindingKeyList.push_back(key.str());
+ for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++)
+ (*iter)->addBinding(key.str());
+}
+
+void SessionManager::bindClass(const ClassKey& classKey)
+{
+ bindClass(classKey.getPackageName(), classKey.getClassName());
+}
+
+void SessionManager::bindClass(const std::string& packageName, const std::string& className)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ bindingKeyList.push_back(key.str());
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ (*iter)->addBinding(key.str());
+}
+
+void SessionManager::getAgents(Agent::Vector& agents, Broker* broker)
+{
+ agents.clear();
+ if (broker != 0) {
+ broker->appendAgents(agents);
+ } else {
+ for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
+ (*iter)->appendAgents(agents);
+ }
+ }
+}
+
+void SessionManager::getObjects(Object::Vector& objects, const std::string& className,
+ Broker* _broker, Agent* _agent)
+{
+ Agent::Vector agentList;
+
+ if (_agent != 0) {
+ agentList.push_back(_agent);
+ _agent->getBroker()->waitForStable();
+ } else {
+ if (_broker != 0) {
+ _broker->appendAgents(agentList);
+ _broker->waitForStable();
+ } else {
+ allBrokersStable();
+ Mutex::ScopedLock _lock(brokerListLock);
+ for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
+ (*iter)->appendAgents(agentList);
+ }
+ }
+ }
+
+ FieldTable ft;
+ uint32_t sequence;
+ ft.setString("_class", className);
+
+ getResult.clear();
+ syncSequenceList.clear();
+ error = string();
+
+ if (agentList.empty()) {
+ objects = getResult;
+ return;
+ }
+
+ for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) {
+ Agent* agent = *iter;
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+ stringstream routingKey;
+ routingKey << "agent." << agent->getBrokerBank() << "." << agent->getAgentBank();
+ {
+ Mutex::ScopedLock _lock(lock);
+ sequence = sequenceManager.reserve("multiget");
+ syncSequenceList.insert(sequence);
+ }
+ agent->getBroker()->encodeHeader(buffer, 'G', sequence);
+ ft.encode(buffer);
+ uint32_t length = buffer.getPosition();
+ buffer.reset();
+ agent->getBroker()->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str());
+ }
+
+ {
+ Mutex::ScopedLock _lock(lock);
+ sys::AbsTime startTime = sys::now();
+ while (!syncSequenceList.empty() && error.empty()) {
+ cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC));
+ sys::AbsTime currTime = sys::now();
+ if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC)
+ break;
+ }
+ }
+
+ objects = getResult;
+}
+
+void SessionManager::bindingKeys()
+{
+ bindingKeyList.push_back("schema.#");
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+ bindingKeyList.push_back("console.#");
+ } else {
+ if (settings.rcvObjects && !settings.userBindings)
+ bindingKeyList.push_back("console.obj.#");
+ else
+ bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent");
+ if (settings.rcvEvents)
+ bindingKeyList.push_back("console.event.#");
+ if (settings.rcvHeartbeats)
+ bindingKeyList.push_back("console.heartbeat");
+ }
+}
+
+void SessionManager::allBrokersStable()
+{
+ Mutex::ScopedLock l(brokerListLock);
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ if ((*iter)->isConnected())
+ (*iter)->waitForStable();
+}
+
+void SessionManager::startProtocol(Broker* broker)
+{
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ broker->encodeHeader(buffer, 'B');
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+}
+
+
+void SessionManager::handleBrokerResp(Broker* broker, Buffer& inBuffer, uint32_t)
+{
+ framing::Uuid brokerId;
+
+ brokerId.decode(inBuffer);
+ broker->setBrokerId(brokerId);
+
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ uint32_t sequence = sequenceManager.reserve("startup");
+ broker->encodeHeader(buffer, 'P', sequence);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+
+ if (listener != 0) {
+ listener->brokerInfo(*broker);
+ }
+}
+
+void SessionManager::handlePackageInd(Broker* broker, Buffer& inBuffer, uint32_t)
+{
+ string packageName;
+ inBuffer.getShortString(packageName);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator iter = packages.find(packageName);
+ if (iter == packages.end()) {
+ packages[packageName] = new Package(packageName);
+ if (listener != 0)
+ listener->newPackage(packageName);
+ }
+ }
+
+ broker->incOutstanding();
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ uint32_t sequence = sequenceManager.reserve("startup");
+ broker->encodeHeader(buffer, 'Q', sequence);
+ buffer.putShortString(packageName);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+}
+
+void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uint32_t sequence)
+{
+ Mutex::ScopedLock l(lock);
+ uint32_t resultCode = inBuffer.getLong();
+ string resultText;
+ inBuffer.getShortString(resultText);
+ string context = sequenceManager.release(sequence);
+ if (resultCode != 0)
+ QPID_LOG(debug, "Received error in completion: " << resultCode << " " << resultText);
+ if (context == "startup") {
+ broker->decOutstanding();
+ } else if (context == "multiget") {
+ if (syncSequenceList.count(sequence) == 1) {
+ syncSequenceList.erase(sequence);
+ if (syncSequenceList.empty()) {
+ cv.notify();
+ }
+ }
+ }
+ // TODO: Other context cases
+}
+
+void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
+{
+ uint8_t kind;
+ string packageName;
+ string className;
+ uint8_t hash[16];
+
+ kind = inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end() || pIter->second->getClass(className, hash))
+ return;
+ }
+
+ broker->incOutstanding();
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ uint32_t sequence = sequenceManager.reserve("startup");
+ broker->encodeHeader(buffer, 'S', sequence);
+ buffer.putShortString(packageName);
+ buffer.putShortString(className);
+ buffer.putBin128(hash);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ broker->connThreadBody.sendBuffer(buffer, length);
+}
+
+void SessionManager::handleMethodResp(Broker* broker, Buffer& buffer, uint32_t sequence)
+{
+ if (broker->methodObject) {
+ broker->methodObject->handleMethodResp(buffer, sequence);
+ }
+}
+
+void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/)
+{
+}
+
+void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/)
+{
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ SchemaClass* schemaClass;
+
+ buffer.getShortString(packageName);
+ buffer.getShortString(className);
+ buffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return;
+ schemaClass = pIter->second->getClass(className, hash);
+ if (schemaClass == 0)
+ return;
+ }
+
+ Event event(broker, schemaClass, buffer);
+
+ if (listener)
+ listener->event(event);
+}
+
+void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence)
+{
+ uint8_t kind;
+ string packageName;
+ string className;
+ uint8_t hash[16];
+
+ kind = inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end() && !pIter->second->getClass(className, hash)) {
+ ClassKey key(packageName, className, hash);
+ SchemaClass* schemaClass(new SchemaClass(kind, key, inBuffer));
+ pIter->second->addClass(className, hash, schemaClass);
+ if (listener != 0) {
+ listener->newClass(schemaClass->getClassKey());
+ }
+ }
+ }
+
+ sequenceManager.release(sequence);
+ broker->decOutstanding();
+}
+
+void SessionManager::handleContentInd(Broker* broker, Buffer& buffer, uint32_t sequence, bool prop, bool stat)
+{
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ SchemaClass* schemaClass;
+
+ buffer.getShortString(packageName);
+ buffer.getShortString(className);
+ buffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return;
+ schemaClass = pIter->second->getClass(className, hash);
+ if (schemaClass == 0)
+ return;
+ }
+
+ Object object(broker, schemaClass, buffer, prop, stat);
+
+ if (prop && className == "agent" && packageName == "org.apache.qpid.broker")
+ broker->updateAgent(object);
+
+ {
+ Mutex::ScopedLock l(lock);
+ if (syncSequenceList.count(sequence) == 1) {
+ if (!object.isDeleted())
+ getResult.push_back(object);
+ return;
+ }
+ }
+
+ if (listener) {
+ if (prop)
+ listener->objectProps(*broker, object);
+ if (stat)
+ listener->objectStats(*broker, object);
+ }
+}
+
+void SessionManager::handleBrokerConnect(Broker* broker)
+{
+ if (listener != 0)
+ listener->brokerConnected(*broker);
+}
+
+void SessionManager::handleBrokerDisconnect(Broker* broker)
+{
+ if (listener != 0)
+ listener->brokerDisconnected(*broker);
+}
+