summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/console/SessionManager.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-12-23 19:38:25 +0000
committerTed Ross <tross@apache.org>2008-12-23 19:38:25 +0000
commit12d7d4125a42a7f0ab26a89c3c34e88135cf5869 (patch)
tree5970de22eb3027833688156a4116d71ef6b44442 /cpp/src/qpid/console/SessionManager.cpp
parent912a6db37456524c60e1b7f3236de4dca3c77636 (diff)
downloadqpid-python-12d7d4125a42a7f0ab26a89c3c34e88135cf5869.tar.gz
QPID-1412 Updates and fixes for the c++ console API:
- Added event support - Converted raw pointers to shared_ptrs in references to Values. This fixes a memory leak in the original code. - Added wrappers to make value access more convenient. - Added timeout handling for synchronous operations. Timeout values are configurable. - Fixed a bug in getObjects whereby waitForStable was not called and the operation could fail if called too early. - Added examples "printevents" and "ping" to illustrate the usage of different aspects of the API. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@729075 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/console/SessionManager.cpp')
-rw-r--r--cpp/src/qpid/console/SessionManager.cpp52
1 files changed, 34 insertions, 18 deletions
diff --git a/cpp/src/qpid/console/SessionManager.cpp b/cpp/src/qpid/console/SessionManager.cpp
index bd06421445..6aa347e051 100644
--- a/cpp/src/qpid/console/SessionManager.cpp
+++ b/cpp/src/qpid/console/SessionManager.cpp
@@ -34,18 +34,8 @@ using namespace std;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
-SessionManager::SessionManager(ConsoleListener* _listener,
- bool _rcvObjects,
- bool _rcvEvents,
- bool _rcvHeartbeats,
- bool _manageConnections,
- bool _userBindings) :
- listener(_listener),
- rcvObjects((listener != 0) && _rcvObjects),
- rcvEvents((listener != 0) && _rcvEvents),
- rcvHeartbeats((listener != 0) && _rcvHeartbeats),
- userBindings(_userBindings),
- manageConnections(_manageConnections)
+SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) :
+ listener(_listener), settings(_settings)
{
bindingKeys();
}
@@ -158,10 +148,13 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas
if (_agent != 0) {
agentList.push_back(_agent);
+ _agent->getBroker()->waitForStable();
} else {
if (_broker != 0) {
_broker->appendAgents(agentList);
+ _broker->waitForStable();
} else {
+ allBrokersStable();
Mutex::ScopedLock _lock(brokerListLock);
for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
(*iter)->appendAgents(agentList);
@@ -198,7 +191,7 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas
{
Mutex::ScopedLock _lock(lock);
while (!syncSequenceList.empty() && error.empty()) {
- cv.wait(lock); // TODO put timeout in
+ cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC));
}
}
@@ -208,16 +201,16 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas
void SessionManager::bindingKeys()
{
bindingKeyList.push_back("schema.#");
- if (rcvObjects && rcvEvents && rcvHeartbeats && !userBindings) {
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
bindingKeyList.push_back("console.#");
} else {
- if (rcvObjects && !userBindings)
+ if (settings.rcvObjects && !settings.userBindings)
bindingKeyList.push_back("console.obj.#");
else
bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent");
- if (rcvEvents)
+ if (settings.rcvEvents)
bindingKeyList.push_back("console.event.#");
- if (rcvHeartbeats)
+ if (settings.rcvHeartbeats)
bindingKeyList.push_back("console.heartbeat");
}
}
@@ -356,8 +349,31 @@ void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/
{
}
-void SessionManager::handleEventInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/)
+void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/)
{
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ SchemaClass* schemaClass;
+
+ buffer.getShortString(packageName);
+ buffer.getShortString(className);
+ buffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return;
+ schemaClass = pIter->second->getClass(className, hash);
+ if (schemaClass == 0)
+ return;
+ }
+
+ Event event(broker, schemaClass, buffer);
+
+ if (listener)
+ listener->event(event);
}
void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence)