summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-12-09 22:15:20 +0000
committerTed Ross <tross@apache.org>2008-12-09 22:15:20 +0000
commitab9efada20267cdd8a60db99f0fef877ad94a172 (patch)
tree11efa9742088850df1e5409f6111ed3c3fa7536c
parent3a6c4abc4cb3fb9c8c6d7982b7c9c90fd3359228 (diff)
downloadqpid-python-ab9efada20267cdd8a60db99f0fef877ad94a172.tar.gz
Port features and bug-fixes from Python API to Ruby API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@724911 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--ruby/lib/qpid/qmf.rb159
1 files changed, 128 insertions, 31 deletions
diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb
index 378d4068be..222801b6cd 100644
--- a/ruby/lib/qpid/qmf.rb
+++ b/ruby/lib/qpid/qmf.rb
@@ -20,6 +20,7 @@
require 'socket'
require 'monitor'
+require 'thread'
require 'uri'
require 'time'
@@ -63,6 +64,9 @@ module Qpid::Qmf
end
class BrokerURL
+
+ attr_reader :host, :port, :auth_name, :auth_pass, :auth_mech
+
def initialize(text)
uri = URI.parse(text)
@@ -163,23 +167,26 @@ module Qpid::Qmf
raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided"
end
- #raise NotImplementedError, @manage_connections
end
def to_s
- "QMF Console Session Manager (brokers connected: #{@brokers.size})"
+ "QMF Console Session Manager (brokers: #{@brokers.size})"
+ end
+
+ def managedConnections?
+ return @manage_connections
end
# Connect to a Qpid broker. Returns an object of type Broker
def add_broker(target="amqp://localhost")
- uri = URI.parse(target)
- broker = Broker.new(self, uri.host, uri.port, "PLAIN", uri.user, uri.password)
+ url = BrokerURL.new(target)
+ broker = Broker.new(self, url.host, url.port, url.auth_mech, url.auth_name, url.auth_pass)
unless broker.connected? || @manage_connections
raise broker.error
end
@brokers << broker
- objects(:broker => broker, :class => "agent")
+ objects(:broker => broker, :class => "agent") unless @manage_connections
return broker
end
@@ -216,20 +223,32 @@ module Qpid::Qmf
end
@brokers.each do |broker|
args = { :exchange => "qpid.management",
- :queue => broker.topicName,
+ :queue => broker.topic_name,
:binding_key => "console.obj.*.*.#{package_name}.#" }
broker.amqpSession.exchange_bind(args)
end
end
- def bind_class(klass_key)
+ def bind_class(package_name, class_name)
+ unless @user_bindings && @rcv_objects
+ raise "userBindings option not set for Session"
+ end
+ @brokers.each do |broker|
+ args = { :exchange => "qpid.management",
+ :queue => broker.topic_name,
+ :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" }
+ broker.amqpSession.exchange_bind(args)
+ end
+ end
+
+ def bind_class_key(klass_key)
unless @user_bindings && @rcv_objects
raise "userBindings option not set for Session"
end
pname, cname, hash = klass_key
@brokers.each do |broker|
args = { :exchange => "qpid.management",
- :queue => broker.topicName,
+ :queue => broker.topic_name,
:binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
broker.amqpSession.exchange_bind(args)
end
@@ -290,7 +309,18 @@ module Qpid::Qmf
end
agent_list << agent
else
- broker_list.each { |broker| agent_list += broker.agents }
+ if kwargs.include?(:object_id)
+ oid = kwargs[:object_id]
+ broker_list.each { |broker|
+ broker.agents.each { |agent|
+ if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank
+ agent_list << agent
+ end
+ }
+ }
+ else
+ broker_list.each { |broker| agent_list += broker.agents }
+ end
end
cname = nil
@@ -310,7 +340,7 @@ module Qpid::Qmf
map = {}
@select = []
if kwargs.include?(:object_id)
- map["_objectId"] = kwargs[:object_id].to_str
+ map["_objectid"] = kwargs[:object_id].to_s
else
map["_class"] = cname
map["_package"] = pname if pname
@@ -477,7 +507,7 @@ module Qpid::Qmf
end
agent = broker.agent(broker_bank, agent_bank)
timestamp = codec.read_uint64
- @console.heartbeat(agent, timestamp)
+ @console.heartbeat(agent, timestamp) if agent
end
end
@@ -516,7 +546,7 @@ module Qpid::Qmf
end
object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat)
- if pname == "org.apache.qpid.broker" && cname == "agent"
+ if pname == "org.apache.qpid.broker" && cname == "agent" && prop
broker.update_agent(object)
end
@@ -665,6 +695,7 @@ module Qpid::Qmf
end
end
+ # A ClassKey uniquely identifies a class from the schema.
class ClassKey
attr_reader :package, :klass_name, :hash
@@ -1075,6 +1106,67 @@ module Qpid::Qmf
end
end
+ class ManagedConnection
+
+ DELAY_MIN = 1
+ DELAY_MAX = 128
+ DELAY_FACTOR = 2
+ include MonitorMixin
+
+ def initialize(broker)
+ super()
+ @broker = broker
+ @cv = new_cond
+ @is_cancelled = false
+ end
+
+ # Main body of the running thread.
+ def start
+ @thread = Thread.new {
+ delay = DELAY_MIN
+ while true
+ begin
+ @broker.try_to_connect
+ synchronize do
+ while !@is_cancelled and @broker.connected?
+ @cv.wait
+ Thread.exit if @is_cancelled
+ delay = DELAY_MIN
+ end
+ end
+
+ rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError
+ delay *= DELAY_FACTOR if delay < DELAY_MAX
+ end
+
+ synchronize do
+ @cv.wait(delay)
+ Thread.exit if @is_cancelled
+ end
+ end
+ }
+ end
+
+ # Tell this thread to stop running and return.
+ def stop
+ synchronize do
+ @is_cancelled = true
+ @cv.signal
+ end
+ end
+
+ # Notify the thread that the connection was lost.
+ def disconnected
+ synchronize do
+ @cv.signal
+ end
+ end
+
+ def join
+ @thread.join
+ end
+ end
+
class Broker
SYNC_TIME = 60
@@ -1083,11 +1175,11 @@ module Qpid::Qmf
attr_accessor :error
- attr_reader :amqp_session_id, :amqp_session, :conn
+ attr_reader :amqp_session_id, :amqp_session, :conn, :broker_bank, :topic_name
- attr_accessor :broker_id, :sync_result, :broker_bank
+ attr_accessor :broker_id, :sync_result
- def initialize(session, host, port, auth_mech, auth_user, auth_pass)
+ def initialize(session, host, port, auth_mech, auth_name, auth_pass)
super()
# For debugging..
@@ -1096,8 +1188,9 @@ module Qpid::Qmf
@session = session
@host = host
@port = port
- @auth_user = auth_user
+ @auth_name = auth_name
@auth_pass = auth_pass
+ @broker_bank = 1
@agents = {}
@agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
@topic_bound = false
@@ -1108,10 +1201,15 @@ module Qpid::Qmf
@reqs_outstanding = 1
@error = nil
@broker_id = nil
- @broker_bank = 1
@is_connected = false
@conn = nil
- try_to_connect
+ if @session.managedConnections?
+ @thread = ManagedConnection.new(self)
+ @thread.start
+ else
+ @thread = nil
+ try_to_connect
+ end
end
def connected?
@@ -1228,6 +1326,10 @@ module Qpid::Qmf
end
def shutdown
+ if @thread
+ @thread.stop
+ @thread.join
+ end
if connected?
@amqp_session.incoming("rdest").stop
if @session.console
@@ -1235,20 +1337,16 @@ module Qpid::Qmf
end
@amqp_session.close
@is_connected = false
- else
- raise "Broker already disconnected"
end
end
- private
-
def try_to_connect
#begin
@amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
# FIXME: Need sth for Qpid::Util::connect
@conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
- :username => @auth_user,
+ :username => @auth_name,
:password => @auth_pass)
@conn.start
@reply_name = "reply-%s" % amqp_session_id
@@ -1303,16 +1401,10 @@ module Qpid::Qmf
set_header(codec, ?B)
msg = message(codec.encoded)
emit(msg)
- # FIXME: These exceptions are bogus here
- #rescue socket.error => e
- # @error = "Socket Error %s - %s" % [e[0], e[1]]
- #rescue Closed => e
- # @error = "Connect Failed %d - %s" % [e[0], e[1]]
- #rescue ConnectionFailed => e
- # @error = "Connect Failed %d - %s" % [e[0], e[1]]
- #end
end
+ private
+
# Check the header of a management message and extract the opcode and
# class
def check_header(codec)
@@ -1357,6 +1449,7 @@ module Qpid::Qmf
synchronize { @cv.signal if @sync_in_flight }
@session.handle_error(@error)
@session.handle_broker_disconnect(self)
+ @thread.disconnected if @thread
end
end
@@ -1369,6 +1462,10 @@ module Qpid::Qmf
@label = label
end
+ def broker_bank
+ @broker.broker_bank
+ end
+
def to_s
"Agent at bank %d.%d (%s)" % [@broker.broker_bank, @agent_bank, @label]
end