summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-12-23 19:38:25 +0000
committerTed Ross <tross@apache.org>2008-12-23 19:38:25 +0000
commit564775ae717db7743d41f7620cde8845dc7d35b8 (patch)
tree8e5f5b8985a5428ce766ef05161887b256615a88
parent1ac4193fbf6dcbf1e95da4250b6d3bb1d928ab2f (diff)
downloadqpid-python-564775ae717db7743d41f7620cde8845dc7d35b8.tar.gz
QPID-1412 Updates and fixes for the c++ console API:
- Added event support - Converted raw pointers to shared_ptrs in references to Values. This fixes a memory leak in the original code. - Added wrappers to make value access more convenient. - Added timeout handling for synchronous operations. Timeout values are configurable. - Fixed a bug in getObjects whereby waitForStable was not called and the operation could fail if called too early. - Added examples "printevents" and "ping" to illustrate the usage of different aspects of the API. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@729075 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/qmf-console/Makefile.am14
-rw-r--r--qpid/cpp/examples/qmf-console/console.cpp16
-rw-r--r--qpid/cpp/examples/qmf-console/ping.cpp129
-rw-r--r--qpid/cpp/examples/qmf-console/printevents.cpp105
-rw-r--r--qpid/cpp/src/qmfc.mk1
-rw-r--r--qpid/cpp/src/qpid/console/Broker.cpp11
-rw-r--r--qpid/cpp/src/qpid/console/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/console/ConsoleListener.h3
-rw-r--r--qpid/cpp/src/qpid/console/Event.cpp205
-rw-r--r--qpid/cpp/src/qpid/console/Event.h49
-rw-r--r--qpid/cpp/src/qpid/console/Object.cpp127
-rw-r--r--qpid/cpp/src/qpid/console/Object.h17
-rw-r--r--qpid/cpp/src/qpid/console/Schema.cpp6
-rw-r--r--qpid/cpp/src/qpid/console/Schema.h7
-rw-r--r--qpid/cpp/src/qpid/console/SessionManager.cpp52
-rw-r--r--qpid/cpp/src/qpid/console/SessionManager.h28
-rw-r--r--qpid/cpp/src/qpid/console/Value.cpp42
-rw-r--r--qpid/cpp/src/qpid/console/Value.h6
18 files changed, 706 insertions, 113 deletions
diff --git a/qpid/cpp/examples/qmf-console/Makefile.am b/qpid/cpp/examples/qmf-console/Makefile.am
index 05db50f734..e115fc0fb0 100644
--- a/qpid/cpp/examples/qmf-console/Makefile.am
+++ b/qpid/cpp/examples/qmf-console/Makefile.am
@@ -22,12 +22,20 @@ examplesdir=$(pkgdatadir)/examples/qmf-console
MAKELDFLAG = qmfconsole
include $(top_srcdir)/examples/makedist.mk
-noinst_PROGRAMS=qmfc
+noinst_PROGRAMS=console printevents ping
-qmfc_SOURCES=console.cpp
-qmfc_LDADD=$(CONSOLE_LIB)
+console_SOURCES=console.cpp
+console_LDADD=$(CONSOLE_LIB)
+
+printevents_SOURCES=printevents.cpp
+printevents_LDADD=$(CONSOLE_LIB)
+
+ping_SOURCES=ping.cpp
+ping_LDADD=$(CONSOLE_LIB)
examples_DATA= \
console.cpp \
+ printevents.cpp \
+ ping.cpp \
$(MAKEDIST)
diff --git a/qpid/cpp/examples/qmf-console/console.cpp b/qpid/cpp/examples/qmf-console/console.cpp
index c98f1ace34..5700d5556f 100644
--- a/qpid/cpp/examples/qmf-console/console.cpp
+++ b/qpid/cpp/examples/qmf-console/console.cpp
@@ -21,17 +21,9 @@
#include "qpid/console/ConsoleListener.h"
#include "qpid/console/SessionManager.h"
-#include "qpid/console/Value.h"
-#include <unistd.h>
-#include <cstdlib>
-#include <iostream>
-
-#include <sstream>
using namespace std;
using namespace qpid::console;
-using std::cout;
-using std::endl;
class Listener : public ConsoleListener {
public:
@@ -68,6 +60,10 @@ public:
void objectStats(Broker& broker, Object& object) {
cout << "objectStats: broker=" << broker << " object=" << object << endl;
}
+
+ void event(Event& event) {
+ cout << "event: " << event << endl;
+ }
};
@@ -127,8 +123,8 @@ int main_int(int /*argc*/, char** /*argv*/)
Object::AttributeMap args;
MethodResponse result;
- args["sequence"] = new UintValue(1);
- args["body"] = new StringValue("Testing...");
+ args.addUint("sequence", 1);
+ args.addString("body", "Testing...");
cout << "Call echo method..." << endl;
broker.invokeMethod("echo", args, result);
diff --git a/qpid/cpp/examples/qmf-console/ping.cpp b/qpid/cpp/examples/qmf-console/ping.cpp
new file mode 100644
index 0000000000..debca7428a
--- /dev/null
+++ b/qpid/cpp/examples/qmf-console/ping.cpp
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/console/SessionManager.h"
+
+using namespace std;
+using namespace qpid::console;
+
+//==============================================================
+// Main program
+//==============================================================
+int main_int(int /*argc*/, char** /*argv*/)
+{
+ //
+ // Declare connection settings for the messaging broker. The settings default to
+ // localhost:5672 with user guest (password guest). Refer to the header file
+ // <qpid/client/ConnectionSettings.h> for full details.
+ //
+ qpid::client::ConnectionSettings connSettings;
+
+ //
+ // Declare the (optional) session manager settings. Override the default timeout
+ // for methods calls to be 2 seconds.
+ //
+ SessionManager::Settings smSettings;
+ smSettings.methodTimeout = 2;
+
+ //
+ // Declare the console session manager. With a null listener argument, it defaults to
+ // synchronous-only access mode.
+ //
+ SessionManager sm(0, smSettings);
+
+ //
+ // Add a broker connection to the session manager.
+ //
+ Broker* broker = sm.addBroker(connSettings);
+
+ uint32_t count = 5; // The number of echo requests we will send to the broker.
+ Object::Vector list; // A container for holding objects retrieved from the broker.
+
+ //
+ // Query for a list of 'broker' objects from the Management Database
+ //
+ sm.getObjects(list, "broker");
+
+ //
+ // We expect one object (since we are connected to only one broker)
+ //
+ if (list.size() == 1) {
+ Object& brokerObject = *(list.begin());
+
+ for (uint32_t iter = 0; iter < count; iter++) {
+ //
+ // Declare a container for arguments to be sent with the "echo" method
+ // that we will invoke on the remote "broker" object.
+ //
+ Object::AttributeMap args;
+
+ //
+ // Declare a container for the result of the method invocation.
+ //
+ MethodResponse result;
+
+ //
+ // Set the values of the input arguments.
+ //
+ args.addUint("sequence", iter);
+ args.addString("body", "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+ cout << "Ping Broker: " << broker->getUrl() << "... ";
+ cout.flush();
+
+ //
+ // Invoke the method. This is a synchronous operation that will block until
+ // the method completes and returns a result.
+ //
+ brokerObject.invokeMethod("echo", args, result);
+
+ //
+ // result.code is the return code (0 => Success)
+ // result.text is the return text
+ // result.arguments is a container (of type Object::AttributeMap) that holds
+ // the output arguments (if any) from the method.
+ //
+ cout << "Result: code=" << result.code << " text=" << result.text;
+ if (result.code == 0)
+ cout << " seq=" << result.arguments["sequence"]->asUint();
+ cout << endl;
+
+ if (result.code == 0 && iter < count - 1)
+ ::sleep(1);
+ }
+ }
+
+ //
+ // Disconnect the broker from the session manager.
+ //
+ sm.delBroker(broker);
+ return 0;
+}
+
+int main(int argc, char** argv)
+{
+ try {
+ return main_int(argc, argv);
+ } catch(std::exception& e) {
+ cout << "Top Level Exception: " << e.what() << endl;
+ }
+}
+
diff --git a/qpid/cpp/examples/qmf-console/printevents.cpp b/qpid/cpp/examples/qmf-console/printevents.cpp
new file mode 100644
index 0000000000..bbec2c1af0
--- /dev/null
+++ b/qpid/cpp/examples/qmf-console/printevents.cpp
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/console/ConsoleListener.h"
+#include "qpid/console/SessionManager.h"
+#include "qpid/sys/Time.h"
+
+using namespace std;
+using namespace qpid::console;
+
+//
+// Define a listener class to receive asynchronous events.
+//
+class Listener : public ConsoleListener {
+public:
+ void brokerConnected(const Broker& broker) {
+ cout << qpid::sys::now() << " NOTIC qpid-printevents:brokerConnected broker=" <<
+ broker.getUrl() << endl;
+ }
+
+ void brokerDisconnected(const Broker& broker) {
+ cout << qpid::sys::now() << " NOTIC qpid-printevents:brokerDisonnected broker=" <<
+ broker.getUrl() << endl;
+ }
+
+ void event(Event& event) {
+ cout << event << endl;
+ }
+};
+
+
+//==============================================================
+// Main program
+//==============================================================
+int main_int(int /*argc*/, char** /*argv*/)
+{
+ //
+ // Declare an instance of our listener.
+ //
+ Listener listener;
+
+ //
+ // Declare connection settings for the messaging broker. The settings default to
+ // localhost:5672 with user guest (password guest). Refer to the header file
+ // <qpid/client/ConnectionSettings.h> for full details.
+ //
+ qpid::client::ConnectionSettings connSettings;
+
+ //
+ // Declare the (optional) session manager settings. Disable the reception of
+ // object updates and heartbeats. Note that by disabling the reception of things
+ // we don't need, we don't unnecessarily use network bandwidth.
+ //
+ SessionManager::Settings smSettings;
+ smSettings.rcvObjects = false;
+ smSettings.rcvHeartbeats = false;
+
+ //
+ // Declare the console session manager.
+ //
+ SessionManager sm(&listener, smSettings);
+
+ //
+ // Add a broker connection to the session manager. If desired, multiple brokers may
+ // be connected.
+ //
+ Broker* broker = sm.addBroker(connSettings);
+
+ //
+ // Sleep indefinitely while asynchronous events are handled by the listener.
+ //
+ for (;;)
+ ::sleep(1);
+
+ sm.delBroker(broker);
+ return 0;
+}
+
+int main(int argc, char** argv)
+{
+ try {
+ return main_int(argc, argv);
+ } catch(std::exception& e) {
+ cout << "Top Level Exception: " << e.what() << endl;
+ }
+}
+
diff --git a/qpid/cpp/src/qmfc.mk b/qpid/cpp/src/qmfc.mk
index 6fa7d4c988..089ac2dcd9 100644
--- a/qpid/cpp/src/qmfc.mk
+++ b/qpid/cpp/src/qmfc.mk
@@ -31,6 +31,7 @@ libqmfconsole_la_SOURCES = \
qpid/console/ClassKey.cpp \
qpid/console/ConsoleListener.h \
qpid/console/Event.h \
+ qpid/console/Event.cpp \
qpid/console/Object.h \
qpid/console/Object.cpp \
qpid/console/ObjectId.h \
diff --git a/qpid/cpp/src/qpid/console/Broker.cpp b/qpid/cpp/src/qpid/console/Broker.cpp
index 2e7ba95b1d..c6b1be1d31 100644
--- a/qpid/cpp/src/qpid/console/Broker.cpp
+++ b/qpid/cpp/src/qpid/console/Broker.cpp
@@ -59,6 +59,13 @@ Broker::~Broker()
{
}
+string Broker::getUrl() const
+{
+ stringstream url;
+ url << connectionSettings.host << ":" << connectionSettings.port;
+ return url.str();
+}
+
void Broker::encodeHeader(framing::Buffer& buf, uint8_t opcode, uint32_t seq) const
{
buf.putOctet('A');
@@ -249,7 +256,9 @@ void Broker::waitForStable()
return;
syncInFlight = true;
while (reqsOutstanding != 0) {
- cond.wait(lock); // TODO: put timeout delay in here!
+ bool result = cond.wait(lock, AbsTime(now(), TIME_SEC * sessionManager.settings.getTimeout));
+ if (!result)
+ throw(Exception("Timed out waiting for broker to synchronize"));
}
}
diff --git a/qpid/cpp/src/qpid/console/Broker.h b/qpid/cpp/src/qpid/console/Broker.h
index 8cca99f764..9df2380dff 100644
--- a/qpid/cpp/src/qpid/console/Broker.h
+++ b/qpid/cpp/src/qpid/console/Broker.h
@@ -61,6 +61,7 @@ namespace console {
void addBinding(const std::string& key) {
connThreadBody.bindExchange("qpid.management", key);
}
+ std::string getUrl() const;
private:
friend class SessionManager;
diff --git a/qpid/cpp/src/qpid/console/ConsoleListener.h b/qpid/cpp/src/qpid/console/ConsoleListener.h
index 97fdf158cd..d0db6034f6 100644
--- a/qpid/cpp/src/qpid/console/ConsoleListener.h
+++ b/qpid/cpp/src/qpid/console/ConsoleListener.h
@@ -25,6 +25,7 @@
#include "Broker.h"
#include "ClassKey.h"
#include "Object.h"
+#include "Event.h"
namespace qpid {
namespace console {
@@ -74,7 +75,7 @@ namespace console {
/** Invoked when an event is raised.
*/
- //virtual void event(Broker&, Event) {}
+ virtual void event(Event&) {}
/**
*/
diff --git a/qpid/cpp/src/qpid/console/Event.cpp b/qpid/cpp/src/qpid/console/Event.cpp
new file mode 100644
index 0000000000..51f043159c
--- /dev/null
+++ b/qpid/cpp/src/qpid/console/Event.cpp
@@ -0,0 +1,205 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Broker.h"
+#include "ClassKey.h"
+#include "Schema.h"
+#include "Event.h"
+#include "Value.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/Buffer.h"
+
+using namespace qpid::console;
+using namespace std;
+using qpid::framing::Uuid;
+using qpid::framing::FieldTable;
+
+Event::Event(Broker* _broker, SchemaClass* _schema, framing::Buffer& buffer) :
+ broker(_broker), schema(_schema)
+{
+ timestamp = buffer.getLongLong();
+ severity = (Severity) buffer.getOctet();
+ for (vector<SchemaArgument*>::const_iterator aIter = schema->arguments.begin();
+ aIter != schema->arguments.end(); aIter++) {
+ SchemaArgument* argument = *aIter;
+ attributes[argument->name] = argument->decodeValue(buffer);
+ }
+}
+
+const ClassKey& Event::getClassKey() const
+{
+ return schema->getClassKey();
+}
+
+string Event::getSeverityString() const
+{
+ switch (severity) {
+ case EMERGENCY : return string("EMER");
+ case ALERT : return string("ALERT");
+ case CRITICAL : return string("CRIT");
+ case ERROR : return string("ERROR");
+ case WARNING : return string("WARN");
+ case NOTICE : return string("NOTIC");
+ case INFO : return string("INFO");
+ case DEBUG : return string("DEBUG");
+ }
+ return string("<UNKNOWN>");
+}
+
+ObjectId Event::attrRef(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return ObjectId();
+ Value::Ptr val = iter->second;
+ if (!val->isObjectId())
+ return ObjectId();
+ return val->asObjectId();
+}
+
+uint32_t Event::attrUint(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value::Ptr val = iter->second;
+ if (!val->isUint())
+ return 0;
+ return val->asUint();
+}
+
+int32_t Event::attrInt(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value::Ptr val = iter->second;
+ if (!val->isInt())
+ return 0;
+ return val->asInt();
+}
+
+uint64_t Event::attrUint64(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value::Ptr val = iter->second;
+ if (!val->isUint64())
+ return 0;
+ return val->asUint64();
+}
+
+int64_t Event::attrInt64(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0;
+ Value::Ptr val = iter->second;
+ if (!val->isInt64())
+ return 0;
+ return val->asInt64();
+}
+
+string Event::attrString(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return string();
+ Value::Ptr val = iter->second;
+ if (!val->isString())
+ return string();
+ return val->asString();
+}
+
+bool Event::attrBool(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return false;
+ Value::Ptr val = iter->second;
+ if (!val->isBool())
+ return false;
+ return val->asBool();
+}
+
+float Event::attrFloat(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0.0;
+ Value::Ptr val = iter->second;
+ if (!val->isFloat())
+ return 0.0;
+ return val->asFloat();
+}
+
+double Event::attrDouble(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return 0.0;
+ Value::Ptr val = iter->second;
+ if (!val->isDouble())
+ return 0.0;
+ return val->asDouble();
+}
+
+Uuid Event::attrUuid(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return Uuid();
+ Value::Ptr val = iter->second;
+ if (!val->isUuid())
+ return Uuid();
+ return val->asUuid();
+}
+
+FieldTable Event::attrMap(const string& key) const
+{
+ Object::AttributeMap::const_iterator iter = attributes.find(key);
+ if (iter == attributes.end())
+ return FieldTable();
+ Value::Ptr val = iter->second;
+ if (!val->isMap())
+ return FieldTable();
+ return val->asMap();
+}
+
+
+std::ostream& qpid::console::operator<<(std::ostream& o, const Event& event)
+{
+ const ClassKey& key = event.getClassKey();
+ sys::AbsTime aTime(sys::AbsTime(), sys::Duration(event.getTimestamp()));
+ o << aTime << " " << event.getSeverityString() << " " <<
+ key.getPackageName() << ":" << key.getClassName() <<
+ " broker=" << event.getBroker()->getUrl();
+
+ const Object::AttributeMap& attributes = event.getAttributes();
+ for (Object::AttributeMap::const_iterator iter = attributes.begin();
+ iter != attributes.end(); iter++) {
+ o << " " << iter->first << "=" << iter->second->str();
+ }
+ return o;
+}
+
+
diff --git a/qpid/cpp/src/qpid/console/Event.h b/qpid/cpp/src/qpid/console/Event.h
index 7627a4264d..c212b72889 100644
--- a/qpid/cpp/src/qpid/console/Event.h
+++ b/qpid/cpp/src/qpid/console/Event.h
@@ -21,18 +21,61 @@
#ifndef _QPID_CONSOLE_EVENT_H_
#define _QPID_CONSOLE_EVENT_H_
-#include <string>
-#include "Message.h"
+#include "Object.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FieldTable.h"
namespace qpid {
+namespace framing {
+ class Buffer;
+}
namespace console {
+ class Broker;
+ class SchemaClass;
+ class ClassKey;
+
/**
*
- * \ingroup qpidconsoleapi
+ * \ingroup qmfconsoleapi
*/
class Event {
+ public:
+ typedef enum {
+ EMERGENCY = 0, ALERT = 1, CRITICAL = 2, ERROR = 3, WARNING = 4,
+ NOTICE = 5, INFO = 6, DEBUG = 7
+ } Severity;
+
+ Event(Broker* broker, SchemaClass* schemaClass, framing::Buffer& buffer);
+ Broker* getBroker() const { return broker; }
+ const ClassKey& getClassKey() const;
+ SchemaClass* getSchema() const { return schema; }
+ const Object::AttributeMap& getAttributes() const { return attributes; }
+ uint64_t getTimestamp() const { return timestamp; }
+ uint8_t getSeverity() const { return severity; }
+ std::string getSeverityString() const;
+
+ ObjectId attrRef(const std::string& key) const;
+ uint32_t attrUint(const std::string& key) const;
+ int32_t attrInt(const std::string& key) const;
+ uint64_t attrUint64(const std::string& key) const;
+ int64_t attrInt64(const std::string& key) const;
+ std::string attrString(const std::string& key) const;
+ bool attrBool(const std::string& key) const;
+ float attrFloat(const std::string& key) const;
+ double attrDouble(const std::string& key) const;
+ framing::Uuid attrUuid(const std::string& key) const;
+ framing::FieldTable attrMap(const std::string& key) const;
+
+ private:
+ Broker* broker;
+ SchemaClass* schema;
+ uint64_t timestamp;
+ Severity severity;
+ Object::AttributeMap attributes;
};
+
+ std::ostream& operator<<(std::ostream& o, const Event& event);
}
}
diff --git a/qpid/cpp/src/qpid/console/Object.cpp b/qpid/cpp/src/qpid/console/Object.cpp
index 1ca70a616a..da8ab962e0 100644
--- a/qpid/cpp/src/qpid/console/Object.cpp
+++ b/qpid/cpp/src/qpid/console/Object.cpp
@@ -29,10 +29,65 @@
#include "qpid/sys/Mutex.h"
using namespace qpid::console;
+using namespace qpid::sys;
using namespace std;
using qpid::framing::Uuid;
using qpid::framing::FieldTable;
-using qpid::sys::Mutex;
+
+void Object::AttributeMap::addRef(const string& key, const ObjectId& val)
+{
+ (*this)[key] = Value::Ptr(new RefValue(val));
+}
+
+void Object::AttributeMap::addUint(const string& key, uint32_t val)
+{
+ (*this)[key] = Value::Ptr(new UintValue(val));
+}
+
+void Object::AttributeMap::addInt(const string& key, int32_t val)
+{
+ (*this)[key] = Value::Ptr(new IntValue(val));
+}
+
+void Object::AttributeMap::addUint64(const string& key, uint64_t val)
+{
+ (*this)[key] = Value::Ptr(new Uint64Value(val));
+}
+
+void Object::AttributeMap::addInt64(const string& key, int64_t val)
+{
+ (*this)[key] = Value::Ptr(new Int64Value(val));
+}
+
+void Object::AttributeMap::addString(const string& key, const string& val)
+{
+ (*this)[key] = Value::Ptr(new StringValue(val));
+}
+
+void Object::AttributeMap::addBool(const string& key, bool val)
+{
+ (*this)[key] = Value::Ptr(new BoolValue(val));
+}
+
+void Object::AttributeMap::addFloat(const string& key, float val)
+{
+ (*this)[key] = Value::Ptr(new FloatValue(val));
+}
+
+void Object::AttributeMap::addDouble(const string& key, double val)
+{
+ (*this)[key] = Value::Ptr(new DoubleValue(val));
+}
+
+void Object::AttributeMap::addUuid(const string& key, const framing::Uuid& val)
+{
+ (*this)[key] = Value::Ptr(new UuidValue(val));
+}
+
+void Object::AttributeMap::addMap(const string& key, const framing::FieldTable& val)
+{
+ (*this)[key] = Value::Ptr(new MapValue(val));
+}
Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bool stat) :
broker(b), schema(s), pendingMethod(0)
@@ -49,7 +104,7 @@ Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bo
pIter != schema->properties.end(); pIter++) {
SchemaProperty* property = *pIter;
if (excludes.count(property->name) != 0) {
- attributes[property->name] = new NullValue();
+ attributes[property->name] = Value::Ptr(new NullValue());
} else {
attributes[property->name] = property->decodeValue(buffer);
}
@@ -65,19 +120,14 @@ Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bo
}
}
-Object::~Object()
-{
- // for (AttributeMap::iterator iter = attributes.begin(); iter != attributes.end(); iter++)
- // delete iter->second;
- // attributes.clear();
-}
+Object::~Object() {}
const ClassKey& Object::getClassKey() const
{
return schema->getClassKey();
}
-std::string Object::getIndex() const
+string Object::getIndex() const
{
string result;
@@ -139,9 +189,18 @@ void Object::invokeMethod(const string name, const AttributeMap& args, MethodRes
{
Mutex::ScopedLock l(broker->lock);
- while (pendingMethod != 0)
- broker->cond.wait(broker->lock);
- result = methodResponse;
+ bool ok = true;
+ while (pendingMethod != 0 && ok) {
+ ok = broker->cond.wait(broker->lock, AbsTime(now(), broker->sessionManager.settings.methodTimeout * TIME_SEC));
+ }
+
+ if (!ok) {
+ result.code = 0x1001;
+ result.text.assign("Method call timed out");
+ result.arguments.clear();
+ } else {
+ result = methodResponse;
+ }
}
}
}
@@ -169,122 +228,122 @@ void Object::handleMethodResp(framing::Buffer& buffer, uint32_t sequence)
}
}
-ObjectId Object::attrRef(const std::string& key) const
+ObjectId Object::attrRef(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return ObjectId();
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isObjectId())
return ObjectId();
return val->asObjectId();
}
-uint32_t Object::attrUint(const std::string& key) const
+uint32_t Object::attrUint(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return 0;
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isUint())
return 0;
return val->asUint();
}
-int32_t Object::attrInt(const std::string& key) const
+int32_t Object::attrInt(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return 0;
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isInt())
return 0;
return val->asInt();
}
-uint64_t Object::attrUint64(const std::string& key) const
+uint64_t Object::attrUint64(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return 0;
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isUint64())
return 0;
return val->asUint64();
}
-int64_t Object::attrInt64(const std::string& key) const
+int64_t Object::attrInt64(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return 0;
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isInt64())
return 0;
return val->asInt64();
}
-string Object::attrString(const std::string& key) const
+string Object::attrString(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return string();
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isString())
return string();
return val->asString();
}
-bool Object::attrBool(const std::string& key) const
+bool Object::attrBool(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return false;
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isBool())
return false;
return val->asBool();
}
-float Object::attrFloat(const std::string& key) const
+float Object::attrFloat(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return 0.0;
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isFloat())
return 0.0;
return val->asFloat();
}
-double Object::attrDouble(const std::string& key) const
+double Object::attrDouble(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return 0.0;
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isDouble())
return 0.0;
return val->asDouble();
}
-Uuid Object::attrUuid(const std::string& key) const
+Uuid Object::attrUuid(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return Uuid();
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isUuid())
return Uuid();
return val->asUuid();
}
-FieldTable Object::attrMap(const std::string& key) const
+FieldTable Object::attrMap(const string& key) const
{
AttributeMap::const_iterator iter = attributes.find(key);
if (iter == attributes.end())
return FieldTable();
- Value* val = iter->second;
+ Value::Ptr val = iter->second;
if (!val->isMap())
return FieldTable();
return val->asMap();
diff --git a/qpid/cpp/src/qpid/console/Object.h b/qpid/cpp/src/qpid/console/Object.h
index 918bee8af1..54a3e0f6e8 100644
--- a/qpid/cpp/src/qpid/console/Object.h
+++ b/qpid/cpp/src/qpid/console/Object.h
@@ -24,6 +24,7 @@
#include "ObjectId.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FieldTable.h"
+#include <boost/shared_ptr.hpp>
#include <map>
#include <set>
#include <vector>
@@ -47,13 +48,25 @@ namespace console {
struct MethodResponse {
uint32_t code;
std::string text;
- std::map<std::string, Value*> arguments;
+ std::map<std::string, boost::shared_ptr<Value> > arguments;
};
class Object {
public:
typedef std::vector<Object> Vector;
- typedef std::map<std::string, Value*> AttributeMap;
+ struct AttributeMap : public std::map<std::string, boost::shared_ptr<Value> > {
+ void addRef(const std::string& key, const ObjectId& val);
+ void addUint(const std::string& key, uint32_t val);
+ void addInt(const std::string& key, int32_t val);
+ void addUint64(const std::string& key, uint64_t val);
+ void addInt64(const std::string& key, int64_t val);
+ void addString(const std::string& key, const std::string& val);
+ void addBool(const std::string& key, bool val);
+ void addFloat(const std::string& key, float val);
+ void addDouble(const std::string& key, double val);
+ void addUuid(const std::string& key, const framing::Uuid& val);
+ void addMap(const std::string& key, const framing::FieldTable& val);
+ };
Object(Broker* broker, SchemaClass* schemaClass, framing::Buffer& buffer, bool prop, bool stat);
~Object();
diff --git a/qpid/cpp/src/qpid/console/Schema.cpp b/qpid/cpp/src/qpid/console/Schema.cpp
index 1cc8b8ee02..31d947cdd5 100644
--- a/qpid/cpp/src/qpid/console/Schema.cpp
+++ b/qpid/cpp/src/qpid/console/Schema.cpp
@@ -51,7 +51,7 @@ SchemaArgument::SchemaArgument(framing::Buffer& buffer, bool forMethod)
}
}
-Value* SchemaArgument::decodeValue(framing::Buffer& buffer)
+Value::Ptr SchemaArgument::decodeValue(framing::Buffer& buffer)
{
return ValueFactory::newValue(typeCode, buffer);
}
@@ -73,7 +73,7 @@ SchemaProperty::SchemaProperty(framing::Buffer& buffer)
desc = map.getAsString("desc");
}
-Value* SchemaProperty::decodeValue(framing::Buffer& buffer)
+Value::Ptr SchemaProperty::decodeValue(framing::Buffer& buffer)
{
return ValueFactory::newValue(typeCode, buffer);
}
@@ -89,7 +89,7 @@ SchemaStatistic::SchemaStatistic(framing::Buffer& buffer)
desc = map.getAsString("desc");
}
-Value* SchemaStatistic::decodeValue(framing::Buffer& buffer)
+Value::Ptr SchemaStatistic::decodeValue(framing::Buffer& buffer)
{
return ValueFactory::newValue(typeCode, buffer);
}
diff --git a/qpid/cpp/src/qpid/console/Schema.h b/qpid/cpp/src/qpid/console/Schema.h
index accd3a8b16..aacedfe23f 100644
--- a/qpid/cpp/src/qpid/console/Schema.h
+++ b/qpid/cpp/src/qpid/console/Schema.h
@@ -22,6 +22,7 @@
#define _QPID_CONSOLE_SCHEMA_H_
#include "ClassKey.h"
+#include <boost/shared_ptr.hpp>
#include <vector>
namespace qpid {
@@ -33,7 +34,7 @@ namespace console {
struct SchemaArgument {
SchemaArgument(framing::Buffer& buffer, bool forMethod = false);
- Value* decodeValue(framing::Buffer& buffer);
+ boost::shared_ptr<Value> decodeValue(framing::Buffer& buffer);
std::string name;
uint8_t typeCode;
@@ -49,7 +50,7 @@ namespace console {
struct SchemaProperty {
SchemaProperty(framing::Buffer& buffer);
- Value* decodeValue(framing::Buffer& buffer);
+ boost::shared_ptr<Value> decodeValue(framing::Buffer& buffer);
std::string name;
uint8_t typeCode;
@@ -65,7 +66,7 @@ namespace console {
struct SchemaStatistic {
SchemaStatistic(framing::Buffer& buffer);
- Value* decodeValue(framing::Buffer& buffer);
+ boost::shared_ptr<Value> decodeValue(framing::Buffer& buffer);
std::string name;
uint8_t typeCode;
diff --git a/qpid/cpp/src/qpid/console/SessionManager.cpp b/qpid/cpp/src/qpid/console/SessionManager.cpp
index bd06421445..6aa347e051 100644
--- a/qpid/cpp/src/qpid/console/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/console/SessionManager.cpp
@@ -34,18 +34,8 @@ using namespace std;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
-SessionManager::SessionManager(ConsoleListener* _listener,
- bool _rcvObjects,
- bool _rcvEvents,
- bool _rcvHeartbeats,
- bool _manageConnections,
- bool _userBindings) :
- listener(_listener),
- rcvObjects((listener != 0) && _rcvObjects),
- rcvEvents((listener != 0) && _rcvEvents),
- rcvHeartbeats((listener != 0) && _rcvHeartbeats),
- userBindings(_userBindings),
- manageConnections(_manageConnections)
+SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) :
+ listener(_listener), settings(_settings)
{
bindingKeys();
}
@@ -158,10 +148,13 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas
if (_agent != 0) {
agentList.push_back(_agent);
+ _agent->getBroker()->waitForStable();
} else {
if (_broker != 0) {
_broker->appendAgents(agentList);
+ _broker->waitForStable();
} else {
+ allBrokersStable();
Mutex::ScopedLock _lock(brokerListLock);
for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
(*iter)->appendAgents(agentList);
@@ -198,7 +191,7 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas
{
Mutex::ScopedLock _lock(lock);
while (!syncSequenceList.empty() && error.empty()) {
- cv.wait(lock); // TODO put timeout in
+ cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC));
}
}
@@ -208,16 +201,16 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas
void SessionManager::bindingKeys()
{
bindingKeyList.push_back("schema.#");
- if (rcvObjects && rcvEvents && rcvHeartbeats && !userBindings) {
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
bindingKeyList.push_back("console.#");
} else {
- if (rcvObjects && !userBindings)
+ if (settings.rcvObjects && !settings.userBindings)
bindingKeyList.push_back("console.obj.#");
else
bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent");
- if (rcvEvents)
+ if (settings.rcvEvents)
bindingKeyList.push_back("console.event.#");
- if (rcvHeartbeats)
+ if (settings.rcvHeartbeats)
bindingKeyList.push_back("console.heartbeat");
}
}
@@ -356,8 +349,31 @@ void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/
{
}
-void SessionManager::handleEventInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/)
+void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/)
{
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ SchemaClass* schemaClass;
+
+ buffer.getShortString(packageName);
+ buffer.getShortString(className);
+ buffer.getBin128(hash);
+
+ {
+ Mutex::ScopedLock l(lock);
+ map<string, Package*>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return;
+ schemaClass = pIter->second->getClass(className, hash);
+ if (schemaClass == 0)
+ return;
+ }
+
+ Event event(broker, schemaClass, buffer);
+
+ if (listener)
+ listener->event(event);
}
void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence)
diff --git a/qpid/cpp/src/qpid/console/SessionManager.h b/qpid/cpp/src/qpid/console/SessionManager.h
index 3da6f98c86..27df00494c 100644
--- a/qpid/cpp/src/qpid/console/SessionManager.h
+++ b/qpid/cpp/src/qpid/console/SessionManager.h
@@ -30,6 +30,7 @@
#include "Agent.h"
#include "Object.h"
#include "ObjectId.h"
+#include "Value.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include "qpid/client/ConnectionSettings.h"
@@ -52,6 +53,19 @@ class SessionManager
typedef std::vector<ClassKey> KeyVector;
~SessionManager() {}
+ struct Settings {
+ bool rcvObjects;
+ bool rcvEvents;
+ bool rcvHeartbeats;
+ bool userBindings;
+ uint32_t methodTimeout;
+ uint32_t getTimeout;
+
+ Settings() : rcvObjects(true), rcvEvents(true), rcvHeartbeats(true), userBindings(false),
+ methodTimeout(20), getTimeout(20)
+ {}
+ };
+
/** Create a new SessionManager
*
* Provide your own subclass of ConsoleListener to receive updates and indications
@@ -61,24 +75,18 @@ class SessionManager
*@param rcvObjects Listener wishes to receive managed object data.
*@param rcvEvents Listener wishes to receive events.
*@param rcvHeartbeats Listener wishes to receive agent heartbeats.
- *@param manageConnections SessionManager should retry dropped connections.
*@param userBindings If rcvObjects is true, userBindings allows the console client
* to control which object classes are received. See the bindPackage and bindClass
* methods. If userBindings is false, the listener will receive updates for all
* object classes.
*/
SessionManager(ConsoleListener* listener = 0,
- bool rcvObjects = true,
- bool rcvEvents = true,
- bool rcvHeartbeats = true,
- bool manageConnections = false,
- bool userBindings = false);
+ Settings settings = Settings());
/** Connect a broker to the console session
*
*@param settings Connection settings for client access
*@return broker object if operation is successful
- *@exception If 'manageConnections' is disabled and the connection to the broker fails,
* an exception shall be thrown.
*/
Broker* addBroker(client::ConnectionSettings& settings);
@@ -166,11 +174,7 @@ private:
SequenceManager::set syncSequenceList;
Object::Vector getResult;
std::string error;
- bool rcvObjects;
- bool rcvEvents;
- bool rcvHeartbeats;
- bool userBindings;
- bool manageConnections;
+ Settings settings;
NameVector bindingKeyList;
void bindingKeys();
diff --git a/qpid/cpp/src/qpid/console/Value.cpp b/qpid/cpp/src/qpid/console/Value.cpp
index 3ef5d01ec3..532709ab05 100644
--- a/qpid/cpp/src/qpid/console/Value.cpp
+++ b/qpid/cpp/src/qpid/console/Value.cpp
@@ -117,33 +117,33 @@ MapValue::MapValue(framing::Buffer& buffer)
}
-Value* ValueFactory::newValue(int typeCode, framing::Buffer& buffer)
+Value::Ptr ValueFactory::newValue(int typeCode, framing::Buffer& buffer)
{
switch (typeCode) {
- case 1: return static_cast<Value*>(new UintValue(buffer.getOctet())); // U8
- case 2: return static_cast<Value*>(new UintValue(buffer.getShort())); // U16
- case 3: return static_cast<Value*>(new UintValue(buffer.getLong())); // U32
- case 4: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // U64
- case 6: return static_cast<Value*>(new StringValue(buffer, 6)); // SSTR
- case 7: return static_cast<Value*>(new StringValue(buffer, 7)); // LSTR
- case 8: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // ABSTIME
- case 9: return static_cast<Value*>(new Uint64Value(buffer.getLongLong())); // DELTATIME
- case 10: return static_cast<Value*>(new RefValue(buffer)); // REF
- case 11: return static_cast<Value*>(new BoolValue(buffer.getOctet())); // BOOL
- case 12: return static_cast<Value*>(new FloatValue(buffer.getFloat())); // FLOAT
- case 13: return static_cast<Value*>(new DoubleValue(buffer.getDouble())); // DOUBLE
- case 14: return static_cast<Value*>(new UuidValue(buffer)); // UUID
- case 15: return static_cast<Value*>(new MapValue(buffer)); // MAP
- case 16: return static_cast<Value*>(new IntValue(buffer.getOctet())); // S8
- case 17: return static_cast<Value*>(new IntValue(buffer.getShort())); // S16
- case 18: return static_cast<Value*>(new IntValue(buffer.getLong())); // S32
- case 19: return static_cast<Value*>(new Int64Value(buffer.getLongLong())); // S64
+ case 1: return Value::Ptr(new UintValue(buffer.getOctet())); // U8
+ case 2: return Value::Ptr(new UintValue(buffer.getShort())); // U16
+ case 3: return Value::Ptr(new UintValue(buffer.getLong())); // U32
+ case 4: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // U64
+ case 6: return Value::Ptr(new StringValue(buffer, 6)); // SSTR
+ case 7: return Value::Ptr(new StringValue(buffer, 7)); // LSTR
+ case 8: return Value::Ptr(new Int64Value(buffer.getLongLong())); // ABSTIME
+ case 9: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // DELTATIME
+ case 10: return Value::Ptr(new RefValue(buffer)); // REF
+ case 11: return Value::Ptr(new BoolValue(buffer.getOctet())); // BOOL
+ case 12: return Value::Ptr(new FloatValue(buffer.getFloat())); // FLOAT
+ case 13: return Value::Ptr(new DoubleValue(buffer.getDouble())); // DOUBLE
+ case 14: return Value::Ptr(new UuidValue(buffer)); // UUID
+ case 15: return Value::Ptr(new MapValue(buffer)); // MAP
+ case 16: return Value::Ptr(new IntValue(buffer.getOctet())); // S8
+ case 17: return Value::Ptr(new IntValue(buffer.getShort())); // S16
+ case 18: return Value::Ptr(new IntValue(buffer.getLong())); // S32
+ case 19: return Value::Ptr(new Int64Value(buffer.getLongLong())); // S64
}
- return 0;
+ return Value::Ptr();
}
-void ValueFactory::encodeValue(int typeCode, Value* value, framing::Buffer& buffer)
+void ValueFactory::encodeValue(int typeCode, Value::Ptr value, framing::Buffer& buffer)
{
switch (typeCode) {
case 1: buffer.putOctet(value->asUint()); return; // U8
diff --git a/qpid/cpp/src/qpid/console/Value.h b/qpid/cpp/src/qpid/console/Value.h
index 42ff5661a2..5a0915c69b 100644
--- a/qpid/cpp/src/qpid/console/Value.h
+++ b/qpid/cpp/src/qpid/console/Value.h
@@ -25,6 +25,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FieldTable.h"
#include "ObjectId.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace framing {
@@ -38,6 +39,7 @@ namespace console {
class Value {
public:
+ typedef boost::shared_ptr<Value> Ptr;
virtual ~Value() {}
virtual std::string str() const = 0;
@@ -196,8 +198,8 @@ namespace console {
class ValueFactory {
public:
- static Value* newValue(int typeCode, framing::Buffer& buffer);
- static void encodeValue(int typeCode, Value* value, framing::Buffer& buffer);
+ static Value::Ptr newValue(int typeCode, framing::Buffer& buffer);
+ static void encodeValue(int typeCode, Value::Ptr value, framing::Buffer& buffer);
};
}
}