diff options
author | Ted Ross <tross@apache.org> | 2008-12-09 22:15:20 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-12-09 22:15:20 +0000 |
commit | ab9efada20267cdd8a60db99f0fef877ad94a172 (patch) | |
tree | 11efa9742088850df1e5409f6111ed3c3fa7536c | |
parent | 3a6c4abc4cb3fb9c8c6d7982b7c9c90fd3359228 (diff) | |
download | qpid-python-ab9efada20267cdd8a60db99f0fef877ad94a172.tar.gz |
Port features and bug-fixes from Python API to Ruby API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@724911 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | ruby/lib/qpid/qmf.rb | 159 |
1 files changed, 128 insertions, 31 deletions
diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb index 378d4068be..222801b6cd 100644 --- a/ruby/lib/qpid/qmf.rb +++ b/ruby/lib/qpid/qmf.rb @@ -20,6 +20,7 @@ require 'socket' require 'monitor' +require 'thread' require 'uri' require 'time' @@ -63,6 +64,9 @@ module Qpid::Qmf end class BrokerURL + + attr_reader :host, :port, :auth_name, :auth_pass, :auth_mech + def initialize(text) uri = URI.parse(text) @@ -163,23 +167,26 @@ module Qpid::Qmf raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided" end - #raise NotImplementedError, @manage_connections end def to_s - "QMF Console Session Manager (brokers connected: #{@brokers.size})" + "QMF Console Session Manager (brokers: #{@brokers.size})" + end + + def managedConnections? + return @manage_connections end # Connect to a Qpid broker. Returns an object of type Broker def add_broker(target="amqp://localhost") - uri = URI.parse(target) - broker = Broker.new(self, uri.host, uri.port, "PLAIN", uri.user, uri.password) + url = BrokerURL.new(target) + broker = Broker.new(self, url.host, url.port, url.auth_mech, url.auth_name, url.auth_pass) unless broker.connected? || @manage_connections raise broker.error end @brokers << broker - objects(:broker => broker, :class => "agent") + objects(:broker => broker, :class => "agent") unless @manage_connections return broker end @@ -216,20 +223,32 @@ module Qpid::Qmf end @brokers.each do |broker| args = { :exchange => "qpid.management", - :queue => broker.topicName, + :queue => broker.topic_name, :binding_key => "console.obj.*.*.#{package_name}.#" } broker.amqpSession.exchange_bind(args) end end - def bind_class(klass_key) + def bind_class(package_name, class_name) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topic_name, + :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" } + broker.amqpSession.exchange_bind(args) + end + end + + def bind_class_key(klass_key) unless @user_bindings && @rcv_objects raise "userBindings option not set for Session" end pname, cname, hash = klass_key @brokers.each do |broker| args = { :exchange => "qpid.management", - :queue => broker.topicName, + :queue => broker.topic_name, :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" } broker.amqpSession.exchange_bind(args) end @@ -290,7 +309,18 @@ module Qpid::Qmf end agent_list << agent else - broker_list.each { |broker| agent_list += broker.agents } + if kwargs.include?(:object_id) + oid = kwargs[:object_id] + broker_list.each { |broker| + broker.agents.each { |agent| + if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank + agent_list << agent + end + } + } + else + broker_list.each { |broker| agent_list += broker.agents } + end end cname = nil @@ -310,7 +340,7 @@ module Qpid::Qmf map = {} @select = [] if kwargs.include?(:object_id) - map["_objectId"] = kwargs[:object_id].to_str + map["_objectid"] = kwargs[:object_id].to_s else map["_class"] = cname map["_package"] = pname if pname @@ -477,7 +507,7 @@ module Qpid::Qmf end agent = broker.agent(broker_bank, agent_bank) timestamp = codec.read_uint64 - @console.heartbeat(agent, timestamp) + @console.heartbeat(agent, timestamp) if agent end end @@ -516,7 +546,7 @@ module Qpid::Qmf end object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat) - if pname == "org.apache.qpid.broker" && cname == "agent" + if pname == "org.apache.qpid.broker" && cname == "agent" && prop broker.update_agent(object) end @@ -665,6 +695,7 @@ module Qpid::Qmf end end + # A ClassKey uniquely identifies a class from the schema. class ClassKey attr_reader :package, :klass_name, :hash @@ -1075,6 +1106,67 @@ module Qpid::Qmf end end + class ManagedConnection + + DELAY_MIN = 1 + DELAY_MAX = 128 + DELAY_FACTOR = 2 + include MonitorMixin + + def initialize(broker) + super() + @broker = broker + @cv = new_cond + @is_cancelled = false + end + + # Main body of the running thread. + def start + @thread = Thread.new { + delay = DELAY_MIN + while true + begin + @broker.try_to_connect + synchronize do + while !@is_cancelled and @broker.connected? + @cv.wait + Thread.exit if @is_cancelled + delay = DELAY_MIN + end + end + + rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError + delay *= DELAY_FACTOR if delay < DELAY_MAX + end + + synchronize do + @cv.wait(delay) + Thread.exit if @is_cancelled + end + end + } + end + + # Tell this thread to stop running and return. + def stop + synchronize do + @is_cancelled = true + @cv.signal + end + end + + # Notify the thread that the connection was lost. + def disconnected + synchronize do + @cv.signal + end + end + + def join + @thread.join + end + end + class Broker SYNC_TIME = 60 @@ -1083,11 +1175,11 @@ module Qpid::Qmf attr_accessor :error - attr_reader :amqp_session_id, :amqp_session, :conn + attr_reader :amqp_session_id, :amqp_session, :conn, :broker_bank, :topic_name - attr_accessor :broker_id, :sync_result, :broker_bank + attr_accessor :broker_id, :sync_result - def initialize(session, host, port, auth_mech, auth_user, auth_pass) + def initialize(session, host, port, auth_mech, auth_name, auth_pass) super() # For debugging.. @@ -1096,8 +1188,9 @@ module Qpid::Qmf @session = session @host = host @port = port - @auth_user = auth_user + @auth_name = auth_name @auth_pass = auth_pass + @broker_bank = 1 @agents = {} @agents["1.0"] = Agent.new(self, 0, "BrokerAgent") @topic_bound = false @@ -1108,10 +1201,15 @@ module Qpid::Qmf @reqs_outstanding = 1 @error = nil @broker_id = nil - @broker_bank = 1 @is_connected = false @conn = nil - try_to_connect + if @session.managedConnections? + @thread = ManagedConnection.new(self) + @thread.start + else + @thread = nil + try_to_connect + end end def connected? @@ -1228,6 +1326,10 @@ module Qpid::Qmf end def shutdown + if @thread + @thread.stop + @thread.join + end if connected? @amqp_session.incoming("rdest").stop if @session.console @@ -1235,20 +1337,16 @@ module Qpid::Qmf end @amqp_session.close @is_connected = false - else - raise "Broker already disconnected" end end - private - def try_to_connect #begin @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid] # FIXME: Need sth for Qpid::Util::connect @conn = Qpid::Connection.new(TCPSocket.new(@host, @port), - :username => @auth_user, + :username => @auth_name, :password => @auth_pass) @conn.start @reply_name = "reply-%s" % amqp_session_id @@ -1303,16 +1401,10 @@ module Qpid::Qmf set_header(codec, ?B) msg = message(codec.encoded) emit(msg) - # FIXME: These exceptions are bogus here - #rescue socket.error => e - # @error = "Socket Error %s - %s" % [e[0], e[1]] - #rescue Closed => e - # @error = "Connect Failed %d - %s" % [e[0], e[1]] - #rescue ConnectionFailed => e - # @error = "Connect Failed %d - %s" % [e[0], e[1]] - #end end + private + # Check the header of a management message and extract the opcode and # class def check_header(codec) @@ -1357,6 +1449,7 @@ module Qpid::Qmf synchronize { @cv.signal if @sync_in_flight } @session.handle_error(@error) @session.handle_broker_disconnect(self) + @thread.disconnected if @thread end end @@ -1369,6 +1462,10 @@ module Qpid::Qmf @label = label end + def broker_bank + @broker.broker_bank + end + def to_s "Agent at bank %d.%d (%s)" % [@broker.broker_bank, @agent_bank, @label] end |