diff options
Diffstat (limited to 'src/net/http/transport.go')
-rw-r--r-- | src/net/http/transport.go | 1242 |
1 files changed, 1242 insertions, 0 deletions
diff --git a/src/net/http/transport.go b/src/net/http/transport.go new file mode 100644 index 000000000..527ed8bdd --- /dev/null +++ b/src/net/http/transport.go @@ -0,0 +1,1242 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// HTTP client implementation. See RFC 2616. +// +// This is the low-level Transport implementation of RoundTripper. +// The high-level interface is in client.go. + +package http + +import ( + "bufio" + "compress/gzip" + "crypto/tls" + "errors" + "fmt" + "io" + "log" + "net" + "net/url" + "os" + "strings" + "sync" + "time" +) + +// DefaultTransport is the default implementation of Transport and is +// used by DefaultClient. It establishes network connections as needed +// and caches them for reuse by subsequent calls. It uses HTTP proxies +// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and +// $no_proxy) environment variables. +var DefaultTransport RoundTripper = &Transport{ + Proxy: ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, +} + +// DefaultMaxIdleConnsPerHost is the default value of Transport's +// MaxIdleConnsPerHost. +const DefaultMaxIdleConnsPerHost = 2 + +// Transport is an implementation of RoundTripper that supports HTTP, +// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT). +// Transport can also cache connections for future re-use. +type Transport struct { + idleMu sync.Mutex + idleConn map[connectMethodKey][]*persistConn + idleConnCh map[connectMethodKey]chan *persistConn + reqMu sync.Mutex + reqCanceler map[*Request]func() + altMu sync.RWMutex + altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper + + // Proxy specifies a function to return a proxy for a given + // Request. If the function returns a non-nil error, the + // request is aborted with the provided error. + // If Proxy is nil or returns a nil *URL, no proxy is used. + Proxy func(*Request) (*url.URL, error) + + // Dial specifies the dial function for creating unencrypted + // TCP connections. + // If Dial is nil, net.Dial is used. + Dial func(network, addr string) (net.Conn, error) + + // DialTLS specifies an optional dial function for creating + // TLS connections for non-proxied HTTPS requests. + // + // If DialTLS is nil, Dial and TLSClientConfig are used. + // + // If DialTLS is set, the Dial hook is not used for HTTPS + // requests and the TLSClientConfig and TLSHandshakeTimeout + // are ignored. The returned net.Conn is assumed to already be + // past the TLS handshake. + DialTLS func(network, addr string) (net.Conn, error) + + // TLSClientConfig specifies the TLS configuration to use with + // tls.Client. If nil, the default configuration is used. + TLSClientConfig *tls.Config + + // TLSHandshakeTimeout specifies the maximum amount of time waiting to + // wait for a TLS handshake. Zero means no timeout. + TLSHandshakeTimeout time.Duration + + // DisableKeepAlives, if true, prevents re-use of TCP connections + // between different HTTP requests. + DisableKeepAlives bool + + // DisableCompression, if true, prevents the Transport from + // requesting compression with an "Accept-Encoding: gzip" + // request header when the Request contains no existing + // Accept-Encoding value. If the Transport requests gzip on + // its own and gets a gzipped response, it's transparently + // decoded in the Response.Body. However, if the user + // explicitly requested gzip it is not automatically + // uncompressed. + DisableCompression bool + + // MaxIdleConnsPerHost, if non-zero, controls the maximum idle + // (keep-alive) to keep per-host. If zero, + // DefaultMaxIdleConnsPerHost is used. + MaxIdleConnsPerHost int + + // ResponseHeaderTimeout, if non-zero, specifies the amount of + // time to wait for a server's response headers after fully + // writing the request (including its body, if any). This + // time does not include the time to read the response body. + ResponseHeaderTimeout time.Duration + + // TODO: tunable on global max cached connections + // TODO: tunable on timeout on cached connections +} + +// ProxyFromEnvironment returns the URL of the proxy to use for a +// given request, as indicated by the environment variables +// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy). +// An error is returned if the proxy environment is invalid. +// A nil URL and nil error are returned if no proxy is defined in the +// environment, or a proxy should not be used for the given request. +// +// As a special case, if req.URL.Host is "localhost" (with or without +// a port number), then a nil URL and nil error will be returned. +func ProxyFromEnvironment(req *Request) (*url.URL, error) { + proxy := httpProxyEnv.Get() + if proxy == "" { + return nil, nil + } + if !useProxy(canonicalAddr(req.URL)) { + return nil, nil + } + proxyURL, err := url.Parse(proxy) + if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") { + // proxy was bogus. Try prepending "http://" to it and + // see if that parses correctly. If not, we fall + // through and complain about the original one. + if proxyURL, err := url.Parse("http://" + proxy); err == nil { + return proxyURL, nil + } + } + if err != nil { + return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err) + } + return proxyURL, nil +} + +// ProxyURL returns a proxy function (for use in a Transport) +// that always returns the same URL. +func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) { + return func(*Request) (*url.URL, error) { + return fixedURL, nil + } +} + +// transportRequest is a wrapper around a *Request that adds +// optional extra headers to write. +type transportRequest struct { + *Request // original request, not to be mutated + extra Header // extra headers to write, or nil +} + +func (tr *transportRequest) extraHeaders() Header { + if tr.extra == nil { + tr.extra = make(Header) + } + return tr.extra +} + +// RoundTrip implements the RoundTripper interface. +// +// For higher-level HTTP client support (such as handling of cookies +// and redirects), see Get, Post, and the Client type. +func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { + if req.URL == nil { + req.closeBody() + return nil, errors.New("http: nil Request.URL") + } + if req.Header == nil { + req.closeBody() + return nil, errors.New("http: nil Request.Header") + } + if req.URL.Scheme != "http" && req.URL.Scheme != "https" { + t.altMu.RLock() + var rt RoundTripper + if t.altProto != nil { + rt = t.altProto[req.URL.Scheme] + } + t.altMu.RUnlock() + if rt == nil { + req.closeBody() + return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme} + } + return rt.RoundTrip(req) + } + if req.URL.Host == "" { + req.closeBody() + return nil, errors.New("http: no Host in request URL") + } + treq := &transportRequest{Request: req} + cm, err := t.connectMethodForRequest(treq) + if err != nil { + req.closeBody() + return nil, err + } + + // Get the cached or newly-created connection to either the + // host (for http or https), the http proxy, or the http proxy + // pre-CONNECTed to https server. In any case, we'll be ready + // to send it requests. + pconn, err := t.getConn(req, cm) + if err != nil { + t.setReqCanceler(req, nil) + req.closeBody() + return nil, err + } + + return pconn.roundTrip(treq) +} + +// RegisterProtocol registers a new protocol with scheme. +// The Transport will pass requests using the given scheme to rt. +// It is rt's responsibility to simulate HTTP request semantics. +// +// RegisterProtocol can be used by other packages to provide +// implementations of protocol schemes like "ftp" or "file". +func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { + if scheme == "http" || scheme == "https" { + panic("protocol " + scheme + " already registered") + } + t.altMu.Lock() + defer t.altMu.Unlock() + if t.altProto == nil { + t.altProto = make(map[string]RoundTripper) + } + if _, exists := t.altProto[scheme]; exists { + panic("protocol " + scheme + " already registered") + } + t.altProto[scheme] = rt +} + +// CloseIdleConnections closes any connections which were previously +// connected from previous requests but are now sitting idle in +// a "keep-alive" state. It does not interrupt any connections currently +// in use. +func (t *Transport) CloseIdleConnections() { + t.idleMu.Lock() + m := t.idleConn + t.idleConn = nil + t.idleConnCh = nil + t.idleMu.Unlock() + for _, conns := range m { + for _, pconn := range conns { + pconn.close() + } + } +} + +// CancelRequest cancels an in-flight request by closing its +// connection. +func (t *Transport) CancelRequest(req *Request) { + t.reqMu.Lock() + cancel := t.reqCanceler[req] + t.reqMu.Unlock() + if cancel != nil { + cancel() + } +} + +// +// Private implementation past this point. +// + +var ( + httpProxyEnv = &envOnce{ + names: []string{"HTTP_PROXY", "http_proxy"}, + } + noProxyEnv = &envOnce{ + names: []string{"NO_PROXY", "no_proxy"}, + } +) + +// envOnce looks up an environment variable (optionally by multiple +// names) once. It mitigates expensive lookups on some platforms +// (e.g. Windows). +type envOnce struct { + names []string + once sync.Once + val string +} + +func (e *envOnce) Get() string { + e.once.Do(e.init) + return e.val +} + +func (e *envOnce) init() { + for _, n := range e.names { + e.val = os.Getenv(n) + if e.val != "" { + return + } + } +} + +// reset is used by tests +func (e *envOnce) reset() { + e.once = sync.Once{} + e.val = "" +} + +func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) { + cm.targetScheme = treq.URL.Scheme + cm.targetAddr = canonicalAddr(treq.URL) + if t.Proxy != nil { + cm.proxyURL, err = t.Proxy(treq.Request) + } + return cm, nil +} + +// proxyAuth returns the Proxy-Authorization header to set +// on requests, if applicable. +func (cm *connectMethod) proxyAuth() string { + if cm.proxyURL == nil { + return "" + } + if u := cm.proxyURL.User; u != nil { + username := u.Username() + password, _ := u.Password() + return "Basic " + basicAuth(username, password) + } + return "" +} + +// putIdleConn adds pconn to the list of idle persistent connections awaiting +// a new request. +// If pconn is no longer needed or not in a good state, putIdleConn +// returns false. +func (t *Transport) putIdleConn(pconn *persistConn) bool { + if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { + pconn.close() + return false + } + if pconn.isBroken() { + return false + } + key := pconn.cacheKey + max := t.MaxIdleConnsPerHost + if max == 0 { + max = DefaultMaxIdleConnsPerHost + } + t.idleMu.Lock() + + waitingDialer := t.idleConnCh[key] + select { + case waitingDialer <- pconn: + // We're done with this pconn and somebody else is + // currently waiting for a conn of this type (they're + // actively dialing, but this conn is ready + // first). Chrome calls this socket late binding. See + // https://insouciant.org/tech/connection-management-in-chromium/ + t.idleMu.Unlock() + return true + default: + if waitingDialer != nil { + // They had populated this, but their dial won + // first, so we can clean up this map entry. + delete(t.idleConnCh, key) + } + } + if t.idleConn == nil { + t.idleConn = make(map[connectMethodKey][]*persistConn) + } + if len(t.idleConn[key]) >= max { + t.idleMu.Unlock() + pconn.close() + return false + } + for _, exist := range t.idleConn[key] { + if exist == pconn { + log.Fatalf("dup idle pconn %p in freelist", pconn) + } + } + t.idleConn[key] = append(t.idleConn[key], pconn) + t.idleMu.Unlock() + return true +} + +// getIdleConnCh returns a channel to receive and return idle +// persistent connection for the given connectMethod. +// It may return nil, if persistent connections are not being used. +func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn { + if t.DisableKeepAlives { + return nil + } + key := cm.key() + t.idleMu.Lock() + defer t.idleMu.Unlock() + if t.idleConnCh == nil { + t.idleConnCh = make(map[connectMethodKey]chan *persistConn) + } + ch, ok := t.idleConnCh[key] + if !ok { + ch = make(chan *persistConn) + t.idleConnCh[key] = ch + } + return ch +} + +func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) { + key := cm.key() + t.idleMu.Lock() + defer t.idleMu.Unlock() + if t.idleConn == nil { + return nil + } + for { + pconns, ok := t.idleConn[key] + if !ok { + return nil + } + if len(pconns) == 1 { + pconn = pconns[0] + delete(t.idleConn, key) + } else { + // 2 or more cached connections; pop last + // TODO: queue? + pconn = pconns[len(pconns)-1] + t.idleConn[key] = pconns[:len(pconns)-1] + } + if !pconn.isBroken() { + return + } + } +} + +func (t *Transport) setReqCanceler(r *Request, fn func()) { + t.reqMu.Lock() + defer t.reqMu.Unlock() + if t.reqCanceler == nil { + t.reqCanceler = make(map[*Request]func()) + } + if fn != nil { + t.reqCanceler[r] = fn + } else { + delete(t.reqCanceler, r) + } +} + +func (t *Transport) dial(network, addr string) (c net.Conn, err error) { + if t.Dial != nil { + return t.Dial(network, addr) + } + return net.Dial(network, addr) +} + +// Testing hooks: +var prePendingDial, postPendingDial func() + +// getConn dials and creates a new persistConn to the target as +// specified in the connectMethod. This includes doing a proxy CONNECT +// and/or setting up TLS. If this doesn't return an error, the persistConn +// is ready to write requests to. +func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error) { + if pc := t.getIdleConn(cm); pc != nil { + return pc, nil + } + + type dialRes struct { + pc *persistConn + err error + } + dialc := make(chan dialRes) + + handlePendingDial := func() { + if prePendingDial != nil { + prePendingDial() + } + go func() { + if v := <-dialc; v.err == nil { + t.putIdleConn(v.pc) + } + if postPendingDial != nil { + postPendingDial() + } + }() + } + + cancelc := make(chan struct{}) + t.setReqCanceler(req, func() { close(cancelc) }) + + go func() { + pc, err := t.dialConn(cm) + dialc <- dialRes{pc, err} + }() + + idleConnCh := t.getIdleConnCh(cm) + select { + case v := <-dialc: + // Our dial finished. + return v.pc, v.err + case pc := <-idleConnCh: + // Another request finished first and its net.Conn + // became available before our dial. Or somebody + // else's dial that they didn't use. + // But our dial is still going, so give it away + // when it finishes: + handlePendingDial() + return pc, nil + case <-cancelc: + handlePendingDial() + return nil, errors.New("net/http: request canceled while waiting for connection") + } +} + +func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) { + pconn := &persistConn{ + t: t, + cacheKey: cm.key(), + reqch: make(chan requestAndChan, 1), + writech: make(chan writeRequest, 1), + closech: make(chan struct{}), + writeErrCh: make(chan error, 1), + } + tlsDial := t.DialTLS != nil && cm.targetScheme == "https" && cm.proxyURL == nil + if tlsDial { + var err error + pconn.conn, err = t.DialTLS("tcp", cm.addr()) + if err != nil { + return nil, err + } + if tc, ok := pconn.conn.(*tls.Conn); ok { + cs := tc.ConnectionState() + pconn.tlsState = &cs + } + } else { + conn, err := t.dial("tcp", cm.addr()) + if err != nil { + if cm.proxyURL != nil { + err = fmt.Errorf("http: error connecting to proxy %s: %v", cm.proxyURL, err) + } + return nil, err + } + pconn.conn = conn + } + + // Proxy setup. + switch { + case cm.proxyURL == nil: + // Do nothing. Not using a proxy. + case cm.targetScheme == "http": + pconn.isProxy = true + if pa := cm.proxyAuth(); pa != "" { + pconn.mutateHeaderFunc = func(h Header) { + h.Set("Proxy-Authorization", pa) + } + } + case cm.targetScheme == "https": + conn := pconn.conn + connectReq := &Request{ + Method: "CONNECT", + URL: &url.URL{Opaque: cm.targetAddr}, + Host: cm.targetAddr, + Header: make(Header), + } + if pa := cm.proxyAuth(); pa != "" { + connectReq.Header.Set("Proxy-Authorization", pa) + } + connectReq.Write(conn) + + // Read response. + // Okay to use and discard buffered reader here, because + // TLS server will not speak until spoken to. + br := bufio.NewReader(conn) + resp, err := ReadResponse(br, connectReq) + if err != nil { + conn.Close() + return nil, err + } + if resp.StatusCode != 200 { + f := strings.SplitN(resp.Status, " ", 2) + conn.Close() + return nil, errors.New(f[1]) + } + } + + if cm.targetScheme == "https" && !tlsDial { + // Initiate TLS and check remote host name against certificate. + cfg := t.TLSClientConfig + if cfg == nil || cfg.ServerName == "" { + host := cm.tlsHost() + if cfg == nil { + cfg = &tls.Config{ServerName: host} + } else { + clone := *cfg // shallow clone + clone.ServerName = host + cfg = &clone + } + } + plainConn := pconn.conn + tlsConn := tls.Client(plainConn, cfg) + errc := make(chan error, 2) + var timer *time.Timer // for canceling TLS handshake + if d := t.TLSHandshakeTimeout; d != 0 { + timer = time.AfterFunc(d, func() { + errc <- tlsHandshakeTimeoutError{} + }) + } + go func() { + err := tlsConn.Handshake() + if timer != nil { + timer.Stop() + } + errc <- err + }() + if err := <-errc; err != nil { + plainConn.Close() + return nil, err + } + if !cfg.InsecureSkipVerify { + if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil { + plainConn.Close() + return nil, err + } + } + cs := tlsConn.ConnectionState() + pconn.tlsState = &cs + pconn.conn = tlsConn + } + + pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF}) + pconn.bw = bufio.NewWriter(pconn.conn) + go pconn.readLoop() + go pconn.writeLoop() + return pconn, nil +} + +// useProxy returns true if requests to addr should use a proxy, +// according to the NO_PROXY or no_proxy environment variable. +// addr is always a canonicalAddr with a host and port. +func useProxy(addr string) bool { + if len(addr) == 0 { + return true + } + host, _, err := net.SplitHostPort(addr) + if err != nil { + return false + } + if host == "localhost" { + return false + } + if ip := net.ParseIP(host); ip != nil { + if ip.IsLoopback() { + return false + } + } + + no_proxy := noProxyEnv.Get() + if no_proxy == "*" { + return false + } + + addr = strings.ToLower(strings.TrimSpace(addr)) + if hasPort(addr) { + addr = addr[:strings.LastIndex(addr, ":")] + } + + for _, p := range strings.Split(no_proxy, ",") { + p = strings.ToLower(strings.TrimSpace(p)) + if len(p) == 0 { + continue + } + if hasPort(p) { + p = p[:strings.LastIndex(p, ":")] + } + if addr == p { + return false + } + if p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:]) { + // no_proxy ".foo.com" matches "bar.foo.com" or "foo.com" + return false + } + if p[0] != '.' && strings.HasSuffix(addr, p) && addr[len(addr)-len(p)-1] == '.' { + // no_proxy "foo.com" matches "bar.foo.com" + return false + } + } + return true +} + +// connectMethod is the map key (in its String form) for keeping persistent +// TCP connections alive for subsequent HTTP requests. +// +// A connect method may be of the following types: +// +// Cache key form Description +// ----------------- ------------------------- +// |http|foo.com http directly to server, no proxy +// |https|foo.com https directly to server, no proxy +// http://proxy.com|https|foo.com http to proxy, then CONNECT to foo.com +// http://proxy.com|http http to proxy, http to anywhere after that +// +// Note: no support to https to the proxy yet. +// +type connectMethod struct { + proxyURL *url.URL // nil for no proxy, else full proxy URL + targetScheme string // "http" or "https" + targetAddr string // Not used if proxy + http targetScheme (4th example in table) +} + +func (cm *connectMethod) key() connectMethodKey { + proxyStr := "" + targetAddr := cm.targetAddr + if cm.proxyURL != nil { + proxyStr = cm.proxyURL.String() + if cm.targetScheme == "http" { + targetAddr = "" + } + } + return connectMethodKey{ + proxy: proxyStr, + scheme: cm.targetScheme, + addr: targetAddr, + } +} + +// addr returns the first hop "host:port" to which we need to TCP connect. +func (cm *connectMethod) addr() string { + if cm.proxyURL != nil { + return canonicalAddr(cm.proxyURL) + } + return cm.targetAddr +} + +// tlsHost returns the host name to match against the peer's +// TLS certificate. +func (cm *connectMethod) tlsHost() string { + h := cm.targetAddr + if hasPort(h) { + h = h[:strings.LastIndex(h, ":")] + } + return h +} + +// connectMethodKey is the map key version of connectMethod, with a +// stringified proxy URL (or the empty string) instead of a pointer to +// a URL. +type connectMethodKey struct { + proxy, scheme, addr string +} + +func (k connectMethodKey) String() string { + // Only used by tests. + return fmt.Sprintf("%s|%s|%s", k.proxy, k.scheme, k.addr) +} + +// persistConn wraps a connection, usually a persistent one +// (but may be used for non-keep-alive requests as well) +type persistConn struct { + t *Transport + cacheKey connectMethodKey + conn net.Conn + tlsState *tls.ConnectionState + br *bufio.Reader // from conn + sawEOF bool // whether we've seen EOF from conn; owned by readLoop + bw *bufio.Writer // to conn + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop + closech chan struct{} // closed when conn closed + isProxy bool + // writeErrCh passes the request write error (usually nil) + // from the writeLoop goroutine to the readLoop which passes + // it off to the res.Body reader, which then uses it to decide + // whether or not a connection can be reused. Issue 7569. + writeErrCh chan error + + lk sync.Mutex // guards following fields + numExpectedResponses int + closed bool // whether conn has been closed + broken bool // an error has happened on this connection; marked broken so it's not reused. + // mutateHeaderFunc is an optional func to modify extra + // headers on each outbound request before it's written. (the + // original Request given to RoundTrip is not modified) + mutateHeaderFunc func(Header) +} + +// isBroken reports whether this connection is in a known broken state. +func (pc *persistConn) isBroken() bool { + pc.lk.Lock() + b := pc.broken + pc.lk.Unlock() + return b +} + +func (pc *persistConn) cancelRequest() { + pc.conn.Close() +} + +var remoteSideClosedFunc func(error) bool // or nil to use default + +func remoteSideClosed(err error) bool { + if err == io.EOF { + return true + } + if remoteSideClosedFunc != nil { + return remoteSideClosedFunc(err) + } + return false +} + +func (pc *persistConn) readLoop() { + alive := true + + for alive { + pb, err := pc.br.Peek(1) + + pc.lk.Lock() + if pc.numExpectedResponses == 0 { + if !pc.closed { + pc.closeLocked() + if len(pb) > 0 { + log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", + string(pb), err) + } + } + pc.lk.Unlock() + return + } + pc.lk.Unlock() + + rc := <-pc.reqch + + var resp *Response + if err == nil { + resp, err = ReadResponse(pc.br, rc.req) + if err == nil && resp.StatusCode == 100 { + // Skip any 100-continue for now. + // TODO(bradfitz): if rc.req had "Expect: 100-continue", + // actually block the request body write and signal the + // writeLoop now to begin sending it. (Issue 2184) For now we + // eat it, since we're never expecting one. + resp, err = ReadResponse(pc.br, rc.req) + } + } + + if resp != nil { + resp.TLS = pc.tlsState + } + + hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0 + + if err != nil { + pc.close() + } else { + if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" { + resp.Header.Del("Content-Encoding") + resp.Header.Del("Content-Length") + resp.ContentLength = -1 + resp.Body = &gzipReader{body: resp.Body} + } + resp.Body = &bodyEOFSignal{body: resp.Body} + } + + if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 { + // Don't do keep-alive on error if either party requested a close + // or we get an unexpected informational (1xx) response. + // StatusCode 100 is already handled above. + alive = false + } + + var waitForBodyRead chan bool + if hasBody { + waitForBodyRead = make(chan bool, 2) + resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { + // Sending false here sets alive to + // false and closes the connection + // below. + waitForBodyRead <- false + return nil + } + resp.Body.(*bodyEOFSignal).fn = func(err error) { + waitForBodyRead <- alive && + err == nil && + !pc.sawEOF && + pc.wroteRequest() && + pc.t.putIdleConn(pc) + } + } + + if alive && !hasBody { + alive = !pc.sawEOF && + pc.wroteRequest() && + pc.t.putIdleConn(pc) + } + + rc.ch <- responseAndError{resp, err} + + // Wait for the just-returned response body to be fully consumed + // before we race and peek on the underlying bufio reader. + if waitForBodyRead != nil { + select { + case alive = <-waitForBodyRead: + case <-pc.closech: + alive = false + } + } + + pc.t.setReqCanceler(rc.req, nil) + + if !alive { + pc.close() + } + } +} + +func (pc *persistConn) writeLoop() { + for { + select { + case wr := <-pc.writech: + if pc.isBroken() { + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) + if err == nil { + err = pc.bw.Flush() + } + if err != nil { + pc.markBroken() + wr.req.Request.closeBody() + } + pc.writeErrCh <- err // to the body reader, which might recycle us + wr.ch <- err // to the roundTrip function + case <-pc.closech: + return + } + } +} + +// wroteRequest is a check before recycling a connection that the previous write +// (from writeLoop above) happened and was successful. +func (pc *persistConn) wroteRequest() bool { + select { + case err := <-pc.writeErrCh: + // Common case: the write happened well before the response, so + // avoid creating a timer. + return err == nil + default: + // Rare case: the request was written in writeLoop above but + // before it could send to pc.writeErrCh, the reader read it + // all, processed it, and called us here. In this case, give the + // write goroutine a bit of time to finish its send. + // + // Less rare case: We also get here in the legitimate case of + // Issue 7569, where the writer is still writing (or stalled), + // but the server has already replied. In this case, we don't + // want to wait too long, and we want to return false so this + // connection isn't re-used. + select { + case err := <-pc.writeErrCh: + return err == nil + case <-time.After(50 * time.Millisecond): + return false + } + } +} + +type responseAndError struct { + res *Response + err error +} + +type requestAndChan struct { + req *Request + ch chan responseAndError + + // did the Transport (as opposed to the client code) add an + // Accept-Encoding gzip header? only if it we set it do + // we transparently decode the gzip. + addedGzip bool +} + +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + +type httpError struct { + err string + timeout bool +} + +func (e *httpError) Error() string { return e.err } +func (e *httpError) Timeout() bool { return e.timeout } +func (e *httpError) Temporary() bool { return true } + +var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true} +var errClosed error = &httpError{err: "net/http: transport closed before response was received"} + +func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { + pc.t.setReqCanceler(req.Request, pc.cancelRequest) + pc.lk.Lock() + pc.numExpectedResponses++ + headerFn := pc.mutateHeaderFunc + pc.lk.Unlock() + + if headerFn != nil { + headerFn(req.extraHeaders()) + } + + // Ask for a compressed version if the caller didn't set their + // own value for Accept-Encoding. We only attempted to + // uncompress the gzip stream if we were the layer that + // requested it. + requestedGzip := false + if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" && req.Method != "HEAD" { + // Request gzip only, not deflate. Deflate is ambiguous and + // not as universally supported anyway. + // See: http://www.gzip.org/zlib/zlib_faq.html#faq38 + // + // Note that we don't request this for HEAD requests, + // due to a bug in nginx: + // http://trac.nginx.org/nginx/ticket/358 + // http://golang.org/issue/5522 + requestedGzip = true + req.extraHeaders().Set("Accept-Encoding", "gzip") + } + + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) + pc.writech <- writeRequest{req, writeErrCh} + + resc := make(chan responseAndError, 1) + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} + + var re responseAndError + var pconnDeadCh = pc.closech + var failTicker <-chan time.Time + var respHeaderTimer <-chan time.Time +WaitResponse: + for { + select { + case err := <-writeErrCh: + if err != nil { + re = responseAndError{nil, err} + pc.close() + break WaitResponse + } + if d := pc.t.ResponseHeaderTimeout; d > 0 { + respHeaderTimer = time.After(d) + } + case <-pconnDeadCh: + // The persist connection is dead. This shouldn't + // usually happen (only with Connection: close responses + // with no response bodies), but if it does happen it + // means either a) the remote server hung up on us + // prematurely, or b) the readLoop sent us a response & + // closed its closech at roughly the same time, and we + // selected this case first, in which case a response + // might still be coming soon. + // + // We can't avoid the select race in b) by using a unbuffered + // resc channel instead, because then goroutines can + // leak if we exit due to other errors. + pconnDeadCh = nil // avoid spinning + failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc + case <-failTicker: + re = responseAndError{err: errClosed} + break WaitResponse + case <-respHeaderTimer: + pc.close() + re = responseAndError{err: errTimeout} + break WaitResponse + case re = <-resc: + break WaitResponse + } + } + + pc.lk.Lock() + pc.numExpectedResponses-- + pc.lk.Unlock() + + if re.err != nil { + pc.t.setReqCanceler(req.Request, nil) + } + return re.res, re.err +} + +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + +func (pc *persistConn) close() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.closeLocked() +} + +func (pc *persistConn) closeLocked() { + pc.broken = true + if !pc.closed { + pc.conn.Close() + pc.closed = true + close(pc.closech) + } + pc.mutateHeaderFunc = nil +} + +var portMap = map[string]string{ + "http": "80", + "https": "443", +} + +// canonicalAddr returns url.Host but always with a ":port" suffix +func canonicalAddr(url *url.URL) string { + addr := url.Host + if !hasPort(addr) { + return addr + ":" + portMap[url.Scheme] + } + return addr +} + +// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most +// once, right before its final (error-producing) Read or Close call +// returns. If earlyCloseFn is non-nil and Close is called before +// io.EOF is seen, earlyCloseFn is called instead of fn, and its +// return value is the return value from Close. +type bodyEOFSignal struct { + body io.ReadCloser + mu sync.Mutex // guards following 4 fields + closed bool // whether Close has been called + rerr error // sticky Read error + fn func(error) // error will be nil on Read io.EOF + earlyCloseFn func() error // optional alt Close func used if io.EOF not seen +} + +func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { + es.mu.Lock() + closed, rerr := es.closed, es.rerr + es.mu.Unlock() + if closed { + return 0, errors.New("http: read on closed response body") + } + if rerr != nil { + return 0, rerr + } + + n, err = es.body.Read(p) + if err != nil { + es.mu.Lock() + defer es.mu.Unlock() + if es.rerr == nil { + es.rerr = err + } + es.condfn(err) + } + return +} + +func (es *bodyEOFSignal) Close() error { + es.mu.Lock() + defer es.mu.Unlock() + if es.closed { + return nil + } + es.closed = true + if es.earlyCloseFn != nil && es.rerr != io.EOF { + return es.earlyCloseFn() + } + err := es.body.Close() + es.condfn(err) + return err +} + +// caller must hold es.mu. +func (es *bodyEOFSignal) condfn(err error) { + if es.fn == nil { + return + } + if err == io.EOF { + err = nil + } + es.fn(err) + es.fn = nil +} + +// gzipReader wraps a response body so it can lazily +// call gzip.NewReader on the first call to Read +type gzipReader struct { + body io.ReadCloser // underlying Response.Body + zr io.Reader // lazily-initialized gzip reader +} + +func (gz *gzipReader) Read(p []byte) (n int, err error) { + if gz.zr == nil { + gz.zr, err = gzip.NewReader(gz.body) + if err != nil { + return 0, err + } + } + return gz.zr.Read(p) +} + +func (gz *gzipReader) Close() error { + return gz.body.Close() +} + +type readerAndCloser struct { + io.Reader + io.Closer +} + +type tlsHandshakeTimeoutError struct{} + +func (tlsHandshakeTimeoutError) Timeout() bool { return true } +func (tlsHandshakeTimeoutError) Temporary() bool { return true } +func (tlsHandshakeTimeoutError) Error() string { return "net/http: TLS handshake timeout" } + +type noteEOFReader struct { + r io.Reader + sawEOF *bool +} + +func (nr noteEOFReader) Read(p []byte) (n int, err error) { + n, err = nr.r.Read(p) + if err == io.EOF { + *nr.sawEOF = true + } + return +} |