1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
#
# thwait.rb - thread synchronization class
# $Release Version: 0.9 $
# $Revision: 1.3 $
# $Date: 1998/06/26 03:19:34 $
# by Keiju ISHITSUKA(Nihpon Rational Software Co.,Ltd.)
#
# --
# feature:
# provides synchronization for multiple threads.
#
# class methods:
# * ThreadsWait.all_waits(thread1,...)
# waits until all of specified threads are terminated.
# if a block is supplied for the method, evaluates it for
# each thread termination.
# * th = ThreadsWait.new(thread1,...)
# creates synchronization object, specifying thread(s) to wait.
#
# methods:
# * th.threads
# list threads to be synchronized
# * th.empty?
# is there any thread to be synchronized.
# * th.finished?
# is there already terminated thread.
# * th.join(thread1,...)
# wait for specified thread(s).
# * th.join_nowait(threa1,...)
# specifies thread(s) to wait. non-blocking.
# * th.next_wait
# waits until any of specified threads is terminated.
# * th.all_waits
# waits until all of specified threads are terminated.
# if a block is supplied for the method, evaluates it for
# each thread termination.
#
require "thread.rb"
require "e2mmap.rb"
class ThreadsWait
RCS_ID='-$Id: thwait.rb,v 1.3 1998/06/26 03:19:34 keiju Exp keiju $-'
Exception2MessageMapper.extend_to(binding)
def_exception("ErrNoWaitingThread", "No threads for waiting.")
def_exception("ErrNoFinishedThread", "No finished threads.")
def ThreadsWait.all_waits(*threads)
tw = ThreadsWait.new(*threads)
if block_given?
tw.all_waits do |th|
yield th
end
else
tw.all_waits
end
end
def initialize(*threads)
@threads = []
@wait_queue = Queue.new
join_nowait(*threads) unless threads.empty?
end
# accessing
# threads - list threads to be synchronized
attr :threads
# testing
# empty?
# finished?
# is there any thread to be synchronized.
def empty?
@threads.empty?
end
# is there already terminated thread.
def finished?
!@wait_queue.empty?
end
# main process:
# join
# join_nowait
# next_wait
# all_wait
# adds thread(s) to join, waits for any of waiting threads to terminate.
def join(*threads)
join_nowait(*threads)
next_wait
end
# adds thread(s) to join, no wait.
def join_nowait(*threads)
threads.flatten!
@threads.concat threads
for th in threads
Thread.start(th) do |t|
t.join
@wait_queue.push t
end
end
end
# waits for any of waiting threads to terminate
# if there is no thread to wait, raises ErrNoWaitingThread.
# if `nonblock' is true, and there is no terminated thread,
# raises ErrNoFinishedThread.
def next_wait(nonblock = nil)
ThreadsWait.fail ErrNoWaitingThread if @threads.empty?
begin
@threads.delete(th = @wait_queue.pop(nonblock))
th
rescue ThreadError
ThreadsWait.fail ErrNoFinishedThread
end
end
# waits until all of specified threads are terminated.
# if a block is supplied for the method, evaluates it for
# each thread termination.
def all_waits
until @threads.empty?
th = next_wait
yield th if block_given?
end
end
end
ThWait = ThreadsWait
|