diff options
Diffstat (limited to 'cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb')
-rw-r--r-- | cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb b/cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb new file mode 100644 index 0000000000..7e6e11f654 --- /dev/null +++ b/cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb @@ -0,0 +1,264 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +module Qpid + + module Messaging + + # A +Session+ represents a distinct conversation between end points. They are + # created from an active (i.e., not closed) Connection. + # + # A +Session+ is used to acknowledge individual or all messages that have + # passed through it + class Session + + def initialize(connection, session) # :nodoc: + @connection = connection + @session_impl = session + end + + def session_impl # :nodoc: + @session_impl + end + + # Returns the Connection associated with this session. + def connection + @connection + end + + # Creates a new endpoint for sending messages. + # + # The address can either be an instance Address or else an + # address string. + # + # ==== Arguments + # + # * +address+ - the end point address. + def create_sender(address) + _address = address + + if address.class == Qpid::Messaging::Address + _address = address.address_impl + end + + sender_impl = @session_impl.createSender(_address) + sender_name = sender_impl.getName + + Qpid::Messaging::Sender.new(self, sender_impl) + end + + # Retrieves the Sender with the specified name. + # + # Raises an exception if no such Sender exists. + # + # ==== Arguments + # + # * +name+ - the name of the Sender + def sender(name) + Qpid::Messaging::Sender.new self, @session_impl.getSender(name) + end + + # Creates a new endpoint for receiving messages. + # + # The +address+ can either be an instance Address or else an + # address string. + # + # ==== Arguments + # + # * +address+ - the end point address. + def create_receiver(address) + result = nil + receiver_impl = nil + + if address.class == Qpid::Messaging::Address + address_impl = address.address_impl + receiver_impl = @session_impl.createReceiver address_impl + else + receiver_impl = @session_impl.createReceiver(address) + end + + Qpid::Messaging::Receiver.new self, receiver_impl + end + + # Retrieves the +Receiver+ with the specified name, or nil if no such + # Receiver exists. + # + # ==== Arguments + # + # * +name+ - the name of the Receiver + def receiver(name) + Qpid::Messaging::Receiver.new self, @session_impl.getReceiver(name) + end + + # Closes the +Session+ and all associated +Sender+ and +Receiver+ instances. + # + # *NOTE:* All +Session+ instances for a Connection are closed when the + # Connection is closed. But closing a +Session+ does not affect the + # owning Connection. + def close; @session_impl.close; end + + # Commits any pending transactions for a transactional session. + def commit; @session_impl.commit; end + + # Rolls back any uncommitted transactions on a transactional session. + def rollback; @session_impl.rollback; end + + # Acknowledges one or more outstanding messages that have been received + # on this session. + # + # ==== Arguments + # + # * +options+ - the set of options + # + # ==== Options + # + # * :message - if specified, then only that Message is acknowledged + # * :sync - if true, the call will block until processed by the broker + # + # ==== Examples + # + # # acknowledge all received messages + # session.acknowledge + # + # # acknowledge a single message + # session.acknowledge :message => message + # + # # acknowledge all messages, wait until the call finishes + # session.acknowledge :sync => true + # + #-- + # TODO: Add an optional block to be used for blocking calls. + #++ + def acknowledge(options = {}) + sync = options[:sync] || false + message = options[:message] if options[:message] + + unless message.nil? + @session_impl.acknowledge message.message_impl, sync + else + @session_impl.acknowledge sync + end + end + + # Rejects the specified message. A rejected message will not be + # redelivered. + # + # NOTE: A message cannot be rejected once it has been acknowledged. + def reject(message); @session_impl.reject message.message_impl; end + + # Releases the message, which allows the broker to attempt to + # redeliver it. + # + # NOTE: A message connot be released once it has been acknowled. + def release(message); @session_impl.release message.message_impl; end + + # Requests synchronization with the broker. + # + # ==== Arguments + # + # * +options+ - the list of options + # + # ==== Options + # + # * +:block+ - if true, the call blocks until the broker acknowledges it + # + #-- + # TODO: Add an optional block to be used for blocking calls. + #++ + def sync(args = {}) + block = args[:block] || false + @session_impl.sync block + end + + # Returns the total number of receivable messages, and messages already + # received, by Receiver instances associated with this +Session+. + def receivable; @session_impl.getReceivable; end + + # Returns the number of messages that have been acknowledged by this + # +Session+ whose acknowledgements have not been confirmed as processed + # by the broker. + def unsettled_acks; @session_impl.getUnsettledAcks; end + + # Fetches the next Receiver with a message pending. Waits the specified + # number of milliseconds before timing out. + # + # For a Receiver to be returned, it must have a capacity > 0 and have + # Messages locally queued. + # + # If no Receiver is found within the time out period, then a MessageError + # is raised. + # + # ==== Arguments + # + # * +timeout+ - the duration + # + # ==== Examples + # + # loop do + # + # begin + # # wait a maximum of one minute for the next receiver to be ready + # recv = session.next_receiver Qpid::Messaging::Duration::MINUTE + # + # # get and dispatch the message + # msg = recv.get + # dispatch_message msg + # + # rescue + # puts "No receivers were returned" + # end + # + # end + def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block) + receiver_impl = @session_impl.nextReceiver(timeout.duration_impl) + + unless receiver_impl.nil? + recv = Qpid::Messaging::Receiver.new self, receiver_impl + block.call recv unless block.nil? + end + + return recv + end + + # Returns true if there were exceptions on this session. + def errors?; @session_impl.hasError; end + + # If the +Session+ has been rendered invalid due to some exception, + # this method will result in that exception being raised. + # + # If none have occurred, then no exceptions are raised. + # + # ==== Examples + # + # # show any errors that occurred during the Session + # if @session.errors? + # begin + # @session.errors + # rescue Exception => error + # puts "An error occurred: #{error}" + # end + # end + def errors; @session_impl.checkError; end + + end + + end + +end + |