summaryrefslogtreecommitdiff
path: root/cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb')
-rw-r--r--cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb264
1 files changed, 264 insertions, 0 deletions
diff --git a/cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb b/cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb
new file mode 100644
index 0000000000..7e6e11f654
--- /dev/null
+++ b/cpp/bindings/qpid/ruby/lib/qpid_messaging/session.rb
@@ -0,0 +1,264 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+module Qpid
+
+ module Messaging
+
+ # A +Session+ represents a distinct conversation between end points. They are
+ # created from an active (i.e., not closed) Connection.
+ #
+ # A +Session+ is used to acknowledge individual or all messages that have
+ # passed through it
+ class Session
+
+ def initialize(connection, session) # :nodoc:
+ @connection = connection
+ @session_impl = session
+ end
+
+ def session_impl # :nodoc:
+ @session_impl
+ end
+
+ # Returns the Connection associated with this session.
+ def connection
+ @connection
+ end
+
+ # Creates a new endpoint for sending messages.
+ #
+ # The address can either be an instance Address or else an
+ # address string.
+ #
+ # ==== Arguments
+ #
+ # * +address+ - the end point address.
+ def create_sender(address)
+ _address = address
+
+ if address.class == Qpid::Messaging::Address
+ _address = address.address_impl
+ end
+
+ sender_impl = @session_impl.createSender(_address)
+ sender_name = sender_impl.getName
+
+ Qpid::Messaging::Sender.new(self, sender_impl)
+ end
+
+ # Retrieves the Sender with the specified name.
+ #
+ # Raises an exception if no such Sender exists.
+ #
+ # ==== Arguments
+ #
+ # * +name+ - the name of the Sender
+ def sender(name)
+ Qpid::Messaging::Sender.new self, @session_impl.getSender(name)
+ end
+
+ # Creates a new endpoint for receiving messages.
+ #
+ # The +address+ can either be an instance Address or else an
+ # address string.
+ #
+ # ==== Arguments
+ #
+ # * +address+ - the end point address.
+ def create_receiver(address)
+ result = nil
+ receiver_impl = nil
+
+ if address.class == Qpid::Messaging::Address
+ address_impl = address.address_impl
+ receiver_impl = @session_impl.createReceiver address_impl
+ else
+ receiver_impl = @session_impl.createReceiver(address)
+ end
+
+ Qpid::Messaging::Receiver.new self, receiver_impl
+ end
+
+ # Retrieves the +Receiver+ with the specified name, or nil if no such
+ # Receiver exists.
+ #
+ # ==== Arguments
+ #
+ # * +name+ - the name of the Receiver
+ def receiver(name)
+ Qpid::Messaging::Receiver.new self, @session_impl.getReceiver(name)
+ end
+
+ # Closes the +Session+ and all associated +Sender+ and +Receiver+ instances.
+ #
+ # *NOTE:* All +Session+ instances for a Connection are closed when the
+ # Connection is closed. But closing a +Session+ does not affect the
+ # owning Connection.
+ def close; @session_impl.close; end
+
+ # Commits any pending transactions for a transactional session.
+ def commit; @session_impl.commit; end
+
+ # Rolls back any uncommitted transactions on a transactional session.
+ def rollback; @session_impl.rollback; end
+
+ # Acknowledges one or more outstanding messages that have been received
+ # on this session.
+ #
+ # ==== Arguments
+ #
+ # * +options+ - the set of options
+ #
+ # ==== Options
+ #
+ # * :message - if specified, then only that Message is acknowledged
+ # * :sync - if true, the call will block until processed by the broker
+ #
+ # ==== Examples
+ #
+ # # acknowledge all received messages
+ # session.acknowledge
+ #
+ # # acknowledge a single message
+ # session.acknowledge :message => message
+ #
+ # # acknowledge all messages, wait until the call finishes
+ # session.acknowledge :sync => true
+ #
+ #--
+ # TODO: Add an optional block to be used for blocking calls.
+ #++
+ def acknowledge(options = {})
+ sync = options[:sync] || false
+ message = options[:message] if options[:message]
+
+ unless message.nil?
+ @session_impl.acknowledge message.message_impl, sync
+ else
+ @session_impl.acknowledge sync
+ end
+ end
+
+ # Rejects the specified message. A rejected message will not be
+ # redelivered.
+ #
+ # NOTE: A message cannot be rejected once it has been acknowledged.
+ def reject(message); @session_impl.reject message.message_impl; end
+
+ # Releases the message, which allows the broker to attempt to
+ # redeliver it.
+ #
+ # NOTE: A message connot be released once it has been acknowled.
+ def release(message); @session_impl.release message.message_impl; end
+
+ # Requests synchronization with the broker.
+ #
+ # ==== Arguments
+ #
+ # * +options+ - the list of options
+ #
+ # ==== Options
+ #
+ # * +:block+ - if true, the call blocks until the broker acknowledges it
+ #
+ #--
+ # TODO: Add an optional block to be used for blocking calls.
+ #++
+ def sync(args = {})
+ block = args[:block] || false
+ @session_impl.sync block
+ end
+
+ # Returns the total number of receivable messages, and messages already
+ # received, by Receiver instances associated with this +Session+.
+ def receivable; @session_impl.getReceivable; end
+
+ # Returns the number of messages that have been acknowledged by this
+ # +Session+ whose acknowledgements have not been confirmed as processed
+ # by the broker.
+ def unsettled_acks; @session_impl.getUnsettledAcks; end
+
+ # Fetches the next Receiver with a message pending. Waits the specified
+ # number of milliseconds before timing out.
+ #
+ # For a Receiver to be returned, it must have a capacity > 0 and have
+ # Messages locally queued.
+ #
+ # If no Receiver is found within the time out period, then a MessageError
+ # is raised.
+ #
+ # ==== Arguments
+ #
+ # * +timeout+ - the duration
+ #
+ # ==== Examples
+ #
+ # loop do
+ #
+ # begin
+ # # wait a maximum of one minute for the next receiver to be ready
+ # recv = session.next_receiver Qpid::Messaging::Duration::MINUTE
+ #
+ # # get and dispatch the message
+ # msg = recv.get
+ # dispatch_message msg
+ #
+ # rescue
+ # puts "No receivers were returned"
+ # end
+ #
+ # end
+ def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block)
+ receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)
+
+ unless receiver_impl.nil?
+ recv = Qpid::Messaging::Receiver.new self, receiver_impl
+ block.call recv unless block.nil?
+ end
+
+ return recv
+ end
+
+ # Returns true if there were exceptions on this session.
+ def errors?; @session_impl.hasError; end
+
+ # If the +Session+ has been rendered invalid due to some exception,
+ # this method will result in that exception being raised.
+ #
+ # If none have occurred, then no exceptions are raised.
+ #
+ # ==== Examples
+ #
+ # # show any errors that occurred during the Session
+ # if @session.errors?
+ # begin
+ # @session.errors
+ # rescue Exception => error
+ # puts "An error occurred: #{error}"
+ # end
+ # end
+ def errors; @session_impl.checkError; end
+
+ end
+
+ end
+
+end
+