summaryrefslogtreecommitdiff
path: root/workhorse/internal
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal')
-rw-r--r--workhorse/internal/api/api.go2
-rw-r--r--workhorse/internal/artifacts/artifacts_store_test.go16
-rw-r--r--workhorse/internal/artifacts/artifacts_upload_test.go6
-rw-r--r--workhorse/internal/dependencyproxy/dependencyproxy.go123
-rw-r--r--workhorse/internal/dependencyproxy/dependencyproxy_test.go183
-rw-r--r--workhorse/internal/filestore/save_file_opts_test.go8
-rw-r--r--workhorse/internal/git/archive.go4
-rw-r--r--workhorse/internal/git/info-refs.go20
-rw-r--r--workhorse/internal/git/info-refs_test.go42
-rw-r--r--workhorse/internal/git/upload-pack_test.go13
-rw-r--r--workhorse/internal/gitaly/gitaly.go46
-rw-r--r--workhorse/internal/gitaly/gitaly_test.go20
-rw-r--r--workhorse/internal/gitaly/smarthttp.go46
-rw-r--r--workhorse/internal/gitaly/unmarshal_test.go13
-rw-r--r--workhorse/internal/testhelper/gitaly.go47
-rw-r--r--workhorse/internal/upload/uploads_test.go8
-rw-r--r--workhorse/internal/upstream/routes.go19
-rw-r--r--workhorse/internal/upstream/upstream.go18
-rw-r--r--workhorse/internal/upstream/upstream_test.go107
-rw-r--r--workhorse/internal/zipartifacts/open_archive_test.go137
20 files changed, 756 insertions, 122 deletions
diff --git a/workhorse/internal/api/api.go b/workhorse/internal/api/api.go
index 417ee71dbdc..7f696f70c7a 100644
--- a/workhorse/internal/api/api.go
+++ b/workhorse/internal/api/api.go
@@ -155,8 +155,6 @@ type Response struct {
ProcessLsifReferences bool
// The maximum accepted size in bytes of the upload
MaximumSize int64
- // DEPRECATED: Feature flag used to determine whether to strip the multipart filename of any directories
- FeatureFlagExtractBase bool
}
// singleJoiningSlash is taken from reverseproxy.go:singleJoiningSlash
diff --git a/workhorse/internal/artifacts/artifacts_store_test.go b/workhorse/internal/artifacts/artifacts_store_test.go
index 67a7793e853..a01a723298f 100644
--- a/workhorse/internal/artifacts/artifacts_store_test.go
+++ b/workhorse/internal/artifacts/artifacts_store_test.go
@@ -102,11 +102,11 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
tests := []struct {
name string
- preauth api.Response
+ preauth *api.Response
}{
{
name: "ObjectStore Upload",
- preauth: api.Response{
+ preauth: &api.Response{
RemoteObject: api.RemoteObject{
StoreURL: storeServer.URL + "/url/put" + qs,
ID: "store-id",
@@ -145,7 +145,7 @@ func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *tes
t.Fatal("it should not be called")
}
- authResponse := api.Response{
+ authResponse := &api.Response{
TempPath: tempPath,
RemoteObject: api.RemoteObject{
StoreURL: "http://localhost:12323/invalid/url",
@@ -171,7 +171,7 @@ func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T)
t.Fatal("it should not be called")
}
- authResponse := api.Response{
+ authResponse := &api.Response{
TempPath: tempPath,
RemoteObject: api.RemoteObject{
StoreURL: "htt:////invalid-url",
@@ -203,7 +203,7 @@ func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T)
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
- authResponse := api.Response{
+ authResponse := &api.Response{
RemoteObject: api.RemoteObject{
StoreURL: storeServer.URL + "/url/put",
ID: "store-id",
@@ -236,7 +236,7 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
- authResponse := api.Response{
+ authResponse := &api.Response{
RemoteObject: api.RemoteObject{
StoreURL: storeServer.URL + "/url/put",
ID: "store-id",
@@ -262,7 +262,7 @@ func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) {
objectURL := server.URL + test.ObjectPath
uploadSize := 10
- preauth := api.Response{
+ preauth := &api.Response{
RemoteObject: api.RemoteObject{
ID: "store-id",
MultipartUpload: &api.MultipartUploadParams{
@@ -304,7 +304,7 @@ func TestUploadHandlerMultipartUploadMaximumSizeFromApi(t *testing.T) {
uploadSize := int64(10)
maxSize := uploadSize - 1
- preauth := api.Response{
+ preauth := &api.Response{
MaximumSize: maxSize,
RemoteObject: api.RemoteObject{
ID: "store-id",
diff --git a/workhorse/internal/artifacts/artifacts_upload_test.go b/workhorse/internal/artifacts/artifacts_upload_test.go
index 2b11d56f4ee..3e8a52be1a1 100644
--- a/workhorse/internal/artifacts/artifacts_upload_test.go
+++ b/workhorse/internal/artifacts/artifacts_upload_test.go
@@ -35,7 +35,7 @@ const (
Path = "/url/path"
)
-func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
+func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(Path+"/authorize", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
@@ -51,7 +51,7 @@ func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProc
w.Write(data)
})
mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) {
- opts, err := filestore.GetOpts(&authResponse)
+ opts, err := filestore.GetOpts(authResponse)
require.NoError(t, err)
if r.Method != "POST" {
@@ -128,7 +128,7 @@ func setupWithTmpPath(t *testing.T, filename string, includeFormat bool, format
authResponse = &api.Response{TempPath: tempPath}
}
- ts := testArtifactsUploadServer(t, *authResponse, bodyProcessor)
+ ts := testArtifactsUploadServer(t, authResponse, bodyProcessor)
var buffer bytes.Buffer
writer := multipart.NewWriter(&buffer)
diff --git a/workhorse/internal/dependencyproxy/dependencyproxy.go b/workhorse/internal/dependencyproxy/dependencyproxy.go
new file mode 100644
index 00000000000..cfb3045544f
--- /dev/null
+++ b/workhorse/internal/dependencyproxy/dependencyproxy.go
@@ -0,0 +1,123 @@
+package dependencyproxy
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "gitlab.com/gitlab-org/labkit/log"
+ "gitlab.com/gitlab-org/labkit/tracing"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata"
+)
+
+// httpTransport defines a http.Transport with values
+// that are more restrictive than for http.DefaultTransport,
+// they define shorter TLS Handshake, and more aggressive connection closing
+// to prevent the connection hanging and reduce FD usage
+var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ DialContext: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 10 * time.Second,
+ }).DialContext,
+ MaxIdleConns: 2,
+ IdleConnTimeout: 30 * time.Second,
+ TLSHandshakeTimeout: 10 * time.Second,
+ ExpectContinueTimeout: 10 * time.Second,
+ ResponseHeaderTimeout: 30 * time.Second,
+}))
+
+var httpClient = &http.Client{
+ Transport: httpTransport,
+}
+
+type Injector struct {
+ senddata.Prefix
+ uploadHandler http.Handler
+}
+
+type entryParams struct {
+ Url string
+ Header http.Header
+}
+
+type nullResponseWriter struct {
+ header http.Header
+ status int
+}
+
+func (nullResponseWriter) Write(p []byte) (int, error) {
+ return len(p), nil
+}
+
+func (w *nullResponseWriter) Header() http.Header {
+ return w.header
+}
+
+func (w *nullResponseWriter) WriteHeader(status int) {
+ if w.status == 0 {
+ w.status = status
+ }
+}
+
+func NewInjector() *Injector {
+ return &Injector{Prefix: "send-dependency:"}
+}
+
+func (p *Injector) SetUploadHandler(uploadHandler http.Handler) {
+ p.uploadHandler = uploadHandler
+}
+
+func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
+ dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
+ if err != nil {
+ helper.Fail500(w, r, err)
+ return
+ }
+ defer dependencyResponse.Body.Close()
+ if dependencyResponse.StatusCode >= 400 {
+ w.WriteHeader(dependencyResponse.StatusCode)
+ io.Copy(w, dependencyResponse.Body)
+ return
+ }
+
+ teeReader := io.TeeReader(dependencyResponse.Body, w)
+ saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
+ if err != nil {
+ helper.Fail500(w, r, fmt.Errorf("dependency proxy: failed to create request: %w", err))
+ }
+ saveFileRequest.Header = helper.HeaderClone(r.Header)
+ saveFileRequest.ContentLength = dependencyResponse.ContentLength
+
+ w.Header().Del("Content-Length")
+
+ nrw := &nullResponseWriter{header: make(http.Header)}
+ p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
+
+ if nrw.status != http.StatusOK {
+ fields := log.Fields{"code": nrw.status}
+
+ helper.Fail500WithFields(nrw, r, fmt.Errorf("dependency proxy: failed to upload file"), fields)
+ }
+}
+
+func (p *Injector) fetchUrl(ctx context.Context, sendData string) (*http.Response, error) {
+ var params entryParams
+ if err := p.Unpack(&params, sendData); err != nil {
+ return nil, fmt.Errorf("dependency proxy: unpack sendData: %v", err)
+ }
+
+ r, err := http.NewRequestWithContext(ctx, "GET", params.Url, nil)
+ if err != nil {
+ return nil, fmt.Errorf("dependency proxy: failed to fetch dependency: %v", err)
+ }
+ r.Header = params.Header
+
+ return httpClient.Do(r)
+}
diff --git a/workhorse/internal/dependencyproxy/dependencyproxy_test.go b/workhorse/internal/dependencyproxy/dependencyproxy_test.go
new file mode 100644
index 00000000000..37e54c0b756
--- /dev/null
+++ b/workhorse/internal/dependencyproxy/dependencyproxy_test.go
@@ -0,0 +1,183 @@
+package dependencyproxy
+
+import (
+ "encoding/base64"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strconv"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload"
+)
+
+type fakeUploadHandler struct {
+ request *http.Request
+ body []byte
+ handler func(w http.ResponseWriter, r *http.Request)
+}
+
+func (f *fakeUploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ f.request = r
+
+ f.body, _ = io.ReadAll(r.Body)
+
+ f.handler(w, r)
+}
+
+type errWriter struct{ writes int }
+
+func (w *errWriter) Header() http.Header { return nil }
+func (w *errWriter) WriteHeader(h int) {}
+
+// First call of Write function succeeds while all the subsequent ones fail
+func (w *errWriter) Write(p []byte) (int, error) {
+ if w.writes > 0 {
+ return 0, fmt.Errorf("client error")
+ }
+
+ w.writes++
+
+ return len(p), nil
+}
+
+type fakePreAuthHandler struct{}
+
+func (f *fakePreAuthHandler) PreAuthorizeHandler(handler api.HandleFunc, _ string) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ handler(w, r, &api.Response{TempPath: "../../testdata/scratch"})
+ })
+}
+
+func TestInject(t *testing.T) {
+ contentLength := 32768 + 1
+ content := strings.Repeat("p", contentLength)
+
+ testCases := []struct {
+ desc string
+ responseWriter http.ResponseWriter
+ contentLength int
+ handlerMustBeCalled bool
+ }{
+ {
+ desc: "the uploading successfully finalized",
+ responseWriter: httptest.NewRecorder(),
+ contentLength: contentLength,
+ handlerMustBeCalled: true,
+ }, {
+ desc: "a user failed to receive the response",
+ responseWriter: &errWriter{},
+ contentLength: contentLength,
+ handlerMustBeCalled: false,
+ }, {
+ desc: "the origin resource server returns partial response",
+ responseWriter: httptest.NewRecorder(),
+ contentLength: contentLength + 1,
+ handlerMustBeCalled: false,
+ },
+ }
+ testhelper.ConfigureSecret()
+
+ for _, tc := range testCases {
+ originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Length", strconv.Itoa(tc.contentLength))
+ w.Write([]byte(content))
+ }))
+ defer originResourceServer.Close()
+
+ // BodyUploader expects http.Handler as its second param, we can create a stub function and verify that
+ // it's only called for successful requests
+ handlerIsCalled := false
+ handlerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handlerIsCalled = true })
+
+ bodyUploader := upload.BodyUploader(&fakePreAuthHandler{}, handlerFunc, &upload.DefaultPreparer{})
+
+ injector := NewInjector()
+ injector.SetUploadHandler(bodyUploader)
+
+ r := httptest.NewRequest("GET", "/target", nil)
+ sendData := base64.StdEncoding.EncodeToString([]byte(`{"Token": "token", "Url": "` + originResourceServer.URL + `/url"}`))
+
+ injector.Inject(tc.responseWriter, r, sendData)
+
+ require.Equal(t, tc.handlerMustBeCalled, handlerIsCalled, "a partial file must not be saved")
+ }
+}
+
+func TestSuccessfullRequest(t *testing.T) {
+ content := []byte("result")
+ originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Length", strconv.Itoa(len(content)))
+ w.Write(content)
+ }))
+
+ uploadHandler := &fakeUploadHandler{
+ handler: func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(200)
+ },
+ }
+
+ injector := NewInjector()
+ injector.SetUploadHandler(uploadHandler)
+
+ response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`)
+
+ require.Equal(t, "/target/upload", uploadHandler.request.URL.Path)
+ require.Equal(t, int64(6), uploadHandler.request.ContentLength)
+
+ require.Equal(t, content, uploadHandler.body)
+
+ require.Equal(t, 200, response.Code)
+ require.Equal(t, string(content), response.Body.String())
+}
+
+func TestIncorrectSendData(t *testing.T) {
+ response := makeRequest(NewInjector(), "")
+
+ require.Equal(t, 500, response.Code)
+ require.Equal(t, "Internal server error\n", response.Body.String())
+}
+
+func TestIncorrectSendDataUrl(t *testing.T) {
+ response := makeRequest(NewInjector(), `{"Token": "token", "Url": "url"}`)
+
+ require.Equal(t, 500, response.Code)
+ require.Equal(t, "Internal server error\n", response.Body.String())
+}
+
+func TestFailedOriginServer(t *testing.T) {
+ originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(404)
+ w.Write([]byte("Not found"))
+ }))
+
+ uploadHandler := &fakeUploadHandler{
+ handler: func(w http.ResponseWriter, r *http.Request) {
+ require.FailNow(t, "the error response must not be uploaded")
+ },
+ }
+
+ injector := NewInjector()
+ injector.SetUploadHandler(uploadHandler)
+
+ response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`)
+
+ require.Equal(t, 404, response.Code)
+ require.Equal(t, "Not found", response.Body.String())
+}
+
+func makeRequest(injector *Injector, data string) *httptest.ResponseRecorder {
+ w := httptest.NewRecorder()
+ r := httptest.NewRequest("GET", "/target", nil)
+
+ sendData := base64.StdEncoding.EncodeToString([]byte(data))
+ injector.Inject(w, r, sendData)
+
+ return w
+}
diff --git a/workhorse/internal/filestore/save_file_opts_test.go b/workhorse/internal/filestore/save_file_opts_test.go
index ead5ba86889..f389b2054e5 100644
--- a/workhorse/internal/filestore/save_file_opts_test.go
+++ b/workhorse/internal/filestore/save_file_opts_test.go
@@ -141,21 +141,21 @@ func TestGetOpts(t *testing.T) {
func TestGetOptsFail(t *testing.T) {
testCases := []struct {
desc string
- in api.Response
+ in *api.Response
}{
{
desc: "neither local nor remote",
- in: api.Response{},
+ in: &api.Response{},
},
{
desc: "both local and remote",
- in: api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}},
+ in: &api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- _, err := filestore.GetOpts(&tc.in)
+ _, err := filestore.GetOpts(tc.in)
require.Error(t, err, "expect input to be rejected")
})
}
diff --git a/workhorse/internal/git/archive.go b/workhorse/internal/git/archive.go
index bb35f2eb269..fc12094cc14 100644
--- a/workhorse/internal/git/archive.go
+++ b/workhorse/internal/git/archive.go
@@ -102,7 +102,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
var archiveReader io.Reader
- archiveReader, err = handleArchiveWithGitaly(r, params, format)
+ archiveReader, err = handleArchiveWithGitaly(r, &params, format)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("operations.GetArchive: %v", err))
return
@@ -130,7 +130,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
}
}
-func handleArchiveWithGitaly(r *http.Request, params archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) {
+func handleArchiveWithGitaly(r *http.Request, params *archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) {
var request *gitalypb.GetArchiveRequest
ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer)
if err != nil {
diff --git a/workhorse/internal/git/info-refs.go b/workhorse/internal/git/info-refs.go
index 0d8edbeff06..8390143b99b 100644
--- a/workhorse/internal/git/info-refs.go
+++ b/workhorse/internal/git/info-refs.go
@@ -8,8 +8,8 @@ import (
"net/http"
"github.com/golang/gddo/httputil"
-
- "gitlab.com/gitlab-org/labkit/log"
+ grpccodes "google.golang.org/grpc/codes"
+ grpcstatus "google.golang.org/grpc/status"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
@@ -26,6 +26,7 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response)
defer responseWriter.Log(r, 0)
rpc := getService(r)
+
if !(rpc == "git-upload-pack" || rpc == "git-receive-pack") {
// The 'dumb' Git HTTP protocol is not supported
http.Error(responseWriter, "Not Found", 404)
@@ -41,19 +42,26 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response)
encoding := httputil.NegotiateContentEncoding(r, offers)
if err := handleGetInfoRefsWithGitaly(r.Context(), responseWriter, a, rpc, gitProtocol, encoding); err != nil {
- helper.Fail500(responseWriter, r, fmt.Errorf("handleGetInfoRefs: %v", err))
+ status := grpcstatus.Convert(err)
+ err = fmt.Errorf("handleGetInfoRefs: %v", err)
+
+ if status != nil && status.Code() == grpccodes.Unavailable {
+ helper.CaptureAndFail(responseWriter, r, err, "The git server, Gitaly, is not available at this time. Please contact your administrator.", http.StatusServiceUnavailable)
+ } else {
+ helper.Fail500(responseWriter, r, err)
+ }
}
}
func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error {
ctx, smarthttp, err := gitaly.NewSmartHTTPClient(ctx, a.GitalyServer)
if err != nil {
- return fmt.Errorf("GetInfoRefsHandler: %v", err)
+ return err
}
infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc, gitConfigOptions(a), gitProtocol)
if err != nil {
- return fmt.Errorf("GetInfoRefsHandler: %v", err)
+ return err
}
var w io.Writer
@@ -69,7 +77,7 @@ func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpRespon
}
if _, err = io.Copy(w, infoRefsResponseReader); err != nil {
- log.WithError(err).Error("GetInfoRefsHandler: error copying gitaly response")
+ return err
}
return nil
diff --git a/workhorse/internal/git/info-refs_test.go b/workhorse/internal/git/info-refs_test.go
new file mode 100644
index 00000000000..4f23d1ac174
--- /dev/null
+++ b/workhorse/internal/git/info-refs_test.go
@@ -0,0 +1,42 @@
+package git
+
+import (
+ "net/http/httptest"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ grpccodes "google.golang.org/grpc/codes"
+ grpcstatus "google.golang.org/grpc/status"
+
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
+)
+
+type smartHTTPServiceServerWithInfoRefs struct {
+ gitalypb.UnimplementedSmartHTTPServiceServer
+ InfoRefsUploadPackFunc func(*gitalypb.InfoRefsRequest, gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error
+}
+
+func (srv *smartHTTPServiceServerWithInfoRefs) InfoRefsUploadPack(r *gitalypb.InfoRefsRequest, s gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error {
+ return srv.InfoRefsUploadPackFunc(r, s)
+}
+
+func TestGetInfoRefsHandler(t *testing.T) {
+ addr := startSmartHTTPServer(t, &smartHTTPServiceServerWithInfoRefs{
+ InfoRefsUploadPackFunc: func(r *gitalypb.InfoRefsRequest, s gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error {
+ return grpcstatus.Error(grpccodes.Unavailable, "error")
+ },
+ })
+
+ w := httptest.NewRecorder()
+ r := httptest.NewRequest("GET", "/?service=git-upload-pack", nil)
+ a := &api.Response{GitalyServer: gitaly.Server{Address: addr}}
+
+ handleGetInfoRefs(NewHttpResponseWriter(w), r, a)
+ require.Equal(t, 503, w.Code)
+
+ msg := "The git server, Gitaly, is not available at this time. Please contact your administrator.\n"
+ require.Equal(t, msg, w.Body.String())
+}
diff --git a/workhorse/internal/git/upload-pack_test.go b/workhorse/internal/git/upload-pack_test.go
index ebefe360122..211f68a2608 100644
--- a/workhorse/internal/git/upload-pack_test.go
+++ b/workhorse/internal/git/upload-pack_test.go
@@ -45,14 +45,13 @@ func TestUploadPackTimesOut(t *testing.T) {
uploadPackTimeout = time.Millisecond
defer func() { uploadPackTimeout = originalUploadPackTimeout }()
- addr, cleanUp := startSmartHTTPServer(t, &smartHTTPServiceServer{
+ addr := startSmartHTTPServer(t, &smartHTTPServiceServer{
PostUploadPackFunc: func(stream gitalypb.SmartHTTPService_PostUploadPackServer) error {
_, err := stream.Recv() // trigger a read on the client request body
require.NoError(t, err)
return nil
},
})
- defer cleanUp()
body := &fakeReader{n: 0, err: nil}
@@ -64,7 +63,9 @@ func TestUploadPackTimesOut(t *testing.T) {
require.EqualError(t, err, "smarthttp.UploadPack: busyReader: context deadline exceeded")
}
-func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) (string, func()) {
+func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) string {
+ t.Helper()
+
tmp, err := ioutil.TempDir("", "")
require.NoError(t, err)
@@ -78,8 +79,10 @@ func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) (stri
require.NoError(t, srv.Serve(ln))
}()
- return fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()), func() {
+ t.Cleanup(func() {
srv.GracefulStop()
require.NoError(t, os.RemoveAll(tmp), "error removing temp dir %q", tmp)
- }
+ })
+
+ return fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String())
}
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go
index 6ea99962056..362f380dc4d 100644
--- a/workhorse/internal/gitaly/gitaly.go
+++ b/workhorse/internal/gitaly/gitaly.go
@@ -11,6 +11,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@@ -23,15 +24,19 @@ import (
)
type Server struct {
- Address string `json:"address"`
- Token string `json:"token"`
- Features map[string]string `json:"features"`
+ Address string `json:"address"`
+ Token string `json:"token"`
+ Features map[string]string `json:"features"`
+ Sidechannel bool `json:"sidechannel"`
}
-type cacheKey struct{ address, token string }
+type cacheKey struct {
+ address, token string
+ sidechannel bool
+}
func (server Server) cacheKey() cacheKey {
- return cacheKey{address: server.Address, token: server.Token}
+ return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel}
}
type connectionsCache struct {
@@ -41,9 +46,17 @@ type connectionsCache struct {
var (
jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true}
- cache = connectionsCache{
+ // This connection cache map contains two types of connections:
+ // - Normal gRPC connections
+ // - Sidechannel connections. When client dials to the Gitaly server, the
+ // server multiplexes the connection using Yamux. In the future, the server
+ // can open another stream to transfer data without gRPC. Besides, we apply
+ // a framing protocol to add the half-close capability to Yamux streams.
+ // Hence, we cannot use those connections interchangeably.
+ cache = connectionsCache{
connections: make(map[cacheKey]*grpc.ClientConn),
}
+ sidechannelRegistry *gitalyclient.SidechannelRegistry
connectionsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
@@ -54,6 +67,12 @@ var (
)
)
+func InitializeSidechannelRegistry(logger *logrus.Logger) {
+ if sidechannelRegistry == nil {
+ sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger))
+ }
+}
+
func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context {
md := metadata.New(nil)
for k, v := range features {
@@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S
return nil, nil, err
}
grpcClient := gitalypb.NewSmartHTTPServiceClient(conn)
- return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil
+ smartHTTPClient := &SmartHTTPClient{
+ SmartHTTPServiceClient: grpcClient,
+ sidechannelRegistry: sidechannelRegistry,
+ useSidechannel: server.Sidechannel,
+ }
+ return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil
}
func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) {
@@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
),
)
- conn, connErr := gitalyclient.Dial(server.Address, connOpts)
+ var conn *grpc.ClientConn
+ var connErr error
+ if server.Sidechannel {
+ conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background
+ } else {
+ conn, connErr = gitalyclient.Dial(server.Address, connOpts)
+ }
label := "ok"
if connErr != nil {
diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go
index b17fb5c1d7b..9c54caae8c6 100644
--- a/workhorse/internal/gitaly/gitaly_test.go
+++ b/workhorse/internal/gitaly/gitaly_test.go
@@ -4,14 +4,32 @@ import (
"context"
"testing"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)
func TestNewSmartHTTPClient(t *testing.T) {
- ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture())
+ ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture())
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
+
+ require.False(t, client.useSidechannel)
+ require.Nil(t, client.sidechannelRegistry)
+}
+
+func TestNewSmartHTTPClientWithSidechannel(t *testing.T) {
+ InitializeSidechannelRegistry(logrus.StandardLogger())
+
+ fixture := serverFixture()
+ fixture.Sidechannel = true
+
+ ctx, client, err := NewSmartHTTPClient(context.Background(), fixture)
+ require.NoError(t, err)
+ testOutgoingMetadata(t, ctx)
+
+ require.True(t, client.useSidechannel)
+ require.NotNil(t, client.sidechannelRegistry)
}
func TestNewBlobClient(t *testing.T) {
diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go
index 69656ab0a92..de6954efa60 100644
--- a/workhorse/internal/gitaly/smarthttp.go
+++ b/workhorse/internal/gitaly/smarthttp.go
@@ -5,11 +5,14 @@ import (
"fmt"
"io"
+ gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
)
type SmartHTTPClient struct {
+ useSidechannel bool
+ sidechannelRegistry *gitalyclient.SidechannelRegistry
gitalypb.SmartHTTPServiceClient
}
@@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R
}
func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
+ if client.useSidechannel {
+ return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
+ }
+
+ return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
+}
+
+func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
stream, err := client.PostUploadPack(ctx)
if err != nil {
return err
@@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re
return nil
}
+
+func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
+ ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error {
+ if _, err := io.Copy(conn, clientRequest); err != nil {
+ return err
+ }
+
+ if err := conn.CloseWrite(); err != nil {
+ return fmt.Errorf("fail to signal sidechannel half-close: %w", err)
+ }
+
+ if _, err := io.Copy(clientResponse, conn); err != nil {
+ return err
+ }
+
+ return nil
+ })
+ defer waiter.Close()
+
+ rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{
+ Repository: repo,
+ GitConfigOptions: gitConfigOptions,
+ GitProtocol: gitProtocol,
+ }
+
+ if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil {
+ return err
+ }
+
+ if err := waiter.Close(); err != nil {
+ return fmt.Errorf("fail to close sidechannel connection: %w", err)
+ }
+
+ return nil
+}
diff --git a/workhorse/internal/gitaly/unmarshal_test.go b/workhorse/internal/gitaly/unmarshal_test.go
index 270b96f900d..730eff4005a 100644
--- a/workhorse/internal/gitaly/unmarshal_test.go
+++ b/workhorse/internal/gitaly/unmarshal_test.go
@@ -3,6 +3,7 @@ package gitaly
import (
"testing"
+ "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -11,25 +12,25 @@ func TestUnmarshalJSON(t *testing.T) {
testCases := []struct {
desc string
in string
- out gitalypb.Repository
+ out *gitalypb.Repository
}{
{
desc: "basic example",
in: `{"relative_path":"foo/bar.git"}`,
- out: gitalypb.Repository{RelativePath: "foo/bar.git"},
+ out: &gitalypb.Repository{RelativePath: "foo/bar.git"},
},
{
desc: "unknown field",
in: `{"relative_path":"foo/bar.git","unknown_field":12345}`,
- out: gitalypb.Repository{RelativePath: "foo/bar.git"},
+ out: &gitalypb.Repository{RelativePath: "foo/bar.git"},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- result := gitalypb.Repository{}
- require.NoError(t, UnmarshalJSON(tc.in, &result))
- require.Equal(t, tc.out, result)
+ result := &gitalypb.Repository{}
+ require.NoError(t, UnmarshalJSON(tc.in, result))
+ require.True(t, proto.Equal(tc.out, result))
})
}
}
diff --git a/workhorse/internal/testhelper/gitaly.go b/workhorse/internal/testhelper/gitaly.go
index 13e71c570a7..da2fbf30785 100644
--- a/workhorse/internal/testhelper/gitaly.go
+++ b/workhorse/internal/testhelper/gitaly.go
@@ -1,6 +1,7 @@
package testhelper
import (
+ "bytes"
"fmt"
"io"
"io/ioutil"
@@ -11,6 +12,7 @@ import (
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"golang.org/x/net/context"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@@ -23,6 +25,7 @@ type GitalyTestServer struct {
finalMessageCode codes.Code
sync.WaitGroup
LastIncomingMetadata metadata.MD
+ gitalypb.UnimplementedSmartHTTPServiceServer
gitalypb.UnimplementedRepositoryServiceServer
gitalypb.UnimplementedBlobServiceServer
gitalypb.UnimplementedDiffServiceServer
@@ -191,13 +194,14 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU
return err
}
- jsonString, err := marshalJSON(req)
- if err != nil {
+ marshaler := &jsonpb.Marshaler{}
+ jsonBytes := &bytes.Buffer{}
+ if err := marshaler.Marshal(jsonBytes, req); err != nil {
return err
}
if err := stream.Send(&gitalypb.PostUploadPackResponse{
- Data: []byte(strings.Join([]string{jsonString}, "\000") + "\000"),
+ Data: append(jsonBytes.Bytes(), 0),
}); err != nil {
return err
}
@@ -229,6 +233,43 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU
return s.finalError()
}
+// PostUploadPackWithSidechannel should be a part of smarthttp server in real
+// server. In workhorse, setting up a real sidechannel server is troublesome.
+// Therefore, we bring up a sidechannel server with a mock server exported via
+// gitalyclient.TestSidechannelServer. This is the handler for that mock
+// server.
+func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error {
+ if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" {
+ return fmt.Errorf("unexpected method: %s", method)
+ }
+
+ var req gitalypb.PostUploadPackWithSidechannelRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return err
+ }
+
+ if err := validateRepository(req.GetRepository()); err != nil {
+ return err
+ }
+
+ marshaler := &jsonpb.Marshaler{}
+ jsonBytes := &bytes.Buffer{}
+ if err := marshaler.Marshal(jsonBytes, &req); err != nil {
+ return err
+ }
+
+ // Bounce back all data back to the client, plus flushing bytes
+ if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil {
+ return err
+ }
+
+ if _, err := io.Copy(conn, conn); err != nil {
+ return err
+ }
+
+ return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{})
+}
+
func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) {
return nil, nil
}
diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go
index 5fae5cf7bcb..29acf849e05 100644
--- a/workhorse/internal/upload/uploads_test.go
+++ b/workhorse/internal/upload/uploads_test.go
@@ -271,19 +271,19 @@ func TestUploadProcessingFile(t *testing.T) {
tests := []struct {
name string
- preauth api.Response
+ preauth *api.Response
}{
{
name: "FileStore Upload",
- preauth: api.Response{TempPath: tempPath},
+ preauth: &api.Response{TempPath: tempPath},
},
{
name: "ObjectStore Upload",
- preauth: api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl}},
+ preauth: &api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl}},
},
{
name: "ObjectStore and FileStore Upload",
- preauth: api.Response{
+ preauth: &api.Response{
TempPath: tempPath,
RemoteObject: api.RemoteObject{StoreURL: storeUrl},
},
diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go
index 8c85c5144e5..d39ba845dc5 100644
--- a/workhorse/internal/upstream/routes.go
+++ b/workhorse/internal/upstream/routes.go
@@ -16,6 +16,7 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/builds"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/channel"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/dependencyproxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/imageresizer"
@@ -56,6 +57,7 @@ const (
apiPattern = `^/api/`
ciAPIPattern = `^/ci/api/`
gitProjectPattern = `^/.+\.git/`
+ geoGitProjectPattern = `^/[^-].+\.git/` // Prevent matching routes like /-/push_from_secondary
projectPattern = `^/([^/]+/){1,}[^/]+/`
apiProjectPattern = apiPattern + `v4/projects/[^/]+/` // API: Projects can be encoded via group%2Fsubgroup%2Fproject
snippetUploadPattern = `^/uploads/personal_snippet`
@@ -170,7 +172,7 @@ func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool {
return ok
}
-func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config) http.Handler {
+func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config, dependencyProxyInjector *dependencyproxy.Injector) http.Handler {
proxier := proxypkg.NewProxy(backend, version, rt)
return senddata.SendData(
@@ -183,6 +185,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf
artifacts.SendEntry,
sendurl.SendURL,
imageresizer.NewResizer(cfg),
+ dependencyProxyInjector,
)
}
@@ -193,7 +196,8 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf
func configureRoutes(u *upstream) {
api := u.APIClient
static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude}
- proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config)
+ dependencyProxyInjector := dependencyproxy.NewInjector()
+ proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config, dependencyProxyInjector)
cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper)
assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy)
@@ -207,7 +211,7 @@ func configureRoutes(u *upstream) {
}
signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version)
- signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config)
+ signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config, dependencyProxyInjector)
preparers := createUploadPreparers(u.Config)
uploadPath := path.Join(u.DocumentRoot, "uploads/tmp")
@@ -215,6 +219,8 @@ func configureRoutes(u *upstream) {
ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
+ dependencyProxyInjector.SetUploadHandler(upload.BodyUploader(api, signingProxy, preparers.packages))
+
// Serve static files or forward the requests
defaultUpstream := static.ServeExisting(
u.URLPrefix,
@@ -348,10 +354,9 @@ func configureRoutes(u *upstream) {
// pulls are performed against a different source of truth. Ideally, we'd
// proxy/redirect pulls as well, when the secondary is not up-to-date.
//
- u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)),
- u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))),
- u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))),
- u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))),
+ u.route("GET", geoGitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)),
+ u.route("POST", geoGitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))),
+ u.route("GET", geoGitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})\z`, defaultUpstream),
// Serve health checks from this Geo secondary
u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)),
diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go
index d6e5e7766b5..e57f58d59dd 100644
--- a/workhorse/internal/upstream/upstream.go
+++ b/workhorse/internal/upstream/upstream.go
@@ -50,7 +50,7 @@ type upstream struct {
geoLocalRoutes []routeEntry
geoProxyCableRoute routeEntry
geoProxyRoute routeEntry
- geoProxyTestChannel chan struct{}
+ geoProxyPollSleep func(time.Duration)
accessLogger *logrus.Logger
enableGeoProxyFeature bool
mu sync.RWMutex
@@ -68,6 +68,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback
enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1",
geoProxyBackend: &url.URL{},
}
+ if up.geoProxyPollSleep == nil {
+ up.geoProxyPollSleep = time.Sleep
+ }
if up.Backend == nil {
up.Backend = DefaultBackend
}
@@ -205,13 +208,7 @@ func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *route
func (u *upstream) pollGeoProxyAPI() {
for {
u.callGeoProxyAPI()
-
- // Notify tests when callGeoProxyAPI() finishes
- if u.geoProxyTestChannel != nil {
- u.geoProxyTestChannel <- struct{}{}
- }
-
- time.Sleep(geoProxyApiPollingInterval)
+ u.geoProxyPollSleep(geoProxyApiPollingInterval)
}
}
@@ -234,6 +231,11 @@ func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) {
defer u.mu.Unlock()
u.geoProxyBackend = geoProxyURL
+
+ if u.geoProxyBackend.String() == "" {
+ return
+ }
+
geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode)
geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper)
u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream)
diff --git a/workhorse/internal/upstream/upstream_test.go b/workhorse/internal/upstream/upstream_test.go
index c86c03920f0..3c942767384 100644
--- a/workhorse/internal/upstream/upstream_test.go
+++ b/workhorse/internal/upstream/upstream_test.go
@@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
+ "time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
@@ -71,10 +72,10 @@ func TestGeoProxyFeatureDisabledOnGeoSecondarySite(t *testing.T) {
defer rsDeferredClose()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
- railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
+ railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
- ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false)
+ ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false)
defer wsDeferredClose()
testCases := []testCase{
@@ -91,13 +92,15 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) {
defer rsDeferredClose()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
- railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
+ railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
- ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true)
+ ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
testCases := []testCase{
+ {"push from secondary is forwarded", "/-/push_from_secondary/foo/bar.git/info/refs", "Geo primary received request to path /-/push_from_secondary/foo/bar.git/info/refs"},
+ {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"},
{"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"},
@@ -109,13 +112,14 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) {
// This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed
func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) {
geoProxyEndpointResponseBody := "{}"
- railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
+ railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
- ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false)
+ ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false)
defer wsDeferredClose()
testCases := []testCase{
+ {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"},
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
@@ -126,13 +130,14 @@ func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) {
func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) {
geoProxyEndpointResponseBody := "{}"
- railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
+ railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
- ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true)
+ ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
testCases := []testCase{
+ {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"},
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
@@ -143,13 +148,14 @@ func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) {
func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) {
geoProxyEndpointResponseBody := "Invalid response"
- railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
+ railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
- ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true)
+ ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
testCases := []testCase{
+ {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"},
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
@@ -158,6 +164,52 @@ func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) {
runTestCases(t, ws, testCases)
}
+func TestGeoProxyFeatureEnablingAndDisabling(t *testing.T) {
+ remoteServer, rsDeferredClose := startRemoteServer("Geo primary")
+ defer rsDeferredClose()
+
+ geoProxyEndpointEnabledResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
+ geoProxyEndpointDisabledResponseBody := "{}"
+ geoProxyEndpointResponseBody := geoProxyEndpointEnabledResponseBody
+
+ railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
+ defer deferredClose()
+
+ ws, wsDeferredClose, waitForNextApiPoll := startWorkhorseServer(railsServer.URL, true)
+ defer wsDeferredClose()
+
+ testCasesLocal := []testCase{
+ {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"},
+ {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
+ {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
+ {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
+ }
+
+ testCasesProxied := []testCase{
+ {"push from secondary is forwarded", "/-/push_from_secondary/foo/bar.git/info/refs", "Geo primary received request to path /-/push_from_secondary/foo/bar.git/info/refs"},
+ {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"},
+ {"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"},
+ {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
+ {"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"},
+ }
+
+ // Enabled initially, run tests
+ runTestCases(t, ws, testCasesProxied)
+
+ // Disable proxying and run tests. It's safe to write to
+ // geoProxyEndpointResponseBody because the polling goroutine is blocked.
+ geoProxyEndpointResponseBody = geoProxyEndpointDisabledResponseBody
+ waitForNextApiPoll()
+
+ runTestCases(t, ws, testCasesLocal)
+
+ // Re-enable proxying and run tests
+ geoProxyEndpointResponseBody = geoProxyEndpointEnabledResponseBody
+ waitForNextApiPoll()
+
+ runTestCases(t, ws, testCasesProxied)
+}
+
func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) {
t.Helper()
for _, tc := range testCases {
@@ -195,13 +247,13 @@ func startRemoteServer(serverName string) (*httptest.Server, func()) {
return ts, ts.Close
}
-func startRailsServer(railsServerName string, geoProxyEndpointResponseBody string) (*httptest.Server, func()) {
+func startRailsServer(railsServerName string, geoProxyEndpointResponseBody *string) (*httptest.Server, func()) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var body string
if r.URL.Path == geoProxyEndpoint {
w.Header().Set("Content-Type", "application/vnd.gitlab-workhorse+json")
- body = geoProxyEndpointResponseBody
+ body = *geoProxyEndpointResponseBody
} else {
body = railsServerName + " received request to path " + r.URL.Path
}
@@ -213,15 +265,19 @@ func startRailsServer(railsServerName string, geoProxyEndpointResponseBody strin
return ts, ts.Close
}
-func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) {
- geoProxyTestChannel := make(chan struct{})
+func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func(), func()) {
+ geoProxySleepC := make(chan struct{})
+ geoProxySleep := func(time.Duration) {
+ geoProxySleepC <- struct{}{}
+ <-geoProxySleepC
+ }
myConfigureRoutes := func(u *upstream) {
// Enable environment variable "feature flag"
u.enableGeoProxyFeature = enableGeoProxyFeature
- // An empty message will be sent to this channel after every callGeoProxyAPI()
- u.geoProxyTestChannel = geoProxyTestChannel
+ // Replace the time.Sleep function with geoProxySleep
+ u.geoProxyPollSleep = geoProxySleep
// call original
configureRoutes(u)
@@ -231,13 +287,20 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h
ws := httptest.NewServer(upstreamHandler)
testhelper.ConfigureSecret()
+ waitForNextApiPoll := func() {}
+
if enableGeoProxyFeature {
- // Wait for an empty message from callGeoProxyAPI(). This should be done on
- // all tests where enableGeoProxyFeature is true, including the ones where
- // we expect geoProxyURL to be nil or error, to ensure the tests do not pass
- // by coincidence.
- <-geoProxyTestChannel
+ // Wait for geoProxySleep to be entered for the first time
+ <-geoProxySleepC
+
+ waitForNextApiPoll = func() {
+ // Cause geoProxySleep to return
+ geoProxySleepC <- struct{}{}
+
+ // Wait for geoProxySleep to be entered again
+ <-geoProxySleepC
+ }
}
- return ws, ws.Close
+ return ws, ws.Close, waitForNextApiPoll
}
diff --git a/workhorse/internal/zipartifacts/open_archive_test.go b/workhorse/internal/zipartifacts/open_archive_test.go
index f7624d053d9..ea1fc606784 100644
--- a/workhorse/internal/zipartifacts/open_archive_test.go
+++ b/workhorse/internal/zipartifacts/open_archive_test.go
@@ -2,56 +2,124 @@ package zipartifacts
import (
"archive/zip"
+ "bytes"
"context"
"fmt"
- "io/ioutil"
+ "io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
+ "runtime"
+ "strings"
"testing"
"github.com/stretchr/testify/require"
)
-func TestOpenHTTPArchive(t *testing.T) {
- const (
- zipFile = "test.zip"
- entryName = "hello.txt"
- contents = "world"
- testRoot = "testdata/public"
- )
-
- require.NoError(t, os.MkdirAll(testRoot, 0755))
- f, err := os.Create(filepath.Join(testRoot, zipFile))
- require.NoError(t, err, "create file")
+func createArchive(t *testing.T, dir string) (map[string][]byte, int64) {
+ f, err := os.Create(filepath.Join(dir, "test.zip"))
+ require.NoError(t, err)
defer f.Close()
-
zw := zip.NewWriter(f)
- w, err := zw.Create(entryName)
- require.NoError(t, err, "create zip entry")
- _, err = fmt.Fprint(w, contents)
- require.NoError(t, err, "write zip entry contents")
- require.NoError(t, zw.Close(), "close zip writer")
- require.NoError(t, f.Close(), "close file")
-
- srv := httptest.NewServer(http.FileServer(http.Dir(testRoot)))
+
+ entries := make(map[string][]byte)
+ for _, size := range []int{0, 32 * 1024, 128 * 1024, 5 * 1024 * 1024} {
+ entryName := fmt.Sprintf("file_%d", size)
+ entries[entryName] = bytes.Repeat([]byte{'z'}, size)
+
+ w, err := zw.Create(entryName)
+ require.NoError(t, err)
+
+ _, err = w.Write(entries[entryName])
+ require.NoError(t, err)
+ }
+
+ require.NoError(t, zw.Close())
+ fi, err := f.Stat()
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ return entries, fi.Size()
+}
+
+func TestOpenHTTPArchive(t *testing.T) {
+ dir := t.TempDir()
+ entries, _ := createArchive(t, dir)
+
+ srv := httptest.NewServer(http.FileServer(http.Dir(dir)))
defer srv.Close()
- zr, err := OpenArchive(context.Background(), srv.URL+"/"+zipFile)
- require.NoError(t, err, "call OpenArchive")
- require.Len(t, zr.File, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ zr, err := OpenArchive(ctx, srv.URL+"/test.zip")
+ require.NoError(t, err)
+ require.Len(t, zr.File, len(entries))
- zf := zr.File[0]
- require.Equal(t, entryName, zf.Name, "zip entry name")
+ for _, zf := range zr.File {
+ entry, ok := entries[zf.Name]
+ require.True(t, ok)
- entry, err := zf.Open()
- require.NoError(t, err, "get zip entry reader")
- defer entry.Close()
+ r, err := zf.Open()
+ require.NoError(t, err)
- actualContents, err := ioutil.ReadAll(entry)
- require.NoError(t, err, "read zip entry contents")
- require.Equal(t, contents, string(actualContents), "compare zip entry contents")
+ contents, err := io.ReadAll(r)
+ require.NoError(t, err)
+ require.Equal(t, entry, contents)
+
+ require.NoError(t, r.Close())
+ }
+}
+
+func TestMinimalRangeRequests(t *testing.T) {
+ if strings.HasPrefix(runtime.Version(), "go1.17") {
+ t.Skipf("skipped for go1.17: https://gitlab.com/gitlab-org/gitlab/-/issues/340778")
+ }
+
+ dir := t.TempDir()
+ entries, archiveSize := createArchive(t, dir)
+
+ mux := http.NewServeMux()
+
+ var ranges []string
+ mux.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) {
+ rangeHdr := r.Header.Get("Range")
+ if rangeHdr == "" {
+ rw.Header().Add("Content-Length", fmt.Sprintf("%d", archiveSize))
+ return
+ }
+
+ ranges = append(ranges, rangeHdr)
+ http.FileServer(http.Dir(dir)).ServeHTTP(rw, r)
+ })
+
+ srv := httptest.NewServer(mux)
+ defer srv.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ zr, err := OpenArchive(ctx, srv.URL+"/test.zip")
+ require.NoError(t, err)
+ require.Len(t, zr.File, len(entries))
+
+ require.Len(t, ranges, 2, "range requests should be minimal")
+ require.NotContains(t, ranges, "bytes=0-", "range request should not request from zero")
+
+ for _, zf := range zr.File {
+ r, err := zf.Open()
+ require.NoError(t, err)
+
+ _, err = io.Copy(io.Discard, r)
+ require.NoError(t, err)
+
+ require.NoError(t, r.Close())
+ }
+
+ // ensure minimal requests: https://gitlab.com/gitlab-org/gitlab/-/issues/340778
+ require.Len(t, ranges, 3, "range requests should be minimal")
+ require.Contains(t, ranges, "bytes=0-")
}
func TestOpenHTTPArchiveNotSendingAcceptEncodingHeader(t *testing.T) {
@@ -64,5 +132,8 @@ func TestOpenHTTPArchiveNotSendingAcceptEncodingHeader(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(requestHandler))
defer srv.Close()
- OpenArchive(context.Background(), srv.URL)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ OpenArchive(ctx, srv.URL)
}