summaryrefslogtreecommitdiff
path: root/workhorse/internal
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2022-05-19 07:33:21 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2022-05-19 07:33:21 +0000
commit36a59d088eca61b834191dacea009677a96c052f (patch)
treee4f33972dab5d8ef79e3944a9f403035fceea43f /workhorse/internal
parenta1761f15ec2cae7c7f7bbda39a75494add0dfd6f (diff)
downloadgitlab-ce-36a59d088eca61b834191dacea009677a96c052f.tar.gz
Add latest changes from gitlab-org/gitlab@15-0-stable-eev15.0.0-rc42
Diffstat (limited to 'workhorse/internal')
-rw-r--r--workhorse/internal/api/api.go9
-rw-r--r--workhorse/internal/config/config.go14
-rw-r--r--workhorse/internal/git/archive.go10
-rw-r--r--workhorse/internal/git/blob.go10
-rw-r--r--workhorse/internal/git/diff.go9
-rw-r--r--workhorse/internal/git/error.go96
-rw-r--r--workhorse/internal/git/error_test.go80
-rw-r--r--workhorse/internal/git/format-patch.go10
-rw-r--r--workhorse/internal/git/git-http.go14
-rw-r--r--workhorse/internal/git/info-refs.go8
-rw-r--r--workhorse/internal/git/info-refs_test.go3
-rw-r--r--workhorse/internal/git/receive-pack.go10
-rw-r--r--workhorse/internal/git/snapshot.go10
-rw-r--r--workhorse/internal/git/testhelper_test.go15
-rw-r--r--workhorse/internal/git/upload-pack.go12
-rw-r--r--workhorse/internal/git/upload-pack_test.go43
-rw-r--r--workhorse/internal/gitaly/gitaly.go88
-rw-r--r--workhorse/internal/gitaly/gitaly_test.go73
-rw-r--r--workhorse/internal/gitaly/smarthttp.go65
-rw-r--r--workhorse/internal/helper/writeafterreader.go6
-rw-r--r--workhorse/internal/redis/keywatcher.go5
-rw-r--r--workhorse/internal/redis/keywatcher_test.go227
-rw-r--r--workhorse/internal/testhelper/gitaly.go94
-rw-r--r--workhorse/internal/upload/artifacts_uploader.go10
-rw-r--r--workhorse/internal/upload/body_uploader.go9
-rw-r--r--workhorse/internal/upload/body_uploader_test.go37
-rw-r--r--workhorse/internal/upload/lfs_preparer.go47
-rw-r--r--workhorse/internal/upload/lfs_preparer_test.go59
-rw-r--r--workhorse/internal/upload/multipart_uploader.go2
-rw-r--r--workhorse/internal/upload/object_storage_preparer.go6
-rw-r--r--workhorse/internal/upload/object_storage_preparer_test.go6
-rw-r--r--workhorse/internal/upload/preparer.go25
-rw-r--r--workhorse/internal/upload/uploads_test.go20
-rw-r--r--workhorse/internal/upstream/routes.go75
34 files changed, 667 insertions, 540 deletions
diff --git a/workhorse/internal/api/api.go b/workhorse/internal/api/api.go
index 896f59a322a..8954923ad75 100644
--- a/workhorse/internal/api/api.go
+++ b/workhorse/internal/api/api.go
@@ -17,7 +17,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
@@ -150,7 +149,7 @@ type Response struct {
// Used to communicate channel session details
Channel *ChannelSettings
// GitalyServer specifies an address and authentication token for a gitaly server we should connect to.
- GitalyServer gitaly.Server
+ GitalyServer GitalyServer
// Repository object for making gRPC requests to Gitaly.
Repository gitalypb.Repository
// For git-http, does the requestor have the right to view all refs?
@@ -163,6 +162,12 @@ type Response struct {
MaximumSize int64
}
+type GitalyServer struct {
+ Address string `json:"address"`
+ Token string `json:"token"`
+ Features map[string]string `json:"features"`
+}
+
// singleJoiningSlash is taken from reverseproxy.go:singleJoiningSlash
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
diff --git a/workhorse/internal/config/config.go b/workhorse/internal/config/config.go
index 60cfd567f5d..e83f55f43bf 100644
--- a/workhorse/internal/config/config.go
+++ b/workhorse/internal/config/config.go
@@ -84,6 +84,19 @@ type ImageResizerConfig struct {
MaxFilesize uint64 `toml:"max_filesize"`
}
+type TlsConfig struct {
+ Certificate string `toml:"certificate"`
+ Key string `toml:"key"`
+ MinVersion string `toml:"min_version"`
+ MaxVersion string `toml:"max_version"`
+}
+
+type ListenerConfig struct {
+ Network string `toml:"network"`
+ Addr string `toml:"addr"`
+ Tls *TlsConfig `toml:"tls"`
+}
+
type Config struct {
Redis *RedisConfig `toml:"redis"`
Backend *url.URL `toml:"-"`
@@ -106,6 +119,7 @@ type Config struct {
ShutdownTimeout TomlDuration `toml:"shutdown_timeout"`
TrustedCIDRsForXForwardedFor []string `toml:"trusted_cidrs_for_x_forwarded_for"`
TrustedCIDRsForPropagation []string `toml:"trusted_cidrs_for_propagation"`
+ Listeners []ListenerConfig `toml:"listeners"`
}
var DefaultImageResizerConfig = ImageResizerConfig{
diff --git a/workhorse/internal/git/archive.go b/workhorse/internal/git/archive.go
index fc12094cc14..e1d03828b63 100644
--- a/workhorse/internal/git/archive.go
+++ b/workhorse/internal/git/archive.go
@@ -22,6 +22,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
@@ -33,7 +34,7 @@ type archiveParams struct {
ArchivePath string
ArchivePrefix string
CommitId string
- GitalyServer gitaly.Server
+ GitalyServer api.GitalyServer
GitalyRepository gitalypb.Repository
DisableCache bool
GetArchiveRequest []byte
@@ -132,7 +133,12 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
func handleArchiveWithGitaly(r *http.Request, params *archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) {
var request *gitalypb.GetArchiveRequest
- ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer)
+ ctx, c, err := gitaly.NewRepositoryClient(
+ r.Context(),
+ params.GitalyServer,
+ gitaly.WithFeatures(params.GitalyServer.Features),
+ )
+
if err != nil {
return nil, err
}
diff --git a/workhorse/internal/git/blob.go b/workhorse/internal/git/blob.go
index 3ea065766d0..192978e6c75 100644
--- a/workhorse/internal/git/blob.go
+++ b/workhorse/internal/git/blob.go
@@ -6,6 +6,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata"
@@ -13,7 +14,7 @@ import (
type blob struct{ senddata.Prefix }
type blobParams struct {
- GitalyServer gitaly.Server
+ GitalyServer api.GitalyServer
GetBlobRequest gitalypb.GetBlobRequest
}
@@ -26,7 +27,12 @@ func (b *blob) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
return
}
- ctx, blobClient, err := gitaly.NewBlobClient(r.Context(), params.GitalyServer)
+ ctx, blobClient, err := gitaly.NewBlobClient(
+ r.Context(),
+ params.GitalyServer,
+ gitaly.WithFeatures(params.GitalyServer.Features),
+ )
+
if err != nil {
helper.Fail500(w, r, fmt.Errorf("blob.GetBlob: %v", err))
return
diff --git a/workhorse/internal/git/diff.go b/workhorse/internal/git/diff.go
index 4877eea045a..252db6f150b 100644
--- a/workhorse/internal/git/diff.go
+++ b/workhorse/internal/git/diff.go
@@ -6,6 +6,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
@@ -14,7 +15,7 @@ import (
type diff struct{ senddata.Prefix }
type diffParams struct {
- GitalyServer gitaly.Server
+ GitalyServer api.GitalyServer
RawDiffRequest string
}
@@ -33,7 +34,11 @@ func (d *diff) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
return
}
- ctx, diffClient, err := gitaly.NewDiffClient(r.Context(), params.GitalyServer)
+ ctx, diffClient, err := gitaly.NewDiffClient(
+ r.Context(),
+ params.GitalyServer,
+ gitaly.WithFeatures(params.GitalyServer.Features),
+ )
if err != nil {
helper.Fail500(w, r, fmt.Errorf("diff.RawDiff: %v", err))
return
diff --git a/workhorse/internal/git/error.go b/workhorse/internal/git/error.go
index 2b7cad6bb64..86a2ba44767 100644
--- a/workhorse/internal/git/error.go
+++ b/workhorse/internal/git/error.go
@@ -1,4 +1,100 @@
package git
+import (
+ "errors"
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc/status"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
+)
+
+// For unwrapping google.golang.org/grpc/internal/status.Error
+type grpcErr interface {
+ GRPCStatus() *status.Status
+ Error() string
+}
+
// For cosmetic purposes in Sentry
type copyError struct{ error }
+
+// handleLimitErr handles errors that come back from Gitaly that may be a
+// LimitError. A LimitError is returned by Gitaly when it is at its limit in
+// handling requests. Since this is a known error, we should print a sensible
+// error message to the end user.
+func handleLimitErr(err error, w io.Writer, f func(w io.Writer) error) {
+ var statusErr grpcErr
+ if !errors.As(err, &statusErr) {
+ return
+ }
+
+ if st, ok := status.FromError(statusErr); ok {
+ details := st.Details()
+ for _, detail := range details {
+ switch detail.(type) {
+ case *gitalypb.LimitError:
+ if err := f(w); err != nil {
+ log.WithError(fmt.Errorf("handling limit error: %w", err))
+ }
+ }
+ }
+ }
+}
+
+// writeReceivePackError writes a "server is busy" error message to the
+// git-recieve-pack-result.
+//
+// 0023\x01001aunpack server is busy
+// 00000044\x2GitLab is currently unable to handle this request due to load.
+// 0000
+//
+// We write a line reporting that unpack failed, and then provide some progress
+// information through the side-band 2 channel.
+// See https://gitlab.com/gitlab-org/gitaly/-/tree/jc-return-structured-error-limits
+// for more details.
+func writeReceivePackError(w io.Writer) error {
+ if _, err := fmt.Fprintf(w, "%04x", 35); err != nil {
+ return err
+ }
+
+ if _, err := w.Write([]byte{0x01}); err != nil {
+ return err
+ }
+
+ if _, err := fmt.Fprintf(w, "%04xunpack server is busy\n", 26); err != nil {
+ return err
+ }
+
+ if _, err := w.Write([]byte("0000")); err != nil {
+ return err
+ }
+
+ if _, err := fmt.Fprintf(w, "%04x", 68); err != nil {
+ return err
+ }
+
+ if _, err := w.Write([]byte{0x2}); err != nil {
+ return err
+ }
+
+ if _, err := fmt.Fprint(w, "GitLab is currently unable to handle this request due to load.\n"); err != nil {
+ return err
+ }
+
+ if _, err := w.Write([]byte("0000")); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// writeUploadPackError writes a "server is busy" error message that git
+// understands and prints to stdout. UploadPack expects to receive pack data in
+// PKT-LINE format. An error-line can be passed that begins with ERR.
+// See https://git-scm.com/docs/pack-protocol/2.29.0#_pkt_line_format.
+func writeUploadPackError(w io.Writer) error {
+ _, err := fmt.Fprintf(w, "%04xERR GitLab is currently unable to handle this request due to load.\n", 71)
+ return err
+}
diff --git a/workhorse/internal/git/error_test.go b/workhorse/internal/git/error_test.go
new file mode 100644
index 00000000000..d87c81fc83c
--- /dev/null
+++ b/workhorse/internal/git/error_test.go
@@ -0,0 +1,80 @@
+package git
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/anypb"
+ "google.golang.org/protobuf/types/known/durationpb"
+)
+
+func TestHandleLimitErr(t *testing.T) {
+ testCases := []struct {
+ desc string
+ errWriter func(io.Writer) error
+ expectedBytes []byte
+ }{
+ {
+ desc: "upload pack",
+ errWriter: writeUploadPackError,
+ expectedBytes: bytes.Join([][]byte{
+ []byte{'0', '0', '4', '7'},
+ []byte("ERR GitLab is currently unable to handle this request due to load.\n"),
+ }, []byte{}),
+ },
+ {
+ desc: "recieve pack",
+ errWriter: writeReceivePackError,
+ expectedBytes: bytes.Join([][]byte{
+ {'0', '0', '2', '3', 1, '0', '0', '1', 'a'},
+ []byte("unpack server is busy\n"),
+ {'0', '0', '0', '0', '0', '0', '4', '4', 2},
+ []byte("GitLab is currently unable to handle this request due to load.\n"),
+ {'0', '0', '0', '0'},
+ }, []byte{}),
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ var body bytes.Buffer
+ err := errWithDetail(t, &gitalypb.LimitError{
+ ErrorMessage: "concurrency queue wait time reached",
+ RetryAfter: durationpb.New(0)})
+
+ handleLimitErr(fmt.Errorf("wrapped error: %w", err), &body, tc.errWriter)
+ require.Equal(t, tc.expectedBytes, body.Bytes())
+ })
+ }
+
+ t.Run("non LimitError", func(t *testing.T) {
+ var body bytes.Buffer
+ err := status.Error(codes.Internal, "some internal error")
+ handleLimitErr(fmt.Errorf("wrapped error: %w", err), &body, writeUploadPackError)
+ require.Equal(t, []byte(nil), body.Bytes())
+
+ handleLimitErr(fmt.Errorf("wrapped error: %w", err), &body, writeReceivePackError)
+ require.Equal(t, []byte(nil), body.Bytes())
+
+ })
+}
+
+// errWithDetail adds the given details to the error if it is a gRPC status whose code is not OK.
+func errWithDetail(t *testing.T, detail proto.Message) error {
+ st := status.New(codes.Unavailable, "too busy")
+
+ proto := st.Proto()
+ marshaled, err := anypb.New(detail)
+ require.NoError(t, err)
+
+ proto.Details = append(proto.Details, marshaled)
+
+ return status.ErrorProto(proto)
+}
diff --git a/workhorse/internal/git/format-patch.go b/workhorse/internal/git/format-patch.go
index 2e52fdf6c33..d52c4ef7dee 100644
--- a/workhorse/internal/git/format-patch.go
+++ b/workhorse/internal/git/format-patch.go
@@ -6,6 +6,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
@@ -14,7 +15,7 @@ import (
type patch struct{ senddata.Prefix }
type patchParams struct {
- GitalyServer gitaly.Server
+ GitalyServer api.GitalyServer
RawPatchRequest string
}
@@ -33,7 +34,12 @@ func (p *patch) Inject(w http.ResponseWriter, r *http.Request, sendData string)
return
}
- ctx, diffClient, err := gitaly.NewDiffClient(r.Context(), params.GitalyServer)
+ ctx, diffClient, err := gitaly.NewDiffClient(
+ r.Context(),
+ params.GitalyServer,
+ gitaly.WithFeatures(params.GitalyServer.Features),
+ )
+
if err != nil {
helper.Fail500(w, r, fmt.Errorf("diff.RawPatch: %v", err))
return
diff --git a/workhorse/internal/git/git-http.go b/workhorse/internal/git/git-http.go
index 7f5c1b6c584..86007e16064 100644
--- a/workhorse/internal/git/git-http.go
+++ b/workhorse/internal/git/git-http.go
@@ -22,11 +22,11 @@ const (
)
func ReceivePack(a *api.API) http.Handler {
- return postRPCHandler(a, "handleReceivePack", handleReceivePack)
+ return postRPCHandler(a, "handleReceivePack", handleReceivePack, writeReceivePackError)
}
func UploadPack(a *api.API) http.Handler {
- return postRPCHandler(a, "handleUploadPack", handleUploadPack)
+ return postRPCHandler(a, "handleUploadPack", handleUploadPack, writeUploadPackError)
}
func gitConfigOptions(a *api.Response) []string {
@@ -39,7 +39,12 @@ func gitConfigOptions(a *api.Response) []string {
return out
}
-func postRPCHandler(a *api.API, name string, handler func(*HttpResponseWriter, *http.Request, *api.Response) error) http.Handler {
+func postRPCHandler(
+ a *api.API,
+ name string,
+ handler func(*HttpResponseWriter, *http.Request, *api.Response) error,
+ errWriter func(io.Writer) error,
+) http.Handler {
return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, ar *api.Response) {
cr := &countReadCloser{ReadCloser: r.Body}
r.Body = cr
@@ -50,7 +55,8 @@ func postRPCHandler(a *api.API, name string, handler func(*HttpResponseWriter, *
}()
if err := handler(w, r, ar); err != nil {
- // If the handler already wrote a response this WriteHeader call is a
+ handleLimitErr(err, w, errWriter)
+ // If the handler, or handleLimitErr already wrote a response this WriteHeader call is a
// no-op. It never reaches net/http because GitHttpResponseWriter calls
// WriteHeader on its underlying ResponseWriter at most once.
w.WriteHeader(500)
diff --git a/workhorse/internal/git/info-refs.go b/workhorse/internal/git/info-refs.go
index b7f825839f8..2eaed388f60 100644
--- a/workhorse/internal/git/info-refs.go
+++ b/workhorse/internal/git/info-refs.go
@@ -55,7 +55,13 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response)
}
func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error {
- ctx, smarthttp, err := gitaly.NewSmartHTTPClient(ctx, a.GitalyServer)
+ ctx, smarthttp, err := gitaly.NewSmartHTTPClient(
+ ctx,
+ a.GitalyServer,
+ gitaly.WithFeatures(a.GitalyServer.Features),
+ gitaly.WithUserID(a.GL_ID),
+ gitaly.WithUsername(a.GL_USERNAME),
+ )
if err != nil {
return err
}
diff --git a/workhorse/internal/git/info-refs_test.go b/workhorse/internal/git/info-refs_test.go
index 4f23d1ac174..0df74abe81d 100644
--- a/workhorse/internal/git/info-refs_test.go
+++ b/workhorse/internal/git/info-refs_test.go
@@ -11,7 +11,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
)
type smartHTTPServiceServerWithInfoRefs struct {
@@ -32,7 +31,7 @@ func TestGetInfoRefsHandler(t *testing.T) {
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/?service=git-upload-pack", nil)
- a := &api.Response{GitalyServer: gitaly.Server{Address: addr}}
+ a := &api.Response{GitalyServer: api.GitalyServer{Address: addr}}
handleGetInfoRefs(NewHttpResponseWriter(w), r, a)
require.Equal(t, 503, w.Code)
diff --git a/workhorse/internal/git/receive-pack.go b/workhorse/internal/git/receive-pack.go
index ccde9331b83..a85f0edccac 100644
--- a/workhorse/internal/git/receive-pack.go
+++ b/workhorse/internal/git/receive-pack.go
@@ -20,13 +20,19 @@ func handleReceivePack(w *HttpResponseWriter, r *http.Request, a *api.Response)
gitProtocol := r.Header.Get("Git-Protocol")
- ctx, smarthttp, err := gitaly.NewSmartHTTPClient(r.Context(), a.GitalyServer)
+ ctx, smarthttp, err := gitaly.NewSmartHTTPClient(
+ r.Context(),
+ a.GitalyServer,
+ gitaly.WithFeatures(a.GitalyServer.Features),
+ gitaly.WithUserID(a.GL_ID),
+ gitaly.WithUsername(a.GL_USERNAME),
+ )
if err != nil {
return fmt.Errorf("smarthttp.ReceivePack: %v", err)
}
if err := smarthttp.ReceivePack(ctx, &a.Repository, a.GL_ID, a.GL_USERNAME, a.GL_REPOSITORY, a.GitConfigOptions, cr, cw, gitProtocol); err != nil {
- return fmt.Errorf("smarthttp.ReceivePack: %v", err)
+ return fmt.Errorf("smarthttp.ReceivePack: %w", err)
}
return nil
diff --git a/workhorse/internal/git/snapshot.go b/workhorse/internal/git/snapshot.go
index 152b2fc2b93..77b32f8a05d 100644
--- a/workhorse/internal/git/snapshot.go
+++ b/workhorse/internal/git/snapshot.go
@@ -7,6 +7,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
@@ -18,7 +19,7 @@ type snapshot struct {
}
type snapshotParams struct {
- GitalyServer gitaly.Server
+ GitalyServer api.GitalyServer
GetSnapshotRequest string
}
@@ -40,7 +41,12 @@ func (s *snapshot) Inject(w http.ResponseWriter, r *http.Request, sendData strin
return
}
- ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer)
+ ctx, c, err := gitaly.NewRepositoryClient(
+ r.Context(),
+ params.GitalyServer,
+ gitaly.WithFeatures(params.GitalyServer.Features),
+ )
+
if err != nil {
helper.Fail500(w, r, fmt.Errorf("SendSnapshot: gitaly.NewRepositoryClient: %v", err))
return
diff --git a/workhorse/internal/git/testhelper_test.go b/workhorse/internal/git/testhelper_test.go
new file mode 100644
index 00000000000..8261dcd125f
--- /dev/null
+++ b/workhorse/internal/git/testhelper_test.go
@@ -0,0 +1,15 @@
+package git
+
+import (
+ "os"
+ "testing"
+
+ "github.com/sirupsen/logrus"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
+)
+
+func TestMain(m *testing.M) {
+ gitaly.InitializeSidechannelRegistry(logrus.StandardLogger())
+ os.Exit(m.Run())
+}
diff --git a/workhorse/internal/git/upload-pack.go b/workhorse/internal/git/upload-pack.go
index acf03284343..bbed5224b2d 100644
--- a/workhorse/internal/git/upload-pack.go
+++ b/workhorse/internal/git/upload-pack.go
@@ -44,13 +44,19 @@ func handleUploadPack(w *HttpResponseWriter, r *http.Request, a *api.Response) e
}
func handleUploadPackWithGitaly(ctx context.Context, a *api.Response, clientRequest io.Reader, clientResponse io.Writer, gitProtocol string) error {
- ctx, smarthttp, err := gitaly.NewSmartHTTPClient(ctx, a.GitalyServer)
+ ctx, smarthttp, err := gitaly.NewSmartHTTPClient(
+ ctx,
+ a.GitalyServer,
+ gitaly.WithFeatures(a.GitalyServer.Features),
+ gitaly.WithUserID(a.GL_ID),
+ gitaly.WithUsername(a.GL_USERNAME),
+ )
if err != nil {
- return fmt.Errorf("smarthttp.UploadPack: %v", err)
+ return fmt.Errorf("get gitaly client: %w", err)
}
if err := smarthttp.UploadPack(ctx, &a.Repository, clientRequest, clientResponse, gitConfigOptions(a), gitProtocol); err != nil {
- return fmt.Errorf("smarthttp.UploadPack: %v", err)
+ return fmt.Errorf("do gitaly call: %w", err)
}
return nil
diff --git a/workhorse/internal/git/upload-pack_test.go b/workhorse/internal/git/upload-pack_test.go
index 211f68a2608..9ffc7117790 100644
--- a/workhorse/internal/git/upload-pack_test.go
+++ b/workhorse/internal/git/upload-pack_test.go
@@ -1,7 +1,10 @@
package git
import (
+ "context"
+ "errors"
"fmt"
+ "io"
"io/ioutil"
"net"
"net/http/httptest"
@@ -13,32 +16,33 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
)
var (
originalUploadPackTimeout = uploadPackTimeout
)
-type fakeReader struct {
- n int
- err error
+type waitReader struct {
+ t time.Duration
}
-func (f *fakeReader) Read(b []byte) (int, error) {
- return f.n, f.err
+func (f *waitReader) Read(b []byte) (int, error) {
+ time.Sleep(f.t)
+ return 0, io.EOF
}
type smartHTTPServiceServer struct {
gitalypb.UnimplementedSmartHTTPServiceServer
- PostUploadPackFunc func(gitalypb.SmartHTTPService_PostUploadPackServer) error
+ handler func(context.Context, *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error)
}
-func (srv *smartHTTPServiceServer) PostUploadPack(s gitalypb.SmartHTTPService_PostUploadPackServer) error {
- return srv.PostUploadPackFunc(s)
+func (srv *smartHTTPServiceServer) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) {
+ return srv.handler(ctx, req)
}
func TestUploadPackTimesOut(t *testing.T) {
@@ -46,21 +50,26 @@ func TestUploadPackTimesOut(t *testing.T) {
defer func() { uploadPackTimeout = originalUploadPackTimeout }()
addr := startSmartHTTPServer(t, &smartHTTPServiceServer{
- PostUploadPackFunc: func(stream gitalypb.SmartHTTPService_PostUploadPackServer) error {
- _, err := stream.Recv() // trigger a read on the client request body
- require.NoError(t, err)
- return nil
+ handler: func(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) {
+ conn, err := client.OpenServerSidechannel(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+
+ _, _ = io.Copy(ioutil.Discard, conn)
+ return &gitalypb.PostUploadPackWithSidechannelResponse{}, nil
},
})
- body := &fakeReader{n: 0, err: nil}
+ body := &waitReader{t: 10 * time.Millisecond}
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/", body)
- a := &api.Response{GitalyServer: gitaly.Server{Address: addr}}
+ a := &api.Response{GitalyServer: api.GitalyServer{Address: addr}}
err := handleUploadPack(NewHttpResponseWriter(w), r, a)
- require.EqualError(t, err, "smarthttp.UploadPack: busyReader: context deadline exceeded")
+ require.True(t, errors.Is(err, context.DeadlineExceeded))
}
func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) string {
@@ -73,7 +82,7 @@ func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) strin
ln, err := net.Listen("unix", socket)
require.NoError(t, err)
- srv := grpc.NewServer()
+ srv := grpc.NewServer(testhelper.WithSidechannel())
gitalypb.RegisterSmartHTTPServiceServer(srv, s)
go func() {
require.NoError(t, srv.Serve(ln))
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go
index 362f380dc4d..db1fd3f8abb 100644
--- a/workhorse/internal/gitaly/gitaly.go
+++ b/workhorse/internal/gitaly/gitaly.go
@@ -19,24 +19,18 @@ import (
gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
)
-type Server struct {
- Address string `json:"address"`
- Token string `json:"token"`
- Features map[string]string `json:"features"`
- Sidechannel bool `json:"sidechannel"`
-}
-
type cacheKey struct {
address, token string
- sidechannel bool
}
-func (server Server) cacheKey() cacheKey {
- return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel}
+func getCacheKey(server api.GitalyServer) cacheKey {
+ return cacheKey{address: server.Address, token: server.Token}
}
type connectionsCache struct {
@@ -73,19 +67,42 @@ func InitializeSidechannelRegistry(logger *logrus.Logger) {
}
}
-func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context {
- md := metadata.New(nil)
- for k, v := range features {
- if !strings.HasPrefix(k, "gitaly-feature-") {
- continue
+type MetadataFunc func(metadata.MD)
+
+func WithUserID(userID string) MetadataFunc {
+ return func(md metadata.MD) {
+ md.Append("user_id", userID)
+ }
+}
+
+func WithUsername(username string) MetadataFunc {
+ return func(md metadata.MD) {
+ md.Append("username", username)
+ }
+}
+
+func WithFeatures(features map[string]string) MetadataFunc {
+ return func(md metadata.MD) {
+ for k, v := range features {
+ if !strings.HasPrefix(k, "gitaly-feature-") {
+ continue
+ }
+ md.Append(k, v)
}
- md.Append(k, v)
+ }
+}
+
+func withOutgoingMetadata(ctx context.Context, addMetadataFuncs ...MetadataFunc) context.Context {
+ md := metadata.New(nil)
+
+ for _, f := range addMetadataFuncs {
+ f(md)
}
return metadata.NewOutgoingContext(ctx, md)
}
-func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *SmartHTTPClient, error) {
+func NewSmartHTTPClient(ctx context.Context, server api.GitalyServer, metadataFuncs ...MetadataFunc) (context.Context, *SmartHTTPClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
return nil, nil, err
@@ -94,50 +111,53 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S
smartHTTPClient := &SmartHTTPClient{
SmartHTTPServiceClient: grpcClient,
sidechannelRegistry: sidechannelRegistry,
- useSidechannel: server.Sidechannel,
}
- return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil
+
+ return withOutgoingMetadata(
+ ctx,
+ metadataFuncs...,
+ ), smartHTTPClient, nil
}
-func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) {
+func NewBlobClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *BlobClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
return nil, nil, err
}
grpcClient := gitalypb.NewBlobServiceClient(conn)
- return withOutgoingMetadata(ctx, server.Features), &BlobClient{grpcClient}, nil
+ return withOutgoingMetadata(ctx, addMetadataFuncs...), &BlobClient{grpcClient}, nil
}
-func NewRepositoryClient(ctx context.Context, server Server) (context.Context, *RepositoryClient, error) {
+func NewRepositoryClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *RepositoryClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
return nil, nil, err
}
grpcClient := gitalypb.NewRepositoryServiceClient(conn)
- return withOutgoingMetadata(ctx, server.Features), &RepositoryClient{grpcClient}, nil
+ return withOutgoingMetadata(ctx, addMetadataFuncs...), &RepositoryClient{grpcClient}, nil
}
// NewNamespaceClient is only used by the Gitaly integration tests at present
-func NewNamespaceClient(ctx context.Context, server Server) (context.Context, *NamespaceClient, error) {
+func NewNamespaceClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *NamespaceClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
return nil, nil, err
}
grpcClient := gitalypb.NewNamespaceServiceClient(conn)
- return withOutgoingMetadata(ctx, server.Features), &NamespaceClient{grpcClient}, nil
+ return withOutgoingMetadata(ctx, addMetadataFuncs...), &NamespaceClient{grpcClient}, nil
}
-func NewDiffClient(ctx context.Context, server Server) (context.Context, *DiffClient, error) {
+func NewDiffClient(ctx context.Context, server api.GitalyServer, addMetadataFuncs ...MetadataFunc) (context.Context, *DiffClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
return nil, nil, err
}
grpcClient := gitalypb.NewDiffServiceClient(conn)
- return withOutgoingMetadata(ctx, server.Features), &DiffClient{grpcClient}, nil
+ return withOutgoingMetadata(ctx, addMetadataFuncs...), &DiffClient{grpcClient}, nil
}
-func getOrCreateConnection(server Server) (*grpc.ClientConn, error) {
- key := server.cacheKey()
+func getOrCreateConnection(server api.GitalyServer) (*grpc.ClientConn, error) {
+ key := getCacheKey(server)
cache.RLock()
conn := cache.connections[key]
@@ -173,7 +193,7 @@ func CloseConnections() {
}
}
-func newConnection(server Server) (*grpc.ClientConn, error) {
+func newConnection(server api.GitalyServer) (*grpc.ClientConn, error) {
connOpts := append(gitalyclient.DefaultDialOpts,
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(server.Token)),
grpc.WithStreamInterceptor(
@@ -197,13 +217,7 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
),
)
- var conn *grpc.ClientConn
- var connErr error
- if server.Sidechannel {
- conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background
- } else {
- conn, connErr = gitalyclient.Dial(server.Address, connOpts)
- }
+ conn, connErr := gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background
label := "ok"
if connErr != nil {
diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go
index 9c54caae8c6..f693f102447 100644
--- a/workhorse/internal/gitaly/gitaly_test.go
+++ b/workhorse/internal/gitaly/gitaly_test.go
@@ -2,56 +2,72 @@ package gitaly
import (
"context"
+ "os"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
-)
-
-func TestNewSmartHTTPClient(t *testing.T) {
- ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture())
- require.NoError(t, err)
- testOutgoingMetadata(t, ctx)
- require.False(t, client.useSidechannel)
- require.Nil(t, client.sidechannelRegistry)
-}
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+)
-func TestNewSmartHTTPClientWithSidechannel(t *testing.T) {
+func TestMain(m *testing.M) {
InitializeSidechannelRegistry(logrus.StandardLogger())
+ os.Exit(m.Run())
+}
- fixture := serverFixture()
- fixture.Sidechannel = true
-
- ctx, client, err := NewSmartHTTPClient(context.Background(), fixture)
+func TestNewSmartHTTPClient(t *testing.T) {
+ ctx, client, err := NewSmartHTTPClient(
+ context.Background(),
+ serverFixture(),
+ WithFeatures(features()),
+ WithUsername("gl_username"),
+ WithUserID("gl_id"),
+ )
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
-
- require.True(t, client.useSidechannel)
+ testOutgoingIDAndUsername(t, ctx)
require.NotNil(t, client.sidechannelRegistry)
}
func TestNewBlobClient(t *testing.T) {
- ctx, _, err := NewBlobClient(context.Background(), serverFixture())
+ ctx, _, err := NewBlobClient(
+ context.Background(),
+ serverFixture(),
+ WithFeatures(features()),
+ )
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
}
func TestNewRepositoryClient(t *testing.T) {
- ctx, _, err := NewRepositoryClient(context.Background(), serverFixture())
+ ctx, _, err := NewRepositoryClient(
+ context.Background(),
+ serverFixture(),
+ WithFeatures(features()),
+ )
+
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
}
func TestNewNamespaceClient(t *testing.T) {
- ctx, _, err := NewNamespaceClient(context.Background(), serverFixture())
+ ctx, _, err := NewNamespaceClient(
+ context.Background(),
+ serverFixture(),
+ WithFeatures(features()),
+ )
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
}
func TestNewDiffClient(t *testing.T) {
- ctx, _, err := NewDiffClient(context.Background(), serverFixture())
+ ctx, _, err := NewDiffClient(
+ context.Background(),
+ serverFixture(),
+ WithFeatures(features()),
+ )
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
}
@@ -71,16 +87,29 @@ func testOutgoingMetadata(t *testing.T, ctx context.Context) {
}
}
-func serverFixture() Server {
+func testOutgoingIDAndUsername(t *testing.T, ctx context.Context) {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ require.True(t, ok, "get metadata from context")
+
+ require.Equal(t, md["user_id"], []string{"gl_id"})
+ require.Equal(t, md["username"], []string{"gl_username"})
+}
+
+func features() map[string]string {
features := make(map[string]string)
for k, v := range allowedFeatures() {
features[k] = v
}
+
for k, v := range badFeatureMetadata() {
features[k] = v
}
- return Server{Address: "tcp://localhost:123", Features: features}
+ return features
+}
+
+func serverFixture() api.GitalyServer {
+ return api.GitalyServer{Address: "tcp://localhost:123"}
}
func allowedFeatures() map[string]string {
diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go
index de6954efa60..12dffc3ccff 100644
--- a/workhorse/internal/gitaly/smarthttp.go
+++ b/workhorse/internal/gitaly/smarthttp.go
@@ -11,7 +11,6 @@ import (
)
type SmartHTTPClient struct {
- useSidechannel bool
sidechannelRegistry *gitalyclient.SidechannelRegistry
gitalypb.SmartHTTPServiceClient
}
@@ -96,71 +95,17 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R
}
func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
- if client.useSidechannel {
- return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
- }
-
- return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
-}
-
-func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
- stream, err := client.PostUploadPack(ctx)
- if err != nil {
- return err
- }
-
- rpcRequest := &gitalypb.PostUploadPackRequest{
- Repository: repo,
- GitConfigOptions: gitConfigOptions,
- GitProtocol: gitProtocol,
- }
-
- if err := stream.Send(rpcRequest); err != nil {
- return fmt.Errorf("initial request: %v", err)
- }
-
- numStreams := 2
- errC := make(chan error, numStreams)
-
- go func() {
- rr := streamio.NewReader(func() ([]byte, error) {
- response, err := stream.Recv()
- return response.GetData(), err
- })
- _, err := io.Copy(clientResponse, rr)
- errC <- err
- }()
-
- go func() {
- sw := streamio.NewWriter(func(data []byte) error {
- return stream.Send(&gitalypb.PostUploadPackRequest{Data: data})
- })
- _, err := io.Copy(sw, clientRequest)
- stream.CloseSend()
- errC <- err
- }()
-
- for i := 0; i < numStreams; i++ {
- if err := <-errC; err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error {
if _, err := io.Copy(conn, clientRequest); err != nil {
- return err
+ return fmt.Errorf("copy request body: %w", err)
}
if err := conn.CloseWrite(); err != nil {
- return fmt.Errorf("fail to signal sidechannel half-close: %w", err)
+ return fmt.Errorf("close request body: %w", err)
}
if _, err := io.Copy(clientResponse, conn); err != nil {
- return err
+ return fmt.Errorf("copy response body: %w", err)
}
return nil
@@ -174,11 +119,11 @@ func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context,
}
if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil {
- return err
+ return fmt.Errorf("PostUploadPackWithSidechannel: %w", err)
}
if err := waiter.Close(); err != nil {
- return fmt.Errorf("fail to close sidechannel connection: %w", err)
+ return fmt.Errorf("close sidechannel waiter: %w", err)
}
return nil
diff --git a/workhorse/internal/helper/writeafterreader.go b/workhorse/internal/helper/writeafterreader.go
index d583ae4a9b8..7df2279a86a 100644
--- a/workhorse/internal/helper/writeafterreader.go
+++ b/workhorse/internal/helper/writeafterreader.go
@@ -37,7 +37,7 @@ func (r *busyReader) Read(p []byte) (int, error) {
n, err := r.Reader.Read(p)
if err != nil {
if err != io.EOF {
- err = fmt.Errorf("busyReader: %v", err)
+ err = fmt.Errorf("busyReader: %w", err)
}
r.setError(err)
}
@@ -81,13 +81,13 @@ func (w *coupledWriter) Write(data []byte) (int, error) {
if w.busyReader.IsBusy() {
n, err := w.tempfileWrite(data)
if err != nil {
- w.writeError = fmt.Errorf("coupledWriter: %v", err)
+ w.writeError = fmt.Errorf("coupledWriter: %w", err)
}
return n, w.writeError
}
if err := w.Flush(); err != nil {
- w.writeError = fmt.Errorf("coupledWriter: %v", err)
+ w.writeError = fmt.Errorf("coupledWriter: %w", err)
return 0, w.writeError
}
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 13e9fc3f051..82cb082f5f0 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -1,6 +1,7 @@
package redis
import (
+ "errors"
"fmt"
"strings"
"sync"
@@ -189,7 +190,9 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
defer delKeyChan(kw)
currentValue, err := GetString(key)
- if err != nil {
+ if errors.Is(err, redis.ErrNil) {
+ currentValue = ""
+ } else if err != nil {
return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET: %v", err)
}
if currentValue != value {
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index 7ff5f8204c0..a2f2b73898f 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -1,10 +1,12 @@
package redis
import (
+ "errors"
"sync"
"testing"
"time"
+ "github.com/gomodule/redigo/redis"
"github.com/rafaeljusto/redigomock"
"github.com/stretchr/testify/require"
)
@@ -65,100 +67,191 @@ func processMessages(numWatchers int, value string) {
processInner(psc)
}
-func TestWatchKeySeenChange(t *testing.T) {
+type keyChangeTestCase struct {
+ desc string
+ returnValue string
+ isKeyMissing bool
+ watchValue string
+ processedValue string
+ expectedStatus WatchKeyStatus
+ timeout time.Duration
+}
+
+func TestKeyChangesBubblesUpError(t *testing.T) {
conn, td := setupMockPool()
defer td()
- conn.Command("GET", runnerKey).Expect("something")
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
+ conn.Command("GET", runnerKey).ExpectError(errors.New("test error"))
- go func() {
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
- wg.Done()
- }()
+ _, err := WatchKey(runnerKey, "something", time.Second)
+ require.Error(t, err, "Expected error")
- processMessages(1, "somethingelse")
- wg.Wait()
+ deleteWatchers(runnerKey)
}
-func TestWatchKeyNoChange(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
+func TestKeyChangesInstantReturn(t *testing.T) {
+ testCases := []keyChangeTestCase{
+ // WatchKeyStatusAlreadyChanged
+ {
+ desc: "sees change with key existing and changed",
+ returnValue: "somethingelse",
+ watchValue: "something",
+ expectedStatus: WatchKeyStatusAlreadyChanged,
+ timeout: time.Second,
+ },
+ {
+ desc: "sees change with key non-existing",
+ isKeyMissing: true,
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusAlreadyChanged,
+ timeout: time.Second,
+ },
+ // WatchKeyStatusTimeout
+ {
+ desc: "sees timeout with key existing and unchanged",
+ returnValue: "something",
+ watchValue: "something",
+ expectedStatus: WatchKeyStatusTimeout,
+ timeout: time.Millisecond,
+ },
+ {
+ desc: "sees timeout with key non-existing and unchanged",
+ isKeyMissing: true,
+ watchValue: "",
+ expectedStatus: WatchKeyStatusTimeout,
+ timeout: time.Millisecond,
+ },
+ }
- conn.Command("GET", runnerKey).Expect("something")
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
- wg := &sync.WaitGroup{}
- wg.Add(1)
+ if tc.isKeyMissing {
+ conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
+ } else {
+ conn.Command("GET", runnerKey).Expect(tc.returnValue)
+ }
- go func() {
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value")
- wg.Done()
- }()
+ val, err := WatchKey(runnerKey, tc.watchValue, tc.timeout)
- processMessages(1, "something")
- wg.Wait()
-}
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
-func TestWatchKeyTimeout(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
+ deleteWatchers(runnerKey)
+ })
+ }
+}
- conn.Command("GET", runnerKey).Expect("something")
+func TestKeyChangesWhenWatching(t *testing.T) {
+ testCases := []keyChangeTestCase{
+ // WatchKeyStatusSeenChange
+ {
+ desc: "sees change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ {
+ desc: "sees change with key non-existing, when watching empty value",
+ isKeyMissing: true,
+ watchValue: "",
+ processedValue: "something",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ // WatchKeyStatusNoChange
+ {
+ desc: "sees no change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "something",
+ expectedStatus: WatchKeyStatusNoChange,
+ },
+ }
- val, err := WatchKey(runnerKey, "something", time.Millisecond)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change")
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
- // Clean up watchers since Process isn't doing that for us (not running)
- deleteWatchers(runnerKey)
-}
+ if tc.isKeyMissing {
+ conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
+ } else {
+ conn.Command("GET", runnerKey).Expect(tc.returnValue)
+ }
-func TestWatchKeyAlreadyChanged(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
- conn.Command("GET", runnerKey).Expect("somethingelse")
+ go func() {
+ defer wg.Done()
+ val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed")
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
+ }()
- // Clean up watchers since Process isn't doing that for us (not running)
- deleteWatchers(runnerKey)
+ processMessages(1, tc.processedValue)
+ wg.Wait()
+ })
+ }
}
-func TestWatchKeyMassivelyParallel(t *testing.T) {
- runTimes := 100 // 100 parallel watchers
+func TestKeyChangesParallel(t *testing.T) {
+ testCases := []keyChangeTestCase{
+ {
+ desc: "massively parallel, sees change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ {
+ desc: "massively parallel, sees change with key existing, watching missing keys",
+ isKeyMissing: true,
+ watchValue: "",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ }
- conn, td := setupMockPool()
- defer td()
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ runTimes := 100
- wg := &sync.WaitGroup{}
- wg.Add(runTimes)
+ conn, td := setupMockPool()
+ defer td()
- getCmd := conn.Command("GET", runnerKey)
+ getCmd := conn.Command("GET", runnerKey)
- for i := 0; i < runTimes; i++ {
- getCmd = getCmd.Expect("something")
- }
+ for i := 0; i < runTimes; i++ {
+ if tc.isKeyMissing {
+ getCmd = getCmd.ExpectError(redis.ErrNil)
+ } else {
+ getCmd = getCmd.Expect(tc.returnValue)
+ }
+ }
- for i := 0; i < runTimes; i++ {
- go func() {
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
- wg.Done()
- }()
- }
+ wg := &sync.WaitGroup{}
+ wg.Add(runTimes)
- processMessages(runTimes, "somethingelse")
- wg.Wait()
+ for i := 0; i < runTimes; i++ {
+ go func() {
+ defer wg.Done()
+ val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
+
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
+ }()
+ }
+
+ processMessages(runTimes, tc.processedValue)
+ wg.Wait()
+ })
+ }
}
func TestShutdown(t *testing.T) {
diff --git a/workhorse/internal/testhelper/gitaly.go b/workhorse/internal/testhelper/gitaly.go
index da2fbf30785..747d5e6d078 100644
--- a/workhorse/internal/testhelper/gitaly.go
+++ b/workhorse/internal/testhelper/gitaly.go
@@ -11,12 +11,15 @@ import (
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
+ "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/log"
)
@@ -181,93 +184,34 @@ func (s *GitalyTestServer) PostReceivePack(stream gitalypb.SmartHTTPService_Post
return s.finalError()
}
-func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error {
+func (s *GitalyTestServer) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) {
s.WaitGroup.Add(1)
defer s.WaitGroup.Done()
- req, err := stream.Recv()
- if err != nil {
- return err
- }
-
if err := validateRepository(req.GetRepository()); err != nil {
- return err
- }
-
- marshaler := &jsonpb.Marshaler{}
- jsonBytes := &bytes.Buffer{}
- if err := marshaler.Marshal(jsonBytes, req); err != nil {
- return err
- }
-
- if err := stream.Send(&gitalypb.PostUploadPackResponse{
- Data: append(jsonBytes.Bytes(), 0),
- }); err != nil {
- return err
- }
-
- nSends := 0
- // The body of the request starts in the second message. Gitaly streams PostUploadPack responses
- // as soon as possible without reading the request completely first. We stream messages here
- // directly back to the client to simulate the streaming of the actual implementation.
- for {
- req, err := stream.Recv()
- if err != nil {
- if err != io.EOF {
- return err
- }
- break
- }
-
- if err := stream.Send(&gitalypb.PostUploadPackResponse{Data: req.GetData()}); err != nil {
- return err
- }
-
- nSends++
- }
-
- if nSends <= 1 {
- panic("should have sent more than one message")
- }
-
- return s.finalError()
-}
-
-// PostUploadPackWithSidechannel should be a part of smarthttp server in real
-// server. In workhorse, setting up a real sidechannel server is troublesome.
-// Therefore, we bring up a sidechannel server with a mock server exported via
-// gitalyclient.TestSidechannelServer. This is the handler for that mock
-// server.
-func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error {
- if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" {
- return fmt.Errorf("unexpected method: %s", method)
- }
-
- var req gitalypb.PostUploadPackWithSidechannelRequest
- if err := stream.RecvMsg(&req); err != nil {
- return err
+ return nil, err
}
- if err := validateRepository(req.GetRepository()); err != nil {
- return err
+ conn, err := client.OpenServerSidechannel(ctx)
+ if err != nil {
+ return nil, err
}
+ defer conn.Close()
marshaler := &jsonpb.Marshaler{}
jsonBytes := &bytes.Buffer{}
- if err := marshaler.Marshal(jsonBytes, &req); err != nil {
- return err
- }
-
- // Bounce back all data back to the client, plus flushing bytes
- if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil {
- return err
+ if err := marshaler.Marshal(jsonBytes, req); err != nil {
+ return nil, err
}
- if _, err := io.Copy(conn, conn); err != nil {
- return err
+ if _, err := io.Copy(conn, io.MultiReader(
+ bytes.NewReader(append(jsonBytes.Bytes(), 0)),
+ conn,
+ )); err != nil {
+ return nil, err
}
- return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{})
+ return &gitalypb.PostUploadPackWithSidechannelResponse{}, s.finalError()
}
func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) {
@@ -424,3 +368,7 @@ func validateRepository(repo *gitalypb.Repository) error {
}
return nil
}
+
+func WithSidechannel() grpc.ServerOption {
+ return client.SidechannelServer(logrus.NewEntry(logrus.StandardLogger()), insecure.NewCredentials())
+}
diff --git a/workhorse/internal/upload/artifacts_uploader.go b/workhorse/internal/upload/artifacts_uploader.go
index 2a91a05fe3d..c1c49638e21 100644
--- a/workhorse/internal/upload/artifacts_uploader.go
+++ b/workhorse/internal/upload/artifacts_uploader.go
@@ -35,7 +35,6 @@ var zipSubcommandsErrorsCounter = promauto.NewCounterVec(
}, []string{"error"})
type artifactsUploadProcessor struct {
- opts *destination.UploadOpts
format string
SavedFileTracker
@@ -44,7 +43,7 @@ type artifactsUploadProcessor struct {
// Artifacts is like a Multipart but specific for artifacts upload.
func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler {
return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
- opts, _, err := p.Prepare(a)
+ opts, err := p.Prepare(a)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("UploadArtifacts: error preparing file storage options"))
return
@@ -52,7 +51,7 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler {
format := r.URL.Query().Get(ArtifactFormatKey)
- mg := &artifactsUploadProcessor{opts: opts, format: format, SavedFileTracker: SavedFileTracker{Request: r}}
+ mg := &artifactsUploadProcessor{format: format, SavedFileTracker: SavedFileTracker{Request: r}}
interceptMultipartFiles(w, r, h, a, mg, opts)
}, "/authorize")
}
@@ -62,12 +61,9 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context,
defer metaWriter.Close()
metaOpts := &destination.UploadOpts{
- LocalTempPath: a.opts.LocalTempPath,
+ LocalTempPath: os.TempDir(),
TempFilePrefix: "metadata.gz",
}
- if metaOpts.LocalTempPath == "" {
- metaOpts.LocalTempPath = os.TempDir()
- }
fileName := file.LocalPath
if fileName == "" {
diff --git a/workhorse/internal/upload/body_uploader.go b/workhorse/internal/upload/body_uploader.go
index d831f9f43a1..6fb201fe677 100644
--- a/workhorse/internal/upload/body_uploader.go
+++ b/workhorse/internal/upload/body_uploader.go
@@ -17,7 +17,7 @@ import (
// request to gitlab-rails without the original request body.
func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
- opts, verifier, err := p.Prepare(a)
+ opts, err := p.Prepare(a)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("RequestBody: preparation failed: %v", err))
return
@@ -29,13 +29,6 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return
}
- if verifier != nil {
- if err := verifier.Verify(fh); err != nil {
- helper.Fail500(w, r, fmt.Errorf("RequestBody: verification failed: %v", err))
- return
- }
- }
-
data := url.Values{}
fields, err := fh.GitLabFinalizeFields("file")
if err != nil {
diff --git a/workhorse/internal/upload/body_uploader_test.go b/workhorse/internal/upload/body_uploader_test.go
index 47490db8780..35772be5bc3 100644
--- a/workhorse/internal/upload/body_uploader_test.go
+++ b/workhorse/internal/upload/body_uploader_test.go
@@ -49,19 +49,6 @@ func TestRequestBodyCustomPreparer(t *testing.T) {
require.Equal(t, fileContent, string(uploadEcho))
}
-func TestRequestBodyCustomVerifier(t *testing.T) {
- body := strings.NewReader(fileContent)
- verifier := &mockVerifier{}
-
- resp := testUpload(&rails{}, &alwaysLocalPreparer{verifier: verifier}, echoProxy(t, fileLen), body)
- require.Equal(t, http.StatusOK, resp.StatusCode)
-
- uploadEcho, err := ioutil.ReadAll(resp.Body)
- require.NoError(t, err, "Can't read response body")
- require.Equal(t, fileContent, string(uploadEcho))
- require.True(t, verifier.invoked, "Verifier.Verify not invoked")
-}
-
func TestRequestBodyAuthorizationFailure(t *testing.T) {
testNoProxyInvocation(t, http.StatusUnauthorized, &rails{unauthorized: true}, &alwaysLocalPreparer{})
}
@@ -72,7 +59,6 @@ func TestRequestBodyErrors(t *testing.T) {
preparer *alwaysLocalPreparer
}{
{name: "Prepare failure", preparer: &alwaysLocalPreparer{prepareError: fmt.Errorf("")}},
- {name: "Verify failure", preparer: &alwaysLocalPreparer{verifier: &alwaysFailsVerifier{}}},
}
for _, test := range tests {
@@ -165,31 +151,14 @@ func (r *rails) PreAuthorizeHandler(next api.HandleFunc, _ string) http.Handler
}
type alwaysLocalPreparer struct {
- verifier Verifier
prepareError error
}
-func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, Verifier, error) {
+func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, error) {
opts, err := destination.GetOpts(&api.Response{TempPath: os.TempDir()})
if err != nil {
- return nil, nil, err
+ return nil, err
}
- return opts, a.verifier, a.prepareError
-}
-
-type alwaysFailsVerifier struct{}
-
-func (alwaysFailsVerifier) Verify(handler *destination.FileHandler) error {
- return fmt.Errorf("Verification failed")
-}
-
-type mockVerifier struct {
- invoked bool
-}
-
-func (m *mockVerifier) Verify(handler *destination.FileHandler) error {
- m.invoked = true
-
- return nil
+ return opts, a.prepareError
}
diff --git a/workhorse/internal/upload/lfs_preparer.go b/workhorse/internal/upload/lfs_preparer.go
deleted file mode 100644
index e7c5cf16a30..00000000000
--- a/workhorse/internal/upload/lfs_preparer.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package upload
-
-import (
- "fmt"
-
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
-)
-
-type object struct {
- size int64
- oid string
-}
-
-func (l *object) Verify(fh *destination.FileHandler) error {
- if fh.Size != l.size {
- return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size)
- }
-
- if fh.SHA256() != l.oid {
- return fmt.Errorf("LFSObject: expected sha256 %s, got %s", l.oid, fh.SHA256())
- }
-
- return nil
-}
-
-type uploadPreparer struct {
- objectPreparer Preparer
-}
-
-// NewLfs returns a new preparer instance which adds capability to a wrapped
-// preparer to set options required for a LFS upload.
-func NewLfsPreparer(c config.Config, objectPreparer Preparer) Preparer {
- return &uploadPreparer{objectPreparer: objectPreparer}
-}
-
-func (l *uploadPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
- opts, _, err := l.objectPreparer.Prepare(a)
- if err != nil {
- return nil, nil, err
- }
-
- opts.TempFilePrefix = a.LfsOid
-
- return opts, &object{oid: a.LfsOid, size: a.LfsSize}, nil
-}
diff --git a/workhorse/internal/upload/lfs_preparer_test.go b/workhorse/internal/upload/lfs_preparer_test.go
deleted file mode 100644
index 6be4a7c2955..00000000000
--- a/workhorse/internal/upload/lfs_preparer_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package upload
-
-import (
- "testing"
-
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
-
- "github.com/stretchr/testify/require"
-)
-
-func TestLfsPreparerWithConfig(t *testing.T) {
- lfsOid := "abcd1234"
- creds := config.S3Credentials{
- AwsAccessKeyID: "test-key",
- AwsSecretAccessKey: "test-secret",
- }
-
- c := config.Config{
- ObjectStorageCredentials: config.ObjectStorageCredentials{
- Provider: "AWS",
- S3Credentials: creds,
- },
- }
-
- r := &api.Response{
- LfsOid: lfsOid,
- RemoteObject: api.RemoteObject{
- ID: "the upload ID",
- UseWorkhorseClient: true,
- ObjectStorage: &api.ObjectStorageParams{
- Provider: "AWS",
- },
- },
- }
-
- uploadPreparer := NewObjectStoragePreparer(c)
- lfsPreparer := NewLfsPreparer(c, uploadPreparer)
- opts, verifier, err := lfsPreparer.Prepare(r)
-
- require.NoError(t, err)
- require.Equal(t, lfsOid, opts.TempFilePrefix)
- require.True(t, opts.ObjectStorageConfig.IsAWS())
- require.True(t, opts.UseWorkhorseClient)
- require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials)
- require.NotNil(t, verifier)
-}
-
-func TestLfsPreparerWithNoConfig(t *testing.T) {
- c := config.Config{}
- r := &api.Response{RemoteObject: api.RemoteObject{ID: "the upload ID"}}
- uploadPreparer := NewObjectStoragePreparer(c)
- lfsPreparer := NewLfsPreparer(c, uploadPreparer)
- opts, verifier, err := lfsPreparer.Prepare(r)
-
- require.NoError(t, err)
- require.False(t, opts.UseWorkhorseClient)
- require.NotNil(t, verifier)
-}
diff --git a/workhorse/internal/upload/multipart_uploader.go b/workhorse/internal/upload/multipart_uploader.go
index d0097f9e153..34675d2aa14 100644
--- a/workhorse/internal/upload/multipart_uploader.go
+++ b/workhorse/internal/upload/multipart_uploader.go
@@ -17,7 +17,7 @@ func Multipart(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
s := &SavedFileTracker{Request: r}
- opts, _, err := p.Prepare(a)
+ opts, err := p.Prepare(a)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("Multipart: error preparing file storage options"))
return
diff --git a/workhorse/internal/upload/object_storage_preparer.go b/workhorse/internal/upload/object_storage_preparer.go
index f28f598c895..d237a9ca6bc 100644
--- a/workhorse/internal/upload/object_storage_preparer.go
+++ b/workhorse/internal/upload/object_storage_preparer.go
@@ -18,14 +18,14 @@ func NewObjectStoragePreparer(c config.Config) Preparer {
return &ObjectStoragePreparer{credentials: c.ObjectStorageCredentials, config: c.ObjectStorageConfig}
}
-func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
+func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, error) {
opts, err := destination.GetOpts(a)
if err != nil {
- return nil, nil, err
+ return nil, err
}
opts.ObjectStorageConfig.URLMux = p.config.URLMux
opts.ObjectStorageConfig.S3Credentials = p.credentials.S3Credentials
- return opts, nil, nil
+ return opts, nil
}
diff --git a/workhorse/internal/upload/object_storage_preparer_test.go b/workhorse/internal/upload/object_storage_preparer_test.go
index 5856a1bcc92..56de6bbf7d6 100644
--- a/workhorse/internal/upload/object_storage_preparer_test.go
+++ b/workhorse/internal/upload/object_storage_preparer_test.go
@@ -39,24 +39,22 @@ func TestPrepareWithS3Config(t *testing.T) {
}
p := upload.NewObjectStoragePreparer(c)
- opts, v, err := p.Prepare(r)
+ opts, err := p.Prepare(r)
require.NoError(t, err)
require.True(t, opts.ObjectStorageConfig.IsAWS())
require.True(t, opts.UseWorkhorseClient)
require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials)
require.NotNil(t, opts.ObjectStorageConfig.URLMux)
- require.Equal(t, nil, v)
}
func TestPrepareWithNoConfig(t *testing.T) {
c := config.Config{}
r := &api.Response{RemoteObject: api.RemoteObject{ID: "id"}}
p := upload.NewObjectStoragePreparer(c)
- opts, v, err := p.Prepare(r)
+ opts, err := p.Prepare(r)
require.NoError(t, err)
require.False(t, opts.UseWorkhorseClient)
- require.Nil(t, v)
require.Nil(t, opts.ObjectStorageConfig.URLMux)
}
diff --git a/workhorse/internal/upload/preparer.go b/workhorse/internal/upload/preparer.go
index 46a4cac01b5..4d6d8bd1189 100644
--- a/workhorse/internal/upload/preparer.go
+++ b/workhorse/internal/upload/preparer.go
@@ -5,29 +5,18 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
-// Verifier is an optional pluggable behavior for upload paths. If
-// Verify() returns an error, Workhorse will return an error response to
-// the client instead of propagating the request to Rails. The motivating
-// use case is Git LFS, where Workhorse checks the size and SHA256
-// checksum of the uploaded file.
-type Verifier interface {
- // Verify can abort the upload by returning an error
- Verify(handler *destination.FileHandler) error
-}
-
// Preparer is a pluggable behavior that interprets a Rails API response
// and either tells Workhorse how to handle the upload, via the
-// UploadOpts and Verifier, or it rejects the request by returning a
-// non-nil error. Its intended use is to make sure the upload gets stored
-// in the right location: either a local directory, or one of several
-// supported object storage backends.
+// UploadOpts, or it rejects the request by returning a non-nil error.
+// Its intended use is to make sure the upload gets stored in the right
+// location: either a local directory, or one of several supported object
+// storage backends.
type Preparer interface {
- Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error)
+ Prepare(a *api.Response) (*destination.UploadOpts, error)
}
type DefaultPreparer struct{}
-func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
- opts, err := destination.GetOpts(a)
- return opts, nil, err
+func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, error) {
+ return destination.GetOpts(a)
}
diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go
index 9d787b10d1c..a9c8834d4be 100644
--- a/workhorse/internal/upload/uploads_test.go
+++ b/workhorse/internal/upload/uploads_test.go
@@ -46,7 +46,7 @@ func (a *testFormProcessor) Finalize(ctx context.Context) error {
func TestUploadTempPathRequirement(t *testing.T) {
apiResponse := &api.Response{}
preparer := &DefaultPreparer{}
- _, _, err := preparer.Prepare(apiResponse)
+ _, err := preparer.Prepare(apiResponse)
require.Error(t, err)
}
@@ -75,7 +75,7 @@ func TestUploadHandlerForwardingRawData(t *testing.T) {
handler := newProxy(ts.URL)
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, handler, apiResponse, nil, opts)
@@ -146,7 +146,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
@@ -215,7 +215,7 @@ func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) {
handler := newProxy(ts.URL)
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
@@ -245,7 +245,7 @@ func TestUploadProcessingField(t *testing.T) {
response := httptest.NewRecorder()
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
@@ -276,7 +276,7 @@ func TestUploadingMultipleFiles(t *testing.T) {
response := httptest.NewRecorder()
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
@@ -332,7 +332,7 @@ func TestUploadProcessingFile(t *testing.T) {
response := httptest.NewRecorder()
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
@@ -378,7 +378,7 @@ func TestInvalidFileNames(t *testing.T) {
response := httptest.NewRecorder()
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
@@ -444,7 +444,7 @@ func TestContentDispositionRewrite(t *testing.T) {
response := httptest.NewRecorder()
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, customHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
@@ -567,7 +567,7 @@ func runUploadTest(t *testing.T, image []byte, filename string, httpCode int, ts
handler := newProxy(ts.URL)
apiResponse := &api.Response{TempPath: tempPath}
preparer := &DefaultPreparer{}
- opts, _, err := preparer.Prepare(apiResponse)
+ opts, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go
index b1d76dfc1bd..dd106053f8b 100644
--- a/workhorse/internal/upstream/routes.go
+++ b/workhorse/internal/upstream/routes.go
@@ -46,13 +46,6 @@ type routeOptions struct {
matchers []matcherFunc
}
-type uploadPreparers struct {
- artifacts upload.Preparer
- lfs upload.Preparer
- packages upload.Preparer
- uploads upload.Preparer
-}
-
const (
apiPattern = `^/api/`
ciAPIPattern = `^/ci/api/`
@@ -225,13 +218,16 @@ func configureRoutes(u *upstream) {
signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version)
signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config, dependencyProxyInjector)
- preparers := createUploadPreparers(u.Config)
+ preparer := upload.NewObjectStoragePreparer(u.Config)
+ requestBodyUploader := upload.RequestBody(api, signingProxy, preparer)
+ mimeMultipartUploader := upload.Multipart(api, signingProxy, preparer)
+
uploadPath := path.Join(u.DocumentRoot, "uploads/tmp")
- tempfileMultipartProxy := upload.Multipart(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparers.uploads)
+ tempfileMultipartProxy := upload.Multipart(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparer)
ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", tempfileMultipartProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
- dependencyProxyInjector.SetUploadHandler(upload.RequestBody(api, signingProxy, preparers.packages))
+ dependencyProxyInjector.SetUploadHandler(requestBodyUploader)
// Serve static files or forward the requests
defaultUpstream := static.ServeExisting(
@@ -247,11 +243,11 @@ func configureRoutes(u *upstream) {
u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)),
u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))),
u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))),
- u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, upload.RequestBody(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))),
+ u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, requestBodyUploader, withMatcher(isContentType("application/octet-stream"))),
// CI Artifacts
- u.route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparers.artifacts))),
- u.route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparers.artifacts))),
+ u.route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparer))),
+ u.route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(upload.Artifacts(api, signingProxy, preparer))),
// ActionCable websocket
u.wsRoute(`^/-/cable\z`, cableProxy),
@@ -275,32 +271,32 @@ func configureRoutes(u *upstream) {
// https://gitlab.com/gitlab-org/gitlab/-/merge_requests/56731.
// Maven Artifact Repository
- u.route("PUT", apiProjectPattern+`packages/maven/`, upload.RequestBody(api, signingProxy, preparers.packages)),
+ u.route("PUT", apiProjectPattern+`packages/maven/`, requestBodyUploader),
// Conan Artifact Repository
- u.route("PUT", apiPattern+`v4/packages/conan/`, upload.RequestBody(api, signingProxy, preparers.packages)),
- u.route("PUT", apiProjectPattern+`packages/conan/`, upload.RequestBody(api, signingProxy, preparers.packages)),
+ u.route("PUT", apiPattern+`v4/packages/conan/`, requestBodyUploader),
+ u.route("PUT", apiProjectPattern+`packages/conan/`, requestBodyUploader),
// Generic Packages Repository
- u.route("PUT", apiProjectPattern+`packages/generic/`, upload.RequestBody(api, signingProxy, preparers.packages)),
+ u.route("PUT", apiProjectPattern+`packages/generic/`, requestBodyUploader),
// NuGet Artifact Repository
- u.route("PUT", apiProjectPattern+`packages/nuget/`, upload.Multipart(api, signingProxy, preparers.packages)),
+ u.route("PUT", apiProjectPattern+`packages/nuget/`, mimeMultipartUploader),
// PyPI Artifact Repository
- u.route("POST", apiProjectPattern+`packages/pypi`, upload.Multipart(api, signingProxy, preparers.packages)),
+ u.route("POST", apiProjectPattern+`packages/pypi`, mimeMultipartUploader),
// Debian Artifact Repository
- u.route("PUT", apiProjectPattern+`packages/debian/`, upload.RequestBody(api, signingProxy, preparers.packages)),
+ u.route("PUT", apiProjectPattern+`packages/debian/`, requestBodyUploader),
// Gem Artifact Repository
- u.route("POST", apiProjectPattern+`packages/rubygems/`, upload.RequestBody(api, signingProxy, preparers.packages)),
+ u.route("POST", apiProjectPattern+`packages/rubygems/`, requestBodyUploader),
// Terraform Module Package Repository
- u.route("PUT", apiProjectPattern+`packages/terraform/modules/`, upload.RequestBody(api, signingProxy, preparers.packages)),
+ u.route("PUT", apiProjectPattern+`packages/terraform/modules/`, requestBodyUploader),
// Helm Artifact Repository
- u.route("POST", apiProjectPattern+`packages/helm/api/[^/]+/charts\z`, upload.Multipart(api, signingProxy, preparers.packages)),
+ u.route("POST", apiProjectPattern+`packages/helm/api/[^/]+/charts\z`, mimeMultipartUploader),
// We are porting API to disk acceleration
// we need to declare each routes until we have fixed all the routes on the rails codebase.
@@ -309,25 +305,25 @@ func configureRoutes(u *upstream) {
u.route("POST", apiPattern+`graphql\z`, tempfileMultipartProxy),
u.route("POST", apiTopicPattern, tempfileMultipartProxy),
u.route("PUT", apiTopicPattern, tempfileMultipartProxy),
- u.route("POST", apiPattern+`v4/groups/import`, upload.Multipart(api, signingProxy, preparers.uploads)),
- u.route("POST", apiPattern+`v4/projects/import`, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", apiPattern+`v4/groups/import`, mimeMultipartUploader),
+ u.route("POST", apiPattern+`v4/projects/import`, mimeMultipartUploader),
// Project Import via UI upload acceleration
- u.route("POST", importPattern+`gitlab_project`, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", importPattern+`gitlab_project`, mimeMultipartUploader),
// Group Import via UI upload acceleration
- u.route("POST", importPattern+`gitlab_group`, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", importPattern+`gitlab_group`, mimeMultipartUploader),
// Issuable Metric image upload
- u.route("POST", apiProjectPattern+`issues/[0-9]+/metric_images\z`, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", apiProjectPattern+`issues/[0-9]+/metric_images\z`, mimeMultipartUploader),
// Alert Metric image upload
- u.route("POST", apiProjectPattern+`alert_management_alerts/[0-9]+/metric_images\z`, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", apiProjectPattern+`alert_management_alerts/[0-9]+/metric_images\z`, mimeMultipartUploader),
// Requirements Import via UI upload acceleration
- u.route("POST", projectPattern+`requirements_management/requirements/import_csv`, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", projectPattern+`requirements_management/requirements/import_csv`, mimeMultipartUploader),
// Uploads via API
- u.route("POST", apiProjectPattern+`uploads\z`, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", apiProjectPattern+`uploads\z`, mimeMultipartUploader),
// Explicitly proxy API requests
u.route("", apiPattern, proxy),
@@ -345,9 +341,9 @@ func configureRoutes(u *upstream) {
),
// Uploads
- u.route("POST", projectPattern+`uploads\z`, upload.Multipart(api, signingProxy, preparers.uploads)),
- u.route("POST", snippetUploadPattern, upload.Multipart(api, signingProxy, preparers.uploads)),
- u.route("POST", userUploadPattern, upload.Multipart(api, signingProxy, preparers.uploads)),
+ u.route("POST", projectPattern+`uploads\z`, mimeMultipartUploader),
+ u.route("POST", snippetUploadPattern, mimeMultipartUploader),
+ u.route("POST", userUploadPattern, mimeMultipartUploader),
// health checks don't intercept errors and go straight to rails
// TODO: We should probably not return a HTML deploy page?
@@ -411,17 +407,6 @@ func configureRoutes(u *upstream) {
}
}
-func createUploadPreparers(cfg config.Config) uploadPreparers {
- defaultPreparer := upload.NewObjectStoragePreparer(cfg)
-
- return uploadPreparers{
- artifacts: defaultPreparer,
- lfs: upload.NewLfsPreparer(cfg, defaultPreparer),
- packages: defaultPreparer,
- uploads: defaultPreparer,
- }
-}
-
func denyWebsocket(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if websocket.IsWebSocketUpgrade(r) {