diff options
Diffstat (limited to 'qpid/cpp/src/qmf/engine/ConsoleImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConsoleImpl.cpp | 458 |
1 files changed, 458 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp new file mode 100644 index 0000000000..4a5da31bdc --- /dev/null +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/engine/ConsoleImpl.h" +#include "qmf/engine/MessageImpl.h" +#include "qmf/engine/SchemaImpl.h" +#include "qmf/engine/Typecode.h" +#include "qmf/engine/ObjectImpl.h" +#include "qmf/engine/ObjectIdImpl.h" +#include "qmf/engine/QueryImpl.h" +#include "qmf/engine/ValueImpl.h" +#include "qmf/engine/Protocol.h" +#include "qmf/engine/SequenceManager.h" +#include "qmf/engine/BrokerProxyImpl.h" +#include <qpid/framing/Buffer.h> +#include <qpid/framing/Uuid.h> +#include <qpid/framing/FieldTable.h> +#include <qpid/framing/FieldValue.h> +#include <qpid/log/Statement.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/SystemInfo.h> +#include <string.h> +#include <iostream> +#include <fstream> + +using namespace std; +using namespace qmf::engine; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace { + const char* QMF_EXCHANGE = "qpid.management"; +} + +#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} + +ConsoleEvent ConsoleEventImpl::copy() +{ + ConsoleEvent item; + + ::memset(&item, 0, sizeof(ConsoleEvent)); + item.kind = kind; + item.agent = agent.get(); + item.classKey = classKey; + item.object = object.get(); + item.context = context; + item.event = event.get(); + item.timestamp = timestamp; + item.hasProps = hasProps; + item.hasStats = hasStats; + + STRING_REF(name); + + return item; +} + +ConsoleImpl::ConsoleImpl(const ConsoleSettings& s) : settings(s) +{ + bindingList.push_back(pair<string, string>(string(), "schema.#")); + if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { + bindingList.push_back(pair<string, string>(string(), "console.#")); + } else { + if (settings.rcvObjects && !settings.userBindings) + bindingList.push_back(pair<string, string>(string(), "console.obj.#")); + else + bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent")); + if (settings.rcvEvents) + bindingList.push_back(pair<string, string>(string(), "console.event.#")); + if (settings.rcvHeartbeats) + bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#")); + } +} + +ConsoleImpl::~ConsoleImpl() +{ + // This function intentionally left blank. +} + +bool ConsoleImpl::getEvent(ConsoleEvent& event) const +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front()->copy(); + return true; +} + +void ConsoleImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +void ConsoleImpl::addConnection(BrokerProxy& broker, void* /*context*/) +{ + Mutex::ScopedLock _lock(lock); + brokerList.push_back(broker.impl); +} + +void ConsoleImpl::delConnection(BrokerProxy& broker) +{ + Mutex::ScopedLock _lock(lock); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + if (*iter == broker.impl) { + brokerList.erase(iter); + break; + } +} + +uint32_t ConsoleImpl::packageCount() const +{ + Mutex::ScopedLock _lock(lock); + return packages.size(); +} + +const string& ConsoleImpl::getPackageName(uint32_t idx) const +{ + const static string empty; + + Mutex::ScopedLock _lock(lock); + if (idx >= packages.size()) + return empty; + + PackageList::const_iterator iter = packages.begin(); + for (uint32_t i = 0; i < idx; i++) iter++; + return iter->first; +} + +uint32_t ConsoleImpl::classCount(const char* packageName) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + + return oList.size() + eList.size(); +} + +const SchemaClassKey* ConsoleImpl::getClass(const char* packageName, uint32_t idx) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + uint32_t count = 0; + + for (ObjectClassList::const_iterator oIter = oList.begin(); + oIter != oList.end(); oIter++) { + if (count == idx) + return oIter->second->getClassKey(); + count++; + } + + for (EventClassList::const_iterator eIter = eList.begin(); + eIter != eList.end(); eIter++) { + if (count == idx) + return eIter->second->getClassKey(); + count++; + } + + return 0; +} + +ClassKind ConsoleImpl::getClassKind(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return CLASS_OBJECT; + + const EventClassList& eList = pIter->second.second; + if (eList.find(key) != eList.end()) + return CLASS_EVENT; + return CLASS_OBJECT; +} + +const SchemaObjectClass* ConsoleImpl::getObjectClass(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + ObjectClassList::const_iterator iter = oList.find(key); + if (iter == oList.end()) + return 0; + return iter->second; +} + +const SchemaEventClass* ConsoleImpl::getEventClass(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const EventClassList& eList = pIter->second.second; + EventClassList::const_iterator iter = eList.find(key); + if (iter == eList.end()) + return 0; + return iter->second; +} + +void ConsoleImpl::bindPackage(const char* packageName) +{ + stringstream key; + key << "console.obj.*.*." << packageName << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); +} + +void ConsoleImpl::bindClass(const SchemaClassKey* classKey) +{ + stringstream key; + key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); +} + +void ConsoleImpl::bindClass(const char* packageName, const char* className) +{ + stringstream key; + key << "console.obj.*.*." << packageName << "." << className << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); +} + + +void ConsoleImpl::bindEvent(const SchemaClassKey* classKey) +{ + bindEvent(classKey->getPackageName(), classKey->getClassName()); +} + +void ConsoleImpl::bindEvent(const char* packageName, const char* eventName) +{ + if (!settings.userBindings) throw qpid::Exception("Console not configured for userBindings."); + if (settings.rcvEvents) throw qpid::Exception("Console already configured to receive all events."); + + stringstream key; + key << "console.event.*.*." << packageName; + if (eventName && *eventName) { + key << "." << eventName << ".#"; + } else { + key << ".#"; + } + + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); +} + +/* +void ConsoleImpl::startSync(const Query& query, void* context, SyncQuery& sync) +{ +} + +void ConsoleImpl::touchSync(SyncQuery& sync) +{ +} + +void ConsoleImpl::endSync(SyncQuery& sync) +{ +} +*/ + +void ConsoleImpl::learnPackage(const string& packageName) +{ + Mutex::ScopedLock _lock(lock); + if (packages.find(packageName) == packages.end()) { + packages.insert(pair<string, pair<ObjectClassList, EventClassList> > + (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList()))); + eventNewPackage(packageName); + } +} + +void ConsoleImpl::learnClass(SchemaObjectClass* cls) +{ + Mutex::ScopedLock _lock(lock); + const SchemaClassKey* key = cls->getClassKey(); + PackageList::iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return; + + ObjectClassList& list = pIter->second.first; + if (list.find(key) == list.end()) { + list[key] = cls; + eventNewClass(key); + } +} + +void ConsoleImpl::learnClass(SchemaEventClass* cls) +{ + Mutex::ScopedLock _lock(lock); + const SchemaClassKey* key = cls->getClassKey(); + PackageList::iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return; + + EventClassList& list = pIter->second.second; + if (list.find(key) == list.end()) { + list[key] = cls; + eventNewClass(key); + } +} + +bool ConsoleImpl::haveClass(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return false; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + + return oList.find(key) != oList.end() || eList.find(key) != eList.end(); +} + +SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + ObjectClassList::const_iterator iter = oList.find(key); + if (iter == oList.end()) + return 0; + + return iter->second; +} + +void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED)); + event->agent = agent; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED)); + event->agent = agent; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventNewPackage(const string& packageName) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE)); + event->name = packageName; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventNewClass(const SchemaClassKey* key) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS)); + event->classKey = key; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE)); + event->object = object; + event->hasProps = prop; + event->hasStats = stat; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT)); + event->agent = agent; + event->timestamp = timestamp; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + + +void ConsoleImpl::eventEventReceived(EventPtr event) +{ + ConsoleEventImpl::Ptr console_event(new ConsoleEventImpl(ConsoleEvent::EVENT_RECEIVED)); + console_event->event = event; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(console_event); +} + +//================================================================== +// Wrappers +//================================================================== + +Console::Console(const ConsoleSettings& settings) : impl(new ConsoleImpl(settings)) {} +Console::~Console() { delete impl; } +bool Console::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } +void Console::popEvent() { impl->popEvent(); } +void Console::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); } +void Console::delConnection(BrokerProxy& broker) { impl->delConnection(broker); } +uint32_t Console::packageCount() const { return impl->packageCount(); } +const char* Console::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); } +uint32_t Console::classCount(const char* packageName) const { return impl->classCount(packageName); } +const SchemaClassKey* Console::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); } +ClassKind Console::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); } +const SchemaObjectClass* Console::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); } +const SchemaEventClass* Console::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); } +void Console::bindPackage(const char* packageName) { impl->bindPackage(packageName); } +void Console::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } +void Console::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } + +void Console::bindEvent(const SchemaClassKey *key) { impl->bindEvent(key); } +void Console::bindEvent(const char* packageName, const char* eventName) { impl->bindEvent(packageName, eventName); } + +//void Console::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } +//void Console::touchSync(SyncQuery& sync) { impl->touchSync(sync); } +//void Console::endSync(SyncQuery& sync) { impl->endSync(sync); } + + |