summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2012-02-07 21:41:36 +0100
committerAndy Wingo <wingo@pobox.com>2012-03-12 17:12:37 +0100
commitf20ae551a1666e93df694060bce625a48c2c3767 (patch)
treee789d956f0e091ac34d14a3adc8bc4f5e80b9cc6
parent31a04ee23982f2834fb19c89b5356bc551108e6d (diff)
downloadguile-f20ae551a1666e93df694060bce625a48c2c3767.tar.gz
http web server: allow concurrent read operations
* module/web/server/http.scm (<http-server>): Add fields for a reader thread-pool, and some async queues that it operates on. Also, a flag, http-threaded?. (http-open): Add #:threaded? and #:read-workers kwargs. Create a thread pool for reading if threads are available. (read-request!): New function, factored out of http-read. (enqueue-read!, http-read): Instead of reading the client directly, enqueue a read. In the case where threads are not available, this will call read-request! directly. read-request! takes care of adding to the handle-queue. The read polling loop will pop items off the handle-queue. (seconds-from-now, async-queue-for-each): New helpers. (http-write): Shut down the queues and threads, hopefully in a nonblocking fashion.
-rw-r--r--module/web/server/http.scm133
1 files changed, 104 insertions, 29 deletions
diff --git a/module/web/server/http.scm b/module/web/server/http.scm
index ace70b840..a88b480c7 100644
--- a/module/web/server/http.scm
+++ b/module/web/server/http.scm
@@ -35,7 +35,10 @@
#:use-module (web request)
#:use-module (web response)
#:use-module (web server)
- #:use-module (ice-9 poll))
+ #:use-module (system repl error-handling)
+ #:use-module (ice-9 poll)
+ #:use-module (ice-9 async-queue)
+ #:use-module (ice-9 thread-pool))
(define (make-default-socket family addr port)
@@ -63,12 +66,18 @@
(get-u8 port)))
(define-record-type <http-server>
- (make-http-server socket poll-idx poll-set wake-thunk)
+ (make-http-server socket poll-idx poll-set wake-thunk threaded?
+ read-workers read-queue handle-queue)
http-server?
(socket http-socket)
(poll-idx http-poll-idx set-http-poll-idx!)
(poll-set http-poll-set)
- (wake-thunk http-wake-thunk))
+ (wake-thunk http-wake-thunk)
+ (threaded? http-threaded?)
+
+ (read-workers http-read-workers set-http-read-workers!)
+ (read-queue http-read-queue)
+ (handle-queue http-handle-queue))
(define *error-events* (logior POLLHUP POLLERR))
(define *read-events* POLLIN)
@@ -82,7 +91,9 @@
(inet-pton family host)
INADDR_LOOPBACK))
(port 8080)
- (socket (make-default-socket family addr port)))
+ (socket (make-default-socket family addr port))
+ (threaded? (and (provided? 'threads) #t))
+ (read-workers (if threaded? 8 1)))
(listen socket 128)
(sigaction SIGPIPE SIG_IGN)
(let ((poll-set (make-empty-poll-set)))
@@ -90,19 +101,78 @@
(lambda (wake-thunk wake-port)
(poll-set-add! poll-set socket *events*)
(poll-set-add! poll-set wake-port *read-events*)
- (make-http-server socket 0 poll-set wake-thunk)))))
+ (let ((read-queue (make-async-queue #:capacity read-workers))
+ (handle-queue (make-async-queue #:capacity read-workers)))
+ (define server
+ (make-http-server socket 0 poll-set wake-thunk threaded?
+ #f read-queue handle-queue))
+ (if threaded?
+ (begin
+ (set-http-read-workers!
+ server
+ (make-thread-pool
+ read-workers
+ (lambda ()
+ (cond ((async-queue-pop! read-queue)
+ => (lambda (p) (read-request! server p)))))))
+ (start-thread-pool! (http-read-workers server))))
+ server)))))
(define (bad-request port)
(write-response (build-response #:version '(1 . 0) #:code 400
#:headers '((content-length . 0)))
port))
+(define (read-request! server port)
+ (call-with-error-handling
+ (lambda ()
+ (cond
+ ((eof-object? (peek-char port))
+ ;; EOF.
+ (close-port port))
+ (else
+ ;; Otherwise, try to read a request from this port.
+ (with-throw-handler #t
+ (lambda ()
+ (let* ((req (read-request port))
+ (body (read-request-body req)))
+ (or (async-queue-push! (http-handle-queue server)
+ (list port req body))
+ (error "failed to push request during shutdown"))
+ ((http-wake-thunk server))))
+ (lambda (k . args)
+ (define-syntax-rule (cleanup-catch statement)
+ (catch #t
+ (lambda () statement)
+ (lambda (k . args)
+ (format (current-error-port) "In ~a:\n" 'statement)
+ (print-exception (current-error-port) #f k args))))
+ (cleanup-catch (bad-request port))
+ (cleanup-catch (close-port port)))))))
+ #:pass-keys '(quit interrupt)
+ #:on-error 'backtrace
+ #:post-error
+ (lambda (k . args)
+ (display "While reading request:\n" (current-error-port))
+ (print-exception (current-error-port) #f k args))))
+
+(define (enqueue-read! server port)
+ (if (http-threaded? server)
+ (or (async-queue-push! (http-read-queue server) port)
+ (false-if-exception
+ (begin
+ (warn "failed to push read during shutdown.")
+ (close-port port))))
+ (read-request! server port)))
+
;; -> (client request body | #f #f #f)
(define (http-read server)
(let* ((poll-set (http-poll-set server)))
(let lp ((idx (http-poll-idx server)))
(let ((revents (poll-set-revents poll-set idx)))
(cond
+ ((async-queue-try-pop! (http-handle-queue server))
+ => (lambda (vals) (apply values vals)))
((zero? idx)
;; The server socket, and the end of our downward loop.
(cond
@@ -137,32 +207,11 @@
;; it. Remove it from the poll set.
(else
(let ((port (poll-set-remove! poll-set idx)))
- ;; Record the next index in all cases, in case the EOF check
+ ;; Record the next index in all cases, in case enqueue-read!
;; throws an error.
(set-http-poll-idx! server (1- idx))
- (cond
- ((eof-object? (peek-char port))
- ;; EOF.
- (close-port port)
- (lp (1- idx)))
- (else
- ;; Otherwise, try to read a request from this port.
- (with-throw-handler
- #t
- (lambda ()
- (let ((req (read-request port)))
- (values port
- req
- (read-request-body req))))
- (lambda (k . args)
- (define-syntax-rule (cleanup-catch statement)
- (catch #t
- (lambda () statement)
- (lambda (k . args)
- (format (current-error-port) "In ~a:\n" 'statement)
- (print-exception (current-error-port) #f k args))))
- (cleanup-catch (bad-request port))
- (cleanup-catch (close-port port)))))))))))))
+ (enqueue-read! server port)
+ (lp (1- idx)))))))))
(define (keep-alive? response)
(let ((v (response-version response)))
@@ -193,9 +242,35 @@
(close-port port)))
(values)))
+(define (seconds-from-now n)
+ (let ((now (gettimeofday)))
+ (cons (+ (car now) n) (cdr now))))
+
+(define (async-queue-for-each queue proc)
+ (let lp ()
+ (cond ((async-queue-try-pop! queue)
+ => (lambda (vals) (proc vals) (lp))))))
+
;; -> unspecified values
(define (http-close server)
(let ((poll-set (http-poll-set server)))
+ (display "Plugging read queue\n")
+ (async-queue-stop-accepting! (http-read-queue server))
+ (display "Stopping read workers\n")
+ (if (http-threaded? server)
+ (stop-thread-pool! (http-read-workers server)
+ (seconds-from-now 5)
+ #:cancel? #t))
+ (display "Plugging read queue\n")
+ (async-queue-stop-accepting! (http-read-queue server))
+ (display "Draining read queue\n")
+ (async-queue-for-each (http-read-queue server) close-port)
+ (display "Plugging handle queue\n")
+ (async-queue-stop-accepting! (http-handle-queue server))
+ (display "Draining handle queue\n")
+ (async-queue-for-each (http-handle-queue server)
+ (lambda (vals) (close-port (car vals))))
+ (display "Closing poll ports\n")
(let lp ((n (poll-set-nfds poll-set)))
(if (positive? n)
(begin