summaryrefslogtreecommitdiff
path: root/qpid/ruby/lib/qpid/queue.rb
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/ruby/lib/qpid/queue.rb')
-rw-r--r--qpid/ruby/lib/qpid/queue.rb101
1 files changed, 101 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/queue.rb b/qpid/ruby/lib/qpid/queue.rb
new file mode 100644
index 0000000000..4150173b53
--- /dev/null
+++ b/qpid/ruby/lib/qpid/queue.rb
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Augment the standard python multithreaded Queue implementation to add a
+# close() method so that threads blocking on the content of a queue can be
+# notified if the queue is no longer in use.
+
+require 'thread'
+
+# Python nominally uses a bounded queue, but the code never establishes
+# a maximum size; we therefore use Ruby's unbounded queue
+class Qpid::Queue < ::Queue
+
+ DONE = Object.new
+ STOP = Object.new
+
+ def initialize
+ super
+ @error = nil
+ @listener = nil
+ @exc_listener = nil
+ @exc_listener_lock = Monitor.new
+ @thread = nil
+ end
+
+ def close(error = nil)
+ @error = error
+ put(DONE)
+ unless @thread.nil?
+ @thread.join()
+ @thread = nil
+ end
+ end
+
+ def get(block = true, timeout = nil)
+ unless timeout.nil?
+ raise NotImplementedError
+ end
+ result = pop(! block)
+ if result == DONE
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Qpid::Closed exception
+ put(DONE)
+ raise Qpid::Closed.new(@error)
+ else
+ return result
+ end
+ end
+
+ alias :put :push
+
+ def exc_listen(&block)
+ @exc_listener_lock.synchronize do
+ @exc_listener = block
+ end
+ end
+
+ def listen(&block)
+ if ! block_given? && @thread
+ put(STOP)
+ @thread.join()
+ @thread = nil
+ end
+
+ # FIXME: There is a potential race since we could be changing one
+ # non-nil listener to another
+ @listener = block
+
+ if block_given? && @thread.nil?
+ @thread = Thread.new do
+ loop do
+ begin
+ o = get()
+ break if o == STOP
+ @listener.call(o)
+ rescue Qpid::Closed => e
+ @exc_listener.call(e) if @exc_listener
+ break
+ end
+ end
+ end
+ end
+ end
+
+end