summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/bindings/qmf/python/Makefile.am2
-rw-r--r--cpp/bindings/qmf/python/qmf.py14
-rw-r--r--cpp/bindings/qmf/ruby/qmf.rb295
-rwxr-xr-xcpp/bindings/qmf/tests/agent_ruby.rb7
-rw-r--r--cpp/bindings/qmf/tests/python_agent.py4
-rwxr-xr-xcpp/bindings/qmf/tests/ruby_console.rb83
-rw-r--r--cpp/src/qmf/AgentEngine.cpp19
-rw-r--r--cpp/src/qmf/ConsoleEngine.cpp379
-rw-r--r--cpp/src/qmf/ConsoleEngine.h70
-rw-r--r--cpp/src/qmf/Object.h3
-rw-r--r--cpp/src/qmf/ObjectId.h1
-rw-r--r--cpp/src/qmf/ObjectIdImpl.cpp21
-rw-r--r--cpp/src/qmf/ObjectIdImpl.h3
-rw-r--r--cpp/src/qmf/ObjectImpl.cpp88
-rw-r--r--cpp/src/qmf/ObjectImpl.h13
-rw-r--r--cpp/src/qmf/Query.h70
-rw-r--r--cpp/src/qmf/QueryImpl.cpp85
-rw-r--r--cpp/src/qmf/QueryImpl.h78
-rw-r--r--cpp/src/qmf/SchemaImpl.cpp13
-rw-r--r--cpp/src/qmf/SchemaImpl.h7
-rw-r--r--cpp/src/qmf/SequenceManager.cpp52
-rw-r--r--cpp/src/qmf/SequenceManager.h20
22 files changed, 1004 insertions, 323 deletions
diff --git a/cpp/bindings/qmf/python/Makefile.am b/cpp/bindings/qmf/python/Makefile.am
index f51d26bfad..55d9079fb7 100644
--- a/cpp/bindings/qmf/python/Makefile.am
+++ b/cpp/bindings/qmf/python/Makefile.am
@@ -29,7 +29,7 @@ EXTRA_DIST = python.i
BUILT_SOURCES = $(generated_file_list)
$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmfengine.i
- swig -python -c++ -Wall -I/usr/include $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -o qmfengine.cpp $(srcdir)/python.i
+ swig -c++ -python -Wall -I/usr/include $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -o qmfengine.cpp $(srcdir)/python.i
pylibdir = $(PYTHON_LIB)
diff --git a/cpp/bindings/qmf/python/qmf.py b/cpp/bindings/qmf/python/qmf.py
index 265f204852..4800b327f1 100644
--- a/cpp/bindings/qmf/python/qmf.py
+++ b/cpp/bindings/qmf/python/qmf.py
@@ -30,6 +30,8 @@ from qmfengine import (TYPE_ABSTIME, TYPE_ARRAY, TYPE_BOOL, TYPE_DELTATIME,
TYPE_INT8, TYPE_LIST, TYPE_LSTR, TYPE_MAP, TYPE_OBJECT,
TYPE_REF, TYPE_SSTR, TYPE_UINT16, TYPE_UINT32, TYPE_UINT64,
TYPE_UINT8, TYPE_UUID)
+from qmfengine import (O_EQ, O_NE, O_LT, O_LE, O_GT, O_GE, O_RE_MATCH, O_RE_NOMATCH,
+ E_NOT, E_AND, E_OR, E_XOR)
##==============================================================================
@@ -404,11 +406,16 @@ class Arguments:
class Query:
- def __init__(self, i=None):
+ def __init__(self, i=None, package="", cls=None, oid=None):
if i:
self.impl = i
else:
- self.impl = qmfengine.Query()
+ if cls:
+ self.impl = qmfengine.Query(cls, package)
+ elif oid:
+ self.impl = qmfengine.Query(oid)
+ else:
+ raise "Argument error"
def package_name(self): return self.impl.getPackage()
@@ -419,9 +426,6 @@ class Query:
return ObjectId(_objid)
else:
return None
- OPER_AND = qmfengine.Query.OPER_AND
- OPER_OR = qmfengine.Query.OPER_OR
-
##==============================================================================
diff --git a/cpp/bindings/qmf/ruby/qmf.rb b/cpp/bindings/qmf/ruby/qmf.rb
index 21fbf6c157..16f1058f4a 100644
--- a/cpp/bindings/qmf/ruby/qmf.rb
+++ b/cpp/bindings/qmf/ruby/qmf.rb
@@ -67,6 +67,7 @@ module Qmf
class ConnectionHandler
def conn_event_connected(); end
def conn_event_disconnected(error); end
+ def conn_event_visit(); end
def sess_event_session_closed(context, error); end
def sess_event_recv(context, message); end
end
@@ -82,6 +83,7 @@ module Qmf
@sockEngine, @sock = Socket::socketpair(Socket::PF_UNIX, Socket::SOCK_STREAM, 0)
@impl.setNotifyFd(@sockEngine.fileno)
@new_conn_handlers = []
+ @conn_handlers_to_delete = []
@conn_handlers = []
@thread = Thread.new do
@@ -89,17 +91,30 @@ module Qmf
end
end
+ def kick
+ @sockEngine.write(".")
+ @sockEngine.flush
+ end
+
def add_conn_handler(handler)
synchronize do
@new_conn_handlers << handler
end
- @sockEngine.write("x")
+ kick
+ end
+
+ def del_conn_handler(handler)
+ synchronize do
+ @conn_handlers_to_delete << handler
+ end
+ kick
end
def run()
eventImpl = Qmfengine::ResilientConnectionEvent.new
connected = nil
new_handlers = nil
+ del_handlers = nil
bt_count = 0
while :true
@@ -107,7 +122,9 @@ module Qmf
synchronize do
new_handlers = @new_conn_handlers
+ del_handlers = @conn_handlers_to_delete
@new_conn_handlers = []
+ @conn_handlers_to_delete = []
end
new_handlers.each do |nh|
@@ -116,6 +133,11 @@ module Qmf
end
new_handlers = nil
+ del_handlers.each do |dh|
+ d = @conn_handlers.delete(dh)
+ end
+ del_handlers = nil
+
valid = @impl.getEvent(eventImpl)
while valid
begin
@@ -141,6 +163,7 @@ module Qmf
@impl.popEvent
valid = @impl.getEvent(eventImpl)
end
+ @conn_handlers.each { |h| h.conn_event_visit }
end
end
end
@@ -167,23 +190,20 @@ module Qmf
class QmfObject
attr_reader :impl, :object_class
- def initialize(cls)
- @object_class = cls
- @impl = Qmfengine::Object.new(@object_class.impl)
- end
-
- def destroy
- @impl.destroy
+ def initialize(cls, kwargs={})
+ if cls:
+ @object_class = cls
+ @impl = Qmfengine::Object.new(@object_class.impl)
+ elsif kwargs.include?(:impl)
+ @impl = Qmfengine::Object.new(kwargs[:impl])
+ @object_class = SchemaObjectClass.new(nil, nil, :impl => @impl.getClass)
+ end
end
def object_id
return ObjectId.new(@impl.getObjectId)
end
- def set_object_id(oid)
- @impl.setObjectId(oid.impl)
- end
-
def get_attr(name)
val = value(name)
case val.getType
@@ -248,17 +268,31 @@ module Qmf
def value(name)
val = @impl.getValue(name.to_s)
if val.nil?
- raise ArgumentError, "Attribute '#{name}' not defined for class #{@object_class.impl.getName}"
+ raise ArgumentError, "Attribute '#{name}' not defined for class #{@object_class.impl.getClassKey.getPackageName}:#{@object_class.impl.getClassKey.getClassName}"
end
return val
end
end
+ class AgentObject < QmfObject
+ def initialize(cls, kwargs={})
+ super(cls, kwargs)
+ end
+
+ def destroy
+ @impl.destroy
+ end
+
+ def set_object_id(oid)
+ @impl.setObjectId(oid.impl)
+ end
+ end
+
class ConsoleObject < QmfObject
attr_reader :current_time, :create_time, :delete_time
- def initialize(cls)
- super(cls)
+ def initialize(cls, kwargs={})
+ super(cls, kwargs)
end
def update()
@@ -373,10 +407,30 @@ module Qmf
end
end
+ ##==============================================================================
+ ## QUERY
+ ##==============================================================================
+
class Query
attr_reader :impl
- def initialize(i)
- @impl = i
+ def initialize(kwargs = {})
+ if kwargs.include?(:impl)
+ @impl = kwargs[:impl]
+ else
+ package = ''
+ if kwargs.include?(:key)
+ @impl = Qmfengine::Query.new(kwargs[:key])
+ elsif kwargs.include?(:object_id)
+ @impl = Qmfengine::Query.new(kwargs[:object_id])
+ else
+ package = kwargs[:package] if kwargs.include?(:package)
+ if kwargs.include?(:class)
+ @impl = Qmfengine::Query.new(kwargs[:class], package)
+ else
+ raise ArgumentError, "Invalid arguments, use :key or :class[,:package]"
+ end
+ end
+ end
end
def package_name
@@ -403,36 +457,60 @@ module Qmf
class SchemaArgument
attr_reader :impl
def initialize(name, typecode, kwargs={})
- @impl = Qmfengine::SchemaArgument.new(name, typecode)
- @impl.setDirection(kwargs[:dir]) if kwargs.include?(:dir)
- @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit)
- @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
+ if kwargs.include?(:impl)
+ @impl = kwargs[:impl]
+ else
+ @impl = Qmfengine::SchemaArgument.new(name, typecode)
+ @impl.setDirection(kwargs[:dir]) if kwargs.include?(:dir)
+ @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit)
+ @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
+ end
+ end
+
+ def name
+ @impl.getName
end
end
class SchemaMethod
- attr_reader :impl
+ attr_reader :impl, :arguments
def initialize(name, kwargs={})
- @impl = Qmfengine::SchemaMethod.new(name)
- @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
@arguments = []
+ if kwargs.include?(:impl)
+ @impl = kwargs[:impl]
+ arg_count = @impl.getArgumentCount
+ for i in 0...arg_count
+ @arguments << SchemaArgument.new(nil, nil, :impl => @impl.getArgument(i))
+ end
+ else
+ @impl = Qmfengine::SchemaMethod.new(name)
+ @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
+ end
end
def add_argument(arg)
@arguments << arg
@impl.addArgument(arg.impl)
end
+
+ def name
+ @impl.getName
+ end
end
class SchemaProperty
attr_reader :impl
def initialize(name, typecode, kwargs={})
- @impl = Qmfengine::SchemaProperty.new(name, typecode)
- @impl.setAccess(kwargs[:access]) if kwargs.include?(:access)
- @impl.setIndex(kwargs[:index]) if kwargs.include?(:index)
- @impl.setOptional(kwargs[:optional]) if kwargs.include?(:optional)
- @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit)
- @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
+ if kwargs.include?(:impl)
+ @impl = kwargs[:impl]
+ else
+ @impl = Qmfengine::SchemaProperty.new(name, typecode)
+ @impl.setAccess(kwargs[:access]) if kwargs.include?(:access)
+ @impl.setIndex(kwargs[:index]) if kwargs.include?(:index)
+ @impl.setOptional(kwargs[:optional]) if kwargs.include?(:optional)
+ @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit)
+ @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
+ end
end
def name
@@ -443,9 +521,17 @@ module Qmf
class SchemaStatistic
attr_reader :impl
def initialize(name, typecode, kwargs={})
- @impl = Qmfengine::SchemaStatistic.new(name, typecode)
- @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit)
- @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
+ if kwargs.include?(:impl)
+ @impl = kwargs[:impl]
+ else
+ @impl = Qmfengine::SchemaStatistic.new(name, typecode)
+ @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit)
+ @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
+ end
+ end
+
+ def name
+ @impl.getName
end
end
@@ -465,13 +551,25 @@ module Qmf
end
class SchemaObjectClass
- attr_reader :impl
- def initialize(package='', name='', kwargs={})
+ attr_reader :impl, :properties, :statistics, :methods
+ def initialize(package, name, kwargs={})
@properties = []
@statistics = []
@methods = []
if kwargs.include?(:impl)
@impl = kwargs[:impl]
+
+ @impl.getPropertyCount.times do |i|
+ @properties << SchemaProperty.new(nil, nil, :impl => @impl.getProperty(i))
+ end
+
+ @impl.getStatisticCount.times do |i|
+ @statistics << SchemaStatistic.new(nil, nil, :impl => @impl.getStatistic(i))
+ end
+
+ @impl.getMethodCount.times do |i|
+ @methods << SchemaMethod.new(nil, :impl => @impl.getMethod(i))
+ end
else
@impl = Qmfengine::SchemaObjectClass.new(package, name)
end
@@ -495,24 +593,17 @@ module Qmf
def name
@impl.getClassKey.getClassName
end
-
- def properties
- unless @properties
- @properties = []
- @impl.getPropertyCount.times do |i|
- @properties << @impl.getProperty(i)
- end
- end
- return @properties
- end
end
class SchemaEventClass
- attr_reader :impl
- def initialize(package='', name='', kwargs={})
+ attr_reader :impl, :arguments
+ def initialize(package, name, kwargs={})
@arguments = []
if kwargs.include?(:impl)
@impl = kwargs[:impl]
+ @impl.getArgumentCount.times do |i|
+ @arguments << SchemaArgument.new(nil, nil, :impl => @impl.getArgument(i))
+ end
else
@impl = Qmfengine::SchemaEventClass.new(package, name)
@impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
@@ -546,13 +637,18 @@ module Qmf
end
class Console
+ include MonitorMixin
attr_reader :impl
def initialize(handler = nil, kwargs={})
+ super()
@handler = handler
@impl = Qmfengine::ConsoleEngine.new
@event = Qmfengine::ConsoleEvent.new
@broker_list = []
+ @cv = new_cond
+ @sync_count = nil
+ @sync_result = nil
end
def add_connection(conn)
@@ -562,6 +658,8 @@ module Qmf
end
def del_connection(broker)
+ broker.shutdown
+ @broker_list.delete(broker)
end
def get_packages()
@@ -581,9 +679,9 @@ module Qmf
class_kind = @impl.getClassKind(key)
if class_kind == kind
if kind == CLASS_OBJECT
- clist << SchemaObjectClass.new('', '', :impl => @impl.getObjectClass(key))
+ clist << SchemaObjectClass.new(nil, nil, :impl => @impl.getObjectClass(key))
elsif kind == CLASS_EVENT
- clist << SchemaEventClass.new('', '', :impl => @impl.getEventClass(key))
+ clist << SchemaEventClass.new(nil, nil, :impl => @impl.getEventClass(key))
end
end
end
@@ -591,19 +689,70 @@ module Qmf
return clist
end
- def get_schema(class_key)
- end
-
def bind_package(package)
+ @impl.bindPackage(package)
end
def bind_class(kwargs = {})
+ if kwargs.include?(:key)
+ @impl.bindClass(kwargs[:key])
+ elsif kwargs.include?(:package)
+ package = kwargs[:package]
+ if kwargs.include?(:class)
+ @impl.bindClass(package, kwargs[:class])
+ else
+ @impl.bindClass(package)
+ end
+ else
+ raise ArgumentError, "Invalid arguments, use :key or :package[,:class]"
+ end
end
def get_agents(broker = nil)
+ blist = []
+ if broker
+ blist << broker
+ else
+ blist = @broker_list
+ end
+
+ agents = []
+ blist.each do |b|
+ count = b.impl.agentCount
+ for idx in 0...count
+ agents << AgentProxy.new(b.impl.getAgent(idx), b)
+ end
+ end
+
+ return agents
end
def get_objects(query, kwargs = {})
+ timeout = 30
+ if kwargs.include?(:timeout)
+ timeout = kwargs[:timeout]
+ end
+ synchronize do
+ @sync_count = 1
+ @sync_result = []
+ broker = @broker_list[0]
+ broker.send_query(query.impl, nil)
+ unless @cv.wait(timeout) { @sync_count == 0 }
+ raise "Timed out waiting for response"
+ end
+
+ return @sync_result
+ end
+ end
+
+ def _get_result(list, context)
+ synchronize do
+ list.each do |item|
+ @sync_result << item
+ end
+ @sync_count -= 1
+ @cv.signal
+ end
end
def start_sync(query)
@@ -638,6 +787,19 @@ module Qmf
end
end
+ class AgentProxy
+ attr_reader :broker
+
+ def initialize(impl, broker)
+ @impl = impl
+ @broker = broker
+ end
+
+ def label
+ @impl.getLabel
+ end
+ end
+
class Broker < ConnectionHandler
include MonitorMixin
attr_reader :impl
@@ -654,6 +816,13 @@ module Qmf
@impl = Qmfengine::BrokerProxy.new(@console.impl)
@console.impl.addConnection(@impl, self)
@conn.add_conn_handler(self)
+ @operational = :true
+ end
+
+ def shutdown()
+ @console.impl.delConnection(@impl)
+ @conn.del_conn_handler(self)
+ @operational = :false
end
def waitForStable(timeout = nil)
@@ -671,6 +840,11 @@ module Qmf
end
end
+ def send_query(query, ctx)
+ @impl.sendQuery(query, ctx)
+ @conn.kick
+ end
+
def do_broker_events()
count = 0
valid = @impl.getEvent(@event)
@@ -694,6 +868,12 @@ module Qmf
@stable = :true
@cv.signal
end
+ when Qmfengine::BrokerEvent::QUERY_COMPLETE
+ result = []
+ for idx in 0...@event.queryResponse.getObjectCount
+ result << ConsoleObject.new(nil, :impl => @event.queryResponse.getObject(idx))
+ end
+ @console._get_result(result, @event.context)
end
@impl.popEvent
valid = @impl.getEvent(@event)
@@ -732,12 +912,17 @@ module Qmf
puts "Console Connection Lost"
end
+ def conn_event_visit
+ do_events
+ end
+
def sess_event_session_closed(context, error)
puts "Console Session Lost"
@impl.sessionClosed()
end
def sess_event_recv(context, message)
+ puts "Unexpected RECV Event" if not @operational
@impl.handleRcvMessage(message)
do_events
end
@@ -798,7 +983,7 @@ module Qmf
count += 1
case @event.kind
when Qmfengine::AgentEvent::GET_QUERY
- @handler.get_query(@event.sequence, Query.new(@event.query), @event.authUserId)
+ @handler.get_query(@event.sequence, Query.new(:impl => @event.query), @event.authUserId)
when Qmfengine::AgentEvent::START_SYNC
when Qmfengine::AgentEvent::END_SYNC
when Qmfengine::AgentEvent::METHOD_CALL
@@ -852,6 +1037,10 @@ module Qmf
puts "Agent Connection Lost"
end
+ def conn_event_visit
+ do_events
+ end
+
def sess_event_session_closed(context, error)
puts "Agent Session Lost"
end
diff --git a/cpp/bindings/qmf/tests/agent_ruby.rb b/cpp/bindings/qmf/tests/agent_ruby.rb
index 75de2b5fa1..67591319ee 100755
--- a/cpp/bindings/qmf/tests/agent_ruby.rb
+++ b/cpp/bindings/qmf/tests/agent_ruby.rb
@@ -72,8 +72,7 @@ end
class App < Qmf::AgentHandler
def get_query(context, query, userId)
-# puts "Query: user=#{userId} context=#{context} class=#{query.class_name} object_num=#{query.object_id.object_num_low if query.object_id}"
- #@parent.inc_attr("queryCount")
+# puts "Query: user=#{userId} context=#{context} class=#{query.class_name} object_num=#{query.object_id.object_num_low if query.object_id}"
if query.class_name == 'parent'
@agent.query_response(context, @parent)
elsif query.object_id == @parent_oid
@@ -135,7 +134,7 @@ class App < Qmf::AgentHandler
elsif name == "create_child"
oid = @agent.alloc_object_id(2)
args['child_ref'] = oid
- @child = Qmf::QmfObject.new(@model.child_class)
+ @child = Qmf::AgentObject.new(@model.child_class)
@child.set_attr("name", args.by_key("child_name"))
@child.set_object_id(oid)
@agent.method_response(context, 0, "OK", args)
@@ -161,7 +160,7 @@ class App < Qmf::AgentHandler
@agent.set_connection(@connection)
- @parent = Qmf::QmfObject.new(@model.parent_class)
+ @parent = Qmf::AgentObject.new(@model.parent_class)
@parent.set_attr("name", "Parent One")
@parent.set_attr("state", "OPERATIONAL")
diff --git a/cpp/bindings/qmf/tests/python_agent.py b/cpp/bindings/qmf/tests/python_agent.py
index f6cb51cbf5..d4373d3bb8 100644
--- a/cpp/bindings/qmf/tests/python_agent.py
+++ b/cpp/bindings/qmf/tests/python_agent.py
@@ -72,7 +72,9 @@ class Model:
class App(qmf.AgentHandler):
def get_query(self, context, query, userId):
- # puts "Query: user=#{userId} context=#{context} class=#{query.class_name} object_num=#{query.object_id.object_num_low if query.object_id}"
+ #print "Query: user=%s context=%d class=%s" % (userId, context, query.class_name())
+ #if query.object_id():
+ # print query.object_id().object_num_low()
self._parent.inc_attr("queryCount")
if query.class_name() == 'parent':
self._agent.query_response(context, self._parent)
diff --git a/cpp/bindings/qmf/tests/ruby_console.rb b/cpp/bindings/qmf/tests/ruby_console.rb
index fb48c29566..c071829f09 100755
--- a/cpp/bindings/qmf/tests/ruby_console.rb
+++ b/cpp/bindings/qmf/tests/ruby_console.rb
@@ -24,33 +24,90 @@ require 'socket'
class App < Qmf::ConsoleHandler
- def main
- @settings = Qmf::ConnectionSettings.new
- @settings.set_attr("host", ARGV[0]) if ARGV.size > 0
- @settings.set_attr("port", ARGV[1].to_i) if ARGV.size > 1
- @connection = Qmf::Connection.new(@settings)
- @qmf = Qmf::Console.new
-
- @broker = @qmf.add_connection(@connection)
- @broker.waitForStable
-
- packages = @qmf.get_packages
+ def dump_schema
+ packages = @qmfc.get_packages
puts "----- Packages -----"
packages.each do |p|
puts p
puts " ----- Object Classes -----"
- classes = @qmf.get_classes(p)
+ classes = @qmfc.get_classes(p)
classes.each do |c|
puts " #{c.name}"
+
+ puts " ---- Properties ----"
+ props = c.properties
+ props.each do |prop|
+ puts " #{prop.name}"
+ end
+
+ puts " ---- Statistics ----"
+ stats = c.statistics
+ stats.each do |stat|
+ puts " #{stat.name}"
+ end
+
+ puts " ---- Methods ----"
+ methods = c.methods
+ methods.each do |method|
+ puts " #{method.name}"
+ puts " ---- Args ----"
+ args = method.arguments
+ args.each do |arg|
+ puts " #{arg.name}"
+ end
+ end
end
+
puts " ----- Event Classes -----"
- classes = @qmf.get_classes(p, Qmf::CLASS_EVENT)
+ classes = @qmfc.get_classes(p, Qmf::CLASS_EVENT)
classes.each do |c|
puts " #{c.name}"
+ puts " ---- Args ----"
+ args = c.arguments
+ args.each do |arg|
+ puts " #{arg.name}"
+ end
end
end
puts "-----"
+ end
+
+ def main
+ @settings = Qmf::ConnectionSettings.new
+ @settings.set_attr("host", ARGV[0]) if ARGV.size > 0
+ @settings.set_attr("port", ARGV[1].to_i) if ARGV.size > 1
+ @connection = Qmf::Connection.new(@settings)
+ @qmfc = Qmf::Console.new
+
+ @broker = @qmfc.add_connection(@connection)
+ @broker.waitForStable
+
+ dump_schema
+
+ agents = @qmfc.get_agents()
+ puts "---- Agents ----"
+ agents.each do |a|
+ puts " => #{a.label}"
+ end
+ puts "----"
+
+ for idx in 0...20
+ blist = @qmfc.get_objects(Qmf::Query.new(:class => "broker"))
+ puts "---- Brokers ----"
+ blist.each do |b|
+ puts " ---- Broker ----"
+ puts " systemRef: #{b.get_attr('systemRef')}"
+ puts " port : #{b.get_attr('port')}"
+ puts " uptime : #{b.get_attr('uptime') / 1000000000}"
+ end
+ puts "----"
+ sleep(5)
+ end
+ sleep(5)
+ puts "Deleting connection..."
+ @qmfc.del_connection(@broker)
+ puts " done"
sleep
end
end
diff --git a/cpp/src/qmf/AgentEngine.cpp b/cpp/src/qmf/AgentEngine.cpp
index d3204042d5..9ea3be5907 100644
--- a/cpp/src/qmf/AgentEngine.cpp
+++ b/cpp/src/qmf/AgentEngine.cpp
@@ -57,7 +57,7 @@ namespace qmf {
string name;
Object* object;
boost::shared_ptr<ObjectId> objectId;
- Query query;
+ boost::shared_ptr<Query> query;
boost::shared_ptr<Value> arguments;
string exchange;
string bindingKey;
@@ -214,7 +214,7 @@ AgentEvent AgentEventImpl::copy()
item.sequence = sequence;
item.object = object;
item.objectId = objectId.get();
- item.query = &query;
+ item.query = query.get();
item.arguments = arguments.get();
item.objectClass = objectClass;
@@ -381,7 +381,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t
}
}
sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT MethodResponse");
+ QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
}
void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
@@ -403,7 +403,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop
object.impl->encodeStatistics(buffer);
sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT ContentIndication");
+ QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
void AgentEngineImpl::queryComplete(uint32_t sequence)
@@ -511,9 +511,10 @@ AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& user
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
event->sequence = num;
event->authUserId = userId;
- event->query.impl->packageName = package;
- event->query.impl->className = cls;
- event->query.impl->oid = oid;
+ if (oid.get())
+ event->query.reset(new Query(oid.get()));
+ else
+ event->query.reset(new Query(cls.c_str(), package.c_str()));
return event;
}
@@ -723,7 +724,7 @@ void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const
ft.decode(inBuffer);
- QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+ QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
value = ft.get("_package");
if (value.get() && value->convertsTo<string>()) {
@@ -773,6 +774,8 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con
AgentClassKey classKey(buffer);
buffer.getShortString(method);
+ QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
+
map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
if (pIter == packages.end()) {
sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp
index 3d1b378b68..e7991328ee 100644
--- a/cpp/src/qmf/ConsoleEngine.cpp
+++ b/cpp/src/qmf/ConsoleEngine.cpp
@@ -34,6 +34,7 @@
#include <qpid/sys/Mutex.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
#include <string.h>
#include <string>
#include <deque>
@@ -58,12 +59,27 @@ namespace qmf {
auto_ptr<Value> arguments;
MethodResponseImpl(Buffer& buf);
- ~MethodResponseImpl() {}
+ ~MethodResponseImpl() { delete envelope; }
uint32_t getStatus() const { return status; }
const Value* getException() const { return exception.get(); }
const Value* getArgs() const { return arguments.get(); }
};
+ struct QueryResponseImpl {
+ typedef boost::shared_ptr<QueryResponseImpl> Ptr;
+ QueryResponse *envelope;
+ uint32_t status;
+ auto_ptr<Value> exception;
+ vector<ObjectImpl::Ptr> results;
+
+ QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
+ ~QueryResponseImpl() { delete envelope; }
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ uint32_t getObjectCount() const { return results.size(); }
+ const Object* getObject(uint32_t idx) const;
+ };
+
struct ConsoleEventImpl {
typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
ConsoleEvent::EventKind kind;
@@ -89,13 +105,29 @@ namespace qmf {
string name;
string exchange;
string bindingKey;
+ void* context;
+ QueryResponseImpl::Ptr queryResponse;
BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
~BrokerEventImpl() {}
BrokerEvent copy();
};
- class BrokerProxyImpl : public SequenceContext {
+ struct AgentProxyImpl {
+ typedef boost::shared_ptr<AgentProxyImpl> Ptr;
+ AgentProxy* envelope;
+ ConsoleEngineImpl* console;
+ BrokerProxyImpl* broker;
+ uint32_t agentBank;
+ string label;
+
+ AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) :
+ envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
+ ~AgentProxyImpl() {}
+ const string& getLabel() const { return label; }
+ };
+
+ class BrokerProxyImpl {
public:
typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
@@ -114,12 +146,17 @@ namespace qmf {
bool getEvent(BrokerEvent& event) const;
void popEvent();
- // From SequenceContext
- void complete();
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+ void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
void addBinding(const string& exchange, const string& key);
+ void staticRelease() { decOutstanding(); }
private:
+ friend class StaticContext;
+ friend class QueryContext;
mutable Mutex lock;
BrokerProxy* envelope;
ConsoleEngineImpl* console;
@@ -128,6 +165,7 @@ namespace qmf {
SequenceManager seqMgr;
uint32_t requestsOutstanding;
bool topicBound;
+ vector<AgentProxyImpl::Ptr> agentList;
deque<MessageImpl::Ptr> xmtQueue;
deque<BrokerEventImpl::Ptr> eventQueue;
@@ -138,6 +176,7 @@ namespace qmf {
BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
BrokerEventImpl::Ptr eventSetupComplete();
BrokerEventImpl::Ptr eventStable();
+ BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
@@ -147,19 +186,33 @@ namespace qmf {
void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
void handleEventIndication(Buffer& inBuffer, uint32_t seq);
void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
- void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
void incOutstandingLH();
void decOutstanding();
};
- struct AgentProxyImpl {
- typedef boost::shared_ptr<AgentProxyImpl> Ptr;
- AgentProxy* envelope;
- ConsoleEngineImpl* console;
+ struct StaticContext : public SequenceContext {
+ StaticContext(BrokerProxyImpl& b) : broker(b) {}
+ ~StaticContext() {}
+ void reserve() {}
+ void release() { broker.staticRelease(); }
+ bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
+ BrokerProxyImpl& broker;
+ };
- AgentProxyImpl(AgentProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl) {}
- ~AgentProxyImpl() {}
+ struct QueryContext : public SequenceContext {
+ QueryContext(BrokerProxyImpl& b, void* u) :
+ broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
+ ~QueryContext() {}
+ void reserve();
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
+
+ mutable Mutex lock;
+ BrokerProxyImpl& broker;
+ void* userContext;
+ uint32_t requestsOutstanding;
+ QueryResponseImpl::Ptr queryResponse;
};
class ConsoleEngineImpl {
@@ -187,11 +240,6 @@ namespace qmf {
void bindClass(const SchemaClassKey* key);
void bindClass(const char* packageName, const char* className);
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
-
- void sendQuery(const Query& query, void* context);
-
/*
void startSync(const Query& query, void* context, SyncQuery& sync);
void touchSync(SyncQuery& sync);
@@ -226,13 +274,31 @@ namespace qmf {
void learnClass(SchemaObjectClassImpl::Ptr cls);
void learnClass(SchemaEventClassImpl::Ptr cls);
bool haveClass(const SchemaClassKeyImpl& key) const;
+ SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
};
}
namespace {
-const char* QMF_EXCHANGE = "qpid.management";
-const char* DIR_EXCHANGE = "amq.direct";
-const char* BROKER_KEY = "broker";
+ const char* QMF_EXCHANGE = "qpid.management";
+ const char* DIR_EXCHANGE = "amq.direct";
+ const char* BROKER_KEY = "broker";
+ const char* BROKER_PACKAGE = "org.apache.qpid.broker";
+ const char* AGENT_CLASS = "agent";
+ const char* BROKER_AGENT_KEY = "agent.1.0";
+}
+
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+ vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
+
+ while (idx > 0) {
+ if (iter == results.end())
+ return 0;
+ iter++;
+ idx--;
+ }
+
+ return (*iter)->envelope;
}
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -267,19 +333,29 @@ BrokerEvent BrokerEventImpl::copy()
STRING_REF(name);
STRING_REF(exchange);
STRING_REF(bindingKey);
+ item.context = context;
+ item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
return item;
}
BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl), queueName("qmfc-")
+ envelope(e), console(_console.impl)
{
- // TODO: Give the queue name a unique suffix
+ stringstream qn;
+ qpid::TcpAddress addr;
+
+ SystemInfo::getLocalHostname(addr);
+ qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+ queueName = qn.str();
+
+ seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
}
void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
{
Mutex::ScopedLock _lock(lock);
+ agentList.clear();
eventQueue.clear();
xmtQueue.clear();
eventQueue.push_back(eventDeclareQueue(queueName));
@@ -292,6 +368,7 @@ void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
void BrokerProxyImpl::sessionClosed()
{
Mutex::ScopedLock _lock(lock);
+ agentList.clear();
eventQueue.clear();
xmtQueue.clear();
}
@@ -302,11 +379,14 @@ void BrokerProxyImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
+ agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
+
requestsOutstanding = 1;
topicBound = false;
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest");
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
}
void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
@@ -330,23 +410,8 @@ void BrokerProxyImpl::handleRcvMessage(Message& message)
uint8_t opcode;
uint32_t sequence;
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence);
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true);
- else {
- QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode);
- break;
- }
- }
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+ seqMgr.dispatch(opcode, sequence, inBuffer);
}
bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -381,9 +446,48 @@ void BrokerProxyImpl::popEvent()
eventQueue.pop_front();
}
-void BrokerProxyImpl::complete()
+uint32_t BrokerProxyImpl::agentCount() const
{
- decOutstanding();
+ Mutex::ScopedLock _lock(lock);
+ return agentList.size();
+}
+
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++)
+ if (idx-- == 0)
+ return (*iter)->envelope;
+ return 0;
+}
+
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+ SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+ Mutex::ScopedLock _lock(lock);
+ if (agent != 0) {
+ sendGetRequestLH(queryContext, query, agent->impl);
+ } else {
+ // TODO (optimization) only send queries to agents that have the requested class+package
+ for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++) {
+ sendGetRequestLH(queryContext, query, (*iter).get());
+ }
+ }
+}
+
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
+{
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(queryContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ query.impl->encode(outBuffer);
+ key << "agent.1." << agent->agentBank;
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
}
void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
@@ -420,17 +524,22 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
return event;
}
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
{
- // Note that this function doesn't touch requestsOutstanding. This is because
- // it accounts for one request completed (the BrokerRequest) and one request
- // started (the PackageRequest) which cancel each other out.
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+ event->context = context;
+ event->queryResponse = response;
+ return event;
+}
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
brokerId.decode(inBuffer);
QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
Mutex::ScopedLock _lock(lock);
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
@@ -446,7 +555,7 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
Mutex::ScopedLock _lock(lock);
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
incOutstandingLH();
Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
outBuffer.putShortString(package);
@@ -460,20 +569,12 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
uint32_t code = inBuffer.getLong();
inBuffer.getShortString(text);
QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
- seqMgr.release(seq);
}
void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
{
- string package;
- string clsName;
- SchemaHash hash;
uint8_t kind = inBuffer.getOctet();
- inBuffer.getShortString(package);
- inBuffer.getShortString(clsName);
- hash.decode(inBuffer);
- Uuid printableHash(hash.get());
- SchemaClassKeyImpl classKey(package, clsName, hash);
+ SchemaClassKeyImpl classKey(inBuffer);
QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
@@ -481,7 +582,7 @@ void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
Mutex::ScopedLock _lock(lock);
incOutstandingLH();
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(this));
+ uint32_t sequence(seqMgr.reserve());
Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
classKey.encode(outBuffer);
sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
@@ -515,6 +616,25 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
console->learnClass(oClassPtr);
key = oClassPtr->getClassKey()->impl;
QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+
+ //
+ // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+ // request for the current list of agents so we can have it on-hand before we declare
+ // this session "stable".
+ //
+ if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ FieldTable ft;
+ ft.setString("_class", AGENT_CLASS);
+ ft.setString("_package", BROKER_PACKAGE);
+ ft.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+ }
} else if (kind == CLASS_EVENT) {
eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
console->learnClass(eClassPtr);
@@ -524,13 +644,20 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
else {
QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
}
-
- decOutstanding();
}
-void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/)
+ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
{
- // TODO
+ SchemaClassKeyImpl classKey(inBuffer);
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
+
+ SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
+ if (schema.get() == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
+ return ObjectImpl::Ptr();
+ }
+
+ return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true));
}
void BrokerProxyImpl::incOutstandingLH()
@@ -567,6 +694,79 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodRespons
arguments.reset(new Value(TYPE_MAP));
}
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ if (opcode == Protocol::OP_BROKER_RESPONSE) {
+ broker.handleBrokerResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+ broker.handleSchemaResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION)
+ broker.handlePackageIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION)
+ broker.handleClassIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+ broker.handleHeartbeatIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_EVENT_INDICATION)
+ broker.handleEventIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, false);
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, false, true);
+ else if (opcode == Protocol::OP_OBJECT_INDICATION)
+ broker.handleObjectIndication(buffer, sequence, true, true);
+ else {
+ QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void QueryContext::reserve()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void QueryContext::release()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (--requestsOutstanding == 0) {
+ broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+ }
+}
+
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+ bool completeContext = false;
+ ObjectImpl::Ptr object;
+
+ if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ if (object.get() != 0)
+ queryResponse->results.push_back(object);
+ }
+ else {
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
envelope(e), settings(s)
{
@@ -757,23 +957,6 @@ void ConsoleEngineImpl::bindClass(const char* packageName, const char* className
(*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-uint32_t ConsoleEngineImpl::agentCount() const
-{
- // TODO
- return 0;
-}
-
-const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const
-{
- // TODO
- return 0;
-}
-
-void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/)
-{
- // TODO
-}
-
/*
void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync)
{
@@ -835,11 +1018,29 @@ bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const
return oList.find(&key) != oList.end() || eList.find(&key) != eList.end();
}
+SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key.getPackageName());
+ if (pIter == packages.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(&key);
+ if (iter == oList.end())
+ return SchemaObjectClassImpl::Ptr();
+
+ return iter->second;
+}
//==================================================================
// Wrappers
//==================================================================
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::~AgentProxy() { delete impl; }
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
+
BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
BrokerProxy::~BrokerProxy() { delete impl; }
void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
@@ -850,16 +1051,23 @@ bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessag
void BrokerProxy::popXmt() { impl->popXmt(); }
bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
void BrokerProxy::popEvent() { impl->popEvent(); }
-
-AgentProxy::AgentProxy(ConsoleEngine& console) : impl(new AgentProxyImpl(this, console)) {}
-AgentProxy::~AgentProxy() { delete impl; }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() { delete impl; } // TODO: correct to delete here?
+MethodResponse::~MethodResponse() {}
uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
const Value* MethodResponse::getException() const { return impl->getException(); }
const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {}
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+
ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
ConsoleEngine::~ConsoleEngine() { delete impl; }
bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
@@ -876,9 +1084,6 @@ const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key)
void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
-uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); }
-const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void ConsoleEngine::sendQuery(const Query& query, void* context) { impl->sendQuery(query, context); }
//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); }
diff --git a/cpp/src/qmf/ConsoleEngine.h b/cpp/src/qmf/ConsoleEngine.h
index 84ac78cd69..457e83ad58 100644
--- a/cpp/src/qmf/ConsoleEngine.h
+++ b/cpp/src/qmf/ConsoleEngine.h
@@ -37,6 +37,8 @@ namespace qmf {
class AgentProxy;
class AgentProxyImpl;
class MethodResponseImpl;
+ class QueryResponseImpl;
+ class QueryContext;
/**
*
@@ -57,6 +59,23 @@ namespace qmf {
/**
*
*/
+ class QueryResponse {
+ public:
+ QueryResponse(QueryResponseImpl* impl);
+ ~QueryResponse();
+ uint32_t getStatus() const;
+ const Value* getException() const;
+ uint32_t getObjectCount() const;
+ const Object* getObject(uint32_t idx) const;
+
+ private:
+ friend class QueryContext;
+ QueryResponseImpl *impl;
+ };
+
+ /**
+ *
+ */
struct ConsoleEvent {
enum EventKind {
AGENT_ADDED = 1,
@@ -64,7 +83,6 @@ namespace qmf {
NEW_PACKAGE = 3,
NEW_CLASS = 4,
OBJECT_UPDATE = 5,
- QUERY_COMPLETE = 6,
EVENT_RECEIVED = 7,
AGENT_HEARTBEAT = 8,
METHOD_RESPONSE = 9
@@ -75,11 +93,12 @@ namespace qmf {
char* name; // (NEW_PACKAGE)
SchemaClassKey* classKey; // (NEW_CLASS)
Object* object; // (OBJECT_UPDATE)
- void* context; // (OBJECT_UPDATE, QUERY_COMPLETE)
+ void* context; // (OBJECT_UPDATE)
Event* event; // (EVENT_RECEIVED)
uint64_t timestamp; // (AGENT_HEARTBEAT)
uint32_t methodHandle; // (METHOD_RESPONSE)
MethodResponse* methodResponse; // (METHOD_RESPONSE)
+ QueryResponse* queryResponse; // (QUERY_COMPLETE)
};
/**
@@ -93,13 +112,30 @@ namespace qmf {
BIND = 13,
UNBIND = 14,
SETUP_COMPLETE = 15,
- STABLE = 16
+ STABLE = 16,
+ QUERY_COMPLETE = 17
};
EventKind kind;
- char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
- char* exchange; // ([UN]BIND)
- char* bindingKey; // ([UN]BIND)
+ char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
+ char* exchange; // ([UN]BIND)
+ char* bindingKey; // ([UN]BIND)
+ void* context; // (QUERY_COMPLETE)
+ QueryResponse* queryResponse; // (QUERY_COMPLETE)
+ };
+
+ /**
+ *
+ */
+ class AgentProxy {
+ public:
+ AgentProxy(AgentProxyImpl* impl);
+ ~AgentProxy();
+ const char* getLabel() const;
+
+ private:
+ friend class BrokerProxyImpl;
+ AgentProxyImpl* impl;
};
/**
@@ -121,22 +157,13 @@ namespace qmf {
bool getEvent(BrokerEvent& event) const;
void popEvent();
- private:
- friend class ConsoleEngineImpl;
- BrokerProxyImpl* impl;
- };
-
- /**
- *
- */
- class AgentProxy {
- public:
- AgentProxy(ConsoleEngine& console);
- ~AgentProxy();
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent = 0);
private:
friend class ConsoleEngineImpl;
- AgentProxyImpl* impl;
+ BrokerProxyImpl* impl;
};
// TODO - move this to a public header
@@ -178,11 +205,6 @@ namespace qmf {
void bindClass(const SchemaClassKey* key);
void bindClass(const char* packageName, const char* className);
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
-
- void sendQuery(const Query& query, void* context);
-
/*
void startSync(const Query& query, void* context, SyncQuery& sync);
void touchSync(SyncQuery& sync);
diff --git a/cpp/src/qmf/Object.h b/cpp/src/qmf/Object.h
index eb92cbbe45..9cb3224d9b 100644
--- a/cpp/src/qmf/Object.h
+++ b/cpp/src/qmf/Object.h
@@ -31,13 +31,14 @@ namespace qmf {
public:
Object(const SchemaObjectClass* type);
Object(ObjectImpl* impl);
+ Object(const Object& from);
virtual ~Object();
void destroy();
const ObjectId* getObjectId() const;
void setObjectId(ObjectId* oid);
const SchemaObjectClass* getClass() const;
- Value* getValue(char* key);
+ Value* getValue(char* key) const;
ObjectImpl* impl;
};
diff --git a/cpp/src/qmf/ObjectId.h b/cpp/src/qmf/ObjectId.h
index ffd1b6978b..e894e0b39c 100644
--- a/cpp/src/qmf/ObjectId.h
+++ b/cpp/src/qmf/ObjectId.h
@@ -30,6 +30,7 @@ namespace qmf {
class ObjectId {
public:
ObjectId();
+ ObjectId(const ObjectId& from);
ObjectId(ObjectIdImpl* impl);
~ObjectId();
diff --git a/cpp/src/qmf/ObjectIdImpl.cpp b/cpp/src/qmf/ObjectIdImpl.cpp
index 75661fdb47..c0618ccc49 100644
--- a/cpp/src/qmf/ObjectIdImpl.cpp
+++ b/cpp/src/qmf/ObjectIdImpl.cpp
@@ -100,6 +100,15 @@ void ObjectIdImpl::fromString(const std::string& repr)
agent = 0;
}
+std::string ObjectIdImpl::asString() const
+{
+ stringstream val;
+
+ val << getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
+ getAgentBank() << "-" << getObjectNum();
+ return val.str();
+}
+
bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
{
uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
@@ -126,15 +135,11 @@ bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const
// Wrappers
//==================================================================
-ObjectId::ObjectId()
-{
- impl = new ObjectIdImpl(this);
-}
+ObjectId::ObjectId() : impl(new ObjectIdImpl(this)) {}
-ObjectId::ObjectId(ObjectIdImpl* i)
-{
- impl = i;
-}
+ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {}
+
+ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {}
ObjectId::~ObjectId()
{
diff --git a/cpp/src/qmf/ObjectIdImpl.h b/cpp/src/qmf/ObjectIdImpl.h
index 5d8ee59aee..38d231237f 100644
--- a/cpp/src/qmf/ObjectIdImpl.h
+++ b/cpp/src/qmf/ObjectIdImpl.h
@@ -39,13 +39,14 @@ namespace qmf {
uint64_t first;
uint64_t second;
- ObjectIdImpl(ObjectId* e) : envelope(e), agent(0) {}
+ ObjectIdImpl(ObjectId* e) : envelope(e), agent(0), first(0), second(0) {}
ObjectIdImpl(qpid::framing::Buffer& buffer);
ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
void decode(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void fromString(const std::string& repr);
+ std::string asString() const;
uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }
diff --git a/cpp/src/qmf/ObjectImpl.cpp b/cpp/src/qmf/ObjectImpl.cpp
index 645ccd5c81..1ea2d54527 100644
--- a/cpp/src/qmf/ObjectImpl.cpp
+++ b/cpp/src/qmf/ObjectImpl.cpp
@@ -45,30 +45,40 @@ ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
}
}
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer) :
- envelope(new Object(this)), objectClass(type), createTime(uint64_t(Duration(now()))),
- destroyTime(0), lastUpdatedTime(createTime)
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) :
+ envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0)
{
- int propCount = objectClass->getPropertyCount();
- int statCount = objectClass->getStatisticCount();
int idx;
- set<string> excludes;
- parsePresenceMasks(buffer, excludes);
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (excludes.count(prop->getName()) != 0) {
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- } else {
- ValueImpl* pval = new ValueImpl(prop->getType(), buffer);
- properties[prop->getName()] = ValuePtr(pval->envelope);
+ if (managed) {
+ lastUpdatedTime = buffer.getLongLong();
+ createTime = buffer.getLongLong();
+ destroyTime = buffer.getLongLong();
+ objectId.reset(new ObjectIdImpl(buffer));
+ }
+
+ if (prop) {
+ int propCount = objectClass->getPropertyCount();
+ set<string> excludes;
+ parsePresenceMasks(buffer, excludes);
+ for (idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (excludes.count(prop->getName()) != 0) {
+ properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
+ } else {
+ ValueImpl* pval = new ValueImpl(prop->getType(), buffer);
+ properties[prop->getName()] = ValuePtr(pval->envelope);
+ }
}
}
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- ValueImpl* sval = new ValueImpl(stat->getType(), buffer);
- statistics[stat->getName()] = ValuePtr(sval->envelope);
+ if (stat) {
+ int statCount = objectClass->getStatisticCount();
+ for (idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ ValueImpl* sval = new ValueImpl(stat->getType(), buffer);
+ statistics[stat->getName()] = ValuePtr(sval->envelope);
+ }
}
}
@@ -82,7 +92,7 @@ void ObjectImpl::destroy()
// TODO - flag deletion
}
-Value* ObjectImpl::getValue(const string& key)
+Value* ObjectImpl::getValue(const string& key) const
{
map<string, ValuePtr>::const_iterator iter;
@@ -133,7 +143,7 @@ void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
buffer.putLongLong(lastUpdatedTime);
buffer.putLongLong(createTime);
buffer.putLongLong(destroyTime);
- objectId->impl->encode(buffer);
+ objectId->encode(buffer);
}
void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
@@ -187,36 +197,12 @@ void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
//==================================================================
Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(this, type)) {}
-
Object::Object(ObjectImpl* i) : impl(i) {}
-
-Object::~Object()
-{
- delete impl;
-}
-
-void Object::destroy()
-{
- impl->destroy();
-}
-
-const ObjectId* Object::getObjectId() const
-{
- return impl->getObjectId();
-}
-
-void Object::setObjectId(ObjectId* oid)
-{
- impl->setObjectId(oid);
-}
-
-const SchemaObjectClass* Object::getClass() const
-{
- return impl->getClass();
-}
-
-Value* Object::getValue(char* key)
-{
- return impl->getValue(key);
-}
+Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
+Object::~Object() { delete impl; }
+void Object::destroy() { impl->destroy(); }
+const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
+void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
+const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
+Value* Object::getValue(char* key) const { return impl->getValue(key); }
diff --git a/cpp/src/qmf/ObjectImpl.h b/cpp/src/qmf/ObjectImpl.h
index 4dc2170bfc..d69979e0da 100644
--- a/cpp/src/qmf/ObjectImpl.h
+++ b/cpp/src/qmf/ObjectImpl.h
@@ -21,19 +21,22 @@
*/
#include <qmf/Object.h>
+#include <qmf/ObjectIdImpl.h>
#include <map>
#include <set>
#include <string>
#include <qpid/framing/Buffer.h>
#include <boost/shared_ptr.hpp>
+#include <qpid/sys/Mutex.h>
namespace qmf {
struct ObjectImpl {
+ typedef boost::shared_ptr<ObjectImpl> Ptr;
typedef boost::shared_ptr<Value> ValuePtr;
Object* envelope;
const SchemaObjectClass* objectClass;
- boost::shared_ptr<ObjectId> objectId;
+ boost::shared_ptr<ObjectIdImpl> objectId;
uint64_t createTime;
uint64_t destroyTime;
uint64_t lastUpdatedTime;
@@ -41,14 +44,14 @@ namespace qmf {
mutable std::map<std::string, ValuePtr> statistics;
ObjectImpl(Object* e, const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer);
+ ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed);
~ObjectImpl();
void destroy();
- const ObjectId* getObjectId() const { return objectId.get(); }
- void setObjectId(ObjectId* oid) { objectId.reset(oid); }
+ const ObjectId* getObjectId() const { return objectId.get() ? objectId->envelope : 0; }
+ void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); }
const SchemaObjectClass* getClass() const { return objectClass; }
- Value* getValue(const std::string& key);
+ Value* getValue(const std::string& key) const;
void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
diff --git a/cpp/src/qmf/Query.h b/cpp/src/qmf/Query.h
index 78bc6f4ae2..875749862e 100644
--- a/cpp/src/qmf/Query.h
+++ b/cpp/src/qmf/Query.h
@@ -25,26 +25,76 @@
namespace qmf {
+ struct Object;
+ struct QueryElementImpl;
struct QueryImpl;
+ struct QueryExpressionImpl;
+ struct SchemaClassKey;
+
+ enum ValueOper {
+ O_EQ = 1,
+ O_NE = 2,
+ O_LT = 3,
+ O_LE = 4,
+ O_GT = 5,
+ O_GE = 6,
+ O_RE_MATCH = 7,
+ O_RE_NOMATCH = 8
+ };
+
+ struct QueryOperand {
+ virtual ~QueryOperand() {}
+ virtual bool evaluate(const Object* object) const = 0;
+ };
+
+ struct QueryElement : public QueryOperand {
+ QueryElement(const char* attrName, const Value* value, ValueOper oper);
+ QueryElement(QueryElementImpl* impl);
+ virtual ~QueryElement();
+ bool evaluate(const Object* object) const;
+
+ QueryElementImpl* impl;
+ };
+
+ enum ExprOper {
+ E_NOT = 1,
+ E_AND = 2,
+ E_OR = 3,
+ E_XOR = 4
+ };
+
+ struct QueryExpression : public QueryOperand {
+ QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2);
+ QueryExpression(QueryExpressionImpl* impl);
+ virtual ~QueryExpression();
+ bool evaluate(const Object* object) const;
+
+ QueryExpressionImpl* impl;
+ };
+
class Query {
public:
- Query();
+ Query(const char* className, const char* packageName);
+ Query(const SchemaClassKey* key);
+ Query(const ObjectId* oid);
Query(QueryImpl* impl);
~Query();
+ void setSelect(const QueryOperand* criterion);
+ void setLimit(uint32_t maxResults);
+ void setOrderBy(const char* attrName, bool decreasing);
+
const char* getPackage() const;
const char* getClass() const;
const ObjectId* getObjectId() const;
- enum Oper {
- OPER_AND = 1,
- OPER_OR = 2
- };
-
- int whereCount() const;
- Oper whereOper() const;
- const char* whereKey() const;
- const Value* whereValue() const;
+ bool haveSelect() const;
+ bool haveLimit() const;
+ bool haveOrderBy() const;
+ const QueryOperand* getSelect() const;
+ uint32_t getLimit() const;
+ const char* getOrderBy() const;
+ bool getDecreasing() const;
QueryImpl* impl;
};
diff --git a/cpp/src/qmf/QueryImpl.cpp b/cpp/src/qmf/QueryImpl.cpp
index 7e827796bb..f75a9aa5d5 100644
--- a/cpp/src/qmf/QueryImpl.cpp
+++ b/cpp/src/qmf/QueryImpl.cpp
@@ -18,54 +18,77 @@
*/
#include "qmf/QueryImpl.h"
+#include "qmf/ObjectIdImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
using namespace std;
using namespace qmf;
+using namespace qpid::framing;
-//==================================================================
-// Wrappers
-//==================================================================
-
-Query::Query() : impl(new QueryImpl(this)) {}
-Query::Query(QueryImpl* i) : impl(i) {}
-
-Query::~Query()
+bool QueryElementImpl::evaluate(const Object* /*object*/) const
{
- delete impl;
+ // TODO: Implement this
+ return false;
}
-const char* Query::getPackage() const
+bool QueryExpressionImpl::evaluate(const Object* /*object*/) const
{
- return impl->getPackage();
+ // TODO: Implement this
+ return false;
}
-const char* Query::getClass() const
+QueryImpl::QueryImpl(Buffer& buffer)
{
- return impl->getClass();
+ FieldTable ft;
+ ft.decode(buffer);
+ // TODO
}
-const ObjectId* Query::getObjectId() const
+void QueryImpl::encode(Buffer& buffer) const
{
- return impl->getObjectId();
-}
+ FieldTable ft;
-int Query::whereCount() const
-{
- return impl->whereCount();
-}
+ if (oid.get() != 0) {
+ ft.setString("_objectid", oid->impl->asString());
+ } else {
+ if (!packageName.empty())
+ ft.setString("_package", packageName);
+ ft.setString("_class", className);
+ }
-Query::Oper Query::whereOper() const
-{
- return impl->whereOper();
+ ft.encode(buffer);
}
-const char* Query::whereKey() const
-{
- return impl->whereKey();
-}
-const Value* Query::whereValue() const
-{
- return impl->whereValue();
-}
+//==================================================================
+// Wrappers
+//==================================================================
+
+QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {}
+QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
+QueryElement::~QueryElement() { delete impl; }
+bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
+QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
+QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
+QueryExpression::~QueryExpression() { delete impl; }
+bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
+Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
+Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
+Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
+Query::Query(QueryImpl* i) : impl(i) {}
+Query::~Query() { delete impl; }
+void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
+void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
+void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); }
+const char* Query::getPackage() const { return impl->getPackage().c_str(); }
+const char* Query::getClass() const { return impl->getClass().c_str(); }
+const ObjectId* Query::getObjectId() const { return impl->getObjectId(); }
+bool Query::haveSelect() const { return impl->haveSelect(); }
+bool Query::haveLimit() const { return impl->haveLimit(); }
+bool Query::haveOrderBy() const { return impl->haveOrderBy(); }
+const QueryOperand* Query::getSelect() const { return impl->getSelect(); }
+uint32_t Query::getLimit() const { return impl->getLimit(); }
+const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); }
+bool Query::getDecreasing() const { return impl->getDecreasing(); }
diff --git a/cpp/src/qmf/QueryImpl.h b/cpp/src/qmf/QueryImpl.h
index 1cb9bfe554..4a56a457c0 100644
--- a/cpp/src/qmf/QueryImpl.h
+++ b/cpp/src/qmf/QueryImpl.h
@@ -20,28 +20,82 @@
* under the License.
*/
-#include <qmf/Query.h>
+#include "qmf/Query.h"
+#include "qmf/Schema.h"
#include <string>
#include <boost/shared_ptr.hpp>
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
namespace qmf {
+ struct QueryElementImpl {
+ QueryElementImpl(const std::string& a, const Value* v, ValueOper o) :
+ envelope(new QueryElement(this)), attrName(a), value(v), oper(o) {}
+ ~QueryElementImpl() {}
+ bool evaluate(const Object* object) const;
+
+ QueryElement* envelope;
+ std::string attrName;
+ const Value* value;
+ ValueOper oper;
+ };
+
+ struct QueryExpressionImpl {
+ QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) :
+ envelope(new QueryExpression(this)), oper(o), left(operand1), right(operand2) {}
+ ~QueryExpressionImpl() {}
+ bool evaluate(const Object* object) const;
+
+ QueryExpression* envelope;
+ ExprOper oper;
+ const QueryOperand* left;
+ const QueryOperand* right;
+ };
+
struct QueryImpl {
- Query* envelope;
- std::string packageName;
- std::string className;
- boost::shared_ptr<ObjectId> oid;
+ QueryImpl(Query* e) : envelope(e), select(0) {}
+ QueryImpl(const std::string& c, const std::string& p) :
+ envelope(new Query(this)), packageName(p), className(c) {}
+ QueryImpl(const SchemaClassKey* key) :
+ envelope(new Query(this)), packageName(key->getPackageName()), className(key->getClassName()) {}
+ QueryImpl(const ObjectId* oid) :
+ envelope(new Query(this)), oid(new ObjectId(*oid)) {}
+ QueryImpl(qpid::framing::Buffer& buffer);
+ ~QueryImpl() {};
- QueryImpl(Query* e) : envelope(e) {}
+ void setSelect(const QueryOperand* criterion) { select = criterion; }
+ void setLimit(uint32_t maxResults) { resultLimit = maxResults; }
+ void setOrderBy(const std::string& attrName, bool decreasing) {
+ orderBy = attrName; orderDecreasing = decreasing;
+ }
- const char* getPackage() const { return packageName.empty() ? 0 : packageName.c_str(); }
- const char* getClass() const { return className.empty() ? 0 : className.c_str(); }
+ const std::string& getPackage() const { return packageName; }
+ const std::string& getClass() const { return className; }
const ObjectId* getObjectId() const { return oid.get(); }
- int whereCount() const { return 0;}
- Query::Oper whereOper() const { return Query::OPER_AND; }
- const char* whereKey() const { return 0; }
- const Value* whereValue() const { return 0; }
+ bool haveSelect() const { return select != 0; }
+ bool haveLimit() const { return resultLimit > 0; }
+ bool haveOrderBy() const { return !orderBy.empty(); }
+ const QueryOperand* getSelect() const { return select; }
+ uint32_t getLimit() const { return resultLimit; }
+ const std::string& getOrderBy() const { return orderBy; }
+ bool getDecreasing() const { return orderDecreasing; }
+
+ void encode(qpid::framing::Buffer& buffer) const;
+
+ Query* envelope;
+ std::string packageName;
+ std::string className;
+ boost::shared_ptr<ObjectId> oid;
+ const QueryOperand* select;
+ uint32_t resultLimit;
+ std::string orderBy;
+ bool orderDecreasing;
};
}
diff --git a/cpp/src/qmf/SchemaImpl.cpp b/cpp/src/qmf/SchemaImpl.cpp
index ae7d6ca689..3eb14c3952 100644
--- a/cpp/src/qmf/SchemaImpl.cpp
+++ b/cpp/src/qmf/SchemaImpl.cpp
@@ -261,7 +261,15 @@ void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) :
envelope(new SchemaClassKey(this)), package(p), name(n), hash(h) {}
-void SchemaClassKeyImpl::encode(qpid::framing::Buffer& buffer) const
+SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) :
+ envelope(new SchemaClassKey(this)), package(packageContainer), name(nameContainer), hash(hashContainer)
+{
+ buffer.getShortString(packageContainer);
+ buffer.getShortString(nameContainer);
+ hashContainer.decode(buffer);
+}
+
+void SchemaClassKeyImpl::encode(Buffer& buffer) const
{
buffer.putShortString(package);
buffer.putShortString(name);
@@ -413,8 +421,9 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) :
buffer.getShortString(package);
buffer.getShortString(name);
hash.decode(buffer);
+ buffer.putOctet(0); // No parent class
- uint16_t argCount = buffer.getShort();
+ uint16_t argCount = buffer.getShort();
for (uint16_t idx = 0; idx < argCount; idx++) {
SchemaArgumentImpl* argument = new SchemaArgumentImpl(buffer);
diff --git a/cpp/src/qmf/SchemaImpl.h b/cpp/src/qmf/SchemaImpl.h
index 3e9677d1fa..035d99aecd 100644
--- a/cpp/src/qmf/SchemaImpl.h
+++ b/cpp/src/qmf/SchemaImpl.h
@@ -148,7 +148,14 @@ namespace qmf {
const std::string& name;
const SchemaHash& hash;
+ // The *Container elements are only used if there isn't an external place to
+ // store these values.
+ std::string packageContainer;
+ std::string nameContainer;
+ SchemaHash hashContainer;
+
SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash);
+ SchemaClassKeyImpl(qpid::framing::Buffer& buffer);
const std::string& getPackageName() const { return package; }
const std::string& getClassName() const { return name; }
diff --git a/cpp/src/qmf/SequenceManager.cpp b/cpp/src/qmf/SequenceManager.cpp
index f51ce9d8b8..3171e66fac 100644
--- a/cpp/src/qmf/SequenceManager.cpp
+++ b/cpp/src/qmf/SequenceManager.cpp
@@ -25,26 +25,72 @@ using namespace qpid::sys;
SequenceManager::SequenceManager() : nextSequence(1) {}
-uint32_t SequenceManager::reserve(SequenceContext* ctx)
+void SequenceManager::setUnsolicitedContext(SequenceContext::Ptr ctx)
+{
+ unsolicitedContext = ctx;
+}
+
+uint32_t SequenceManager::reserve(SequenceContext::Ptr ctx)
{
Mutex::ScopedLock _lock(lock);
+ if (ctx.get() == 0)
+ ctx = unsolicitedContext;
uint32_t seq = nextSequence;
while (contextMap.find(seq) != contextMap.end())
seq = seq < 0xFFFFFFFF ? seq + 1 : 1;
nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1;
contextMap[seq] = ctx;
+ ctx->reserve();
return seq;
}
void SequenceManager::release(uint32_t sequence)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, SequenceContext*>::iterator iter = contextMap.find(sequence);
+
+ if (sequence == 0) {
+ if (unsolicitedContext.get() != 0)
+ unsolicitedContext->release();
+ return;
+ }
+
+ map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter != contextMap.end()) {
if (iter->second != 0)
- iter->second->complete();
+ iter->second->release();
contextMap.erase(iter);
}
}
+void SequenceManager::releaseAll()
+{
+ Mutex::ScopedLock _lock(lock);
+ contextMap.clear();
+}
+
+void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer)
+{
+ Mutex::ScopedLock _lock(lock);
+ bool done;
+
+ if (sequence == 0) {
+ if (unsolicitedContext.get() != 0) {
+ done = unsolicitedContext->handleMessage(opcode, sequence, buffer);
+ if (done)
+ unsolicitedContext->release();
+ }
+ return;
+ }
+
+ map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter != contextMap.end()) {
+ if (iter->second != 0) {
+ done = iter->second->handleMessage(opcode, sequence, buffer);
+ if (done) {
+ iter->second->release();
+ contextMap.erase(iter);
+ }
+ }
+ }
+}
diff --git a/cpp/src/qmf/SequenceManager.h b/cpp/src/qmf/SequenceManager.h
index c027872313..bbfd0728a7 100644
--- a/cpp/src/qmf/SequenceManager.h
+++ b/cpp/src/qmf/SequenceManager.h
@@ -21,29 +21,43 @@
*/
#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
#include <map>
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
namespace qmf {
class SequenceContext {
public:
+ typedef boost::shared_ptr<SequenceContext> Ptr;
SequenceContext() {}
virtual ~SequenceContext() {}
- virtual void complete() = 0;
+ virtual void reserve() = 0;
+ virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0;
+ virtual void release() = 0;
};
class SequenceManager {
public:
SequenceManager();
- uint32_t reserve(SequenceContext* ctx);
+ void setUnsolicitedContext(SequenceContext::Ptr ctx);
+ uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr());
void release(uint32_t sequence);
+ void releaseAll();
+ void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
private:
mutable qpid::sys::Mutex lock;
uint32_t nextSequence;
- std::map<uint32_t, SequenceContext*> contextMap;
+ SequenceContext::Ptr unsolicitedContext;
+ std::map<uint32_t, SequenceContext::Ptr> contextMap;
};
}