1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
%
% (c) The GRASP/AQUA Project, Glasgow University, 1995
%
\section[Merge]{Merging streams}
The loss of referential transparency is avoided by attaching the merge to
the IO monad.
\begin{code}
module Merge
(
mergeIO, --:: [a] -> [a] -> IO [a]
nmergeIO --:: [[a]] -> IO [a]
) where
import Semaphore
import PreludeGlaST
import Concurrent ( forkIO )
import PreludePrimIO ( newEmptyMVar, newMVar, putMVar,
readMVar, takeMVar, _MVar
)
\end{code}
\begin{code}
-- Initial quantity of the semaphore created for each merge: this value is
-- the argument handed to newQSem by both mergeIO and nmergeIO.  Producers
-- wait on that semaphore before appending an element, so it limits how far
-- they can run ahead.
max_buff_size :: Int
max_buff_size = 1
-- mergeIO combines two lists into a single list, inside the IO monad.
mergeIO :: [a] -> [a] -> IO [a]
-- nmergeIO is the n-ary form: it combines every list in its argument.
nmergeIO :: [[a]] -> IO [a]
#ifndef __CONCURRENT_HASKELL__
-- Fallbacks compiled when __CONCURRENT_HASKELL__ is not defined: the
-- arguments are ignored and the empty list is returned.
mergeIO _ _ = return []
nmergeIO _ = return []
#else
-- Concurrent merge of two lists.
--
-- An empty MVar stands for the first (not yet produced) cell of the result.
-- A second MVar always holds whichever cell is currently the unfilled tail,
-- and a counter MVar starts at 2, one for each producer.  One suckIO thread
-- is forked per input list; the caller then waits for the first cell to be
-- filled, signals the semaphore once, and returns the contents of that cell.
mergeIO ls rs
  = newEmptyMVar          >>= \ first_cell ->
    newMVar first_cell    >>= \ tail_ref ->
    newQSem max_buff_size >>= \ sem ->
    newMVar 2             >>= \ live_producers ->
    forkIO (suckIO live_producers (tail_ref, sem) ls) >>
    forkIO (suckIO live_producers (tail_ref, sem) rs) >>
    takeMVar first_cell   >>= \ merged ->
    signalQSem sem        >>
    return merged
-- State shared by all producers of one merge: an MVar that holds the
-- currently unfilled tail cell of the output list, paired with the
-- semaphore the producers wait on before appending an element.
type Buffer a = (_MVar (_MVar [a]), QSem)
-- suckIO is the producer loop run by each forked thread.  It walks one
-- input list and appends its elements, one at a time, to the shared output
-- list described by the Buffer.
--
--   branches_running : counter of producers that have not yet finished
--   buff             : shared tail cell holder and semaphore
--   third argument   : the remaining input of this producer
suckIO :: _MVar Int -> Buffer a -> [a] -> IO ()

-- Input exhausted.  The producer that finds the counter at 1 is the last
-- one, so it terminates the output list by filling the tail cell with [].
-- In that case the counter is not written back.  Any other producer just
-- decrements the counter and stops.
suckIO branches_running (tail_list, _) []
  = takeMVar branches_running >>= \ still_running ->
    if still_running == 1
      then
        takeMVar tail_list      >>= \ last_cell ->
        putMVar last_cell []    >>
        putMVar tail_list last_cell
      else
        putMVar branches_running (still_running - 1)

-- One more element to deliver.  Wait on the semaphore, claim the current
-- tail cell, and allocate a fresh empty cell for the rest of the list.  The
-- tail stored in the claimed cell is the result of an action wrapped in
-- unsafeInterleavePrimIO (presumably deferred until that tail is demanded
-- -- confirm against the PrimIO library); when it runs it reads the next
-- cell and signals the semaphore.
-- NOTE(review): the lambda matches only the Right constructor; presumably
-- takeMVar never yields Left here -- confirm.
suckIO branches_running buff@(tail_list, e) (x:xs)
  = waitQSem e                    >>
    takeMVar tail_list            >>= \ cell ->
    newEmptyMVar                  >>= \ next_cell ->
    unsafeInterleavePrimIO
      ( takeMVar next_cell `thenPrimIO` \ (Right rest) ->
        signalQSem e       `seqPrimIO`
        returnPrimIO rest
      )                           `thenPrimIO` \ lazy_rest ->
    putMVar cell (x : lazy_rest)  >>
    putMVar tail_list next_cell   >>
    suckIO branches_running buff xs
-- Concurrent merge of any number of lists.
--
-- With no input lists at all, no producer would be forked, so nothing
-- would ever fill the first cell and the takeMVar below would block for
-- ever.  The empty merge is therefore answered directly.
nmergeIO [] = return []
-- Otherwise the setup mirrors mergeIO: an empty MVar for the first cell of
-- the result, an MVar holding the current tail cell, a semaphore, and a
-- counter initialised to the number of input lists.  One suckIO thread is
-- forked per list; the caller waits for the first cell, signals the
-- semaphore once, and returns the contents of that cell.
nmergeIO lss
  = let
      len = length lss
    in
    newEmptyMVar          >>= \ tail_node ->
    newMVar tail_node     >>= \ tail_list ->
    newQSem max_buff_size >>= \ e ->
    newMVar len           >>= \ branches_running ->
    let
      buff = (tail_list, e)
    in
    mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
    takeMVar tail_node    >>= \ val ->
    signalQSem e          >>
    return val
  where
    -- Run f on every element, in list order.
    mapIO f xs = accumulate (map f xs)
\end{code}
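To avoid creating a mutually recursive module dependency with @Concurrent.lhs@,
the definition of @forkIO@ was once duplicated here. The copy below is now
commented out, and @forkIO@ is imported from @Concurrent@ instead.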
\begin{code}
{- HAH! WDP 95/07
forkIO :: PrimIO a -> PrimIO a
forkIO action s
= let
(r, new_s) = action s
in
new_s `_fork_` (r, s)
where
_fork_ x y = case (fork# x) of { 0# -> parError#; _ -> y }
-}
#endif {- __CONCURRENT_HASKELL__ -}
\end{code}
|