diff options
Diffstat (limited to 'libgo/go/net/http/transport.go')
-rw-r--r-- | libgo/go/net/http/transport.go | 279 |
1 files changed, 191 insertions, 88 deletions
diff --git a/libgo/go/net/http/transport.go b/libgo/go/net/http/transport.go index 98e198e78af..4cd0533ffc2 100644 --- a/libgo/go/net/http/transport.go +++ b/libgo/go/net/http/transport.go @@ -17,7 +17,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "log" "net" "net/url" @@ -42,14 +41,13 @@ const DefaultMaxIdleConnsPerHost = 2 // https, and http proxies (for either http or https with CONNECT). // Transport can also cache connections for future re-use. type Transport struct { - idleLk sync.Mutex - idleConn map[string][]*persistConn - altLk sync.RWMutex - altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper - - // TODO: tunable on global max cached connections - // TODO: tunable on timeout on cached connections - // TODO: optional pipelining + idleMu sync.Mutex + idleConn map[string][]*persistConn + idleConnCh map[string]chan *persistConn + reqMu sync.Mutex + reqConn map[*Request]*persistConn + altMu sync.RWMutex + altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper // Proxy specifies a function to return a proxy for a given // Request. If the function returns a non-nil error, the @@ -60,19 +58,39 @@ type Transport struct { // Dial specifies the dial function for creating TCP // connections. // If Dial is nil, net.Dial is used. - Dial func(net, addr string) (c net.Conn, err error) + Dial func(network, addr string) (net.Conn, error) // TLSClientConfig specifies the TLS configuration to use with // tls.Client. If nil, the default configuration is used. TLSClientConfig *tls.Config - DisableKeepAlives bool + // DisableKeepAlives, if true, prevents re-use of TCP connections + // between different HTTP requests. + DisableKeepAlives bool + + // DisableCompression, if true, prevents the Transport from + // requesting compression with an "Accept-Encoding: gzip" + // request header when the Request contains no existing + // Accept-Encoding value. If the Transport requests gzip on + // its own and gets a gzipped response, it's transparently + // decoded in the Response.Body. However, if the user + // explicitly requested gzip it is not automatically + // uncompressed. DisableCompression bool // MaxIdleConnsPerHost, if non-zero, controls the maximum idle // (keep-alive) to keep per-host. If zero, // DefaultMaxIdleConnsPerHost is used. MaxIdleConnsPerHost int + + // ResponseHeaderTimeout, if non-zero, specifies the amount of + // time to wait for a server's response headers after fully + // writing the request (including its body, if any). This + // time does not include the time to read the response body. + ResponseHeaderTimeout time.Duration + + // TODO: tunable on global max cached connections + // TODO: tunable on timeout on cached connections } // ProxyFromEnvironment returns the URL of the proxy to use for a @@ -125,6 +143,9 @@ func (tr *transportRequest) extraHeaders() Header { } // RoundTrip implements the RoundTripper interface. +// +// For higher-level HTTP client support (such as handling of cookies +// and redirects), see Get, Post, and the Client type. func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { if req.URL == nil { return nil, errors.New("http: nil Request.URL") @@ -133,12 +154,12 @@ func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { return nil, errors.New("http: nil Request.Header") } if req.URL.Scheme != "http" && req.URL.Scheme != "https" { - t.altLk.RLock() + t.altMu.RLock() var rt RoundTripper if t.altProto != nil { rt = t.altProto[req.URL.Scheme] } - t.altLk.RUnlock() + t.altMu.RUnlock() if rt == nil { return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme} } @@ -175,8 +196,8 @@ func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { if scheme == "http" || scheme == "https" { panic("protocol " + scheme + " already registered") } - t.altLk.Lock() - defer t.altLk.Unlock() + t.altMu.Lock() + defer t.altMu.Unlock() if t.altProto == nil { t.altProto = make(map[string]RoundTripper) } @@ -191,10 +212,10 @@ func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { // a "keep-alive" state. It does not interrupt any connections currently // in use. func (t *Transport) CloseIdleConnections() { - t.idleLk.Lock() + t.idleMu.Lock() m := t.idleConn t.idleConn = nil - t.idleLk.Unlock() + t.idleMu.Unlock() if m == nil { return } @@ -205,6 +226,17 @@ func (t *Transport) CloseIdleConnections() { } } +// CancelRequest cancels an in-flight request by closing its +// connection. +func (t *Transport) CancelRequest(req *Request) { + t.reqMu.Lock() + pc := t.reqConn[req] + t.reqMu.Unlock() + if pc != nil { + pc.conn.Close() + } +} + // // Private implementation past this point. // @@ -260,12 +292,23 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { if max == 0 { max = DefaultMaxIdleConnsPerHost } - t.idleLk.Lock() + t.idleMu.Lock() + select { + case t.idleConnCh[key] <- pconn: + // We're done with this pconn and somebody else is + // currently waiting for a conn of this type (they're + // actively dialing, but this conn is ready + // first). Chrome calls this socket late binding. See + // https://insouciant.org/tech/connection-management-in-chromium/ + t.idleMu.Unlock() + return true + default: + } if t.idleConn == nil { t.idleConn = make(map[string][]*persistConn) } if len(t.idleConn[key]) >= max { - t.idleLk.Unlock() + t.idleMu.Unlock() pconn.close() return false } @@ -275,14 +318,29 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { } } t.idleConn[key] = append(t.idleConn[key], pconn) - t.idleLk.Unlock() + t.idleMu.Unlock() return true } +func (t *Transport) getIdleConnCh(cm *connectMethod) chan *persistConn { + key := cm.key() + t.idleMu.Lock() + defer t.idleMu.Unlock() + if t.idleConnCh == nil { + t.idleConnCh = make(map[string]chan *persistConn) + } + ch, ok := t.idleConnCh[key] + if !ok { + ch = make(chan *persistConn) + t.idleConnCh[key] = ch + } + return ch +} + func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) { - key := cm.String() - t.idleLk.Lock() - defer t.idleLk.Unlock() + key := cm.key() + t.idleMu.Lock() + defer t.idleMu.Unlock() if t.idleConn == nil { return nil } @@ -304,7 +362,19 @@ func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) { return } } - panic("unreachable") +} + +func (t *Transport) setReqConn(r *Request, pc *persistConn) { + t.reqMu.Lock() + defer t.reqMu.Unlock() + if t.reqConn == nil { + t.reqConn = make(map[*Request]*persistConn) + } + if pc != nil { + t.reqConn[r] = pc + } else { + delete(t.reqConn, r) + } } func (t *Transport) dial(network, addr string) (c net.Conn, err error) { @@ -323,6 +393,37 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { return pc, nil } + type dialRes struct { + pc *persistConn + err error + } + dialc := make(chan dialRes) + go func() { + pc, err := t.dialConn(cm) + dialc <- dialRes{pc, err} + }() + + idleConnCh := t.getIdleConnCh(cm) + select { + case v := <-dialc: + // Our dial finished. + return v.pc, v.err + case pc := <-idleConnCh: + // Another request finished first and its net.Conn + // became available before our dial. Or somebody + // else's dial that they didn't use. + // But our dial is still going, so give it away + // when it finishes: + go func() { + if v := <-dialc; v.err == nil { + t.putIdleConn(v.pc) + } + }() + return pc, nil + } +} + +func (t *Transport) dialConn(cm *connectMethod) (*persistConn, error) { conn, err := t.dial("tcp", cm.addr()) if err != nil { if cm.proxyURL != nil { @@ -335,7 +436,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn := &persistConn{ t: t, - cacheKey: cm.String(), + cacheKey: cm.key(), conn: conn, reqch: make(chan requestAndChan, 50), writech: make(chan writeRequest, 50), @@ -485,6 +586,10 @@ type connectMethod struct { targetAddr string // Not used if proxy + http targetScheme (4th example in table) } +func (ck *connectMethod) key() string { + return ck.String() // TODO: use a struct type instead +} + func (ck *connectMethod) String() string { proxyStr := "" targetAddr := ck.targetAddr @@ -529,14 +634,13 @@ type persistConn struct { closech chan struct{} // broadcast close when readLoop (TCP connection) closes isProxy bool + lk sync.Mutex // guards following 3 fields + numExpectedResponses int + broken bool // an error has happened on this connection; marked broken so it's not reused. // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) mutateHeaderFunc func(Header) - - lk sync.Mutex // guards numExpectedResponses and broken - numExpectedResponses int - broken bool // an error has happened on this connection; marked broken so it's not reused. } func (pc *persistConn) isBroken() bool { @@ -561,7 +665,6 @@ func remoteSideClosed(err error) bool { func (pc *persistConn) readLoop() { defer close(pc.closech) alive := true - var lastbody io.ReadCloser // last response body, if any, read on this connection for alive { pb, err := pc.br.Peek(1) @@ -580,22 +683,23 @@ func (pc *persistConn) readLoop() { rc := <-pc.reqch - // Advance past the previous response's body, if the - // caller hasn't done so. - if lastbody != nil { - lastbody.Close() // assumed idempotent - lastbody = nil - } - var resp *Response if err == nil { resp, err = ReadResponse(pc.br, rc.req) + if err == nil && resp.StatusCode == 100 { + // Skip any 100-continue for now. + // TODO(bradfitz): if rc.req had "Expect: 100-continue", + // actually block the request body write and signal the + // writeLoop now to begin sending it. (Issue 2184) For now we + // eat it, since we're never expecting one. + resp, err = ReadResponse(pc.br, rc.req) + } } + hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0 if err != nil { pc.close() } else { - hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" { resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") @@ -605,21 +709,29 @@ func (pc *persistConn) readLoop() { pc.close() err = zerr } else { - resp.Body = &readFirstCloseBoth{&discardOnCloseReadCloser{gzReader}, resp.Body} + resp.Body = &readerAndCloser{gzReader, resp.Body} } } resp.Body = &bodyEOFSignal{body: resp.Body} } - if err != nil || resp.Close || rc.req.Close { + if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 { + // Don't do keep-alive on error if either party requested a close + // or we get an unexpected informational (1xx) response. + // StatusCode 100 is already handled above. alive = false } - hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0 var waitForBodyRead chan bool if hasBody { - lastbody = resp.Body - waitForBodyRead = make(chan bool, 1) + waitForBodyRead = make(chan bool, 2) + resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { + // Sending false here sets alive to + // false and closes the connection + // below. + waitForBodyRead <- false + return nil + } resp.Body.(*bodyEOFSignal).fn = func(err error) { alive1 := alive if err != nil { @@ -636,15 +748,6 @@ func (pc *persistConn) readLoop() { } if alive && !hasBody { - // When there's no response body, we immediately - // reuse the TCP connection (putIdleConn), but - // we need to prevent ClientConn.Read from - // closing the Response.Body on the next - // loop, otherwise it might close the body - // before the client code has had a chance to - // read it (even though it'll just be 0, EOF). - lastbody = nil - if !pc.t.putIdleConn(pc) { alive = false } @@ -658,6 +761,8 @@ func (pc *persistConn) readLoop() { alive = <-waitForBodyRead } + pc.t.setReqConn(rc.req, nil) + if !alive { pc.close() } @@ -711,8 +816,14 @@ type writeRequest struct { } func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { - if pc.mutateHeaderFunc != nil { - pc.mutateHeaderFunc(req.extraHeaders()) + pc.t.setReqConn(req.Request, pc) + pc.lk.Lock() + pc.numExpectedResponses++ + headerFn := pc.mutateHeaderFunc + pc.lk.Unlock() + + if headerFn != nil { + headerFn(req.extraHeaders()) } // Ask for a compressed version if the caller didn't set their @@ -728,10 +839,6 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err req.extraHeaders().Set("Accept-Encoding", "gzip") } - pc.lk.Lock() - pc.numExpectedResponses++ - pc.lk.Unlock() - // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. @@ -744,6 +851,7 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err var re responseAndError var pconnDeadCh = pc.closech var failTicker <-chan time.Time + var respHeaderTimer <-chan time.Time WaitResponse: for { select { @@ -753,6 +861,9 @@ WaitResponse: pc.close() break WaitResponse } + if d := pc.t.ResponseHeaderTimeout; d > 0 { + respHeaderTimer = time.After(d) + } case <-pconnDeadCh: // The persist connection is dead. This shouldn't // usually happen (only with Connection: close responses @@ -769,7 +880,11 @@ WaitResponse: pconnDeadCh = nil // avoid spinning failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc case <-failTicker: - re = responseAndError{nil, errors.New("net/http: transport closed before response was received")} + re = responseAndError{err: errors.New("net/http: transport closed before response was received")} + break WaitResponse + case <-respHeaderTimer: + pc.close() + re = responseAndError{err: errors.New("net/http: timeout awaiting response headers")} break WaitResponse case re = <-resc: break WaitResponse @@ -780,6 +895,9 @@ WaitResponse: pc.numExpectedResponses-- pc.lk.Unlock() + if re.err != nil { + pc.t.setReqConn(req.Request, nil) + } return re.res, re.err } @@ -823,13 +941,16 @@ func canonicalAddr(url *url.URL) string { // bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most // once, right before its final (error-producing) Read or Close call -// returns. +// returns. If earlyCloseFn is non-nil and Close is called before +// io.EOF is seen, earlyCloseFn is called instead of fn, and its +// return value is the return value from Close. type bodyEOFSignal struct { - body io.ReadCloser - mu sync.Mutex // guards closed, rerr and fn - closed bool // whether Close has been called - rerr error // sticky Read error - fn func(error) // error will be nil on Read io.EOF + body io.ReadCloser + mu sync.Mutex // guards following 4 fields + closed bool // whether Close has been called + rerr error // sticky Read error + fn func(error) // error will be nil on Read io.EOF + earlyCloseFn func() error // optional alt Close func used if io.EOF not seen } func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { @@ -862,6 +983,9 @@ func (es *bodyEOFSignal) Close() error { return nil } es.closed = true + if es.earlyCloseFn != nil && es.rerr != io.EOF { + return es.earlyCloseFn() + } err := es.body.Close() es.condfn(err) return err @@ -879,28 +1003,7 @@ func (es *bodyEOFSignal) condfn(err error) { es.fn = nil } -type readFirstCloseBoth struct { - io.ReadCloser +type readerAndCloser struct { + io.Reader io.Closer } - -func (r *readFirstCloseBoth) Close() error { - if err := r.ReadCloser.Close(); err != nil { - r.Closer.Close() - return err - } - if err := r.Closer.Close(); err != nil { - return err - } - return nil -} - -// discardOnCloseReadCloser consumes all its input on Close. -type discardOnCloseReadCloser struct { - io.ReadCloser -} - -func (d *discardOnCloseReadCloser) Close() error { - io.Copy(ioutil.Discard, d.ReadCloser) // ignore errors; likely invalid or already closed - return d.ReadCloser.Close() -} |