summaryrefslogtreecommitdiff
path: root/libgo/go/http/reverseproxy.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/http/reverseproxy.go')
-rw-r--r--libgo/go/http/reverseproxy.go63
1 files changed, 61 insertions, 2 deletions
diff --git a/libgo/go/http/reverseproxy.go b/libgo/go/http/reverseproxy.go
index e4ce1e34c79..3f8bfdc80c2 100644
--- a/libgo/go/http/reverseproxy.go
+++ b/libgo/go/http/reverseproxy.go
@@ -10,7 +10,11 @@ import (
"io"
"log"
"net"
+ "os"
"strings"
+ "sync"
+ "time"
+ "url"
)
// ReverseProxy is an HTTP Handler that takes an incoming request and
@@ -26,6 +30,12 @@ type ReverseProxy struct {
// The Transport used to perform proxy requests.
// If nil, DefaultTransport is used.
Transport RoundTripper
+
+ // FlushInterval specifies the flush interval, in
+ // nanoseconds, to flush to the client while
+ // coping the response body.
+ // If zero, no periodic flushing is done.
+ FlushInterval int64
}
func singleJoiningSlash(a, b string) string {
@@ -44,7 +54,7 @@ func singleJoiningSlash(a, b string) string {
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
-func NewSingleHostReverseProxy(target *URL) *ReverseProxy {
+func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
director := func(req *Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
@@ -95,6 +105,55 @@ func (p *ReverseProxy) ServeHTTP(rw ResponseWriter, req *Request) {
rw.WriteHeader(res.StatusCode)
if res.Body != nil {
- io.Copy(rw, res.Body)
+ var dst io.Writer = rw
+ if p.FlushInterval != 0 {
+ if wf, ok := rw.(writeFlusher); ok {
+ dst = &maxLatencyWriter{dst: wf, latency: p.FlushInterval}
+ }
+ }
+ io.Copy(dst, res.Body)
+ }
+}
+
+type writeFlusher interface {
+ io.Writer
+ Flusher
+}
+
+type maxLatencyWriter struct {
+ dst writeFlusher
+ latency int64 // nanos
+
+ lk sync.Mutex // protects init of done, as well Write + Flush
+ done chan bool
+}
+
+func (m *maxLatencyWriter) Write(p []byte) (n int, err os.Error) {
+ m.lk.Lock()
+ defer m.lk.Unlock()
+ if m.done == nil {
+ m.done = make(chan bool)
+ go m.flushLoop()
+ }
+ n, err = m.dst.Write(p)
+ if err != nil {
+ m.done <- true
+ }
+ return
+}
+
+func (m *maxLatencyWriter) flushLoop() {
+ t := time.NewTicker(m.latency)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ m.lk.Lock()
+ m.dst.Flush()
+ m.lk.Unlock()
+ case <-m.done:
+ return
+ }
}
+ panic("unreached")
}