summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/bindings/qmf/python/qmf.py108
-rw-r--r--cpp/bindings/qmf/ruby/qmf.rb80
-rwxr-xr-xcpp/bindings/qmf/tests/ruby_console.rb5
-rwxr-xr-xcpp/bindings/qmf/tests/ruby_console_test.rb13
-rw-r--r--cpp/include/qmf/engine/Console.h1
-rw-r--r--cpp/src/qmf/engine/BrokerProxyImpl.cpp1
6 files changed, 109 insertions, 99 deletions
diff --git a/cpp/bindings/qmf/python/qmf.py b/cpp/bindings/qmf/python/qmf.py
index 871a25e207..e97279a8fa 100644
--- a/cpp/bindings/qmf/python/qmf.py
+++ b/cpp/bindings/qmf/python/qmf.py
@@ -535,7 +535,7 @@ class ConsoleObject(QmfObject):
return self.impl.isDeleted()
- def index(self): pass
+ def key(self): pass
@@ -545,6 +545,7 @@ class ObjectId:
self.impl = impl
else:
self.impl = qmfengine.ObjectId()
+ self.agent_key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank())
def object_num_high(self):
@@ -555,13 +556,8 @@ class ObjectId:
return self.impl.getObjectNumLo()
- def broker_bank(self):
- return self.impl.getBrokerBank()
-
-
- def agent_bank(self):
- return self.impl.getAgentBank()
-
+ def agent_key(self):
+ self.agent_key
def __eq__(self, other):
if not isinstance(other, self.__class__): return False
@@ -1076,13 +1072,18 @@ class Console(Thread):
def objects(self, query, kwargs = {}):
timeout = 30
+ agent = None
temp_args = kwargs.copy()
if type(query) == type({}):
temp_args.update(query)
- if "timeout" in temp_args:
- timeout = temp_args["timeout"]
- temp_args.pop("timeout")
+ if "_timeout" in temp_args:
+ timeout = temp_args["_timeout"]
+ temp_args.pop("_timeout")
+
+ if "_agent" in temp_args:
+ agent = temp_args["_agent"]
+ temp_args.pop("_agent")
if type(query) == type({}):
query = Query(temp_args)
@@ -1097,7 +1098,7 @@ class Console(Thread):
self._sync_count = 1
self._sync_result = []
broker = self._broker_list[0]
- broker.send_query(query.impl, None)
+ broker.send_query(query.impl, None, agent)
self._cv.wait(timeout)
if self._sync_count == 1:
raise Exception("Timed out: waiting for query response")
@@ -1193,38 +1194,40 @@ class Console(Thread):
valid = self.impl.getEvent(self._event)
while valid:
count += 1
- if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
- logging.debug("Console Event AGENT_ADDED received")
- if self._handler:
- self._handler.agent_added(AgentProxy(self._event.agent, None))
- elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
- logging.debug("Console Event AGENT_DELETED received")
- if self._handler:
- self._handler.agent_deleted(AgentProxy(self._event.agent, None))
- elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
- logging.debug("Console Event NEW_PACKAGE received")
- if self._handler:
- self._handler.new_package(self._event.name)
- elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
- logging.debug("Console Event NEW_CLASS received")
- if self._handler:
- self._handler.new_class(SchemaClassKey(self._event.classKey))
- elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
- logging.debug("Console Event OBJECT_UPDATE received")
- if self._handler:
- self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
- self._event.hasProps, self._event.hasStats)
- elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
- logging.debug("Console Event EVENT_RECEIVED received")
- elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
- logging.debug("Console Event AGENT_HEARTBEAT received")
- if self._handler:
- self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
- elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
- logging.debug("Console Event METHOD_RESPONSE received")
- else:
- logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
-
+ try:
+ if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
+ logging.debug("Console Event AGENT_ADDED received")
+ if self._handler:
+ self._handler.agent_added(AgentProxy(self._event.agent, None))
+ elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
+ logging.debug("Console Event AGENT_DELETED received")
+ if self._handler:
+ self._handler.agent_deleted(AgentProxy(self._event.agent, None))
+ elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
+ logging.debug("Console Event NEW_PACKAGE received")
+ if self._handler:
+ self._handler.new_package(self._event.name)
+ elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
+ logging.debug("Console Event NEW_CLASS received")
+ if self._handler:
+ self._handler.new_class(SchemaClassKey(self._event.classKey))
+ elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
+ logging.debug("Console Event OBJECT_UPDATE received")
+ if self._handler:
+ self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
+ self._event.hasProps, self._event.hasStats)
+ elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
+ logging.debug("Console Event EVENT_RECEIVED received")
+ elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
+ logging.debug("Console Event AGENT_HEARTBEAT received")
+ if self._handler:
+ self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
+ elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
+ logging.debug("Console Event METHOD_RESPONSE received")
+ else:
+ logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
+ except e:
+ print "Exception caught in callback thread:", e
self.impl.popEvent()
valid = self.impl.getEvent(self._event)
return count
@@ -1236,19 +1239,15 @@ class AgentProxy:
def __init__(self, impl, broker):
self.impl = impl
self.broker = broker
+ self.key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank())
def label(self):
return self.impl.getLabel()
- def broker_bank(self):
- return self.impl.getBrokerBank()
-
-
- def agent_bank(self):
- return self.impl.getAgentBank()
-
+ def key(self):
+ return self.key
class Broker(ConnectionHandler):
@@ -1298,8 +1297,11 @@ class Broker(ConnectionHandler):
self._cv.release()
- def send_query(self, query, ctx):
- self.impl.sendQuery(query, ctx)
+ def send_query(self, query, ctx, agent):
+ agent_impl = None
+ if agent:
+ agent_impl = agent.impl
+ self.impl.sendQuery(query, ctx, agent_impl)
self.conn.kick()
diff --git a/cpp/bindings/qmf/ruby/qmf.rb b/cpp/bindings/qmf/ruby/qmf.rb
index db80381581..41c2c4a1ee 100644
--- a/cpp/bindings/qmf/ruby/qmf.rb
+++ b/cpp/bindings/qmf/ruby/qmf.rb
@@ -448,18 +448,19 @@ module Qmf
@impl.isDeleted
end
- def index()
+ def key()
end
end
class ObjectId
- attr_reader :impl
+ attr_reader :impl, :agent_key
def initialize(impl=nil)
if impl
@impl = impl
else
@impl = Qmfengine::ObjectId.new
end
+ @agent_key = "#{@impl.getBrokerBank}.#{@impl.getAgentBank}"
end
def object_num_high
@@ -470,14 +471,6 @@ module Qmf
@impl.getObjectNumLo
end
- def broker_bank
- @impl.getBrokerBank
- end
-
- def agent_bank
- @impl.getAgentBank
- end
-
def ==(other)
return (@impl.getObjectNumHi == other.impl.getObjectNumHi) &&
(@impl.getObjectNumLo == other.impl.getObjectNumLo)
@@ -748,7 +741,7 @@ module Qmf
class SchemaClassKey
attr_reader :impl
def initialize(i)
- @impl = i
+ @impl = Qmfengine::SchemaClassKey.new(i)
end
def package_name
@@ -956,6 +949,7 @@ module Qmf
def objects(query, kwargs = {})
timeout = 30
+ agent = nil
kwargs.merge!(query) if query.class == Hash
if kwargs.include?(:timeout)
@@ -963,6 +957,11 @@ module Qmf
kwargs.delete(:timeout)
end
+ if kwargs.include?(:agent)
+ agent = kwargs[:agent]
+ kwargs.delete(:agent)
+ end
+
query = Query.new(kwargs) if query.class == Hash
@select = []
@@ -975,7 +974,7 @@ module Qmf
@sync_result = []
broker = nil
synchronize { broker = @broker_list[0] }
- broker.send_query(query.impl, nil)
+ broker.send_query(query.impl, nil, agent)
unless @cv.wait(timeout) { @sync_count == 0 }
raise "Timed out waiting for response"
end
@@ -1045,21 +1044,25 @@ module Qmf
valid = @impl.getEvent(@event)
while valid
count += 1
- case @event.kind
- when Qmfengine::ConsoleEvent::AGENT_ADDED
- @handler.agent_added(AgentProxy.new(@event.agent, nil)) if @handler
- when Qmfengine::ConsoleEvent::AGENT_DELETED
- @handler.agent_deleted(AgentProxy.new(@event.agent, nil)) if @handler
- when Qmfengine::ConsoleEvent::NEW_PACKAGE
- @handler.new_package(@event.name) if @handler
- when Qmfengine::ConsoleEvent::NEW_CLASS
- @handler.new_class(SchemaClassKey.new(@event.classKey)) if @handler
- when Qmfengine::ConsoleEvent::OBJECT_UPDATE
- @handler.object_update(ConsoleObject.new(nil, :impl => @event.object), @event.hasProps, @event.hasStats) if @handler
- when Qmfengine::ConsoleEvent::EVENT_RECEIVED
- when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT
- @handler.agent_heartbeat(AgentProxy.new(@event.agent, nil), @event.timestamp) if @handler
- when Qmfengine::ConsoleEvent::METHOD_RESPONSE
+ begin
+ case @event.kind
+ when Qmfengine::ConsoleEvent::AGENT_ADDED
+ @handler.agent_added(AgentProxy.new(@event.agent, nil)) if @handler
+ when Qmfengine::ConsoleEvent::AGENT_DELETED
+ @handler.agent_deleted(AgentProxy.new(@event.agent, nil)) if @handler
+ when Qmfengine::ConsoleEvent::NEW_PACKAGE
+ @handler.new_package(@event.name) if @handler
+ when Qmfengine::ConsoleEvent::NEW_CLASS
+ @handler.new_class(SchemaClassKey.new(@event.classKey)) if @handler
+ when Qmfengine::ConsoleEvent::OBJECT_UPDATE
+ @handler.object_update(ConsoleObject.new(nil, :impl => @event.object), @event.hasProps, @event.hasStats) if @handler
+ when Qmfengine::ConsoleEvent::EVENT_RECEIVED
+ when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT
+ @handler.agent_heartbeat(AgentProxy.new(@event.agent, nil), @event.timestamp) if @handler
+ when Qmfengine::ConsoleEvent::METHOD_RESPONSE
+ end
+ rescue
+ puts "Exception caught in callback thread: #{$!}"
end
@impl.popEvent
valid = @impl.getEvent(@event)
@@ -1069,23 +1072,13 @@ module Qmf
end
class AgentProxy
- attr_reader :broker
+ attr_reader :impl, :broker, :label, :key
def initialize(impl, broker)
- @impl = impl
+ @impl = Qmfengine::AgentProxy.new(impl)
@broker = broker
- end
-
- def label
- @impl.getLabel
- end
-
- def broker_bank
- @impl.getBrokerBank
- end
-
- def agent_bank
- @impl.getAgentBank
+ @label = @impl.getLabel
+ @key = "#{@impl.getBrokerBank}.#{@impl.getAgentBank}"
end
end
@@ -1130,8 +1123,9 @@ module Qmf
end
end
- def send_query(query, ctx)
- @impl.sendQuery(query, ctx)
+ def send_query(query, ctx, agent)
+ agent_impl = agent.impl if agent
+ @impl.sendQuery(query, ctx, agent_impl)
@conn.kick
end
diff --git a/cpp/bindings/qmf/tests/ruby_console.rb b/cpp/bindings/qmf/tests/ruby_console.rb
index a76099f83a..31670312d6 100755
--- a/cpp/bindings/qmf/tests/ruby_console.rb
+++ b/cpp/bindings/qmf/tests/ruby_console.rb
@@ -25,7 +25,7 @@ require 'socket'
class App < Qmf::ConsoleHandler
def agent_added(agent)
- puts "AgentAdded: #{agent.label} broker=#{agent.broker_bank} agent=#{agent.agent_bank}"
+ puts "AgentAdded: label=#{agent.label} key=#{agent.key}"
end
def agent_deleted(agent)
@@ -42,8 +42,7 @@ class App < Qmf::ConsoleHandler
def object_update(object, hasProps, hasStats)
puts "ObjectUpdate: #{object.object_class.class_name} props=#{hasProps} stats=#{hasStats}"
- puts " broker-bank=#{object.object_id.broker_bank}"
- puts " agent-bank=#{object.object_id.agent_bank}"
+ puts " agent-key=#{object.object_id.agent_key}"
puts " package=#{object.object_class.package_name}"
end
diff --git a/cpp/bindings/qmf/tests/ruby_console_test.rb b/cpp/bindings/qmf/tests/ruby_console_test.rb
index e0440367c5..98180a97cd 100755
--- a/cpp/bindings/qmf/tests/ruby_console_test.rb
+++ b/cpp/bindings/qmf/tests/ruby_console_test.rb
@@ -201,6 +201,19 @@ class ConsoleTest < ConsoleTestBase
end
+ def test_D_get_with_agent
+ agents = @qmfc.agents
+ agents.each do |agent|
+ if agent.label == "qmfa"
+ parent = @qmfc.object(:class => "parent", :agent => agent)
+ assert(parent, "Number of parent objects")
+ return
+ end
+ end
+
+ fail("Didn't find a non-broker agent")
+ end
+
end
app = ConsoleTest.new
diff --git a/cpp/include/qmf/engine/Console.h b/cpp/include/qmf/engine/Console.h
index ce72360da7..8463023468 100644
--- a/cpp/include/qmf/engine/Console.h
+++ b/cpp/include/qmf/engine/Console.h
@@ -134,6 +134,7 @@ namespace engine {
*/
class AgentProxy {
public:
+ AgentProxy(const AgentProxy& from);
~AgentProxy();
const char* getLabel() const;
uint32_t getBrokerBank() const;
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
index 0a1769f891..2d955b0c26 100644
--- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -741,6 +741,7 @@ bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const strin
//==================================================================
AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::AgentProxy(const AgentProxy& from) : impl(new AgentProxyImpl(*(from.impl))) {}
AgentProxy::~AgentProxy() { delete impl; }
const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); }