# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Console API for Qpid Management Framework require 'socket' require 'monitor' require 'thread' require 'uri' require 'time' module Qpid::Qmf # To access the asynchronous operations, a class must be derived from # Console with overrides of any combination of the available methods. class Console # Invoked when a connection is established to a broker def broker_connected(broker); end # Invoked when the connection to a broker is lost def broker_disconnected(broker); end # Invoked when a QMF package is discovered def new_package(name); end # Invoked when a new class is discovered. Session.getSchema can be # used to obtain details about the class def new_class(kind, klass_key); end # Invoked when a QMF agent is discovered def new_agent(agent); end # Invoked when a QMF agent disconects def del_agent(agent); end # Invoked when an object is updated def object_props(broker, record); end # Invoked when an object is updated def object_stats(broker, record); end # Invoked when an event is raised def event(broker, event); end # Invoked when an agent heartbeat is received. def heartbeat(agent, timestamp); end # Invoked when the connection sequence reaches the point where broker information is available. 
def broker_info(broker); end # Invoked when a method response from an asynchronous method call is received. def method_response(broker, seq, response); end end class BrokerURL attr_reader :host, :port, :auth_name, :auth_pass def initialize(text) uri = URI.parse(text) @host = uri.host @port = uri.port ? uri.port : 5672 @auth_name = uri.user @auth_pass = uri.password return uri end def name "#{@host}:#{@port}" end def match(host, port) # FIXME: Unlcear what the Python code is actually checking for # here, especially since HOST can resolve to multiple IP's @port == port && (host == @host || ipaddr(host, port) == ipaddr(@host, @port)) end private def ipaddr(host, port) s = Socket::getaddrinfo(host, port, Socket::AF_INET, Socket::SOCK_STREAM) s[0][2] end end # An instance of the Session class represents a console session running # against one or more QMF brokers. A single instance of Session is # needed to interact with the management framework as a console. class Session CONTEXT_SYNC = 1 CONTEXT_STARTUP = 2 CONTEXT_MULTIGET = 3 DEFAULT_GET_WAIT_TIME = 60 include MonitorMixin attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages # Initialize a session. If the console argument is provided, the # more advanced asynchronous features are available. If console is # defaulted, the session will operate in a simpler, synchronous # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments # are meaningful only if 'console' is provided. They control # whether object updates, events, and agent-heartbeats are # subscribed to. If the console is not interested in receiving one # or more of the above, setting the argument to False will reduce # tha bandwidth used by the API. If manageConnections is set to # True, the Session object will manage connections to the brokers. # This means that if a broker is unreachable, it will retry until a # connection can be established. If a connection is lost, the # Session will attempt to reconnect. 
#
    # If manageConnections is set to False, the user is responsible for
    # handling failures.  In this case, an unreachable broker will cause
    # add_broker to raise an exception.  If userBindings is set to False
    # (the default) and rcvObjects is True, the console will receive
    # data for all object classes.  If userBindings is set to True, the
    # user must select which classes the console shall receive by
    # invoking the bind_package or bind_class methods.  This allows the
    # console to be configured to receive only information that is
    # relevant to a particular application.  If rcvObjects is False,
    # userBindings has no meaning.
    #
    # Accept a hash of parameters, where keys can be :console,
    # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections,
    # and :user_bindings
    #
    # Raises ArgumentError when :user_bindings is requested while object
    # updates are not being received.
    def initialize(kwargs = {})
      super()

      # Look up an option, falling back only when it is absent or nil so
      # that an explicit false is honoured.
      option = lambda do |key, fallback|
        value = kwargs[key]
        value.nil? ? fallback : value
      end

      supplied_console = kwargs[:console]
      @console = supplied_console ? supplied_console : nil

      @brokers            = []
      @packages           = {}
      @seq_mgr            = SequenceManager.new
      @cv                 = new_cond
      @sync_sequence_list = []
      @result             = []
      @select             = []
      @error              = nil

      # Without a console there is nobody to deliver updates to, so the
      # receive flags are forced off regardless of what was requested.
      if @console
        @rcv_objects    = option.call(:rcv_objects, true)
        @rcv_events     = option.call(:rcv_events, true)
        @rcv_heartbeats = option.call(:rcv_heartbeats, true)
      else
        @rcv_objects    = false
        @rcv_events     = false
        @rcv_heartbeats = false
      end
      @user_bindings = option.call(:user_bindings, false)

      @binding_key_list   = binding_keys
      @manage_connections = kwargs[:manage_connections] || false

      if @user_bindings && !@rcv_objects
        raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided"
      end
    end

    # Short description of the session including its broker count.
    def to_s
      "QMF Console Session Manager (brokers: #{@brokers.size})"
    end

    # Whether the session was created with :manage_connections enabled.
    def managedConnections?
      @manage_connections
    end

    # Connect to a Qpid broker.  Returns an object of type Broker
    #
    # To supply a username for authentication, use the URL syntax:
    #
    #     amqp://username@hostname:port
    #
    # If the broker needs a password for the client, an interactive prompt will be
    # provided to the user.
