summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-08-28 22:03:26 +0000
committerTed Ross <tross@apache.org>2009-08-28 22:03:26 +0000
commit72cf689b75c8ccd68ba34351e3aba9cafad9c891 (patch)
treeb6ed34dfb19c95c408c99d3d74ffc5e204529ebf /qpid/cpp
parentd0139eba311c380b06ba433e31576899441591e0 (diff)
downloadqpid-python-72cf689b75c8ccd68ba34351e3aba9cafad9c891.tar.gz
Major work in the QMF engine.
- The console framework now establishes connectivity with the broker. - The Ruby binding for console is tracking the engine development. - Overall improvements (thread safety in Ruby, etc.) have been added. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@809042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/bindings/qmf/qmfengine.i4
-rw-r--r--qpid/cpp/bindings/qmf/ruby/qmf.rb298
-rwxr-xr-xqpid/cpp/bindings/qmf/tests/agent_ruby.rb10
-rwxr-xr-xqpid/cpp/bindings/qmf/tests/python_console.py12
-rwxr-xr-xqpid/cpp/bindings/qmf/tests/ruby_console.rb43
-rwxr-xr-xqpid/cpp/bindings/qmf/tests/run_interop_tests13
-rw-r--r--qpid/cpp/src/qmf.mk1
-rw-r--r--qpid/cpp/src/qmf/AgentEngine.cpp44
-rw-r--r--qpid/cpp/src/qmf/AgentEngine.h4
-rw-r--r--qpid/cpp/src/qmf/ConsoleEngine.cpp488
-rw-r--r--qpid/cpp/src/qmf/ConsoleEngine.h178
-rw-r--r--qpid/cpp/src/qmf/ObjectImpl.cpp6
-rw-r--r--qpid/cpp/src/qmf/ResilientConnection.cpp22
-rw-r--r--qpid/cpp/src/qmf/ResilientConnection.h11
-rw-r--r--qpid/cpp/src/qmf/Schema.h23
-rw-r--r--qpid/cpp/src/qmf/SchemaImpl.cpp75
-rw-r--r--qpid/cpp/src/qmf/SchemaImpl.h27
17 files changed, 1096 insertions, 163 deletions
diff --git a/qpid/cpp/bindings/qmf/qmfengine.i b/qpid/cpp/bindings/qmf/qmfengine.i
index 8ae28730e5..d3500c9b8f 100644
--- a/qpid/cpp/bindings/qmf/qmfengine.i
+++ b/qpid/cpp/bindings/qmf/qmfengine.i
@@ -20,7 +20,8 @@
%{
#include "qmf/AgentEngine.h"
-#include <qmf/ResilientConnection.h>
+#include "qmf/ConsoleEngine.h"
+#include "qmf/ResilientConnection.h"
%}
@@ -28,6 +29,7 @@
%include <qmf/Query.h>
%include <qmf/Message.h>
%include <qmf/AgentEngine.h>
+%include <qmf/ConsoleEngine.h>
%include <qmf/ConnectionSettings.h>
%include <qmf/ResilientConnection.h>
%include <qmf/Typecode.h>
diff --git a/qpid/cpp/bindings/qmf/ruby/qmf.rb b/qpid/cpp/bindings/qmf/ruby/qmf.rb
index badd53942d..38c42b5aa7 100644
--- a/qpid/cpp/bindings/qmf/ruby/qmf.rb
+++ b/qpid/cpp/bindings/qmf/ruby/qmf.rb
@@ -20,6 +20,7 @@
require 'qmfengine'
require 'thread'
require 'socket'
+require 'monitor'
module Qmf
@@ -70,39 +71,18 @@ module Qmf
def sess_event_recv(context, message); end
end
- class Query
- attr_reader :impl
- def initialize(i)
- @impl = i
- end
-
- def package_name
- @impl.getPackage
- end
-
- def class_name
- @impl.getClass
- end
-
- def object_id
- objid = @impl.getObjectId
- if objid.class == NilClass
- return nil
- end
- return ObjectId.new(objid)
- end
- end
-
class Connection
+ include MonitorMixin
+
attr_reader :impl
def initialize(settings)
+ super()
@impl = Qmfengine::ResilientConnection.new(settings.impl)
@sockEngine, @sock = Socket::socketpair(Socket::PF_UNIX, Socket::SOCK_STREAM, 0)
@impl.setNotifyFd(@sockEngine.fileno)
@new_conn_handlers = Array.new
@conn_handlers = Array.new
- @sess_handlers = Array.new
@thread = Thread.new do
run
@@ -110,47 +90,56 @@ module Qmf
end
def add_conn_handler(handler)
- @new_conn_handlers.push(handler)
+ synchronize do
+ @new_conn_handlers.push(handler)
+ end
@sockEngine.write("x")
end
- def add_sess_handler(handler)
- @sess_handlers.push(handler)
- end
-
def run()
- event = Qmfengine::ResilientConnectionEvent.new
+ eventImpl = Qmfengine::ResilientConnectionEvent.new
connected = nil
+ new_handlers = nil
+ bt_count = 0
+
while :true
@sock.read(1)
- @new_conn_handlers.each do |nh|
+ synchronize do
+ new_handlers = @new_conn_handlers
+ @new_conn_handlers = Array.new
+ end
+
+ new_handlers.each do |nh|
@conn_handlers.push(nh)
nh.conn_event_connected() if connected
end
- @new_conn_handlers = Array.new
+ new_handlers = nil
- valid = @impl.getEvent(event)
+ valid = @impl.getEvent(eventImpl)
while valid
begin
- case event.kind
+ case eventImpl.kind
when Qmfengine::ResilientConnectionEvent::CONNECTED
connected = :true
@conn_handlers.each { |h| h.conn_event_connected() }
when Qmfengine::ResilientConnectionEvent::DISCONNECTED
connected = nil
- @conn_handlers.each { |h| h.conn_event_disconnected(event.errorText) }
+ @conn_handlers.each { |h| h.conn_event_disconnected(eventImpl.errorText) }
when Qmfengine::ResilientConnectionEvent::SESSION_CLOSED
- event.sessionContext.handler.sess_event_session_closed(event.sessionContext, event.errorText)
+ eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText)
when Qmfengine::ResilientConnectionEvent::RECV
- event.sessionContext.handler.sess_event_recv(event.sessionContext, event.message)
+ eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message)
end
rescue Exception => ex
puts "Event Exception: #{ex}"
- puts ex.backtrace
+ if bt_count < 2
+ puts ex.backtrace
+ bt_count += 1
+ end
end
@impl.popEvent
- valid = @impl.getEvent(event)
+ valid = @impl.getEvent(eventImpl)
end
end
end
@@ -164,9 +153,12 @@ module Qmf
@label = label
@handler = handler
@handle = Qmfengine::SessionHandle.new
- @conn.add_sess_handler(@handler)
result = @conn.impl.createSession(label, self, @handle)
end
+
+ def destroy()
+ @conn.impl.destroySession(@handle)
+ end
end
##==============================================================================
@@ -262,6 +254,30 @@ module Qmf
end
end
+ class ConsoleObject < QmfObject
+ attr_reader :current_time, :create_time, :delete_time
+
+ def initialize(cls)
+ super(cls)
+ end
+
+ def update()
+ end
+
+ def mergeUpdate(newObject)
+ end
+
+ def deleted?()
+ @delete_time > 0
+ end
+
+ def index()
+ end
+
+ def method_missing(name, *args)
+ end
+ end
+
class ObjectId
attr_reader :impl
def initialize(impl=nil)
@@ -357,6 +373,29 @@ module Qmf
end
end
+ class Query
+ attr_reader :impl
+ def initialize(i)
+ @impl = i
+ end
+
+ def package_name
+ @impl.getPackage
+ end
+
+ def class_name
+ @impl.getClass
+ end
+
+ def object_id
+ objid = @impl.getObjectId
+ if objid.class == NilClass
+ return nil
+ end
+ return ObjectId.new(objid)
+ end
+ end
+
##==============================================================================
## SCHEMA
##==============================================================================
@@ -410,6 +449,21 @@ module Qmf
end
end
+ class SchemaClassKey
+ attr_reader :impl
+ def initialize(i)
+ @impl = i
+ end
+
+ def get_package()
+ @impl.getPackageName()
+ end
+
+ def get_class()
+ @impl.getClassName()
+ end
+ end
+
class SchemaObjectClass
attr_reader :impl
def initialize(package, name, kwargs={})
@@ -467,6 +521,170 @@ module Qmf
## CONSOLE
##==============================================================================
+ class ConsoleHandler
+ def agent_added(agent); end
+ def agent_deleted(agent); end
+ def new_package(package); end
+ def new_class(class_key); end
+ def object_update(object, hasProps, hasStats); end
+ def event_received(event); end
+ def agent_heartbeat(agent, timestamp); end
+ def method_response(resp); end
+ def broker_info(broker); end
+ end
+
+ class Console
+ attr_reader :impl
+
+ def initialize(handler, kwargs={})
+ @handler = handler
+ @impl = Qmfengine::ConsoleEngine.new
+ @event = Qmfengine::ConsoleEvent.new
+ @broker_list = Array.new
+ end
+
+ def add_connection(conn)
+ broker = Broker.new(self, conn)
+ @broker_list.push(broker)
+ return broker
+ end
+
+ def del_connection(broker)
+ end
+
+ def get_packages()
+ end
+
+ def get_classes(package)
+ end
+
+ def get_schema(class_key)
+ end
+
+ def bind_package(package)
+ end
+
+ def bind_class(kwargs = {})
+ end
+
+ def get_agents(broker = nil)
+ end
+
+ def get_objects(query, kwargs = {})
+ end
+
+ def start_sync(query)
+ end
+
+ def touch_sync(sync)
+ end
+
+ def end_sync(sync)
+ end
+
+ def do_console_events()
+ count = 0
+ valid = @impl.getEvent(@event)
+ while valid
+ count += 1
+ case @event.kind
+ when Qmfengine::ConsoleEvent::AGENT_ADDED
+ when Qmfengine::ConsoleEvent::AGENT_DELETED
+ when Qmfengine::ConsoleEvent::NEW_PACKAGE
+ when Qmfengine::ConsoleEvent::NEW_CLASS
+ when Qmfengine::ConsoleEvent::OBJECT_UPDATE
+ when Qmfengine::ConsoleEvent::EVENT_RECEIVED
+ when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT
+ when Qmfengine::ConsoleEvent::METHOD_RESPONSE
+ end
+ @impl.popEvent
+ valid = @impl.getEvent(@event)
+ end
+ return count
+ end
+ end
+
+ class Broker < ConnectionHandler
+ attr_reader :impl
+
+ def initialize(console, conn)
+ @console = console
+ @conn = conn
+ @session = nil
+ @event = Qmfengine::BrokerEvent.new
+ @xmtMessage = Qmfengine::Message.new
+ @impl = Qmfengine::BrokerProxy.new(@console.impl)
+ @console.impl.addConnection(@impl, self)
+ @conn.add_conn_handler(self)
+ end
+
+ def do_broker_events()
+ count = 0
+ valid = @impl.getEvent(@event)
+ while valid
+ count += 1
+ puts "Broker Event: #{@event.kind}"
+ case @event.kind
+ when Qmfengine::BrokerEvent::BROKER_INFO
+ when Qmfengine::BrokerEvent::DECLARE_QUEUE
+ @conn.impl.declareQueue(@session.handle, @event.name)
+ when Qmfengine::BrokerEvent::DELETE_QUEUE
+ @conn.impl.deleteQueue(@session.handle, @event.name)
+ when Qmfengine::BrokerEvent::BIND
+ @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
+ when Qmfengine::BrokerEvent::UNBIND
+ @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
+ when Qmfengine::BrokerEvent::SETUP_COMPLETE
+ @impl.startProtocol
+ end
+ @impl.popEvent
+ valid = @impl.getEvent(@event)
+ end
+ return count
+ end
+
+ def do_broker_messages()
+ count = 0
+ valid = @impl.getXmtMessage(@xmtMessage)
+ while valid
+ count += 1
+ @conn.impl.sendMessage(@session.handle, @xmtMessage)
+ @impl.popXmt
+ valid = @impl.getXmtMessage(@xmtMessage)
+ end
+ return count
+ end
+
+ def do_events()
+ begin
+ ccnt = @console.do_console_events
+ bcnt = do_broker_events
+ mcnt = do_broker_messages
+ end until ccnt == 0 and bcnt == 0 and mcnt == 0
+ end
+
+ def conn_event_connected()
+ puts "Console Connection Established..."
+ @session = Session.new(@conn, "qmfc-%s.%d" % [Socket.gethostname, Process::pid], self)
+ @impl.sessionOpened(@session.handle)
+ do_events
+ end
+
+ def conn_event_disconnected(error)
+ puts "Console Connection Lost"
+ end
+
+ def sess_event_session_closed(context, error)
+ puts "Console Session Lost"
+ @impl.sessionClosed()
+ end
+
+ def sess_event_recv(context, message)
+ @impl.handleRcvMessage(message)
+ do_events
+ end
+ end
+
##==============================================================================
## AGENT
##==============================================================================
diff --git a/qpid/cpp/bindings/qmf/tests/agent_ruby.rb b/qpid/cpp/bindings/qmf/tests/agent_ruby.rb
index 0f85c90f37..75de2b5fa1 100755
--- a/qpid/cpp/bindings/qmf/tests/agent_ruby.rb
+++ b/qpid/cpp/bindings/qmf/tests/agent_ruby.rb
@@ -53,7 +53,10 @@ class Model
method = Qmf::SchemaMethod.new("create_child", :desc => "Create a new child object")
method.add_argument(Qmf::SchemaArgument.new("child_name", Qmf::TYPE_LSTR, :dir => Qmf::DIR_IN))
method.add_argument(Qmf::SchemaArgument.new("child_ref", Qmf::TYPE_REF, :dir => Qmf::DIR_OUT))
+ @parent_class.add_method(method)
+ method = Qmf::SchemaMethod.new("probe_userid", :desc => "Return the user-id for this method call")
+ method.add_argument(Qmf::SchemaArgument.new("userid", Qmf::TYPE_SSTR, :dir => Qmf::DIR_OUT))
@parent_class.add_method(method)
@child_class = Qmf::SchemaObjectClass.new("org.apache.qpid.qmf", "child")
@@ -136,6 +139,13 @@ class App < Qmf::AgentHandler
@child.set_attr("name", args.by_key("child_name"))
@child.set_object_id(oid)
@agent.method_response(context, 0, "OK", args)
+
+ elsif name == "probe_userid"
+ args['userid'] = userId
+ @agent.method_response(context, 0, "OK", args)
+
+ else
+ @agent.method_response(context, 1, "Unimplemented Method: #{name}", args)
end
end
diff --git a/qpid/cpp/bindings/qmf/tests/python_console.py b/qpid/cpp/bindings/qmf/tests/python_console.py
index 365f2ac33a..bcd3063fe3 100755
--- a/qpid/cpp/bindings/qmf/tests/python_console.py
+++ b/qpid/cpp/bindings/qmf/tests/python_console.py
@@ -128,6 +128,18 @@ class QmfInteropTests(TestBase010):
self.assertEqual(parent.int16val, -1000)
self.assertEqual(parent.int8val, -100)
+ def test_D_userid_for_method(self):
+ self.startQmf();
+ qmf = self.qmf
+
+ parents = qmf.getObjects(_class="parent")
+ self.assertEqual(len(parents), 1)
+ parent = parents[0]
+
+ result = parent.probe_userid()
+ self.assertEqual(result.status, 0)
+ self.assertEqual(result.userid, "guest")
+
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)
diff --git a/qpid/cpp/bindings/qmf/tests/ruby_console.rb b/qpid/cpp/bindings/qmf/tests/ruby_console.rb
new file mode 100755
index 0000000000..c7ee9c3686
--- /dev/null
+++ b/qpid/cpp/bindings/qmf/tests/ruby_console.rb
@@ -0,0 +1,43 @@
+#!/usr/bin/ruby
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'qmf'
+require 'socket'
+
+class App < Qmf::ConsoleHandler
+
+ def main
+ @settings = Qmf::ConnectionSettings.new
+ @settings.set_attr("host", ARGV[0]) if ARGV.size > 0
+ @settings.set_attr("port", ARGV[1].to_i) if ARGV.size > 1
+ @connection = Qmf::Connection.new(@settings)
+ @qmf = Qmf::Console.new(self)
+
+ @qmf.add_connection(@connection)
+
+ sleep
+ end
+end
+
+app = App.new
+app.main
+
+
diff --git a/qpid/cpp/bindings/qmf/tests/run_interop_tests b/qpid/cpp/bindings/qmf/tests/run_interop_tests
index e6fc872dbb..f3f78185c7 100755
--- a/qpid/cpp/bindings/qmf/tests/run_interop_tests
+++ b/qpid/cpp/bindings/qmf/tests/run_interop_tests
@@ -54,12 +54,23 @@ if test -d ${PYTHON_DIR} ; then
echo "Running qmf interop tests using broker on port $BROKER_PORT"
PYTHONPATH=${PYTHON_DIR}:${MY_DIR}
export PYTHONPATH
- echo " Ruby Agent vs. Pure-Python Console"
+ echo " Ruby Agent (external storage) vs. Pure-Python Console"
start_ruby_agent
echo " Ruby agent started at pid $AGENT_PID"
${PYTHON_DIR}/qpid-python-test -m python_console -b localhost:$BROKER_PORT $@
RETCODE=$?
stop_ruby_agent
+
+ # Also against the Pure-Python console:
+ # Ruby agent (internal storage)
+ # Python agent (external and internal)
+ # C++ agent (external and internal)
+ #
+ # Other consoles against the same set of agents:
+ # Wrapped Python console
+ # Ruby console
+ # C++ console
+
stop_broker
if test x$RETCODE != x0; then
echo "FAIL qmf interop tests"; exit 1;
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index 9d24709b6e..a9c2f24922 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -37,6 +37,7 @@ nobase_include_HEADERS += \
libqmfcommon_la_SOURCES = \
qmf/ConnectionSettingsImpl.cpp \
qmf/ConnectionSettingsImpl.h \
+ qmf/ConsoleEngine.cpp \
qmf/ConsoleEngine.h \
qmf/Event.h \
qmf/Message.h \
diff --git a/qpid/cpp/src/qmf/AgentEngine.cpp b/qpid/cpp/src/qmf/AgentEngine.cpp
index bef8b3d102..ec5b117337 100644
--- a/qpid/cpp/src/qmf/AgentEngine.cpp
+++ b/qpid/cpp/src/qmf/AgentEngine.cpp
@@ -85,9 +85,9 @@ namespace qmf {
void setStoreDir(const char* path);
void setTransferDir(const char* path);
void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item);
+ bool getXmtMessage(Message& item) const;
void popXmt();
- bool getEvent(AgentEvent& event);
+ bool getEvent(AgentEvent& event) const;
void popEvent();
void newSession();
void startProtocol();
@@ -103,7 +103,7 @@ namespace qmf {
void raiseEvent(Event& event);
private:
- Mutex lock;
+ mutable Mutex lock;
Mutex addLock;
string label;
string queueName;
@@ -134,13 +134,13 @@ namespace qmf {
# define MA_BUFFER_SIZE 65536
char outputBuffer[MA_BUFFER_SIZE];
- struct SchemaClassKey {
+ struct AgentClassKey {
string name;
uint8_t hash[16];
- SchemaClassKey(const string& n, const uint8_t* h) : name(n) {
+ AgentClassKey(const string& n, const uint8_t* h) : name(n) {
memcpy(hash, h, 16);
}
- SchemaClassKey(Buffer& buffer) {
+ AgentClassKey(Buffer& buffer) {
buffer.getShortString(name);
buffer.getBin128(hash);
}
@@ -149,8 +149,8 @@ namespace qmf {
}
};
- struct SchemaClassKeyComp {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ struct AgentClassKeyComp {
+ bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const
{
if (lhs.name != rhs.name)
return lhs.name < rhs.name;
@@ -162,8 +162,8 @@ namespace qmf {
}
};
- typedef map<SchemaClassKey, SchemaObjectClassImpl*, SchemaClassKeyComp> ObjectClassMap;
- typedef map<SchemaClassKey, SchemaEventClassImpl*, SchemaClassKeyComp> EventClassMap;
+ typedef map<AgentClassKey, SchemaObjectClassImpl*, AgentClassKeyComp> ObjectClassMap;
+ typedef map<AgentClassKey, SchemaEventClassImpl*, AgentClassKeyComp> EventClassMap;
struct ClassMaps {
ObjectClassMap objectClasses;
@@ -185,7 +185,7 @@ namespace qmf {
void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
void sendPackageIndicationLH(const string& packageName);
- void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key);
+ void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
@@ -277,7 +277,7 @@ void AgentEngineImpl::handleRcvMessage(Message& message)
}
}
-bool AgentEngineImpl::getXmtMessage(Message& item)
+bool AgentEngineImpl::getXmtMessage(Message& item) const
{
Mutex::ScopedLock _lock(lock);
if (xmtQueue.empty())
@@ -293,7 +293,7 @@ void AgentEngineImpl::popXmt()
xmtQueue.pop_front();
}
-bool AgentEngineImpl::getEvent(AgentEvent& event)
+bool AgentEngineImpl::getEvent(AgentEvent& event) const
{
Mutex::ScopedLock _lock(lock);
if (eventQueue.empty())
@@ -423,11 +423,11 @@ void AgentEngineImpl::registerClass(SchemaObjectClass* cls)
map<string, ClassMaps>::iterator iter = packages.find(impl->package);
if (iter == packages.end()) {
packages[impl->package] = ClassMaps();
- iter = packages.find(impl->package);
+ iter = packages.find(impl->getClassKey()->getPackageName());
// TODO: Indicate this package if connected
}
- SchemaClassKey key(impl->name, impl->getHash());
+ AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash());
iter->second.objectClasses[key] = impl;
// TODO: Indicate this schema if connected.
@@ -441,11 +441,11 @@ void AgentEngineImpl::registerClass(SchemaEventClass* cls)
map<string, ClassMaps>::iterator iter = packages.find(impl->package);
if (iter == packages.end()) {
packages[impl->package] = ClassMaps();
- iter = packages.find(impl->package);
+ iter = packages.find(impl->getClassKey()->getPackageName());
// TODO: Indicate this package if connected
}
- SchemaClassKey key(impl->name, impl->getHash());
+ AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash());
iter->second.eventClasses[key] = impl;
// TODO: Indicate this schema if connected.
@@ -576,7 +576,7 @@ void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
}
-void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key)
+void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
encodeHeader(buffer, 'q');
@@ -690,7 +690,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
string rKey(replyKey);
string packageName;
inBuffer.getShortString(packageName);
- SchemaClassKey key(inBuffer);
+ AgentClassKey key(inBuffer);
if (rExchange.empty())
rExchange = QMF_EXCHANGE;
@@ -791,7 +791,7 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con
ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer);
boost::shared_ptr<ObjectId> oid(oidImpl->envelope);
buffer.getShortString(pname);
- SchemaClassKey classKey(buffer);
+ AgentClassKey classKey(buffer);
buffer.getShortString(method);
map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
@@ -876,7 +876,7 @@ void AgentEngine::handleRcvMessage(Message& message)
impl->handleRcvMessage(message);
}
-bool AgentEngine::getXmtMessage(Message& item)
+bool AgentEngine::getXmtMessage(Message& item) const
{
return impl->getXmtMessage(item);
}
@@ -886,7 +886,7 @@ void AgentEngine::popXmt()
impl->popXmt();
}
-bool AgentEngine::getEvent(AgentEvent& event)
+bool AgentEngine::getEvent(AgentEvent& event) const
{
return impl->getEvent(event);
}
diff --git a/qpid/cpp/src/qmf/AgentEngine.h b/qpid/cpp/src/qmf/AgentEngine.h
index d18a104e96..bbfbada80c 100644
--- a/qpid/cpp/src/qmf/AgentEngine.h
+++ b/qpid/cpp/src/qmf/AgentEngine.h
@@ -101,7 +101,7 @@ namespace qmf {
*@param item The Message structure describing the message to be produced.
*@return true if the Message is valid, false if there are no messages to send.
*/
- bool getXmtMessage(Message& item);
+ bool getXmtMessage(Message& item) const;
/**
* Remove and discard one message from the head of the transmit queue.
@@ -113,7 +113,7 @@ namespace qmf {
*@param event The event iff the return value is true
*@return true if event is valid, false if there are no events to process
*/
- bool getEvent(AgentEvent& event);
+ bool getEvent(AgentEvent& event) const;
/**
* Remove and discard one event from the head of the event queue.
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.cpp b/qpid/cpp/src/qmf/ConsoleEngine.cpp
new file mode 100644
index 0000000000..28b2852d67
--- /dev/null
+++ b/qpid/cpp/src/qmf/ConsoleEngine.cpp
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngine.h"
+#include "qmf/MessageImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/Typecode.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/ObjectIdImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/ValueImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace qmf {
+
+ struct MethodResponseImpl {
+ typedef boost::shared_ptr<MethodResponseImpl> Ptr;
+ MethodResponse* envelope;
+ uint32_t status;
+ auto_ptr<Value> exception;
+ auto_ptr<Value> arguments;
+
+ MethodResponseImpl(Buffer& buf);
+ ~MethodResponseImpl() {}
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ const Value* getArgs() const { return arguments.get(); }
+ };
+
+ struct ConsoleEventImpl {
+ typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
+ ConsoleEvent::EventKind kind;
+ boost::shared_ptr<AgentProxyImpl> agent;
+ string name;
+ boost::shared_ptr<SchemaClassKey> classKey;
+ Object* object;
+ void* context;
+ Event* event;
+ uint64_t timestamp;
+ uint32_t methodHandle;
+ MethodResponseImpl::Ptr methodResponse;
+
+ ConsoleEventImpl(ConsoleEvent::EventKind k) :
+ kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {}
+ ~ConsoleEventImpl() {}
+ ConsoleEvent copy();
+ };
+
+ struct BrokerEventImpl {
+ typedef boost::shared_ptr<BrokerEventImpl> Ptr;
+ BrokerEvent::EventKind kind;
+ string name;
+ string exchange;
+ string bindingKey;
+
+ BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
+ ~BrokerEventImpl() {}
+ BrokerEvent copy();
+ };
+
+ struct BrokerProxyImpl {
+ typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
+ mutable Mutex lock;
+ BrokerProxy* envelope;
+ ConsoleEngineImpl* console;
+ string queueName;
+ deque<MessageImpl::Ptr> xmtQueue;
+ deque<BrokerEventImpl::Ptr> eventQueue;
+
+ static const char* QMF_EXCHANGE;
+ static const char* DIR_EXCHANGE;
+ static const char* BROKER_KEY;
+
+ BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
+ ~BrokerProxyImpl() {}
+
+ void sessionOpened(SessionHandle& sh);
+ void sessionClosed();
+ void startProtocol();
+
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+
+ bool getEvent(BrokerEvent& event) const;
+ void popEvent();
+
+ BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
+ BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
+ BrokerEventImpl::Ptr eventSetupComplete();
+ };
+
+ struct AgentProxyImpl {
+ typedef boost::shared_ptr<AgentProxyImpl> Ptr;
+ AgentProxy* envelope;
+ ConsoleEngineImpl* console;
+
+ AgentProxyImpl(AgentProxy* e, ConsoleEngine& _console) :
+ envelope(e), console(_console.impl) {}
+ ~AgentProxyImpl() {}
+ };
+
+ class ConsoleEngineImpl {
+ public:
+ ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
+ ~ConsoleEngineImpl();
+
+ void handleRcvMessage(BrokerProxy& broker, Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+
+ bool getEvent(ConsoleEvent& event) const;
+ void popEvent();
+
+ void addConnection(BrokerProxy& broker, void* context);
+ void delConnection(BrokerProxy& broker);
+
+ uint32_t packageCount() const;
+ const string& getPackageName(uint32_t idx) const;
+
+ uint32_t classCount(const char* packageName) const;
+ const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
+
+ ClassKind getClassKind(const SchemaClassKey& key) const;
+ const SchemaObjectClass* getObjectClass(const SchemaClassKey& key) const;
+ const SchemaEventClass* getEventClass(const SchemaClassKey& key) const;
+
+ void bindPackage(const char* packageName);
+ void bindClass(const SchemaClassKey& key);
+ void bindClass(const char* packageName, const char* className);
+
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+
+ void sendQuery(const Query& query, void* context);
+
+ /*
+ void startSync(const Query& query, void* context, SyncQuery& sync);
+ void touchSync(SyncQuery& sync);
+ void endSync(SyncQuery& sync);
+ */
+
+ private:
+ ConsoleEngine* envelope;
+ const ConsoleSettings& settings;
+ mutable Mutex lock;
+ deque<ConsoleEventImpl::Ptr> eventQueue;
+ };
+}
+
+const char* BrokerProxyImpl::QMF_EXCHANGE = "qpid.management";
+const char* BrokerProxyImpl::DIR_EXCHANGE = "amq.direct";
+const char* BrokerProxyImpl::BROKER_KEY = "broker";
+
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+ConsoleEvent ConsoleEventImpl::copy()
+{
+ ConsoleEvent item;
+
+ ::memset(&item, 0, sizeof(ConsoleEvent));
+ item.kind = kind;
+ item.agent = agent.get() ? agent->envelope : 0;
+ item.classKey = classKey.get();
+ item.object = object;
+ item.context = context;
+ item.event = event;
+ item.timestamp = timestamp;
+ item.methodHandle = methodHandle;
+ item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
+
+ STRING_REF(name);
+
+ return item;
+}
+
+BrokerEvent BrokerEventImpl::copy()
+{
+ BrokerEvent item;
+
+ ::memset(&item, 0, sizeof(BrokerEvent));
+ item.kind = kind;
+
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+
+ return item;
+}
+
+BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
+ envelope(e), console(_console.impl), queueName("qmfc-")
+{
+ // TODO: Give the queue name a unique suffix
+}
+
+void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+
+ // TODO: Store session handle
+}
+
+void BrokerProxyImpl::sessionClosed()
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.clear();
+ xmtQueue.clear();
+}
+
+void BrokerProxyImpl::startProtocol()
+{
+ cout << "BrokerProxyImpl::startProtocol" << endl;
+}
+
+void BrokerProxyImpl::handleRcvMessage(Message& /*message*/)
+{
+ // TODO: Dispatch the messages types
+}
+
+bool BrokerProxyImpl::getXmtMessage(Message& item) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (xmtQueue.empty())
+ return false;
+ item = xmtQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popXmt()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!xmtQueue.empty())
+ xmtQueue.pop_front();
+}
+
+bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
+ event->name = queueName;
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
+ event->name = queue;
+ event->exchange = exchange;
+ event->bindingKey = key;
+
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
+ return event;
+}
+
+MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
+{
+ string text;
+
+ status = buf.getLong();
+ buf.getMediumString(text);
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+
+ // TODO: Parse schema-specific output arguments.
+ arguments.reset(new Value(TYPE_MAP));
+}
+
+ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
+ envelope(e), settings(s)
+{
+}
+
+ConsoleEngineImpl::~ConsoleEngineImpl()
+{
+ // This function intentionally left blank.
+}
+
+bool ConsoleEngineImpl::getEvent(ConsoleEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void ConsoleEngineImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/)
+{
+}
+
+void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/)
+{
+}
+
+uint32_t ConsoleEngineImpl::packageCount() const
+{
+ return 0;
+}
+
+const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const
+{
+ static string temp;
+ return temp;
+}
+
+uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const
+{
+ return 0;
+}
+
+const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const
+{
+ return 0;
+}
+
+ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const
+{
+ return CLASS_OBJECT;
+}
+
+const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const
+{
+ return 0;
+}
+
+const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const
+{
+ return 0;
+}
+
+void ConsoleEngineImpl::bindPackage(const char* /*packageName*/)
+{
+}
+
+void ConsoleEngineImpl::bindClass(const SchemaClassKey& /*key*/)
+{
+}
+
+void ConsoleEngineImpl::bindClass(const char* /*packageName*/, const char* /*className*/)
+{
+}
+
+uint32_t ConsoleEngineImpl::agentCount() const
+{
+ return 0;
+}
+
+const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const
+{
+ return 0;
+}
+
+void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/)
+{
+}
+
+/*
+void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync)
+{
+}
+
+void ConsoleEngineImpl::touchSync(SyncQuery& sync)
+{
+}
+
+void ConsoleEngineImpl::endSync(SyncQuery& sync)
+{
+}
+*/
+
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
+BrokerProxy::~BrokerProxy() { delete impl; }
+void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
+void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
+void BrokerProxy::startProtocol() { impl->startProtocol(); }
+void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void BrokerProxy::popXmt() { impl->popXmt(); }
+bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
+void BrokerProxy::popEvent() { impl->popEvent(); }
+
+AgentProxy::AgentProxy(ConsoleEngine& console) : impl(new AgentProxyImpl(this, console)) {}
+AgentProxy::~AgentProxy() { delete impl; }
+
+MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
+MethodResponse::~MethodResponse() { delete impl; } // TODO: correct to delete here?
+uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
+const Value* MethodResponse::getException() const { return impl->getException(); }
+const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+
+ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
+ConsoleEngine::~ConsoleEngine() { delete impl; }
+bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
+void ConsoleEngine::popEvent() { impl->popEvent(); }
+void ConsoleEngine::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
+void ConsoleEngine::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
+uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); }
+const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
+uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); }
+const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
+ClassKind ConsoleEngine::getClassKind(const SchemaClassKey& key) const { return impl->getClassKind(key); }
+const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey& key) const { return impl->getObjectClass(key); }
+const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey& key) const { return impl->getEventClass(key); }
+void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
+void ConsoleEngine::bindClass(const SchemaClassKey& key) { impl->bindClass(key); }
+void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
+uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); }
+const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void ConsoleEngine::sendQuery(const Query& query, void* context) { impl->sendQuery(query, context); }
+//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
+//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
+//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); }
+
+
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.h b/qpid/cpp/src/qmf/ConsoleEngine.h
index 823e281b14..4ee52f6114 100644
--- a/qpid/cpp/src/qmf/ConsoleEngine.h
+++ b/qpid/cpp/src/qmf/ConsoleEngine.h
@@ -20,62 +20,178 @@
* under the License.
*/
-#include <qmf/ManagedConnection.h>
-#include <qmf/Broker.h>
-#include <qmf/Package.h>
-#include <qmf/SchemaClassTable.h>
+#include <qmf/ResilientConnection.h>
+#include <qmf/Schema.h>
+#include <qmf/ObjectId.h>
#include <qmf/Object.h>
-#include <qmf/ConsoleHandler.h>
-#include <set>
-#include <vector>
-#include <string>
+#include <qmf/Event.h>
+#include <qmf/Query.h>
+#include <qmf/Value.h>
+#include <qmf/Message.h>
namespace qmf {
+ class ConsoleEngine;
+ class ConsoleEngineImpl;
+ class BrokerProxyImpl;
+ class AgentProxy;
+ class AgentProxyImpl;
+ class MethodResponseImpl;
+
+ /**
+ *
+ */
+ class MethodResponse {
+ public:
+ MethodResponse(MethodResponseImpl* impl);
+ ~MethodResponse();
+ uint32_t getStatus() const;
+ const Value* getException() const;
+ const Value* getArgs() const;
+
+ private:
+ friend class ConsoleEngineImpl;
+ MethodResponseImpl* impl;
+ };
+
+ /**
+ *
+ */
+ struct ConsoleEvent {
+ enum EventKind {
+ AGENT_ADDED = 1,
+ AGENT_DELETED = 2,
+ NEW_PACKAGE = 3,
+ NEW_CLASS = 4,
+ OBJECT_UPDATE = 5,
+ QUERY_COMPLETE = 6,
+ EVENT_RECEIVED = 7,
+ AGENT_HEARTBEAT = 8,
+ METHOD_RESPONSE = 9
+ };
+
+ EventKind kind;
+ AgentProxy* agent; // (AGENT_[ADDED|DELETED|HEARTBEAT])
+ char* name; // (NEW_PACKAGE)
+ SchemaClassKey* classKey; // (NEW_CLASS)
+ Object* object; // (OBJECT_UPDATE)
+ void* context; // (OBJECT_UPDATE, QUERY_COMPLETE)
+ Event* event; // (EVENT_RECEIVED)
+ uint64_t timestamp; // (AGENT_HEARTBEAT)
+ uint32_t methodHandle; // (METHOD_RESPONSE)
+ MethodResponse* methodResponse; // (METHOD_RESPONSE)
+ };
+
+ /**
+ *
+ */
+ struct BrokerEvent {
+ enum EventKind {
+ BROKER_INFO = 10,
+ DECLARE_QUEUE = 11,
+ DELETE_QUEUE = 12,
+ BIND = 13,
+ UNBIND = 14,
+ SETUP_COMPLETE = 15
+ };
+
+ EventKind kind;
+ char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
+ char* exchange; // ([UN]BIND)
+ char* bindingKey; // ([UN]BIND)
+ };
+
+ /**
+ *
+ */
+ class BrokerProxy {
+ public:
+ BrokerProxy(ConsoleEngine& console);
+ ~BrokerProxy();
+
+ void sessionOpened(SessionHandle& sh);
+ void sessionClosed();
+ void startProtocol();
+
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+
+ bool getEvent(BrokerEvent& event) const;
+ void popEvent();
+
+ private:
+ friend class ConsoleEngineImpl;
+ BrokerProxyImpl* impl;
+ };
+
+ /**
+ *
+ */
+ class AgentProxy {
+ public:
+ AgentProxy(ConsoleEngine& console);
+ ~AgentProxy();
+
+ private:
+ friend class ConsoleEngineImpl;
+ AgentProxyImpl* impl;
+ };
+
+ // TODO - move this to a public header
struct ConsoleSettings {
bool rcvObjects;
bool rcvEvents;
bool rcvHeartbeats;
bool userBindings;
- uint32_t methodTimeout;
- uint32_t getTimeout;
ConsoleSettings() :
rcvObjects(true),
rcvEvents(true),
rcvHeartbeats(true),
- userBindings(false),
- methodTimeout(20),
- getTimeout(20) {}
+ userBindings(false) {}
};
class ConsoleEngine {
public:
- ConsoleEngine(ConsoleHandler* handler = 0, ConsoleSettings settings = ConsoleSettings());
+ ConsoleEngine(const ConsoleSettings& settings = ConsoleSettings());
~ConsoleEngine();
- Broker* addConnection(ManagedConnection& connection);
- void delConnection(Broker* broker);
- void delConnection(ManagedConnection& connection);
+ bool getEvent(ConsoleEvent& event) const;
+ void popEvent();
+
+ void addConnection(BrokerProxy& broker, void* context);
+ void delConnection(BrokerProxy& broker);
+
+ uint32_t packageCount() const;
+ const char* getPackageName(uint32_t idx) const;
+
+ uint32_t classCount(const char* packageName) const;
+ const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
- const PackageMap& getPackages() const;
+ ClassKind getClassKind(const SchemaClassKey& key) const;
+ const SchemaObjectClass* getObjectClass(const SchemaClassKey& key) const;
+ const SchemaEventClass* getEventClass(const SchemaClassKey& key) const;
- void bindPackage(const Package& package);
- void bindPackage(const std::string& packageName);
- void bindClass(const SchemaClass& otype);
- void bindClass(const std::string& packageName, const std::string& className);
+ void bindPackage(const char* packageName);
+ void bindClass(const SchemaClassKey& key);
+ void bindClass(const char* packageName, const char* className);
+
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+
+ void sendQuery(const Query& query, void* context);
/*
- void getAgents(std::set<Agent>& agents, Broker* = 0);
- void getObjects(std::vector<Object>& objects, const std::string& typeName,
- const std::string& packageName = "",
- Broker* broker = 0,
- Agent* agent = 0);
- void getObjects(std::vector<Object>& objects,
- const std::map<std::string, std::string>& query,
- Broker* broker = 0,
- Agent* agent = 0);
+ void startSync(const Query& query, void* context, SyncQuery& sync);
+ void touchSync(SyncQuery& sync);
+ void endSync(SyncQuery& sync);
*/
+
+ private:
+ friend class BrokerProxyImpl;
+ friend class AgentProxyImpl;
+ ConsoleEngineImpl* impl;
};
}
diff --git a/qpid/cpp/src/qmf/ObjectImpl.cpp b/qpid/cpp/src/qmf/ObjectImpl.cpp
index d3882935e4..645ccd5c81 100644
--- a/qpid/cpp/src/qmf/ObjectImpl.cpp
+++ b/qpid/cpp/src/qmf/ObjectImpl.cpp
@@ -123,9 +123,9 @@ void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const
{
- buffer.putShortString(objectClass->getPackage());
- buffer.putShortString(objectClass->getName());
- buffer.putBin128(const_cast<uint8_t*>(objectClass->getHash()));
+ buffer.putShortString(objectClass->getClassKey()->getPackageName());
+ buffer.putShortString(objectClass->getClassKey()->getClassName());
+ buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash()));
}
void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
diff --git a/qpid/cpp/src/qmf/ResilientConnection.cpp b/qpid/cpp/src/qmf/ResilientConnection.cpp
index 3531e104b0..88b9169c75 100644
--- a/qpid/cpp/src/qmf/ResilientConnection.cpp
+++ b/qpid/cpp/src/qmf/ResilientConnection.cpp
@@ -79,7 +79,7 @@ namespace qmf {
class ResilientConnectionImpl : public qpid::sys::Runnable {
public:
- ResilientConnectionImpl(ConnectionSettings& settings);
+ ResilientConnectionImpl(const ConnectionSettings& settings);
~ResilientConnectionImpl();
bool isConnected() const;
@@ -108,7 +108,7 @@ namespace qmf {
bool connected;
bool shutdown;
string lastError;
- ConnectionSettings settings;
+ const ConnectionSettings settings;
Connection connection;
mutable qpid::sys::Mutex lock;
int delayMin;
@@ -175,7 +175,7 @@ void RCSession::received(qpid::client::Message& msg)
connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
}
-ResilientConnectionImpl::ResilientConnectionImpl(ConnectionSettings& _settings) :
+ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
notifyFd(-1), connected(false), shutdown(false), settings(_settings), connThread(*this)
{
connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
@@ -222,7 +222,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte
RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
- handle.handle = (void*) sess.get();
+ handle.impl = (void*) sess.get();
sessions.insert(sess);
return true;
@@ -231,7 +231,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte
void ResilientConnectionImpl::destroySession(SessionHandle handle)
{
Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
set<RCSession::Ptr>::iterator iter = sessions.find(sess);
if (iter != sessions.end()) {
for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
@@ -247,7 +247,7 @@ void ResilientConnectionImpl::destroySession(SessionHandle handle)
void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message)
{
Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
set<RCSession::Ptr>::iterator iter = sessions.find(sess);
qpid::client::Message msg;
string data(message.body, message.length);
@@ -267,7 +267,7 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me
void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
sess->session.queueDeclare(arg::queue=queue, arg::autoDelete=true, arg::exclusive=true);
sess->subscriptions->subscribe(*sess, queue, queue);
@@ -277,7 +277,7 @@ void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
sess->session.queueDelete(arg::queue=queue);
for (vector<string>::iterator iter = sess->dests.begin();
@@ -293,7 +293,7 @@ void ResilientConnectionImpl::bind(SessionHandle handle,
char* exchange, char* queue, char* key)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
sess->session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key);
}
@@ -302,7 +302,7 @@ void ResilientConnectionImpl::unbind(SessionHandle handle,
char* exchange, char* queue, char* key)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
sess->session.exchangeUnbind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key);
}
@@ -396,7 +396,7 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k
// Wrappers
//==================================================================
-ResilientConnection::ResilientConnection(ConnectionSettings& settings)
+ResilientConnection::ResilientConnection(const ConnectionSettings& settings)
{
impl = new ResilientConnectionImpl(settings);
}
diff --git a/qpid/cpp/src/qmf/ResilientConnection.h b/qpid/cpp/src/qmf/ResilientConnection.h
index 6e05541253..03f1b9c0d5 100644
--- a/qpid/cpp/src/qmf/ResilientConnection.h
+++ b/qpid/cpp/src/qmf/ResilientConnection.h
@@ -26,6 +26,8 @@
namespace qmf {
+ class ResilientConnectionImpl;
+
/**
* Represents events that occur, unsolicited, from ResilientConnection.
*/
@@ -43,12 +45,11 @@ namespace qmf {
Message message; // RECV
};
- struct SessionHandle {
- void* handle;
+ class SessionHandle {
+ friend class ResilientConnectionImpl;
+ void* impl;
};
- class ResilientConnectionImpl;
-
/**
* ResilientConnection represents a Qpid connection that is resilient.
*
@@ -67,7 +68,7 @@ namespace qmf {
*@param delayMax Maximum delay (in seconds) between retries.
*@param delayFactor Factor to multiply retry delay by after each failure.
*/
- ResilientConnection(ConnectionSettings& settings);
+ ResilientConnection(const ConnectionSettings& settings);
~ResilientConnection();
/**
diff --git a/qpid/cpp/src/qmf/Schema.h b/qpid/cpp/src/qmf/Schema.h
index e3ab90e3e3..1123acc3b8 100644
--- a/qpid/cpp/src/qmf/Schema.h
+++ b/qpid/cpp/src/qmf/Schema.h
@@ -35,6 +35,7 @@ namespace qmf {
struct SchemaStatisticImpl;
struct SchemaObjectClassImpl;
struct SchemaEventClassImpl;
+ struct SchemaClassKeyImpl;
/**
*/
@@ -114,6 +115,20 @@ namespace qmf {
/**
*/
+ class SchemaClassKey {
+ public:
+ SchemaClassKey(SchemaClassKeyImpl* impl);
+ ~SchemaClassKey();
+
+ const char* getPackageName() const;
+ const char* getClassName() const;
+ const uint8_t* getHash() const;
+
+ SchemaClassKeyImpl* impl;
+ };
+
+ /**
+ */
class SchemaObjectClass {
public:
SchemaObjectClass(const char* package, const char* name);
@@ -123,9 +138,7 @@ namespace qmf {
void addStatistic(const SchemaStatistic& statistic);
void addMethod(const SchemaMethod& method);
- const char* getPackage() const;
- const char* getName() const;
- const uint8_t* getHash() const;
+ const SchemaClassKey* getClassKey() const;
int getPropertyCount() const;
int getStatisticCount() const;
int getMethodCount() const;
@@ -146,9 +159,7 @@ namespace qmf {
void addArgument(const SchemaArgument& argument);
void setDesc(const char* desc);
- const char* getPackage() const;
- const char* getName() const;
- const uint8_t* getHash() const;
+ const SchemaClassKey* getClassKey() const;
int getArgumentCount() const;
const SchemaArgument* getArgument(int idx) const;
diff --git a/qpid/cpp/src/qmf/SchemaImpl.cpp b/qpid/cpp/src/qmf/SchemaImpl.cpp
index 665c94f2a1..be30cdb642 100644
--- a/qpid/cpp/src/qmf/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/SchemaImpl.cpp
@@ -240,15 +240,20 @@ void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
hash.update(description);
}
-SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : envelope(new SchemaObjectClass(this)), hasHash(true)
+SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) :
+ envelope(new SchemaClassKey(this)), package(p), name(n), hash(h) {}
+
+SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) :
+ envelope(new SchemaObjectClass(this)), hasHash(true), classKey(package, name, hash)
{
buffer.getShortString(package);
buffer.getShortString(name);
hash.decode(buffer);
- uint16_t propCount = buffer.getShort();
- uint16_t statCount = buffer.getShort();
- uint16_t methodCount = buffer.getShort();
+ /*uint8_t hasParentClass =*/ buffer.getOctet(); // TODO: Parse parent-class indicator
+ uint16_t propCount = buffer.getShort();
+ uint16_t statCount = buffer.getShort();
+ uint16_t methodCount = buffer.getShort();
for (uint16_t idx = 0; idx < propCount; idx++) {
SchemaPropertyImpl* property = new SchemaPropertyImpl(buffer);
@@ -288,7 +293,7 @@ void SchemaObjectClassImpl::encode(Buffer& buffer) const
(*iter)->encode(buffer);
}
-const uint8_t* SchemaObjectClassImpl::getHash() const
+const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const
{
if (!hasHash) {
hasHash = true;
@@ -305,7 +310,7 @@ const uint8_t* SchemaObjectClassImpl::getHash() const
(*iter)->updateHash(hash);
}
- return hash.get();
+ return classKey.envelope;
}
void SchemaObjectClassImpl::addProperty(const SchemaProperty& property)
@@ -353,7 +358,8 @@ const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const
return 0;
}
-SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : envelope(new SchemaEventClass(this)), hasHash(true)
+SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) :
+ envelope(new SchemaEventClass(this)), hasHash(true), classKey(package, name, hash)
{
buffer.getShortString(package);
buffer.getShortString(name);
@@ -380,7 +386,7 @@ void SchemaEventClassImpl::encode(Buffer& buffer) const
(*iter)->encode(buffer);
}
-const uint8_t* SchemaEventClassImpl::getHash() const
+const SchemaClassKey* SchemaEventClassImpl::getClassKey() const
{
if (!hasHash) {
hasHash = true;
@@ -390,7 +396,7 @@ const uint8_t* SchemaEventClassImpl::getHash() const
iter != arguments.end(); iter++)
(*iter)->updateHash(hash);
}
- return hash.get();
+ return classKey.envelope;
}
void SchemaEventClassImpl::addArgument(const SchemaArgument& argument)
@@ -408,6 +414,7 @@ const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const
return 0;
}
+
//==================================================================
// Wrappers
//==================================================================
@@ -620,6 +627,28 @@ const char* SchemaStatistic::getDesc() const
return impl->getDesc().c_str();
}
+SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {}
+
+SchemaClassKey::~SchemaClassKey()
+{
+ delete impl;
+}
+
+const char* SchemaClassKey::getPackageName() const
+{
+ return impl->getPackageName().c_str();
+}
+
+const char* SchemaClassKey::getClassName() const
+{
+ return impl->getClassName().c_str();
+}
+
+const uint8_t* SchemaClassKey::getHash() const
+{
+ return impl->getHash();
+}
+
SchemaObjectClass::SchemaObjectClass(const char* package, const char* name)
{
impl = new SchemaObjectClassImpl(this, package, name);
@@ -647,19 +676,9 @@ void SchemaObjectClass::addMethod(const SchemaMethod& method)
impl->addMethod(method);
}
-const char* SchemaObjectClass::getPackage() const
+const SchemaClassKey* SchemaObjectClass::getClassKey() const
{
- return impl->getPackage().c_str();
-}
-
-const char* SchemaObjectClass::getName() const
-{
- return impl->getName().c_str();
-}
-
-const uint8_t* SchemaObjectClass::getHash() const
-{
- return impl->getHash();
+ return impl->getClassKey();
}
int SchemaObjectClass::getPropertyCount() const
@@ -714,19 +733,9 @@ void SchemaEventClass::setDesc(const char* desc)
impl->setDesc(desc);
}
-const char* SchemaEventClass::getPackage() const
+const SchemaClassKey* SchemaEventClass::getClassKey() const
{
- return impl->getPackage().c_str();
-}
-
-const char* SchemaEventClass::getName() const
-{
- return impl->getName().c_str();
-}
-
-const uint8_t* SchemaEventClass::getHash() const
-{
- return impl->getHash();
+ return impl->getClassKey();
}
int SchemaEventClass::getArgumentCount() const
diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/SchemaImpl.h
index 2c30a8851f..3a18703ae6 100644
--- a/qpid/cpp/src/qmf/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/SchemaImpl.h
@@ -138,27 +138,39 @@ namespace qmf {
void updateHash(SchemaHash& hash) const;
};
+ struct SchemaClassKeyImpl {
+ const SchemaClassKey* envelope;
+ const std::string& package;
+ const std::string& name;
+ const SchemaHash& hash;
+
+ SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash);
+
+ const std::string& getPackageName() const { return package; }
+ const std::string& getClassName() const { return name; }
+ const uint8_t* getHash() const { return hash.get(); }
+ };
+
struct SchemaObjectClassImpl {
SchemaObjectClass* envelope;
std::string package;
std::string name;
mutable SchemaHash hash;
mutable bool hasHash;
+ SchemaClassKeyImpl classKey;
std::vector<SchemaPropertyImpl*> properties;
std::vector<SchemaStatisticImpl*> statistics;
std::vector<SchemaMethodImpl*> methods;
SchemaObjectClassImpl(SchemaObjectClass* e, const char* p, const char* n) :
- envelope(e), package(p), name(n), hasHash(false) {}
+ envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {}
SchemaObjectClassImpl(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void addProperty(const SchemaProperty& property);
void addStatistic(const SchemaStatistic& statistic);
void addMethod(const SchemaMethod& method);
- const std::string& getPackage() const { return package; }
- const std::string& getName() const { return name; }
- const uint8_t* getHash() const;
+ const SchemaClassKey* getClassKey() const;
int getPropertyCount() const { return properties.size(); }
int getStatisticCount() const { return statistics.size(); }
int getMethodCount() const { return methods.size(); }
@@ -173,19 +185,18 @@ namespace qmf {
std::string name;
mutable SchemaHash hash;
mutable bool hasHash;
+ SchemaClassKeyImpl classKey;
std::string description;
std::vector<SchemaArgumentImpl*> arguments;
SchemaEventClassImpl(SchemaEventClass* e, const char* p, const char* n) :
- envelope(e), package(p), name(n), hasHash(false) {}
+ envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {}
SchemaEventClassImpl(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void addArgument(const SchemaArgument& argument);
void setDesc(const char* desc) { description = desc; }
- const std::string& getPackage() const { return package; }
- const std::string& getName() const { return name; }
- const uint8_t* getHash() const;
+ const SchemaClassKey* getClassKey() const;
int getArgumentCount() const { return arguments.size(); }
const SchemaArgument* getArgument(int idx) const;
};