diff options
-rw-r--r-- | ruby/ext/sasl/sasl.c | 40 | ||||
-rw-r--r-- | ruby/lib/qpid/connection.rb | 3 | ||||
-rw-r--r-- | ruby/lib/qpid/delegates.rb | 2 | ||||
-rw-r--r-- | ruby/lib/qpid/framer.rb | 2 | ||||
-rw-r--r-- | ruby/lib/qpid/qmf.rb | 63 | ||||
-rw-r--r-- | ruby/lib/qpid/session.rb | 4 | ||||
-rw-r--r-- | ruby/tests/qmf.rb | 4 |
7 files changed, 88 insertions, 30 deletions
diff --git a/ruby/ext/sasl/sasl.c b/ruby/ext/sasl/sasl.c index 2a2a829aa8..2d4e40d30e 100644 --- a/ruby/ext/sasl/sasl.c +++ b/ruby/ext/sasl/sasl.c @@ -41,6 +41,7 @@ typedef struct { sasl_callback_t callbacks[8]; char* userName; char* password; + char* operUserName; unsigned int minSsf; unsigned int maxSsf; char mechanism[MECH_SIZE]; @@ -280,6 +281,8 @@ static VALUE qsasl_free(int argc, VALUE *argv, VALUE obj) free(context->userName); if (context->password) free(context->password); + if (context->operUserName) + free(context->operUserName); free(context); return Qnil; @@ -294,10 +297,12 @@ static VALUE qsasl_client_start(int argc, VALUE *argv, VALUE obj) char* mechList; char* mechToUse; int result; + int propResult; const char* response; unsigned int len; sasl_interact_t* interact = 0; const char* chosen; + const char* operName; if (argc == 2) { context = (context_t*) argv[0]; @@ -322,6 +327,14 @@ static VALUE qsasl_client_start(int argc, VALUE *argv, VALUE obj) rb_raise(rb_eRuntimeError, "sasl_client_start failed: %d - %s", result, sasl_errdetail(context->conn)); + if (result == SASL_OK) { + propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName); + if (propResult == SASL_OK) { + context->operUserName = (char*) malloc(strlen(operName) + 1); + strcpy(context->operUserName, operName); + } + } + return rb_ary_new3(3, INT2NUM(result), rb_str_new(response, len), rb_str_new2(chosen)); } @@ -333,7 +346,9 @@ static VALUE qsasl_client_step(int argc, VALUE *argv, VALUE obj) context_t* context; VALUE challenge; int result; + int propResult; const char* response; + const char* operName; unsigned int len; sasl_interact_t* interact = 0; @@ -356,9 +371,33 @@ static VALUE qsasl_client_step(int argc, VALUE *argv, VALUE obj) if (result != SASL_OK && result != SASL_CONTINUE) return QSASL_FAILED; + if (result == SASL_OK) { + propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName); + if (propResult == SASL_OK) { + context->operUserName = (char*) malloc(strlen(operName) + 1); + strcpy(context->operUserName, operName); + } + } + return rb_ary_new3(2, INT2NUM(result), rb_str_new(response, len)); } +static VALUE qsasl_user_id(int argc, VALUE *argv, VALUE obj) +{ + context_t* context; + + if (argc == 1) { + context = (context_t*) argv[0]; + } else { + rb_raise(rb_eRuntimeError, "Wrong Number of Arguments"); + } + + if (context->operUserName) + return rb_str_new2(context->operUserName); + + return Qnil; +} + // // Encode transport data for the security layer. // @@ -427,6 +466,7 @@ void Init_sasl() rb_define_module_function(mSasl, "free", qsasl_free, -1); rb_define_module_function(mSasl, "client_start", qsasl_client_start, -1); rb_define_module_function(mSasl, "client_step", qsasl_client_step, -1); + rb_define_module_function(mSasl, "user_id", qsasl_user_id, -1); rb_define_module_function(mSasl, "encode", qsasl_encode, -1); rb_define_module_function(mSasl, "decode", qsasl_decode, -1); } diff --git a/ruby/lib/qpid/connection.rb b/ruby/lib/qpid/connection.rb index 59d88196a3..d2efbfb263 100644 --- a/ruby/lib/qpid/connection.rb +++ b/ruby/lib/qpid/connection.rb @@ -36,7 +36,7 @@ module Qpid include MonitorMixin attr_reader :spec, :attached, :sessions, :thread - attr_accessor :opened, :failed, :close_code + attr_accessor :opened, :failed, :close_code, :user_id def initialize(sock, args={}) super(sock) @@ -58,6 +58,7 @@ module Qpid @thread = nil @channel_max = 65535 + @user_id = nil @delegate = delegate.call(self, args) end diff --git a/ruby/lib/qpid/delegates.rb b/ruby/lib/qpid/delegates.rb index 8d866e895f..6d655067ef 100644 --- a/ruby/lib/qpid/delegates.rb +++ b/ruby/lib/qpid/delegates.rb @@ -202,6 +202,7 @@ module Qpid end begin resp = Sasl.client_start(@saslConn, mech_list) + @connection.user_id = Sasl.user_id(@saslConn) ch.connection_start_ok(:client_properties => PROPERTIES, :mechanism => resp[2], :response => resp[1]) @@ -214,6 +215,7 @@ module Qpid def connection_secure(ch, secure) resp = Sasl.client_step(@saslConn, secure.challenge) + @connection.user_id = Sasl.user_id(@saslConn) ch.connection_secure_ok(:response => resp[1]) end diff --git a/ruby/lib/qpid/framer.rb b/ruby/lib/qpid/framer.rb index abac221f00..d057605383 100644 --- a/ruby/lib/qpid/framer.rb +++ b/ruby/lib/qpid/framer.rb @@ -137,6 +137,8 @@ module Qpid @tx_buf = "" frm.debug("FLUSHED") if frm end + rescue + @sock.close unless @sock.closed? end def _write(buf) diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb index ebca7ee5ab..b7309155c3 100644 --- a/ruby/lib/qpid/qmf.rb +++ b/ruby/lib/qpid/qmf.rb @@ -252,7 +252,7 @@ module Qpid::Qmf args = { :exchange => "qpid.management", :queue => broker.topic_name, :binding_key => "console.obj.*.*.#{package_name}.#" } - broker.amqpSession.exchange_bind(args) + broker.amqp_session.exchange_bind(args) end end @@ -264,7 +264,7 @@ module Qpid::Qmf args = { :exchange => "qpid.management", :queue => broker.topic_name, :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" } - broker.amqpSession.exchange_bind(args) + broker.amqp_session.exchange_bind(args) end end @@ -277,7 +277,7 @@ module Qpid::Qmf args = { :exchange => "qpid.management", :queue => broker.topic_name, :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" } - broker.amqpSession.exchange_bind(args) + broker.amqp_session.exchange_bind(args) end end @@ -319,7 +319,7 @@ module Qpid::Qmf # The default timeout for this synchronous operation is 60 seconds. To change the timeout, # use the following argument: # - # :_timeout = <time in seconds> + # :timeout = <time in seconds> # # If additional arguments are supplied, they are used as property # selectors, as long as their keys are strings. For example, if @@ -340,19 +340,21 @@ module Qpid::Qmf unless broker_list.include?(agent.broker) raise ArgumentError, "Supplied agent is not accessible through the supplied broker" end - agent_list << agent + agent_list << agent if agent.broker.connected? else if kwargs.include?(:object_id) oid = kwargs[:object_id] broker_list.each { |broker| broker.agents.each { |agent| if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank - agent_list << agent + agent_list << agent if agent.broker.connected? end } } else - broker_list.each { |broker| agent_list += broker.agents } + broker_list.each { |broker| + agent_list += broker.agents if broker.connected? + } end end @@ -400,8 +402,8 @@ module Qpid::Qmf end timeout = false - if kwargs.include?(:_timeout) - wait_time = kwargs[:_timeout] + if kwargs.include?(:timeout) + wait_time = kwargs[:timeout] else wait_time = DEFAULT_GET_WAIT_TIME end @@ -616,8 +618,8 @@ module Qpid::Qmf def handle_broker_disconnect(broker); end def handle_error(error) - @error = error synchronize do + @error = error if @sync_sequence_list.length > 0 @sync_sequence_list = [] @cv.signal end @@ -953,6 +955,8 @@ module Qpid::Qmf class Object + DEFAULT_METHOD_WAIT_TIME = 60 + attr_reader :object_id, :schema, :properties, :statistics, :current_time, :create_time, :delete_time, :broker @@ -1110,12 +1114,14 @@ module Qpid::Qmf timeout = nil if kwargs.class == Hash - if kwargs.include?(:_timeout) - timeout = kwargs[:_timeout] + if kwargs.include?(:timeout) + timeout = kwargs[:timeout] + else + timeout = DEFAULT_METHOD_WAIT_TIME end - if kwargs.include?(:_async) - sync = !kwargs[:_async] + if kwargs.include?(:async) + sync = !kwargs[:async] end args.pop end @@ -1212,7 +1218,7 @@ module Qpid::Qmf end end - rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError + rescue delay *= DELAY_FACTOR if delay < DELAY_MAX end @@ -1247,6 +1253,7 @@ module Qpid::Qmf class Broker SYNC_TIME = 60 + @@next_seq = 1 include MonitorMixin @@ -1267,20 +1274,17 @@ module Qpid::Qmf @port = port @auth_name = auth_name @auth_pass = auth_pass + @user_id = nil @auth_mechanism = kwargs[:mechanism] @auth_service = kwargs[:service] @broker_bank = 1 - @agents = {} - @agents["1.0"] = Agent.new(self, 0, "BrokerAgent") @topic_bound = false @cv = new_cond - @sync_in_flight = false - @sync_request = 0 - @sync_result = nil - @reqs_outstanding = 1 - @error = nil - @broker_id = nil + @error = nil + @broker_id = nil @is_connected = false + @amqp_session_id = "%s.%d.%d" % [Socket.gethostname, Process::pid, @@next_seq] + @@next_seq += 1 @conn = nil if @session.managedConnections? @thread = ManagedConnection.new(self) @@ -1326,6 +1330,7 @@ module Qpid::Qmf def wait_for_stable synchronize do + return unless connected? return if @reqs_outstanding == 0 @sync_in_flight = true unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 } @@ -1350,6 +1355,7 @@ module Qpid::Qmf mp = @amqp_session.message_properties mp.content_type = "x-application/qmf" mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name) + #mp.user_id = @user_id if @user_id return Qpid::Message.new(dp, mp, body) end @@ -1422,8 +1428,14 @@ module Qpid::Qmf end def try_to_connect - #begin - @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid] + @agents = {} + @agents["1.0"] = Agent.new(self, 0, "BrokerAgent") + @topic_bound = false + @sync_in_flight = false + @sync_request = 0 + @sync_result = nil + @reqs_outstanding = 1 + # FIXME: Need sth for Qpid::Util::connect @conn = Qpid::Connection.new(TCPSocket.new(@host, @port), @@ -1433,6 +1445,7 @@ module Qpid::Qmf :host => @host, :service => @auth_service) @conn.start + @user_id = @conn.user_id @reply_name = "reply-%s" % amqp_session_id @amqp_session = @conn.session(@amqp_session_id) @amqp_session.auto_sync = true diff --git a/ruby/lib/qpid/session.rb b/ruby/lib/qpid/session.rb index 43a664d285..1b4d78814a 100644 --- a/ruby/lib/qpid/session.rb +++ b/ruby/lib/qpid/session.rb @@ -114,7 +114,7 @@ module Qpid end end if error? - raise Qpid::Session::Exception, exceptions + raise Qpid::Session::Exception, @exceptions end end @@ -436,7 +436,7 @@ module Qpid end def execution_exception(ex) - @session.exceptions.append(ex) + @session.exceptions << ex end end diff --git a/ruby/tests/qmf.rb b/ruby/tests/qmf.rb index 06371a5a82..274e38416e 100644 --- a/ruby/tests/qmf.rb +++ b/ruby/tests/qmf.rb @@ -44,7 +44,7 @@ class QmfTest < Test::Unit::TestCase @count = count for idx in 0...count synchronize do - seq = broker.echo(idx, "Echo Message", :_async => true) + seq = broker.echo(idx, "Echo Message", :async => true) @xmt_list[seq] = idx end end @@ -109,7 +109,7 @@ class QmfTest < Test::Unit::TestCase start_qmf body = "Echo Message Body" for seq in 1..10 - res = @broker.echo(seq, body, :_timeout => 10) + res = @broker.echo(seq, body, :timeout => 10) assert_equal(0, res.status) assert_equal("OK", res.text) assert_equal(seq, res.sequence) |