# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Augment the standard python multithreaded Queue implementation to add a # close() method so that threads blocking on the content of a queue can be # notified if the queue is no longer in use. require 'thread' # Python nominally uses a bounded queue, but the code never establishes # a maximum size; we therefore use Ruby's unbounded queue class Qpid::Queue < ::Queue DONE = Object.new STOP = Object.new def initialize super @error = nil @listener = nil @exc_listener = nil @exc_listener_lock = Monitor.new @thread = nil end def close(error = nil) @error = error put(DONE) unless @thread.nil? @thread.join() @thread = nil end end def get(block = true, timeout = nil) unless timeout.nil? raise NotImplementedError end result = pop(! block) if result == DONE # this guarantees that any other waiting threads or any future # calls to get will also result in a Qpid::Closed exception put(DONE) raise Qpid::Closed.new(@error) else return result end end alias :put :push def exc_listen(&block) @exc_listener_lock.synchronize do @exc_listener = block end end def listen(&block) if ! block_given? && @thread put(STOP) @thread.join() @thread = nil end # FIXME: There is a potential race since we could be changing one # non-nil listener to another @listener = block if block_given? && @thread.nil? @thread = Thread.new do loop do begin o = get() break if o == STOP @listener.call(o) rescue Qpid::Closed => e @exc_listener.call(e) if @exc_listener break end end end end end end