diff options
-rw-r--r-- | cpp/bindings/qmf/python/qmf.py | 108 | ||||
-rw-r--r-- | cpp/bindings/qmf/ruby/qmf.rb | 80 | ||||
-rwxr-xr-x | cpp/bindings/qmf/tests/ruby_console.rb | 5 | ||||
-rwxr-xr-x | cpp/bindings/qmf/tests/ruby_console_test.rb | 13 | ||||
-rw-r--r-- | cpp/include/qmf/engine/Console.h | 1 | ||||
-rw-r--r-- | cpp/src/qmf/engine/BrokerProxyImpl.cpp | 1 |
6 files changed, 109 insertions, 99 deletions
diff --git a/cpp/bindings/qmf/python/qmf.py b/cpp/bindings/qmf/python/qmf.py index 871a25e207..e97279a8fa 100644 --- a/cpp/bindings/qmf/python/qmf.py +++ b/cpp/bindings/qmf/python/qmf.py @@ -535,7 +535,7 @@ class ConsoleObject(QmfObject): return self.impl.isDeleted() - def index(self): pass + def key(self): pass @@ -545,6 +545,7 @@ class ObjectId: self.impl = impl else: self.impl = qmfengine.ObjectId() + self.agent_key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) def object_num_high(self): @@ -555,13 +556,8 @@ class ObjectId: return self.impl.getObjectNumLo() - def broker_bank(self): - return self.impl.getBrokerBank() - - - def agent_bank(self): - return self.impl.getAgentBank() - + def agent_key(self): + self.agent_key def __eq__(self, other): if not isinstance(other, self.__class__): return False @@ -1076,13 +1072,18 @@ class Console(Thread): def objects(self, query, kwargs = {}): timeout = 30 + agent = None temp_args = kwargs.copy() if type(query) == type({}): temp_args.update(query) - if "timeout" in temp_args: - timeout = temp_args["timeout"] - temp_args.pop("timeout") + if "_timeout" in temp_args: + timeout = temp_args["_timeout"] + temp_args.pop("_timeout") + + if "_agent" in temp_args: + agent = temp_args["_agent"] + temp_args.pop("_agent") if type(query) == type({}): query = Query(temp_args) @@ -1097,7 +1098,7 @@ class Console(Thread): self._sync_count = 1 self._sync_result = [] broker = self._broker_list[0] - broker.send_query(query.impl, None) + broker.send_query(query.impl, None, agent) self._cv.wait(timeout) if self._sync_count == 1: raise Exception("Timed out: waiting for query response") @@ -1193,38 +1194,40 @@ class Console(Thread): valid = self.impl.getEvent(self._event) while valid: count += 1 - if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: - logging.debug("Console Event AGENT_ADDED received") - if self._handler: - self._handler.agent_added(AgentProxy(self._event.agent, None)) - elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: - logging.debug("Console Event AGENT_DELETED received") - if self._handler: - self._handler.agent_deleted(AgentProxy(self._event.agent, None)) - elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: - logging.debug("Console Event NEW_PACKAGE received") - if self._handler: - self._handler.new_package(self._event.name) - elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: - logging.debug("Console Event NEW_CLASS received") - if self._handler: - self._handler.new_class(SchemaClassKey(self._event.classKey)) - elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: - logging.debug("Console Event OBJECT_UPDATE received") - if self._handler: - self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), - self._event.hasProps, self._event.hasStats) - elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: - logging.debug("Console Event EVENT_RECEIVED received") - elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: - logging.debug("Console Event AGENT_HEARTBEAT received") - if self._handler: - self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) - elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: - logging.debug("Console Event METHOD_RESPONSE received") - else: - logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) - + try: + if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: + logging.debug("Console Event AGENT_ADDED received") + if self._handler: + self._handler.agent_added(AgentProxy(self._event.agent, None)) + elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: + logging.debug("Console Event AGENT_DELETED received") + if self._handler: + self._handler.agent_deleted(AgentProxy(self._event.agent, None)) + elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: + logging.debug("Console Event NEW_PACKAGE received") + if self._handler: + self._handler.new_package(self._event.name) + elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: + logging.debug("Console Event NEW_CLASS received") + if self._handler: + self._handler.new_class(SchemaClassKey(self._event.classKey)) + elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: + logging.debug("Console Event OBJECT_UPDATE received") + if self._handler: + self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), + self._event.hasProps, self._event.hasStats) + elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: + logging.debug("Console Event EVENT_RECEIVED received") + elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: + logging.debug("Console Event AGENT_HEARTBEAT received") + if self._handler: + self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) + elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: + logging.debug("Console Event METHOD_RESPONSE received") + else: + logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) + except e: + print "Exception caught in callback thread:", e self.impl.popEvent() valid = self.impl.getEvent(self._event) return count @@ -1236,19 +1239,15 @@ class AgentProxy: def __init__(self, impl, broker): self.impl = impl self.broker = broker + self.key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) def label(self): return self.impl.getLabel() - def broker_bank(self): - return self.impl.getBrokerBank() - - - def agent_bank(self): - return self.impl.getAgentBank() - + def key(self): + return self.key class Broker(ConnectionHandler): @@ -1298,8 +1297,11 @@ class Broker(ConnectionHandler): self._cv.release() - def send_query(self, query, ctx): - self.impl.sendQuery(query, ctx) + def send_query(self, query, ctx, agent): + agent_impl = None + if agent: + agent_impl = agent.impl + self.impl.sendQuery(query, ctx, agent_impl) self.conn.kick() diff --git a/cpp/bindings/qmf/ruby/qmf.rb b/cpp/bindings/qmf/ruby/qmf.rb index db80381581..41c2c4a1ee 100644 --- a/cpp/bindings/qmf/ruby/qmf.rb +++ b/cpp/bindings/qmf/ruby/qmf.rb @@ -448,18 +448,19 @@ module Qmf @impl.isDeleted end - def index() + def key() end end class ObjectId - attr_reader :impl + attr_reader :impl, :agent_key def initialize(impl=nil) if impl @impl = impl else @impl = Qmfengine::ObjectId.new end + @agent_key = "#{@impl.getBrokerBank}.#{@impl.getAgentBank}" end def object_num_high @@ -470,14 +471,6 @@ module Qmf @impl.getObjectNumLo end - def broker_bank - @impl.getBrokerBank - end - - def agent_bank - @impl.getAgentBank - end - def ==(other) return (@impl.getObjectNumHi == other.impl.getObjectNumHi) && (@impl.getObjectNumLo == other.impl.getObjectNumLo) @@ -748,7 +741,7 @@ module Qmf class SchemaClassKey attr_reader :impl def initialize(i) - @impl = i + @impl = Qmfengine::SchemaClassKey.new(i) end def package_name @@ -956,6 +949,7 @@ module Qmf def objects(query, kwargs = {}) timeout = 30 + agent = nil kwargs.merge!(query) if query.class == Hash if kwargs.include?(:timeout) @@ -963,6 +957,11 @@ module Qmf kwargs.delete(:timeout) end + if kwargs.include?(:agent) + agent = kwargs[:agent] + kwargs.delete(:agent) + end + query = Query.new(kwargs) if query.class == Hash @select = [] @@ -975,7 +974,7 @@ module Qmf @sync_result = [] broker = nil synchronize { broker = @broker_list[0] } - broker.send_query(query.impl, nil) + broker.send_query(query.impl, nil, agent) unless @cv.wait(timeout) { @sync_count == 0 } raise "Timed out waiting for response" end @@ -1045,21 +1044,25 @@ module Qmf valid = @impl.getEvent(@event) while valid count += 1 - case @event.kind - when Qmfengine::ConsoleEvent::AGENT_ADDED - @handler.agent_added(AgentProxy.new(@event.agent, nil)) if @handler - when Qmfengine::ConsoleEvent::AGENT_DELETED - @handler.agent_deleted(AgentProxy.new(@event.agent, nil)) if @handler - when Qmfengine::ConsoleEvent::NEW_PACKAGE - @handler.new_package(@event.name) if @handler - when Qmfengine::ConsoleEvent::NEW_CLASS - @handler.new_class(SchemaClassKey.new(@event.classKey)) if @handler - when Qmfengine::ConsoleEvent::OBJECT_UPDATE - @handler.object_update(ConsoleObject.new(nil, :impl => @event.object), @event.hasProps, @event.hasStats) if @handler - when Qmfengine::ConsoleEvent::EVENT_RECEIVED - when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT - @handler.agent_heartbeat(AgentProxy.new(@event.agent, nil), @event.timestamp) if @handler - when Qmfengine::ConsoleEvent::METHOD_RESPONSE + begin + case @event.kind + when Qmfengine::ConsoleEvent::AGENT_ADDED + @handler.agent_added(AgentProxy.new(@event.agent, nil)) if @handler + when Qmfengine::ConsoleEvent::AGENT_DELETED + @handler.agent_deleted(AgentProxy.new(@event.agent, nil)) if @handler + when Qmfengine::ConsoleEvent::NEW_PACKAGE + @handler.new_package(@event.name) if @handler + when Qmfengine::ConsoleEvent::NEW_CLASS + @handler.new_class(SchemaClassKey.new(@event.classKey)) if @handler + when Qmfengine::ConsoleEvent::OBJECT_UPDATE + @handler.object_update(ConsoleObject.new(nil, :impl => @event.object), @event.hasProps, @event.hasStats) if @handler + when Qmfengine::ConsoleEvent::EVENT_RECEIVED + when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT + @handler.agent_heartbeat(AgentProxy.new(@event.agent, nil), @event.timestamp) if @handler + when Qmfengine::ConsoleEvent::METHOD_RESPONSE + end + rescue + puts "Exception caught in callback thread: #{$!}" end @impl.popEvent valid = @impl.getEvent(@event) @@ -1069,23 +1072,13 @@ module Qmf end class AgentProxy - attr_reader :broker + attr_reader :impl, :broker, :label, :key def initialize(impl, broker) - @impl = impl + @impl = Qmfengine::AgentProxy.new(impl) @broker = broker - end - - def label - @impl.getLabel - end - - def broker_bank - @impl.getBrokerBank - end - - def agent_bank - @impl.getAgentBank + @label = @impl.getLabel + @key = "#{@impl.getBrokerBank}.#{@impl.getAgentBank}" end end @@ -1130,8 +1123,9 @@ module Qmf end end - def send_query(query, ctx) - @impl.sendQuery(query, ctx) + def send_query(query, ctx, agent) + agent_impl = agent.impl if agent + @impl.sendQuery(query, ctx, agent_impl) @conn.kick end diff --git a/cpp/bindings/qmf/tests/ruby_console.rb b/cpp/bindings/qmf/tests/ruby_console.rb index a76099f83a..31670312d6 100755 --- a/cpp/bindings/qmf/tests/ruby_console.rb +++ b/cpp/bindings/qmf/tests/ruby_console.rb @@ -25,7 +25,7 @@ require 'socket' class App < Qmf::ConsoleHandler def agent_added(agent) - puts "AgentAdded: #{agent.label} broker=#{agent.broker_bank} agent=#{agent.agent_bank}" + puts "AgentAdded: label=#{agent.label} key=#{agent.key}" end def agent_deleted(agent) @@ -42,8 +42,7 @@ class App < Qmf::ConsoleHandler def object_update(object, hasProps, hasStats) puts "ObjectUpdate: #{object.object_class.class_name} props=#{hasProps} stats=#{hasStats}" - puts " broker-bank=#{object.object_id.broker_bank}" - puts " agent-bank=#{object.object_id.agent_bank}" + puts " agent-key=#{object.object_id.agent_key}" puts " package=#{object.object_class.package_name}" end diff --git a/cpp/bindings/qmf/tests/ruby_console_test.rb b/cpp/bindings/qmf/tests/ruby_console_test.rb index e0440367c5..98180a97cd 100755 --- a/cpp/bindings/qmf/tests/ruby_console_test.rb +++ b/cpp/bindings/qmf/tests/ruby_console_test.rb @@ -201,6 +201,19 @@ class ConsoleTest < ConsoleTestBase end + def test_D_get_with_agent + agents = @qmfc.agents + agents.each do |agent| + if agent.label == "qmfa" + parent = @qmfc.object(:class => "parent", :agent => agent) + assert(parent, "Number of parent objects") + return + end + end + + fail("Didn't find a non-broker agent") + end + end app = ConsoleTest.new diff --git a/cpp/include/qmf/engine/Console.h b/cpp/include/qmf/engine/Console.h index ce72360da7..8463023468 100644 --- a/cpp/include/qmf/engine/Console.h +++ b/cpp/include/qmf/engine/Console.h @@ -134,6 +134,7 @@ namespace engine { */ class AgentProxy { public: + AgentProxy(const AgentProxy& from); ~AgentProxy(); const char* getLabel() const; uint32_t getBrokerBank() const; diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp index 0a1769f891..2d955b0c26 100644 --- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp +++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp @@ -741,6 +741,7 @@ bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const strin //================================================================== AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} +AgentProxy::AgentProxy(const AgentProxy& from) : impl(new AgentProxyImpl(*(from.impl))) {} AgentProxy::~AgentProxy() { delete impl; } const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); } |