summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/ruby/lib/qpid/qmf.rb
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/ruby/lib/qpid/qmf.rb')
-rw-r--r--M4-RCs/qpid/ruby/lib/qpid/qmf.rb1603
1 files changed, 0 insertions, 1603 deletions
diff --git a/M4-RCs/qpid/ruby/lib/qpid/qmf.rb b/M4-RCs/qpid/ruby/lib/qpid/qmf.rb
deleted file mode 100644
index d2e2651653..0000000000
--- a/M4-RCs/qpid/ruby/lib/qpid/qmf.rb
+++ /dev/null
@@ -1,1603 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Console API for Qpid Management Framework
-
-require 'socket'
-require 'monitor'
-require 'thread'
-require 'uri'
-require 'time'
-
-module Qpid::Qmf
-
- # To access the asynchronous operations, a class must be derived from
- # Console with overrides of any combination of the available methods.
- class Console
-
- # Invoked when a connection is established to a broker
- def broker_connected(broker); end
-
- # Invoked when the connection to a broker is lost
- def broker_disconnected(broker); end
-
- # Invoked when a QMF package is discovered
- def new_package(name); end
-
- # Invoked when a new class is discovered. Session.getSchema can be
- # used to obtain details about the class
- def new_class(kind, klass_key); end
-
- # Invoked when a QMF agent is discovered
- def new_agent(agent); end
-
- # Invoked when a QMF agent disconects
- def del_agent(agent); end
-
- # Invoked when an object is updated
- def object_props(broker, record); end
-
- # Invoked when an object is updated
- def object_stats(broker, record); end
-
- # Invoked when an event is raised
- def event(broker, event); end
-
- def heartbeat(agent, timestamp); end
-
- def broker_info(broker); end
- end
-
- class BrokerURL
-
- attr_reader :host, :port, :auth_name, :auth_pass, :auth_mech
-
- def initialize(text)
- uri = URI.parse(text)
-
- @host = uri.host
- @port = uri.port ? uri.port : 5672
- @auth_name = uri.user ? uri.user : "guest"
- @auth_pass = uri.password ? uri.password: "guest"
- @auth_mech = "PLAIN"
-
- return uri
- end
-
- def name
- "#{@host}:#{@port}"
- end
-
- def match(host, port)
- # FIXME: Unlcear what the Python code is actually checking for
- # here, especially since HOST can resolve to multiple IP's
- @port == port &&
- (host == @host || ipaddr(host, port) == ipaddr(@host, @port))
- end
-
- private
- def ipaddr(host, port)
- s = Socket::getaddrinfo(host, port,
- Socket::AF_INET, Socket::SOCK_STREAM)
- s[0][2]
- end
- end
-
- # An instance of the Session class represents a console session running
- # against one or more QMF brokers. A single instance of Session is
- # needed to interact with the management framework as a console.
- class Session
- CONTEXT_SYNC = 1
- CONTEXT_STARTUP = 2
- CONTEXT_MULTIGET = 3
-
- GET_WAIT_TIME = 60
-
- include MonitorMixin
-
- attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages
-
- # Initialize a session. If the console argument is provided, the
- # more advanced asynchronous features are available. If console is
- # defaulted, the session will operate in a simpler, synchronous
- # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments
- # are meaningful only if 'console' is provided. They control
- # whether object updates, events, and agent-heartbeats are
- # subscribed to. If the console is not interested in receiving one
- # or more of the above, setting the argument to False will reduce
- # tha bandwidth used by the API. If manageConnections is set to
- # True, the Session object will manage connections to the brokers.
- # This means that if a broker is unreachable, it will retry until a
- # connection can be established. If a connection is lost, the
- # Session will attempt to reconnect.
- #
- # If manageConnections is set to False, the user is responsible for
- # handing failures. In this case, an unreachable broker will cause
- # addBroker to raise an exception. If userBindings is set to False
- # (the default) and rcvObjects is True, the console will receive
- # data for all object classes. If userBindings is set to True, the
- # user must select which classes the console shall receive by
- # invoking the bindPackage or bindClass methods. This allows the
- # console to be configured to receive only information that is
- # relavant to a particular application. If rcvObjects id False,
- # userBindings has no meaning.
- #
- # Accept a hash of parameters, where keys can be :console,
- # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections,
- # and :user_bindings
- def initialize(kwargs = {})
- super()
- @console = kwargs[:console] || nil
- @brokers = []
- @packages = {}
- @seq_mgr = SequenceManager.new
- @cv = new_cond
- @sync_sequence_list = []
- @result = []
- @select = []
- @error = nil
- @rcv_objects = kwargs[:rcv_objects] == nil ? true : kwargs[:rcv_objects]
- @rcv_events = kwargs[:rcv_events] == nil ? true : kwargs[:rcv_events]
- @rcv_heartbeats = kwargs[:rcv_heartbeats] == nil ? true : kwargs[:rcv_heartbeats]
- @user_bindings = kwargs[:user_bindings] == nil ? false : kwargs[:user_bindings]
- unless @console
- @rcv_objects = false
- @rcv_events = false
- @rcv_heartbeats = false
- end
- @binding_key_list = binding_keys
- @manage_connections = kwargs[:manage_connections] || false
-
- if @user_bindings && ! @rcv_objects
- raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided"
- end
-
- end
-
- def to_s
- "QMF Console Session Manager (brokers: #{@brokers.size})"
- end
-
- def managedConnections?
- return @manage_connections
- end
-
- # Connect to a Qpid broker. Returns an object of type Broker
- def add_broker(target="amqp://localhost")
- url = BrokerURL.new(target)
- broker = Broker.new(self, url.host, url.port, url.auth_mech, url.auth_name, url.auth_pass)
- unless broker.connected? || @manage_connections
- raise broker.error
- end
-
- @brokers << broker
- objects(:broker => broker, :class => "agent") unless @manage_connections
- return broker
- end
-
- # Disconnect from a broker. The 'broker' argument is the object
- # returned from the addBroker call
- def del_broker(broker)
- broker.shutdown
- @brokers.delete(broker)
- end
-
- # Get the list of known classes within a QMF package
- def classes(package_name)
- @brokers.each { |broker| broker.wait_for_stable }
- if @packages.include?(package_name)
- # FIXME What's the actual structure of @packages[package_name]
- @packages[package_name].inject([]) do |list, cname, hash|
- list << [ package_name, cname, hash]
- end
- end
- end
-
- # Get the schema for a QMF class
- def schema(klass_key)
- @brokers.each { |broker| broker.wait_for_stable }
- pname, cname, hash = klass_key
- if @packages.include?(pname)
- @packages[pname][ [cname, hash] ]
- end
- end
-
- def bind_package(package_name)
- unless @user_bindings && @rcv_objects
- raise "userBindings option not set for Session"
- end
- @brokers.each do |broker|
- args = { :exchange => "qpid.management",
- :queue => broker.topic_name,
- :binding_key => "console.obj.*.*.#{package_name}.#" }
- broker.amqpSession.exchange_bind(args)
- end
- end
-
- def bind_class(package_name, class_name)
- unless @user_bindings && @rcv_objects
- raise "userBindings option not set for Session"
- end
- @brokers.each do |broker|
- args = { :exchange => "qpid.management",
- :queue => broker.topic_name,
- :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" }
- broker.amqpSession.exchange_bind(args)
- end
- end
-
- def bind_class_key(klass_key)
- unless @user_bindings && @rcv_objects
- raise "userBindings option not set for Session"
- end
- pname, cname, hash = klass_key
- @brokers.each do |broker|
- args = { :exchange => "qpid.management",
- :queue => broker.topic_name,
- :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
- broker.amqpSession.exchange_bind(args)
- end
- end
-
- # Get a list of currently known agents
- def agents(broker=nil)
- broker_list = []
- if broker.nil?
- broker_list = @brokers.dup
- else
- broker_list << broker
- end
- broker_list.each { |b| b.wait_for_stable }
- agent_list = []
- broker_list.each { |b| agent_list += b.agents }
- return agent_list
- end
-
- # Get a list of objects from QMF agents.
- # All arguments are passed by name(keyword).
- #
- # The class for queried objects may be specified in one of the
- # following ways:
- # :schema => <schema> - supply a schema object returned from getSchema.
- # :key => <key> - supply a klass_key from the list returned by getClasses.
- # :class => <name> - supply a class name as a string. If the class name exists
- # in multiple packages, a _package argument may also be supplied.
- # :object_id = <id> - get the object referenced by the object-id
- #
- # If objects should be obtained from only one agent, use the following argument.
- # Otherwise, the query will go to all agents.
- #
- # :agent = <agent> - supply an agent from the list returned by getAgents.
- # If the get query is to be restricted to one broker (as opposed to
- # all connected brokers), add the following argument:
- #
- # :broker = <broker> - supply a broker as returned by addBroker.
- #
- # If additional arguments are supplied, they are used as property
- # selectors, as long as their keys are strings. For example, if
- # the argument "name" => "test" is supplied, only objects whose
- # "name" property is "test" will be returned in the result.
- def objects(kwargs)
- if kwargs.include?(:broker)
- broker_list = []
- broker_list << kwargs[:broker]
- else
- broker_list = @brokers
- end
- broker_list.each { |broker| broker.wait_for_stable }
-
- agent_list = []
- if kwargs.include?(:agent)
- agent = kwargs[:agent]
- unless broker_list.include?(agent.broker)
- raise ArgumentError, "Supplied agent is not accessible through the supplied broker"
- end
- agent_list << agent
- else
- if kwargs.include?(:object_id)
- oid = kwargs[:object_id]
- broker_list.each { |broker|
- broker.agents.each { |agent|
- if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank
- agent_list << agent
- end
- }
- }
- else
- broker_list.each { |broker| agent_list += broker.agents }
- end
- end
-
- cname = nil
- if kwargs.include?(:schema)
- # FIXME: What kind of object is kwargs[:schema]
- pname, cname, hash = kwargs[:schema].getKey()
- elsif kwargs.include?(:key)
- pname, cname, hash = kwargs[:key]
- elsif kwargs.include?(:class)
- pname, cname, hash = [kwargs[:package], kwargs[:class], nil]
- end
- if cname.nil? && ! kwargs.include?(:object_id)
- raise ArgumentError,
- "No class supplied, use :schema, :key, :class, or :object_id' argument"
- end
-
- map = {}
- @select = []
- if kwargs.include?(:object_id)
- map["_objectid"] = kwargs[:object_id].to_s
- else
- map["_class"] = cname
- map["_package"] = pname if pname
- map["_hash"] = hash if hash
- kwargs.each do |k,v|
- @select << [k, v] if k.is_a?(String)
- end
- end
-
- @result = []
- agent_list.each do |agent|
- broker = agent.broker
- send_codec = Qpid::StringCodec.new(broker.conn.spec)
- seq = nil
- synchronize do
- seq = @seq_mgr.reserve(CONTEXT_MULTIGET)
- @sync_sequence_list << seq
- end
- broker.set_header(send_codec, ?G, seq)
- send_codec.write_map(map)
- bank_key = "%d.%d" % [broker.broker_bank, agent.agent_bank]
- smsg = broker.message(send_codec.encoded, "agent.#{bank_key}")
- broker.emit(smsg)
- end
-
- timeout = false
- synchronize do
- unless @cv.wait_for(GET_WAIT_TIME) {
- @sync_sequence_list.empty? || @error }
- @sync_sequence_list.each do |pending_seq|
- @seq_mgr.release(pending_seq)
- end
- @sync_sequence_list = []
- timeout = true
- end
- end
-
- if @error
- errorText = @error
- @error = nil
- raise errorText
- end
-
- if @result.empty? && timeout
- raise "No agent responded within timeout period"
- end
- @result
- end
-
- def set_event_filter(kwargs); end
-
- def handle_broker_connect(broker); end
-
- def handle_broker_resp(broker, codec, seq)
- broker.broker_id = codec.read_uuid
- @console.broker_info(broker) if @console
-
- # Send a package request
- # (effectively inc and dec outstanding by not doing anything)
- send_codec = Qpid::StringCodec.new(broker.conn.spec)
- seq = @seq_mgr.reserve(CONTEXT_STARTUP)
- broker.set_header(send_codec, ?P, seq)
- smsg = broker.message(send_codec.encoded)
- broker.emit(smsg)
- end
-
- def handle_package_ind(broker, codec, seq)
- pname = codec.read_str8
- new_package = false
- synchronize do
- new_package = ! @packages.include?(pname)
- @packages[pname] = {} if new_package
- end
- @console.new_package(pname) if @console
-
- # Send a class request
- broker.inc_outstanding
- send_codec = Qpid::StringCodec.new(broker.conn.spec)
- seq = @seq_mgr.reserve(CONTEXT_STARTUP)
- broker.set_header(send_codec, ?Q, seq)
- send_codec.write_str8(pname)
- smsg = broker.message(send_codec.encoded)
- broker.emit(smsg)
- end
-
- def handle_command_complete(broker, codec, seq)
- code = codec.read_uint32
- text = codec.read_str8
- context = @seq_mgr.release(seq)
- if context == CONTEXT_STARTUP
- broker.dec_outstanding
- elsif context == CONTEXT_SYNC && seq == broker.sync_sequence
- broker.sync_done
- elsif context == CONTEXT_MULTIGET && @sync_sequence_list.include?(seq)
- synchronize do
- @sync_sequence_list.delete(seq)
- @cv.signal if @sync_sequence_list.empty?
- end
- end
- end
-
- def handle_class_ind(broker, codec, seq)
- kind = codec.read_uint8
- pname = codec.read_str8
- cname = codec.read_str8
- hash = codec.read_bin128
- unknown = false
-
- synchronize do
- return unless @packages.include?(pname)
- unknown = true unless @packages[pname].include?([cname, hash])
- end
-
- if unknown
- # Send a schema request for the unknown class
- broker.inc_outstanding
- send_codec = Qpid::StringCodec.new(broker.conn.spec)
- seq = @seq_mgr.reserve(CONTEXT_STARTUP)
- broker.set_header(send_codec, ?S, seq)
- send_codec.write_str8(pname)
- send_codec.write_str8(cname)
- send_codec.write_bin128(hash)
- smsg = broker.message(send_codec.encoded)
- broker.emit(smsg)
- end
- end
-
- def handle_method_resp(broker, codec, seq)
- code = codec.read_uint32
-
- text = codec.read_str16
- out_args = {}
- method, synchronous = @seq_mgr.release(seq)
- if code == 0
- method.arguments.each do |arg|
- if arg.dir.index(?O)
- out_args[arg.name] = decode_value(codec, arg.type)
- end
- end
- end
- result = MethodResult.new(code, text, out_args)
- if synchronous:
- broker.synchronize do
- broker.sync_result = MethodResult.new(code, text, out_args)
- broker.sync_done
- end
- else
- @console.method_response(broker, seq, result) if @console
- end
- end
-
- def handle_heartbeat_ind(broker, codec, seq, msg)
- if @console
- broker_bank = 1
- agent_bank = 0
- dp = msg.get("delivery_properties")
- if dp
- key = dp["routing_key"]
- key_elements = key.split(".")
- if key_elements.length == 4
- broker_bank = key_elements[2].to_i
- agent_bank = key_elements[3].to_i
- end
- end
- agent = broker.agent(broker_bank, agent_bank)
- timestamp = codec.read_uint64
- @console.heartbeat(agent, timestamp) if agent
- end
- end
-
- def handle_event_ind(broker, codec, seq)
- if @console
- event = Event.new(self, broker, codec)
- @console.event(broker, event)
- end
- end
-
- def handle_schema_resp(broker, codec, seq)
- kind = codec.read_uint8
- pname = codec.read_str8
- cname = codec.read_str8
- hash = codec.read_bin128
- klass_key = [pname, cname, hash]
- klass = SchemaClass.new(kind, klass_key, codec)
- synchronize { @packages[pname][ [cname, hash] ] = klass }
-
- @seq_mgr.release(seq)
- broker.dec_outstanding
- @console.new_class(kind, klass_key) if @console
- end
-
- def handle_content_ind(broker, codec, seq, prop=false, stat=false)
- pname = codec.read_str8
- cname = codec.read_str8
- hash = codec.read_bin128
- klass_key = [pname, cname, hash]
-
- schema = nil
- synchronize do
- return unless @packages.include?(pname)
- return unless @packages[pname].include?([cname, hash])
- schema = @packages[pname][ [cname, hash] ]
- end
-
- object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat)
- if pname == "org.apache.qpid.broker" && cname == "agent" && prop
- broker.update_agent(object)
- end
-
- synchronize do
- if @sync_sequence_list.include?(seq)
- if object.timestamps()[2] == 0 && select_match(object)
- @result << object
- end
- return
- end
- end
-
- @console.object_props(broker, object) if @console && @rcv_objects && prop
- @console.object_stats(broker, object) if @console && @rcv_objects && stat
- end
-
- def handle_broker_disconnect(broker); end
-
- def handle_error(error)
- @error = error
- synchronize do
- @sync_sequence_list = []
- @cv.signal
- end
- end
-
- # Decode, from the codec, a value based on its typecode
- def decode_value(codec, typecode)
- case typecode
- when 1: data = codec.read_uint8 # U8
- when 2: data = codec.read_uint16 # U16
- when 3: data = codec.read_uint32 # U32
- when 4: data = codec.read_uint64 # U64
- when 6: data = codec.read_str8 # SSTR
- when 7: data = codec.read_str16 # LSTR
- when 8: data = codec.read_int64 # ABSTIME
- when 9: data = codec.read_uint64 # DELTATIME
- when 10: data = ObjectId.new(codec) # REF
- when 11: data = codec.read_uint8 != 0 # BOOL
- when 12: data = codec.read_float # FLOAT
- when 13: data = codec.read_double # DOUBLE
- when 14: data = codec.read_uuid # UUID
- when 15: data = codec.read_map # FTABLE
- when 16: data = codec.read_int8 # S8
- when 17: data = codec.read_int16 # S16
- when 18: data = codec.read_int32 # S32
- when 19: data = codec.read_int64 # S64
- else
- raise ArgumentError, "Invalid type code: #{typecode} - #{typecode.inspect}"
- end
- return data
- end
-
- # Encode, into the codec, a value based on its typecode
- def encode_value(codec, value, typecode)
- # FIXME: Python does a lot of magic type conversions
- # We just assume that value has the right type; this is safer
- # than coercing explicitly, since Array::pack will complain
- # loudly about various type errors
- case typecode
- when 1: codec.write_uint8(value) # U8
- when 2: codec.write_uint16(value) # U16
- when 3: codec.write_uint32(value) # U32
- when 4: codec.write_uint64(value) # U64
- when 6: codec.write_str8(value) # SSTR
- when 7: codec.write_str16(value) # LSTR
- when 8: codec.write_int64(value) # ABSTIME
- when 9: codec.write_uint64(value) # DELTATIME
- when 10: value.encode(codec) # REF
- when 11: codec.write_uint8(value ? 1 : 0) # BOOL
- when 12: codec.write_float(value) # FLOAT
- when 13: codec.write_double(value) # DOUBLE
- when 14: codec.write_uuid(value) # UUID
- when 15: codec.write_map(value) # FTABLE
- when 16: codec.write_int8(value) # S8
- when 17: codec.write_int16(value) # S16
- when 18: codec.write_int32(value) # S32
- when 19: codec.write_int64(value) # S64
- else
- raise ValueError, "Invalid type code: %d" % typecode
- end
- end
-
- def display_value(value, typecode)
- case typecode
- when 1: return value.to_s
- when 2: return value.to_s
- when 3: return value.to_s
- when 4: return value.to_s
- when 6: return value.to_s
- when 7: return value.to_s
- when 8: return strftime("%c", gmtime(value / 1000000000))
- when 9: return value.to_s
- when 10: return value.to_s
- when 11: return value ? 'T' : 'F'
- when 12: return value.to_s
- when 13: return value.to_s
- when 14: return Qpid::UUID::format(value)
- when 15: return value.to_s
- when 16: return value.to_s
- when 17: return value.to_s
- when 18: return value.to_s
- when 19: return value.to_s
- else
- raise ValueError, "Invalid type code: %d" % typecode
- end
- end
-
- private
-
- def binding_keys
- key_list = []
- key_list << "schema.#"
- if @rcv_objects && @rcv_events && @rcv_heartbeats &&
- ! @user_bindings
- key_list << "console.#"
- else
- if @rcv_objects && ! @user_bindings
- key_list << "console.obj.#"
- else
- key_list << "console.obj.*.*.org.apache.qpid.broker.agent"
- end
- key_list << "console.event.#" if @rcv_events
- key_list << "console.heartbeat.#" if @rcv_heartbeats
- end
- return key_list
- end
-
- # Check the object against select to check for a match
- def select_match(object)
- select.each do |key, value|
- object.properties.each do |prop, propval|
- return false if key == prop.name && value != propval
- end
- end
- return true
- end
-
- end
-
- class Package
- attr_reader :name
-
- def initialize(name)
- @name = name
- end
- end
-
- # A ClassKey uniquely identifies a class from the schema.
- class ClassKey
- attr_reader :package, :klass_name, :hash
-
- def initialize(package, klass_name, hash)
- @package = package
- @klass_name = klass_name
- @hash = hash
- end
- end
-
- class SchemaClass
-
- CLASS_KIND_TABLE = 1
- CLASS_KIND_EVENT = 2
-
- attr_reader :klass_key, :properties, :statistics, :methods, :arguments
-
- def initialize(kind, key, codec)
- @kind = kind
- @klass_key = key
- @properties = []
- @statistics = []
- @methods = []
- @arguments = []
-
- if @kind == CLASS_KIND_TABLE
- prop_count = codec.read_uint16
- stat_count = codec.read_uint16
- method_count = codec.read_uint16
- prop_count.times { |idx|
- @properties << SchemaProperty.new(codec) }
- stat_count.times { |idx|
- @statistics << SchemaStatistic.new(codec) }
- method_count.times { |idx|
- @methods<< SchemaMethod.new(codec) }
- elsif @kind == CLASS_KIND_EVENT
- arg_count = codec.read_uint16
- arg_count.times { |idx|
- sa = SchemaArgument.new(codec, false)
- @arguments << sa
- }
- end
- end
-
- def to_s
- pname, cname, hash = @klass_key
- if @kind == CLASS_KIND_TABLE
- kind_str = "Table"
- elsif @kind == CLASS_KIND_EVENT
- kind_str = "Event"
- else
- kind_str = "Unsupported"
- end
- result = "%s Class: %s:%s " % [kind_str, pname, cname]
- result += Qpid::UUID::format(hash)
- return result
- end
- end
-
- class SchemaProperty
-
- attr_reader :name, :type, :access, :index, :optional,
- :unit, :min, :max, :maxlan, :desc
-
- def initialize(codec)
- map = codec.read_map
- @name = map["name"]
- @type = map["type"]
- @access = map["access"]
- @index = map["index"] != 0
- @optional = map["optional"] != 0
- @unit = map["unit"]
- @min = map["min"]
- @max = map["max"]
- @maxlan = map["maxlen"]
- @desc = map["desc"]
- end
-
- def to_s
- @name
- end
- end
-
- class SchemaStatistic
-
- attr_reader :name, :type, :unit, :desc
-
- def initialize(codec)
- map = codec.read_map
- @name = map["name"]
- @type = map["type"]
- @unit = map["unit"]
- @desc = map["desc"]
- end
-
- def to_s
- @name
- end
- end
-
- class SchemaMethod
-
- attr_reader :name, :desc, :arguments
-
- def initialize(codec)
- map = codec.read_map
- @name = map["name"]
- arg_count = map["argCount"]
- @desc = map["desc"]
- @arguments = []
- arg_count.times { |idx|
- @arguments << SchemaArgument.new(codec, true)
- }
- end
-
- def to_s
- result = @name + "("
- first = true
- result += @arguments.select { |arg| arg.dir.index(?I) }.join(", ")
- result += ")"
- return result
- end
- end
-
- class SchemaArgument
-
- attr_reader :name, :type, :dir, :unit, :min, :max, :maxlen
- attr_reader :desc, :default
-
- def initialize(codec, method_arg)
- map = codec.read_map
- @name = map["name"]
- @type = map["type"]
- @dir = map["dir"].upcase if method_arg
- @unit = map["unit"]
- @min = map["min"]
- @max = map["max"]
- @maxlen = map["maxlen"]
- @desc = map["desc"]
- @default = map["default"]
- end
- end
-
- # Object that represents QMF object identifiers
- class ObjectId
-
- include Comparable
-
- attr_reader :first, :second
-
- def initialize(codec, first=0, second=0)
- if codec
- @first = codec.read_uint64
- @second = codec.read_uint64
- else
- @first = first
- @second = second
- end
- end
-
- def <=>(other)
- return 1 unless other.is_a?(ObjectId)
- return -1 if first < other.first
- return 1 if first > other.first
- return second <=> other.second
- end
-
- def to_s
- "%d-%d-%d-%d-%d" % [flags, sequence, broker_bank, agent_bank, object]
- end
-
- def index
- [first, second]
- end
-
- def flags
- (first & 0xF000000000000000) >> 60
- end
-
- def sequence
- (first & 0x0FFF000000000000) >> 48
- end
-
- def broker_bank
- (first & 0x0000FFFFF0000000) >> 28
- end
-
- def agent_bank
- first & 0x000000000FFFFFFF
- end
-
- def object
- second
- end
-
- def durable?
- sequence == 0
- end
-
- def encode(codec)
- codec.write_uint64(first)
- codec.write_uint64(second)
- end
- end
-
- class Object
-
- attr_reader :object_id, :schema, :properties, :statistics,
- :current_time, :create_time, :delete_time, :broker
-
- def initialize(session, broker, schema, codec, prop, stat)
- @session = session
- @broker = broker
- @schema = schema
- @current_time = codec.read_uint64
- @create_time = codec.read_uint64
- @delete_time = codec.read_uint64
- @object_id = ObjectId.new(codec)
- @properties = []
- @statistics = []
- if prop
- missing = parse_presence_masks(codec, schema)
- schema.properties.each do |property|
- v = nil
- unless missing.include?(property.name)
- v = @session.decode_value(codec, property.type)
- end
- @properties << [property, v]
- end
- end
-
- if stat
- schema.statistics.each do |statistic|
- s = @session.decode_value(codec, statistic.type)
- @statistics << [statistic, s]
- end
- end
- end
-
- def klass_key
- @schema.klass_key
- end
-
-
- def methods
- @schema.methods
- end
-
- # Return the current, creation, and deletion times for this object
- def timestamps
- return [@current_time, @create_time, @delete_time]
- end
-
- # Return a string describing this object's primary key
- def index
- @properties.select { |property, value|
- property.index
- }.collect { |property,value|
- @session.display_value(value, property.type) }.join(":")
- end
-
- # Replace properties and/or statistics with a newly received update
- def merge_update(newer)
- unless object_id == newer.object_id
- raise "Objects with different object-ids"
- end
- @properties = newer.getProperties unless newer.properties.empty?
- @statistics = newer.getStatistics unless newer.statistics.empty?
- end
-
- def to_s
- key = klass_key
- key[0] + ":" + key[1] + "[" + @object_id.to_s() + "] " + index
- end
-
- # This must be defined because ruby has this (deprecated) method built in.
- def id
- method_missing(:id)
- end
-
- # Same here..
- def type
- method_missing(:type)
- end
-
- def name
- method_missing(:name)
- end
-
- def method_missing(name, *args)
- name = name.to_s
-
- if method = @schema.methods.find { |method| name == method.name }
- return invoke(method, name, args)
- end
-
- @properties.each do |property, value|
- return value if name == property.name
- if name == "_#{property.name}_" && property.type == 10
- # Dereference references
- deref = @session.objects(:object_id => value, :broker => @broker)
- return nil unless deref.size == 1
- return deref[0]
- end
- end
- @statistics.each do |statistic, value|
- if name == statistic.name
- return value
- end
- end
- raise "Type Object has no attribute '#{name}'"
- end
-
- private
-
- def send_method_request(method, name, args, synchronous = false)
- @schema.methods.each do |schema_method|
- if name == schema_method.name
- send_codec = Qpid::StringCodec.new(@broker.conn.spec)
- seq = @session.seq_mgr.reserve([schema_method, synchronous])
- @broker.set_header(send_codec, ?M, seq)
- @object_id.encode(send_codec)
- pname, cname, hash = @schema.klass_key
- send_codec.write_str8(pname)
- send_codec.write_str8(cname)
- send_codec.write_bin128(hash)
- send_codec.write_str8(name)
-
- formals = method.arguments.select { |arg| arg.dir.index(?I) }
- count = method.arguments.select { |arg| arg.dir.index(?I) }.size
- unless formals.size == args.size
- raise "Incorrect number of arguments: expected #{formals.size}, got #{args.size}"
- end
-
- formals.zip(args).each do |formal, actual|
- @session.encode_value(send_codec, actual, formal.type)
- end
-
- smsg = @broker.message(send_codec.encoded,
- "agent.#{object_id.broker_bank}.#{object_id.agent_bank}")
-
- @broker.sync_start if synchronous
- @broker.emit(smsg)
-
- return seq
- end
- end
- end
-
- def invoke(method, name, args)
- if send_method_request(method, name, args, synchronous = true)
- unless @broker.wait_for_sync_done
- @session.seq_mgr.release(seq)
- raise "Timed out waiting for method to respond"
- end
-
- if @broker.error
- error_text = @broker.error
- @broker.error = nil
- raise error_text
- end
-
- return @broker.sync_result
- end
- raise "Invalid Method (software defect) [#{name}]"
- end
-
- def parse_presence_masks(codec, schema)
- exclude_list = []
- bit = 0
- schema.properties.each do |property|
- if property.optional
- if bit == 0
- mask = codec.read_uint8
- bit = 1
- end
- if (mask & bit) == 0
- exclude_list << property.name
- end
- bit *= 2
- bit = 0 if bit == 256
- end
- end
- return exclude_list
- end
- end
-
- class MethodResult
-
- attr_reader :status, :text
-
- def initialize(status, text, out_args)
- @status = status
- @text = text
- @out_args = out_args
- end
-
- def method_missing(name)
- name = name.to_s()
- if @out_args.include?(name)
- return @out_args[name]
- else
- raise "Unknown method result arg #{name}"
- end
- end
-
- def to_s
- "#{text} (#{status}) - #{out_args.inspect}"
- end
- end
-
- class ManagedConnection
-
- DELAY_MIN = 1
- DELAY_MAX = 128
- DELAY_FACTOR = 2
- include MonitorMixin
-
- def initialize(broker)
- super()
- @broker = broker
- @cv = new_cond
- @is_cancelled = false
- end
-
- # Main body of the running thread.
- def start
- @thread = Thread.new {
- delay = DELAY_MIN
- while true
- begin
- @broker.try_to_connect
- synchronize do
- while !@is_cancelled and @broker.connected?
- @cv.wait
- Thread.exit if @is_cancelled
- delay = DELAY_MIN
- end
- end
-
- rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError
- delay *= DELAY_FACTOR if delay < DELAY_MAX
- end
-
- synchronize do
- @cv.wait(delay)
- Thread.exit if @is_cancelled
- end
- end
- }
- end
-
- # Tell this thread to stop running and return.
- def stop
- synchronize do
- @is_cancelled = true
- @cv.signal
- end
- end
-
- # Notify the thread that the connection was lost.
- def disconnected
- synchronize do
- @cv.signal
- end
- end
-
- def join
- @thread.join
- end
- end
-
- class Broker
-
- SYNC_TIME = 60
-
- include MonitorMixin
-
- attr_accessor :error
-
- attr_reader :amqp_session_id, :amqp_session, :conn, :broker_bank, :topic_name
-
- attr_accessor :broker_id, :sync_result
-
- def initialize(session, host, port, auth_mech, auth_name, auth_pass)
- super()
-
- # For debugging..
- Thread.abort_on_exception = true
-
- @session = session
- @host = host
- @port = port
- @auth_name = auth_name
- @auth_pass = auth_pass
- @broker_bank = 1
- @agents = {}
- @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
- @topic_bound = false
- @cv = new_cond
- @sync_in_flight = false
- @sync_request = 0
- @sync_result = nil
- @reqs_outstanding = 1
- @error = nil
- @broker_id = nil
- @is_connected = false
- @conn = nil
- if @session.managedConnections?
- @thread = ManagedConnection.new(self)
- @thread.start
- else
- @thread = nil
- try_to_connect
- end
- end
-
- def connected?
- @is_connected
- end
-
- def agent(broker_bank, agent_bank)
- bank_key = "%d.%d" % [broker_bank, agent_bank]
- return @agents[bank_key]
- end
-
- # Get the list of agents reachable via this broker
- def agents
- @agents.values
- end
-
- def url
- "#{@host}:#{@port}"
- end
-
- def to_s
- if connected?
- "Broker connected at: #{url}"
- else
- "Disconnected Broker"
- end
- end
-
- def wait_for_sync_done
- synchronize do
- return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error }
- end
- end
-
- def wait_for_stable
- synchronize do
- return if @reqs_outstanding == 0
- @sync_in_flight = true
- unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 }
- raise "Timed out waiting for broker to synchronize"
- end
- end
- end
-
- # Compose the header of a management message
- def set_header(codec, opcode, seq=0)
- codec.write_uint8(?A)
- codec.write_uint8(?M)
- codec.write_uint8(?2)
- codec.write_uint8(opcode)
- codec.write_uint32(seq)
- end
-
- def message(body, routing_key="broker")
- dp = @amqp_session.delivery_properties
- dp.routing_key = routing_key
- mp = @amqp_session.message_properties
- mp.content_type = "x-application/qmf"
- mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
- return Qpid::Message.new(dp, mp, body)
- end
-
- def emit(msg, dest="qpid.management")
- @amqp_session.message_transfer(:destination => dest,
- :message => msg)
- end
-
- def inc_outstanding
- synchronize { @reqs_outstanding += 1 }
- end
-
- def dec_outstanding
- synchronize do
- @reqs_outstanding -= 1
- if @reqs_outstanding == 0 && ! @topic_bound
- @topic_bound = true
- @session.binding_key_list.each do |key|
- args = {
- :exchange => "qpid.management",
- :queue => @topic_name,
- :binding_key => key }
- @amqp_session.exchange_bind(args)
- end
- end
- if @reqs_outstanding == 0 && @sync_in_flight
- sync_done
- end
- end
- end
-
- def sync_start
- synchronize { @sync_in_flight = true }
- end
-
- def sync_done
- synchronize do
- @sync_in_flight = false
- @cv.signal
- end
- end
-
- def update_agent(obj)
- bank_key = "%d.%d" % [obj.brokerBank, obj.agentBank]
- if obj.delete_time == 0
- unless @agents.include?(bank_key)
- agent = Agent.new(self, obj.agentBank, obj.label)
- @agents[bank_key] = agent
- @session.console.new_agent(agent) if @session.console
- end
- else
- agent = @agents.delete(bank_key)
- @session.console.del_agent(agent) if agent && @session.console
- end
- end
-
- def shutdown
- if @thread
- @thread.stop
- @thread.join
- end
- if connected?
- @amqp_session.incoming("rdest").stop
- if @session.console
- @amqp_session.incoming("tdest").stop
- end
- @amqp_session.close
- @is_connected = false
- end
- end
-
- def try_to_connect
- #begin
- @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
- # FIXME: Need sth for Qpid::Util::connect
-
- @conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
- :username => @auth_name,
- :password => @auth_pass)
- @conn.start
- @reply_name = "reply-%s" % amqp_session_id
- @amqp_session = @conn.session(@amqp_session_id)
- @amqp_session.auto_sync = true
-
- @amqp_session.queue_declare(:queue => @reply_name,
- :exclusive => true,
- :auto_delete => true)
-
- @amqp_session.exchange_bind(:exchange => "amq.direct",
- :queue => @reply_name,
- :binding_key => @reply_name)
- @amqp_session.message_subscribe(:queue => @reply_name,
- :destination => "rdest",
- :accept_mode => @amqp_session.message_accept_mode.none,
- :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
- q = @amqp_session.incoming("rdest")
- q.exc_listen(& method(:exception_cb))
- q.listen(& method(:reply_cb))
- @amqp_session.message_set_flow_mode(:destination => "rdest",
- :flow_mode => 1)
- @amqp_session.message_flow(:destination => "rdest",
- :unit => 0,
- :value => 0xFFFFFFFF)
- @amqp_session.message_flow(:destination => "rdest",
- :unit => 1,
- :value => 0xFFFFFFFF)
-
- @topic_name = "topic-#{@amqp_session_id}"
- @amqp_session.queue_declare(:queue => @topic_name,
- :exclusive => true,
- :auto_delete => true)
- @amqp_session.message_subscribe(:queue => @topic_name,
- :destination => "tdest",
- :accept_mode => @amqp_session.message_accept_mode.none,
- :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
- @amqp_session.incoming("tdest").listen(& method(:reply_cb))
- @amqp_session.message_set_flow_mode(:destination => "tdest",
- :flow_mode => 1)
- @amqp_session.message_flow(:destination => "tdest",
- :unit => 0,
- :value => 0xFFFFFFFF)
- @amqp_session.message_flow(:destination => "tdest",
- :unit => 1,
- :value => 0xFFFFFFFF)
-
- @is_connected = true
- @session.handle_broker_connect(self)
-
- codec = Qpid::StringCodec.new(@conn.spec)
- set_header(codec, ?B)
- msg = message(codec.encoded)
- emit(msg)
- end
-
- private
-
- # Check the header of a management message and extract the opcode and
- # class
- def check_header(codec)
- begin
- return [nil, nil] unless codec.read_uint8 == ?A
- return [nil, nil] unless codec.read_uint8 == ?M
- return [nil, nil] unless codec.read_uint8 == ?2
- opcode = codec.read_uint8
- seq = codec.read_uint32
- return [opcode, seq]
- rescue
- return [nil, nil]
- end
- end
-
- def reply_cb(msg)
- codec = Qpid::StringCodec.new(@conn.spec, msg.body)
- loop do
- opcode, seq = check_header(codec)
- return unless opcode
- case opcode
- when ?b: @session.handle_broker_resp(self, codec, seq)
- when ?p: @session.handle_package_ind(self, codec, seq)
- when ?z: @session.handle_command_complete(self, codec, seq)
- when ?q: @session.handle_class_ind(self, codec, seq)
- when ?m: @session.handle_method_resp(self, codec, seq)
- when ?h: @session.handle_heartbeat_ind(self, codec, seq, msg)
- when ?e: @session.handle_event_ind(self, codec, seq)
- when ?s: @session.handle_schema_resp(self, codec, seq)
- when ?c: @session.handle_content_ind(self, codec, seq, true, false)
- when ?i: @session.handle_content_ind(self, codec, seq, false, true)
- when ?g: @session.handle_content_ind(self, codec, seq, true, true)
- else
- raise "Unexpected opcode #{opcode.inspect}"
- end
- end
- end
-
- def exception_cb(data)
- @is_connected = false
- @error = data
- synchronize { @cv.signal if @sync_in_flight }
- @session.handle_error(@error)
- @session.handle_broker_disconnect(self)
- @thread.disconnected if @thread
- end
- end
-
- class Agent
- attr_reader :broker, :agent_bank, :label
-
- def initialize(broker, agent_bank, label)
- @broker = broker
- @agent_bank = agent_bank
- @label = label
- end
-
- def broker_bank
- @broker.broker_bank
- end
-
- def to_s
- "Agent at bank %d.%d (%s)" % [@broker.broker_bank, @agent_bank, @label]
- end
- end
-
- class Event
-
- attr_reader :klass_key, :arguments, :timestamp, :name, :schema
-
- def initialize(session, broker, codec)
- @session = session
- @broker = broker
- pname = codec.read_str8
- cname = codec.read_str8
- hash = codec.read_bin128
- @klass_key = [pname, cname, hash]
- @timestamp = codec.read_int64
- @severity = codec.read_uint8
- @schema = nil
- session.packages.keys.each do |pname|
- k = [cname, hash]
- if session.packages[pname].include?(k)
- @schema = session.packages[pname][k]
- @arguments = {}
- @schema.arguments.each do |arg|
- v = session.decode_value(codec, arg.type)
- @arguments[arg.name] = v
- end
- end
- end
- end
-
- def to_s
- return "<uninterpretable>" unless @schema
- t = Time.at(self.timestamp / 1000000000)
- out = t.strftime("%c")
- out += " " + sev_name + " " + @klass_key[0] + ":" + klass_key[1]
- out += " broker=" + @broker.url
- @schema.arguments.each do |arg|
- out += " " + arg.name + "=" + @session.display_value(@arguments[arg.name], arg.type)
- end
- return out
- end
-
- def sev_name
- case @severity
- when 0 : return "EMER "
- when 1 : return "ALERT"
- when 2 : return "CRIT "
- when 3 : return "ERROR"
- when 4 : return "WARN "
- when 5 : return "NOTIC"
- when 6 : return "INFO "
- when 7 : return "DEBUG"
- else
- return "INV-%d" % @severity
- end
- end
-
- end
-
- # Manage sequence numbers for asynchronous method calls
- class SequenceManager
- include MonitorMixin
-
- def initialize
- super()
- @sequence = 0
- @pending = {}
- end
-
- # Reserve a unique sequence number
- def reserve (data)
- synchronize do
- result = @sequence
- @sequence += 1
- @pending[result] = data
- return result
- end
- end
-
- # Release a reserved sequence number
- def release (seq)
- synchronize { @pending.delete(seq) }
- end
- end
-
- class DebugConsole < Console
-
- def broker_connected(broker)
- puts "brokerConnected #{broker}"
- end
-
- def broker_disconnected(broker)
- puts "brokerDisconnected #{broker}"
- end
-
- def new_package(name)
- puts "newPackage #{name}"
- end
-
- def new_class(kind, klass_key)
- puts "newClass #{kind} #{klass_key}"
- end
-
- def new_agent(agent)
- puts "new_agent #{agent}"
- end
-
- def del_agent(agent)
- puts "delAgent #{agent}"
- end
-
- def object_props(broker, record)
- puts "objectProps #{record}"
- end
-
- def object_stats(broker, record)
- puts "objectStats #{record}"
- end
-
- def event(broker, event)
- puts "event #{event}"
- end
-
- def heartbeat(agent, timestamp)
- puts "heartbeat #{agent}"
- end
-
- def broker_info(broker)
- puts "brokerInfo #{broker}"
- end
- end
-end