diff options
author | Mark H Weaver <mhw@netris.org> | 2013-11-17 17:37:37 -0500 |
---|---|---|
committer | Mark H Weaver <mhw@netris.org> | 2013-11-17 17:37:37 -0500 |
commit | f06104aac64b5b552372fe93dac65db4e2dc56a6 (patch) | |
tree | 8f7b3c186fdae8f6b6c430c4c6ecb493b777d354 | |
parent | 7bc28986ebdacbe77a43c52f36645c20b2bdf442 (diff) | |
download | guile-f06104aac64b5b552372fe93dac65db4e2dc56a6.tar.gz |
Rewrite par-map, par-for-each, n-par-map and n-par-for-each.alt-par-map-and-thread-safe-popen
* module/ice-9/threads.scm (par-map, par-for-each, n-par-map,
n-par-for-each): Reimplement without using futures.
-rw-r--r-- | module/ice-9/threads.scm | 123 |
1 files changed, 66 insertions, 57 deletions
diff --git a/module/ice-9/threads.scm b/module/ice-9/threads.scm index 9f9e1bf8e..cfc281893 100644 --- a/module/ice-9/threads.scm +++ b/module/ice-9/threads.scm @@ -1,5 +1,5 @@ ;;;; Copyright (C) 1996, 1998, 2001, 2002, 2003, 2006, 2010, 2011, -;;;; 2012 Free Software Foundation, Inc. +;;;; 2012, 2013 Free Software Foundation, Inc. ;;;; ;;;; This library is free software; you can redistribute it and/or ;;;; modify it under the terms of the GNU Lesser General Public @@ -20,6 +20,7 @@ ;;;; 4 March 1996, Anthony Green <green@cygnus.com> ;;;; Modified 5 October 1996, MDJ <djurfeldt@nada.kth.se> ;;;; Modified 6 April 2001, ttn +;;;; Modified 17 November 2013, Mark H Weaver <mhw@netris.org> ;;;; ---------------------------------------------------------------- ;;;; @@ -33,6 +34,7 @@ ;;; Code: (define-module (ice-9 threads) + #:use-module (srfi srfi-1) #:use-module (ice-9 futures) #:use-module (ice-9 match) #:export (begin-thread @@ -89,62 +91,69 @@ (with-mutex (make-mutex) first rest ...)) -(define (par-mapper mapper cons) - (lambda (proc . lists) - (let loop ((lists lists)) - (match lists - (((heads tails ...) ...) - (let ((tail (future (loop tails))) - (head (apply proc heads))) - (cons head (touch tail)))) - (_ - '()))))) - -(define par-map (par-mapper map cons)) -(define par-for-each (par-mapper for-each (const *unspecified*))) - -(define (n-par-map n proc . arglists) - (let* ((m (make-mutex)) - (threads '()) - (results (make-list (length (car arglists)))) - (result results)) - (do ((i 0 (+ 1 i))) - ((= i n) - (for-each join-thread threads) - results) - (set! threads - (cons (begin-thread - (let loop () - (lock-mutex m) - (if (null? result) - (unlock-mutex m) - (let ((args (map car arglists)) - (my-result result)) - (set! arglists (map cdr arglists)) - (set! result (cdr result)) - (unlock-mutex m) - (set-car! my-result (apply proc args)) - (loop))))) - threads))))) - -(define (n-par-for-each n proc . arglists) - (let ((m (make-mutex)) - (threads '())) - (do ((i 0 (+ 1 i))) - ((= i n) - (for-each join-thread threads)) - (set! threads - (cons (begin-thread - (let loop () - (lock-mutex m) - (if (null? (car arglists)) - (unlock-mutex m) - (let ((args (map car arglists))) - (set! arglists (map cdr arglists)) - (unlock-mutex m) - (apply proc args) - (loop))))) - threads))))) +(define (n-par-for-each num-threads proc lst . lsts) + (let ((lsts (cons lst lsts)) + (mutex (make-mutex)) + (exception #f)) + + (define (get-next-args) + (with-mutex mutex + (and (not (any null? lsts)) + (let ((args (map car lsts))) + (set! lsts (map cdr lsts)) + args)))) + + (define (worker) + (let loop () + (let ((args (get-next-args))) + (when args + (apply proc args) + (loop))))) + + (define (handler . key-args) + (with-mutex mutex + (unless exception + (set! exception key-args) + (set! lsts '(())) + ;; FIXME two threads in this handler might cancel each other + ;; before the other threads are cancelled. + (for-each cancel-thread (delq (current-thread) threads))))) + + (define threads + (map (lambda (_) + (call-with-new-thread worker handler)) + (iota num-threads))) + + (for-each join-thread threads) + + (when exception + (apply throw exception)))) + +(define (n-par-map num-threads proc lst . lsts) + (let* ((lsts (cons lst lsts)) + (count (apply min (map length lsts))) + (results (make-vector count))) + (apply n-par-for-each + num-threads + (lambda (i . args) + (vector-set! results i (apply proc args))) + (iota count) + lsts) + (vector->list results))) + +(define (par-for-each proc lst . lsts) + (apply n-par-for-each + (current-processor-count) + proc + lst + lsts)) + +(define (par-map proc lst . lsts) + (apply n-par-map + (current-processor-count) + proc + lst + lsts)) ;;; The following procedure is motivated by the common and important ;;; case where a lot of work should be done, (not too much) in parallel, |