summaryrefslogtreecommitdiff
path: root/workhorse
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2022-12-22 09:07:22 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2022-12-22 09:07:22 +0000
commit58e69d174512e267079ebb6afc60dd097070bf35 (patch)
treed7207e894f4813a57db661de9620b9f7728d1afb /workhorse
parentf8edcff7e9aff93f8ac605c19e542204b0ed9ba2 (diff)
downloadgitlab-ce-58e69d174512e267079ebb6afc60dd097070bf35.tar.gz
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r--workhorse/internal/api/api.go2
-rw-r--r--workhorse/internal/upload/destination/destination.go8
-rw-r--r--workhorse/internal/upload/destination/destination_test.go75
-rw-r--r--workhorse/internal/upload/destination/filestore/filestore.go4
-rw-r--r--workhorse/internal/upload/destination/objectstore/uploader.go34
-rw-r--r--workhorse/internal/upload/destination/upload_opts.go3
-rw-r--r--workhorse/internal/upload/destination/upload_opts_test.go2
7 files changed, 95 insertions, 33 deletions
diff --git a/workhorse/internal/api/api.go b/workhorse/internal/api/api.go
index 1758bb5a6a8..92e88ae3f70 100644
--- a/workhorse/internal/api/api.go
+++ b/workhorse/internal/api/api.go
@@ -98,6 +98,8 @@ type RemoteObject struct {
GetURL string
// DeleteURL is a presigned S3 RemoveObject URL
DeleteURL string
+ // Whether Workhorse needs to delete the temporary object or not.
+ SkipDelete bool
// StoreURL is the temporary presigned S3 PutObject URL to which upload the first found file
StoreURL string
// Boolean to indicate whether to use headers included in PutHeaders
diff --git a/workhorse/internal/upload/destination/destination.go b/workhorse/internal/upload/destination/destination.go
index 5e145e2cb2a..0a7e2493cd6 100644
--- a/workhorse/internal/upload/destination/destination.go
+++ b/workhorse/internal/upload/destination/destination.go
@@ -108,6 +108,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, e
type consumer interface {
Consume(context.Context, io.Reader, time.Time) (int64, error)
+ ConsumeWithoutDelete(context.Context, io.Reader, time.Time) (int64, error)
}
// Upload persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
@@ -185,7 +186,12 @@ func Upload(ctx context.Context, reader io.Reader, size int64, name string, opts
reader = hlr
}
- fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline)
+ if opts.SkipDelete {
+ fh.Size, err = uploadDestination.ConsumeWithoutDelete(ctx, reader, opts.Deadline)
+ } else {
+ fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline)
+ }
+
if err != nil {
if (err == objectstore.ErrNotEnoughParts) || (hlr != nil && hlr.n < 0) {
err = ErrEntityTooLarge
diff --git a/workhorse/internal/upload/destination/destination_test.go b/workhorse/internal/upload/destination/destination_test.go
index b355935e347..928ffdb9f5a 100644
--- a/workhorse/internal/upload/destination/destination_test.go
+++ b/workhorse/internal/upload/destination/destination_test.go
@@ -134,13 +134,17 @@ func TestUpload(t *testing.T) {
tmpFolder := t.TempDir()
tests := []struct {
- name string
- local bool
- remote remote
+ name string
+ local bool
+ remote remote
+ skipDelete bool
}{
{name: "Local only", local: true},
{name: "Remote Single only", remote: remoteSingle},
{name: "Remote Multipart only", remote: remoteMultipart},
+ {name: "Local only With SkipDelete", local: true, skipDelete: true},
+ {name: "Remote Single only With SkipDelete", remote: remoteSingle, skipDelete: true},
+ {name: "Remote Multipart only With SkipDelete", remote: remoteMultipart, skipDelete: true},
}
for _, spec := range tests {
@@ -183,6 +187,12 @@ func TestUpload(t *testing.T) {
opts.LocalTempPath = tmpFolder
}
+ opts.SkipDelete = spec.skipDelete
+
+ if opts.SkipDelete {
+ expectedDeletes = 0
+ }
+
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -343,31 +353,48 @@ func TestUploadWithUnknownGoCloudScheme(t *testing.T) {
}
func TestUploadMultipartInBodyFailure(t *testing.T) {
- osStub, ts := test.StartObjectStore()
- defer ts.Close()
-
- // this is a broken path because it contains bucket name but no key
- // this is the only way to get an in-body failure from our ObjectStoreStub
- objectPath := "/bucket-but-no-object-key"
- objectURL := ts.URL + objectPath
- opts := UploadOpts{
- RemoteID: "test-file",
- RemoteURL: objectURL,
- PartSize: test.ObjectSize,
- PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
- PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
- Deadline: testDeadline(),
+ tests := []struct {
+ name string
+ skipDelete bool
+ }{
+ {name: "With skipDelete false", skipDelete: false},
+ {name: "With skipDelete true", skipDelete: true},
}
- osStub.InitiateMultipartUpload(objectPath)
+ for _, spec := range tests {
+ t.Run(spec.name, func(t *testing.T) {
+ osStub, ts := test.StartObjectStore()
+ defer ts.Close()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ // this is a broken path because it contains bucket name but no key
+ // this is the only way to get an in-body failure from our ObjectStoreStub
+ objectPath := "/bucket-but-no-object-key"
+ objectURL := ts.URL + objectPath
+ opts := UploadOpts{
+ RemoteID: "test-file",
+ RemoteURL: objectURL,
+ PartSize: test.ObjectSize,
+ PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
+ PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
+ PresignedDelete: objectURL + "?Signature=AnotherSignature",
+ Deadline: testDeadline(),
+ SkipDelete: spec.skipDelete,
+ }
- fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
- require.Nil(t, fh)
- require.Error(t, err)
- require.EqualError(t, err, test.MultipartUploadInternalError().Error())
+ osStub.InitiateMultipartUpload(objectPath)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
+ require.Nil(t, fh)
+ require.Error(t, err)
+ require.EqualError(t, err, test.MultipartUploadInternalError().Error())
+
+ cancel() // this will trigger an async cleanup
+ requireObjectStoreDeletedAsync(t, 1, osStub)
+ })
+ }
}
func TestUploadRemoteFileWithLimit(t *testing.T) {
diff --git a/workhorse/internal/upload/destination/filestore/filestore.go b/workhorse/internal/upload/destination/filestore/filestore.go
index 2d88874bf25..6b2d8270b51 100644
--- a/workhorse/internal/upload/destination/filestore/filestore.go
+++ b/workhorse/internal/upload/destination/filestore/filestore.go
@@ -19,3 +19,7 @@ func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64
}
return n, err
}
+
+func (lf *LocalFile) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) {
+ return lf.Consume(outerCtx, reader, deadLine)
+}
diff --git a/workhorse/internal/upload/destination/objectstore/uploader.go b/workhorse/internal/upload/destination/objectstore/uploader.go
index 43e573872ee..798a693aa93 100644
--- a/workhorse/internal/upload/destination/objectstore/uploader.go
+++ b/workhorse/internal/upload/destination/objectstore/uploader.go
@@ -38,11 +38,21 @@ func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader {
func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) }
+func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) {
+ return u.consume(outerCtx, reader, deadLine, false)
+}
+
+func (u *uploader) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) {
+ return u.consume(outerCtx, reader, deadLine, true)
+}
+
// Consume reads the reader until it reaches EOF or an error. It spawns a
// goroutine that waits for outerCtx to be done, after which the remote
// file is deleted. The deadline applies to the upload performed inside
// Consume, not to outerCtx.
-func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) {
+// skipDelete, when true, skips the deletion of the temporary object
+// once rails is done handling the upload request.
+func (u *uploader) consume(outerCtx context.Context, reader io.Reader, deadLine time.Time, skipDelete bool) (_ int64, err error) {
if u.metrics {
objectStorageUploadsOpen.Inc()
defer func(started time.Time) {
@@ -59,17 +69,25 @@ func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline
// "delete" them.
if err != nil {
u.strategy.Abort()
+
+ if skipDelete {
+ // skipDelete avoided the object removal (see the goroutine below). Make
+ // sure here that the object is deleted if the upload was aborted.
+ u.strategy.Delete()
+ }
}
}()
- go func() {
- // Once gitlab-rails is done handling the request, we are supposed to
- // delete the upload from its temporary location.
- <-outerCtx.Done()
- u.strategy.Delete()
- }()
+ if !skipDelete {
+ go func() {
+ // Once gitlab-rails is done handling the request, we are supposed to
+ // delete the upload from its temporary location.
+ <-outerCtx.Done()
+ u.strategy.Delete()
+ }()
+ }
- uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline)
+ uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadLine)
defer cancelFn()
var hasher hash.Hash
diff --git a/workhorse/internal/upload/destination/upload_opts.go b/workhorse/internal/upload/destination/upload_opts.go
index 58427b38b30..d81fa10e97c 100644
--- a/workhorse/internal/upload/destination/upload_opts.go
+++ b/workhorse/internal/upload/destination/upload_opts.go
@@ -39,6 +39,8 @@ type UploadOpts struct {
PresignedPut string
// PresignedDelete is a presigned S3 DeleteObject compatible URL.
PresignedDelete string
+ // Whether Workhorse needs to delete the temporary object or not.
+ SkipDelete bool
// HTTP headers to be sent along with PUT request
PutHeaders map[string]string
// Whether to ignore Rails pre-signed URLs and have Workhorse directly access object storage provider
@@ -95,6 +97,7 @@ func GetOpts(apiResponse *api.Response) (*UploadOpts, error) {
RemoteURL: apiResponse.RemoteObject.GetURL,
PresignedPut: apiResponse.RemoteObject.StoreURL,
PresignedDelete: apiResponse.RemoteObject.DeleteURL,
+ SkipDelete: apiResponse.RemoteObject.SkipDelete,
PutHeaders: apiResponse.RemoteObject.PutHeaders,
UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient,
RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID,
diff --git a/workhorse/internal/upload/destination/upload_opts_test.go b/workhorse/internal/upload/destination/upload_opts_test.go
index a420e842e4d..0fda3bd2381 100644
--- a/workhorse/internal/upload/destination/upload_opts_test.go
+++ b/workhorse/internal/upload/destination/upload_opts_test.go
@@ -102,6 +102,7 @@ func TestGetOpts(t *testing.T) {
MultipartUpload: test.multipart,
CustomPutHeaders: test.customPutHeaders,
PutHeaders: test.putHeaders,
+ SkipDelete: true,
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
@@ -111,6 +112,7 @@ func TestGetOpts(t *testing.T) {
require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
require.WithinDuration(t, deadline, opts.Deadline, time.Second)
require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
+ require.Equal(t, apiResponse.RemoteObject.SkipDelete, opts.SkipDelete)
require.Equal(t, apiResponse.RemoteObject.GetURL, opts.RemoteURL)
require.Equal(t, apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
require.Equal(t, apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)