diff options
Diffstat (limited to 'workhorse/internal')
20 files changed, 756 insertions, 122 deletions
diff --git a/workhorse/internal/api/api.go b/workhorse/internal/api/api.go index 417ee71dbdc..7f696f70c7a 100644 --- a/workhorse/internal/api/api.go +++ b/workhorse/internal/api/api.go @@ -155,8 +155,6 @@ type Response struct { ProcessLsifReferences bool // The maximum accepted size in bytes of the upload MaximumSize int64 - // DEPRECATED: Feature flag used to determine whether to strip the multipart filename of any directories - FeatureFlagExtractBase bool } // singleJoiningSlash is taken from reverseproxy.go:singleJoiningSlash diff --git a/workhorse/internal/artifacts/artifacts_store_test.go b/workhorse/internal/artifacts/artifacts_store_test.go index 67a7793e853..a01a723298f 100644 --- a/workhorse/internal/artifacts/artifacts_store_test.go +++ b/workhorse/internal/artifacts/artifacts_store_test.go @@ -102,11 +102,11 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) { tests := []struct { name string - preauth api.Response + preauth *api.Response }{ { name: "ObjectStore Upload", - preauth: api.Response{ + preauth: &api.Response{ RemoteObject: api.RemoteObject{ StoreURL: storeServer.URL + "/url/put" + qs, ID: "store-id", @@ -145,7 +145,7 @@ func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *tes t.Fatal("it should not be called") } - authResponse := api.Response{ + authResponse := &api.Response{ TempPath: tempPath, RemoteObject: api.RemoteObject{ StoreURL: "http://localhost:12323/invalid/url", @@ -171,7 +171,7 @@ func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T) t.Fatal("it should not be called") } - authResponse := api.Response{ + authResponse := &api.Response{ TempPath: tempPath, RemoteObject: api.RemoteObject{ StoreURL: "htt:////invalid-url", @@ -203,7 +203,7 @@ func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T) storeServer := httptest.NewServer(storeServerMux) defer storeServer.Close() - authResponse := api.Response{ + authResponse := &api.Response{ RemoteObject: api.RemoteObject{ StoreURL: storeServer.URL + "/url/put", ID: "store-id", @@ -236,7 +236,7 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin storeServer := httptest.NewServer(storeServerMux) defer storeServer.Close() - authResponse := api.Response{ + authResponse := &api.Response{ RemoteObject: api.RemoteObject{ StoreURL: storeServer.URL + "/url/put", ID: "store-id", @@ -262,7 +262,7 @@ func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) { objectURL := server.URL + test.ObjectPath uploadSize := 10 - preauth := api.Response{ + preauth := &api.Response{ RemoteObject: api.RemoteObject{ ID: "store-id", MultipartUpload: &api.MultipartUploadParams{ @@ -304,7 +304,7 @@ func TestUploadHandlerMultipartUploadMaximumSizeFromApi(t *testing.T) { uploadSize := int64(10) maxSize := uploadSize - 1 - preauth := api.Response{ + preauth := &api.Response{ MaximumSize: maxSize, RemoteObject: api.RemoteObject{ ID: "store-id", diff --git a/workhorse/internal/artifacts/artifacts_upload_test.go b/workhorse/internal/artifacts/artifacts_upload_test.go index 2b11d56f4ee..3e8a52be1a1 100644 --- a/workhorse/internal/artifacts/artifacts_upload_test.go +++ b/workhorse/internal/artifacts/artifacts_upload_test.go @@ -35,7 +35,7 @@ const ( Path = "/url/path" ) -func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server { +func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(Path+"/authorize", func(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { @@ -51,7 +51,7 @@ func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProc w.Write(data) }) mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) { - opts, err := filestore.GetOpts(&authResponse) + opts, err := filestore.GetOpts(authResponse) require.NoError(t, err) if r.Method != "POST" { @@ -128,7 +128,7 @@ func setupWithTmpPath(t *testing.T, filename string, includeFormat bool, format authResponse = &api.Response{TempPath: tempPath} } - ts := testArtifactsUploadServer(t, *authResponse, bodyProcessor) + ts := testArtifactsUploadServer(t, authResponse, bodyProcessor) var buffer bytes.Buffer writer := multipart.NewWriter(&buffer) diff --git a/workhorse/internal/dependencyproxy/dependencyproxy.go b/workhorse/internal/dependencyproxy/dependencyproxy.go new file mode 100644 index 00000000000..cfb3045544f --- /dev/null +++ b/workhorse/internal/dependencyproxy/dependencyproxy.go @@ -0,0 +1,123 @@ +package dependencyproxy + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "time" + + "gitlab.com/gitlab-org/labkit/correlation" + "gitlab.com/gitlab-org/labkit/log" + "gitlab.com/gitlab-org/labkit/tracing" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata" +) + +// httpTransport defines a http.Transport with values +// that are more restrictive than for http.DefaultTransport, +// they define shorter TLS Handshake, and more aggressive connection closing +// to prevent the connection hanging and reduce FD usage +var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 10 * time.Second, + }).DialContext, + MaxIdleConns: 2, + IdleConnTimeout: 30 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 10 * time.Second, + ResponseHeaderTimeout: 30 * time.Second, +})) + +var httpClient = &http.Client{ + Transport: httpTransport, +} + +type Injector struct { + senddata.Prefix + uploadHandler http.Handler +} + +type entryParams struct { + Url string + Header http.Header +} + +type nullResponseWriter struct { + header http.Header + status int +} + +func (nullResponseWriter) Write(p []byte) (int, error) { + return len(p), nil +} + +func (w *nullResponseWriter) Header() http.Header { + return w.header +} + +func (w *nullResponseWriter) WriteHeader(status int) { + if w.status == 0 { + w.status = status + } +} + +func NewInjector() *Injector { + return &Injector{Prefix: "send-dependency:"} +} + +func (p *Injector) SetUploadHandler(uploadHandler http.Handler) { + p.uploadHandler = uploadHandler +} + +func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) { + dependencyResponse, err := p.fetchUrl(r.Context(), sendData) + if err != nil { + helper.Fail500(w, r, err) + return + } + defer dependencyResponse.Body.Close() + if dependencyResponse.StatusCode >= 400 { + w.WriteHeader(dependencyResponse.StatusCode) + io.Copy(w, dependencyResponse.Body) + return + } + + teeReader := io.TeeReader(dependencyResponse.Body, w) + saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader) + if err != nil { + helper.Fail500(w, r, fmt.Errorf("dependency proxy: failed to create request: %w", err)) + } + saveFileRequest.Header = helper.HeaderClone(r.Header) + saveFileRequest.ContentLength = dependencyResponse.ContentLength + + w.Header().Del("Content-Length") + + nrw := &nullResponseWriter{header: make(http.Header)} + p.uploadHandler.ServeHTTP(nrw, saveFileRequest) + + if nrw.status != http.StatusOK { + fields := log.Fields{"code": nrw.status} + + helper.Fail500WithFields(nrw, r, fmt.Errorf("dependency proxy: failed to upload file"), fields) + } +} + +func (p *Injector) fetchUrl(ctx context.Context, sendData string) (*http.Response, error) { + var params entryParams + if err := p.Unpack(¶ms, sendData); err != nil { + return nil, fmt.Errorf("dependency proxy: unpack sendData: %v", err) + } + + r, err := http.NewRequestWithContext(ctx, "GET", params.Url, nil) + if err != nil { + return nil, fmt.Errorf("dependency proxy: failed to fetch dependency: %v", err) + } + r.Header = params.Header + + return httpClient.Do(r) +} diff --git a/workhorse/internal/dependencyproxy/dependencyproxy_test.go b/workhorse/internal/dependencyproxy/dependencyproxy_test.go new file mode 100644 index 00000000000..37e54c0b756 --- /dev/null +++ b/workhorse/internal/dependencyproxy/dependencyproxy_test.go @@ -0,0 +1,183 @@ +package dependencyproxy + +import ( + "encoding/base64" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload" +) + +type fakeUploadHandler struct { + request *http.Request + body []byte + handler func(w http.ResponseWriter, r *http.Request) +} + +func (f *fakeUploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + f.request = r + + f.body, _ = io.ReadAll(r.Body) + + f.handler(w, r) +} + +type errWriter struct{ writes int } + +func (w *errWriter) Header() http.Header { return nil } +func (w *errWriter) WriteHeader(h int) {} + +// First call of Write function succeeds while all the subsequent ones fail +func (w *errWriter) Write(p []byte) (int, error) { + if w.writes > 0 { + return 0, fmt.Errorf("client error") + } + + w.writes++ + + return len(p), nil +} + +type fakePreAuthHandler struct{} + +func (f *fakePreAuthHandler) PreAuthorizeHandler(handler api.HandleFunc, _ string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler(w, r, &api.Response{TempPath: "../../testdata/scratch"}) + }) +} + +func TestInject(t *testing.T) { + contentLength := 32768 + 1 + content := strings.Repeat("p", contentLength) + + testCases := []struct { + desc string + responseWriter http.ResponseWriter + contentLength int + handlerMustBeCalled bool + }{ + { + desc: "the uploading successfully finalized", + responseWriter: httptest.NewRecorder(), + contentLength: contentLength, + handlerMustBeCalled: true, + }, { + desc: "a user failed to receive the response", + responseWriter: &errWriter{}, + contentLength: contentLength, + handlerMustBeCalled: false, + }, { + desc: "the origin resource server returns partial response", + responseWriter: httptest.NewRecorder(), + contentLength: contentLength + 1, + handlerMustBeCalled: false, + }, + } + testhelper.ConfigureSecret() + + for _, tc := range testCases { + originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", strconv.Itoa(tc.contentLength)) + w.Write([]byte(content)) + })) + defer originResourceServer.Close() + + // BodyUploader expects http.Handler as its second param, we can create a stub function and verify that + // it's only called for successful requests + handlerIsCalled := false + handlerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handlerIsCalled = true }) + + bodyUploader := upload.BodyUploader(&fakePreAuthHandler{}, handlerFunc, &upload.DefaultPreparer{}) + + injector := NewInjector() + injector.SetUploadHandler(bodyUploader) + + r := httptest.NewRequest("GET", "/target", nil) + sendData := base64.StdEncoding.EncodeToString([]byte(`{"Token": "token", "Url": "` + originResourceServer.URL + `/url"}`)) + + injector.Inject(tc.responseWriter, r, sendData) + + require.Equal(t, tc.handlerMustBeCalled, handlerIsCalled, "a partial file must not be saved") + } +} + +func TestSuccessfullRequest(t *testing.T) { + content := []byte("result") + originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", strconv.Itoa(len(content))) + w.Write(content) + })) + + uploadHandler := &fakeUploadHandler{ + handler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + }, + } + + injector := NewInjector() + injector.SetUploadHandler(uploadHandler) + + response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`) + + require.Equal(t, "/target/upload", uploadHandler.request.URL.Path) + require.Equal(t, int64(6), uploadHandler.request.ContentLength) + + require.Equal(t, content, uploadHandler.body) + + require.Equal(t, 200, response.Code) + require.Equal(t, string(content), response.Body.String()) +} + +func TestIncorrectSendData(t *testing.T) { + response := makeRequest(NewInjector(), "") + + require.Equal(t, 500, response.Code) + require.Equal(t, "Internal server error\n", response.Body.String()) +} + +func TestIncorrectSendDataUrl(t *testing.T) { + response := makeRequest(NewInjector(), `{"Token": "token", "Url": "url"}`) + + require.Equal(t, 500, response.Code) + require.Equal(t, "Internal server error\n", response.Body.String()) +} + +func TestFailedOriginServer(t *testing.T) { + originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + w.Write([]byte("Not found")) + })) + + uploadHandler := &fakeUploadHandler{ + handler: func(w http.ResponseWriter, r *http.Request) { + require.FailNow(t, "the error response must not be uploaded") + }, + } + + injector := NewInjector() + injector.SetUploadHandler(uploadHandler) + + response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`) + + require.Equal(t, 404, response.Code) + require.Equal(t, "Not found", response.Body.String()) +} + +func makeRequest(injector *Injector, data string) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/target", nil) + + sendData := base64.StdEncoding.EncodeToString([]byte(data)) + injector.Inject(w, r, sendData) + + return w +} diff --git a/workhorse/internal/filestore/save_file_opts_test.go b/workhorse/internal/filestore/save_file_opts_test.go index ead5ba86889..f389b2054e5 100644 --- a/workhorse/internal/filestore/save_file_opts_test.go +++ b/workhorse/internal/filestore/save_file_opts_test.go @@ -141,21 +141,21 @@ func TestGetOpts(t *testing.T) { func TestGetOptsFail(t *testing.T) { testCases := []struct { desc string - in api.Response + in *api.Response }{ { desc: "neither local nor remote", - in: api.Response{}, + in: &api.Response{}, }, { desc: "both local and remote", - in: api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}}, + in: &api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}}, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - _, err := filestore.GetOpts(&tc.in) + _, err := filestore.GetOpts(tc.in) require.Error(t, err, "expect input to be rejected") }) } diff --git a/workhorse/internal/git/archive.go b/workhorse/internal/git/archive.go index bb35f2eb269..fc12094cc14 100644 --- a/workhorse/internal/git/archive.go +++ b/workhorse/internal/git/archive.go @@ -102,7 +102,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string var archiveReader io.Reader - archiveReader, err = handleArchiveWithGitaly(r, params, format) + archiveReader, err = handleArchiveWithGitaly(r, ¶ms, format) if err != nil { helper.Fail500(w, r, fmt.Errorf("operations.GetArchive: %v", err)) return @@ -130,7 +130,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string } } -func handleArchiveWithGitaly(r *http.Request, params archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) { +func handleArchiveWithGitaly(r *http.Request, params *archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) { var request *gitalypb.GetArchiveRequest ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer) if err != nil { diff --git a/workhorse/internal/git/info-refs.go b/workhorse/internal/git/info-refs.go index 0d8edbeff06..8390143b99b 100644 --- a/workhorse/internal/git/info-refs.go +++ b/workhorse/internal/git/info-refs.go @@ -8,8 +8,8 @@ import ( "net/http" "github.com/golang/gddo/httputil" - - "gitlab.com/gitlab-org/labkit/log" + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" @@ -26,6 +26,7 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response) defer responseWriter.Log(r, 0) rpc := getService(r) + if !(rpc == "git-upload-pack" || rpc == "git-receive-pack") { // The 'dumb' Git HTTP protocol is not supported http.Error(responseWriter, "Not Found", 404) @@ -41,19 +42,26 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response) encoding := httputil.NegotiateContentEncoding(r, offers) if err := handleGetInfoRefsWithGitaly(r.Context(), responseWriter, a, rpc, gitProtocol, encoding); err != nil { - helper.Fail500(responseWriter, r, fmt.Errorf("handleGetInfoRefs: %v", err)) + status := grpcstatus.Convert(err) + err = fmt.Errorf("handleGetInfoRefs: %v", err) + + if status != nil && status.Code() == grpccodes.Unavailable { + helper.CaptureAndFail(responseWriter, r, err, "The git server, Gitaly, is not available at this time. Please contact your administrator.", http.StatusServiceUnavailable) + } else { + helper.Fail500(responseWriter, r, err) + } } } func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error { ctx, smarthttp, err := gitaly.NewSmartHTTPClient(ctx, a.GitalyServer) if err != nil { - return fmt.Errorf("GetInfoRefsHandler: %v", err) + return err } infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc, gitConfigOptions(a), gitProtocol) if err != nil { - return fmt.Errorf("GetInfoRefsHandler: %v", err) + return err } var w io.Writer @@ -69,7 +77,7 @@ func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpRespon } if _, err = io.Copy(w, infoRefsResponseReader); err != nil { - log.WithError(err).Error("GetInfoRefsHandler: error copying gitaly response") + return err } return nil diff --git a/workhorse/internal/git/info-refs_test.go b/workhorse/internal/git/info-refs_test.go new file mode 100644 index 00000000000..4f23d1ac174 --- /dev/null +++ b/workhorse/internal/git/info-refs_test.go @@ -0,0 +1,42 @@ +package git + +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" + + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" +) + +type smartHTTPServiceServerWithInfoRefs struct { + gitalypb.UnimplementedSmartHTTPServiceServer + InfoRefsUploadPackFunc func(*gitalypb.InfoRefsRequest, gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error +} + +func (srv *smartHTTPServiceServerWithInfoRefs) InfoRefsUploadPack(r *gitalypb.InfoRefsRequest, s gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error { + return srv.InfoRefsUploadPackFunc(r, s) +} + +func TestGetInfoRefsHandler(t *testing.T) { + addr := startSmartHTTPServer(t, &smartHTTPServiceServerWithInfoRefs{ + InfoRefsUploadPackFunc: func(r *gitalypb.InfoRefsRequest, s gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error { + return grpcstatus.Error(grpccodes.Unavailable, "error") + }, + }) + + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/?service=git-upload-pack", nil) + a := &api.Response{GitalyServer: gitaly.Server{Address: addr}} + + handleGetInfoRefs(NewHttpResponseWriter(w), r, a) + require.Equal(t, 503, w.Code) + + msg := "The git server, Gitaly, is not available at this time. Please contact your administrator.\n" + require.Equal(t, msg, w.Body.String()) +} diff --git a/workhorse/internal/git/upload-pack_test.go b/workhorse/internal/git/upload-pack_test.go index ebefe360122..211f68a2608 100644 --- a/workhorse/internal/git/upload-pack_test.go +++ b/workhorse/internal/git/upload-pack_test.go @@ -45,14 +45,13 @@ func TestUploadPackTimesOut(t *testing.T) { uploadPackTimeout = time.Millisecond defer func() { uploadPackTimeout = originalUploadPackTimeout }() - addr, cleanUp := startSmartHTTPServer(t, &smartHTTPServiceServer{ + addr := startSmartHTTPServer(t, &smartHTTPServiceServer{ PostUploadPackFunc: func(stream gitalypb.SmartHTTPService_PostUploadPackServer) error { _, err := stream.Recv() // trigger a read on the client request body require.NoError(t, err) return nil }, }) - defer cleanUp() body := &fakeReader{n: 0, err: nil} @@ -64,7 +63,9 @@ func TestUploadPackTimesOut(t *testing.T) { require.EqualError(t, err, "smarthttp.UploadPack: busyReader: context deadline exceeded") } -func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) (string, func()) { +func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) string { + t.Helper() + tmp, err := ioutil.TempDir("", "") require.NoError(t, err) @@ -78,8 +79,10 @@ func startSmartHTTPServer(t testing.TB, s gitalypb.SmartHTTPServiceServer) (stri require.NoError(t, srv.Serve(ln)) }() - return fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()), func() { + t.Cleanup(func() { srv.GracefulStop() require.NoError(t, os.RemoveAll(tmp), "error removing temp dir %q", tmp) - } + }) + + return fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()) } diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go index 6ea99962056..362f380dc4d 100644 --- a/workhorse/internal/gitaly/gitaly.go +++ b/workhorse/internal/gitaly/gitaly.go @@ -11,6 +11,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -23,15 +24,19 @@ import ( ) type Server struct { - Address string `json:"address"` - Token string `json:"token"` - Features map[string]string `json:"features"` + Address string `json:"address"` + Token string `json:"token"` + Features map[string]string `json:"features"` + Sidechannel bool `json:"sidechannel"` } -type cacheKey struct{ address, token string } +type cacheKey struct { + address, token string + sidechannel bool +} func (server Server) cacheKey() cacheKey { - return cacheKey{address: server.Address, token: server.Token} + return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel} } type connectionsCache struct { @@ -41,9 +46,17 @@ type connectionsCache struct { var ( jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true} - cache = connectionsCache{ + // This connection cache map contains two types of connections: + // - Normal gRPC connections + // - Sidechannel connections. When client dials to the Gitaly server, the + // server multiplexes the connection using Yamux. In the future, the server + // can open another stream to transfer data without gRPC. Besides, we apply + // a framing protocol to add the half-close capability to Yamux streams. + // Hence, we cannot use those connections interchangeably. + cache = connectionsCache{ connections: make(map[cacheKey]*grpc.ClientConn), } + sidechannelRegistry *gitalyclient.SidechannelRegistry connectionsTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -54,6 +67,12 @@ var ( ) ) +func InitializeSidechannelRegistry(logger *logrus.Logger) { + if sidechannelRegistry == nil { + sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger)) + } +} + func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context { md := metadata.New(nil) for k, v := range features { @@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S return nil, nil, err } grpcClient := gitalypb.NewSmartHTTPServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil + smartHTTPClient := &SmartHTTPClient{ + SmartHTTPServiceClient: grpcClient, + sidechannelRegistry: sidechannelRegistry, + useSidechannel: server.Sidechannel, + } + return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil } func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) { @@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) { ), ) - conn, connErr := gitalyclient.Dial(server.Address, connOpts) + var conn *grpc.ClientConn + var connErr error + if server.Sidechannel { + conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background + } else { + conn, connErr = gitalyclient.Dial(server.Address, connOpts) + } label := "ok" if connErr != nil { diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go index b17fb5c1d7b..9c54caae8c6 100644 --- a/workhorse/internal/gitaly/gitaly_test.go +++ b/workhorse/internal/gitaly/gitaly_test.go @@ -4,14 +4,32 @@ import ( "context" "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" ) func TestNewSmartHTTPClient(t *testing.T) { - ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture()) + ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture()) require.NoError(t, err) testOutgoingMetadata(t, ctx) + + require.False(t, client.useSidechannel) + require.Nil(t, client.sidechannelRegistry) +} + +func TestNewSmartHTTPClientWithSidechannel(t *testing.T) { + InitializeSidechannelRegistry(logrus.StandardLogger()) + + fixture := serverFixture() + fixture.Sidechannel = true + + ctx, client, err := NewSmartHTTPClient(context.Background(), fixture) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) + + require.True(t, client.useSidechannel) + require.NotNil(t, client.sidechannelRegistry) } func TestNewBlobClient(t *testing.T) { diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go index 69656ab0a92..de6954efa60 100644 --- a/workhorse/internal/gitaly/smarthttp.go +++ b/workhorse/internal/gitaly/smarthttp.go @@ -5,11 +5,14 @@ import ( "fmt" "io" + gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" ) type SmartHTTPClient struct { + useSidechannel bool + sidechannelRegistry *gitalyclient.SidechannelRegistry gitalypb.SmartHTTPServiceClient } @@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R } func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + if client.useSidechannel { + return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) + } + + return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) +} + +func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { stream, err := client.PostUploadPack(ctx) if err != nil { return err @@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re return nil } + +func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error { + if _, err := io.Copy(conn, clientRequest); err != nil { + return err + } + + if err := conn.CloseWrite(); err != nil { + return fmt.Errorf("fail to signal sidechannel half-close: %w", err) + } + + if _, err := io.Copy(clientResponse, conn); err != nil { + return err + } + + return nil + }) + defer waiter.Close() + + rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{ + Repository: repo, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil { + return err + } + + if err := waiter.Close(); err != nil { + return fmt.Errorf("fail to close sidechannel connection: %w", err) + } + + return nil +} diff --git a/workhorse/internal/gitaly/unmarshal_test.go b/workhorse/internal/gitaly/unmarshal_test.go index 270b96f900d..730eff4005a 100644 --- a/workhorse/internal/gitaly/unmarshal_test.go +++ b/workhorse/internal/gitaly/unmarshal_test.go @@ -3,6 +3,7 @@ package gitaly import ( "testing" + "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -11,25 +12,25 @@ func TestUnmarshalJSON(t *testing.T) { testCases := []struct { desc string in string - out gitalypb.Repository + out *gitalypb.Repository }{ { desc: "basic example", in: `{"relative_path":"foo/bar.git"}`, - out: gitalypb.Repository{RelativePath: "foo/bar.git"}, + out: &gitalypb.Repository{RelativePath: "foo/bar.git"}, }, { desc: "unknown field", in: `{"relative_path":"foo/bar.git","unknown_field":12345}`, - out: gitalypb.Repository{RelativePath: "foo/bar.git"}, + out: &gitalypb.Repository{RelativePath: "foo/bar.git"}, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - result := gitalypb.Repository{} - require.NoError(t, UnmarshalJSON(tc.in, &result)) - require.Equal(t, tc.out, result) + result := &gitalypb.Repository{} + require.NoError(t, UnmarshalJSON(tc.in, result)) + require.True(t, proto.Equal(tc.out, result)) }) } } diff --git a/workhorse/internal/testhelper/gitaly.go b/workhorse/internal/testhelper/gitaly.go index 13e71c570a7..da2fbf30785 100644 --- a/workhorse/internal/testhelper/gitaly.go +++ b/workhorse/internal/testhelper/gitaly.go @@ -1,6 +1,7 @@ package testhelper import ( + "bytes" "fmt" "io" "io/ioutil" @@ -11,6 +12,7 @@ import ( "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "golang.org/x/net/context" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -23,6 +25,7 @@ type GitalyTestServer struct { finalMessageCode codes.Code sync.WaitGroup LastIncomingMetadata metadata.MD + gitalypb.UnimplementedSmartHTTPServiceServer gitalypb.UnimplementedRepositoryServiceServer gitalypb.UnimplementedBlobServiceServer gitalypb.UnimplementedDiffServiceServer @@ -191,13 +194,14 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU return err } - jsonString, err := marshalJSON(req) - if err != nil { + marshaler := &jsonpb.Marshaler{} + jsonBytes := &bytes.Buffer{} + if err := marshaler.Marshal(jsonBytes, req); err != nil { return err } if err := stream.Send(&gitalypb.PostUploadPackResponse{ - Data: []byte(strings.Join([]string{jsonString}, "\000") + "\000"), + Data: append(jsonBytes.Bytes(), 0), }); err != nil { return err } @@ -229,6 +233,43 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU return s.finalError() } +// PostUploadPackWithSidechannel should be a part of smarthttp server in real +// server. In workhorse, setting up a real sidechannel server is troublesome. +// Therefore, we bring up a sidechannel server with a mock server exported via +// gitalyclient.TestSidechannelServer. This is the handler for that mock +// server. +func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error { + if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" { + return fmt.Errorf("unexpected method: %s", method) + } + + var req gitalypb.PostUploadPackWithSidechannelRequest + if err := stream.RecvMsg(&req); err != nil { + return err + } + + if err := validateRepository(req.GetRepository()); err != nil { + return err + } + + marshaler := &jsonpb.Marshaler{} + jsonBytes := &bytes.Buffer{} + if err := marshaler.Marshal(jsonBytes, &req); err != nil { + return err + } + + // Bounce back all data back to the client, plus flushing bytes + if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil { + return err + } + + if _, err := io.Copy(conn, conn); err != nil { + return err + } + + return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{}) +} + func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) { return nil, nil } diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go index 5fae5cf7bcb..29acf849e05 100644 --- a/workhorse/internal/upload/uploads_test.go +++ b/workhorse/internal/upload/uploads_test.go @@ -271,19 +271,19 @@ func TestUploadProcessingFile(t *testing.T) { tests := []struct { name string - preauth api.Response + preauth *api.Response }{ { name: "FileStore Upload", - preauth: api.Response{TempPath: tempPath}, + preauth: &api.Response{TempPath: tempPath}, }, { name: "ObjectStore Upload", - preauth: api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl}}, + preauth: &api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl}}, }, { name: "ObjectStore and FileStore Upload", - preauth: api.Response{ + preauth: &api.Response{ TempPath: tempPath, RemoteObject: api.RemoteObject{StoreURL: storeUrl}, }, diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go index 8c85c5144e5..d39ba845dc5 100644 --- a/workhorse/internal/upstream/routes.go +++ b/workhorse/internal/upstream/routes.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/builds" "gitlab.com/gitlab-org/gitlab/workhorse/internal/channel" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/dependencyproxy" "gitlab.com/gitlab-org/gitlab/workhorse/internal/git" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/imageresizer" @@ -56,6 +57,7 @@ const ( apiPattern = `^/api/` ciAPIPattern = `^/ci/api/` gitProjectPattern = `^/.+\.git/` + geoGitProjectPattern = `^/[^-].+\.git/` // Prevent matching routes like /-/push_from_secondary projectPattern = `^/([^/]+/){1,}[^/]+/` apiProjectPattern = apiPattern + `v4/projects/[^/]+/` // API: Projects can be encoded via group%2Fsubgroup%2Fproject snippetUploadPattern = `^/uploads/personal_snippet` @@ -170,7 +172,7 @@ func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool { return ok } -func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config) http.Handler { +func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config, dependencyProxyInjector *dependencyproxy.Injector) http.Handler { proxier := proxypkg.NewProxy(backend, version, rt) return senddata.SendData( @@ -183,6 +185,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf artifacts.SendEntry, sendurl.SendURL, imageresizer.NewResizer(cfg), + dependencyProxyInjector, ) } @@ -193,7 +196,8 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf func configureRoutes(u *upstream) { api := u.APIClient static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude} - proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config) + dependencyProxyInjector := dependencyproxy.NewInjector() + proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config, dependencyProxyInjector) cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper) assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy) @@ -207,7 +211,7 @@ func configureRoutes(u *upstream) { } signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version) - signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config) + signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config, dependencyProxyInjector) preparers := createUploadPreparers(u.Config) uploadPath := path.Join(u.DocumentRoot, "uploads/tmp") @@ -215,6 +219,8 @@ func configureRoutes(u *upstream) { ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout) ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration) + dependencyProxyInjector.SetUploadHandler(upload.BodyUploader(api, signingProxy, preparers.packages)) + // Serve static files or forward the requests defaultUpstream := static.ServeExisting( u.URLPrefix, @@ -348,10 +354,9 @@ func configureRoutes(u *upstream) { // pulls are performed against a different source of truth. Ideally, we'd // proxy/redirect pulls as well, when the secondary is not up-to-date. // - u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)), - u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))), - u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))), - u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))), + u.route("GET", geoGitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)), + u.route("POST", geoGitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))), + u.route("GET", geoGitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})\z`, defaultUpstream), // Serve health checks from this Geo secondary u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)), diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go index d6e5e7766b5..e57f58d59dd 100644 --- a/workhorse/internal/upstream/upstream.go +++ b/workhorse/internal/upstream/upstream.go @@ -50,7 +50,7 @@ type upstream struct { geoLocalRoutes []routeEntry geoProxyCableRoute routeEntry geoProxyRoute routeEntry - geoProxyTestChannel chan struct{} + geoProxyPollSleep func(time.Duration) accessLogger *logrus.Logger enableGeoProxyFeature bool mu sync.RWMutex @@ -68,6 +68,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1", geoProxyBackend: &url.URL{}, } + if up.geoProxyPollSleep == nil { + up.geoProxyPollSleep = time.Sleep + } if up.Backend == nil { up.Backend = DefaultBackend } @@ -205,13 +208,7 @@ func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *route func (u *upstream) pollGeoProxyAPI() { for { u.callGeoProxyAPI() - - // Notify tests when callGeoProxyAPI() finishes - if u.geoProxyTestChannel != nil { - u.geoProxyTestChannel <- struct{}{} - } - - time.Sleep(geoProxyApiPollingInterval) + u.geoProxyPollSleep(geoProxyApiPollingInterval) } } @@ -234,6 +231,11 @@ func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) { defer u.mu.Unlock() u.geoProxyBackend = geoProxyURL + + if u.geoProxyBackend.String() == "" { + return + } + geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode) geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper) u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream) diff --git a/workhorse/internal/upstream/upstream_test.go b/workhorse/internal/upstream/upstream_test.go index c86c03920f0..3c942767384 100644 --- a/workhorse/internal/upstream/upstream_test.go +++ b/workhorse/internal/upstream/upstream_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -71,10 +72,10 @@ func TestGeoProxyFeatureDisabledOnGeoSecondarySite(t *testing.T) { defer rsDeferredClose() geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false) defer wsDeferredClose() testCases := []testCase{ @@ -91,13 +92,15 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) { defer rsDeferredClose() geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true) defer wsDeferredClose() testCases := []testCase{ + {"push from secondary is forwarded", "/-/push_from_secondary/foo/bar.git/info/refs", "Geo primary received request to path /-/push_from_secondary/foo/bar.git/info/refs"}, + {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"}, {"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"}, {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, {"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"}, @@ -109,13 +112,14 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) { // This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) { geoProxyEndpointResponseBody := "{}" - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false) defer wsDeferredClose() testCases := []testCase{ + {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"}, {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, @@ -126,13 +130,14 @@ func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) { func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { geoProxyEndpointResponseBody := "{}" - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true) defer wsDeferredClose() testCases := []testCase{ + {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"}, {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, @@ -143,13 +148,14 @@ func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) { geoProxyEndpointResponseBody := "Invalid response" - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true) defer wsDeferredClose() testCases := []testCase{ + {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"}, {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, @@ -158,6 +164,52 @@ func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) { runTestCases(t, ws, testCases) } +func TestGeoProxyFeatureEnablingAndDisabling(t *testing.T) { + remoteServer, rsDeferredClose := startRemoteServer("Geo primary") + defer rsDeferredClose() + + geoProxyEndpointEnabledResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) + geoProxyEndpointDisabledResponseBody := "{}" + geoProxyEndpointResponseBody := geoProxyEndpointEnabledResponseBody + + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose, waitForNextApiPoll := startWorkhorseServer(railsServer.URL, true) + defer wsDeferredClose() + + testCasesLocal := []testCase{ + {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"}, + {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, + } + + testCasesProxied := []testCase{ + {"push from secondary is forwarded", "/-/push_from_secondary/foo/bar.git/info/refs", "Geo primary received request to path /-/push_from_secondary/foo/bar.git/info/refs"}, + {"LFS files are served locally", "/group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6", "Local Rails server received request to path /group/project.git/gitlab-lfs/objects/37446575700829a11278ad3a550f244f45d5ae4fe1552778fa4f041f9eaeecf6"}, + {"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"}, + } + + // Enabled initially, run tests + runTestCases(t, ws, testCasesProxied) + + // Disable proxying and run tests. It's safe to write to + // geoProxyEndpointResponseBody because the polling goroutine is blocked. + geoProxyEndpointResponseBody = geoProxyEndpointDisabledResponseBody + waitForNextApiPoll() + + runTestCases(t, ws, testCasesLocal) + + // Re-enable proxying and run tests + geoProxyEndpointResponseBody = geoProxyEndpointEnabledResponseBody + waitForNextApiPoll() + + runTestCases(t, ws, testCasesProxied) +} + func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) { t.Helper() for _, tc := range testCases { @@ -195,13 +247,13 @@ func startRemoteServer(serverName string) (*httptest.Server, func()) { return ts, ts.Close } -func startRailsServer(railsServerName string, geoProxyEndpointResponseBody string) (*httptest.Server, func()) { +func startRailsServer(railsServerName string, geoProxyEndpointResponseBody *string) (*httptest.Server, func()) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var body string if r.URL.Path == geoProxyEndpoint { w.Header().Set("Content-Type", "application/vnd.gitlab-workhorse+json") - body = geoProxyEndpointResponseBody + body = *geoProxyEndpointResponseBody } else { body = railsServerName + " received request to path " + r.URL.Path } @@ -213,15 +265,19 @@ func startRailsServer(railsServerName string, geoProxyEndpointResponseBody strin return ts, ts.Close } -func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) { - geoProxyTestChannel := make(chan struct{}) +func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func(), func()) { + geoProxySleepC := make(chan struct{}) + geoProxySleep := func(time.Duration) { + geoProxySleepC <- struct{}{} + <-geoProxySleepC + } myConfigureRoutes := func(u *upstream) { // Enable environment variable "feature flag" u.enableGeoProxyFeature = enableGeoProxyFeature - // An empty message will be sent to this channel after every callGeoProxyAPI() - u.geoProxyTestChannel = geoProxyTestChannel + // Replace the time.Sleep function with geoProxySleep + u.geoProxyPollSleep = geoProxySleep // call original configureRoutes(u) @@ -231,13 +287,20 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h ws := httptest.NewServer(upstreamHandler) testhelper.ConfigureSecret() + waitForNextApiPoll := func() {} + if enableGeoProxyFeature { - // Wait for an empty message from callGeoProxyAPI(). This should be done on - // all tests where enableGeoProxyFeature is true, including the ones where - // we expect geoProxyURL to be nil or error, to ensure the tests do not pass - // by coincidence. - <-geoProxyTestChannel + // Wait for geoProxySleep to be entered for the first time + <-geoProxySleepC + + waitForNextApiPoll = func() { + // Cause geoProxySleep to return + geoProxySleepC <- struct{}{} + + // Wait for geoProxySleep to be entered again + <-geoProxySleepC + } } - return ws, ws.Close + return ws, ws.Close, waitForNextApiPoll } diff --git a/workhorse/internal/zipartifacts/open_archive_test.go b/workhorse/internal/zipartifacts/open_archive_test.go index f7624d053d9..ea1fc606784 100644 --- a/workhorse/internal/zipartifacts/open_archive_test.go +++ b/workhorse/internal/zipartifacts/open_archive_test.go @@ -2,56 +2,124 @@ package zipartifacts import ( "archive/zip" + "bytes" "context" "fmt" - "io/ioutil" + "io" "net/http" "net/http/httptest" "os" "path/filepath" + "runtime" + "strings" "testing" "github.com/stretchr/testify/require" ) -func TestOpenHTTPArchive(t *testing.T) { - const ( - zipFile = "test.zip" - entryName = "hello.txt" - contents = "world" - testRoot = "testdata/public" - ) - - require.NoError(t, os.MkdirAll(testRoot, 0755)) - f, err := os.Create(filepath.Join(testRoot, zipFile)) - require.NoError(t, err, "create file") +func createArchive(t *testing.T, dir string) (map[string][]byte, int64) { + f, err := os.Create(filepath.Join(dir, "test.zip")) + require.NoError(t, err) defer f.Close() - zw := zip.NewWriter(f) - w, err := zw.Create(entryName) - require.NoError(t, err, "create zip entry") - _, err = fmt.Fprint(w, contents) - require.NoError(t, err, "write zip entry contents") - require.NoError(t, zw.Close(), "close zip writer") - require.NoError(t, f.Close(), "close file") - - srv := httptest.NewServer(http.FileServer(http.Dir(testRoot))) + + entries := make(map[string][]byte) + for _, size := range []int{0, 32 * 1024, 128 * 1024, 5 * 1024 * 1024} { + entryName := fmt.Sprintf("file_%d", size) + entries[entryName] = bytes.Repeat([]byte{'z'}, size) + + w, err := zw.Create(entryName) + require.NoError(t, err) + + _, err = w.Write(entries[entryName]) + require.NoError(t, err) + } + + require.NoError(t, zw.Close()) + fi, err := f.Stat() + require.NoError(t, err) + require.NoError(t, f.Close()) + + return entries, fi.Size() +} + +func TestOpenHTTPArchive(t *testing.T) { + dir := t.TempDir() + entries, _ := createArchive(t, dir) + + srv := httptest.NewServer(http.FileServer(http.Dir(dir))) defer srv.Close() - zr, err := OpenArchive(context.Background(), srv.URL+"/"+zipFile) - require.NoError(t, err, "call OpenArchive") - require.Len(t, zr.File, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + zr, err := OpenArchive(ctx, srv.URL+"/test.zip") + require.NoError(t, err) + require.Len(t, zr.File, len(entries)) - zf := zr.File[0] - require.Equal(t, entryName, zf.Name, "zip entry name") + for _, zf := range zr.File { + entry, ok := entries[zf.Name] + require.True(t, ok) - entry, err := zf.Open() - require.NoError(t, err, "get zip entry reader") - defer entry.Close() + r, err := zf.Open() + require.NoError(t, err) - actualContents, err := ioutil.ReadAll(entry) - require.NoError(t, err, "read zip entry contents") - require.Equal(t, contents, string(actualContents), "compare zip entry contents") + contents, err := io.ReadAll(r) + require.NoError(t, err) + require.Equal(t, entry, contents) + + require.NoError(t, r.Close()) + } +} + +func TestMinimalRangeRequests(t *testing.T) { + if strings.HasPrefix(runtime.Version(), "go1.17") { + t.Skipf("skipped for go1.17: https://gitlab.com/gitlab-org/gitlab/-/issues/340778") + } + + dir := t.TempDir() + entries, archiveSize := createArchive(t, dir) + + mux := http.NewServeMux() + + var ranges []string + mux.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) { + rangeHdr := r.Header.Get("Range") + if rangeHdr == "" { + rw.Header().Add("Content-Length", fmt.Sprintf("%d", archiveSize)) + return + } + + ranges = append(ranges, rangeHdr) + http.FileServer(http.Dir(dir)).ServeHTTP(rw, r) + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + zr, err := OpenArchive(ctx, srv.URL+"/test.zip") + require.NoError(t, err) + require.Len(t, zr.File, len(entries)) + + require.Len(t, ranges, 2, "range requests should be minimal") + require.NotContains(t, ranges, "bytes=0-", "range request should not request from zero") + + for _, zf := range zr.File { + r, err := zf.Open() + require.NoError(t, err) + + _, err = io.Copy(io.Discard, r) + require.NoError(t, err) + + require.NoError(t, r.Close()) + } + + // ensure minimal requests: https://gitlab.com/gitlab-org/gitlab/-/issues/340778 + require.Len(t, ranges, 3, "range requests should be minimal") + require.Contains(t, ranges, "bytes=0-") } func TestOpenHTTPArchiveNotSendingAcceptEncodingHeader(t *testing.T) { @@ -64,5 +132,8 @@ func TestOpenHTTPArchiveNotSendingAcceptEncodingHeader(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(requestHandler)) defer srv.Close() - OpenArchive(context.Background(), srv.URL) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + OpenArchive(ctx, srv.URL) } |