diff options
author | Ted Ross <tross@apache.org> | 2009-09-02 12:32:56 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-09-02 12:32:56 +0000 |
commit | 93596b93d29ebcb76719a9e7a9e4793d4738dfed (patch) | |
tree | 199af52eab71373be7b5aca8433c9bb9251bd98e /qpid/cpp/src/qmf/ConsoleEngine.cpp | |
parent | 840ec0fa37770ac7fa35ee5b0dd7c7b891198f31 (diff) | |
download | qpid-python-93596b93d29ebcb76719a9e7a9e4793d4738dfed.tar.gz |
More QMF implementation:
- Added schema handling and exchange to the console
- Improved the connection performance by switching to pre-acquired mode
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@810482 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/ConsoleEngine.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleEngine.cpp | 280 |
1 files changed, 235 insertions, 45 deletions
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.cpp b/qpid/cpp/src/qmf/ConsoleEngine.cpp index 7620e875eb..3d1b378b68 100644 --- a/qpid/cpp/src/qmf/ConsoleEngine.cpp +++ b/qpid/cpp/src/qmf/ConsoleEngine.cpp @@ -137,6 +137,7 @@ namespace qmf { BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName); BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); BrokerEventImpl::Ptr eventSetupComplete(); + BrokerEventImpl::Ptr eventStable(); void handleBrokerResponse(Buffer& inBuffer, uint32_t seq); void handlePackageIndication(Buffer& inBuffer, uint32_t seq); @@ -147,7 +148,7 @@ namespace qmf { void handleEventIndication(Buffer& inBuffer, uint32_t seq); void handleSchemaResponse(Buffer& inBuffer, uint32_t seq); void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); - void incOutstanding(); + void incOutstandingLH(); void decOutstanding(); }; @@ -178,12 +179,12 @@ namespace qmf { uint32_t classCount(const char* packageName) const; const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; - ClassKind getClassKind(const SchemaClassKey& key) const; - const SchemaObjectClass* getObjectClass(const SchemaClassKey& key) const; - const SchemaEventClass* getEventClass(const SchemaClassKey& key) const; + ClassKind getClassKind(const SchemaClassKey* key) const; + const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const; + const SchemaEventClass* getEventClass(const SchemaClassKey* key) const; void bindPackage(const char* packageName); - void bindClass(const SchemaClassKey& key); + void bindClass(const SchemaClassKey* key); void bindClass(const char* packageName, const char* className); uint32_t agentCount() const; @@ -203,8 +204,28 @@ namespace qmf { const ConsoleSettings& settings; mutable Mutex lock; deque<ConsoleEventImpl::Ptr> eventQueue; - vector<BrokerProxyImpl::Ptr> brokerList; + vector<BrokerProxyImpl*> brokerList; vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE) + + // Declare a compare class for the class maps that compares the dereferenced + // class key pointers. The default behavior would be to compare the pointer + // addresses themselves. + struct KeyCompare { + bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const { + return *left < *right; + } + }; + + typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList; + typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList; + typedef map<string, pair<ObjectClassList, EventClassList> > PackageList; + + PackageList packages; + + void learnPackage(const string& packageName); + void learnClass(SchemaObjectClassImpl::Ptr cls); + void learnClass(SchemaEventClassImpl::Ptr cls); + bool haveClass(const SchemaClassKeyImpl& key) const; }; } @@ -393,6 +414,12 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() return event; } +BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE)); + return event; +} + void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) { // Note that this function doesn't touch requestsOutstanding. This is because @@ -415,6 +442,16 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) inBuffer.getShortString(package); QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); + console->learnPackage(package); + + Mutex::ScopedLock _lock(lock); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(this)); + incOutstandingLH(); + Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence); + outBuffer.putShortString(package); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package); } void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) @@ -426,9 +463,30 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) seqMgr.release(seq); } -void BrokerProxyImpl::handleClassIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) { - // TODO + string package; + string clsName; + SchemaHash hash; + uint8_t kind = inBuffer.getOctet(); + inBuffer.getShortString(package); + inBuffer.getShortString(clsName); + hash.decode(inBuffer); + Uuid printableHash(hash.get()); + SchemaClassKeyImpl classKey(package, clsName, hash); + + QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str()); + + if (!console->haveClass(classKey)) { + Mutex::ScopedLock _lock(lock); + incOutstandingLH(); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(this)); + Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); + classKey.encode(outBuffer); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str()); + } } void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) @@ -446,9 +504,28 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq // TODO } -void BrokerProxyImpl::handleSchemaResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) -{ - // TODO +void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) +{ + SchemaObjectClassImpl::Ptr oClassPtr; + SchemaEventClassImpl::Ptr eClassPtr; + uint8_t kind = inBuffer.getOctet(); + const SchemaClassKeyImpl* key; + if (kind == CLASS_OBJECT) { + oClassPtr.reset(new SchemaObjectClassImpl(inBuffer)); + console->learnClass(oClassPtr); + key = oClassPtr->getClassKey()->impl; + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str()); + } else if (kind == CLASS_EVENT) { + eClassPtr.reset(new SchemaEventClassImpl(inBuffer)); + console->learnClass(eClassPtr); + key = eClassPtr->getClassKey()->impl; + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str()); + } + else { + QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); + } + + decOutstanding(); } void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/) @@ -456,9 +533,8 @@ void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*se // TODO } -void BrokerProxyImpl::incOutstanding() +void BrokerProxyImpl::incOutstandingLH() { - Mutex::ScopedLock _lock(lock); requestsOutstanding++; } @@ -467,12 +543,14 @@ void BrokerProxyImpl::decOutstanding() Mutex::ScopedLock _lock(lock); requestsOutstanding--; if (requestsOutstanding == 0 && !topicBound) { + topicBound = true; for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin(); iter != console->bindingList.end(); iter++) { string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); string key(iter->second); eventQueue.push_back(eventBind(exchange, queueName, key)); } + eventQueue.push_back(eventStable()); } } @@ -528,57 +606,122 @@ void ConsoleEngineImpl::popEvent() eventQueue.pop_front(); } -void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/) +void ConsoleEngineImpl::addConnection(BrokerProxy& broker, void* /*context*/) { - // TODO + Mutex::ScopedLock _lock(lock); + brokerList.push_back(broker.impl); } -void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/) +void ConsoleEngineImpl::delConnection(BrokerProxy& broker) { - // TODO + Mutex::ScopedLock _lock(lock); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + if (*iter == broker.impl) { + brokerList.erase(iter); + break; + } } uint32_t ConsoleEngineImpl::packageCount() const { - // TODO - return 0; + Mutex::ScopedLock _lock(lock); + return packages.size(); } -const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const +const string& ConsoleEngineImpl::getPackageName(uint32_t idx) const { - // TODO - static string temp; - return temp; + const static string empty; + + Mutex::ScopedLock _lock(lock); + if (idx >= packages.size()) + return empty; + + PackageList::const_iterator iter = packages.begin(); + for (uint32_t i = 0; i < idx; i++) iter++; + return iter->first; } -uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const +uint32_t ConsoleEngineImpl::classCount(const char* packageName) const { - // TODO - return 0; + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + + return oList.size() + eList.size(); } -const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const +const SchemaClassKey* ConsoleEngineImpl::getClass(const char* packageName, uint32_t idx) const { - // TODO + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + uint32_t count = 0; + + for (ObjectClassList::const_iterator oIter = oList.begin(); + oIter != oList.end(); oIter++) { + if (count == idx) + return oIter->second->getClassKey(); + count++; + } + + for (EventClassList::const_iterator eIter = eList.begin(); + eIter != eList.end(); eIter++) { + if (count == idx) + return eIter->second->getClassKey(); + count++; + } + return 0; } -ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const +ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey* key) const { - // TODO + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return CLASS_OBJECT; + + const EventClassList& eList = pIter->second.second; + if (eList.find(key->impl) != eList.end()) + return CLASS_EVENT; return CLASS_OBJECT; } -const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const +const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey* key) const { - // TODO - return 0; + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + ObjectClassList::const_iterator iter = oList.find(key->impl); + if (iter == oList.end()) + return 0; + return iter->second->envelope; } -const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const +const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey* key) const { - // TODO - return 0; + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const EventClassList& eList = pIter->second.second; + EventClassList::const_iterator iter = eList.find(key->impl); + if (iter == eList.end()) + return 0; + return iter->second->envelope; } void ConsoleEngineImpl::bindPackage(const char* packageName) @@ -587,18 +730,18 @@ void ConsoleEngineImpl::bindPackage(const char* packageName) key << "console.obj.*.*." << packageName << ".#"; Mutex::ScopedLock _lock(lock); bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); iter != brokerList.end(); iter++) (*iter)->addBinding(QMF_EXCHANGE, key.str()); } -void ConsoleEngineImpl::bindClass(const SchemaClassKey& classKey) +void ConsoleEngineImpl::bindClass(const SchemaClassKey* classKey) { stringstream key; - key << "console.obj.*.*." << classKey.getPackageName() << "." << classKey.getClassName() << ".#"; + key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#"; Mutex::ScopedLock _lock(lock); bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); iter != brokerList.end(); iter++) (*iter)->addBinding(QMF_EXCHANGE, key.str()); } @@ -609,7 +752,7 @@ void ConsoleEngineImpl::bindClass(const char* packageName, const char* className key << "console.obj.*.*." << packageName << "." << className << ".#"; Mutex::ScopedLock _lock(lock); bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin(); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); iter != brokerList.end(); iter++) (*iter)->addBinding(QMF_EXCHANGE, key.str()); } @@ -645,6 +788,53 @@ void ConsoleEngineImpl::endSync(SyncQuery& sync) } */ +void ConsoleEngineImpl::learnPackage(const string& packageName) +{ + Mutex::ScopedLock _lock(lock); + if (packages.find(packageName) == packages.end()) + packages.insert(pair<string, pair<ObjectClassList, EventClassList> > + (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList()))); +} + +void ConsoleEngineImpl::learnClass(SchemaObjectClassImpl::Ptr cls) +{ + Mutex::ScopedLock _lock(lock); + const SchemaClassKey* key = cls->getClassKey(); + PackageList::iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return; + + ObjectClassList& list = pIter->second.first; + if (list.find(key->impl) == list.end()) + list[key->impl] = cls; +} + +void ConsoleEngineImpl::learnClass(SchemaEventClassImpl::Ptr cls) +{ + Mutex::ScopedLock _lock(lock); + const SchemaClassKey* key = cls->getClassKey(); + PackageList::iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return; + + EventClassList& list = pIter->second.second; + if (list.find(key->impl) == list.end()) + list[key->impl] = cls; +} + +bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key.getPackageName()); + if (pIter == packages.end()) + return false; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + + return oList.find(&key) != oList.end() || eList.find(&key) != eList.end(); +} + //================================================================== // Wrappers @@ -680,11 +870,11 @@ uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); } const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); } uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); } const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); } -ClassKind ConsoleEngine::getClassKind(const SchemaClassKey& key) const { return impl->getClassKind(key); } -const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey& key) const { return impl->getObjectClass(key); } -const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey& key) const { return impl->getEventClass(key); } +ClassKind ConsoleEngine::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); } +const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); } +const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); } void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); } -void ConsoleEngine::bindClass(const SchemaClassKey& key) { impl->bindClass(key); } +void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); } const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); } |