diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2022-05-19 07:33:21 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2022-05-19 07:33:21 +0000 |
commit | 36a59d088eca61b834191dacea009677a96c052f (patch) | |
tree | e4f33972dab5d8ef79e3944a9f403035fceea43f /workhorse/internal | |
parent | a1761f15ec2cae7c7f7bbda39a75494add0dfd6f (diff) | |
download | gitlab-ce-36a59d088eca61b834191dacea009677a96c052f.tar.gz |
Add latest changes from gitlab-org/gitlab@15-0-stable-eev15.0.0-rc42
Diffstat (limited to 'workhorse/internal')
34 files changed, 667 insertions, 540 deletions
diff --git a/workhorse/internal/api/api.go b/workhorse/internal/api/api.go index 896f59a322a..8954923ad75 100644 --- a/workhorse/internal/api/api.go +++ b/workhorse/internal/api/api.go @@ -17,7 +17,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" @@ -150,7 +149,7 @@ type Response struct { // Used to communicate channel session details Channel *ChannelSettings // GitalyServer specifies an address and authentication token for a gitaly server we should connect to. - GitalyServer gitaly.Server + GitalyServer GitalyServer // Repository object for making gRPC requests to Gitaly. Repository gitalypb.Repository // For git-http, does the requestor have the right to view all refs? @@ -163,6 +162,12 @@ type Response struct { MaximumSize int64 } +type GitalyServer struct { + Address string `json:"address"` + Token string `json:"token"` + Features map[string]string `json:"features"` +} + // singleJoiningSlash is taken from reverseproxy.go:singleJoiningSlash func singleJoiningSlash(a, b string) string { aslash := strings.HasSuffix(a, "/") diff --git a/workhorse/internal/config/config.go b/workhorse/internal/config/config.go index 60cfd567f5d..e83f55f43bf 100644 --- a/workhorse/internal/config/config.go +++ b/workhorse/internal/config/config.go @@ -84,6 +84,19 @@ type ImageResizerConfig struct { MaxFilesize uint64 `toml:"max_filesize"` } +type TlsConfig struct { + Certificate string `toml:"certificate"` + Key string `toml:"key"` + MinVersion string `toml:"min_version"` + MaxVersion string `toml:"max_version"` +} + +type ListenerConfig struct { + Network string `toml:"network"` + Addr string `toml:"addr"` + Tls *TlsConfig `toml:"tls"` +} + type Config struct { Redis *RedisConfig `toml:"redis"` Backend *url.URL `toml:"-"` @@ -106,6 +119,7 @@ type Config struct { ShutdownTimeout TomlDuration `toml:"shutdown_timeout"` TrustedCIDRsForXForwardedFor []string `toml:"trusted_cidrs_for_x_forwarded_for"` TrustedCIDRsForPropagation []string `toml:"trusted_cidrs_for_propagation"` + Listeners []ListenerConfig `toml:"listeners"` } var DefaultImageResizerConfig = ImageResizerConfig{ diff --git a/workhorse/internal/git/archive.go b/workhorse/internal/git/archive.go index fc12094cc14..e1d03828b63 100644 --- a/workhorse/internal/git/archive.go +++ b/workhorse/internal/git/archive.go @@ -22,6 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" @@ -33,7 +34,7 @@ type archiveParams struct { ArchivePath string ArchivePrefix string CommitId string - GitalyServer gitaly.Server + GitalyServer api.GitalyServer GitalyRepository gitalypb.Repository DisableCache bool GetArchiveRequest []byte @@ -132,7 +133,12 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string func handleArchiveWithGitaly(r *http.Request, params *archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) { var request *gitalypb.GetArchiveRequest - ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer) + ctx, c, err := gitaly.NewRepositoryClient( + r.Context(), + params.GitalyServer, + gitaly.WithFeatures(params.GitalyServer.Features), + ) + if err != nil { return nil, err } diff --git a/workhorse/internal/git/blob.go b/workhorse/internal/git/blob.go index 3ea065766d0..192978e6c75 100644 --- a/workhorse/internal/git/blob.go +++ b/workhorse/internal/git/blob.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata" @@ -13,7 +14,7 @@ import ( type blob struct{ senddata.Prefix } type blobParams struct { - GitalyServer gitaly.Server + GitalyServer api.GitalyServer GetBlobRequest gitalypb.GetBlobRequest } @@ -26,7 +27,12 @@ func (b *blob) Inject(w http.ResponseWriter, r *http.Request, sendData string) { return } - ctx, blobClient, err := gitaly.NewBlobClient(r.Context(), params.GitalyServer) + ctx, blobClient, err := gitaly.NewBlobClient( + r.Context(), + params.GitalyServer, + gitaly.WithFeatures(params.GitalyServer.Features), + ) + if err != nil { helper.Fail500(w, r, fmt.Errorf("blob.GetBlob: %v", err)) return diff --git a/workhorse/internal/git/diff.go b/workhorse/internal/git/diff.go index 4877eea045a..252db6f150b 100644 --- a/workhorse/internal/git/diff.go +++ b/workhorse/internal/git/diff.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" @@ -14,7 +15,7 @@ import ( type diff struct{ senddata.Prefix } type diffParams struct { - GitalyServer gitaly.Server + GitalyServer api.GitalyServer RawDiffRequest string } @@ -33,7 +34,11 @@ func (d *diff) Inject(w http.ResponseWriter, r *http.Request, sendData string) { return } - ctx, diffClient, err := gitaly.NewDiffClient(r.Context(), params.GitalyServer) + ctx, diffClient, err := gitaly.NewDiffClient( + r.Context(), + params.GitalyServer, + gitaly.WithFeatures(params.GitalyServer.Features), + ) if err != nil { helper.Fail500(w, r, fmt.Errorf("diff.RawDiff: %v", err)) return diff --git a/workhorse/internal/git/error.go b/workhorse/internal/git/error.go index 2b7cad6bb64..86a2ba44767 100644 --- a/workhorse/internal/git/error.go +++ b/workhorse/internal/git/error.go @@ -1,4 +1,100 @@ package git +import ( + "errors" + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc/status" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" +) + +// For unwrapping google.golang.org/grpc/internal/status.Error +type grpcErr interface { + GRPCStatus() *status.Status + Error() string +} + // For cosmetic purposes in Sentry type copyError struct{ error } + +// handleLimitErr handles errors that come back from Gitaly that may be a +// LimitError. A LimitError is returned by Gitaly when it is at its limit in +// handling requests. Since this is a known error, we should print a sensible +// error message to the end user. +func handleLimitErr(err error, w io.Writer, f func(w io.Writer) error) { + var statusErr grpcErr + if !errors.As(err, &statusErr) { + return + } + + if st, ok := status.FromError(statusErr); ok { + details := st.Details() + for _, detail := range details { + switch detail.(type) { + case *gitalypb.LimitError: + if err := f(w); err != nil { + log.WithError(fmt.Errorf("handling limit error: %w", err)) + } + } + } + } +} + +// writeReceivePackError writes a "server is busy" error message to the +// git-recieve-pack-result. +// +// 0023\x01001aunpack server is busy +// 00000044\x2GitLab is currently unable to handle this request due to load. +// 0000 +// +// We write a line reporting that unpack failed, and then provide some progress +// information through the side-band 2 channel. +// See https://gitlab.com/gitlab-org/gitaly/-/tree/jc-return-structured-error-limits +// for more details. +func writeReceivePackError(w io.Writer) error { + if _, err := fmt.Fprintf(w, "%04x", 35); err != nil { + return err + } + + if _, err := w.Write([]byte{0x01}); err != nil { + return err + } + + if _, err := fmt.Fprintf(w, "%04xunpack server is busy\n", 26); err != nil { + return err + } + + if _, err := w.Write([]byte("0000")); err != nil { + return err + } + + if _, err := fmt.Fprintf(w, "%04x", 68); err != nil { + return err + } + + if _, err := w.Write([]byte{0x2}); err != nil { + return err + } + + if _, err := fmt.Fprint(w, "GitLab is currently unable to handle this request due to load.\n"); err != nil { + return err + } + + if _, err := w.Write([]byte("0000")); err != nil { + return err + } + + return nil +} + +// writeUploadPackError writes a "server is busy" error message that git +// understands and prints to stdout. UploadPack expects to receive pack data in +// PKT-LINE format. An error-line can be passed that begins with ERR. +// See https://git-scm.com/docs/pack-protocol/2.29.0#_pkt_line_format. +func writeUploadPackError(w io.Writer) error { + _, err := fmt.Fprintf(w, "%04xERR GitLab is currently unable to handle this request due to load.\n", 71) + return err +} diff --git a/workhorse/internal/git/error_test.go b/workhorse/internal/git/error_test.go new file mode 100644 index 00000000000..d87c81fc83c --- /dev/null +++ b/workhorse/internal/git/error_test.go @@ -0,0 +1,80 @@ +package git + +import ( + "bytes" + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" +) + +func TestHandleLimitErr(t *testing.T) { + testCases := []struct { + desc string + errWriter func(io.Writer) error + expectedBytes []byte + }{ + { + desc: "upload pack", + errWriter: writeUploadPackError, + expectedBytes: bytes.Join([][]byte{ + []byte{'0', '0', '4', '7'}, + []byte("ERR GitLab is currently unable to handle this request due to load.\n"), + }, []byte{}), + }, + { + desc: "recieve pack", + errWriter: writeReceivePackError, + expectedBytes: bytes.Join([][]byte{ + {'0', '0', '2', '3', 1, '0', '0', '1', 'a'}, + []byte("unpack server is busy\n"), + {'0', '0', '0', '0', '0', '0', '4', '4', 2}, + []byte("GitLab is currently unable to handle this request due to load.\n"), + {'0', '0', '0', '0'}, + }, []byte{}), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + var body bytes.Buffer + err := errWithDetail(t, &gitalypb.LimitError{ + ErrorMessage: "concurrency queue wait time reached", + RetryAfter: durationpb.New(0)}) + + handleLimitErr(fmt.Errorf("wrapped error: %w", err), &body, tc.errWriter) + require.Equal(t, tc.expectedBytes, body.Bytes()) + }) + } + + t.Run("non LimitError", func(t *testing.T) { + var body bytes.Buffer + err := status.Error(codes.Internal, "some internal error") + handleLimitErr(fmt.Errorf("wrapped error: %w", err), &body, writeUploadPackError) + require.Equal(t, []byte(nil), body.Bytes()) + + handleLimitErr(fmt.Errorf("wrapped error: %w", err), &body, writeReceivePackError) + require.Equal(t, []byte(nil), body.Bytes()) + + }) +} + +// errWithDetail adds the given details to the error if it is a gRPC status whose code is not OK. +func errWithDetail(t *testing.T, detail proto.Message) error { + st := status.New(codes.Unavailable, "too busy") + + proto := st.Proto() + marshaled, err := anypb.New(detail) + require.NoError(t, err) + + proto.Details = append(proto.Details, marshaled) + + return status.ErrorProto(proto) +} diff --git a/workhorse/internal/git/format-patch.go b/workhorse/internal/git/format-patch.go index 2e52fdf6c33..d52c4ef7dee 100644 --- a/workhorse/internal/git/format-patch.go +++ b/workhorse/internal/git/format-patch.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" @@ -14,7 +15,7 @@ import ( type patch struct{ senddata.Prefix } type patchParams struct { - GitalyServer gitaly.Server + GitalyServer api.GitalyServer RawPatchRequest string } @@ -33,7 +34,12 @@ func (p *patch) Inject(w http.ResponseWriter, r *http.Request, sendData string) return } - ctx, diffClient, err := gitaly.NewDiffClient(r.Context(), params.GitalyServer) + ctx, diffClient, err := gitaly.NewDiffClient( + r.Context(), + params.GitalyServer, + gitaly.WithFeatures(params.GitalyServer.Features), + ) + if err != nil { helper.Fail500(w, r, fmt.Errorf("diff.RawPatch: %v", err)) return diff --git a/workhorse/internal/git/git-http.go b/workhorse/internal/git/git-http.go index 7f5c1b6c584..86007e16064 100644 --- a/workhorse/internal/git/git-http.go +++ b/workhorse/internal/git/git-http.go @@ -22,11 +22,11 @@ const ( ) func ReceivePack(a *api.API) http.Handler { - return postRPCHandler(a, "handleReceivePack", handleReceivePack) + return postRPCHandler(a, "handleReceivePack", handleReceivePack, writeReceivePackError) } func UploadPack(a *api.API) http.Handler { - return postRPCHandler(a, "handleUploadPack", handleUploadPack) + return postRPCHandler(a, "handleUploadPack", handleUploadPack, writeUploadPackError) } func gitConfigOptions(a *api.Response) []string { @@ -39,7 +39,12 @@ func gitConfigOptions(a *api.Response) []string { return out } -func postRPCHandler(a *api.API, name string, handler func(*HttpResponseWriter, *http.Request, *api.Response) error) http.Handler { +func postRPCHandler( + a *api.API, + name string, + handler func(*HttpResponseWriter, *http.Request, *api.Response) error, + errWriter func(io.Writer) error, +) http.Handler { return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, ar *api.Response) { cr := &countReadCloser{ReadCloser: r.Body} r.Body = cr @@ -50,7 +55,8 @@ func postRPCHandler(a *api.API, name string, handler func(*HttpResponseWriter, * }() if err := handler(w, r, ar); err != nil { - // If the handler already wrote a response this WriteHeader call is a + handleLimitErr(err, w, errWriter) + // If the handler, or handleLimitErr already wrote a response this WriteHeader call is a // no-op. It never reaches net/http because GitHttpResponseWriter calls // WriteHeader on its underlying ResponseWriter at most once. w.WriteHeader(500) diff --git a/workhorse/internal/git/info-refs.go b/workhorse/internal/git/info-refs.go index b7f825839f8..2eaed388f60 100644 --- a/workhorse/internal/git/info-refs.go +++ b/workhorse/internal/git/info-refs.go @@ -55,7 +55,13 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response) } func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error { - ctx, smarthttp, err := gitaly.NewSmartHTTPClient(ctx, a.GitalyServer) + ctx, smarthttp, err := gitaly.NewSmartHTTPClient( + ctx, + a.GitalyServer, + gitaly.WithFeatures(a.GitalyServer.Features), + gitaly.WithUserID(a.GL_ID), + gitaly.WithUsername(a.GL_USERNAME), + ) if err != nil { return err } diff --git a/workhorse/internal/git/info-refs_test.go b/workhorse/internal/git/info-refs_test.go index 4f23d1ac174..0df74abe81d 100644 --- a/workhorse/internal/git/info-refs_test.go +++ b/workhorse/internal/git/info-refs_test.go @@ -11,7 +11,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" ) type smartHTTPServiceServerWithInfoRefs struct { @@ -32,7 +31,7 @@ func TestGetInfoRefsHandler(t *testing.T) { w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/?service=git-upload-pack", nil) - a := &api.Response{GitalyServer: gitaly.Server{Address: addr}} + a := &api.Response{GitalyServer: api.GitalyServer{Address: addr}} handleGetInfoRefs(NewHttpResponseWriter(w), r, a) require.Equal(t, 503, w.Code) diff --git a/workhorse/internal/git/receive-pack.go b/workhorse/internal/git/receive-pack.go index ccde9331b83..a85f0edccac 100644 --- a/workhorse/internal/git/receive-pack.go +++ b/workhorse/internal/git/receive-pack.go @@ -20,13 +20,19 @@ func handleReceivePack(w *HttpResponseWriter, r *http.Request, a *api.Response) gitProtocol := r.Header.Get("Git-Protocol") - ctx, smarthttp, err := gitaly.NewSmartHTTPClient(r.Context(), a.GitalyServer) + ctx, smarthttp, err := gitaly.NewSmartHTTPClient( + r.Context(), + a.GitalyServer, + gitaly.WithFeatures(a.GitalyServer.Features), + gitaly.WithUserID(a.GL_ID), + gitaly.WithUsername(a.GL_USERNAME), + ) if err != nil { return fmt.Errorf("smarthttp.ReceivePack: %v", err) } if err := smarthttp.ReceivePack(ctx, &a.Repository, a.GL_ID, a.GL_USERNAME, a.GL_REPOSITORY, a.GitConfigOptions, cr, cw, gitProtocol); err != nil { - return fmt.Errorf("smarthttp.ReceivePack: %v", err) + return fmt.Errorf("smarthttp.ReceivePack: %w", err) } return nil diff --git a/workhorse/internal/git/snapshot.go b/workhorse/internal/git/snapshot.go index 152b2fc2b93..77b32f8a05d 100644 --- a/workhorse/internal/git/snapshot.go +++ b/workhorse/internal/git/snapshot.go @@ -7,6 +7,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" @@ -18,7 +19,7 @@ type snapshot struct { } type snapshotParams struct { - GitalyServer gitaly.Server + GitalyServer api.GitalyServer GetSnapshotRequest string } @@ -40,7 +41,12 @@ func (s *snapshot) Inject(w http.ResponseWriter, r *http.Request, sendData strin return } - ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer) + ctx, c, err := gitaly.NewRepositoryClient( + r.Context(), + params.GitalyServer, + gitaly.WithFeatures(params.GitalyServer.Features), + ) + if err != nil { helper.Fail500(w, r, fmt.Errorf("SendSnapshot: gitaly.NewRepositoryClient: %v", err)) return diff --git a/workhorse/internal/git/testhelper_test.go b/workhorse/internal/git/testhelper_test.go new file mode 100644 index 00000000000..8261dcd125f --- /dev/null +++ b/workhorse/internal/git/testhelper_test.go @@ -0,0 +1,15 @@ +package git + +import ( + "os" + "testing" + + "github.com/sirupsen/logrus" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" +) + +func TestMain(m *testing.M) { + gitaly.InitializeSidechannelRegistry(logrus.StandardLogger()) + os.Exit(m.Run()) +} diff --git a/workhorse/internal/git/upload-pack.go b/workhorse/internal/git/upload-pack.go index acf03284343..bbed5224b2d 100644 --- a/workhorse/internal/git/upload-pack.go +++ b/workhorse/internal/git/upload-pack.go @@ -44,13 +44,19 @@ func handleUploadPack(w *HttpResponseWriter, r *http.Request, a *api.Response) e } func handleUploadPackWithGitaly(ctx context.Context, a *api.Response, clientRequest io.Reader, clientResponse io.Writer, gitProtocol string) error { - ctx, smarthttp, err := gitaly.NewSmartHTTPClient(ctx, a.GitalyServer) + ctx, smarthttp, err := gitaly.NewSmartHTTPClient( + ctx, + a.GitalyServer, + gitaly.WithFeatures(a.GitalyServer.Features), + gitaly.WithUserID(a.GL_ID), + gitaly.WithUsername(a.GL_USERNAME), + ) if err != nil { - return fmt.Errorf("smarthttp.UploadPack: %v", err) + return fmt.Errorf("get gitaly client: %w", err) } if err := smarthttp.UploadPack(ctx, &a.Repository, clientRequest, clientResponse, gitConfigOptions(a), gitProtocol); err != nil { - return fmt.Errorf("smarthttp.UploadPack: %v", err) + return fmt.Errorf("do gitaly call: %w", err) } return nil diff --git a/workhorse/internal/git/upload-pack_test.go b/workhorse/internal/git/upload-pack_test.go index 211f68a2608..9ffc7117790 100644 --- a/workhorse/internal/git/upload-pack_test.go +++ b/workhorse/internal/git/upload-pack_test.go @@ -1,7 +1,10 @@ package git import ( + "context" + "errors" "fmt" + "io" "io/ioutil" "net" "net/http/httptest" @@ -13,32 +16,33 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" + "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" ) var ( originalUploadPackTimeout = uploadPackTimeout ) -type fakeReader struct { - n int - err error +type waitReader struct { + t time.Duration } -func (f *fakeReader) Read(b []byte) (int, error) { - return f.n, f.err +func (f *waitReader) Read(b []byte) (int, error) { + time.Sleep(f.t) + return 0, io.EOF } type smartHTTPServiceServer struct { gitalypb.UnimplementedSmartHTTPServiceServer - PostUploadPackFunc func(gitalypb.SmartHTTPService_PostUploadPackServer) error + handler func(context.Context, *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) } -func (srv *smartHTTPServiceServer) PostUploadPack(s gitalypb.SmartHTTPService_PostUploadPackServer) error { - return srv.PostUploadPackFunc(s) +func (srv *smartHTTPServiceServer) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) { + return srv.handler(ctx, req) } func TestUploadPackTimesOut(t *testing.T) { @@ -46,21 +50,26 @@ func TestUploadPackTimesOut(t *testing.T) { defer func() { uploadPackTimeout = originalUploadPackTimeout }() addr := startSmartHTTPServer(t, &smartHTTPServiceServer{ - PostUploadPackFunc: func(stream gitalypb.SmartHTTPService_PostUploadPackServer) error { - _, err := stream.Recv() // trigger a read on the client request body - require.NoError(t, err) - return nil + handler: func(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) { + conn, err := client.OpenServerSidechannel(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + + _, _ = io.Copy(ioutil.Discard, conn) + return &gitalypb.PostUploadPackWithSidechannelResponse{}, nil }, }) - body := &fakeReader{n: 0, err: nil} + body := &waitReader{t: 10 * time.Millisecond} w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/", body) - a := &api.Response{GitalyServer: gitaly.Server{Address: addr}} + a := &api.Response{GitalyServer: api.GitalyServer{Address: addr}} err := handleUploadPack(NewHttpResponseWriter(w), r, a) - require.EqualError(t, err, "smarthttp.UploadPack: busyReader: context deadline exceeded") + require.True(t, errors.Is(err, context.DeadlineExceeded)) } func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) string { @@ -73,7 +82,7 @@ func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) strin ln, err := net.Listen("unix", socket) require.NoError(t, err) - srv := grpc.NewServer() + srv := grpc.NewServer(testhelper.WithSidechannel()) gitalypb.RegisterSmartHTTPServiceServer(srv, s) go func() { require.NoError(t, srv.Serve(ln)) diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go index 362f380dc4d..db1fd3f8abb 100644 --- a/workhorse/internal/gitaly/gitaly.go +++ b/workhorse/internal/gitaly/gitaly.go @@ -19,24 +19,18 @@ import ( gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" + grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" ) -type Server struct { - Address string `json:"address"` - Token string `json:"token"` - Features map[string]string `json:"features"` - Sidechannel bool `json:"sidechannel"` -} - type cacheKey struct { address, token string - sidechannel bool } -func (server Server) cacheKey() cacheKey { - return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel} +func getCacheKey(server api.GitalyServer) cacheKey { + return cacheKey{address: server.Address, token: server.Token} } type connectionsCache struct { @@ -73,19 +67,42 @@ func InitializeSidechannelRegistry(logger *logrus.Logger) { } } -func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context { - md := metadata.New(nil) - for k, v := range features { - if !strings.HasPrefix(k, "gitaly-feature-") { - continue +type MetadataFunc func(metadata.MD) + +func WithUserID(userID string) MetadataFunc { + return func(md metadata.MD) { + md.Append("user_id", userID) + } +} + +func WithUsername(username string) MetadataFunc { + return func(md metadata.MD) { + md.Append("username", username) + } +} + +func WithFeatures(features map[string]string) MetadataFunc { + return func(md metadata.MD) { + for k, v := range features { + if !strings.HasPrefix(k, "gitaly-feature-") { + continue + } + md.Append(k, v) } - md.Append(k, v) + } +} + +func withOutgoingMetadata(ctx context.Context, addMetadataFuncs ...MetadataFunc) context.Context { + md := metadata.New(nil) + + for _, f := range addMetadataFuncs { + f(md) } return metadata.NewOutgoingContext(ctx, md) } -func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *SmartHTTPClient, error) { +func NewSmartHTTPClient(ctx context.Context, server api.GitalyServer, metadataFuncs ...MetadataFunc) (context.Context, *SmartHTTPClient, error) { conn, err := getOrCreateConnection(server) if err != nil { return nil, nil, err @@ -94,50 +111,53 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S smartHTTPClient := &SmartHTTPClient{ SmartHTTPServiceClient: grpcClient, sidechannelRegistry: sidechannelRegistry, - useSidechannel: server.Sidechannel, } - return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil + + return withOutgoingMetadata( + ctx, + metadataFuncs..., + ), smartHTTPClient, nil } -func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) { +func NewBlobClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *BlobClient, error) { conn, err := getOrCreateConnection(server) if err != nil { return nil, nil, err } grpcClient := gitalypb.NewBlobServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &BlobClient{grpcClient}, nil + return withOutgoingMetadata(ctx, addMetadataFuncs...), &BlobClient{grpcClient}, nil } -func NewRepositoryClient(ctx context.Context, server Server) (context.Context, *RepositoryClient, error) { +func NewRepositoryClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *RepositoryClient, error) { conn, err := getOrCreateConnection(server) if err != nil { return nil, nil, err } grpcClient := gitalypb.NewRepositoryServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &RepositoryClient{grpcClient}, nil + return withOutgoingMetadata(ctx, addMetadataFuncs...), &RepositoryClient{grpcClient}, nil } // NewNamespaceClient is only used by the Gitaly integration tests at present -func NewNamespaceClient(ctx context.Context, server Server) (context.Context, *NamespaceClient, error) { +func NewNamespaceClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *NamespaceClient, error) { conn, err := getOrCreateConnection(server) if err != nil { return nil, nil, err } grpcClient := gitalypb.NewNamespaceServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &NamespaceClient{grpcClient}, nil + return withOutgoingMetadata(ctx, addMetadataFuncs...), &NamespaceClient{grpcClient}, nil } -func NewDiffClient(ctx context.Context, server Server) (context.Context, *DiffClient, error) { +func NewDiffClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *DiffClient, error) { conn, err := getOrCreateConnection(server) if err != nil { return nil, nil, err } grpcClient := gitalypb.NewDiffServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &DiffClient{grpcClient}, nil + return withOutgoingMetadata(ctx, addMetadataFuncs...), &DiffClient{grpcClient}, nil } -func getOrCreateConnection(server Server) (*grpc.ClientConn, error) { - key := server.cacheKey() +func getOrCreateConnection(server api.GitalyServer) (*grpc.ClientConn, error) { + key := getCacheKey(server) cache.RLock() conn := cache.connections[key] @@ -173,7 +193,7 @@ func CloseConnections() { } } -func newConnection(server Server) (*grpc.ClientConn, error) { +func newConnection(server api.GitalyServer) (*grpc.ClientConn, error) { connOpts := append(gitalyclient.DefaultDialOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(server.Token)), grpc.WithStreamInterceptor( @@ -197,13 +217,7 @@ func newConnection(server Server) (*grpc.ClientConn, error) { ), ) - var conn *grpc.ClientConn - var connErr error - if server.Sidechannel { - conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background - } else { - conn, connErr = gitalyclient.Dial(server.Address, connOpts) - } + conn, connErr := gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background label := "ok" if connErr != nil { diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go index 9c54caae8c6..f693f102447 100644 --- a/workhorse/internal/gitaly/gitaly_test.go +++ b/workhorse/internal/gitaly/gitaly_test.go @@ -2,56 +2,72 @@ package gitaly import ( "context" + "os" "testing" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" -) - -func TestNewSmartHTTPClient(t *testing.T) { - ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture()) - require.NoError(t, err) - testOutgoingMetadata(t, ctx) - require.False(t, client.useSidechannel) - require.Nil(t, client.sidechannelRegistry) -} + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" +) -func TestNewSmartHTTPClientWithSidechannel(t *testing.T) { +func TestMain(m *testing.M) { InitializeSidechannelRegistry(logrus.StandardLogger()) + os.Exit(m.Run()) +} - fixture := serverFixture() - fixture.Sidechannel = true - - ctx, client, err := NewSmartHTTPClient(context.Background(), fixture) +func TestNewSmartHTTPClient(t *testing.T) { + ctx, client, err := NewSmartHTTPClient( + context.Background(), + serverFixture(), + WithFeatures(features()), + WithUsername("gl_username"), + WithUserID("gl_id"), + ) require.NoError(t, err) testOutgoingMetadata(t, ctx) - - require.True(t, client.useSidechannel) + testOutgoingIDAndUsername(t, ctx) require.NotNil(t, client.sidechannelRegistry) } func TestNewBlobClient(t *testing.T) { - ctx, _, err := NewBlobClient(context.Background(), serverFixture()) + ctx, _, err := NewBlobClient( + context.Background(), + serverFixture(), + WithFeatures(features()), + ) require.NoError(t, err) testOutgoingMetadata(t, ctx) } func TestNewRepositoryClient(t *testing.T) { - ctx, _, err := NewRepositoryClient(context.Background(), serverFixture()) + ctx, _, err := NewRepositoryClient( + context.Background(), + serverFixture(), + WithFeatures(features()), + ) + require.NoError(t, err) testOutgoingMetadata(t, ctx) } func TestNewNamespaceClient(t *testing.T) { - ctx, _, err := NewNamespaceClient(context.Background(), serverFixture()) + ctx, _, err := NewNamespaceClient( + context.Background(), + serverFixture(), + WithFeatures(features()), + ) require.NoError(t, err) testOutgoingMetadata(t, ctx) } func TestNewDiffClient(t *testing.T) { - ctx, _, err := NewDiffClient(context.Background(), serverFixture()) + ctx, _, err := NewDiffClient( + context.Background(), + serverFixture(), + WithFeatures(features()), + ) require.NoError(t, err) testOutgoingMetadata(t, ctx) } @@ -71,16 +87,29 @@ func testOutgoingMetadata(t *testing.T, ctx context.Context) { } } -func serverFixture() Server { +func testOutgoingIDAndUsername(t *testing.T, ctx context.Context) { + md, ok := metadata.FromOutgoingContext(ctx) + require.True(t, ok, "get metadata from context") + + require.Equal(t, md["user_id"], []string{"gl_id"}) + require.Equal(t, md["username"], []string{"gl_username"}) +} + +func features() map[string]string { features := make(map[string]string) for k, v := range allowedFeatures() { features[k] = v } + for k, v := range badFeatureMetadata() { features[k] = v } - return Server{Address: "tcp://localhost:123", Features: features} + return features +} + +func serverFixture() api.GitalyServer { + return api.GitalyServer{Address: "tcp://localhost:123"} } func allowedFeatures() map[string]string { diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go index de6954efa60..12dffc3ccff 100644 --- a/workhorse/internal/gitaly/smarthttp.go +++ b/workhorse/internal/gitaly/smarthttp.go @@ -11,7 +11,6 @@ import ( ) type SmartHTTPClient struct { - useSidechannel bool sidechannelRegistry *gitalyclient.SidechannelRegistry gitalypb.SmartHTTPServiceClient } @@ -96,71 +95,17 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R } func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { - if client.useSidechannel { - return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) - } - - return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) -} - -func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { - stream, err := client.PostUploadPack(ctx) - if err != nil { - return err - } - - rpcRequest := &gitalypb.PostUploadPackRequest{ - Repository: repo, - GitConfigOptions: gitConfigOptions, - GitProtocol: gitProtocol, - } - - if err := stream.Send(rpcRequest); err != nil { - return fmt.Errorf("initial request: %v", err) - } - - numStreams := 2 - errC := make(chan error, numStreams) - - go func() { - rr := streamio.NewReader(func() ([]byte, error) { - response, err := stream.Recv() - return response.GetData(), err - }) - _, err := io.Copy(clientResponse, rr) - errC <- err - }() - - go func() { - sw := streamio.NewWriter(func(data []byte) error { - return stream.Send(&gitalypb.PostUploadPackRequest{Data: data}) - }) - _, err := io.Copy(sw, clientRequest) - stream.CloseSend() - errC <- err - }() - - for i := 0; i < numStreams; i++ { - if err := <-errC; err != nil { - return err - } - } - - return nil -} - -func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error { if _, err := io.Copy(conn, clientRequest); err != nil { - return err + return fmt.Errorf("copy request body: %w", err) } if err := conn.CloseWrite(); err != nil { - return fmt.Errorf("fail to signal sidechannel half-close: %w", err) + return fmt.Errorf("close request body: %w", err) } if _, err := io.Copy(clientResponse, conn); err != nil { - return err + return fmt.Errorf("copy response body: %w", err) } return nil @@ -174,11 +119,11 @@ func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, } if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil { - return err + return fmt.Errorf("PostUploadPackWithSidechannel: %w", err) } if err := waiter.Close(); err != nil { - return fmt.Errorf("fail to close sidechannel connection: %w", err) + return fmt.Errorf("close sidechannel waiter: %w", err) } return nil diff --git a/workhorse/internal/helper/writeafterreader.go b/workhorse/internal/helper/writeafterreader.go index d583ae4a9b8..7df2279a86a 100644 --- a/workhorse/internal/helper/writeafterreader.go +++ b/workhorse/internal/helper/writeafterreader.go @@ -37,7 +37,7 @@ func (r *busyReader) Read(p []byte) (int, error) { n, err := r.Reader.Read(p) if err != nil { if err != io.EOF { - err = fmt.Errorf("busyReader: %v", err) + err = fmt.Errorf("busyReader: %w", err) } r.setError(err) } @@ -81,13 +81,13 @@ func (w *coupledWriter) Write(data []byte) (int, error) { if w.busyReader.IsBusy() { n, err := w.tempfileWrite(data) if err != nil { - w.writeError = fmt.Errorf("coupledWriter: %v", err) + w.writeError = fmt.Errorf("coupledWriter: %w", err) } return n, w.writeError } if err := w.Flush(); err != nil { - w.writeError = fmt.Errorf("coupledWriter: %v", err) + w.writeError = fmt.Errorf("coupledWriter: %w", err) return 0, w.writeError } diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go index 13e9fc3f051..82cb082f5f0 100644 --- a/workhorse/internal/redis/keywatcher.go +++ b/workhorse/internal/redis/keywatcher.go @@ -1,6 +1,7 @@ package redis import ( + "errors" "fmt" "strings" "sync" @@ -189,7 +190,9 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) defer delKeyChan(kw) currentValue, err := GetString(key) - if err != nil { + if errors.Is(err, redis.ErrNil) { + currentValue = "" + } else if err != nil { return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET: %v", err) } if currentValue != value { diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index 7ff5f8204c0..a2f2b73898f 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -1,10 +1,12 @@ package redis import ( + "errors" "sync" "testing" "time" + "github.com/gomodule/redigo/redis" "github.com/rafaeljusto/redigomock" "github.com/stretchr/testify/require" ) @@ -65,100 +67,191 @@ func processMessages(numWatchers int, value string) { processInner(psc) } -func TestWatchKeySeenChange(t *testing.T) { +type keyChangeTestCase struct { + desc string + returnValue string + isKeyMissing bool + watchValue string + processedValue string + expectedStatus WatchKeyStatus + timeout time.Duration +} + +func TestKeyChangesBubblesUpError(t *testing.T) { conn, td := setupMockPool() defer td() - conn.Command("GET", runnerKey).Expect("something") - - wg := &sync.WaitGroup{} - wg.Add(1) + conn.Command("GET", runnerKey).ExpectError(errors.New("test error")) - go func() { - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") - wg.Done() - }() + _, err := WatchKey(runnerKey, "something", time.Second) + require.Error(t, err, "Expected error") - processMessages(1, "somethingelse") - wg.Wait() + deleteWatchers(runnerKey) } -func TestWatchKeyNoChange(t *testing.T) { - conn, td := setupMockPool() - defer td() +func TestKeyChangesInstantReturn(t *testing.T) { + testCases := []keyChangeTestCase{ + // WatchKeyStatusAlreadyChanged + { + desc: "sees change with key existing and changed", + returnValue: "somethingelse", + watchValue: "something", + expectedStatus: WatchKeyStatusAlreadyChanged, + timeout: time.Second, + }, + { + desc: "sees change with key non-existing", + isKeyMissing: true, + watchValue: "something", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusAlreadyChanged, + timeout: time.Second, + }, + // WatchKeyStatusTimeout + { + desc: "sees timeout with key existing and unchanged", + returnValue: "something", + watchValue: "something", + expectedStatus: WatchKeyStatusTimeout, + timeout: time.Millisecond, + }, + { + desc: "sees timeout with key non-existing and unchanged", + isKeyMissing: true, + watchValue: "", + expectedStatus: WatchKeyStatusTimeout, + timeout: time.Millisecond, + }, + } - conn.Command("GET", runnerKey).Expect("something") + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + conn, td := setupMockPool() + defer td() - wg := &sync.WaitGroup{} - wg.Add(1) + if tc.isKeyMissing { + conn.Command("GET", runnerKey).ExpectError(redis.ErrNil) + } else { + conn.Command("GET", runnerKey).Expect(tc.returnValue) + } - go func() { - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value") - wg.Done() - }() + val, err := WatchKey(runnerKey, tc.watchValue, tc.timeout) - processMessages(1, "something") - wg.Wait() -} + require.NoError(t, err, "Expected no error") + require.Equal(t, tc.expectedStatus, val, "Expected value") -func TestWatchKeyTimeout(t *testing.T) { - conn, td := setupMockPool() - defer td() + deleteWatchers(runnerKey) + }) + } +} - conn.Command("GET", runnerKey).Expect("something") +func TestKeyChangesWhenWatching(t *testing.T) { + testCases := []keyChangeTestCase{ + // WatchKeyStatusSeenChange + { + desc: "sees change with key existing", + returnValue: "something", + watchValue: "something", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusSeenChange, + }, + { + desc: "sees change with key non-existing, when watching empty value", + isKeyMissing: true, + watchValue: "", + processedValue: "something", + expectedStatus: WatchKeyStatusSeenChange, + }, + // WatchKeyStatusNoChange + { + desc: "sees no change with key existing", + returnValue: "something", + watchValue: "something", + processedValue: "something", + expectedStatus: WatchKeyStatusNoChange, + }, + } - val, err := WatchKey(runnerKey, "something", time.Millisecond) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change") + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + conn, td := setupMockPool() + defer td() - // Clean up watchers since Process isn't doing that for us (not running) - deleteWatchers(runnerKey) -} + if tc.isKeyMissing { + conn.Command("GET", runnerKey).ExpectError(redis.ErrNil) + } else { + conn.Command("GET", runnerKey).Expect(tc.returnValue) + } -func TestWatchKeyAlreadyChanged(t *testing.T) { - conn, td := setupMockPool() - defer td() + wg := &sync.WaitGroup{} + wg.Add(1) - conn.Command("GET", runnerKey).Expect("somethingelse") + go func() { + defer wg.Done() + val, err := WatchKey(runnerKey, tc.watchValue, time.Second) - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed") + require.NoError(t, err, "Expected no error") + require.Equal(t, tc.expectedStatus, val, "Expected value") + }() - // Clean up watchers since Process isn't doing that for us (not running) - deleteWatchers(runnerKey) + processMessages(1, tc.processedValue) + wg.Wait() + }) + } } -func TestWatchKeyMassivelyParallel(t *testing.T) { - runTimes := 100 // 100 parallel watchers +func TestKeyChangesParallel(t *testing.T) { + testCases := []keyChangeTestCase{ + { + desc: "massively parallel, sees change with key existing", + returnValue: "something", + watchValue: "something", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusSeenChange, + }, + { + desc: "massively parallel, sees change with key existing, watching missing keys", + isKeyMissing: true, + watchValue: "", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusSeenChange, + }, + } - conn, td := setupMockPool() - defer td() + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + runTimes := 100 - wg := &sync.WaitGroup{} - wg.Add(runTimes) + conn, td := setupMockPool() + defer td() - getCmd := conn.Command("GET", runnerKey) + getCmd := conn.Command("GET", runnerKey) - for i := 0; i < runTimes; i++ { - getCmd = getCmd.Expect("something") - } + for i := 0; i < runTimes; i++ { + if tc.isKeyMissing { + getCmd = getCmd.ExpectError(redis.ErrNil) + } else { + getCmd = getCmd.Expect(tc.returnValue) + } + } - for i := 0; i < runTimes; i++ { - go func() { - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") - wg.Done() - }() - } + wg := &sync.WaitGroup{} + wg.Add(runTimes) - processMessages(runTimes, "somethingelse") - wg.Wait() + for i := 0; i < runTimes; i++ { + go func() { + defer wg.Done() + val, err := WatchKey(runnerKey, tc.watchValue, time.Second) + + require.NoError(t, err, "Expected no error") + require.Equal(t, tc.expectedStatus, val, "Expected value") + }() + } + + processMessages(runTimes, tc.processedValue) + wg.Wait() + }) + } } func TestShutdown(t *testing.T) { diff --git a/workhorse/internal/testhelper/gitaly.go b/workhorse/internal/testhelper/gitaly.go index da2fbf30785..747d5e6d078 100644 --- a/workhorse/internal/testhelper/gitaly.go +++ b/workhorse/internal/testhelper/gitaly.go @@ -11,12 +11,15 @@ import ( "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 + "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/log" ) @@ -181,93 +184,34 @@ func (s *GitalyTestServer) PostReceivePack(stream gitalypb.SmartHTTPService_Post return s.finalError() } -func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error { +func (s *GitalyTestServer) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) { s.WaitGroup.Add(1) defer s.WaitGroup.Done() - req, err := stream.Recv() - if err != nil { - return err - } - if err := validateRepository(req.GetRepository()); err != nil { - return err - } - - marshaler := &jsonpb.Marshaler{} - jsonBytes := &bytes.Buffer{} - if err := marshaler.Marshal(jsonBytes, req); err != nil { - return err - } - - if err := stream.Send(&gitalypb.PostUploadPackResponse{ - Data: append(jsonBytes.Bytes(), 0), - }); err != nil { - return err - } - - nSends := 0 - // The body of the request starts in the second message. Gitaly streams PostUploadPack responses - // as soon as possible without reading the request completely first. We stream messages here - // directly back to the client to simulate the streaming of the actual implementation. - for { - req, err := stream.Recv() - if err != nil { - if err != io.EOF { - return err - } - break - } - - if err := stream.Send(&gitalypb.PostUploadPackResponse{Data: req.GetData()}); err != nil { - return err - } - - nSends++ - } - - if nSends <= 1 { - panic("should have sent more than one message") - } - - return s.finalError() -} - -// PostUploadPackWithSidechannel should be a part of smarthttp server in real -// server. In workhorse, setting up a real sidechannel server is troublesome. -// Therefore, we bring up a sidechannel server with a mock server exported via -// gitalyclient.TestSidechannelServer. This is the handler for that mock -// server. -func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error { - if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" { - return fmt.Errorf("unexpected method: %s", method) - } - - var req gitalypb.PostUploadPackWithSidechannelRequest - if err := stream.RecvMsg(&req); err != nil { - return err + return nil, err } - if err := validateRepository(req.GetRepository()); err != nil { - return err + conn, err := client.OpenServerSidechannel(ctx) + if err != nil { + return nil, err } + defer conn.Close() marshaler := &jsonpb.Marshaler{} jsonBytes := &bytes.Buffer{} - if err := marshaler.Marshal(jsonBytes, &req); err != nil { - return err - } - - // Bounce back all data back to the client, plus flushing bytes - if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil { - return err + if err := marshaler.Marshal(jsonBytes, req); err != nil { + return nil, err } - if _, err := io.Copy(conn, conn); err != nil { - return err + if _, err := io.Copy(conn, io.MultiReader( + bytes.NewReader(append(jsonBytes.Bytes(), 0)), + conn, + )); err != nil { + return nil, err } - return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{}) + return &gitalypb.PostUploadPackWithSidechannelResponse{}, s.finalError() } func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) { @@ -424,3 +368,7 @@ func validateRepository(repo *gitalypb.Repository) error { } return nil } + +func WithSidechannel() grpc.ServerOption { + return client.SidechannelServer(logrus.NewEntry(logrus.StandardLogger()), insecure.NewCredentials()) +} diff --git a/workhorse/internal/upload/artifacts_uploader.go b/workhorse/internal/upload/artifacts_uploader.go index 2a91a05fe3d..c1c49638e21 100644 --- a/workhorse/internal/upload/artifacts_uploader.go +++ b/workhorse/internal/upload/artifacts_uploader.go @@ -35,7 +35,6 @@ var zipSubcommandsErrorsCounter = promauto.NewCounterVec( }, []string{"error"}) type artifactsUploadProcessor struct { - opts *destination.UploadOpts format string SavedFileTracker @@ -44,7 +43,7 @@ type artifactsUploadProcessor struct { // Artifacts is like a Multipart but specific for artifacts upload. func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler { return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { - opts, _, err := p.Prepare(a) + opts, err := p.Prepare(a) if err != nil { helper.Fail500(w, r, fmt.Errorf("UploadArtifacts: error preparing file storage options")) return @@ -52,7 +51,7 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler { format := r.URL.Query().Get(ArtifactFormatKey) - mg := &artifactsUploadProcessor{opts: opts, format: format, SavedFileTracker: SavedFileTracker{Request: r}} + mg := &artifactsUploadProcessor{format: format, SavedFileTracker: SavedFileTracker{Request: r}} interceptMultipartFiles(w, r, h, a, mg, opts) }, "/authorize") } @@ -62,12 +61,9 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, defer metaWriter.Close() metaOpts := &destination.UploadOpts{ - LocalTempPath: a.opts.LocalTempPath, + LocalTempPath: os.TempDir(), TempFilePrefix: "metadata.gz", } - if metaOpts.LocalTempPath == "" { - metaOpts.LocalTempPath = os.TempDir() - } fileName := file.LocalPath if fileName == "" { diff --git a/workhorse/internal/upload/body_uploader.go b/workhorse/internal/upload/body_uploader.go index d831f9f43a1..6fb201fe677 100644 --- a/workhorse/internal/upload/body_uploader.go +++ b/workhorse/internal/upload/body_uploader.go @@ -17,7 +17,7 @@ import ( // request to gitlab-rails without the original request body. func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler { return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { - opts, verifier, err := p.Prepare(a) + opts, err := p.Prepare(a) if err != nil { helper.Fail500(w, r, fmt.Errorf("RequestBody: preparation failed: %v", err)) return @@ -29,13 +29,6 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler { return } - if verifier != nil { - if err := verifier.Verify(fh); err != nil { - helper.Fail500(w, r, fmt.Errorf("RequestBody: verification failed: %v", err)) - return - } - } - data := url.Values{} fields, err := fh.GitLabFinalizeFields("file") if err != nil { diff --git a/workhorse/internal/upload/body_uploader_test.go b/workhorse/internal/upload/body_uploader_test.go index 47490db8780..35772be5bc3 100644 --- a/workhorse/internal/upload/body_uploader_test.go +++ b/workhorse/internal/upload/body_uploader_test.go @@ -49,19 +49,6 @@ func TestRequestBodyCustomPreparer(t *testing.T) { require.Equal(t, fileContent, string(uploadEcho)) } -func TestRequestBodyCustomVerifier(t *testing.T) { - body := strings.NewReader(fileContent) - verifier := &mockVerifier{} - - resp := testUpload(&rails{}, &alwaysLocalPreparer{verifier: verifier}, echoProxy(t, fileLen), body) - require.Equal(t, http.StatusOK, resp.StatusCode) - - uploadEcho, err := ioutil.ReadAll(resp.Body) - require.NoError(t, err, "Can't read response body") - require.Equal(t, fileContent, string(uploadEcho)) - require.True(t, verifier.invoked, "Verifier.Verify not invoked") -} - func TestRequestBodyAuthorizationFailure(t *testing.T) { testNoProxyInvocation(t, http.StatusUnauthorized, &rails{unauthorized: true}, &alwaysLocalPreparer{}) } @@ -72,7 +59,6 @@ func TestRequestBodyErrors(t *testing.T) { preparer *alwaysLocalPreparer }{ {name: "Prepare failure", preparer: &alwaysLocalPreparer{prepareError: fmt.Errorf("")}}, - {name: "Verify failure", preparer: &alwaysLocalPreparer{verifier: &alwaysFailsVerifier{}}}, } for _, test := range tests { @@ -165,31 +151,14 @@ func (r *rails) PreAuthorizeHandler(next api.HandleFunc, _ string) http.Handler } type alwaysLocalPreparer struct { - verifier Verifier prepareError error } -func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, Verifier, error) { +func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, error) { opts, err := destination.GetOpts(&api.Response{TempPath: os.TempDir()}) if err != nil { - return nil, nil, err + return nil, err } - return opts, a.verifier, a.prepareError -} - -type alwaysFailsVerifier struct{} - -func (alwaysFailsVerifier) Verify(handler *destination.FileHandler) error { - return fmt.Errorf("Verification failed") -} - -type mockVerifier struct { - invoked bool -} - -func (m *mockVerifier) Verify(handler *destination.FileHandler) error { - m.invoked = true - - return nil + return opts, a.prepareError } diff --git a/workhorse/internal/upload/lfs_preparer.go b/workhorse/internal/upload/lfs_preparer.go deleted file mode 100644 index e7c5cf16a30..00000000000 --- a/workhorse/internal/upload/lfs_preparer.go +++ /dev/null @@ -1,47 +0,0 @@ -package upload - -import ( - "fmt" - - "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" -) - -type object struct { - size int64 - oid string -} - -func (l *object) Verify(fh *destination.FileHandler) error { - if fh.Size != l.size { - return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size) - } - - if fh.SHA256() != l.oid { - return fmt.Errorf("LFSObject: expected sha256 %s, got %s", l.oid, fh.SHA256()) - } - - return nil -} - -type uploadPreparer struct { - objectPreparer Preparer -} - -// NewLfs returns a new preparer instance which adds capability to a wrapped -// preparer to set options required for a LFS upload. -func NewLfsPreparer(c config.Config, objectPreparer Preparer) Preparer { - return &uploadPreparer{objectPreparer: objectPreparer} -} - -func (l *uploadPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) { - opts, _, err := l.objectPreparer.Prepare(a) - if err != nil { - return nil, nil, err - } - - opts.TempFilePrefix = a.LfsOid - - return opts, &object{oid: a.LfsOid, size: a.LfsSize}, nil -} diff --git a/workhorse/internal/upload/lfs_preparer_test.go b/workhorse/internal/upload/lfs_preparer_test.go deleted file mode 100644 index 6be4a7c2955..00000000000 --- a/workhorse/internal/upload/lfs_preparer_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package upload - -import ( - "testing" - - "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - - "github.com/stretchr/testify/require" -) - -func TestLfsPreparerWithConfig(t *testing.T) { - lfsOid := "abcd1234" - creds := config.S3Credentials{ - AwsAccessKeyID: "test-key", - AwsSecretAccessKey: "test-secret", - } - - c := config.Config{ - ObjectStorageCredentials: config.ObjectStorageCredentials{ - Provider: "AWS", - S3Credentials: creds, - }, - } - - r := &api.Response{ - LfsOid: lfsOid, - RemoteObject: api.RemoteObject{ - ID: "the upload ID", - UseWorkhorseClient: true, - ObjectStorage: &api.ObjectStorageParams{ - Provider: "AWS", - }, - }, - } - - uploadPreparer := NewObjectStoragePreparer(c) - lfsPreparer := NewLfsPreparer(c, uploadPreparer) - opts, verifier, err := lfsPreparer.Prepare(r) - - require.NoError(t, err) - require.Equal(t, lfsOid, opts.TempFilePrefix) - require.True(t, opts.ObjectStorageConfig.IsAWS()) - require.True(t, opts.UseWorkhorseClient) - require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials) - require.NotNil(t, verifier) -} - -func TestLfsPreparerWithNoConfig(t *testing.T) { - c := config.Config{} - r := &api.Response{RemoteObject: api.RemoteObject{ID: "the upload ID"}} - uploadPreparer := NewObjectStoragePreparer(c) - lfsPreparer := NewLfsPreparer(c, uploadPreparer) - opts, verifier, err := lfsPreparer.Prepare(r) - - require.NoError(t, err) - require.False(t, opts.UseWorkhorseClient) - require.NotNil(t, verifier) -} diff --git a/workhorse/internal/upload/multipart_uploader.go b/workhorse/internal/upload/multipart_uploader.go index d0097f9e153..34675d2aa14 100644 --- a/workhorse/internal/upload/multipart_uploader.go +++ b/workhorse/internal/upload/multipart_uploader.go @@ -17,7 +17,7 @@ func Multipart(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler { return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { s := &SavedFileTracker{Request: r} - opts, _, err := p.Prepare(a) + opts, err := p.Prepare(a) if err != nil { helper.Fail500(w, r, fmt.Errorf("Multipart: error preparing file storage options")) return diff --git a/workhorse/internal/upload/object_storage_preparer.go b/workhorse/internal/upload/object_storage_preparer.go index f28f598c895..d237a9ca6bc 100644 --- a/workhorse/internal/upload/object_storage_preparer.go +++ b/workhorse/internal/upload/object_storage_preparer.go @@ -18,14 +18,14 @@ func NewObjectStoragePreparer(c config.Config) Preparer { return &ObjectStoragePreparer{credentials: c.ObjectStorageCredentials, config: c.ObjectStorageConfig} } -func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) { +func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, error) { opts, err := destination.GetOpts(a) if err != nil { - return nil, nil, err + return nil, err } opts.ObjectStorageConfig.URLMux = p.config.URLMux opts.ObjectStorageConfig.S3Credentials = p.credentials.S3Credentials - return opts, nil, nil + return opts, nil } diff --git a/workhorse/internal/upload/object_storage_preparer_test.go b/workhorse/internal/upload/object_storage_preparer_test.go index 5856a1bcc92..56de6bbf7d6 100644 --- a/workhorse/internal/upload/object_storage_preparer_test.go +++ b/workhorse/internal/upload/object_storage_preparer_test.go @@ -39,24 +39,22 @@ func TestPrepareWithS3Config(t *testing.T) { } p := upload.NewObjectStoragePreparer(c) - opts, v, err := p.Prepare(r) + opts, err := p.Prepare(r) require.NoError(t, err) require.True(t, opts.ObjectStorageConfig.IsAWS()) require.True(t, opts.UseWorkhorseClient) require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials) require.NotNil(t, opts.ObjectStorageConfig.URLMux) - require.Equal(t, nil, v) } func TestPrepareWithNoConfig(t *testing.T) { c := config.Config{} r := &api.Response{RemoteObject: api.RemoteObject{ID: "id"}} p := upload.NewObjectStoragePreparer(c) - opts, v, err := p.Prepare(r) + opts, err := p.Prepare(r) require.NoError(t, err) require.False(t, opts.UseWorkhorseClient) - require.Nil(t, v) require.Nil(t, opts.ObjectStorageConfig.URLMux) } diff --git a/workhorse/internal/upload/preparer.go b/workhorse/internal/upload/preparer.go index 46a4cac01b5..4d6d8bd1189 100644 --- a/workhorse/internal/upload/preparer.go +++ b/workhorse/internal/upload/preparer.go @@ -5,29 +5,18 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" ) -// Verifier is an optional pluggable behavior for upload paths. If -// Verify() returns an error, Workhorse will return an error response to -// the client instead of propagating the request to Rails. The motivating -// use case is Git LFS, where Workhorse checks the size and SHA256 -// checksum of the uploaded file. -type Verifier interface { - // Verify can abort the upload by returning an error - Verify(handler *destination.FileHandler) error -} - // Preparer is a pluggable behavior that interprets a Rails API response // and either tells Workhorse how to handle the upload, via the -// UploadOpts and Verifier, or it rejects the request by returning a -// non-nil error. Its intended use is to make sure the upload gets stored -// in the right location: either a local directory, or one of several -// supported object storage backends. +// UploadOpts, or it rejects the request by returning a non-nil error. +// Its intended use is to make sure the upload gets stored in the right +// location: either a local directory, or one of several supported object +// storage backends. type Preparer interface { - Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) + Prepare(a *api.Response) (*destination.UploadOpts, error) } type DefaultPreparer struct{} -func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) { - opts, err := destination.GetOpts(a) - return opts, nil, err +func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, error) { + return destination.GetOpts(a) } diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go index 9d787b10d1c..a9c8834d4be 100644 --- a/workhorse/internal/upload/uploads_test.go +++ b/workhorse/internal/upload/uploads_test.go @@ -46,7 +46,7 @@ func (a *testFormProcessor) Finalize(ctx context.Context) error { func TestUploadTempPathRequirement(t *testing.T) { apiResponse := &api.Response{} preparer := &DefaultPreparer{} - _, _, err := preparer.Prepare(apiResponse) + _, err := preparer.Prepare(apiResponse) require.Error(t, err) } @@ -75,7 +75,7 @@ func TestUploadHandlerForwardingRawData(t *testing.T) { handler := newProxy(ts.URL) apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, handler, apiResponse, nil, opts) @@ -146,7 +146,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts) @@ -215,7 +215,7 @@ func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) { handler := newProxy(ts.URL) apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts) @@ -245,7 +245,7 @@ func TestUploadProcessingField(t *testing.T) { response := httptest.NewRecorder() apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts) @@ -276,7 +276,7 @@ func TestUploadingMultipleFiles(t *testing.T) { response := httptest.NewRecorder() apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts) @@ -332,7 +332,7 @@ func TestUploadProcessingFile(t *testing.T) { response := httptest.NewRecorder() apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts) @@ -378,7 +378,7 @@ func TestInvalidFileNames(t *testing.T) { response := httptest.NewRecorder() apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts) @@ -444,7 +444,7 @@ func TestContentDispositionRewrite(t *testing.T) { response := httptest.NewRecorder() apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, customHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts) @@ -567,7 +567,7 @@ func runUploadTest(t *testing.T, image []byte, filename string, httpCode int, ts handler := newProxy(ts.URL) apiResponse := &api.Response{TempPath: tempPath} preparer := &DefaultPreparer{} - opts, _, err := preparer.Prepare(apiResponse) + opts, err := preparer.Prepare(apiResponse) require.NoError(t, err) interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts) diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go index b1d76dfc1bd..dd106053f8b 100644 --- a/workhorse/internal/upstream/routes.go +++ b/workhorse/internal/upstream/routes.go @@ -46,13 +46,6 @@ type routeOptions struct { matchers []matcherFunc } -type uploadPreparers struct { - artifacts upload.Preparer - lfs upload.Preparer - packages upload.Preparer - uploads upload.Preparer -} - const ( apiPattern = `^/api/` ciAPIPattern = `^/ci/api/` @@ -225,13 +218,16 @@ func configureRoutes(u *upstream) { signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version) signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config, dependencyProxyInjector) - preparers := createUploadPreparers(u.Config) + preparer := upload.NewObjectStoragePreparer(u.Config) + requestBodyUploader := upload.RequestBody(api, signingProxy, preparer) + mimeMultipartUploader := upload.Multipart(api, signingProxy, preparer) + uploadPath := path.Join(u.DocumentRoot, "uploads/tmp") - tempfileMultipartProxy := upload.Multipart(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparers.uploads) + tempfileMultipartProxy := upload.Multipart(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparer) ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", tempfileMultipartProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout) ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration) - dependencyProxyInjector.SetUploadHandler(upload.RequestBody(api, signingProxy, preparers.packages)) + dependencyProxyInjector.SetUploadHandler(requestBodyUploader) // Serve static files or forward the requests defaultUpstream := static.ServeExisting( @@ -247,11 +243,11 @@ func configureRoutes(u *upstream) { u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)), u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))), u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))), - u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, upload.RequestBody(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))), + u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, requestBodyUploader, withMatcher(isContentType("application/octet-stream"))), // CI Artifacts - u.route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparers.artifacts))), - u.route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparers.artifacts))), + u.route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparer))), + u.route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparer))), // ActionCable websocket u.wsRoute(`^/-/cable\z`, cableProxy), @@ -275,32 +271,32 @@ func configureRoutes(u *upstream) { // https://gitlab.com/gitlab-org/gitlab/-/merge_requests/56731. // Maven Artifact Repository - u.route("PUT", apiProjectPattern+`packages/maven/`, upload.RequestBody(api, signingProxy, preparers.packages)), + u.route("PUT", apiProjectPattern+`packages/maven/`, requestBodyUploader), // Conan Artifact Repository - u.route("PUT", apiPattern+`v4/packages/conan/`, upload.RequestBody(api, signingProxy, preparers.packages)), - u.route("PUT", apiProjectPattern+`packages/conan/`, upload.RequestBody(api, signingProxy, preparers.packages)), + u.route("PUT", apiPattern+`v4/packages/conan/`, requestBodyUploader), + u.route("PUT", apiProjectPattern+`packages/conan/`, requestBodyUploader), // Generic Packages Repository - u.route("PUT", apiProjectPattern+`packages/generic/`, upload.RequestBody(api, signingProxy, preparers.packages)), + u.route("PUT", apiProjectPattern+`packages/generic/`, requestBodyUploader), // NuGet Artifact Repository - u.route("PUT", apiProjectPattern+`packages/nuget/`, upload.Multipart(api, signingProxy, preparers.packages)), + u.route("PUT", apiProjectPattern+`packages/nuget/`, mimeMultipartUploader), // PyPI Artifact Repository - u.route("POST", apiProjectPattern+`packages/pypi`, upload.Multipart(api, signingProxy, preparers.packages)), + u.route("POST", apiProjectPattern+`packages/pypi`, mimeMultipartUploader), // Debian Artifact Repository - u.route("PUT", apiProjectPattern+`packages/debian/`, upload.RequestBody(api, signingProxy, preparers.packages)), + u.route("PUT", apiProjectPattern+`packages/debian/`, requestBodyUploader), // Gem Artifact Repository - u.route("POST", apiProjectPattern+`packages/rubygems/`, upload.RequestBody(api, signingProxy, preparers.packages)), + u.route("POST", apiProjectPattern+`packages/rubygems/`, requestBodyUploader), // Terraform Module Package Repository - u.route("PUT", apiProjectPattern+`packages/terraform/modules/`, upload.RequestBody(api, signingProxy, preparers.packages)), + u.route("PUT", apiProjectPattern+`packages/terraform/modules/`, requestBodyUploader), // Helm Artifact Repository - u.route("POST", apiProjectPattern+`packages/helm/api/[^/]+/charts\z`, upload.Multipart(api, signingProxy, preparers.packages)), + u.route("POST", apiProjectPattern+`packages/helm/api/[^/]+/charts\z`, mimeMultipartUploader), // We are porting API to disk acceleration // we need to declare each routes until we have fixed all the routes on the rails codebase. @@ -309,25 +305,25 @@ func configureRoutes(u *upstream) { u.route("POST", apiPattern+`graphql\z`, tempfileMultipartProxy), u.route("POST", apiTopicPattern, tempfileMultipartProxy), u.route("PUT", apiTopicPattern, tempfileMultipartProxy), - u.route("POST", apiPattern+`v4/groups/import`, upload.Multipart(api, signingProxy, preparers.uploads)), - u.route("POST", apiPattern+`v4/projects/import`, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", apiPattern+`v4/groups/import`, mimeMultipartUploader), + u.route("POST", apiPattern+`v4/projects/import`, mimeMultipartUploader), // Project Import via UI upload acceleration - u.route("POST", importPattern+`gitlab_project`, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", importPattern+`gitlab_project`, mimeMultipartUploader), // Group Import via UI upload acceleration - u.route("POST", importPattern+`gitlab_group`, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", importPattern+`gitlab_group`, mimeMultipartUploader), // Issuable Metric image upload - u.route("POST", apiProjectPattern+`issues/[0-9]+/metric_images\z`, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", apiProjectPattern+`issues/[0-9]+/metric_images\z`, mimeMultipartUploader), // Alert Metric image upload - u.route("POST", apiProjectPattern+`alert_management_alerts/[0-9]+/metric_images\z`, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", apiProjectPattern+`alert_management_alerts/[0-9]+/metric_images\z`, mimeMultipartUploader), // Requirements Import via UI upload acceleration - u.route("POST", projectPattern+`requirements_management/requirements/import_csv`, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", projectPattern+`requirements_management/requirements/import_csv`, mimeMultipartUploader), // Uploads via API - u.route("POST", apiProjectPattern+`uploads\z`, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", apiProjectPattern+`uploads\z`, mimeMultipartUploader), // Explicitly proxy API requests u.route("", apiPattern, proxy), @@ -345,9 +341,9 @@ func configureRoutes(u *upstream) { ), // Uploads - u.route("POST", projectPattern+`uploads\z`, upload.Multipart(api, signingProxy, preparers.uploads)), - u.route("POST", snippetUploadPattern, upload.Multipart(api, signingProxy, preparers.uploads)), - u.route("POST", userUploadPattern, upload.Multipart(api, signingProxy, preparers.uploads)), + u.route("POST", projectPattern+`uploads\z`, mimeMultipartUploader), + u.route("POST", snippetUploadPattern, mimeMultipartUploader), + u.route("POST", userUploadPattern, mimeMultipartUploader), // health checks don't intercept errors and go straight to rails // TODO: We should probably not return a HTML deploy page? @@ -411,17 +407,6 @@ func configureRoutes(u *upstream) { } } -func createUploadPreparers(cfg config.Config) uploadPreparers { - defaultPreparer := upload.NewObjectStoragePreparer(cfg) - - return uploadPreparers{ - artifacts: defaultPreparer, - lfs: upload.NewLfsPreparer(cfg, defaultPreparer), - packages: defaultPreparer, - uploads: defaultPreparer, - } -} - func denyWebsocket(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if websocket.IsWebSocketUpgrade(r) { |