#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

require 'cqpid'

require 'qpid/errors'

module Qpid

  module Messaging

    # A Session represents a distinct conversation between end points.
    #
    # A Session wraps a native session object and keeps track, by name, of
    # every +Sender+ and +Receiver+ created through it.
    class Session

      # The native session object wrapped by this instance.
      attr_reader :session_impl # :nodoc:

      # Returns the +Connection+ associated with this session.
      attr_reader :connection

      def initialize(connection, session) # :nodoc:
        @connection   = connection
        @session_impl = session
        @senders      = {}
        @receivers    = {}
      end

      # Creates a new endpoint for sending messages.
      #
      # The +address+ can either be an instance of +Address+ or else a
      # string that describes an address endpoint.
      #
      # The new +Sender+ is registered under the name reported by the
      # native sender and can later be retrieved with +sender+.
      #
      # ==== Arguments
      #
      # * +address+ The end point address.
      #
      # ==== Examples
      #
      #   sender = session.create_sender "my-queue;{create:always}"
      #
      def create_sender(address)
        sender_impl = @session_impl.createSender(address_impl_for(address))
        @senders[sender_impl.getName] =
          Qpid::Messaging::Sender.new(self, sender_impl)
      end

      # Retrieves the +Sender+ with the specified name.
      #
      # The +Sender+ must have been previously created using
      # the +create_sender+ method.
      #
      # Raises Qpid::Messaging::KeyError if no +Sender+ is registered
      # under +name+.
      #
      # ==== Arguments
      #
      # * +name+ The +Sender+ name.
      #
      # ==== Examples
      #
      #   sender = session.sender "my-queue"
      #
      def sender(name)
        unless @senders.key?(name)
          raise Qpid::Messaging::KeyError, "No such sender: #{name}"
        end

        @senders[name]
      end

      # Creates a new endpoint for receiving messages.
      #
      # The +address+ can either be an instance of +Address+ or else a
      # string that describes an address endpoint.
      #
      # The new +Receiver+ is registered under the name reported by the
      # native receiver and can later be retrieved with +receiver+.
      #
      # ==== Arguments
      #
      # * +address+ The end point address.
      #
      # ==== Examples
      #
      #   receiver = session.create_receiver "my-queue"
      #
      def create_receiver(address)
        receiver_impl = @session_impl.createReceiver(address_impl_for(address))
        @receivers[receiver_impl.getName] =
          Qpid::Messaging::Receiver.new(self, receiver_impl)
      end

      # Retrieves the +Receiver+ with the specified name.
      #
      # The +Receiver+ must have been previously created using
      # the +create_receiver+ method.
      #
      # Raises Qpid::Messaging::KeyError if no +Receiver+ is registered
      # under +name+.
      #
      # ==== Arguments
      #
      # * +name+ The +Receiver+ name.
      #
      # ==== Examples
      #
      #   receiver = session.receiver "my-queue"
      #
      def receiver(name)
        unless @receivers.key?(name)
          raise Qpid::Messaging::KeyError, "No such receiver: #{name}"
        end

        @receivers[name]
      end

      # Closes the +Session+ and all associated +Sender+ and +Receiver+ instances.
      #
      # NOTE: All +Session+ instances for a +Connection+ are closed when the
      # +Connection+ is closed.
      def close
        @session_impl.close
      end

      # Commits any pending transactions for a transactional session.
      def commit
        @session_impl.commit
      end

      # Rolls back any uncommitted transactions on a transactional session.
      def rollback
        @session_impl.rollback
      end

      # Acknowledges one or more outstanding messages that have been received
      # on this session.
      #
      # ==== Arguments
      #
      # * :message - if specified, then only the +Message+ specified is acknowledged
      # * :sync - if true then the call will block until processed by the server (def. false)
      #
      # ==== Examples
      #
      #   session.acknowledge                     # acknowledges all received messages
      #   session.acknowledge :message => message # acknowledge one message
      #   session.acknowledge :sync => true       # blocks until the call completes
      #
      #--
      # TODO: Add an optional block to be used for blocking calls.
      #++
      def acknowledge(args = {})
        sync    = args[:sync] || false
        message = args[:message]

        # A missing (nil/false) :message means "acknowledge everything".
        if message
          @session_impl.acknowledge message.message_impl, sync
        else
          @session_impl.acknowledge sync
        end
      end

      # Rejects the specified message. A rejected message will not be
      # redelivered.
      #
      # NOTE: A message cannot be rejected once it has been acknowledged.
      def reject(message)
        @session_impl.reject message.message_impl
      end

      # Releases the message, which allows the broker to attempt to
      # redeliver it.
      #
      # NOTE: A message cannot be released once it has been acknowledged.
      def release(message)
        @session_impl.release message.message_impl
      end

      # Requests synchronization with the server.
      #
      # ==== Arguments
      #
      # * :block - if true then the call blocks until the server acknowledges it (def. false)
      #
      #--
      # TODO: Add an optional block to be used for blocking calls.
      #++
      def sync(args = {})
        block = args[:block] || false
        @session_impl.sync block
      end

      # Returns the total number of receivable messages, and messages already
      # received, by +Receiver+ instances associated with this +Session+.
      def receivable
        @session_impl.getReceivable
      end

      # Returns the number of messages that have been acknowledged by this session
      # whose acknowledgements have not been confirmed as processed by the server.
      def unsettled_acks
        @session_impl.getUnsettledAcks
      end

      # Fetches the +Receiver+ for the next message.
      #
      # Returns a new +Receiver+ wrapping the native receiver, or +nil+ when
      # the native call yields no receiver. If a block is given it is called
      # with the +Receiver+ before the method returns.
      #
      # ==== Arguments
      #
      # * timeout - time to wait for a +Receiver+ before timing out
      #
      # ==== Examples
      #
      #   recv = session.next_receiver # wait forever for the next +Receiver+
      #   # execute a block on the next receiver
      #   session.next_receiver do |recv|
      #     msg = recv.get
      #     puts "Received message: #{msg.content}"
      #   end
      def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block)
        receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)
        return nil if receiver_impl.nil?

        recv = Qpid::Messaging::Receiver.new self, receiver_impl
        block.call recv unless block.nil?
        recv
      end

      # Returns true if there were exceptions on this session.
      #
      # ==== Examples
      #
      #   puts "There were session errors." if @session.errors?
      def errors?
        @session_impl.hasError
      end

      # If the +Session+ has been rendered invalid due to some exception,
      # this method will result in that exception being raised.
      #
      # If none have occurred, then no exceptions are raised.
      #
      # ==== Examples
      #
      #   if @session.errors?
      #     begin
      #       @session.errors
      #     rescue Exception => error
      #       puts "An error occurred: #{error}"
      #     end
      #   end
      def errors
        @session_impl.checkError
      end

      private

      # Returns the value to hand to the native session for +address+: the
      # wrapped +address_impl+ when given an +Address+ (or a subclass of it),
      # otherwise +address+ itself, unchanged.
      def address_impl_for(address)
        address.is_a?(Qpid::Messaging::Address) ? address.address_impl : address
      end

    end

  end

end