summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2017-06-02 15:20:43 +0200
committerJacob Vosmaer <jacob@gitlab.com>2017-06-02 15:20:43 +0200
commit649060abb13ab7bbb9672f85c6d6550d98bee9b7 (patch)
tree888159953bca27f7a575fe8ffb71d08bbfececaa
parente1e347070e94cf139738166171cc35b21b278dda (diff)
downloadgitlab-shell-649060abb13ab7bbb9672f85c6d6550d98bee9b7.tar.gz
Downgrade grpc to get Go 1.5 compatibilitygo-1.5-compat
-rw-r--r--go/vendor/google.golang.org/grpc/README.md20
-rw-r--r--go/vendor/google.golang.org/grpc/call.go14
-rw-r--r--go/vendor/google.golang.org/grpc/clientconn.go164
-rw-r--r--go/vendor/google.golang.org/grpc/interceptor.go2
-rw-r--r--go/vendor/google.golang.org/grpc/rpc_util.go106
-rw-r--r--go/vendor/google.golang.org/grpc/server.go216
-rw-r--r--go/vendor/google.golang.org/grpc/stream.go70
-rw-r--r--go/vendor/google.golang.org/grpc/transport/control.go18
-rw-r--r--go/vendor/google.golang.org/grpc/transport/handler_server.go15
-rw-r--r--go/vendor/google.golang.org/grpc/transport/http2_client.go276
-rw-r--r--go/vendor/google.golang.org/grpc/transport/http2_server.go273
-rw-r--r--go/vendor/google.golang.org/grpc/transport/http_util.go77
-rw-r--r--go/vendor/google.golang.org/grpc/transport/pre_go16.go51
-rw-r--r--go/vendor/google.golang.org/grpc/transport/transport.go80
-rw-r--r--go/vendor/vendor.json12
15 files changed, 473 insertions, 921 deletions
diff --git a/go/vendor/google.golang.org/grpc/README.md b/go/vendor/google.golang.org/grpc/README.md
index ae0236f..39120c2 100644
--- a/go/vendor/google.golang.org/grpc/README.md
+++ b/go/vendor/google.golang.org/grpc/README.md
@@ -1,4 +1,4 @@
-# gRPC-Go
+#gRPC-Go
[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc)
@@ -16,7 +16,23 @@ $ go get google.golang.org/grpc
Prerequisites
-------------
-This requires Go 1.6 or later.
+This requires Go 1.5 or later.
+
+A note on the version used: significant performance improvements in benchmarks
+of grpc-go have been seen by upgrading the go version from 1.5 to the latest
+1.7.1.
+
+From https://golang.org/doc/install, one way to install the latest version of go is:
+```
+$ GO_VERSION=1.7.1
+$ OS=linux
+$ ARCH=amd64
+$ curl -O https://storage.googleapis.com/golang/go${GO_VERSION}.${OS}-${ARCH}.tar.gz
+$ sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz
+$ # Put go on the PATH, keep the usual installation dir
+$ sudo ln -s /usr/local/go/bin/go /usr/bin/go
+$ rm go$GO_VERSION.$OS-$ARCH.tar.gz
+```
Constraints
-----------
diff --git a/go/vendor/google.golang.org/grpc/call.go b/go/vendor/google.golang.org/grpc/call.go
index 13ca5b7..ba17721 100644
--- a/go/vendor/google.golang.org/grpc/call.go
+++ b/go/vendor/google.golang.org/grpc/call.go
@@ -36,14 +36,13 @@ package grpc
import (
"bytes"
"io"
+ "math"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
- "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -73,22 +72,19 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
}
}
for {
- if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil {
+ if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
if err == io.EOF {
break
}
return
}
}
- if inPayload != nil && err == io.EOF && stream.Status().Code() == codes.OK {
+ if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
// Fix the order if necessary.
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
- if peer, ok := peer.FromContext(stream.Context()); ok {
- c.peer = peer
- }
return nil
}
@@ -231,7 +227,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
- if _, ok := status.FromError(err); ok {
+ if _, ok := err.(*rpcError); ok {
return err
}
if err == errConnClosing || err == errConnUnavailable {
@@ -285,6 +281,6 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
put()
put = nil
}
- return stream.Status().Err()
+ return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
}
}
diff --git a/go/vendor/google.golang.org/grpc/clientconn.go b/go/vendor/google.golang.org/grpc/clientconn.go
index aff4f5c..146166a 100644
--- a/go/vendor/google.golang.org/grpc/clientconn.go
+++ b/go/vendor/google.golang.org/grpc/clientconn.go
@@ -36,8 +36,8 @@ package grpc
import (
"errors"
"fmt"
- "math"
"net"
+ "strings"
"sync"
"time"
@@ -45,7 +45,6 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -79,6 +78,7 @@ var (
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
+ errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -86,33 +86,23 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- codec Codec
- cp Compressor
- dc Decompressor
- bs backoffStrategy
- balancer Balancer
- block bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- copts transport.ConnectOptions
- maxMsgSize int
-}
-
-const defaultClientMaxMsgSize = math.MaxInt32
+ unaryInt UnaryClientInterceptor
+ streamInt StreamClientInterceptor
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ bs backoffStrategy
+ balancer Balancer
+ block bool
+ insecure bool
+ timeout time.Duration
+ scChan <-chan ServiceConfig
+ copts transport.ConnectOptions
+}
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
-// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
-func WithMaxMsgSize(s int) DialOption {
- return func(o *dialOptions) {
- o.maxMsgSize = s
- }
-}
-
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
func WithCodec(c Codec) DialOption {
return func(o *dialOptions) {
@@ -259,13 +249,6 @@ func WithUserAgent(s string) DialOption {
}
}
-// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
-func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
- return func(o *dialOptions) {
- o.copts.KeepaliveParams = kp
- }
-}
-
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
@@ -280,15 +263,6 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
}
}
-// WithAuthority returns a DialOption that specifies the value to be used as
-// the :authority pseudo-header. This value only works with WithInsecure and
-// has no effect if TransportCredentials are present.
-func WithAuthority(a string) DialOption {
- return func(o *dialOptions) {
- o.copts.Authority = a
- }
-}
-
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
@@ -305,19 +279,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
- cc.dopts.maxMsgSize = defaultClientMaxMsgSize
for _, opt := range opts {
opt(&cc.dopts)
}
- cc.mkp = cc.dopts.copts.KeepaliveParams
-
- grpcUA := "grpc-go/" + Version
- if cc.dopts.copts.UserAgent != "" {
- cc.dopts.copts.UserAgent += " " + grpcUA
- } else {
- cc.dopts.copts.UserAgent = grpcUA
- }
-
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@@ -357,18 +321,24 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
- } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
- cc.authority = cc.dopts.copts.Authority
} else {
- cc.authority = target
+ colonPos := strings.LastIndex(target, ":")
+ if colonPos == -1 {
+ colonPos = len(target)
+ }
+ cc.authority = target[:colonPos]
}
+ var ok bool
waitC := make(chan error, 1)
go func() {
- defer close(waitC)
+ var addrs []Address
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
- if cc.dopts.balancer != nil {
+ if cc.dopts.balancer == nil {
+ // Connect to target directly if balancer is nil.
+ addrs = append(addrs, Address{Addr: target})
+ } else {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
@@ -381,22 +351,24 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return
}
ch := cc.dopts.balancer.Notify()
- if ch != nil {
- if cc.dopts.block {
- doneChan := make(chan struct{})
- go cc.lbWatcher(doneChan)
- <-doneChan
- } else {
- go cc.lbWatcher(nil)
+ if ch == nil {
+ // There is no name resolver installed.
+ addrs = append(addrs, Address{Addr: target})
+ } else {
+ addrs, ok = <-ch
+ if !ok || len(addrs) == 0 {
+ waitC <- errNoAddr
+ return
}
- return
}
}
- // No balancer, or no resolver within the balancer. Connect directly.
- if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
- waitC <- err
- return
+ for _, a := range addrs {
+ if err := cc.resetAddrConn(a, false, nil); err != nil {
+ waitC <- err
+ return
+ }
}
+ close(waitC)
}()
select {
case <-ctx.Done():
@@ -407,10 +379,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}
+ // If balancer is nil or balancer.Notify() is nil, ok will be false here.
+ // The lbWatcher goroutine will not be created.
+ if ok {
+ go cc.lbWatcher()
+ }
+
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
-
return cc, nil
}
@@ -459,14 +436,9 @@ type ClientConn struct {
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
- // Keepalive parameter can be udated if a GoAway is received.
- mkp keepalive.ClientParameters
}
-// lbWatcher watches the Notify channel of the balancer in cc and manages
-// connections accordingly. If doneChan is not nil, it is closed after the
-// first successfull connection is made.
-func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
+func (cc *ClientConn) lbWatcher() {
for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
@@ -493,15 +465,7 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
}
cc.mu.Unlock()
for _, a := range add {
- if doneChan != nil {
- err := cc.resetAddrConn(a, true, nil)
- if err == nil {
- close(doneChan)
- doneChan = nil
- }
- } else {
- cc.resetAddrConn(a, false, nil)
- }
+ cc.resetAddrConn(a, true, nil)
}
for _, c := range del {
c.tearDown(errConnDrain)
@@ -530,15 +494,12 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
-func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
+func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
- cc.mu.RLock()
- ac.dopts.copts.KeepaliveParams = cc.mkp
- cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
@@ -583,7 +544,8 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
stale.tearDown(tearDownErr)
}
}
- if block {
+ // skipWait may overwrite the decision in ac.dopts.block.
+ if ac.dopts.block && !skipWait {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
@@ -720,20 +682,6 @@ type addrConn struct {
tearDownErr error
}
-// adjustParams updates parameters used to create transports upon
-// receiving a GoAway.
-func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
- switch r {
- case transport.TooManyPings:
- v := 2 * ac.dopts.copts.KeepaliveParams.Time
- ac.cc.mu.Lock()
- if v > ac.cc.mkp.Time {
- ac.cc.mkp.Time = v
- }
- ac.cc.mu.Unlock()
- }
-}
-
// printf records an event in ac's event log, unless ac has been closed.
// REQUIRES ac.mu is held.
func (ac *addrConn) printf(format string, a ...interface{}) {
@@ -818,8 +766,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
- // Don't call cancel in success path due to a race in Go 1.6:
- // https://github.com/golang/go/issues/15078.
if err != nil {
cancel()
@@ -890,7 +836,6 @@ func (ac *addrConn) transportMonitor() {
}
return
case <-t.GoAway():
- ac.adjustParams(t.GetGoAwayReason())
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
@@ -899,9 +844,9 @@ func (ac *addrConn) transportMonitor() {
// In both cases, a new ac is created.
select {
case <-t.Error():
- ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
+ ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
default:
- ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
+ ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
}
return
case <-t.Error():
@@ -910,8 +855,7 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
- ac.adjustParams(t.GetGoAwayReason())
- ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
+ ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
return
default:
}
diff --git a/go/vendor/google.golang.org/grpc/interceptor.go b/go/vendor/google.golang.org/grpc/interceptor.go
index a692161..8d932ef 100644
--- a/go/vendor/google.golang.org/grpc/interceptor.go
+++ b/go/vendor/google.golang.org/grpc/interceptor.go
@@ -40,7 +40,7 @@ import (
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
-// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC
+// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. inovker is the handler to complete the RPC
// and it is the responsibility of the interceptor to call it.
// This is the EXPERIMENTAL API.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
diff --git a/go/vendor/google.golang.org/grpc/rpc_util.go b/go/vendor/google.golang.org/grpc/rpc_util.go
index db56a88..2619d39 100644
--- a/go/vendor/google.golang.org/grpc/rpc_util.go
+++ b/go/vendor/google.golang.org/grpc/rpc_util.go
@@ -37,6 +37,7 @@ import (
"bytes"
"compress/gzip"
"encoding/binary"
+ "fmt"
"io"
"io/ioutil"
"math"
@@ -47,9 +48,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
- "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -141,7 +140,6 @@ type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
- peer *peer.Peer
traceInfo traceInfo // in trace.go
}
@@ -185,22 +183,12 @@ func Trailer(md *metadata.MD) CallOption {
})
}
-// Peer returns a CallOption that retrieves peer information for a
-// unary RPC.
-func Peer(peer *peer.Peer) CallOption {
- return afterCall(func(c *callInfo) {
- if c.peer != nil {
- *peer = *c.peer
- }
- })
-}
-
// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will retry
// the call if it fails due to a transient error. Please refer to
-// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md. Note: failFast is default to true.
+// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
@@ -372,57 +360,88 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
return nil
}
+// rpcError defines the status from an RPC.
+type rpcError struct {
+ code codes.Code
+ desc string
+}
+
+func (e *rpcError) Error() string {
+ return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
+}
+
// Code returns the error code for err if it was produced by the rpc system.
// Otherwise, it returns codes.Unknown.
-//
-// Deprecated; use status.FromError and Code method instead.
func Code(err error) codes.Code {
- if s, ok := status.FromError(err); ok {
- return s.Code()
+ if err == nil {
+ return codes.OK
+ }
+ if e, ok := err.(*rpcError); ok {
+ return e.code
}
return codes.Unknown
}
// ErrorDesc returns the error description of err if it was produced by the rpc system.
// Otherwise, it returns err.Error() or empty string when err is nil.
-//
-// Deprecated; use status.FromError and Message method instead.
func ErrorDesc(err error) string {
- if s, ok := status.FromError(err); ok {
- return s.Message()
+ if err == nil {
+ return ""
+ }
+ if e, ok := err.(*rpcError); ok {
+ return e.desc
}
return err.Error()
}
// Errorf returns an error containing an error code and a description;
// Errorf returns nil if c is OK.
-//
-// Deprecated; use status.Errorf instead.
func Errorf(c codes.Code, format string, a ...interface{}) error {
- return status.Errorf(c, format, a...)
+ if c == codes.OK {
+ return nil
+ }
+ return &rpcError{
+ code: c,
+ desc: fmt.Sprintf(format, a...),
+ }
}
-// toRPCErr converts an error into an error from the status package.
+// toRPCErr converts an error into a rpcError.
func toRPCErr(err error) error {
- if _, ok := status.FromError(err); ok {
- return err
- }
switch e := err.(type) {
+ case *rpcError:
+ return err
case transport.StreamError:
- return status.Error(e.Code, e.Desc)
+ return &rpcError{
+ code: e.Code,
+ desc: e.Desc,
+ }
case transport.ConnectionError:
- return status.Error(codes.Internal, e.Desc)
+ return &rpcError{
+ code: codes.Internal,
+ desc: e.Desc,
+ }
default:
switch err {
case context.DeadlineExceeded:
- return status.Error(codes.DeadlineExceeded, err.Error())
+ return &rpcError{
+ code: codes.DeadlineExceeded,
+ desc: err.Error(),
+ }
case context.Canceled:
- return status.Error(codes.Canceled, err.Error())
+ return &rpcError{
+ code: codes.Canceled,
+ desc: err.Error(),
+ }
case ErrClientConnClosing:
- return status.Error(codes.FailedPrecondition, err.Error())
+ return &rpcError{
+ code: codes.FailedPrecondition,
+ desc: err.Error(),
+ }
}
+
}
- return status.Error(codes.Unknown, err.Error())
+ return Errorf(codes.Unknown, "%v", err)
}
// convertCode converts a standard Go error into its canonical code. Note that
@@ -467,17 +486,17 @@ type MethodConfig struct {
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
- // stream (client->server) in bytes. The size which is measured is the serialized
- // payload after per-message compression (but before stream compression) in bytes.
- // The actual value used is the minumum of the value specified here and the value set
- // by the application via the gRPC client API. If either one is not set, then the other
- // will be used. If neither is set, then the built-in default is used.
+ // stream (client->server) in bytes. The size which is measured is the serialized,
+ // uncompressed payload in bytes. The actual value used is the minumum of the value
+ // specified here and the value set by the application via the gRPC client API. If
+ // either one is not set, then the other will be used. If neither is set, then the
+ // built-in default is used.
// TODO: support this.
- MaxReqSize uint32
+ MaxReqSize uint64
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
// TODO: support this.
- MaxRespSize uint32
+ MaxRespSize uint64
}
// ServiceConfig is provided by the service provider and contains parameters for how
@@ -498,6 +517,3 @@ type ServiceConfig struct {
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion4 = true
-
-// Version is the current grpc version.
-const Version = "1.3.0-dev"
diff --git a/go/vendor/google.golang.org/grpc/server.go b/go/vendor/google.golang.org/grpc/server.go
index b15f71c..985226d 100644
--- a/go/vendor/google.golang.org/grpc/server.go
+++ b/go/vendor/google.golang.org/grpc/server.go
@@ -53,10 +53,8 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
- "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
- "google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
"google.golang.org/grpc/transport"
)
@@ -118,9 +116,6 @@ type options struct {
statsHandler stats.Handler
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
- unknownStreamDesc *StreamDesc
- keepaliveParams keepalive.ServerParameters
- keepalivePolicy keepalive.EnforcementPolicy
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@@ -128,20 +123,6 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l
// A ServerOption sets options.
type ServerOption func(*options)
-// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
-func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
- return func(o *options) {
- o.keepaliveParams = kp
- }
-}
-
-// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
-func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
- return func(o *options) {
- o.keepalivePolicy = kep
- }
-}
-
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
@@ -227,24 +208,6 @@ func StatsHandler(h stats.Handler) ServerOption {
}
}
-// UnknownServiceHandler returns a ServerOption that allows for adding a custom
-// unknown service handler. The provided method is a bidi-streaming RPC service
-// handler that will be invoked instead of returning the the "unimplemented" gRPC
-// error whenever a request is received for an unregistered service or method.
-// The handling function has full access to the Context of the request and the
-// stream, and the invocation passes through interceptors.
-func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
- return func(o *options) {
- o.unknownStreamDesc = &StreamDesc{
- StreamName: "unknown_service_handler",
- Handler: streamHandler,
- // We need to assume that the users of the streamHandler will want to use both.
- ClientStreams: true,
- ServerStreams: true,
- }
- }
-}
-
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -483,12 +446,10 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
- MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
- InTapHandle: s.opts.inTapHandle,
- StatsHandler: s.opts.statsHandler,
- KeepaliveParams: s.opts.keepaliveParams,
- KeepalivePolicy: s.opts.keepalivePolicy,
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
@@ -672,7 +633,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
stream.SetSendCompress(s.opts.cp.Type())
}
p := &parser{r: stream}
- for { // TODO: delete
+ for {
pf, req, err := p.recvMsg(s.opts.maxMsgSize)
if err == io.EOF {
// The entire stream is done (for unary RPC only).
@@ -682,37 +643,36 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
if err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
+ switch err := err.(type) {
+ case *rpcError:
+ if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
- } else {
- switch st := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
}
return err
}
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
+ switch err := err.(type) {
+ case *rpcError:
+ if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
return err
+ default:
+ if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
}
- if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
-
- // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
}
var inPayload *stats.InPayload
if sh != nil {
@@ -720,6 +680,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
RecvTime: time.Now(),
}
}
+ statusCode := codes.OK
+ statusDesc := ""
df := func(v interface{}) error {
if inPayload != nil {
inPayload.WireLength = len(req)
@@ -728,16 +690,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
if err != nil {
+ if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ }
return Errorf(codes.Internal, err.Error())
}
}
if len(req) > s.opts.maxMsgSize {
// TODO: Revisit the error code. Currently keep it consistent with
// java implementation.
- return status.Errorf(codes.Internal, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
+ statusCode = codes.Internal
+ statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
}
if err := s.opts.codec.Unmarshal(req, v); err != nil {
- return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
+ return err
}
if inPayload != nil {
inPayload.Payload = v
@@ -752,20 +718,21 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
if appErr != nil {
- appStatus, ok := status.FromError(appErr)
- if !ok {
- // Convert appErr if it is not a grpc status error.
- appErr = status.Error(convertCode(appErr), appErr.Error())
- appStatus, _ = status.FromError(appErr)
+ if err, ok := appErr.(*rpcError); ok {
+ statusCode = err.code
+ statusDesc = err.desc
+ } else {
+ statusCode = convertCode(appErr)
+ statusDesc = appErr.Error()
}
- if trInfo != nil {
- trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ if trInfo != nil && statusCode != codes.OK {
+ trInfo.tr.LazyLog(stringer(statusDesc), true)
trInfo.tr.SetError()
}
- if e := t.WriteStatus(stream, appStatus); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
}
- return appErr
+ return Errorf(statusCode, statusDesc)
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
@@ -775,35 +742,26 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Delay: false,
}
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
- if err == io.EOF {
- // The entire stream is done (for unary RPC only).
- return err
- }
- if s, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, s); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
- }
- } else {
- switch st := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
- }
+ switch err := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ statusCode = err.Code
+ statusDesc = err.Desc
+ default:
+ statusCode = codes.Unknown
+ statusDesc = err.Error()
}
return err
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
- // TODO: Should we be logging if writing status failed here, like above?
- // Should the logging be in WriteStatus? Should we ignore the WriteStatus
- // error or allow the stats handler to see it?
- return t.WriteStatus(stream, status.New(codes.OK, ""))
+ errWrite := t.WriteStatus(stream, statusCode, statusDesc)
+ if statusCode != codes.OK {
+ return Errorf(statusCode, statusDesc)
+ }
+ return errWrite
}
}
@@ -857,47 +815,43 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}()
}
var appErr error
- var server interface{}
- if srv != nil {
- server = srv.server
- }
if s.opts.streamInt == nil {
- appErr = sd.Handler(server, ss)
+ appErr = sd.Handler(srv.server, ss)
} else {
info := &StreamServerInfo{
FullMethod: stream.Method(),
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
- appErr = s.opts.streamInt(server, ss, info, sd.Handler)
+ appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
}
if appErr != nil {
- appStatus, ok := status.FromError(appErr)
- if !ok {
- switch err := appErr.(type) {
- case transport.StreamError:
- appStatus = status.New(err.Code, err.Desc)
- default:
- appStatus = status.New(convertCode(appErr), appErr.Error())
- }
- appErr = appStatus.Err()
- }
- if trInfo != nil {
- ss.mu.Lock()
- ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
- ss.trInfo.tr.SetError()
- ss.mu.Unlock()
+ if err, ok := appErr.(*rpcError); ok {
+ ss.statusCode = err.code
+ ss.statusDesc = err.desc
+ } else if err, ok := appErr.(transport.StreamError); ok {
+ ss.statusCode = err.Code
+ ss.statusDesc = err.Desc
+ } else {
+ ss.statusCode = convertCode(appErr)
+ ss.statusDesc = appErr.Error()
}
- t.WriteStatus(ss.s, appStatus)
- // TODO: Should we log an error from WriteStatus here and below?
- return appErr
}
if trInfo != nil {
ss.mu.Lock()
- ss.trInfo.tr.LazyLog(stringer("OK"), false)
+ if ss.statusCode != codes.OK {
+ ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
+ ss.trInfo.tr.SetError()
+ } else {
+ ss.trInfo.tr.LazyLog(stringer("OK"), false)
+ }
ss.mu.Unlock()
}
- return t.WriteStatus(ss.s, status.New(codes.OK, ""))
+ errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
+ if ss.statusCode != codes.OK {
+ return Errorf(ss.statusCode, ss.statusDesc)
+ }
+ return errWrite
}
@@ -913,7 +867,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
- if err := t.WriteStatus(stream, status.New(codes.InvalidArgument, errDesc)); err != nil {
+ if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -929,16 +883,12 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
method := sm[pos+1:]
srv, ok := s.m[service]
if !ok {
- if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
- s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
- return
- }
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("unknown service %v", service)
- if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
+ if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -963,12 +913,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
- if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
- s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
- return
- }
errDesc := fmt.Sprintf("unknown method %v", method)
- if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
+ if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
diff --git a/go/vendor/google.golang.org/grpc/stream.go b/go/vendor/google.golang.org/grpc/stream.go
index ecb1a31..bb468dc 100644
--- a/go/vendor/google.golang.org/grpc/stream.go
+++ b/go/vendor/google.golang.org/grpc/stream.go
@@ -37,6 +37,7 @@ import (
"bytes"
"errors"
"io"
+ "math"
"sync"
"time"
@@ -45,7 +46,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
- "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -178,7 +178,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
- if _, ok := status.FromError(err); ok {
+ if _, ok := err.(*rpcError); ok {
return nil, err
}
if err == errConnClosing || err == errConnUnavailable {
@@ -208,14 +208,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
- opts: opts,
- c: c,
- desc: desc,
- codec: cc.dopts.codec,
- cp: cc.dopts.cp,
- dc: cc.dopts.dc,
- maxMsgSize: cc.dopts.maxMsgSize,
- cancel: cancel,
+ opts: opts,
+ c: c,
+ desc: desc,
+ codec: cc.dopts.codec,
+ cp: cc.dopts.cp,
+ dc: cc.dopts.dc,
+ cancel: cancel,
put: put,
t: t,
@@ -240,7 +239,11 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.
- cs.finish(s.Status().Err())
+ if s.StatusCode() == codes.OK {
+ cs.finish(nil)
+ } else {
+ cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
+ }
cs.closeTransportStream(nil)
case <-s.GoAway():
cs.finish(errConnDrain)
@@ -256,18 +259,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
- opts []CallOption
- c callInfo
- t transport.ClientTransport
- s *transport.Stream
- p *parser
- desc *StreamDesc
- codec Codec
- cp Compressor
- cbuf *bytes.Buffer
- dc Decompressor
- maxMsgSize int
- cancel context.CancelFunc
+ opts []CallOption
+ c callInfo
+ t transport.ClientTransport
+ s *transport.Stream
+ p *parser
+ desc *StreamDesc
+ codec Codec
+ cp Compressor
+ cbuf *bytes.Buffer
+ dc Decompressor
+ cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@@ -380,7 +382,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
Client: true,
}
}
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@@ -403,17 +405,17 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload.
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
if err == io.EOF {
- if se := cs.s.Status().Err(); se != nil {
- return se
+ if cs.s.StatusCode() == codes.OK {
+ cs.finish(err)
+ return nil
}
- cs.finish(err)
- return nil
+ return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
}
return toRPCErr(err)
}
@@ -421,11 +423,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.closeTransportStream(err)
}
if err == io.EOF {
- if statusErr := cs.s.Status().Err(); statusErr != nil {
- return statusErr
+ if cs.s.StatusCode() == codes.OK {
+ // Returns io.EOF to indicate the end of the stream.
+ return
}
- // Returns io.EOF to indicate the end of the stream.
- return
+ return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
}
return toRPCErr(err)
}
@@ -517,6 +519,8 @@ type serverStream struct {
dc Decompressor
cbuf *bytes.Buffer
maxMsgSize int
+ statusCode codes.Code
+ statusDesc string
trInfo *traceInfo
statsHandler stats.Handler
diff --git a/go/vendor/google.golang.org/grpc/transport/control.go b/go/vendor/google.golang.org/grpc/transport/control.go
index 8d29aee..2586cba 100644
--- a/go/vendor/google.golang.org/grpc/transport/control.go
+++ b/go/vendor/google.golang.org/grpc/transport/control.go
@@ -35,9 +35,7 @@ package transport
import (
"fmt"
- "math"
"sync"
- "time"
"golang.org/x/net/http2"
)
@@ -46,18 +44,8 @@ const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
- initialWindowSize = defaultWindowSize // for an RPC
- initialConnWindowSize = defaultWindowSize * 16 // for a connection
- infinity = time.Duration(math.MaxInt64)
- defaultClientKeepaliveTime = infinity
- defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
- defaultMaxStreamsClient = 100
- defaultMaxConnectionIdle = infinity
- defaultMaxConnectionAge = infinity
- defaultMaxConnectionAgeGrace = infinity
- defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
- defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
- defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
+ initialWindowSize = defaultWindowSize // for an RPC
+ initialConnWindowSize = defaultWindowSize * 16 // for a connection
)
// The following defines various control items which could flow through
@@ -85,8 +73,6 @@ type resetStream struct {
func (*resetStream) item() {}
type goAway struct {
- code http2.ErrCode
- debugData []byte
}
func (*goAway) item() {}
diff --git a/go/vendor/google.golang.org/grpc/transport/handler_server.go b/go/vendor/google.golang.org/grpc/transport/handler_server.go
index 28c9ce0..10b6dc0 100644
--- a/go/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/go/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -53,7 +53,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
- "google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
@@ -183,7 +182,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {
}
}
-func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
+func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
err := ht.do(func() {
ht.writeCommonHeaders(s)
@@ -193,13 +192,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
ht.rw.(http.Flusher).Flush()
h := ht.rw.Header()
- h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
- if m := st.Message(); m != "" {
- h.Set("Grpc-Message", encodeGrpcMessage(m))
+ h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
+ if statusDesc != "" {
+ h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
}
-
- // TODO: Support Grpc-Status-Details-Bin
-
if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@@ -238,7 +234,6 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
// and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
h.Add("Trailer", "Grpc-Status")
h.Add("Trailer", "Grpc-Message")
- // TODO: Support Grpc-Status-Details-Bin
if s.sendCompress != "" {
h.Set("Grpc-Encoding", s.sendCompress)
@@ -319,7 +314,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
if req.TLS != nil {
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
- ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
+ ctx = metadata.NewContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr)
s.ctx = newContextWithStream(ctx, s)
s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}
diff --git a/go/vendor/google.golang.org/grpc/transport/http2_client.go b/go/vendor/google.golang.org/grpc/transport/http2_client.go
index 486d4a1..892f8ba 100644
--- a/go/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/go/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -35,12 +35,12 @@ package transport
import (
"bytes"
+ "fmt"
"io"
"math"
"net"
"strings"
"sync"
- "sync/atomic"
"time"
"golang.org/x/net/context"
@@ -49,11 +49,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
- "google.golang.org/grpc/status"
)
// http2Client implements the ClientTransport interface with HTTP2.
@@ -82,8 +80,6 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
- // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
- awakenKeepalive chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@@ -103,11 +99,6 @@ type http2Client struct {
creds []credentials.PerRPCCredentials
- // Boolean to keep track of reading activity on transport.
- // 1 is true and 0 is false.
- activity uint32 // Accessed atomically.
- kp keepalive.ClientParameters
-
statsHandler stats.Handler
mu sync.Mutex // guard the following variables
@@ -121,9 +112,6 @@ type http2Client struct {
goAwayID uint32
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
prevGoAwayID uint32
- // goAwayReason records the http2.ErrCode and debug data received with the
- // GoAway frame.
- goAwayReason GoAwayReason
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -190,19 +178,15 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
return nil, connectionErrorf(temp, err, "transport: %v", err)
}
}
- kp := opts.KeepaliveParams
- // Validate keepalive parameters.
- if kp.Time == 0 {
- kp.Time = defaultClientKeepaliveTime
- }
- if kp.Timeout == 0 {
- kp.Timeout = defaultClientKeepaliveTimeout
+ ua := primaryUA
+ if opts.UserAgent != "" {
+ ua = opts.UserAgent + " " + ua
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
target: addr.Addr,
- userAgent: opts.UserAgent,
+ userAgent: ua,
md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
@@ -214,7 +198,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@@ -225,15 +208,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
state: reachable,
activeStreams: make(map[uint32]*Stream),
creds: opts.PerRPCCredentials,
- maxStreams: defaultMaxStreamsClient,
- streamsQuota: newQuotaPool(defaultMaxStreamsClient),
+ maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
- kp: kp,
statsHandler: opts.StatsHandler,
}
- // Make sure awakenKeepalive can't be written upon.
- // keepalive routine will make it writable, if need be.
- t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -278,9 +256,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
}
go t.controller()
- if t.kp.Time != infinity {
- go t.keepalive()
- }
t.writableChan <- 0
return t, nil
}
@@ -314,7 +289,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
return s
}
-// NewStream creates a stream and registers it into the transport as "active"
+// NewStream creates a stream and register it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
pr := &peer.Peer{
@@ -362,18 +337,21 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
+ checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
- sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
- if err != nil {
- return nil, err
- }
- // Returns the quota balance back.
- if sq > 1 {
- t.streamsQuota.add(sq - 1)
+ if checkStreamsQuota {
+ sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
+ if err != nil {
+ return nil, err
+ }
+ // Returns the quota balance back.
+ if sq > 1 {
+ t.streamsQuota.add(sq - 1)
+ }
}
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
- if _, ok := err.(StreamError); ok {
+ if _, ok := err.(StreamError); ok && checkStreamsQuota {
t.streamsQuota.add(1)
}
return nil, err
@@ -381,7 +359,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
- t.streamsQuota.add(1)
+ if checkStreamsQuota {
+ t.streamsQuota.add(1)
+ }
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
@@ -393,17 +373,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
- // If the number of active streams change from 0 to 1, then check if keepalive
- // has gone dormant. If so, wake it up.
- if len(t.activeStreams) == 1 {
- select {
- case t.awakenKeepalive <- struct{}{}:
- t.framer.writePing(false, false, [8]byte{})
- default:
- }
- }
+ // This stream is not counted when applySetings(...) initialize t.streamsQuota.
+ // Reset t.streamsQuota to the right value.
+ var reset bool
+ if !checkStreamsQuota && t.streamsQuota != nil {
+ reset = true
+ }
t.mu.Unlock()
+ if reset {
+ t.streamsQuota.add(-1)
+ }
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
@@ -435,7 +415,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
hasMD bool
endHeaders bool
)
- if md, ok := metadata.FromOutgoingContext(ctx); ok {
+ if md, ok := metadata.FromContext(ctx); ok {
hasMD = true
for k, v := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
@@ -511,11 +491,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
+ var updateStreams bool
t.mu.Lock()
if t.activeStreams == nil {
t.mu.Unlock()
return
}
+ if t.streamsQuota != nil {
+ updateStreams = true
+ }
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
@@ -524,27 +508,10 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return
}
t.mu.Unlock()
- // rstStream is true in case the stream is being closed at the client-side
- // and the server needs to be intimated about it by sending a RST_STREAM
- // frame.
- // To make sure this frame is written to the wire before the headers of the
- // next stream waiting for streamsQuota, we add to streamsQuota pool only
- // after having acquired the writableChan to send RST_STREAM out (look at
- // the controller() routine).
- var rstStream bool
- var rstError http2.ErrCode
- defer func() {
- // In case, the client doesn't have to send RST_STREAM to server
- // we can safely add back to streamsQuota pool now.
- if !rstStream {
- t.streamsQuota.add(1)
- return
- }
- t.controlBuf.put(&resetStream{s.id, rstError})
- }()
+ if updateStreams {
+ t.streamsQuota.add(1)
+ }
s.mu.Lock()
- rstStream = s.rstStream
- rstError = s.rstError
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
@@ -560,9 +527,8 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
}
s.state = streamDone
s.mu.Unlock()
- if _, ok := err.(StreamError); ok {
- rstStream = true
- rstError = http2.ErrCodeCancel
+ if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
+ t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
}
}
@@ -776,7 +742,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
}
func (t *http2Client) handleData(f *http2.DataFrame) {
- size := f.Header().Length
+ size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
@@ -790,11 +756,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
- if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- }
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -805,27 +766,22 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if err := s.fc.onData(uint32(size)); err != nil {
- s.rstStream = true
- s.rstError = http2.ErrCodeFlowControl
- s.finish(status.New(codes.Internal, err.Error()))
+ s.state = streamDone
+ s.statusCode = codes.Internal
+ s.statusDesc = err.Error()
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
+ t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
- if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
- }
- }
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
- if len(f.Data()) > 0 {
- data := make([]byte, len(f.Data()))
- copy(data, f.Data())
- s.write(recvMsg{data: data})
- }
+ data := make([]byte, size)
+ copy(data, f.Data())
+ s.write(recvMsg{data: data})
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
@@ -835,7 +791,10 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.mu.Unlock()
return
}
- s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
+ s.state = streamDone
+ s.statusCode = codes.Internal
+ s.statusDesc = "server closed the stream without sending trailers"
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -851,16 +810,18 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
s.mu.Unlock()
return
}
+ s.state = streamDone
if !s.headerDone {
close(s.headerChan)
s.headerDone = true
}
- statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
+ s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)]
if !ok {
grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
- statusCode = codes.Unknown
+ s.statusCode = codes.Unknown
}
- s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode))
+ s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -888,9 +849,6 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
- if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
- grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
- }
t.mu.Lock()
if t.state == reachable || t.state == draining {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
@@ -912,7 +870,6 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Unlock()
return
default:
- t.setGoAwayReason(f)
}
t.goAwayID = f.LastStreamID
close(t.goAway)
@@ -920,26 +877,6 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Unlock()
}
-// setGoAwayReason sets the value of t.goAwayReason based
-// on the GoAway frame received.
-// It expects a lock on transport's mutext to be held by
-// the caller.
-func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
- t.goAwayReason = NoReason
- switch f.ErrCode {
- case http2.ErrCodeEnhanceYourCalm:
- if string(f.DebugData()) == "too_many_pings" {
- t.goAwayReason = TooManyPings
- }
- }
-}
-
-func (t *http2Client) GetGoAwayReason() GoAwayReason {
- t.mu.Lock()
- defer t.mu.Unlock()
- return t.goAwayReason
-}
-
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
id := f.Header().StreamID
incr := f.Increment
@@ -960,17 +897,18 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
var state decodeState
for _, hf := range frame.Fields {
- if err := state.processHeaderField(hf); err != nil {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.mu.Unlock()
- s.write(recvMsg{err: err})
- // Something wrong. Stops reading even when there is remaining.
- return
+ state.processHeaderField(hf)
+ }
+ if state.err != nil {
+ s.mu.Lock()
+ if !s.headerDone {
+ close(s.headerChan)
+ s.headerDone = true
}
+ s.mu.Unlock()
+ s.write(recvMsg{err: state.err})
+ // Something wrong. Stops reading even when there is remaining.
+ return
}
endStream := frame.StreamEnded()
@@ -1013,7 +951,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.mdata) > 0 {
s.trailer = state.mdata
}
- s.finish(state.status())
+ s.statusCode = state.statusCode
+ s.statusDesc = state.statusDesc
+ close(s.done)
+ s.state = streamDone
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -1041,7 +982,6 @@ func (t *http2Client) reader() {
t.notifyError(err)
return
}
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
@@ -1052,7 +992,6 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
@@ -1104,10 +1043,16 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
s.Val = math.MaxInt32
}
t.mu.Lock()
+ reset := t.streamsQuota != nil
+ if !reset {
+ t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
+ }
ms := t.maxStreams
t.maxStreams = int(s.Val)
t.mu.Unlock()
- t.streamsQuota.add(int(s.Val) - ms)
+ if reset {
+ t.streamsQuota.add(int(s.Val) - ms)
+ }
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
@@ -1140,12 +1085,6 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
- // If the server needs to be to intimated about stream closing,
- // then we need to make sure the RST_STREAM frame is written to
- // the wire before the headers of the next stream waiting on
- // streamQuota. We ensure this by adding to the streamsQuota pool
- // only after having acquired the writableChan to send RST_STREAM.
- t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
@@ -1165,61 +1104,6 @@ func (t *http2Client) controller() {
}
}
-// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
-func (t *http2Client) keepalive() {
- p := &ping{data: [8]byte{}}
- timer := time.NewTimer(t.kp.Time)
- for {
- select {
- case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
- continue
- }
- // Check if keepalive should go dormant.
- t.mu.Lock()
- if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
- // Make awakenKeepalive writable.
- <-t.awakenKeepalive
- t.mu.Unlock()
- select {
- case <-t.awakenKeepalive:
- // If the control gets here a ping has been sent
- // need to reset the timer with keepalive.Timeout.
- case <-t.shutdownChan:
- return
- }
- } else {
- t.mu.Unlock()
- // Send ping.
- t.controlBuf.put(p)
- }
-
- // By the time control gets here a ping has been sent one way or the other.
- timer.Reset(t.kp.Timeout)
- select {
- case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
- continue
- }
- t.Close()
- return
- case <-t.shutdownChan:
- if !timer.Stop() {
- <-timer.C
- }
- return
- }
- case <-t.shutdownChan:
- if !timer.Stop() {
- <-timer.C
- }
- return
- }
- }
-}
-
func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
diff --git a/go/vendor/google.golang.org/grpc/transport/http2_server.go b/go/vendor/google.golang.org/grpc/transport/http2_server.go
index 31fefc7..a095dd0 100644
--- a/go/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/go/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -38,25 +38,19 @@ import (
"errors"
"io"
"math"
- "math/rand"
"net"
"strconv"
"sync"
- "sync/atomic"
- "time"
- "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
- "google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
)
@@ -96,33 +90,11 @@ type http2Server struct {
stats stats.Handler
- // Flag to keep track of reading activity on transport.
- // 1 is true and 0 is false.
- activity uint32 // Accessed atomically.
- // Keepalive and max-age parameters for the server.
- kp keepalive.ServerParameters
-
- // Keepalive enforcement policy.
- kep keepalive.EnforcementPolicy
- // The time instance last ping was received.
- lastPingAt time.Time
- // Number of times the client has violated keepalive ping policy so far.
- pingStrikes uint8
- // Flag to signify that number of ping strikes should be reset to 0.
- // This is set whenever data or header frames are sent.
- // 1 means yes.
- resetPingStrikes uint32 // Accessed atomically.
-
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
- // idle is the time instant when the connection went idle.
- // This is either the begining of the connection or when the number of
- // RPCs go down to 0.
- // When the connection is busy, this value is set to 0.
- idle time.Time
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -156,28 +128,6 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
- kp := config.KeepaliveParams
- if kp.MaxConnectionIdle == 0 {
- kp.MaxConnectionIdle = defaultMaxConnectionIdle
- }
- if kp.MaxConnectionAge == 0 {
- kp.MaxConnectionAge = defaultMaxConnectionAge
- }
- // Add a jitter to MaxConnectionAge.
- kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
- if kp.MaxConnectionAgeGrace == 0 {
- kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
- }
- if kp.Time == 0 {
- kp.Time = defaultServerKeepaliveTime
- }
- if kp.Timeout == 0 {
- kp.Timeout = defaultServerKeepaliveTimeout
- }
- kep := config.KeepalivePolicy
- if kep.MinTime == 0 {
- kep.MinTime = defaultKeepalivePolicyMinTime
- }
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
@@ -199,9 +149,6 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
- kp: kp,
- idle: time.Now(),
- kep: kep,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@@ -212,7 +159,6 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
t.stats.HandleConn(t.ctx, connBegin)
}
go t.controller()
- go t.keepalive()
t.writableChan <- 0
return t, nil
}
@@ -229,12 +175,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
var state decodeState
for _, hf := range frame.Fields {
- if err := state.processHeaderField(hf); err != nil {
- if se, ok := err.(StreamError); ok {
- t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
- }
- return
+ state.processHeaderField(hf)
+ }
+ if err := state.err; err != nil {
+ if se, ok := err.(StreamError); ok {
+ t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
}
+ return
}
if frame.StreamEnded() {
@@ -261,7 +208,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.ctx = newContextWithStream(s.ctx, s)
// Attach the received metadata to the context.
if len(state.mdata) > 0 {
- s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
+ s.ctx = metadata.NewContext(s.ctx, state.mdata)
}
s.dec = &recvBufferReader{
@@ -301,9 +248,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
- if len(t.activeStreams) == 1 {
- t.idle = time.Time{}
- }
t.mu.Unlock()
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
@@ -351,7 +295,6 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
t.Close()
return
}
- atomic.StoreUint32(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@@ -362,7 +305,6 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
for {
frame, err := t.framer.readFrame()
- atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
@@ -439,7 +381,7 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
}
func (t *http2Server) handleData(f *http2.DataFrame) {
- size := f.Header().Length
+ size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
grpclog.Printf("transport: http2Server %v", err)
t.Close()
@@ -454,11 +396,6 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
- if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- }
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -474,20 +411,13 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
- if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
- }
- }
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
- if len(f.Data()) > 0 {
- data := make([]byte, len(f.Data()))
- copy(data, f.Data())
- s.write(recvMsg{data: data})
- }
+ data := make([]byte, size)
+ copy(data, f.Data())
+ s.write(recvMsg{data: data})
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client.
@@ -521,11 +451,6 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
t.controlBuf.put(&settings{ack: true, ss: ss})
}
-const (
- maxPingStrikes = 2
- defaultPingTimeout = 2 * time.Hour
-)
-
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() { // Do nothing.
return
@@ -533,38 +458,6 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
-
- now := time.Now()
- defer func() {
- t.lastPingAt = now
- }()
- // A reset ping strikes means that we don't need to check for policy
- // violation for this ping and the pingStrikes counter should be set
- // to 0.
- if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
- t.pingStrikes = 0
- return
- }
- t.mu.Lock()
- ns := len(t.activeStreams)
- t.mu.Unlock()
- if ns < 1 && !t.kep.PermitWithoutStream {
- // Keepalive shouldn't be active thus, this new ping should
- // have come after atleast defaultPingTimeout.
- if t.lastPingAt.Add(defaultPingTimeout).After(now) {
- t.pingStrikes++
- }
- } else {
- // Check if keepalive policy is respected.
- if t.lastPingAt.Add(t.kep.MinTime).After(now) {
- t.pingStrikes++
- }
- }
-
- if t.pingStrikes > maxPingStrikes {
- // Send goaway and close the connection.
- t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
- }
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@@ -583,13 +476,6 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
first := true
endHeaders := false
var err error
- defer func() {
- if err == nil {
- // Reset ping strikes when seding headers since that might cause the
- // peer to send ping.
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- }
- }()
// Sends the headers in a single batch.
for !endHeaders {
size := t.hBuf.Len()
@@ -671,7 +557,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// There is no further I/O operations being able to perform on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
-func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
+func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
var headersSent, hasHeader bool
s.mu.Lock()
if s.state == streamDone {
@@ -702,24 +588,9 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
t.hEnc.WriteField(
hpack.HeaderField{
Name: "grpc-status",
- Value: strconv.Itoa(int(st.Code())),
+ Value: strconv.Itoa(int(statusCode)),
})
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
-
- if p := st.Proto(); p != nil && len(p.Details) > 0 {
- stBytes, err := proto.Marshal(p)
- if err != nil {
- // TODO: return error instead, when callers are able to handle it.
- panic(err)
- }
-
- for k, v := range metadata.New(map[string]string{"grpc-status-details-bin": (string)(stBytes)}) {
- for _, entry := range v {
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
- }
- }
- }
-
+ t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
// Attach the trailer metadata.
for k, v := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@@ -748,7 +619,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
-func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
+func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
// TODO(zhaoq): Support multi-writers for a single stream.
var writeHeaderFrame bool
s.mu.Lock()
@@ -763,13 +634,6 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
if writeHeaderFrame {
t.WriteHeader(s, nil)
}
- defer func() {
- if err == nil {
- // Reset ping strikes when sending data since this might cause
- // the peer to send ping.
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- }
- }()
r := bytes.NewBuffer(data)
for {
if r.Len() == 0 {
@@ -859,91 +723,6 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
}
}
-// keepalive running in a separate goroutine does the following:
-// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
-// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
-// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
-// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
-// after an additional duration of keepalive.Timeout.
-func (t *http2Server) keepalive() {
- p := &ping{}
- var pingSent bool
- maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
- maxAge := time.NewTimer(t.kp.MaxConnectionAge)
- keepalive := time.NewTimer(t.kp.Time)
- // NOTE: All exit paths of this function should reset their
- // respecitve timers. A failure to do so will cause the
- // following clean-up to deadlock and eventually leak.
- defer func() {
- if !maxIdle.Stop() {
- <-maxIdle.C
- }
- if !maxAge.Stop() {
- <-maxAge.C
- }
- if !keepalive.Stop() {
- <-keepalive.C
- }
- }()
- for {
- select {
- case <-maxIdle.C:
- t.mu.Lock()
- idle := t.idle
- if idle.IsZero() { // The connection is non-idle.
- t.mu.Unlock()
- maxIdle.Reset(t.kp.MaxConnectionIdle)
- continue
- }
- val := t.kp.MaxConnectionIdle - time.Since(idle)
- if val <= 0 {
- // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
- // Gracefully close the connection.
- t.state = draining
- t.mu.Unlock()
- t.Drain()
- // Reseting the timer so that the clean-up doesn't deadlock.
- maxIdle.Reset(infinity)
- return
- }
- t.mu.Unlock()
- maxIdle.Reset(val)
- case <-maxAge.C:
- t.mu.Lock()
- t.state = draining
- t.mu.Unlock()
- t.Drain()
- maxAge.Reset(t.kp.MaxConnectionAgeGrace)
- select {
- case <-maxAge.C:
- // Close the connection after grace period.
- t.Close()
- // Reseting the timer so that the clean-up doesn't deadlock.
- maxAge.Reset(infinity)
- case <-t.shutdownChan:
- }
- return
- case <-keepalive.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- pingSent = false
- keepalive.Reset(t.kp.Time)
- continue
- }
- if pingSent {
- t.Close()
- // Reseting the timer so that the clean-up doesn't deadlock.
- keepalive.Reset(infinity)
- return
- }
- pingSent = true
- t.controlBuf.put(p)
- keepalive.Reset(t.kp.Timeout)
- case <-t.shutdownChan:
- return
- }
- }
-}
-
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
@@ -975,10 +754,7 @@ func (t *http2Server) controller() {
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
- t.framer.writeGoAway(true, sid, i.code, i.debugData)
- if i.code == http2.ErrCodeEnhanceYourCalm {
- t.Close()
- }
+ t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
case *flushIO:
t.framer.flushWrite()
case *ping:
@@ -1028,9 +804,6 @@ func (t *http2Server) Close() (err error) {
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
- if len(t.activeStreams) == 0 {
- t.idle = time.Now()
- }
if t.state == draining && len(t.activeStreams) == 0 {
defer t.Close()
}
@@ -1058,17 +831,5 @@ func (t *http2Server) RemoteAddr() net.Addr {
}
func (t *http2Server) Drain() {
- t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
-}
-
-var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
-
-func getJitter(v time.Duration) time.Duration {
- if v == infinity {
- return 0
- }
- // Generate a jitter between +/- 10% of the value.
- r := int64(v / 10)
- j := rgen.Int63n(2*r) - r
- return time.Duration(j)
+ t.controlBuf.put(&goAway{})
}
diff --git a/go/vendor/google.golang.org/grpc/transport/http_util.go b/go/vendor/google.golang.org/grpc/transport/http_util.go
index 89c1525..a3c68d4 100644
--- a/go/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/go/vendor/google.golang.org/grpc/transport/http_util.go
@@ -44,17 +44,16 @@ import (
"sync/atomic"
"time"
- "github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
- spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
)
const (
+ // The primary user agent
+ primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
@@ -93,15 +92,13 @@ var (
// Records the states during HPACK decoding. Must be reset once the
// decoding of the entire headers are finished.
type decodeState struct {
+ err error // first error encountered decoding
+
encoding string
- // statusGen caches the stream status received from the trailer the server
- // sent. Client side only. Do not access directly. After all trailers are
- // parsed, use the status method to retrieve the status.
- statusGen *status.Status
- // rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
- // intended for direct access outside of parsing.
- rawStatusCode int32
- rawStatusMsg string
+ // statusCode caches the stream status received from the trailer
+ // the server sent. Client side only.
+ statusCode codes.Code
+ statusDesc string
// Server side only fields.
timeoutSet bool
timeout time.Duration
@@ -124,7 +121,6 @@ func isReservedHeader(hdr string) bool {
"grpc-message",
"grpc-status",
"grpc-timeout",
- "grpc-status-details-bin",
"te":
return true
default:
@@ -143,6 +139,12 @@ func isWhitelistedPseudoHeader(hdr string) bool {
}
}
+func (d *decodeState) setErr(err error) {
+ if d.err == nil {
+ d.err = err
+ }
+}
+
func validContentType(t string) bool {
e := "application/grpc"
if !strings.HasPrefix(t, e) {
@@ -156,62 +158,56 @@ func validContentType(t string) bool {
return true
}
-func (d *decodeState) status() *status.Status {
- if d.statusGen == nil {
- // No status-details were provided; generate status using code/msg.
- d.statusGen = status.New(codes.Code(d.rawStatusCode), d.rawStatusMsg)
- }
- return d.statusGen
-}
-
-func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
+func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
case "content-type":
if !validContentType(f.Value) {
- return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
+ d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
+ return
}
case "grpc-encoding":
d.encoding = f.Value
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
+ d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
+ return
}
- d.rawStatusCode = int32(code)
+ d.statusCode = codes.Code(code)
case "grpc-message":
- d.rawStatusMsg = decodeGrpcMessage(f.Value)
- case "grpc-status-details-bin":
- _, v, err := metadata.DecodeKeyValue("grpc-status-details-bin", f.Value)
- if err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
- }
- s := &spb.Status{}
- if err := proto.Unmarshal([]byte(v), s); err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
- }
- d.statusGen = status.FromProto(s)
+ d.statusDesc = decodeGrpcMessage(f.Value)
case "grpc-timeout":
d.timeoutSet = true
var err error
- if d.timeout, err = decodeTimeout(f.Value); err != nil {
- return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
+ d.timeout, err = decodeTimeout(f.Value)
+ if err != nil {
+ d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
+ return
}
case ":path":
d.method = f.Value
default:
if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
+ if f.Name == "user-agent" {
+ i := strings.LastIndex(f.Value, " ")
+ if i == -1 {
+ // There is no application user agent string being set.
+ return
+ }
+ // Extract the application user agent string.
+ f.Value = f.Value[:i]
+ }
if d.mdata == nil {
d.mdata = make(map[string][]string)
}
k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
if err != nil {
grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
- return nil
+ return
}
d.mdata[k] = append(d.mdata[k], v)
}
}
- return nil
}
type timeoutUnit uint8
@@ -383,9 +379,6 @@ func newFramer(conn net.Conn) *framer {
writer: bufio.NewWriterSize(conn, http2IOBufSize),
}
f.fr = http2.NewFramer(f.writer, f.reader)
- // Opt-in to Frame reuse API on framer to reduce garbage.
- // Frames aren't safe to read from after a subsequent call to ReadFrame.
- f.fr.SetReuseFrames()
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}
diff --git a/go/vendor/google.golang.org/grpc/transport/pre_go16.go b/go/vendor/google.golang.org/grpc/transport/pre_go16.go
new file mode 100644
index 0000000..33d91c1
--- /dev/null
+++ b/go/vendor/google.golang.org/grpc/transport/pre_go16.go
@@ -0,0 +1,51 @@
+// +build !go1.6
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ var dialer net.Dialer
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Timeout = deadline.Sub(time.Now())
+ }
+ return dialer.Dial(network, address)
+}
diff --git a/go/vendor/google.golang.org/grpc/transport/transport.go b/go/vendor/google.golang.org/grpc/transport/transport.go
index 4d381d6..d465991 100644
--- a/go/vendor/google.golang.org/grpc/transport/transport.go
+++ b/go/vendor/google.golang.org/grpc/transport/transport.go
@@ -45,13 +45,10 @@ import (
"sync"
"golang.org/x/net/context"
- "golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
- "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
- "google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
)
@@ -213,13 +210,9 @@ type Stream struct {
// true iff headerChan is closed. Used to avoid closing headerChan
// multiple times.
headerDone bool
- // the status error received from the server.
- status *status.Status
- // rstStream indicates whether a RST_STREAM frame needs to be sent
- // to the server to signify that this stream is closing.
- rstStream bool
- // rstError is the error that needs to be sent along with the RST_STREAM frame.
- rstError http2.ErrCode
+ // the status received from the server.
+ statusCode codes.Code
+ statusDesc string
}
// RecvCompress returns the compression algorithm applied to the inbound
@@ -284,9 +277,14 @@ func (s *Stream) Method() string {
return s.method
}
-// Status returns the status received from the server.
-func (s *Stream) Status() *status.Status {
- return s.status
+// StatusCode returns statusCode received from the server.
+func (s *Stream) StatusCode() codes.Code {
+ return s.statusCode
+}
+
+// StatusDesc returns statusDesc received from the server.
+func (s *Stream) StatusDesc() string {
+ return s.statusDesc
}
// SetHeader sets the header metadata. This can be called multiple times.
@@ -333,20 +331,6 @@ func (s *Stream) Read(p []byte) (n int, err error) {
return
}
-// finish sets the stream's state and status, and closes the done channel.
-// s.mu must be held by the caller. st must always be non-nil.
-func (s *Stream) finish(st *status.Status) {
- s.status = st
- s.state = streamDone
- close(s.done)
-}
-
-// GoString is implemented by Stream so context.String() won't
-// race when printing %#v.
-func (s *Stream) GoString() string {
- return fmt.Sprintf("<stream: %p, %v>", s, s.method)
-}
-
// The key to save transport.Stream in the context.
type streamKey struct{}
@@ -374,12 +358,10 @@ const (
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
- MaxStreams uint32
- AuthInfo credentials.AuthInfo
- InTapHandle tap.ServerInHandle
- StatsHandler stats.Handler
- KeepaliveParams keepalive.ServerParameters
- KeepalivePolicy keepalive.EnforcementPolicy
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+ StatsHandler stats.Handler
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -392,9 +374,6 @@ func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (S
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
- // Authority is the :authority pseudo-header to use. This field has no effect if
- // TransportCredentials is set.
- Authority string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
@@ -403,8 +382,6 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
- // KeepaliveParams stores the keepalive parameters.
- KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
@@ -493,9 +470,6 @@ type ClientTransport interface {
// receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2).
GoAway() <-chan struct{}
-
- // GetGoAwayReason returns the reason why GoAway frame was received.
- GetGoAwayReason() GoAwayReason
}
// ServerTransport is the common interface for all gRPC server-side transport
@@ -515,9 +489,10 @@ type ServerTransport interface {
// Write may not be called on all streams.
Write(s *Stream, data []byte, opts *Options) error
- // WriteStatus sends the status of a stream to the client. WriteStatus is
- // the final call made on a stream and always occurs.
- WriteStatus(s *Stream, st *status.Status) error
+ // WriteStatus sends the status of a stream to the client.
+ // WriteStatus is the final call made on a stream and always
+ // occurs.
+ WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
@@ -583,8 +558,6 @@ var (
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
)
-// TODO: See if we can replace StreamError with status package errors.
-
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
Code codes.Code
@@ -592,7 +565,7 @@ type StreamError struct {
}
func (e StreamError) Error() string {
- return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
+ return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
}
// ContextErr converts the error from context package into a StreamError.
@@ -633,16 +606,3 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-
return i, nil
}
}
-
-// GoAwayReason contains the reason for the GoAway frame received.
-type GoAwayReason uint8
-
-const (
- // Invalid indicates that no GoAway frame is received.
- Invalid GoAwayReason = 0
- // NoReason is the default value when GoAway frame is received.
- NoReason GoAwayReason = 1
- // TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
- // was recieved and that the debug data said "too_many_pings".
- TooManyPings GoAwayReason = 2
-)
diff --git a/go/vendor/vendor.json b/go/vendor/vendor.json
index 528df74..271e9bd 100644
--- a/go/vendor/vendor.json
+++ b/go/vendor/vendor.json
@@ -132,10 +132,10 @@
"revisionTime": "2017-04-04T13:20:09Z"
},
{
- "checksumSHA1": "tidJMmntKTZuU196aiLojkULL+g=",
+ "checksumSHA1": "epHwh7hDQSYzDowPIbw8vnLzPS0=",
"path": "google.golang.org/grpc",
- "revision": "6d158dbf32084eac5fc0b9ea6f1feed214290ec6",
- "revisionTime": "2017-04-12T06:39:30Z"
+ "revision": "50955793b0183f9de69bd78e2ec251cf20aab121",
+ "revisionTime": "2017-01-11T19:10:52Z"
},
{
"checksumSHA1": "08icuA15HRkdYCt6H+Cs90RPQsY=",
@@ -204,10 +204,10 @@
"revisionTime": "2017-04-12T06:39:30Z"
},
{
- "checksumSHA1": "WMlN+OrgFM70j2/AoMh6DM6NtK8=",
+ "checksumSHA1": "yHpUeGwKoqqwd3cbEp3lkcnvft0=",
"path": "google.golang.org/grpc/transport",
- "revision": "6d158dbf32084eac5fc0b9ea6f1feed214290ec6",
- "revisionTime": "2017-04-12T06:39:30Z"
+ "revision": "50955793b0183f9de69bd78e2ec251cf20aab121",
+ "revisionTime": "2017-01-11T19:10:52Z"
},
{
"checksumSHA1": "fALlQNY1fM99NesfLJ50KguWsio=",