diff options
Diffstat (limited to 'qpid/ruby/lib/qpid/queue.rb')
-rw-r--r-- | qpid/ruby/lib/qpid/queue.rb | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/queue.rb b/qpid/ruby/lib/qpid/queue.rb new file mode 100644 index 0000000000..4150173b53 --- /dev/null +++ b/qpid/ruby/lib/qpid/queue.rb @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Augment the standard python multithreaded Queue implementation to add a +# close() method so that threads blocking on the content of a queue can be +# notified if the queue is no longer in use. + +require 'thread' + +# Python nominally uses a bounded queue, but the code never establishes +# a maximum size; we therefore use Ruby's unbounded queue +class Qpid::Queue < ::Queue + + DONE = Object.new + STOP = Object.new + + def initialize + super + @error = nil + @listener = nil + @exc_listener = nil + @exc_listener_lock = Monitor.new + @thread = nil + end + + def close(error = nil) + @error = error + put(DONE) + unless @thread.nil? + @thread.join() + @thread = nil + end + end + + def get(block = true, timeout = nil) + unless timeout.nil? + raise NotImplementedError + end + result = pop(! block) + if result == DONE + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Qpid::Closed exception + put(DONE) + raise Qpid::Closed.new(@error) + else + return result + end + end + + alias :put :push + + def exc_listen(&block) + @exc_listener_lock.synchronize do + @exc_listener = block + end + end + + def listen(&block) + if ! block_given? && @thread + put(STOP) + @thread.join() + @thread = nil + end + + # FIXME: There is a potential race since we could be changing one + # non-nil listener to another + @listener = block + + if block_given? && @thread.nil? + @thread = Thread.new do + loop do + begin + o = get() + break if o == STOP + @listener.call(o) + rescue Qpid::Closed => e + @exc_listener.call(e) if @exc_listener + break + end + end + end + end + end + +end |