diff options
author | Ted Ross <tross@apache.org> | 2008-12-23 19:38:25 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-12-23 19:38:25 +0000 |
commit | 564775ae717db7743d41f7620cde8845dc7d35b8 (patch) | |
tree | 8e5f5b8985a5428ce766ef05161887b256615a88 | |
parent | 1ac4193fbf6dcbf1e95da4250b6d3bb1d928ab2f (diff) | |
download | qpid-python-564775ae717db7743d41f7620cde8845dc7d35b8.tar.gz |
QPID-1412 Updates and fixes for the c++ console API:
- Added event support
- Converted raw pointers to shared_ptrs in references to Values.
This fixes a memory leak in the original code.
- Added wrappers to make value access more convenient.
- Added timeout handling for synchronous operations.
Timeout values are configurable.
- Fixed a bug in getObjects whereby waitForStable was not called and
the operation could fail if called too early.
- Added examples "printevents" and "ping" to illustrate the usage of
different aspects of the API.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@729075 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/examples/qmf-console/Makefile.am | 14 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-console/console.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-console/ping.cpp | 129 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-console/printevents.cpp | 105 | ||||
-rw-r--r-- | qpid/cpp/src/qmfc.mk | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Broker.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Broker.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/ConsoleListener.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Event.cpp | 205 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Event.h | 49 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Object.cpp | 127 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Object.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Schema.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Schema.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/SessionManager.cpp | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/SessionManager.h | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Value.cpp | 42 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/console/Value.h | 6 |
18 files changed, 706 insertions, 113 deletions
diff --git a/qpid/cpp/examples/qmf-console/Makefile.am b/qpid/cpp/examples/qmf-console/Makefile.am index 05db50f734..e115fc0fb0 100644 --- a/qpid/cpp/examples/qmf-console/Makefile.am +++ b/qpid/cpp/examples/qmf-console/Makefile.am @@ -22,12 +22,20 @@ examplesdir=$(pkgdatadir)/examples/qmf-console MAKELDFLAG = qmfconsole include $(top_srcdir)/examples/makedist.mk -noinst_PROGRAMS=qmfc +noinst_PROGRAMS=console printevents ping -qmfc_SOURCES=console.cpp -qmfc_LDADD=$(CONSOLE_LIB) +console_SOURCES=console.cpp +console_LDADD=$(CONSOLE_LIB) + +printevents_SOURCES=printevents.cpp +printevents_LDADD=$(CONSOLE_LIB) + +ping_SOURCES=ping.cpp +ping_LDADD=$(CONSOLE_LIB) examples_DATA= \ console.cpp \ + printevents.cpp \ + ping.cpp \ $(MAKEDIST) diff --git a/qpid/cpp/examples/qmf-console/console.cpp b/qpid/cpp/examples/qmf-console/console.cpp index c98f1ace34..5700d5556f 100644 --- a/qpid/cpp/examples/qmf-console/console.cpp +++ b/qpid/cpp/examples/qmf-console/console.cpp @@ -21,17 +21,9 @@ #include "qpid/console/ConsoleListener.h" #include "qpid/console/SessionManager.h" -#include "qpid/console/Value.h" -#include <unistd.h> -#include <cstdlib> -#include <iostream> - -#include <sstream> using namespace std; using namespace qpid::console; -using std::cout; -using std::endl; class Listener : public ConsoleListener { public: @@ -68,6 +60,10 @@ public: void objectStats(Broker& broker, Object& object) { cout << "objectStats: broker=" << broker << " object=" << object << endl; } + + void event(Event& event) { + cout << "event: " << event << endl; + } }; @@ -127,8 +123,8 @@ int main_int(int /*argc*/, char** /*argv*/) Object::AttributeMap args; MethodResponse result; - args["sequence"] = new UintValue(1); - args["body"] = new StringValue("Testing..."); + args.addUint("sequence", 1); + args.addString("body", "Testing..."); cout << "Call echo method..." << endl; broker.invokeMethod("echo", args, result); diff --git a/qpid/cpp/examples/qmf-console/ping.cpp b/qpid/cpp/examples/qmf-console/ping.cpp new file mode 100644 index 0000000000..debca7428a --- /dev/null +++ b/qpid/cpp/examples/qmf-console/ping.cpp @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/SessionManager.h" + +using namespace std; +using namespace qpid::console; + +//============================================================== +// Main program +//============================================================== +int main_int(int /*argc*/, char** /*argv*/) +{ + // + // Declare connection settings for the messaging broker. The settings default to + // localhost:5672 with user guest (password guest). Refer to the header file + // <qpid/client/ConnectionSettings.h> for full details. + // + qpid::client::ConnectionSettings connSettings; + + // + // Declare the (optional) session manager settings. Override the default timeout + // for methods calls to be 2 seconds. + // + SessionManager::Settings smSettings; + smSettings.methodTimeout = 2; + + // + // Declare the console session manager. With a null listener argument, it defaults to + // synchronous-only access mode. + // + SessionManager sm(0, smSettings); + + // + // Add a broker connection to the session manager. + // + Broker* broker = sm.addBroker(connSettings); + + uint32_t count = 5; // The number of echo requests we will send to the broker. + Object::Vector list; // A container for holding objects retrieved from the broker. + + // + // Query for a list of 'broker' objects from the Management Database + // + sm.getObjects(list, "broker"); + + // + // We expect one object (since we are connected to only one broker) + // + if (list.size() == 1) { + Object& brokerObject = *(list.begin()); + + for (uint32_t iter = 0; iter < count; iter++) { + // + // Declare a container for arguments to be sent with the "echo" method + // that we will invoke on the remote "broker" object. + // + Object::AttributeMap args; + + // + // Declare a container for the result of the method invocation. + // + MethodResponse result; + + // + // Set the values of the input arguments. + // + args.addUint("sequence", iter); + args.addString("body", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"); + + cout << "Ping Broker: " << broker->getUrl() << "... "; + cout.flush(); + + // + // Invoke the method. This is a synchronous operation that will block until + // the method completes and returns a result. + // + brokerObject.invokeMethod("echo", args, result); + + // + // result.code is the return code (0 => Success) + // result.text is the return text + // result.arguments is a container (of type Object::AttributeMap) that holds + // the output arguments (if any) from the method. + // + cout << "Result: code=" << result.code << " text=" << result.text; + if (result.code == 0) + cout << " seq=" << result.arguments["sequence"]->asUint(); + cout << endl; + + if (result.code == 0 && iter < count - 1) + ::sleep(1); + } + } + + // + // Disconnect the broker from the session manager. + // + sm.delBroker(broker); + return 0; +} + +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + diff --git a/qpid/cpp/examples/qmf-console/printevents.cpp b/qpid/cpp/examples/qmf-console/printevents.cpp new file mode 100644 index 0000000000..bbec2c1af0 --- /dev/null +++ b/qpid/cpp/examples/qmf-console/printevents.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/ConsoleListener.h" +#include "qpid/console/SessionManager.h" +#include "qpid/sys/Time.h" + +using namespace std; +using namespace qpid::console; + +// +// Define a listener class to receive asynchronous events. +// +class Listener : public ConsoleListener { +public: + void brokerConnected(const Broker& broker) { + cout << qpid::sys::now() << " NOTIC qpid-printevents:brokerConnected broker=" << + broker.getUrl() << endl; + } + + void brokerDisconnected(const Broker& broker) { + cout << qpid::sys::now() << " NOTIC qpid-printevents:brokerDisonnected broker=" << + broker.getUrl() << endl; + } + + void event(Event& event) { + cout << event << endl; + } +}; + + +//============================================================== +// Main program +//============================================================== +int main_int(int /*argc*/, char** /*argv*/) +{ + // + // Declare an instance of our listener. + // + Listener listener; + + // + // Declare connection settings for the messaging broker. The settings default to + // localhost:5672 with user guest (password guest). Refer to the header file + // <qpid/client/ConnectionSettings.h> for full details. + // + qpid::client::ConnectionSettings connSettings; + + // + // Declare the (optional) session manager settings. Disable the reception of + // object updates and heartbeats. Note that by disabling the reception of things + // we don't need, we don't unnecessarily use network bandwidth. + // + SessionManager::Settings smSettings; + smSettings.rcvObjects = false; + smSettings.rcvHeartbeats = false; + + // + // Declare the console session manager. + // + SessionManager sm(&listener, smSettings); + + // + // Add a broker connection to the session manager. If desired, multiple brokers may + // be connected. + // + Broker* broker = sm.addBroker(connSettings); + + // + // Sleep indefinitely while asynchronous events are handled by the listener. + // + for (;;) + ::sleep(1); + + sm.delBroker(broker); + return 0; +} + +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + diff --git a/qpid/cpp/src/qmfc.mk b/qpid/cpp/src/qmfc.mk index 6fa7d4c988..089ac2dcd9 100644 --- a/qpid/cpp/src/qmfc.mk +++ b/qpid/cpp/src/qmfc.mk @@ -31,6 +31,7 @@ libqmfconsole_la_SOURCES = \ qpid/console/ClassKey.cpp \ qpid/console/ConsoleListener.h \ qpid/console/Event.h \ + qpid/console/Event.cpp \ qpid/console/Object.h \ qpid/console/Object.cpp \ qpid/console/ObjectId.h \ diff --git a/qpid/cpp/src/qpid/console/Broker.cpp b/qpid/cpp/src/qpid/console/Broker.cpp index 2e7ba95b1d..c6b1be1d31 100644 --- a/qpid/cpp/src/qpid/console/Broker.cpp +++ b/qpid/cpp/src/qpid/console/Broker.cpp @@ -59,6 +59,13 @@ Broker::~Broker() { } +string Broker::getUrl() const +{ + stringstream url; + url << connectionSettings.host << ":" << connectionSettings.port; + return url.str(); +} + void Broker::encodeHeader(framing::Buffer& buf, uint8_t opcode, uint32_t seq) const { buf.putOctet('A'); @@ -249,7 +256,9 @@ void Broker::waitForStable() return; syncInFlight = true; while (reqsOutstanding != 0) { - cond.wait(lock); // TODO: put timeout delay in here! + bool result = cond.wait(lock, AbsTime(now(), TIME_SEC * sessionManager.settings.getTimeout)); + if (!result) + throw(Exception("Timed out waiting for broker to synchronize")); } } diff --git a/qpid/cpp/src/qpid/console/Broker.h b/qpid/cpp/src/qpid/console/Broker.h index 8cca99f764..9df2380dff 100644 --- a/qpid/cpp/src/qpid/console/Broker.h +++ b/qpid/cpp/src/qpid/console/Broker.h @@ -61,6 +61,7 @@ namespace console { void addBinding(const std::string& key) { connThreadBody.bindExchange("qpid.management", key); } + std::string getUrl() const; private: friend class SessionManager; diff --git a/qpid/cpp/src/qpid/console/ConsoleListener.h b/qpid/cpp/src/qpid/console/ConsoleListener.h index 97fdf158cd..d0db6034f6 100644 --- a/qpid/cpp/src/qpid/console/ConsoleListener.h +++ b/qpid/cpp/src/qpid/console/ConsoleListener.h @@ -25,6 +25,7 @@ #include "Broker.h" #include "ClassKey.h" #include "Object.h" +#include "Event.h" namespace qpid { namespace console { @@ -74,7 +75,7 @@ namespace console { /** Invoked when an event is raised. */ - //virtual void event(Broker&, Event) {} + virtual void event(Event&) {} /** */ diff --git a/qpid/cpp/src/qpid/console/Event.cpp b/qpid/cpp/src/qpid/console/Event.cpp new file mode 100644 index 0000000000..51f043159c --- /dev/null +++ b/qpid/cpp/src/qpid/console/Event.cpp @@ -0,0 +1,205 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Broker.h" +#include "ClassKey.h" +#include "Schema.h" +#include "Event.h" +#include "Value.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/Buffer.h" + +using namespace qpid::console; +using namespace std; +using qpid::framing::Uuid; +using qpid::framing::FieldTable; + +Event::Event(Broker* _broker, SchemaClass* _schema, framing::Buffer& buffer) : + broker(_broker), schema(_schema) +{ + timestamp = buffer.getLongLong(); + severity = (Severity) buffer.getOctet(); + for (vector<SchemaArgument*>::const_iterator aIter = schema->arguments.begin(); + aIter != schema->arguments.end(); aIter++) { + SchemaArgument* argument = *aIter; + attributes[argument->name] = argument->decodeValue(buffer); + } +} + +const ClassKey& Event::getClassKey() const +{ + return schema->getClassKey(); +} + +string Event::getSeverityString() const +{ + switch (severity) { + case EMERGENCY : return string("EMER"); + case ALERT : return string("ALERT"); + case CRITICAL : return string("CRIT"); + case ERROR : return string("ERROR"); + case WARNING : return string("WARN"); + case NOTICE : return string("NOTIC"); + case INFO : return string("INFO"); + case DEBUG : return string("DEBUG"); + } + return string("<UNKNOWN>"); +} + +ObjectId Event::attrRef(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return ObjectId(); + Value::Ptr val = iter->second; + if (!val->isObjectId()) + return ObjectId(); + return val->asObjectId(); +} + +uint32_t Event::attrUint(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value::Ptr val = iter->second; + if (!val->isUint()) + return 0; + return val->asUint(); +} + +int32_t Event::attrInt(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value::Ptr val = iter->second; + if (!val->isInt()) + return 0; + return val->asInt(); +} + +uint64_t Event::attrUint64(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value::Ptr val = iter->second; + if (!val->isUint64()) + return 0; + return val->asUint64(); +} + +int64_t Event::attrInt64(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0; + Value::Ptr val = iter->second; + if (!val->isInt64()) + return 0; + return val->asInt64(); +} + +string Event::attrString(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return string(); + Value::Ptr val = iter->second; + if (!val->isString()) + return string(); + return val->asString(); +} + +bool Event::attrBool(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return false; + Value::Ptr val = iter->second; + if (!val->isBool()) + return false; + return val->asBool(); +} + +float Event::attrFloat(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0.0; + Value::Ptr val = iter->second; + if (!val->isFloat()) + return 0.0; + return val->asFloat(); +} + +double Event::attrDouble(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return 0.0; + Value::Ptr val = iter->second; + if (!val->isDouble()) + return 0.0; + return val->asDouble(); +} + +Uuid Event::attrUuid(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return Uuid(); + Value::Ptr val = iter->second; + if (!val->isUuid()) + return Uuid(); + return val->asUuid(); +} + +FieldTable Event::attrMap(const string& key) const +{ + Object::AttributeMap::const_iterator iter = attributes.find(key); + if (iter == attributes.end()) + return FieldTable(); + Value::Ptr val = iter->second; + if (!val->isMap()) + return FieldTable(); + return val->asMap(); +} + + +std::ostream& qpid::console::operator<<(std::ostream& o, const Event& event) +{ + const ClassKey& key = event.getClassKey(); + sys::AbsTime aTime(sys::AbsTime(), sys::Duration(event.getTimestamp())); + o << aTime << " " << event.getSeverityString() << " " << + key.getPackageName() << ":" << key.getClassName() << + " broker=" << event.getBroker()->getUrl(); + + const Object::AttributeMap& attributes = event.getAttributes(); + for (Object::AttributeMap::const_iterator iter = attributes.begin(); + iter != attributes.end(); iter++) { + o << " " << iter->first << "=" << iter->second->str(); + } + return o; +} + + diff --git a/qpid/cpp/src/qpid/console/Event.h b/qpid/cpp/src/qpid/console/Event.h index 7627a4264d..c212b72889 100644 --- a/qpid/cpp/src/qpid/console/Event.h +++ b/qpid/cpp/src/qpid/console/Event.h @@ -21,18 +21,61 @@ #ifndef _QPID_CONSOLE_EVENT_H_ #define _QPID_CONSOLE_EVENT_H_ -#include <string> -#include "Message.h" +#include "Object.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FieldTable.h" namespace qpid { +namespace framing { + class Buffer; +} namespace console { + class Broker; + class SchemaClass; + class ClassKey; + /** * - * \ingroup qpidconsoleapi + * \ingroup qmfconsoleapi */ class Event { + public: + typedef enum { + EMERGENCY = 0, ALERT = 1, CRITICAL = 2, ERROR = 3, WARNING = 4, + NOTICE = 5, INFO = 6, DEBUG = 7 + } Severity; + + Event(Broker* broker, SchemaClass* schemaClass, framing::Buffer& buffer); + Broker* getBroker() const { return broker; } + const ClassKey& getClassKey() const; + SchemaClass* getSchema() const { return schema; } + const Object::AttributeMap& getAttributes() const { return attributes; } + uint64_t getTimestamp() const { return timestamp; } + uint8_t getSeverity() const { return severity; } + std::string getSeverityString() const; + + ObjectId attrRef(const std::string& key) const; + uint32_t attrUint(const std::string& key) const; + int32_t attrInt(const std::string& key) const; + uint64_t attrUint64(const std::string& key) const; + int64_t attrInt64(const std::string& key) const; + std::string attrString(const std::string& key) const; + bool attrBool(const std::string& key) const; + float attrFloat(const std::string& key) const; + double attrDouble(const std::string& key) const; + framing::Uuid attrUuid(const std::string& key) const; + framing::FieldTable attrMap(const std::string& key) const; + + private: + Broker* broker; + SchemaClass* schema; + uint64_t timestamp; + Severity severity; + Object::AttributeMap attributes; }; + + std::ostream& operator<<(std::ostream& o, const Event& event); } } diff --git a/qpid/cpp/src/qpid/console/Object.cpp b/qpid/cpp/src/qpid/console/Object.cpp index 1ca70a616a..da8ab962e0 100644 --- a/qpid/cpp/src/qpid/console/Object.cpp +++ b/qpid/cpp/src/qpid/console/Object.cpp @@ -29,10 +29,65 @@ #include "qpid/sys/Mutex.h" using namespace qpid::console; +using namespace qpid::sys; using namespace std; using qpid::framing::Uuid; using qpid::framing::FieldTable; -using qpid::sys::Mutex; + +void Object::AttributeMap::addRef(const string& key, const ObjectId& val) +{ + (*this)[key] = Value::Ptr(new RefValue(val)); +} + +void Object::AttributeMap::addUint(const string& key, uint32_t val) +{ + (*this)[key] = Value::Ptr(new UintValue(val)); +} + +void Object::AttributeMap::addInt(const string& key, int32_t val) +{ + (*this)[key] = Value::Ptr(new IntValue(val)); +} + +void Object::AttributeMap::addUint64(const string& key, uint64_t val) +{ + (*this)[key] = Value::Ptr(new Uint64Value(val)); +} + +void Object::AttributeMap::addInt64(const string& key, int64_t val) +{ + (*this)[key] = Value::Ptr(new Int64Value(val)); +} + +void Object::AttributeMap::addString(const string& key, const string& val) +{ + (*this)[key] = Value::Ptr(new StringValue(val)); +} + +void Object::AttributeMap::addBool(const string& key, bool val) +{ + (*this)[key] = Value::Ptr(new BoolValue(val)); +} + +void Object::AttributeMap::addFloat(const string& key, float val) +{ + (*this)[key] = Value::Ptr(new FloatValue(val)); +} + +void Object::AttributeMap::addDouble(const string& key, double val) +{ + (*this)[key] = Value::Ptr(new DoubleValue(val)); +} + +void Object::AttributeMap::addUuid(const string& key, const framing::Uuid& val) +{ + (*this)[key] = Value::Ptr(new UuidValue(val)); +} + +void Object::AttributeMap::addMap(const string& key, const framing::FieldTable& val) +{ + (*this)[key] = Value::Ptr(new MapValue(val)); +} Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bool stat) : broker(b), schema(s), pendingMethod(0) @@ -49,7 +104,7 @@ Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bo pIter != schema->properties.end(); pIter++) { SchemaProperty* property = *pIter; if (excludes.count(property->name) != 0) { - attributes[property->name] = new NullValue(); + attributes[property->name] = Value::Ptr(new NullValue()); } else { attributes[property->name] = property->decodeValue(buffer); } @@ -65,19 +120,14 @@ Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bo } } -Object::~Object() -{ - // for (AttributeMap::iterator iter = attributes.begin(); iter != attributes.end(); iter++) - // delete iter->second; - // attributes.clear(); -} +Object::~Object() {} const ClassKey& Object::getClassKey() const { return schema->getClassKey(); } -std::string Object::getIndex() const +string Object::getIndex() const { string result; @@ -139,9 +189,18 @@ void Object::invokeMethod(const string name, const AttributeMap& args, MethodRes { Mutex::ScopedLock l(broker->lock); - while (pendingMethod != 0) - broker->cond.wait(broker->lock); - result = methodResponse; + bool ok = true; + while (pendingMethod != 0 && ok) { + ok = broker->cond.wait(broker->lock, AbsTime(now(), broker->sessionManager.settings.methodTimeout * TIME_SEC)); + } + + if (!ok) { + result.code = 0x1001; + result.text.assign("Method call timed out"); + result.arguments.clear(); + } else { + result = methodResponse; + } } } } @@ -169,122 +228,122 @@ void Object::handleMethodResp(framing::Buffer& buffer, uint32_t sequence) } } -ObjectId Object::attrRef(const std::string& key) const +ObjectId Object::attrRef(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return ObjectId(); - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isObjectId()) return ObjectId(); return val->asObjectId(); } -uint32_t Object::attrUint(const std::string& key) const +uint32_t Object::attrUint(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return 0; - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isUint()) return 0; return val->asUint(); } -int32_t Object::attrInt(const std::string& key) const +int32_t Object::attrInt(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return 0; - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isInt()) return 0; return val->asInt(); } -uint64_t Object::attrUint64(const std::string& key) const +uint64_t Object::attrUint64(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return 0; - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isUint64()) return 0; return val->asUint64(); } -int64_t Object::attrInt64(const std::string& key) const +int64_t Object::attrInt64(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return 0; - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isInt64()) return 0; return val->asInt64(); } -string Object::attrString(const std::string& key) const +string Object::attrString(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return string(); - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isString()) return string(); return val->asString(); } -bool Object::attrBool(const std::string& key) const +bool Object::attrBool(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return false; - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isBool()) return false; return val->asBool(); } -float Object::attrFloat(const std::string& key) const +float Object::attrFloat(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return 0.0; - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isFloat()) return 0.0; return val->asFloat(); } -double Object::attrDouble(const std::string& key) const +double Object::attrDouble(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return 0.0; - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isDouble()) return 0.0; return val->asDouble(); } -Uuid Object::attrUuid(const std::string& key) const +Uuid Object::attrUuid(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return Uuid(); - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isUuid()) return Uuid(); return val->asUuid(); } -FieldTable Object::attrMap(const std::string& key) const +FieldTable Object::attrMap(const string& key) const { AttributeMap::const_iterator iter = attributes.find(key); if (iter == attributes.end()) return FieldTable(); - Value* val = iter->second; + Value::Ptr val = iter->second; if (!val->isMap()) return FieldTable(); return val->asMap(); diff --git a/qpid/cpp/src/qpid/console/Object.h b/qpid/cpp/src/qpid/console/Object.h index 918bee8af1..54a3e0f6e8 100644 --- a/qpid/cpp/src/qpid/console/Object.h +++ b/qpid/cpp/src/qpid/console/Object.h @@ -24,6 +24,7 @@ #include "ObjectId.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/FieldTable.h" +#include <boost/shared_ptr.hpp> #include <map> #include <set> #include <vector> @@ -47,13 +48,25 @@ namespace console { struct MethodResponse { uint32_t code; std::string text; - std::map<std::string, Value*> arguments; + std::map<std::string, boost::shared_ptr<Value> > arguments; }; class Object { public: typedef std::vector<Object> Vector; - typedef std::map<std::string, Value*> AttributeMap; + struct AttributeMap : public std::map<std::string, boost::shared_ptr<Value> > { + void addRef(const std::string& key, const ObjectId& val); + void addUint(const std::string& key, uint32_t val); + void addInt(const std::string& key, int32_t val); + void addUint64(const std::string& key, uint64_t val); + void addInt64(const std::string& key, int64_t val); + void addString(const std::string& key, const std::string& val); + void addBool(const std::string& key, bool val); + void addFloat(const std::string& key, float val); + void addDouble(const std::string& key, double val); + void addUuid(const std::string& key, const framing::Uuid& val); + void addMap(const std::string& key, const framing::FieldTable& val); + }; Object(Broker* broker, SchemaClass* schemaClass, framing::Buffer& buffer, bool prop, bool stat); ~Object(); diff --git a/qpid/cpp/src/qpid/console/Schema.cpp b/qpid/cpp/src/qpid/console/Schema.cpp index 1cc8b8ee02..31d947cdd5 100644 --- a/qpid/cpp/src/qpid/console/Schema.cpp +++ b/qpid/cpp/src/qpid/console/Schema.cpp @@ -51,7 +51,7 @@ SchemaArgument::SchemaArgument(framing::Buffer& buffer, bool forMethod) } } -Value* SchemaArgument::decodeValue(framing::Buffer& buffer) +Value::Ptr SchemaArgument::decodeValue(framing::Buffer& buffer) { return ValueFactory::newValue(typeCode, buffer); } @@ -73,7 +73,7 @@ SchemaProperty::SchemaProperty(framing::Buffer& buffer) desc = map.getAsString("desc"); } -Value* SchemaProperty::decodeValue(framing::Buffer& buffer) +Value::Ptr SchemaProperty::decodeValue(framing::Buffer& buffer) { return ValueFactory::newValue(typeCode, buffer); } @@ -89,7 +89,7 @@ SchemaStatistic::SchemaStatistic(framing::Buffer& buffer) desc = map.getAsString("desc"); } -Value* SchemaStatistic::decodeValue(framing::Buffer& buffer) +Value::Ptr SchemaStatistic::decodeValue(framing::Buffer& buffer) { return ValueFactory::newValue(typeCode, buffer); } diff --git a/qpid/cpp/src/qpid/console/Schema.h b/qpid/cpp/src/qpid/console/Schema.h index accd3a8b16..aacedfe23f 100644 --- a/qpid/cpp/src/qpid/console/Schema.h +++ b/qpid/cpp/src/qpid/console/Schema.h @@ -22,6 +22,7 @@ #define _QPID_CONSOLE_SCHEMA_H_ #include "ClassKey.h" +#include <boost/shared_ptr.hpp> #include <vector> namespace qpid { @@ -33,7 +34,7 @@ namespace console { struct SchemaArgument { SchemaArgument(framing::Buffer& buffer, bool forMethod = false); - Value* decodeValue(framing::Buffer& buffer); + boost::shared_ptr<Value> decodeValue(framing::Buffer& buffer); std::string name; uint8_t typeCode; @@ -49,7 +50,7 @@ namespace console { struct SchemaProperty { SchemaProperty(framing::Buffer& buffer); - Value* decodeValue(framing::Buffer& buffer); + boost::shared_ptr<Value> decodeValue(framing::Buffer& buffer); std::string name; uint8_t typeCode; @@ -65,7 +66,7 @@ namespace console { struct SchemaStatistic { SchemaStatistic(framing::Buffer& buffer); - Value* decodeValue(framing::Buffer& buffer); + boost::shared_ptr<Value> decodeValue(framing::Buffer& buffer); std::string name; uint8_t typeCode; diff --git a/qpid/cpp/src/qpid/console/SessionManager.cpp b/qpid/cpp/src/qpid/console/SessionManager.cpp index bd06421445..6aa347e051 100644 --- a/qpid/cpp/src/qpid/console/SessionManager.cpp +++ b/qpid/cpp/src/qpid/console/SessionManager.cpp @@ -34,18 +34,8 @@ using namespace std; using qpid::framing::Buffer; using qpid::framing::FieldTable; -SessionManager::SessionManager(ConsoleListener* _listener, - bool _rcvObjects, - bool _rcvEvents, - bool _rcvHeartbeats, - bool _manageConnections, - bool _userBindings) : - listener(_listener), - rcvObjects((listener != 0) && _rcvObjects), - rcvEvents((listener != 0) && _rcvEvents), - rcvHeartbeats((listener != 0) && _rcvHeartbeats), - userBindings(_userBindings), - manageConnections(_manageConnections) +SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) : + listener(_listener), settings(_settings) { bindingKeys(); } @@ -158,10 +148,13 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas if (_agent != 0) { agentList.push_back(_agent); + _agent->getBroker()->waitForStable(); } else { if (_broker != 0) { _broker->appendAgents(agentList); + _broker->waitForStable(); } else { + allBrokersStable(); Mutex::ScopedLock _lock(brokerListLock); for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { (*iter)->appendAgents(agentList); @@ -198,7 +191,7 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas { Mutex::ScopedLock _lock(lock); while (!syncSequenceList.empty() && error.empty()) { - cv.wait(lock); // TODO put timeout in + cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC)); } } @@ -208,16 +201,16 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas void SessionManager::bindingKeys() { bindingKeyList.push_back("schema.#"); - if (rcvObjects && rcvEvents && rcvHeartbeats && !userBindings) { + if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { bindingKeyList.push_back("console.#"); } else { - if (rcvObjects && !userBindings) + if (settings.rcvObjects && !settings.userBindings) bindingKeyList.push_back("console.obj.#"); else bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent"); - if (rcvEvents) + if (settings.rcvEvents) bindingKeyList.push_back("console.event.#"); - if (rcvHeartbeats) + if (settings.rcvHeartbeats) bindingKeyList.push_back("console.heartbeat"); } } @@ -356,8 +349,31 @@ void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/ { } -void SessionManager::handleEventInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/) +void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/) { + string packageName; + string className; + uint8_t hash[16]; + SchemaClass* schemaClass; + + buffer.getShortString(packageName); + buffer.getShortString(className); + buffer.getBin128(hash); + + { + Mutex::ScopedLock l(lock); + map<string, Package*>::iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return; + schemaClass = pIter->second->getClass(className, hash); + if (schemaClass == 0) + return; + } + + Event event(broker, schemaClass, buffer); + + if (listener) + listener->event(event); } void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence) diff --git a/qpid/cpp/src/qpid/console/SessionManager.h b/qpid/cpp/src/qpid/console/SessionManager.h index 3da6f98c86..27df00494c 100644 --- a/qpid/cpp/src/qpid/console/SessionManager.h +++ b/qpid/cpp/src/qpid/console/SessionManager.h @@ -30,6 +30,7 @@ #include "Agent.h" #include "Object.h" #include "ObjectId.h" +#include "Value.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Condition.h" #include "qpid/client/ConnectionSettings.h" @@ -52,6 +53,19 @@ class SessionManager typedef std::vector<ClassKey> KeyVector; ~SessionManager() {} + struct Settings { + bool rcvObjects; + bool rcvEvents; + bool rcvHeartbeats; + bool userBindings; + uint32_t methodTimeout; + uint32_t getTimeout; + + Settings() : rcvObjects(true), rcvEvents(true), rcvHeartbeats(true), userBindings(false), + methodTimeout(20), getTimeout(20) + {} + }; + /** Create a new SessionManager * * Provide your own subclass of ConsoleListener to receive updates and indications @@ -61,24 +75,18 @@ class SessionManager *@param rcvObjects Listener wishes to receive managed object data. *@param rcvEvents Listener wishes to receive events. *@param rcvHeartbeats Listener wishes to receive agent heartbeats. - *@param manageConnections SessionManager should retry dropped connections. *@param userBindings If rcvObjects is true, userBindings allows the console client * to control which object classes are received. See the bindPackage and bindClass * methods. If userBindings is false, the listener will receive updates for all * object classes. */ SessionManager(ConsoleListener* listener = 0, - bool rcvObjects = true, - bool rcvEvents = true, - bool rcvHeartbeats = true, - bool manageConnections = false, - bool userBindings = false); + Settings settings = Settings()); /** Connect a broker to the console session * *@param settings Connection settings for client access *@return broker object if operation is successful - *@exception If 'manageConnections' is disabled and the connection to the broker fails, * an exception shall be thrown. */ Broker* addBroker(client::ConnectionSettings& settings); @@ -166,11 +174,7 @@ private: SequenceManager::set syncSequenceList; Object::Vector getResult; std::string error; - bool rcvObjects; - bool rcvEvents; - bool rcvHeartbeats; - bool userBindings; - bool manageConnections; + Settings settings; NameVector bindingKeyList; void bindingKeys(); diff --git a/qpid/cpp/src/qpid/console/Value.cpp b/qpid/cpp/src/qpid/console/Value.cpp index 3ef5d01ec3..532709ab05 100644 --- a/qpid/cpp/src/qpid/console/Value.cpp +++ b/qpid/cpp/src/qpid/console/Value.cpp @@ -117,33 +117,33 @@ MapValue::MapValue(framing::Buffer& buffer) } -Value* ValueFactory::newValue(int typeCode, framing::Buffer& buffer) +Value::Ptr ValueFactory::newValue(int typeCode, framing::Buffer& buffer) { switch (typeCode) { - case 1: return static_cast<Value*>(new UintValue(buffer.getOctet())); // U8 - case 2: return static_cast<Value*>(new UintValue(buffer.getShort())); // U16 - case 3: return static_cast<Value*>(new UintValue(buffer.getLong())); // U32 - case 4: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // U64 - case 6: return static_cast<Value*>(new StringValue(buffer, 6)); // SSTR - case 7: return static_cast<Value*>(new StringValue(buffer, 7)); // LSTR - case 8: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // ABSTIME - case 9: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // DELTATIME - case 10: return static_cast<Value*>(new RefValue(buffer)); // REF - case 11: return static_cast<Value*>(new BoolValue(buffer.getOctet())); // BOOL - case 12: return static_cast<Value*>(new FloatValue(buffer.getFloat())); // FLOAT - case 13: return static_cast<Value*>(new DoubleValue(buffer.getDouble())); // DOUBLE - case 14: return static_cast<Value*>(new UuidValue(buffer)); // UUID - case 15: return static_cast<Value*>(new MapValue(buffer)); // MAP - case 16: return static_cast<Value*>(new IntValue(buffer.getOctet())); // S8 - case 17: return static_cast<Value*>(new IntValue(buffer.getShort())); // S16 - case 18: return static_cast<Value*>(new IntValue(buffer.getLong())); // S32 - case 19: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // S64 + case 1: return Value::Ptr(new UintValue(buffer.getOctet())); // U8 + case 2: return Value::Ptr(new UintValue(buffer.getShort())); // U16 + case 3: return Value::Ptr(new UintValue(buffer.getLong())); // U32 + case 4: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // U64 + case 6: return Value::Ptr(new StringValue(buffer, 6)); // SSTR + case 7: return Value::Ptr(new StringValue(buffer, 7)); // LSTR + case 8: return Value::Ptr(new Int64Value(buffer.getLongLong())); // ABSTIME + case 9: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // DELTATIME + case 10: return Value::Ptr(new RefValue(buffer)); // REF + case 11: return Value::Ptr(new BoolValue(buffer.getOctet())); // BOOL + case 12: return Value::Ptr(new FloatValue(buffer.getFloat())); // FLOAT + case 13: return Value::Ptr(new DoubleValue(buffer.getDouble())); // DOUBLE + case 14: return Value::Ptr(new UuidValue(buffer)); // UUID + case 15: return Value::Ptr(new MapValue(buffer)); // MAP + case 16: return Value::Ptr(new IntValue(buffer.getOctet())); // S8 + case 17: return Value::Ptr(new IntValue(buffer.getShort())); // S16 + case 18: return Value::Ptr(new IntValue(buffer.getLong())); // S32 + case 19: return Value::Ptr(new Int64Value(buffer.getLongLong())); // S64 } - return 0; + return Value::Ptr(); } -void ValueFactory::encodeValue(int typeCode, Value* value, framing::Buffer& buffer) +void ValueFactory::encodeValue(int typeCode, Value::Ptr value, framing::Buffer& buffer) { switch (typeCode) { case 1: buffer.putOctet(value->asUint()); return; // U8 diff --git a/qpid/cpp/src/qpid/console/Value.h b/qpid/cpp/src/qpid/console/Value.h index 42ff5661a2..5a0915c69b 100644 --- a/qpid/cpp/src/qpid/console/Value.h +++ b/qpid/cpp/src/qpid/console/Value.h @@ -25,6 +25,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FieldTable.h" #include "ObjectId.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace framing { @@ -38,6 +39,7 @@ namespace console { class Value { public: + typedef boost::shared_ptr<Value> Ptr; virtual ~Value() {} virtual std::string str() const = 0; @@ -196,8 +198,8 @@ namespace console { class ValueFactory { public: - static Value* newValue(int typeCode, framing::Buffer& buffer); - static void encodeValue(int typeCode, Value* value, framing::Buffer& buffer); + static Value::Ptr newValue(int typeCode, framing::Buffer& buffer); + static void encodeValue(int typeCode, Value::Ptr value, framing::Buffer& buffer); }; } } |