#
    # To supply a username and a password, use
    #
    #     amqp://username:password@hostname:port
    #
    # The following keyword arguments may be used to control authentication:
    #
    # :mechanism - SASL mechanism (i.e. "PLAIN", "GSSAPI", "ANONYMOUS", etc.
    #            - defaults to unspecified (the system chooses for you)
    # :service   - SASL service name (i.e. the kerberos principal of the broker)
    #            - defaults to "qpidd"
    # :min_ssf   - Minimum Security Strength Factor for SASL security layers
    #            - defaults to 0
    # :max_ssf   - Maximum Security Strength Factor for SASL security layers
    #            - defaults to 65535
    #
    # Raises broker.error when the broker is not connected and the session
    # does not manage connections.
    def add_broker(target = "amqp://localhost", kwargs = {})
      url = BrokerURL.new(target)
      broker = Broker.new(self, url.host, url.port, url.auth_name, url.auth_pass, kwargs)
      raise broker.error unless broker.connected? || @manage_connections

      @brokers << broker
      # With unmanaged connections, query the "agent" objects of the new
      # broker right away.
      objects(:broker => broker, :class => "agent") unless @manage_connections
      broker
    end

    # Disconnect from a broker.  The 'broker' argument is the object
    # returned from the add_broker call
    def del_broker(broker)
      broker.shutdown
      @brokers.delete(broker)
    end

    # Get the list of known classes within a QMF package.  Returns an
    # empty list when the package is unknown.
    def classes(package_name)
      @brokers.each { |broker| broker.wait_for_stable }
      return [] unless @packages.include?(package_name)

      # The per-package table is the one schema() indexes by
      # [klass_name, hash]; only the stored schema classes matter here.
      @packages[package_name].map { |_key, schema_class| schema_class.klass_key }
    end

    # Get the schema for a QMF class.  Returns nil when the package of
    # +klass_key+ is unknown.
    def schema(klass_key)
      @brokers.each { |broker| broker.wait_for_stable }
      if @packages.include?(klass_key.package)
        @packages[klass_key.package][ [klass_key.klass_name, klass_key.hash] ]
      end
    end

    # Receive object data for every class of +package_name+.  Raises
    # unless the session was created with user bindings and object
    # reception enabled.
    def bind_package(package_name)
      bind_console_obj_key("#{package_name}")
    end

    # Receive object data for +class_name+ in +package_name+.  Raises
    # unless the session was created with user bindings and object
    # reception enabled.
    def bind_class(package_name, class_name)
      bind_console_obj_key("#{package_name}.#{class_name}")
    end

    # Receive object data for the class identified by +klass_key+.  Only
    # the package and class names take part in the binding key.  Raises
    # unless the session was created with user bindings and object
    # reception enabled.
    def bind_class_key(klass_key)
      pname, cname = klass_key.to_a
      bind_console_obj_key("#{pname}.#{cname}")
    end

    # Shared implementation of the bind_* methods: checks the session
    # options, then binds every broker's topic queue to the
    # "qpid.management" exchange for the given key suffix.
    def bind_console_obj_key(key_suffix)
      unless @user_bindings && @rcv_objects
        raise "userBindings option not set for Session"
      end
      @brokers.each do |broker|
        args = { :exchange    => "qpid.management",
                 :queue       => broker.topic_name,
                 :binding_key => "console.obj.*.*.#{key_suffix}.#" }
        broker.amqp_session.exchange_bind(args)
      end
    end
    private :bind_console_obj_key

    # Get a list of currently known agents, either for the given broker
    # or, when +broker+ is nil, for every broker of the session.
    def agents(broker=nil)
      broker_list = broker.nil? ? @brokers.dup : [broker]
      broker_list.each { |b| b.wait_for_stable }
      # Append into one fresh array rather than re-allocating with +=.
      broker_list.inject([]) { |found, b| found.concat(b.agents) }
    end

    # Get a list of objects from QMF agents.
    # All arguments are passed by name(keyword).
    #
    # The class for queried objects may be specified in one of the
    # following ways:
    # :schema => - supply a schema object returned from getSchema.
    # :key => - supply a klass_key from the list returned by getClasses.
    # :class => - supply a class name as a string. If the class name exists
    #             in multiple packages, a _package argument may also be supplied.
    # :object_id = - get the object referenced by the object-id
    #
    # If objects should be obtained from only one agent, use the following argument.
    # Otherwise, the query will go to all agents.
    #
    # :agent = - supply an agent from the list returned by getAgents.
    #
    # If the get query is to be restricted to one broker (as opposed to
    # all connected brokers), add the following argument:
    #
    # :broker = - supply a broker as returned by addBroker.
    #
    # The default timeout for this synchronous operation is 60 seconds. To change the timeout,
    # use the following argument:
    #
    # :timeout =