summaryrefslogtreecommitdiff
path: root/ruby/lib
diff options
context:
space:
mode:
Diffstat (limited to 'ruby/lib')
-rw-r--r--ruby/lib/qpid/connection.rb3
-rw-r--r--ruby/lib/qpid/delegates.rb2
-rw-r--r--ruby/lib/qpid/framer.rb2
-rw-r--r--ruby/lib/qpid/qmf.rb63
-rw-r--r--ruby/lib/qpid/session.rb4
5 files changed, 46 insertions, 28 deletions
diff --git a/ruby/lib/qpid/connection.rb b/ruby/lib/qpid/connection.rb
index 59d88196a3..d2efbfb263 100644
--- a/ruby/lib/qpid/connection.rb
+++ b/ruby/lib/qpid/connection.rb
@@ -36,7 +36,7 @@ module Qpid
include MonitorMixin
attr_reader :spec, :attached, :sessions, :thread
- attr_accessor :opened, :failed, :close_code
+ attr_accessor :opened, :failed, :close_code, :user_id
def initialize(sock, args={})
super(sock)
@@ -58,6 +58,7 @@ module Qpid
@thread = nil
@channel_max = 65535
+ @user_id = nil
@delegate = delegate.call(self, args)
end
diff --git a/ruby/lib/qpid/delegates.rb b/ruby/lib/qpid/delegates.rb
index 8d866e895f..6d655067ef 100644
--- a/ruby/lib/qpid/delegates.rb
+++ b/ruby/lib/qpid/delegates.rb
@@ -202,6 +202,7 @@ module Qpid
end
begin
resp = Sasl.client_start(@saslConn, mech_list)
+ @connection.user_id = Sasl.user_id(@saslConn)
ch.connection_start_ok(:client_properties => PROPERTIES,
:mechanism => resp[2],
:response => resp[1])
@@ -214,6 +215,7 @@ module Qpid
def connection_secure(ch, secure)
resp = Sasl.client_step(@saslConn, secure.challenge)
+ @connection.user_id = Sasl.user_id(@saslConn)
ch.connection_secure_ok(:response => resp[1])
end
diff --git a/ruby/lib/qpid/framer.rb b/ruby/lib/qpid/framer.rb
index abac221f00..d057605383 100644
--- a/ruby/lib/qpid/framer.rb
+++ b/ruby/lib/qpid/framer.rb
@@ -137,6 +137,8 @@ module Qpid
@tx_buf = ""
frm.debug("FLUSHED") if frm
end
+ rescue
+ @sock.close unless @sock.closed?
end
def _write(buf)
diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb
index ebca7ee5ab..b7309155c3 100644
--- a/ruby/lib/qpid/qmf.rb
+++ b/ruby/lib/qpid/qmf.rb
@@ -252,7 +252,7 @@ module Qpid::Qmf
args = { :exchange => "qpid.management",
:queue => broker.topic_name,
:binding_key => "console.obj.*.*.#{package_name}.#" }
- broker.amqpSession.exchange_bind(args)
+ broker.amqp_session.exchange_bind(args)
end
end
@@ -264,7 +264,7 @@ module Qpid::Qmf
args = { :exchange => "qpid.management",
:queue => broker.topic_name,
:binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" }
- broker.amqpSession.exchange_bind(args)
+ broker.amqp_session.exchange_bind(args)
end
end
@@ -277,7 +277,7 @@ module Qpid::Qmf
args = { :exchange => "qpid.management",
:queue => broker.topic_name,
:binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
- broker.amqpSession.exchange_bind(args)
+ broker.amqp_session.exchange_bind(args)
end
end
@@ -319,7 +319,7 @@ module Qpid::Qmf
# The default timeout for this synchronous operation is 60 seconds. To change the timeout,
# use the following argument:
#
- # :_timeout = <time in seconds>
+ # :timeout = <time in seconds>
#
# If additional arguments are supplied, they are used as property
# selectors, as long as their keys are strings. For example, if
@@ -340,19 +340,21 @@ module Qpid::Qmf
unless broker_list.include?(agent.broker)
raise ArgumentError, "Supplied agent is not accessible through the supplied broker"
end
- agent_list << agent
+ agent_list << agent if agent.broker.connected?
else
if kwargs.include?(:object_id)
oid = kwargs[:object_id]
broker_list.each { |broker|
broker.agents.each { |agent|
if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank
- agent_list << agent
+ agent_list << agent if agent.broker.connected?
end
}
}
else
- broker_list.each { |broker| agent_list += broker.agents }
+ broker_list.each { |broker|
+ agent_list += broker.agents if broker.connected?
+ }
end
end
@@ -400,8 +402,8 @@ module Qpid::Qmf
end
timeout = false
- if kwargs.include?(:_timeout)
- wait_time = kwargs[:_timeout]
+ if kwargs.include?(:timeout)
+ wait_time = kwargs[:timeout]
else
wait_time = DEFAULT_GET_WAIT_TIME
end
@@ -616,8 +618,8 @@ module Qpid::Qmf
def handle_broker_disconnect(broker); end
def handle_error(error)
- @error = error
synchronize do
+ @error = error if @sync_sequence_list.length > 0
@sync_sequence_list = []
@cv.signal
end
@@ -953,6 +955,8 @@ module Qpid::Qmf
class Object
+ DEFAULT_METHOD_WAIT_TIME = 60
+
attr_reader :object_id, :schema, :properties, :statistics,
:current_time, :create_time, :delete_time, :broker
@@ -1110,12 +1114,14 @@ module Qpid::Qmf
timeout = nil
if kwargs.class == Hash
- if kwargs.include?(:_timeout)
- timeout = kwargs[:_timeout]
+ if kwargs.include?(:timeout)
+ timeout = kwargs[:timeout]
+ else
+ timeout = DEFAULT_METHOD_WAIT_TIME
end
- if kwargs.include?(:_async)
- sync = !kwargs[:_async]
+ if kwargs.include?(:async)
+ sync = !kwargs[:async]
end
args.pop
end
@@ -1212,7 +1218,7 @@ module Qpid::Qmf
end
end
- rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError
+ rescue
delay *= DELAY_FACTOR if delay < DELAY_MAX
end
@@ -1247,6 +1253,7 @@ module Qpid::Qmf
class Broker
SYNC_TIME = 60
+ @@next_seq = 1
include MonitorMixin
@@ -1267,20 +1274,17 @@ module Qpid::Qmf
@port = port
@auth_name = auth_name
@auth_pass = auth_pass
+ @user_id = nil
@auth_mechanism = kwargs[:mechanism]
@auth_service = kwargs[:service]
@broker_bank = 1
- @agents = {}
- @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
@topic_bound = false
@cv = new_cond
- @sync_in_flight = false
- @sync_request = 0
- @sync_result = nil
- @reqs_outstanding = 1
- @error = nil
- @broker_id = nil
+ @error = nil
+ @broker_id = nil
@is_connected = false
+ @amqp_session_id = "%s.%d.%d" % [Socket.gethostname, Process::pid, @@next_seq]
+ @@next_seq += 1
@conn = nil
if @session.managedConnections?
@thread = ManagedConnection.new(self)
@@ -1326,6 +1330,7 @@ module Qpid::Qmf
def wait_for_stable
synchronize do
+ return unless connected?
return if @reqs_outstanding == 0
@sync_in_flight = true
unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 }
@@ -1350,6 +1355,7 @@ module Qpid::Qmf
mp = @amqp_session.message_properties
mp.content_type = "x-application/qmf"
mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
+ #mp.user_id = @user_id if @user_id
return Qpid::Message.new(dp, mp, body)
end
@@ -1422,8 +1428,14 @@ module Qpid::Qmf
end
def try_to_connect
- #begin
- @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
+ @agents = {}
+ @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
+ @topic_bound = false
+ @sync_in_flight = false
+ @sync_request = 0
+ @sync_result = nil
+ @reqs_outstanding = 1
+
# FIXME: Need sth for Qpid::Util::connect
@conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
@@ -1433,6 +1445,7 @@ module Qpid::Qmf
:host => @host,
:service => @auth_service)
@conn.start
+ @user_id = @conn.user_id
@reply_name = "reply-%s" % amqp_session_id
@amqp_session = @conn.session(@amqp_session_id)
@amqp_session.auto_sync = true
diff --git a/ruby/lib/qpid/session.rb b/ruby/lib/qpid/session.rb
index 43a664d285..1b4d78814a 100644
--- a/ruby/lib/qpid/session.rb
+++ b/ruby/lib/qpid/session.rb
@@ -114,7 +114,7 @@ module Qpid
end
end
if error?
- raise Qpid::Session::Exception, exceptions
+ raise Qpid::Session::Exception, @exceptions
end
end
@@ -436,7 +436,7 @@ module Qpid
end
def execution_exception(ex)
- @session.exceptions.append(ex)
+ @session.exceptions << ex
end
end