summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2012-02-07 22:12:26 +0100
committerAndy Wingo <wingo@pobox.com>2012-03-12 17:12:37 +0100
commit58942f30d6e7dd58350b8f4bce863e597dd94b43 (patch)
tree65a773d01fb45873ffeba5818501ebcf4351dd77
parentf20ae551a1666e93df694060bce625a48c2c3767 (diff)
downloadguile-wip-threaded-http-server.tar.gz
http web server: allow concurrent write operationswip-threaded-http-server
* module/web/server/http.scm (<http-server>, http-open) (read-request!, write-request!, enqueue-write!, http-write): As in the previous commit, add support for concurrent writes. (http-read): Pop off keepalive ports in this, the main loop. (http-close): Shut down writers appropriately.
-rw-r--r--module/web/server/http.scm104
1 files changed, 83 insertions, 21 deletions
diff --git a/module/web/server/http.scm b/module/web/server/http.scm
index a88b480c7..05990af26 100644
--- a/module/web/server/http.scm
+++ b/module/web/server/http.scm
@@ -67,7 +67,8 @@
(define-record-type <http-server>
(make-http-server socket poll-idx poll-set wake-thunk threaded?
- read-workers read-queue handle-queue)
+ read-workers read-queue handle-queue
+ write-workers write-queue keepalive-queue)
http-server?
(socket http-socket)
(poll-idx http-poll-idx set-http-poll-idx!)
@@ -77,7 +78,11 @@
(read-workers http-read-workers set-http-read-workers!)
(read-queue http-read-queue)
- (handle-queue http-handle-queue))
+ (handle-queue http-handle-queue)
+
+ (write-workers http-write-workers set-http-write-workers!)
+ (write-queue http-write-queue)
+ (keepalive-queue http-keepalive-queue))
(define *error-events* (logior POLLHUP POLLERR))
(define *read-events* POLLIN)
@@ -93,7 +98,8 @@
(port 8080)
(socket (make-default-socket family addr port))
(threaded? (and (provided? 'threads) #t))
- (read-workers (if threaded? 8 1)))
+ (read-workers (if threaded? 8 1))
+ (write-workers (if threaded? 8 1)))
(listen socket 128)
(sigaction SIGPIPE SIG_IGN)
(let ((poll-set (make-empty-poll-set)))
@@ -102,10 +108,14 @@
(poll-set-add! poll-set socket *events*)
(poll-set-add! poll-set wake-port *read-events*)
(let ((read-queue (make-async-queue #:capacity read-workers))
- (handle-queue (make-async-queue #:capacity read-workers)))
+ (handle-queue (make-async-queue #:capacity read-workers))
+ (write-queue (make-async-queue #:capacity write-workers))
+ (keepalive-queue (make-async-queue #:capacity write-workers
+ #:fixed? #f)))
(define server
(make-http-server socket 0 poll-set wake-thunk threaded?
- #f read-queue handle-queue))
+ #f read-queue handle-queue
+ #f write-queue keepalive-queue))
(if threaded?
(begin
(set-http-read-workers!
@@ -115,7 +125,16 @@
(lambda ()
(cond ((async-queue-pop! read-queue)
=> (lambda (p) (read-request! server p)))))))
- (start-thread-pool! (http-read-workers server))))
+ (start-thread-pool! (http-read-workers server))
+ (set-http-write-workers!
+ server
+ (make-thread-pool
+ write-workers
+ (lambda ()
+ (cond ((async-queue-pop! write-queue)
+ => (lambda (args)
+ (apply write-request! server args)))))))
+ (start-thread-pool! (http-write-workers server))))
server)))))
(define (bad-request port)
@@ -173,6 +192,10 @@
(cond
((async-queue-try-pop! (http-handle-queue server))
=> (lambda (vals) (apply values vals)))
+ ((async-queue-try-pop! (http-keepalive-queue server))
+ => (lambda (port)
+ (poll-set-add! poll-set port *events*)
+ (lp idx)))
((zero? idx)
;; The server socket, and the end of our downward loop.
(cond
@@ -224,23 +247,48 @@
((0) (memq 'keep-alive (response-connection response)))))
(else #f)))))
+(define (write-request! server client response body)
+ (call-with-error-handling
+ (lambda ()
+ (let* ((response (write-response response client))
+ (port (response-port response)))
+ (cond
+ ((not body)) ; pass
+ ((bytevector? body)
+ (write-response-body response body))
+ (else
+ (error "Expected a bytevector for body" body)))
+ (cond
+ ((keep-alive? response)
+ (force-output port)
+ (or (async-queue-push! (http-keepalive-queue server) port)
+ ;; Shutting down; there is a sane thing to do, no need to
+ ;; error.
+ (close-port port))
+ ((http-wake-thunk server)))
+ (else
+ (close-port port)))))
+ #:pass-keys '(quit interrupt)
+ #:on-error 'backtrace
+ #:post-error
+ (lambda (k . args)
+ (display "While writing response:\n" (current-error-port))
+ (print-exception (current-error-port) #f k args))))
+
+(define (enqueue-write! server client response body)
+ (if (http-threaded? server)
+ (or (async-queue-push! (http-write-queue server)
+ (list client response body))
+ (false-if-exception
+ (begin
+ (warn "failed to push write during shutdown.")
+ (close-port client))))
+ (write-request! server client response body)))
+
;; -> 0 values
(define (http-write server client response body)
- (let* ((response (write-response response client))
- (port (response-port response)))
- (cond
- ((not body)) ; pass
- ((bytevector? body)
- (write-response-body response body))
- (else
- (error "Expected a bytevector for body" body)))
- (cond
- ((keep-alive? response)
- (force-output port)
- (poll-set-add! (http-poll-set server) port *events*))
- (else
- (close-port port)))
- (values)))
+ (enqueue-write! server client response body)
+ (values))
(define (seconds-from-now n)
(let ((now (gettimeofday)))
@@ -270,6 +318,20 @@
(display "Draining handle queue\n")
(async-queue-for-each (http-handle-queue server)
(lambda (vals) (close-port (car vals))))
+ (display "Plugging write queue\n")
+ (async-queue-stop-accepting! (http-write-queue server))
+ (display "Stopping write workers\n")
+ (if (http-threaded? server)
+ (stop-thread-pool! (http-write-workers server)
+ (seconds-from-now 5)
+ #:cancel? #t))
+ (display "Draining write queue\n")
+ (async-queue-for-each (http-write-queue server)
+ (lambda (vals) (close-port (car vals))))
+ (display "Plugging keepalive queue\n")
+ (async-queue-stop-accepting! (http-keepalive-queue server))
+ (display "Draining keepalive queue\n")
+ (async-queue-for-each (http-keepalive-queue server) close-port)
(display "Closing poll ports\n")
(let lp ((n (poll-set-nfds poll-set)))
(if (positive? n)