From 58e69d174512e267079ebb6afc60dd097070bf35 Mon Sep 17 00:00:00 2001 From: GitLab Bot Date: Thu, 22 Dec 2022 09:07:22 +0000 Subject: Add latest changes from gitlab-org/gitlab@master --- workhorse/internal/api/api.go | 2 + .../internal/upload/destination/destination.go | 8 ++- .../upload/destination/destination_test.go | 75 +++++++++++++++------- .../upload/destination/filestore/filestore.go | 4 ++ .../upload/destination/objectstore/uploader.go | 34 +++++++--- .../internal/upload/destination/upload_opts.go | 3 + .../upload/destination/upload_opts_test.go | 2 + 7 files changed, 95 insertions(+), 33 deletions(-) (limited to 'workhorse') diff --git a/workhorse/internal/api/api.go b/workhorse/internal/api/api.go index 1758bb5a6a8..92e88ae3f70 100644 --- a/workhorse/internal/api/api.go +++ b/workhorse/internal/api/api.go @@ -98,6 +98,8 @@ type RemoteObject struct { GetURL string // DeleteURL is a presigned S3 RemoveObject URL DeleteURL string + // Whether Workhorse needs to delete the temporary object or not. + SkipDelete bool // StoreURL is the temporary presigned S3 PutObject URL to which upload the first found file StoreURL string // Boolean to indicate whether to use headers included in PutHeaders diff --git a/workhorse/internal/upload/destination/destination.go b/workhorse/internal/upload/destination/destination.go index 5e145e2cb2a..0a7e2493cd6 100644 --- a/workhorse/internal/upload/destination/destination.go +++ b/workhorse/internal/upload/destination/destination.go @@ -108,6 +108,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, e type consumer interface { Consume(context.Context, io.Reader, time.Time) (int64, error) + ConsumeWithoutDelete(context.Context, io.Reader, time.Time) (int64, error) } // Upload persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done @@ -185,7 +186,12 @@ func Upload(ctx context.Context, reader io.Reader, size int64, name string, opts reader = hlr } - fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline) + if opts.SkipDelete { + fh.Size, err = uploadDestination.ConsumeWithoutDelete(ctx, reader, opts.Deadline) + } else { + fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline) + } + if err != nil { if (err == objectstore.ErrNotEnoughParts) || (hlr != nil && hlr.n < 0) { err = ErrEntityTooLarge diff --git a/workhorse/internal/upload/destination/destination_test.go b/workhorse/internal/upload/destination/destination_test.go index b355935e347..928ffdb9f5a 100644 --- a/workhorse/internal/upload/destination/destination_test.go +++ b/workhorse/internal/upload/destination/destination_test.go @@ -134,13 +134,17 @@ func TestUpload(t *testing.T) { tmpFolder := t.TempDir() tests := []struct { - name string - local bool - remote remote + name string + local bool + remote remote + skipDelete bool }{ {name: "Local only", local: true}, {name: "Remote Single only", remote: remoteSingle}, {name: "Remote Multipart only", remote: remoteMultipart}, + {name: "Local only With SkipDelete", local: true, skipDelete: true}, + {name: "Remote Single only With SkipDelete", remote: remoteSingle, skipDelete: true}, + {name: "Remote Multipart only With SkipDelete", remote: remoteMultipart, skipDelete: true}, } for _, spec := range tests { @@ -183,6 +187,12 @@ func TestUpload(t *testing.T) { opts.LocalTempPath = tmpFolder } + opts.SkipDelete = spec.skipDelete + + if opts.SkipDelete { + expectedDeletes = 0 + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -343,31 +353,48 @@ func TestUploadWithUnknownGoCloudScheme(t *testing.T) { } func TestUploadMultipartInBodyFailure(t *testing.T) { - osStub, ts := test.StartObjectStore() - defer ts.Close() - - // this is a broken path because it contains bucket name but no key - // this is the only way to get an in-body failure from our ObjectStoreStub - objectPath := "/bucket-but-no-object-key" - objectURL := ts.URL + objectPath - opts := UploadOpts{ - RemoteID: "test-file", - RemoteURL: objectURL, - PartSize: test.ObjectSize, - PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}, - PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature", - Deadline: testDeadline(), + tests := []struct { + name string + skipDelete bool + }{ + {name: "With skipDelete false", skipDelete: false}, + {name: "With skipDelete true", skipDelete: true}, } - osStub.InitiateMultipartUpload(objectPath) + for _, spec := range tests { + t.Run(spec.name, func(t *testing.T) { + osStub, ts := test.StartObjectStore() + defer ts.Close() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // this is a broken path because it contains bucket name but no key + // this is the only way to get an in-body failure from our ObjectStoreStub + objectPath := "/bucket-but-no-object-key" + objectURL := ts.URL + objectPath + opts := UploadOpts{ + RemoteID: "test-file", + RemoteURL: objectURL, + PartSize: test.ObjectSize, + PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}, + PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature", + PresignedDelete: objectURL + "?Signature=AnotherSignature", + Deadline: testDeadline(), + SkipDelete: spec.skipDelete, + } - fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) - require.Nil(t, fh) - require.Error(t, err) - require.EqualError(t, err, test.MultipartUploadInternalError().Error()) + osStub.InitiateMultipartUpload(objectPath) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) + require.Nil(t, fh) + require.Error(t, err) + require.EqualError(t, err, test.MultipartUploadInternalError().Error()) + + cancel() // this will trigger an async cleanup + requireObjectStoreDeletedAsync(t, 1, osStub) + }) + } } func TestUploadRemoteFileWithLimit(t *testing.T) { diff --git a/workhorse/internal/upload/destination/filestore/filestore.go b/workhorse/internal/upload/destination/filestore/filestore.go index 2d88874bf25..6b2d8270b51 100644 --- a/workhorse/internal/upload/destination/filestore/filestore.go +++ b/workhorse/internal/upload/destination/filestore/filestore.go @@ -19,3 +19,7 @@ func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64 } return n, err } + +func (lf *LocalFile) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) { + return lf.Consume(outerCtx, reader, deadLine) +} diff --git a/workhorse/internal/upload/destination/objectstore/uploader.go b/workhorse/internal/upload/destination/objectstore/uploader.go index 43e573872ee..798a693aa93 100644 --- a/workhorse/internal/upload/destination/objectstore/uploader.go +++ b/workhorse/internal/upload/destination/objectstore/uploader.go @@ -38,11 +38,21 @@ func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader { func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) } +func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) { + return u.consume(outerCtx, reader, deadLine, false) +} + +func (u *uploader) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) { + return u.consume(outerCtx, reader, deadLine, true) +} + // Consume reads the reader until it reaches EOF or an error. It spawns a // goroutine that waits for outerCtx to be done, after which the remote // file is deleted. The deadline applies to the upload performed inside // Consume, not to outerCtx. -func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) { +// SkipDelete optionaly call the Delete() function on the strategy once +// rails is done handling the upload request. +func (u *uploader) consume(outerCtx context.Context, reader io.Reader, deadLine time.Time, skipDelete bool) (_ int64, err error) { if u.metrics { objectStorageUploadsOpen.Inc() defer func(started time.Time) { @@ -59,17 +69,25 @@ func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline // "delete" them. if err != nil { u.strategy.Abort() + + if skipDelete { + // skipDelete avoided the object removal (see the goroutine below). Make + // here that the object is deleted if aborted. + u.strategy.Delete() + } } }() - go func() { - // Once gitlab-rails is done handling the request, we are supposed to - // delete the upload from its temporary location. - <-outerCtx.Done() - u.strategy.Delete() - }() + if !skipDelete { + go func() { + // Once gitlab-rails is done handling the request, we are supposed to + // delete the upload from its temporary location. + <-outerCtx.Done() + u.strategy.Delete() + }() + } - uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline) + uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadLine) defer cancelFn() var hasher hash.Hash diff --git a/workhorse/internal/upload/destination/upload_opts.go b/workhorse/internal/upload/destination/upload_opts.go index 58427b38b30..d81fa10e97c 100644 --- a/workhorse/internal/upload/destination/upload_opts.go +++ b/workhorse/internal/upload/destination/upload_opts.go @@ -39,6 +39,8 @@ type UploadOpts struct { PresignedPut string // PresignedDelete is a presigned S3 DeleteObject compatible URL. PresignedDelete string + // Whether Workhorse needs to delete the temporary object or not. + SkipDelete bool // HTTP headers to be sent along with PUT request PutHeaders map[string]string // Whether to ignore Rails pre-signed URLs and have Workhorse directly access object storage provider @@ -95,6 +97,7 @@ func GetOpts(apiResponse *api.Response) (*UploadOpts, error) { RemoteURL: apiResponse.RemoteObject.GetURL, PresignedPut: apiResponse.RemoteObject.StoreURL, PresignedDelete: apiResponse.RemoteObject.DeleteURL, + SkipDelete: apiResponse.RemoteObject.SkipDelete, PutHeaders: apiResponse.RemoteObject.PutHeaders, UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient, RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID, diff --git a/workhorse/internal/upload/destination/upload_opts_test.go b/workhorse/internal/upload/destination/upload_opts_test.go index a420e842e4d..0fda3bd2381 100644 --- a/workhorse/internal/upload/destination/upload_opts_test.go +++ b/workhorse/internal/upload/destination/upload_opts_test.go @@ -102,6 +102,7 @@ func TestGetOpts(t *testing.T) { MultipartUpload: test.multipart, CustomPutHeaders: test.customPutHeaders, PutHeaders: test.putHeaders, + SkipDelete: true, }, } deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) @@ -111,6 +112,7 @@ func TestGetOpts(t *testing.T) { require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) require.WithinDuration(t, deadline, opts.Deadline, time.Second) require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) + require.Equal(t, apiResponse.RemoteObject.SkipDelete, opts.SkipDelete) require.Equal(t, apiResponse.RemoteObject.GetURL, opts.RemoteURL) require.Equal(t, apiResponse.RemoteObject.StoreURL, opts.PresignedPut) require.Equal(t, apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete) -- cgit v1.2.1