diff options
Diffstat (limited to 'cpp/src/qpid/console/SessionManager.cpp')
-rw-r--r-- | cpp/src/qpid/console/SessionManager.cpp | 52 |
1 files changed, 34 insertions, 18 deletions
diff --git a/cpp/src/qpid/console/SessionManager.cpp b/cpp/src/qpid/console/SessionManager.cpp index bd06421445..6aa347e051 100644 --- a/cpp/src/qpid/console/SessionManager.cpp +++ b/cpp/src/qpid/console/SessionManager.cpp @@ -34,18 +34,8 @@ using namespace std; using qpid::framing::Buffer; using qpid::framing::FieldTable; -SessionManager::SessionManager(ConsoleListener* _listener, - bool _rcvObjects, - bool _rcvEvents, - bool _rcvHeartbeats, - bool _manageConnections, - bool _userBindings) : - listener(_listener), - rcvObjects((listener != 0) && _rcvObjects), - rcvEvents((listener != 0) && _rcvEvents), - rcvHeartbeats((listener != 0) && _rcvHeartbeats), - userBindings(_userBindings), - manageConnections(_manageConnections) +SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) : + listener(_listener), settings(_settings) { bindingKeys(); } @@ -158,10 +148,13 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas if (_agent != 0) { agentList.push_back(_agent); + _agent->getBroker()->waitForStable(); } else { if (_broker != 0) { _broker->appendAgents(agentList); + _broker->waitForStable(); } else { + allBrokersStable(); Mutex::ScopedLock _lock(brokerListLock); for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { (*iter)->appendAgents(agentList); @@ -198,7 +191,7 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas { Mutex::ScopedLock _lock(lock); while (!syncSequenceList.empty() && error.empty()) { - cv.wait(lock); // TODO put timeout in + cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC)); } } @@ -208,16 +201,16 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas void SessionManager::bindingKeys() { bindingKeyList.push_back("schema.#"); - if (rcvObjects && rcvEvents && rcvHeartbeats && !userBindings) { + if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { bindingKeyList.push_back("console.#"); } else { - if (rcvObjects && !userBindings) + if (settings.rcvObjects && !settings.userBindings) bindingKeyList.push_back("console.obj.#"); else bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent"); - if (rcvEvents) + if (settings.rcvEvents) bindingKeyList.push_back("console.event.#"); - if (rcvHeartbeats) + if (settings.rcvHeartbeats) bindingKeyList.push_back("console.heartbeat"); } } @@ -356,8 +349,31 @@ void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/ { } -void SessionManager::handleEventInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/) +void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/) { + string packageName; + string className; + uint8_t hash[16]; + SchemaClass* schemaClass; + + buffer.getShortString(packageName); + buffer.getShortString(className); + buffer.getBin128(hash); + + { + Mutex::ScopedLock l(lock); + map<string, Package*>::iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return; + schemaClass = pIter->second->getClass(className, hash); + if (schemaClass == 0) + return; + } + + Event event(broker, schemaClass, buffer); + + if (listener) + listener->event(event); } void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence) |