summaryrefslogtreecommitdiff
path: root/otherlibs/threads/event.ml
blob: 74f457b06d416de442e28184001f4ec99ab79c57 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
(***********************************************************************)
(*                                                                     *)
(*                           Objective Caml                            *)
(*                                                                     *)
(*  David Nowak and Xavier Leroy, projet Cristal, INRIA Rocquencourt   *)
(*                                                                     *)
(*  Copyright 1996 Institut National de Recherche en Informatique et   *)
(*  en Automatique.  All rights reserved.  This file is distributed    *)
(*  under the terms of the GNU Library General Public License, with    *)
(*  the special exception on linking described in file ../../LICENSE.  *)
(*                                                                     *)
(***********************************************************************)

(* $Id$ *)

(* Events *)
(* A single communication offer, instantiated for one particular
   synchronization attempt.  [basic_sync] and [basic_poll] call [poll]
   (and, for [basic_sync], [suspend]) while holding [masterlock], and call
   [result] after releasing it. *)
type 'a basic_event =
  { poll: unit -> bool;
      (* If communication can take place immediately, perform it and
         return true; otherwise return false. *)
    suspend: unit -> unit;
      (* Offer the communication on the channel and get ready
         to suspend the current thread. *)
    result: unit -> 'a }
      (* Return the result of the communication.  Only called on the
         event whose index ended up in the shared [performed] cell. *)

(* Generator of a [basic_event].  As used by [basic_sync]/[basic_poll], the
   arguments are, in order: the [performed] cell shared by all alternatives
   of one synchronization, the condition variable used to wake the
   synchronizing thread, and the index of this alternative in the array of
   alternatives. *)
type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event

(* Abstract syntax of events, flattened by [flatten_event] before
   synchronization:
   - [Communication b]: a primitive offer described by behavior [b];
   - [Choose l]: alternative between the events of [l];
   - [WrapAbort (e, f)]: like [e], but [f] is run (by [do_aborts]) when the
     alternative finally selected is not part of [e];
   - [Guard g]: event computed by calling [g ()] at flattening time. *)
type 'a event =
    Communication of 'a behavior
  | Choose of 'a event list
  | WrapAbort of 'a event * (unit -> unit)
  | Guard of (unit -> 'a event)

(* Communication channels *)
(* A channel is a pair of queues of pending offers, one per direction.
   The queues are replaced wholesale by [cleanup_queue] when a new offer
   is registered, hence the mutable fields. *)
type 'a channel =
  { mutable writes_pending: 'a communication Queue.t;
                        (* All offers to write on it *)
    mutable reads_pending:  'a communication Queue.t }
                        (* All offers to read from it *)

(* Communication offered *)
and 'a communication =
  { performed: int ref;  (* -1 if not performed yet, set to the number *)
                         (* of the matching communication after rendez-vous. *)
                         (* Shared by all offers of one synchronization; *)
                         (* any value other than -1 marks the offer as stale. *)
    condition: Condition.t;             (* To restart the blocked thread. *)
    mutable data: 'a option;            (* The data sent or received. *)
    event_number: int }                 (* Event number in select *)

(* Create a channel *)

(* [new_channel ()] returns a fresh channel on which no read or write
   offer is pending yet. *)
let new_channel () =
  let no_writers = Queue.create () in
  let no_readers = Queue.create () in
  { writes_pending = no_writers;
    reads_pending = no_readers }

(* Basic synchronization function *)

(* Single global mutex protecting every channel queue and every
   [performed] cell: [basic_sync] and [basic_poll] hold it while polling
   and while registering offers, and [basic_sync] waits on its condition
   variable with this mutex. *)
let masterlock = Mutex.create()

(* [do_aborts abort_env genev performed] runs the abort actions that must
   fire once a synchronization is over.
   - [abort_env] associates each [wrap_abort] identifier with its action.
   - [genev.(i)] carries, as second component, the identifiers of the
     [wrap_abort] nodes enclosing alternative [i].
   - [performed] is the index of the selected alternative, or a negative
     number when no alternative was selected, in which case every action
     is run.
   Actions whose identifier encloses the selected alternative are skipped.
   Nothing is done (and [genev] is not inspected) when [abort_env] is
   empty. *)
let do_aborts abort_env genev performed =
  match abort_env with
  | [] -> ()
  | _ :: _ ->
      if performed < 0 then
        List.iter (fun (_, action) -> action ()) abort_env
      else
        let spared = snd genev.(performed) in
        let run_unless_spared (id, action) =
          if not (List.mem id spared) then action () in
        List.iter run_unless_spared abort_env

(* [basic_sync abort_env genev] blocks until one of the alternatives in
   [genev] has been performed and returns its result.
   - [genev.(i)] is a pair (behavior, identifiers of enclosing
     [wrap_abort] nodes); the array must be non-empty.
   - [abort_env] lists the abort actions to run afterwards through
     [do_aborts].
   All polling and registration of offers happens with [masterlock]
   held; the result is extracted after the lock is released. *)
let basic_sync abort_env genev =
  let performed = ref (-1) in
  let condition = Condition.create() in
  (* Instantiate every alternative with the shared [performed] cell and
     condition variable.  [Array.make] is the non-deprecated name of
     [Array.create]. *)
  let bev = Array.make (Array.length genev)
                       (fst (genev.(0)) performed condition 0) in
  for i = 1 to Array.length genev - 1 do
    bev.(i) <- (fst genev.(i)) performed condition i
  done;
  (* See if any of the events is already activable *)
  let rec poll_events i =
    if i >= Array.length bev
    then false
    else bev.(i).poll() || poll_events (i+1) in
  Mutex.lock masterlock;
  if not (poll_events 0) then begin
    (* Suspend on all events *)
    for i = 0 to Array.length bev - 1 do bev.(i).suspend() done;
    (* Wait until a partner has actually performed one of our offers.
       The predicate is re-checked after every wake-up: a condition
       variable may be woken spuriously, and returning with
       [!performed = -1] would make the array access below fail. *)
    while !performed < 0 do
      Condition.wait condition masterlock
    done
  end;
  Mutex.unlock masterlock;
  (* Extract the result *)
  if abort_env = [] then
    (* Preserve tail recursion *)
    bev.(!performed).result()
  else begin
    let num = !performed in
    let result = bev.(num).result() in
    (* Handle the aborts and return the result *)
    do_aborts abort_env genev num;
    result
  end

(* Apply a random permutation on an array *)

(* [scramble_array a] shuffles [a] in place, walking from the last index
   down to index 1 and exchanging each slot with a randomly drawn slot at
   or below it, then returns [a] itself.
   Raises [Invalid_argument "Event.choose"] when [a] is empty. *)
let scramble_array a =
  let len = Array.length a in
  if len = 0 then invalid_arg "Event.choose";
  let swap i j =
    let saved = a.(i) in
    a.(i) <- a.(j);
    a.(j) <- saved in
  let rec shuffle_from i =
    if i >= 1 then begin
      swap i (Random.int (i + 1));
      shuffle_from (i - 1)
    end in
  shuffle_from (len - 1);
  a

(* Main synchronization function *)

(* [gensym ()] returns a fresh integer on each call: 1, 2, 3, ...
   The counter is private to the closure. *)
let gensym =
  let counter = ref 0 in
  fun () ->
    counter := !counter + 1;
    !counter

(* [flatten_event abort_list accu accu_abort ev] flattens the event tree
   [ev] into primitive communications.
   - [abort_list]: identifiers of the [WrapAbort] nodes enclosing [ev].
   - [accu]: communications collected so far, each paired with the
     identifiers of its enclosing [WrapAbort] nodes.
   - [accu_abort]: (identifier, abort action) pairs collected so far.
   Returns the two accumulators extended with the contents of [ev].
   Guards are forced here, and each [WrapAbort] gets a fresh identifier
   from [gensym]. *)
let rec flatten_event
      (abort_list : int list)
      (accu : ('a behavior * int list) list)
      (accu_abort : (int * (unit -> unit)) list)
      ev =
  match ev with
  | Guard fn ->
      flatten_event abort_list accu accu_abort (fn ())
  | WrapAbort (inner, on_abort) ->
      let id = gensym () in
      flatten_event
        (id :: abort_list) accu ((id, on_abort) :: accu_abort) inner
  | Communication bev ->
      ((bev, abort_list) :: accu, accu_abort)
  | Choose evl ->
      (* Thread both accumulators through the alternatives, left to
         right. *)
      List.fold_left
        (fun (cases, aborts) alt ->
          flatten_event abort_list cases aborts alt)
        (accu, accu_abort)
        evl

(* [sync ev] flattens [ev], shuffles its alternatives and blocks in
   [basic_sync] until one of them has been performed, returning its
   result.  Raises [Invalid_argument "Event.choose"] (from
   [scramble_array]) when [ev] contains no alternative. *)
let sync ev =
  let (cases, aborts) = flatten_event [] [] [] ev in
  let shuffled = scramble_array (Array.of_list cases) in
  basic_sync aborts shuffled

(* Event polling -- like sync, but non-blocking *)

(* [basic_poll abort_env genev] is the non-blocking counterpart of
   [basic_sync]: it polls every alternative of [genev] once, with
   [masterlock] held.
   - If one alternative can be performed immediately, returns
     [Some result] after running the abort actions of the other
     alternatives.
   - Otherwise marks the synchronization as over, runs every abort action
     and returns [None].
   [genev] must be non-empty. *)
let basic_poll abort_env genev =
  let performed = ref (-1) in
  let condition = Condition.create() in
  let nevents = Array.length genev in
  (* Instantiate every alternative with the shared cell and condition. *)
  let bev = Array.make nevents (fst genev.(0) performed condition 0) in
  for k = 1 to nevents - 1 do
    bev.(k) <- fst genev.(k) performed condition k
  done;
  (* True as soon as one alternative, tried in index order, succeeds. *)
  let rec any_ready k =
    k < nevents && (bev.(k).poll () || any_ready (k + 1)) in
  Mutex.lock masterlock;
  if any_ready 0 then begin
    Mutex.unlock masterlock;
    (* Extract the result, then fire the aborts of the other branches. *)
    let chosen = !performed in
    let value = bev.(chosen).result () in
    do_aborts abort_env genev chosen;
    Some value
  end else begin
    (* Cancel the communication offers *)
    performed := 0;
    Mutex.unlock masterlock;
    do_aborts abort_env genev (-1);
    None
  end

(* [poll ev] flattens [ev], shuffles its alternatives and tries them once
   through [basic_poll]; returns [Some v] if one could be performed
   immediately, [None] otherwise.  Raises
   [Invalid_argument "Event.choose"] (from [scramble_array]) when [ev]
   contains no alternative. *)
let poll ev =
  let (cases, aborts) = flatten_event [] [] [] ev in
  let shuffled = scramble_array (Array.of_list cases) in
  basic_poll aborts shuffled

(* Remove all communication opportunities already synchronized *)

(* [cleanup_queue q] returns a new queue holding, in the same order, the
   offers of [q] whose [performed] cell is still -1.  [q] itself is left
   unchanged. *)
let cleanup_queue q =
  let pending = Queue.create () in
  let keep_if_pending offer =
    match !(offer.performed) with
    | -1 -> Queue.add offer pending
    | _ -> () in
  Queue.iter keep_if_pending q;
  pending

(* Event construction *)

(* [always data] is an event that is immediately ready: polling it
   records its own index in [performed] and succeeds, suspending it does
   nothing, and its result is [data]. *)
let always data =
  let behavior performed _condition evnum =
    let poll () = begin performed := evnum; true end in
    let suspend () = () in
    let result () = data in
    { poll = poll; suspend = suspend; result = result } in
  Communication behavior

(* [send channel data] is the event that offers [data] on [channel].
   - poll: dequeues pending read offers until one that has not been
     performed yet is found; hands it the data, marks both sides as
     performed and signals the reader.  Returns false when the queue of
     readers runs out.
   - suspend: drops stale write offers from the channel, then enqueues
     this one.
   - result: unit. *)
let send channel data =
  Communication(fun performed condition evnum ->
    (* Our own offer, as seen by readers once it is enqueued. *)
    let offer =
      { performed = performed;
        condition = condition;
        data = Some data;
        event_number = evnum } in
    (* Raises [Queue.Empty] when no usable reader is left. *)
    let rec match_reader () =
      let reader = Queue.take channel.reads_pending in
      if !(reader.performed) < 0 then begin
        reader.data <- offer.data;
        performed := evnum;
        reader.performed := reader.event_number;
        Condition.signal reader.condition
      end else
        (* Stale offer: skip it and try the next one. *)
        match_reader () in
    let poll () =
      try
        match_reader ();
        true
      with Queue.Empty ->
        false in
    let suspend () =
      channel.writes_pending <- cleanup_queue channel.writes_pending;
      Queue.add offer channel.writes_pending in
    { poll = poll;
      suspend = suspend;
      result = (fun () -> ()) })

(* [receive channel] is the event that offers to read from [channel].
   - poll: dequeues pending write offers until one that has not been
     performed yet is found; copies its data, marks both sides as
     performed and signals the writer.  Returns false when the queue of
     writers runs out.
   - suspend: drops stale read offers from the channel, then enqueues
     this one.
   - result: the received value; raises
     [Invalid_argument "Event.receive"] if no data was stored. *)
let receive channel =
  Communication(fun performed condition evnum ->
    (* Our own offer; [data] is filled in by whoever performs the
       rendez-vous. *)
    let offer =
      { performed = performed;
        condition = condition;
        data = None;
        event_number = evnum } in
    (* Raises [Queue.Empty] when no usable writer is left. *)
    let rec match_writer () =
      let writer = Queue.take channel.writes_pending in
      if !(writer.performed) < 0 then begin
        offer.data <- writer.data;
        performed := evnum;
        writer.performed := writer.event_number;
        Condition.signal writer.condition
      end else
        (* Stale offer: skip it and try the next one. *)
        match_writer () in
    let poll () =
      try
        match_writer ();
        true
      with Queue.Empty ->
        false in
    let suspend () =
      channel.reads_pending <- cleanup_queue channel.reads_pending;
      Queue.add offer channel.reads_pending in
    let result () =
      match offer.data with
        Some res -> res
      | None -> invalid_arg "Event.receive" in
    { poll = poll;
      suspend = suspend;
      result = result })

(* [choose evl] is the alternative between the events of [evl]; nothing
   is evaluated until the event is synchronized on. *)
let choose evl = Choose evl

(* [wrap_abort ev fn] behaves as [ev], with [fn] registered as the abort
   action to run when another alternative is selected instead. *)
let wrap_abort ev fn = WrapAbort(ev,fn)

(* [guard fn] is the event obtained by calling [fn ()] each time the
   event is flattened for synchronization. *)
let guard fn = Guard fn

(* [wrap ev fn] is the event that performs the same communication as
   [ev] and applies [fn] to its result.  The wrapping is pushed down to
   every primitive communication of [ev]: through alternatives, below
   abort handlers (which are kept as they are), and inside guards (which
   stay delayed). *)
let rec wrap ev fn =
  match ev with
  | Guard gu ->
      Guard(fun () -> wrap (gu ()) fn)
  | WrapAbort (inner, on_abort) ->
      WrapAbort (wrap inner fn, on_abort)
  | Choose evl ->
      Choose(List.map (fun alt -> wrap alt fn) evl)
  | Communication genev ->
      (* Same polling and suspension as the underlying offer; only the
         result is post-processed. *)
      let wrapped performed condition evnum =
        let base = genev performed condition evnum in
        { poll = base.poll;
          suspend = base.suspend;
          result = (fun () -> fn (base.result ())) } in
      Communication wrapped

(* Convenience functions *)

(* [select evl] synchronizes on the alternative of all events in [evl];
   shorthand for [sync (choose evl)]. *)
let select evl = sync(Choose evl)