summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/engine/ConsoleImpl.cpp')
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.cpp458
1 files changed, 458 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
new file mode 100644
index 0000000000..4a5da31bdc
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ConsoleImpl.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
+#include "qmf/engine/SequenceManager.h"
+#include "qmf/engine/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management";
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+ConsoleEvent ConsoleEventImpl::copy()
+{
+ ConsoleEvent item;
+
+ ::memset(&item, 0, sizeof(ConsoleEvent));
+ item.kind = kind;
+ item.agent = agent.get();
+ item.classKey = classKey;
+ item.object = object.get();
+ item.context = context;
+ item.event = event.get();
+ item.timestamp = timestamp;
+ item.hasProps = hasProps;
+ item.hasStats = hasStats;
+
+ STRING_REF(name);
+
+ return item;
+}
+
+ConsoleImpl::ConsoleImpl(const ConsoleSettings& s) : settings(s)
+{
+ bindingList.push_back(pair<string, string>(string(), "schema.#"));
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+ bindingList.push_back(pair<string, string>(string(), "console.#"));
+ } else {
+ if (settings.rcvObjects && !settings.userBindings)
+ bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
+ else
+ bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
+ if (settings.rcvEvents)
+ bindingList.push_back(pair<string, string>(string(), "console.event.#"));
+ if (settings.rcvHeartbeats)
+ bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
+ }
+}
+
+ConsoleImpl::~ConsoleImpl()
+{
+ // This function intentionally left blank.
+}
+
+bool ConsoleImpl::getEvent(ConsoleEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void ConsoleImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void ConsoleImpl::addConnection(BrokerProxy& broker, void* /*context*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ brokerList.push_back(broker.impl);
+}
+
+void ConsoleImpl::delConnection(BrokerProxy& broker)
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ if (*iter == broker.impl) {
+ brokerList.erase(iter);
+ break;
+ }
+}
+
+uint32_t ConsoleImpl::packageCount() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return packages.size();
+}
+
+const string& ConsoleImpl::getPackageName(uint32_t idx) const
+{
+ const static string empty;
+
+ Mutex::ScopedLock _lock(lock);
+ if (idx >= packages.size())
+ return empty;
+
+ PackageList::const_iterator iter = packages.begin();
+ for (uint32_t i = 0; i < idx; i++) iter++;
+ return iter->first;
+}
+
+uint32_t ConsoleImpl::classCount(const char* packageName) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.size() + eList.size();
+}
+
+const SchemaClassKey* ConsoleImpl::getClass(const char* packageName, uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+ uint32_t count = 0;
+
+ for (ObjectClassList::const_iterator oIter = oList.begin();
+ oIter != oList.end(); oIter++) {
+ if (count == idx)
+ return oIter->second->getClassKey();
+ count++;
+ }
+
+ for (EventClassList::const_iterator eIter = eList.begin();
+ eIter != eList.end(); eIter++) {
+ if (count == idx)
+ return eIter->second->getClassKey();
+ count++;
+ }
+
+ return 0;
+}
+
+ClassKind ConsoleImpl::getClassKind(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return CLASS_OBJECT;
+
+ const EventClassList& eList = pIter->second.second;
+ if (eList.find(key) != eList.end())
+ return CLASS_EVENT;
+ return CLASS_OBJECT;
+}
+
+const SchemaObjectClass* ConsoleImpl::getObjectClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key);
+ if (iter == oList.end())
+ return 0;
+ return iter->second;
+}
+
+const SchemaEventClass* ConsoleImpl::getEventClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const EventClassList& eList = pIter->second.second;
+ EventClassList::const_iterator iter = eList.find(key);
+ if (iter == eList.end())
+ return 0;
+ return iter->second;
+}
+
+void ConsoleImpl::bindPackage(const char* packageName)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleImpl::bindClass(const SchemaClassKey* classKey)
+{
+ stringstream key;
+ key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleImpl::bindClass(const char* packageName, const char* className)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+
+void ConsoleImpl::bindEvent(const SchemaClassKey* classKey)
+{
+ bindEvent(classKey->getPackageName(), classKey->getClassName());
+}
+
+void ConsoleImpl::bindEvent(const char* packageName, const char* eventName)
+{
+ if (!settings.userBindings) throw qpid::Exception("Console not configured for userBindings.");
+ if (settings.rcvEvents) throw qpid::Exception("Console already configured to receive all events.");
+
+ stringstream key;
+ key << "console.event.*.*." << packageName;
+ if (eventName && *eventName) {
+ key << "." << eventName << ".#";
+ } else {
+ key << ".#";
+ }
+
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+/*
+void ConsoleImpl::startSync(const Query& query, void* context, SyncQuery& sync)
+{
+}
+
+void ConsoleImpl::touchSync(SyncQuery& sync)
+{
+}
+
+void ConsoleImpl::endSync(SyncQuery& sync)
+{
+}
+*/
+
+void ConsoleImpl::learnPackage(const string& packageName)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (packages.find(packageName) == packages.end()) {
+ packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
+ (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
+ eventNewPackage(packageName);
+ }
+}
+
+void ConsoleImpl::learnClass(SchemaObjectClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ ObjectClassList& list = pIter->second.first;
+ if (list.find(key) == list.end()) {
+ list[key] = cls;
+ eventNewClass(key);
+ }
+}
+
+void ConsoleImpl::learnClass(SchemaEventClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ EventClassList& list = pIter->second.second;
+ if (list.find(key) == list.end()) {
+ list[key] = cls;
+ eventNewClass(key);
+ }
+}
+
+bool ConsoleImpl::haveClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return false;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.find(key) != oList.end() || eList.find(key) != eList.end();
+}
+
+SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key);
+ if (iter == oList.end())
+ return 0;
+
+ return iter->second;
+}
+
+void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewPackage(const string& packageName)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE));
+ event->name = packageName;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewClass(const SchemaClassKey* key)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS));
+ event->classKey = key;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE));
+ event->object = object;
+ event->hasProps = prop;
+ event->hasStats = stat;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT));
+ event->agent = agent;
+ event->timestamp = timestamp;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+
+void ConsoleImpl::eventEventReceived(EventPtr event)
+{
+ ConsoleEventImpl::Ptr console_event(new ConsoleEventImpl(ConsoleEvent::EVENT_RECEIVED));
+ console_event->event = event;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(console_event);
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Console::Console(const ConsoleSettings& settings) : impl(new ConsoleImpl(settings)) {}
+Console::~Console() { delete impl; }
+bool Console::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
+void Console::popEvent() { impl->popEvent(); }
+void Console::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
+void Console::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
+uint32_t Console::packageCount() const { return impl->packageCount(); }
+const char* Console::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
+uint32_t Console::classCount(const char* packageName) const { return impl->classCount(packageName); }
+const SchemaClassKey* Console::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
+ClassKind Console::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
+const SchemaObjectClass* Console::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
+const SchemaEventClass* Console::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
+void Console::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
+void Console::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
+void Console::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
+
+void Console::bindEvent(const SchemaClassKey *key) { impl->bindEvent(key); }
+void Console::bindEvent(const char* packageName, const char* eventName) { impl->bindEvent(packageName, eventName); }
+
+//void Console::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
+//void Console::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
+//void Console::endSync(SyncQuery& sync) { impl->endSync(sync); }
+
+