summaryrefslogtreecommitdiff
path: root/workhorse/internal/gitaly
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2021-10-11 09:09:08 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2021-10-11 09:09:08 +0000
commit31a9181ed65e80ceac2cbd6e2dba9af40e7b0d0b (patch)
tree29d9b11c777d092c0b3e453cc0f44a0a52ecb000 /workhorse/internal/gitaly
parentbe7d70b884e6fa66c52862f38bf0f39b0631868b (diff)
downloadgitlab-ce-31a9181ed65e80ceac2cbd6e2dba9af40e7b0d0b.tar.gz
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse/internal/gitaly')
-rw-r--r--workhorse/internal/gitaly/gitaly.go46
-rw-r--r--workhorse/internal/gitaly/gitaly_test.go20
-rw-r--r--workhorse/internal/gitaly/smarthttp.go46
-rw-r--r--workhorse/internal/gitaly/unmarshal_test.go13
4 files changed, 110 insertions, 15 deletions
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go
index 6ea99962056..362f380dc4d 100644
--- a/workhorse/internal/gitaly/gitaly.go
+++ b/workhorse/internal/gitaly/gitaly.go
@@ -11,6 +11,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@@ -23,15 +24,19 @@ import (
)
type Server struct {
- Address string `json:"address"`
- Token string `json:"token"`
- Features map[string]string `json:"features"`
+ Address string `json:"address"`
+ Token string `json:"token"`
+ Features map[string]string `json:"features"`
+ Sidechannel bool `json:"sidechannel"`
}
-type cacheKey struct{ address, token string }
+type cacheKey struct {
+ address, token string
+ sidechannel bool
+}
func (server Server) cacheKey() cacheKey {
- return cacheKey{address: server.Address, token: server.Token}
+ return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel}
}
type connectionsCache struct {
@@ -41,9 +46,17 @@ type connectionsCache struct {
var (
jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true}
- cache = connectionsCache{
+ // This connection cache map contains two types of connections:
+ // - Normal gRPC connections
+ // - Sidechannel connections. When client dials to the Gitaly server, the
+ // server multiplexes the connection using Yamux. In the future, the server
+ // can open another stream to transfer data without gRPC. Besides, we apply
+ // a framing protocol to add the half-close capability to Yamux streams.
+ // Hence, we cannot use those connections interchangeably.
+ cache = connectionsCache{
connections: make(map[cacheKey]*grpc.ClientConn),
}
+ sidechannelRegistry *gitalyclient.SidechannelRegistry
connectionsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
@@ -54,6 +67,12 @@ var (
)
)
+func InitializeSidechannelRegistry(logger *logrus.Logger) {
+ if sidechannelRegistry == nil {
+ sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger))
+ }
+}
+
func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context {
md := metadata.New(nil)
for k, v := range features {
@@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S
return nil, nil, err
}
grpcClient := gitalypb.NewSmartHTTPServiceClient(conn)
- return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil
+ smartHTTPClient := &SmartHTTPClient{
+ SmartHTTPServiceClient: grpcClient,
+ sidechannelRegistry: sidechannelRegistry,
+ useSidechannel: server.Sidechannel,
+ }
+ return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil
}
func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) {
@@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
),
)
- conn, connErr := gitalyclient.Dial(server.Address, connOpts)
+ var conn *grpc.ClientConn
+ var connErr error
+ if server.Sidechannel {
+ conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background
+ } else {
+ conn, connErr = gitalyclient.Dial(server.Address, connOpts)
+ }
label := "ok"
if connErr != nil {
diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go
index b17fb5c1d7b..9c54caae8c6 100644
--- a/workhorse/internal/gitaly/gitaly_test.go
+++ b/workhorse/internal/gitaly/gitaly_test.go
@@ -4,14 +4,32 @@ import (
"context"
"testing"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)
func TestNewSmartHTTPClient(t *testing.T) {
- ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture())
+ ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture())
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
+
+ require.False(t, client.useSidechannel)
+ require.Nil(t, client.sidechannelRegistry)
+}
+
+func TestNewSmartHTTPClientWithSidechannel(t *testing.T) {
+ InitializeSidechannelRegistry(logrus.StandardLogger())
+
+ fixture := serverFixture()
+ fixture.Sidechannel = true
+
+ ctx, client, err := NewSmartHTTPClient(context.Background(), fixture)
+ require.NoError(t, err)
+ testOutgoingMetadata(t, ctx)
+
+ require.True(t, client.useSidechannel)
+ require.NotNil(t, client.sidechannelRegistry)
}
func TestNewBlobClient(t *testing.T) {
diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go
index 69656ab0a92..de6954efa60 100644
--- a/workhorse/internal/gitaly/smarthttp.go
+++ b/workhorse/internal/gitaly/smarthttp.go
@@ -5,11 +5,14 @@ import (
"fmt"
"io"
+ gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
)
type SmartHTTPClient struct {
+ useSidechannel bool
+ sidechannelRegistry *gitalyclient.SidechannelRegistry
gitalypb.SmartHTTPServiceClient
}
@@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R
}
func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
+ if client.useSidechannel {
+ return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
+ }
+
+ return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
+}
+
+func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
stream, err := client.PostUploadPack(ctx)
if err != nil {
return err
@@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re
return nil
}
+
+func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
+ ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error {
+ if _, err := io.Copy(conn, clientRequest); err != nil {
+ return err
+ }
+
+ if err := conn.CloseWrite(); err != nil {
+ return fmt.Errorf("fail to signal sidechannel half-close: %w", err)
+ }
+
+ if _, err := io.Copy(clientResponse, conn); err != nil {
+ return err
+ }
+
+ return nil
+ })
+ defer waiter.Close()
+
+ rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{
+ Repository: repo,
+ GitConfigOptions: gitConfigOptions,
+ GitProtocol: gitProtocol,
+ }
+
+ if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil {
+ return err
+ }
+
+ if err := waiter.Close(); err != nil {
+ return fmt.Errorf("fail to close sidechannel connection: %w", err)
+ }
+
+ return nil
+}
diff --git a/workhorse/internal/gitaly/unmarshal_test.go b/workhorse/internal/gitaly/unmarshal_test.go
index 270b96f900d..730eff4005a 100644
--- a/workhorse/internal/gitaly/unmarshal_test.go
+++ b/workhorse/internal/gitaly/unmarshal_test.go
@@ -3,6 +3,7 @@ package gitaly
import (
"testing"
+ "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -11,25 +12,25 @@ func TestUnmarshalJSON(t *testing.T) {
testCases := []struct {
desc string
in string
- out gitalypb.Repository
+ out *gitalypb.Repository
}{
{
desc: "basic example",
in: `{"relative_path":"foo/bar.git"}`,
- out: gitalypb.Repository{RelativePath: "foo/bar.git"},
+ out: &gitalypb.Repository{RelativePath: "foo/bar.git"},
},
{
desc: "unknown field",
in: `{"relative_path":"foo/bar.git","unknown_field":12345}`,
- out: gitalypb.Repository{RelativePath: "foo/bar.git"},
+ out: &gitalypb.Repository{RelativePath: "foo/bar.git"},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- result := gitalypb.Repository{}
- require.NoError(t, UnmarshalJSON(tc.in, &result))
- require.Equal(t, tc.out, result)
+ result := &gitalypb.Repository{}
+ require.NoError(t, UnmarshalJSON(tc.in, result))
+ require.True(t, proto.Equal(tc.out, result))
})
}
}