diff options
Diffstat (limited to 'libgo/go/http/reverseproxy.go')
-rw-r--r-- | libgo/go/http/reverseproxy.go | 63 |
1 files changed, 61 insertions, 2 deletions
diff --git a/libgo/go/http/reverseproxy.go b/libgo/go/http/reverseproxy.go index e4ce1e34c79..3f8bfdc80c2 100644 --- a/libgo/go/http/reverseproxy.go +++ b/libgo/go/http/reverseproxy.go @@ -10,7 +10,11 @@ import ( "io" "log" "net" + "os" "strings" + "sync" + "time" + "url" ) // ReverseProxy is an HTTP Handler that takes an incoming request and @@ -26,6 +30,12 @@ type ReverseProxy struct { // The Transport used to perform proxy requests. // If nil, DefaultTransport is used. Transport RoundTripper + + // FlushInterval specifies the flush interval, in + // nanoseconds, to flush to the client while + // coping the response body. + // If zero, no periodic flushing is done. + FlushInterval int64 } func singleJoiningSlash(a, b string) string { @@ -44,7 +54,7 @@ func singleJoiningSlash(a, b string) string { // URLs to the scheme, host, and base path provided in target. If the // target's path is "/base" and the incoming request was for "/dir", // the target request will be for /base/dir. -func NewSingleHostReverseProxy(target *URL) *ReverseProxy { +func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy { director := func(req *Request) { req.URL.Scheme = target.Scheme req.URL.Host = target.Host @@ -95,6 +105,55 @@ func (p *ReverseProxy) ServeHTTP(rw ResponseWriter, req *Request) { rw.WriteHeader(res.StatusCode) if res.Body != nil { - io.Copy(rw, res.Body) + var dst io.Writer = rw + if p.FlushInterval != 0 { + if wf, ok := rw.(writeFlusher); ok { + dst = &maxLatencyWriter{dst: wf, latency: p.FlushInterval} + } + } + io.Copy(dst, res.Body) + } +} + +type writeFlusher interface { + io.Writer + Flusher +} + +type maxLatencyWriter struct { + dst writeFlusher + latency int64 // nanos + + lk sync.Mutex // protects init of done, as well Write + Flush + done chan bool +} + +func (m *maxLatencyWriter) Write(p []byte) (n int, err os.Error) { + m.lk.Lock() + defer m.lk.Unlock() + if m.done == nil { + m.done = make(chan bool) + go m.flushLoop() + } + n, err = m.dst.Write(p) + if err != nil { + m.done <- true + } + return +} + +func (m *maxLatencyWriter) flushLoop() { + t := time.NewTicker(m.latency) + defer t.Stop() + for { + select { + case <-t.C: + m.lk.Lock() + m.dst.Flush() + m.lk.Unlock() + case <-m.done: + return + } } + panic("unreached") } |