#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

require 'cqpid'
require 'qpid/duration'

module Qpid

  module Messaging

    # Receiver is the entity through which messages are received.
    #
    # An instance of Receiver can only be created using an active (not
    # previously closed) Session.
    #
    # ==== Example
    #
    #   conn     = Qpid::Messaging::Connection.new :url => "mybroker:5762"
    #   conn.open
    #   session  = conn.create_session
    #   receiver = session.create_receiver "my-sender-queue"
    class Receiver

      # Wraps the underlying receiver implementation object and remembers
      # the Session it was created from.
      def initialize(session, receiver_impl) # :nodoc:
        @session       = session
        @receiver_impl = receiver_impl
      end

      # Returns the underlying receiver implementation object.
      def receiver_impl # :nodoc:
        @receiver_impl
      end

      # Retrieves a message from the local queue, or waits for up to
      # the duration specified for one to become available.
      #
      # If a block is given, then it will be invoked after the next message
      # is received or the call times out, passing in the message or nil
      # respectively. The message (or nil) is also returned, whether or
      # not a block was given.
      #
      # ==== Options
      #
      # * duration - the timeout to wait (def. Duration::FOREVER)
      #
      # ==== Examples
      #
      #   msg = rcvr.get # Uses the default timeout of forever
      #
      #   msg = rcvr.get Qpid::Messaging::Duration::IMMEDIATE # returns a message or exits immediately
      #
      #   # passes in a block to handle the received message
      #   rcvr.get Qpid::Messaging::Duration::SECOND do |message|
      #     if message.nil?
      #       puts "No message was received."
      #     else
      #       puts "Received this message: #{message.content}"
      #     end
      #   end
      def get(duration = Qpid::Messaging::Duration::FOREVER, &block)
        deliver(@receiver_impl.get(duration.duration_impl), &block)
      end

      # Retrieves a message from the receiver's subscription, or waits
      # for up to the duration specified for one to become available.
      #
      # If a block is given, then it will be invoked after the next message
      # is received or the call times out, passing in the message or nil
      # respectively. The message (or nil) is also returned, whether or
      # not a block was given.
      #
      # ==== Options
      #
      # * duration - the timeout to wait (def. Duration::FOREVER)
      #
      # ==== Examples
      #
      #   msg = rcvr.fetch # Uses the default timeout of forever
      #
      #   msg = rcvr.fetch Qpid::Messaging::Duration::IMMEDIATE # returns a message or exits immediately
      #
      #   # passes in a block to handle the received message
      #   rcvr.fetch Qpid::Messaging::Duration::SECOND do |message|
      #     if message.nil?
      #       puts "No message was received."
      #     else
      #       puts "Received this message: #{message.content}"
      #     end
      #   end
      def fetch(duration = Qpid::Messaging::Duration::FOREVER, &block)
        deliver(@receiver_impl.fetch(duration.duration_impl), &block)
      end

      # Sets the capacity for this +Receiver+.
      #
      # ==== Options
      #
      # * capacity - the capacity
      #
      # ==== Examples
      #
      #   receiver.capacity = 50 # sets the incoming capacity to 50 messages
      #
      def capacity=(capacity); @receiver_impl.setCapacity capacity; end

      # Returns the capacity.
      #
      # The capacity is the number of incoming messages that can be held
      # locally before being fetched.
      #
      # ==== Examples
      #
      #   puts "The receiver can hold #{rcv.capacity} messages."
      #
      def capacity; @receiver_impl.getCapacity; end

      # Returns the number of slots for receiving messages.
      #
      # This differs from +capacity+ in that it is the available slots in
      # the capacity for holding incoming messages, where available <= capacity.
      #
      # ==== Examples
      #
      #   puts "You can receive #{rcv.available} messages before blocking."
      #
      def available; @receiver_impl.getAvailable; end

      # Returns the number of messages that have been received and acknowledged
      # but whose acknowledgements have not been confirmed by the sender.
      #
      # ==== Examples
      #
      #   puts "You have #{rcv.unsettled} messages to be confirmed."
      #
      def unsettled; @receiver_impl.getUnsettled; end

      # Closes this +Receiver+.
      #
      # This does not affect the +Session+.
      def close; @receiver_impl.close; end

      # Returns whether the receiver is closed.
      #
      # ==== Examples
      #
      #   recv.close unless recv.closed?
      #
      def closed?; @receiver_impl.isClosed; end

      # Returns the name of this +Receiver+.
      #
      # ==== Examples
      #
      #   puts "Receiver: #{recv.name}"
      def name; @receiver_impl.getName; end

      # Returns the Session for this +Receiver+.
      def session; @session; end

      private

      # Shared tail of +get+ and +fetch+: wraps the implementation-level
      # message (nil stays nil), hands the result to the caller's block
      # when one was supplied, and returns it.
      def deliver(message_impl) # :nodoc:
        message = message_impl.nil? ? nil : create_message_wrapper(message_impl)
        # The block is called for the nil case too, so callers can react
        # to "no message" as the public documentation describes.
        yield message if block_given?
        message
      end

      # Builds a Qpid::Messaging::Message around an implementation-level
      # message object.
      def create_message_wrapper message_impl # :nodoc:
        Qpid::Messaging::Message.new(:impl => message_impl)
      end

    end

  end

end