summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark H Weaver <mhw@netris.org>2013-11-17 17:37:37 -0500
committerMark H Weaver <mhw@netris.org>2013-11-17 17:37:37 -0500
commitf06104aac64b5b552372fe93dac65db4e2dc56a6 (patch)
tree8f7b3c186fdae8f6b6c430c4c6ecb493b777d354
parent7bc28986ebdacbe77a43c52f36645c20b2bdf442 (diff)
downloadguile-f06104aac64b5b552372fe93dac65db4e2dc56a6.tar.gz
Rewrite par-map, par-for-each, n-par-map and n-par-for-each.alt-par-map-and-thread-safe-popen
* module/ice-9/threads.scm (par-map, par-for-each, n-par-map, n-par-for-each): Reimplement without using futures.
-rw-r--r--module/ice-9/threads.scm123
1 files changed, 66 insertions, 57 deletions
diff --git a/module/ice-9/threads.scm b/module/ice-9/threads.scm
index 9f9e1bf8e..cfc281893 100644
--- a/module/ice-9/threads.scm
+++ b/module/ice-9/threads.scm
@@ -1,5 +1,5 @@
;;;; Copyright (C) 1996, 1998, 2001, 2002, 2003, 2006, 2010, 2011,
-;;;; 2012 Free Software Foundation, Inc.
+;;;; 2012, 2013 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
@@ -20,6 +20,7 @@
;;;; 4 March 1996, Anthony Green <green@cygnus.com>
;;;; Modified 5 October 1996, MDJ <djurfeldt@nada.kth.se>
;;;; Modified 6 April 2001, ttn
+;;;; Modified 17 November 2013, Mark H Weaver <mhw@netris.org>
;;;; ----------------------------------------------------------------
;;;;
@@ -33,6 +34,7 @@
;;; Code:
(define-module (ice-9 threads)
+ #:use-module (srfi srfi-1)
#:use-module (ice-9 futures)
#:use-module (ice-9 match)
#:export (begin-thread
@@ -89,62 +91,69 @@
(with-mutex (make-mutex)
first rest ...))
-(define (par-mapper mapper cons)
- (lambda (proc . lists)
- (let loop ((lists lists))
- (match lists
- (((heads tails ...) ...)
- (let ((tail (future (loop tails)))
- (head (apply proc heads)))
- (cons head (touch tail))))
- (_
- '())))))
-
-(define par-map (par-mapper map cons))
-(define par-for-each (par-mapper for-each (const *unspecified*)))
-
-(define (n-par-map n proc . arglists)
- (let* ((m (make-mutex))
- (threads '())
- (results (make-list (length (car arglists))))
- (result results))
- (do ((i 0 (+ 1 i)))
- ((= i n)
- (for-each join-thread threads)
- results)
- (set! threads
- (cons (begin-thread
- (let loop ()
- (lock-mutex m)
- (if (null? result)
- (unlock-mutex m)
- (let ((args (map car arglists))
- (my-result result))
- (set! arglists (map cdr arglists))
- (set! result (cdr result))
- (unlock-mutex m)
- (set-car! my-result (apply proc args))
- (loop)))))
- threads)))))
-
-(define (n-par-for-each n proc . arglists)
- (let ((m (make-mutex))
- (threads '()))
- (do ((i 0 (+ 1 i)))
- ((= i n)
- (for-each join-thread threads))
- (set! threads
- (cons (begin-thread
- (let loop ()
- (lock-mutex m)
- (if (null? (car arglists))
- (unlock-mutex m)
- (let ((args (map car arglists)))
- (set! arglists (map cdr arglists))
- (unlock-mutex m)
- (apply proc args)
- (loop)))))
- threads)))))
+(define (n-par-for-each num-threads proc lst . lsts)
+ (let ((lsts (cons lst lsts))
+ (mutex (make-mutex))
+ (exception #f))
+
+ (define (get-next-args)
+ (with-mutex mutex
+ (and (not (any null? lsts))
+ (let ((args (map car lsts)))
+ (set! lsts (map cdr lsts))
+ args))))
+
+ (define (worker)
+ (let loop ()
+ (let ((args (get-next-args)))
+ (when args
+ (apply proc args)
+ (loop)))))
+
+ (define (handler . key-args)
+ (with-mutex mutex
+ (unless exception
+ (set! exception key-args)
+ (set! lsts '(()))
+ ;; FIXME two threads in this handler might cancel each other
+ ;; before the other threads are cancelled.
+ (for-each cancel-thread (delq (current-thread) threads)))))
+
+ (define threads
+ (map (lambda (_)
+ (call-with-new-thread worker handler))
+ (iota num-threads)))
+
+ (for-each join-thread threads)
+
+ (when exception
+ (apply throw exception))))
+
+(define (n-par-map num-threads proc lst . lsts)
+ (let* ((lsts (cons lst lsts))
+ (count (apply min (map length lsts)))
+ (results (make-vector count)))
+ (apply n-par-for-each
+ num-threads
+ (lambda (i . args)
+ (vector-set! results i (apply proc args)))
+ (iota count)
+ lsts)
+ (vector->list results)))
+
+(define (par-for-each proc lst . lsts)
+ (apply n-par-for-each
+ (current-processor-count)
+ proc
+ lst
+ lsts))
+
+(define (par-map proc lst . lsts)
+ (apply n-par-map
+ (current-processor-count)
+ proc
+ lst
+ lsts))
;;; The following procedure is motivated by the common and important
;;; case where a lot of work should be done, (not too much) in parallel,