summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/ConsoleEngine.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-09-02 12:32:56 +0000
committerTed Ross <tross@apache.org>2009-09-02 12:32:56 +0000
commit93596b93d29ebcb76719a9e7a9e4793d4738dfed (patch)
tree199af52eab71373be7b5aca8433c9bb9251bd98e /qpid/cpp/src/qmf/ConsoleEngine.cpp
parent840ec0fa37770ac7fa35ee5b0dd7c7b891198f31 (diff)
downloadqpid-python-93596b93d29ebcb76719a9e7a9e4793d4738dfed.tar.gz
More QMF implementation:
- Added schema handling and exchange to the console - Improved the connection performance by switching to pre-acquired mode git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@810482 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/ConsoleEngine.cpp')
-rw-r--r--qpid/cpp/src/qmf/ConsoleEngine.cpp280
1 files changed, 235 insertions, 45 deletions
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.cpp b/qpid/cpp/src/qmf/ConsoleEngine.cpp
index 7620e875eb..3d1b378b68 100644
--- a/qpid/cpp/src/qmf/ConsoleEngine.cpp
+++ b/qpid/cpp/src/qmf/ConsoleEngine.cpp
@@ -137,6 +137,7 @@ namespace qmf {
BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
BrokerEventImpl::Ptr eventSetupComplete();
+ BrokerEventImpl::Ptr eventStable();
void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
@@ -147,7 +148,7 @@ namespace qmf {
void handleEventIndication(Buffer& inBuffer, uint32_t seq);
void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
- void incOutstanding();
+ void incOutstandingLH();
void decOutstanding();
};
@@ -178,12 +179,12 @@ namespace qmf {
uint32_t classCount(const char* packageName) const;
const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
- ClassKind getClassKind(const SchemaClassKey& key) const;
- const SchemaObjectClass* getObjectClass(const SchemaClassKey& key) const;
- const SchemaEventClass* getEventClass(const SchemaClassKey& key) const;
+ ClassKind getClassKind(const SchemaClassKey* key) const;
+ const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
+ const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
void bindPackage(const char* packageName);
- void bindClass(const SchemaClassKey& key);
+ void bindClass(const SchemaClassKey* key);
void bindClass(const char* packageName, const char* className);
uint32_t agentCount() const;
@@ -203,8 +204,28 @@ namespace qmf {
const ConsoleSettings& settings;
mutable Mutex lock;
deque<ConsoleEventImpl::Ptr> eventQueue;
- vector<BrokerProxyImpl::Ptr> brokerList;
+ vector<BrokerProxyImpl*> brokerList;
vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
+
+ // Declare a compare class for the class maps that compares the dereferenced
+ // class key pointers. The default behavior would be to compare the pointer
+ // addresses themselves.
+ struct KeyCompare {
+ bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
+ return *left < *right;
+ }
+ };
+
+ typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList;
+ typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList;
+ typedef map<string, pair<ObjectClassList, EventClassList> > PackageList;
+
+ PackageList packages;
+
+ void learnPackage(const string& packageName);
+ void learnClass(SchemaObjectClassImpl::Ptr cls);
+ void learnClass(SchemaEventClassImpl::Ptr cls);
+ bool haveClass(const SchemaClassKeyImpl& key) const;
};
}
@@ -393,6 +414,12 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
return event;
}
+BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
+ return event;
+}
+
void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
{
// Note that this function doesn't touch requestsOutstanding. This is because
@@ -415,6 +442,16 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
inBuffer.getShortString(package);
QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+ console->learnPackage(package);
+
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(this));
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
+ outBuffer.putShortString(package);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
}
void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
@@ -426,9 +463,30 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
seqMgr.release(seq);
}
-void BrokerProxyImpl::handleClassIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
{
- // TODO
+ string package;
+ string clsName;
+ SchemaHash hash;
+ uint8_t kind = inBuffer.getOctet();
+ inBuffer.getShortString(package);
+ inBuffer.getShortString(clsName);
+ hash.decode(inBuffer);
+ Uuid printableHash(hash.get());
+ SchemaClassKeyImpl classKey(package, clsName, hash);
+
+ QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
+
+ if (!console->haveClass(classKey)) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(this));
+ Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
+ classKey.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
+ }
}
void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
@@ -446,9 +504,28 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq
// TODO
}
-void BrokerProxyImpl::handleSchemaResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
+void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
+{
+ SchemaObjectClassImpl::Ptr oClassPtr;
+ SchemaEventClassImpl::Ptr eClassPtr;
+ uint8_t kind = inBuffer.getOctet();
+ const SchemaClassKeyImpl* key;
+ if (kind == CLASS_OBJECT) {
+ oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
+ console->learnClass(oClassPtr);
+ key = oClassPtr->getClassKey()->impl;
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+ } else if (kind == CLASS_EVENT) {
+ eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
+ console->learnClass(eClassPtr);
+ key = eClassPtr->getClassKey()->impl;
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
+ }
+ else {
+ QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
+ }
+
+ decOutstanding();
}
void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/)
@@ -456,9 +533,8 @@ void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*se
// TODO
}
-void BrokerProxyImpl::incOutstanding()
+void BrokerProxyImpl::incOutstandingLH()
{
- Mutex::ScopedLock _lock(lock);
requestsOutstanding++;
}
@@ -467,12 +543,14 @@ void BrokerProxyImpl::decOutstanding()
Mutex::ScopedLock _lock(lock);
requestsOutstanding--;
if (requestsOutstanding == 0 && !topicBound) {
+ topicBound = true;
for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
iter != console->bindingList.end(); iter++) {
string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
string key(iter->second);
eventQueue.push_back(eventBind(exchange, queueName, key));
}
+ eventQueue.push_back(eventStable());
}
}
@@ -528,57 +606,122 @@ void ConsoleEngineImpl::popEvent()
eventQueue.pop_front();
}
-void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/)
+void ConsoleEngineImpl::addConnection(BrokerProxy& broker, void* /*context*/)
{
- // TODO
+ Mutex::ScopedLock _lock(lock);
+ brokerList.push_back(broker.impl);
}
-void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/)
+void ConsoleEngineImpl::delConnection(BrokerProxy& broker)
{
- // TODO
+ Mutex::ScopedLock _lock(lock);
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ if (*iter == broker.impl) {
+ brokerList.erase(iter);
+ break;
+ }
}
uint32_t ConsoleEngineImpl::packageCount() const
{
- // TODO
- return 0;
+ Mutex::ScopedLock _lock(lock);
+ return packages.size();
}
-const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const
+const string& ConsoleEngineImpl::getPackageName(uint32_t idx) const
{
- // TODO
- static string temp;
- return temp;
+ const static string empty;
+
+ Mutex::ScopedLock _lock(lock);
+ if (idx >= packages.size())
+ return empty;
+
+ PackageList::const_iterator iter = packages.begin();
+ for (uint32_t i = 0; i < idx; i++) iter++;
+ return iter->first;
}
-uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const
+uint32_t ConsoleEngineImpl::classCount(const char* packageName) const
{
- // TODO
- return 0;
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.size() + eList.size();
}
-const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const
+const SchemaClassKey* ConsoleEngineImpl::getClass(const char* packageName, uint32_t idx) const
{
- // TODO
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+ uint32_t count = 0;
+
+ for (ObjectClassList::const_iterator oIter = oList.begin();
+ oIter != oList.end(); oIter++) {
+ if (count == idx)
+ return oIter->second->getClassKey();
+ count++;
+ }
+
+ for (EventClassList::const_iterator eIter = eList.begin();
+ eIter != eList.end(); eIter++) {
+ if (count == idx)
+ return eIter->second->getClassKey();
+ count++;
+ }
+
return 0;
}
-ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const
+ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey* key) const
{
- // TODO
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return CLASS_OBJECT;
+
+ const EventClassList& eList = pIter->second.second;
+ if (eList.find(key->impl) != eList.end())
+ return CLASS_EVENT;
return CLASS_OBJECT;
}
-const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const
+const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey* key) const
{
- // TODO
- return 0;
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key->impl);
+ if (iter == oList.end())
+ return 0;
+ return iter->second->envelope;
}
-const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const
+const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey* key) const
{
- // TODO
- return 0;
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const EventClassList& eList = pIter->second.second;
+ EventClassList::const_iterator iter = eList.find(key->impl);
+ if (iter == eList.end())
+ return 0;
+ return iter->second->envelope;
}
void ConsoleEngineImpl::bindPackage(const char* packageName)
@@ -587,18 +730,18 @@ void ConsoleEngineImpl::bindPackage(const char* packageName)
key << "console.obj.*.*." << packageName << ".#";
Mutex::ScopedLock _lock(lock);
bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
iter != brokerList.end(); iter++)
(*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-void ConsoleEngineImpl::bindClass(const SchemaClassKey& classKey)
+void ConsoleEngineImpl::bindClass(const SchemaClassKey* classKey)
{
stringstream key;
- key << "console.obj.*.*." << classKey.getPackageName() << "." << classKey.getClassName() << ".#";
+ key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
Mutex::ScopedLock _lock(lock);
bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
iter != brokerList.end(); iter++)
(*iter)->addBinding(QMF_EXCHANGE, key.str());
}
@@ -609,7 +752,7 @@ void ConsoleEngineImpl::bindClass(const char* packageName, const char* className
key << "console.obj.*.*." << packageName << "." << className << ".#";
Mutex::ScopedLock _lock(lock);
bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
iter != brokerList.end(); iter++)
(*iter)->addBinding(QMF_EXCHANGE, key.str());
}
@@ -645,6 +788,53 @@ void ConsoleEngineImpl::endSync(SyncQuery& sync)
}
*/
+void ConsoleEngineImpl::learnPackage(const string& packageName)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (packages.find(packageName) == packages.end())
+ packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
+ (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
+}
+
+void ConsoleEngineImpl::learnClass(SchemaObjectClassImpl::Ptr cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ ObjectClassList& list = pIter->second.first;
+ if (list.find(key->impl) == list.end())
+ list[key->impl] = cls;
+}
+
+void ConsoleEngineImpl::learnClass(SchemaEventClassImpl::Ptr cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ EventClassList& list = pIter->second.second;
+ if (list.find(key->impl) == list.end())
+ list[key->impl] = cls;
+}
+
+bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key.getPackageName());
+ if (pIter == packages.end())
+ return false;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.find(&key) != oList.end() || eList.find(&key) != eList.end();
+}
+
//==================================================================
// Wrappers
@@ -680,11 +870,11 @@ uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); }
const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); }
const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
-ClassKind ConsoleEngine::getClassKind(const SchemaClassKey& key) const { return impl->getClassKind(key); }
-const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey& key) const { return impl->getObjectClass(key); }
-const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey& key) const { return impl->getEventClass(key); }
+ClassKind ConsoleEngine::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
+const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
+const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
-void ConsoleEngine::bindClass(const SchemaClassKey& key) { impl->bindClass(key); }
+void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); }
const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); }