diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-10-11 09:09:08 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-10-11 09:09:08 +0000 |
commit | 31a9181ed65e80ceac2cbd6e2dba9af40e7b0d0b (patch) | |
tree | 29d9b11c777d092c0b3e453cc0f44a0a52ecb000 /workhorse/internal/gitaly | |
parent | be7d70b884e6fa66c52862f38bf0f39b0631868b (diff) | |
download | gitlab-ce-31a9181ed65e80ceac2cbd6e2dba9af40e7b0d0b.tar.gz |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse/internal/gitaly')
-rw-r--r-- | workhorse/internal/gitaly/gitaly.go | 46 | ||||
-rw-r--r-- | workhorse/internal/gitaly/gitaly_test.go | 20 | ||||
-rw-r--r-- | workhorse/internal/gitaly/smarthttp.go | 46 | ||||
-rw-r--r-- | workhorse/internal/gitaly/unmarshal_test.go | 13 |
4 files changed, 110 insertions, 15 deletions
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go index 6ea99962056..362f380dc4d 100644 --- a/workhorse/internal/gitaly/gitaly.go +++ b/workhorse/internal/gitaly/gitaly.go @@ -11,6 +11,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -23,15 +24,19 @@ import ( ) type Server struct { - Address string `json:"address"` - Token string `json:"token"` - Features map[string]string `json:"features"` + Address string `json:"address"` + Token string `json:"token"` + Features map[string]string `json:"features"` + Sidechannel bool `json:"sidechannel"` } -type cacheKey struct{ address, token string } +type cacheKey struct { + address, token string + sidechannel bool +} func (server Server) cacheKey() cacheKey { - return cacheKey{address: server.Address, token: server.Token} + return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel} } type connectionsCache struct { @@ -41,9 +46,17 @@ type connectionsCache struct { var ( jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true} - cache = connectionsCache{ + // This connection cache map contains two types of connections: + // - Normal gRPC connections + // - Sidechannel connections. When client dials to the Gitaly server, the + // server multiplexes the connection using Yamux. In the future, the server + // can open another stream to transfer data without gRPC. Besides, we apply + // a framing protocol to add the half-close capability to Yamux streams. + // Hence, we cannot use those connections interchangeably. + cache = connectionsCache{ connections: make(map[cacheKey]*grpc.ClientConn), } + sidechannelRegistry *gitalyclient.SidechannelRegistry connectionsTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -54,6 +67,12 @@ var ( ) ) +func InitializeSidechannelRegistry(logger *logrus.Logger) { + if sidechannelRegistry == nil { + sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger)) + } +} + func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context { md := metadata.New(nil) for k, v := range features { @@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S return nil, nil, err } grpcClient := gitalypb.NewSmartHTTPServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil + smartHTTPClient := &SmartHTTPClient{ + SmartHTTPServiceClient: grpcClient, + sidechannelRegistry: sidechannelRegistry, + useSidechannel: server.Sidechannel, + } + return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil } func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) { @@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) { ), ) - conn, connErr := gitalyclient.Dial(server.Address, connOpts) + var conn *grpc.ClientConn + var connErr error + if server.Sidechannel { + conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background + } else { + conn, connErr = gitalyclient.Dial(server.Address, connOpts) + } label := "ok" if connErr != nil { diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go index b17fb5c1d7b..9c54caae8c6 100644 --- a/workhorse/internal/gitaly/gitaly_test.go +++ b/workhorse/internal/gitaly/gitaly_test.go @@ -4,14 +4,32 @@ import ( "context" "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" ) func TestNewSmartHTTPClient(t *testing.T) { - ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture()) + ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture()) require.NoError(t, err) testOutgoingMetadata(t, ctx) + + require.False(t, client.useSidechannel) + require.Nil(t, client.sidechannelRegistry) +} + +func TestNewSmartHTTPClientWithSidechannel(t *testing.T) { + InitializeSidechannelRegistry(logrus.StandardLogger()) + + fixture := serverFixture() + fixture.Sidechannel = true + + ctx, client, err := NewSmartHTTPClient(context.Background(), fixture) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) + + require.True(t, client.useSidechannel) + require.NotNil(t, client.sidechannelRegistry) } func TestNewBlobClient(t *testing.T) { diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go index 69656ab0a92..de6954efa60 100644 --- a/workhorse/internal/gitaly/smarthttp.go +++ b/workhorse/internal/gitaly/smarthttp.go @@ -5,11 +5,14 @@ import ( "fmt" "io" + gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" ) type SmartHTTPClient struct { + useSidechannel bool + sidechannelRegistry *gitalyclient.SidechannelRegistry gitalypb.SmartHTTPServiceClient } @@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R } func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + if client.useSidechannel { + return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) + } + + return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) +} + +func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { stream, err := client.PostUploadPack(ctx) if err != nil { return err @@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re return nil } + +func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error { + if _, err := io.Copy(conn, clientRequest); err != nil { + return err + } + + if err := conn.CloseWrite(); err != nil { + return fmt.Errorf("fail to signal sidechannel half-close: %w", err) + } + + if _, err := io.Copy(clientResponse, conn); err != nil { + return err + } + + return nil + }) + defer waiter.Close() + + rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{ + Repository: repo, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil { + return err + } + + if err := waiter.Close(); err != nil { + return fmt.Errorf("fail to close sidechannel connection: %w", err) + } + + return nil +} diff --git a/workhorse/internal/gitaly/unmarshal_test.go b/workhorse/internal/gitaly/unmarshal_test.go index 270b96f900d..730eff4005a 100644 --- a/workhorse/internal/gitaly/unmarshal_test.go +++ b/workhorse/internal/gitaly/unmarshal_test.go @@ -3,6 +3,7 @@ package gitaly import ( "testing" + "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -11,25 +12,25 @@ func TestUnmarshalJSON(t *testing.T) { testCases := []struct { desc string in string - out gitalypb.Repository + out *gitalypb.Repository }{ { desc: "basic example", in: `{"relative_path":"foo/bar.git"}`, - out: gitalypb.Repository{RelativePath: "foo/bar.git"}, + out: &gitalypb.Repository{RelativePath: "foo/bar.git"}, }, { desc: "unknown field", in: `{"relative_path":"foo/bar.git","unknown_field":12345}`, - out: gitalypb.Repository{RelativePath: "foo/bar.git"}, + out: &gitalypb.Repository{RelativePath: "foo/bar.git"}, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - result := gitalypb.Repository{} - require.NoError(t, UnmarshalJSON(tc.in, &result)) - require.Equal(t, tc.out, result) + result := &gitalypb.Repository{} + require.NoError(t, UnmarshalJSON(tc.in, result)) + require.True(t, proto.Equal(tc.out, result)) }) } } |