Diffstat (limited to 'workhorse/internal/upload')
19 files changed, 295 insertions, 204 deletions
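Three changes thread through this diff, alongside a mechanical move of the test files into the packages they test (package destination_test becomes package destination, and likewise for objectstore and upload): a new SkipDelete upload option makes Workhorse leave the temporary object in place instead of deleting it once the request context is done; an UploadHashFunctions allow-list replaces the FIPSEnabled() special case, so the Rails API response now decides which of md5, sha1, sha256 and sha512 are computed; and the S3 CompleteMultipartUpload XML types move into a new s3api package, which lets the ObjectstoreStub stop importing objectstore (needed, it appears, to avoid an import cycle once the tests run inside package objectstore itself).

The SkipDelete dispatch is the least obvious piece, so here is a minimal, self-contained Go sketch of it. The consumer interface matches the one in destination.go; localFile and upload are simplified, hypothetical stand-ins for filestore.LocalFile and destination.Upload, not the real implementations.

package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

// consumer mirrors the interface extended in destination.go: every
// destination must now also offer a variant that skips the deferred
// cleanup of the temporary object.
type consumer interface {
	Consume(context.Context, io.Reader, time.Time) (int64, error)
	ConsumeWithoutDelete(context.Context, io.Reader, time.Time) (int64, error)
}

// localFile is a hypothetical stand-in for filestore.LocalFile, where both
// variants behave identically (local temp files are cleaned up elsewhere).
type localFile struct{ w io.Writer }

func (lf localFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
	return io.Copy(lf.w, r)
}

func (lf localFile) ConsumeWithoutDelete(ctx context.Context, r io.Reader, deadline time.Time) (int64, error) {
	return lf.Consume(ctx, r, deadline)
}

// upload mirrors the dispatch added to destination.Upload: SkipDelete selects
// the variant that leaves the remote temporary object in place so that
// gitlab-rails can finalize it instead of Workhorse deleting it.
func upload(ctx context.Context, dst consumer, r io.Reader, deadline time.Time, skipDelete bool) (int64, error) {
	if skipDelete {
		return dst.ConsumeWithoutDelete(ctx, r, deadline)
	}
	return dst.Consume(ctx, r, deadline)
}

func main() {
	var sink strings.Builder
	n, err := upload(context.Background(), localFile{w: &sink}, strings.NewReader("content"), time.Now().Add(time.Minute), true)
	fmt.Println(n, err) // 7 <nil>
}

Note that on the error path the real uploader still cleans up: when skipDelete is true, consume() calls strategy.Delete() right after Abort(), because the goroutine that normally performs the deferred delete was never started.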
diff --git a/workhorse/internal/upload/body_uploader_test.go b/workhorse/internal/upload/body_uploader_test.go index eff33757845..837d119e72e 100644 --- a/workhorse/internal/upload/body_uploader_test.go +++ b/workhorse/internal/upload/body_uploader_test.go @@ -92,11 +92,7 @@ func echoProxy(t *testing.T, expectedBodyLength int) http.Handler { require.Equal(t, "application/x-www-form-urlencoded", r.Header.Get("Content-Type"), "Wrong Content-Type header") - if destination.FIPSEnabled() { - require.NotContains(t, r.PostForm, "file.md5") - } else { - require.Contains(t, r.PostForm, "file.md5") - } + require.Contains(t, r.PostForm, "file.md5") require.Contains(t, r.PostForm, "file.sha1") require.Contains(t, r.PostForm, "file.sha256") require.Contains(t, r.PostForm, "file.sha512") @@ -123,11 +119,7 @@ func echoProxy(t *testing.T, expectedBodyLength int) http.Handler { require.Contains(t, uploadFields, "remote_url") require.Contains(t, uploadFields, "remote_id") require.Contains(t, uploadFields, "size") - if destination.FIPSEnabled() { - require.NotContains(t, uploadFields, "md5") - } else { - require.Contains(t, uploadFields, "md5") - } + require.Contains(t, uploadFields, "md5") require.Contains(t, uploadFields, "sha1") require.Contains(t, uploadFields, "sha256") require.Contains(t, uploadFields, "sha512") diff --git a/workhorse/internal/upload/destination/destination.go b/workhorse/internal/upload/destination/destination.go index 5e145e2cb2a..a9fb81540d5 100644 --- a/workhorse/internal/upload/destination/destination.go +++ b/workhorse/internal/upload/destination/destination.go @@ -108,6 +108,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, e type consumer interface { Consume(context.Context, io.Reader, time.Time) (int64, error) + ConsumeWithoutDelete(context.Context, io.Reader, time.Time) (int64, error) } // Upload persists the provided reader content to all the location specified in opts. 
A cleanup will be performed once ctx is Done @@ -120,7 +121,7 @@ func Upload(ctx context.Context, reader io.Reader, size int64, name string, opts } uploadStartTime := time.Now() defer func() { fh.uploadDuration = time.Since(uploadStartTime).Seconds() }() - hashes := newMultiHash() + hashes := newMultiHash(opts.UploadHashFunctions) reader = io.TeeReader(reader, hashes.Writer) var clientMode string @@ -185,7 +186,12 @@ func Upload(ctx context.Context, reader io.Reader, size int64, name string, opts reader = hlr } - fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline) + if opts.SkipDelete { + fh.Size, err = uploadDestination.ConsumeWithoutDelete(ctx, reader, opts.Deadline) + } else { + fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline) + } + if err != nil { if (err == objectstore.ErrNotEnoughParts) || (hlr != nil && hlr.n < 0) { err = ErrEntityTooLarge diff --git a/workhorse/internal/upload/destination/destination_test.go b/workhorse/internal/upload/destination/destination_test.go index 97645be168f..69dd02ca7c2 100644 --- a/workhorse/internal/upload/destination/destination_test.go +++ b/workhorse/internal/upload/destination/destination_test.go @@ -1,4 +1,4 @@ -package destination_test +package destination import ( "context" @@ -17,12 +17,11 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" ) func testDeadline() time.Time { - return time.Now().Add(destination.DefaultObjectStoreTimeout) + return time.Now().Add(DefaultObjectStoreTimeout) } func requireFileGetsRemovedAsync(t *testing.T, filePath string) { @@ -44,10 +43,10 @@ func TestUploadWrongSize(t *testing.T) { tmpFolder := t.TempDir() - opts := &destination.UploadOpts{LocalTempPath: tmpFolder} - fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, "upload", opts) + opts := &UploadOpts{LocalTempPath: tmpFolder} + fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, "upload", opts) require.Error(t, err) - _, isSizeError := err.(destination.SizeError) + _, isSizeError := err.(SizeError) require.True(t, isSizeError, "Should fail with SizeError") require.Nil(t, fh) } @@ -58,10 +57,10 @@ func TestUploadWithKnownSizeExceedLimit(t *testing.T) { tmpFolder := t.TempDir() - opts := &destination.UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1} - fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts) + opts := &UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1} + fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts) require.Error(t, err) - _, isSizeError := err.(destination.SizeError) + _, isSizeError := err.(SizeError) require.True(t, isSizeError, "Should fail with SizeError") require.Nil(t, fh) } @@ -72,9 +71,9 @@ func TestUploadWithUnknownSizeExceedLimit(t *testing.T) { tmpFolder := t.TempDir() - opts := &destination.UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1} - fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), -1, "upload", opts) - require.Equal(t, err, destination.ErrEntityTooLarge) + opts := &UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1} + fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), -1, 
"upload", opts) + require.Equal(t, err, ErrEntityTooLarge) require.Nil(t, fh) } @@ -94,7 +93,7 @@ func TestUploadWrongETag(t *testing.T) { objectURL := ts.URL + test.ObjectPath - opts := &destination.UploadOpts{ + opts := &UploadOpts{ RemoteID: "test-file", RemoteURL: objectURL, PresignedPut: objectURL + "?Signature=ASignature", @@ -110,7 +109,7 @@ func TestUploadWrongETag(t *testing.T) { osStub.InitiateMultipartUpload(test.ObjectPath) } ctx, cancel := context.WithCancel(context.Background()) - fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts) + fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts) require.Nil(t, fh) require.Error(t, err) require.Equal(t, 1, osStub.PutsCnt(), "File not uploaded") @@ -135,18 +134,22 @@ func TestUpload(t *testing.T) { tmpFolder := t.TempDir() tests := []struct { - name string - local bool - remote remote + name string + local bool + remote remote + skipDelete bool }{ {name: "Local only", local: true}, {name: "Remote Single only", remote: remoteSingle}, {name: "Remote Multipart only", remote: remoteMultipart}, + {name: "Local only With SkipDelete", local: true, skipDelete: true}, + {name: "Remote Single only With SkipDelete", remote: remoteSingle, skipDelete: true}, + {name: "Remote Multipart only With SkipDelete", remote: remoteMultipart, skipDelete: true}, } for _, spec := range tests { t.Run(spec.name, func(t *testing.T) { - var opts destination.UploadOpts + var opts UploadOpts var expectedDeletes, expectedPuts int osStub, ts := test.StartObjectStore() @@ -184,10 +187,16 @@ func TestUpload(t *testing.T) { opts.LocalTempPath = tmpFolder } + opts.SkipDelete = spec.skipDelete + + if opts.SkipDelete { + expectedDeletes = 0 + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) + fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) require.NoError(t, err) require.NotNil(t, fh) @@ -206,11 +215,7 @@ func TestUpload(t *testing.T) { } require.Equal(t, test.ObjectSize, fh.Size) - if destination.FIPSEnabled() { - require.Empty(t, fh.MD5()) - } else { - require.Equal(t, test.ObjectMD5, fh.MD5()) - } + require.Equal(t, test.ObjectMD5, fh.MD5()) require.Equal(t, test.ObjectSHA256, fh.SHA256()) require.Equal(t, expectedPuts, osStub.PutsCnt(), "ObjectStore PutObject count mismatch") @@ -255,7 +260,7 @@ func TestUploadWithS3WorkhorseClient(t *testing.T) { name: "unknown object size with limit", objectSize: -1, maxSize: test.ObjectSize - 1, - expectedErr: destination.ErrEntityTooLarge, + expectedErr: ErrEntityTooLarge, }, } @@ -269,12 +274,12 @@ func TestUploadWithS3WorkhorseClient(t *testing.T) { defer cancel() remoteObject := "tmp/test-file/1" - opts := destination.UploadOpts{ + opts := UploadOpts{ RemoteID: "test-file", Deadline: testDeadline(), UseWorkhorseClient: true, RemoteTempObjectID: remoteObject, - ObjectStorageConfig: destination.ObjectStorageConfig{ + ObjectStorageConfig: ObjectStorageConfig{ Provider: "AWS", S3Credentials: s3Creds, S3Config: s3Config, @@ -282,7 +287,7 @@ func TestUploadWithS3WorkhorseClient(t *testing.T) { MaximumSize: tc.maxSize, } - _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, "upload", &opts) + _, err := Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, "upload", &opts) if tc.expectedErr == nil { require.NoError(t, 
err) @@ -302,19 +307,19 @@ func TestUploadWithAzureWorkhorseClient(t *testing.T) { defer cancel() remoteObject := "tmp/test-file/1" - opts := destination.UploadOpts{ + opts := UploadOpts{ RemoteID: "test-file", Deadline: testDeadline(), UseWorkhorseClient: true, RemoteTempObjectID: remoteObject, - ObjectStorageConfig: destination.ObjectStorageConfig{ + ObjectStorageConfig: ObjectStorageConfig{ Provider: "AzureRM", URLMux: mux, GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"}, }, } - _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) + _, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) require.NoError(t, err) test.GoCloudObjectExists(t, bucketDir, remoteObject) @@ -327,48 +332,65 @@ func TestUploadWithUnknownGoCloudScheme(t *testing.T) { mux := new(blob.URLMux) remoteObject := "tmp/test-file/1" - opts := destination.UploadOpts{ + opts := UploadOpts{ RemoteID: "test-file", Deadline: testDeadline(), UseWorkhorseClient: true, RemoteTempObjectID: remoteObject, - ObjectStorageConfig: destination.ObjectStorageConfig{ + ObjectStorageConfig: ObjectStorageConfig{ Provider: "SomeCloud", URLMux: mux, GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"}, }, } - _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) + _, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) require.Error(t, err) } func TestUploadMultipartInBodyFailure(t *testing.T) { - osStub, ts := test.StartObjectStore() - defer ts.Close() - - // this is a broken path because it contains bucket name but no key - // this is the only way to get an in-body failure from our ObjectStoreStub - objectPath := "/bucket-but-no-object-key" - objectURL := ts.URL + objectPath - opts := destination.UploadOpts{ - RemoteID: "test-file", - RemoteURL: objectURL, - PartSize: test.ObjectSize, - PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}, - PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature", - Deadline: testDeadline(), + tests := []struct { + name string + skipDelete bool + }{ + {name: "With skipDelete false", skipDelete: false}, + {name: "With skipDelete true", skipDelete: true}, } - osStub.InitiateMultipartUpload(objectPath) + for _, spec := range tests { + t.Run(spec.name, func(t *testing.T) { + osStub, ts := test.StartObjectStore() + defer ts.Close() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // this is a broken path because it contains bucket name but no key + // this is the only way to get an in-body failure from our ObjectStoreStub + objectPath := "/bucket-but-no-object-key" + objectURL := ts.URL + objectPath + opts := UploadOpts{ + RemoteID: "test-file", + RemoteURL: objectURL, + PartSize: test.ObjectSize, + PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}, + PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature", + PresignedDelete: objectURL + "?Signature=AnotherSignature", + Deadline: testDeadline(), + SkipDelete: spec.skipDelete, + } - fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) - require.Nil(t, fh) - require.Error(t, err) - require.EqualError(t, err, test.MultipartUploadInternalError().Error()) + osStub.InitiateMultipartUpload(objectPath) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fh, 
err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts) + require.Nil(t, fh) + require.Error(t, err) + require.EqualError(t, err, test.MultipartUploadInternalError().Error()) + + cancel() // this will trigger an async cleanup + requireObjectStoreDeletedAsync(t, 1, osStub) + }) + } } func TestUploadRemoteFileWithLimit(t *testing.T) { @@ -405,20 +427,20 @@ func TestUploadRemoteFileWithLimit(t *testing.T) { testData: test.ObjectContent, objectSize: -1, maxSize: test.ObjectSize - 1, - expectedErr: destination.ErrEntityTooLarge, + expectedErr: ErrEntityTooLarge, }, { name: "large object with unknown size with limit", testData: string(make([]byte, 20000)), objectSize: -1, maxSize: 19000, - expectedErr: destination.ErrEntityTooLarge, + expectedErr: ErrEntityTooLarge, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - var opts destination.UploadOpts + var opts UploadOpts for _, remoteType := range remoteTypes { osStub, ts := test.StartObjectStore() @@ -454,7 +476,7 @@ func TestUploadRemoteFileWithLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fh, err := destination.Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, "upload", &opts) + fh, err := Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, "upload", &opts) if tc.expectedErr == nil { require.NoError(t, err) @@ -468,7 +490,7 @@ func TestUploadRemoteFileWithLimit(t *testing.T) { } } -func checkFileHandlerWithFields(t *testing.T, fh *destination.FileHandler, fields map[string]string, prefix string) { +func checkFileHandlerWithFields(t *testing.T, fh *FileHandler, fields map[string]string, prefix string) { key := func(field string) string { if prefix == "" { return field @@ -482,11 +504,7 @@ func checkFileHandlerWithFields(t *testing.T, fh *destination.FileHandler, field require.Equal(t, fh.RemoteURL, fields[key("remote_url")]) require.Equal(t, fh.RemoteID, fields[key("remote_id")]) require.Equal(t, strconv.FormatInt(test.ObjectSize, 10), fields[key("size")]) - if destination.FIPSEnabled() { - require.Empty(t, fields[key("md5")]) - } else { - require.Equal(t, test.ObjectMD5, fields[key("md5")]) - } + require.Equal(t, test.ObjectMD5, fields[key("md5")]) require.Equal(t, test.ObjectSHA1, fields[key("sha1")]) require.Equal(t, test.ObjectSHA256, fields[key("sha256")]) require.Equal(t, test.ObjectSHA512, fields[key("sha512")]) diff --git a/workhorse/internal/upload/destination/filestore/filestore.go b/workhorse/internal/upload/destination/filestore/filestore.go index 2d88874bf25..6b2d8270b51 100644 --- a/workhorse/internal/upload/destination/filestore/filestore.go +++ b/workhorse/internal/upload/destination/filestore/filestore.go @@ -19,3 +19,7 @@ func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64 } return n, err } + +func (lf *LocalFile) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) { + return lf.Consume(outerCtx, reader, deadLine) +} diff --git a/workhorse/internal/upload/destination/multi_hash.go b/workhorse/internal/upload/destination/multi_hash.go index 8d5bf4424a8..3f8b0cbd903 100644 --- a/workhorse/internal/upload/destination/multi_hash.go +++ b/workhorse/internal/upload/destination/multi_hash.go @@ -8,9 +8,6 @@ import ( "encoding/hex" "hash" "io" - "os" - - "gitlab.com/gitlab-org/labkit/fips" ) var hashFactories = map[string](func() hash.Hash){ @@ -20,39 +17,39 @@ var hashFactories = map[string](func() hash.Hash){ "sha512": sha512.New, } 
-var fipsHashFactories = map[string](func() hash.Hash){ - "sha1": sha1.New, - "sha256": sha256.New, - "sha512": sha512.New, -} - func factories() map[string](func() hash.Hash) { - if FIPSEnabled() { - return fipsHashFactories - } - return hashFactories } -func FIPSEnabled() bool { - if fips.Enabled() { +type multiHash struct { + io.Writer + hashes map[string]hash.Hash +} + +func permittedHashFunction(hashFunctions []string, hash string) bool { + if len(hashFunctions) == 0 { return true } - return os.Getenv("WORKHORSE_TEST_FIPS_ENABLED") == "1" -} + for _, name := range hashFunctions { + if name == hash { + return true + } + } -type multiHash struct { - io.Writer - hashes map[string]hash.Hash + return false } -func newMultiHash() (m *multiHash) { +func newMultiHash(hashFunctions []string) (m *multiHash) { m = &multiHash{} m.hashes = make(map[string]hash.Hash) var writers []io.Writer for hash, hashFactory := range factories() { + if !permittedHashFunction(hashFunctions, hash) { + continue + } + writer := hashFactory() m.hashes[hash] = writer diff --git a/workhorse/internal/upload/destination/multi_hash_test.go b/workhorse/internal/upload/destination/multi_hash_test.go new file mode 100644 index 00000000000..9a976f5d25d --- /dev/null +++ b/workhorse/internal/upload/destination/multi_hash_test.go @@ -0,0 +1,52 @@ +package destination + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewMultiHash(t *testing.T) { + tests := []struct { + name string + allowedHashes []string + expectedHashes []string + }{ + { + name: "default", + allowedHashes: nil, + expectedHashes: []string{"md5", "sha1", "sha256", "sha512"}, + }, + { + name: "blank", + allowedHashes: []string{}, + expectedHashes: []string{"md5", "sha1", "sha256", "sha512"}, + }, + { + name: "no MD5", + allowedHashes: []string{"sha1", "sha256", "sha512"}, + expectedHashes: []string{"sha1", "sha256", "sha512"}, + }, + + { + name: "unlisted hash", + allowedHashes: []string{"sha1", "sha256", "sha512", "sha3-256"}, + expectedHashes: []string{"sha1", "sha256", "sha512"}, + }, + } + + for _, test := range tests { + mh := newMultiHash(test.allowedHashes) + + require.Equal(t, len(test.expectedHashes), len(mh.hashes)) + + var keys []string + for key := range mh.hashes { + keys = append(keys, key) + } + + sort.Strings(keys) + require.Equal(t, test.expectedHashes, keys) + } +} diff --git a/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go index 55d886087be..5a6a4b90b34 100644 --- a/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go +++ b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go @@ -1,4 +1,4 @@ -package objectstore_test +package objectstore import ( "context" @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" ) @@ -22,8 +21,8 @@ func TestGoCloudObjectUpload(t *testing.T) { objectName := "test.png" testURL := "azuretest://azure.example.com/test-container" - p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName} - object, err := objectstore.NewGoCloudObject(p) + p := &GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName} + object, err := 
NewGoCloudObject(p) require.NotNil(t, object) require.NoError(t, err) @@ -48,8 +47,8 @@ func TestGoCloudObjectUpload(t *testing.T) { if exists { return fmt.Errorf("file %s is still present", objectName) - } else { - return nil } + + return nil }) } diff --git a/workhorse/internal/upload/destination/objectstore/multipart.go b/workhorse/internal/upload/destination/objectstore/multipart.go index df336d2d901..900ca040dad 100644 --- a/workhorse/internal/upload/destination/objectstore/multipart.go +++ b/workhorse/internal/upload/destination/objectstore/multipart.go @@ -11,6 +11,8 @@ import ( "os" "gitlab.com/gitlab-org/labkit/mask" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/s3api" ) // ErrNotEnoughParts will be used when writing more than size * len(partURLs) @@ -51,7 +53,7 @@ func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, pu } func (m *Multipart) Upload(ctx context.Context, r io.Reader) error { - cmu := &CompleteMultipartUpload{} + cmu := &s3api.CompleteMultipartUpload{} for i, partURL := range m.PartURLs { src := io.LimitReader(r, m.partSize) part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1) @@ -91,7 +93,7 @@ func (m *Multipart) Delete() { deleteURL(m.DeleteURL) } -func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { +func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*s3api.CompleteMultipartUploadPart, error) { file, err := os.CreateTemp("", "part-buffer") if err != nil { return nil, fmt.Errorf("create temporary buffer file: %v", err) @@ -118,7 +120,7 @@ func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, pu if err != nil { return nil, fmt.Errorf("upload part %d: %v", partNumber, err) } - return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil + return &s3api.CompleteMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil } func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) { @@ -142,7 +144,7 @@ func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[stri return part.ETag(), nil } -func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error { +func (m *Multipart) complete(ctx context.Context, cmu *s3api.CompleteMultipartUpload) error { body, err := xml.Marshal(cmu) if err != nil { return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err) diff --git a/workhorse/internal/upload/destination/objectstore/multipart_test.go b/workhorse/internal/upload/destination/objectstore/multipart_test.go index 2a5161e42e7..00244a5c50b 100644 --- a/workhorse/internal/upload/destination/objectstore/multipart_test.go +++ b/workhorse/internal/upload/destination/objectstore/multipart_test.go @@ -1,4 +1,4 @@ -package objectstore_test +package objectstore import ( "context" @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" ) @@ -48,7 +47,7 @@ func TestMultipartUploadWithUpcaseETags(t *testing.T) { deadline := time.Now().Add(testTimeout) - m, err := objectstore.NewMultipart( + m, err := NewMultipart( []string{ts.URL}, // a single presigned 
part URL ts.URL, // the complete multipart upload URL "", // no abort diff --git a/workhorse/internal/upload/destination/objectstore/object_test.go b/workhorse/internal/upload/destination/objectstore/object_test.go index 24117891b6d..2b94cd9e3b1 100644 --- a/workhorse/internal/upload/destination/objectstore/object_test.go +++ b/workhorse/internal/upload/destination/objectstore/object_test.go @@ -1,4 +1,4 @@ -package objectstore_test +package objectstore import ( "context" @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" ) @@ -35,7 +34,7 @@ func testObjectUploadNoErrors(t *testing.T, startObjectStore osFactory, useDelet defer cancel() deadline := time.Now().Add(testTimeout) - object, err := objectstore.NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize) + object, err := NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize) require.NoError(t, err) // copy data @@ -97,12 +96,12 @@ func TestObjectUpload404(t *testing.T) { deadline := time.Now().Add(testTimeout) objectURL := ts.URL + test.ObjectPath - object, err := objectstore.NewObject(objectURL, "", map[string]string{}, test.ObjectSize) + object, err := NewObject(objectURL, "", map[string]string{}, test.ObjectSize) require.NoError(t, err) _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) require.Error(t, err) - _, isStatusCodeError := err.(objectstore.StatusCodeError) + _, isStatusCodeError := err.(StatusCodeError) require.True(t, isStatusCodeError, "Should fail with StatusCodeError") require.Contains(t, err.Error(), "404") } @@ -140,7 +139,7 @@ func TestObjectUploadBrokenConnection(t *testing.T) { deadline := time.Now().Add(testTimeout) objectURL := ts.URL + test.ObjectPath - object, err := objectstore.NewObject(objectURL, "", map[string]string{}, -1) + object, err := NewObject(objectURL, "", map[string]string{}, -1) require.NoError(t, err) _, copyErr := object.Consume(ctx, &endlessReader{}, deadline) diff --git a/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go index b84f5757f49..02799d0b9b0 100644 --- a/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go +++ b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go @@ -2,45 +2,15 @@ package objectstore import ( "encoding/xml" - "fmt" -) - -// CompleteMultipartUpload is the S3 CompleteMultipartUpload body -type CompleteMultipartUpload struct { - Part []*completeMultipartUploadPart -} -type completeMultipartUploadPart struct { - PartNumber int - ETag string -} - -// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request -type CompleteMultipartUploadResult struct { - Location string - Bucket string - Key string - ETag string -} - -// CompleteMultipartUploadError is the in-body error structure -// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples -// the answer contains other fields we are not using -type CompleteMultipartUploadError struct { - XMLName xml.Name `xml:"Error"` - Code string - Message string -} - -func (c *CompleteMultipartUploadError) Error() string { - return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message) -} + 
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/s3api" +) // compoundCompleteMultipartUploadResult holds both CompleteMultipartUploadResult and CompleteMultipartUploadError // this allow us to deserialize the response body where the root element can either be Error orCompleteMultipartUploadResult type compoundCompleteMultipartUploadResult struct { - *CompleteMultipartUploadResult - *CompleteMultipartUploadError + *s3api.CompleteMultipartUploadResult + *s3api.CompleteMultipartUploadError // XMLName this overrides CompleteMultipartUploadError.XMLName tags XMLName xml.Name diff --git a/workhorse/internal/upload/destination/objectstore/s3_object_test.go b/workhorse/internal/upload/destination/objectstore/s3_object_test.go index 0ed14a2e844..c99712d18ad 100644 --- a/workhorse/internal/upload/destination/objectstore/s3_object_test.go +++ b/workhorse/internal/upload/destination/objectstore/s3_object_test.go @@ -1,4 +1,4 @@ -package objectstore_test +package objectstore import ( "context" @@ -17,7 +17,6 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" ) @@ -50,7 +49,7 @@ func TestS3ObjectUpload(t *testing.T) { objectName := filepath.Join(tmpDir, "s3-test-data") ctx, cancel := context.WithCancel(context.Background()) - object, err := objectstore.NewS3Object(objectName, creds, config) + object, err := NewS3Object(objectName, creds, config) require.NoError(t, err) // copy data @@ -107,7 +106,7 @@ func TestConcurrentS3ObjectUpload(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - object, err := objectstore.NewS3Object(objectName, creds, config) + object, err := NewS3Object(objectName, creds, config) require.NoError(t, err) // copy data @@ -134,7 +133,7 @@ func TestS3ObjectUploadCancel(t *testing.T) { objectName := filepath.Join(tmpDir, "s3-test-data") - object, err := objectstore.NewS3Object(objectName, creds, config) + object, err := NewS3Object(objectName, creds, config) require.NoError(t, err) @@ -155,7 +154,7 @@ func TestS3ObjectUploadLimitReached(t *testing.T) { tmpDir := t.TempDir() objectName := filepath.Join(tmpDir, "s3-test-data") - object, err := objectstore.NewS3Object(objectName, creds, config) + object, err := NewS3Object(objectName, creds, config) require.NoError(t, err) _, err = object.Consume(context.Background(), &failedReader{}, deadline) diff --git a/workhorse/internal/upload/destination/objectstore/s3api/s3api.go b/workhorse/internal/upload/destination/objectstore/s3api/s3api.go new file mode 100644 index 00000000000..49ab9347911 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/s3api/s3api.go @@ -0,0 +1,37 @@ +package s3api + +import ( + "encoding/xml" + "fmt" +) + +// CompleteMultipartUploadError is the in-body error structure +// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples +// the answer contains other fields we are not using +type CompleteMultipartUploadError struct { + XMLName xml.Name `xml:"Error"` + Code string + Message string +} + +func (c *CompleteMultipartUploadError) Error() string { + return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message) +} + +// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request +type 
CompleteMultipartUploadResult struct { + Location string + Bucket string + Key string + ETag string +} + +// CompleteMultipartUpload is the S3 CompleteMultipartUpload body +type CompleteMultipartUpload struct { + Part []*CompleteMultipartUploadPart +} + +type CompleteMultipartUploadPart struct { + PartNumber int + ETag string +} diff --git a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go index 1a380bd5083..8fbb746d6ce 100644 --- a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go +++ b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go @@ -12,7 +12,7 @@ import ( "strings" "sync" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/s3api" ) type partsEtagMap map[int]string @@ -190,8 +190,8 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) } -func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError { - return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"} +func MultipartUploadInternalError() *s3api.CompleteMultipartUploadError { + return &s3api.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"} } func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) { @@ -212,7 +212,7 @@ func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http return } - var msg objectstore.CompleteMultipartUpload + var msg s3api.CompleteMultipartUpload err = xml.Unmarshal(buf, &msg) if err != nil { http.Error(w, err.Error(), 400) @@ -245,7 +245,7 @@ func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http bucket := split[0] key := split[1] - answer := objectstore.CompleteMultipartUploadResult{ + answer := s3api.CompleteMultipartUploadResult{ Location: r.URL.String(), Bucket: bucket, Key: key, diff --git a/workhorse/internal/upload/destination/objectstore/uploader.go b/workhorse/internal/upload/destination/objectstore/uploader.go index 43e573872ee..798a693aa93 100644 --- a/workhorse/internal/upload/destination/objectstore/uploader.go +++ b/workhorse/internal/upload/destination/objectstore/uploader.go @@ -38,11 +38,21 @@ func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader { func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) } +func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) { + return u.consume(outerCtx, reader, deadLine, false) +} + +func (u *uploader) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) { + return u.consume(outerCtx, reader, deadLine, true) +} + // Consume reads the reader until it reaches EOF or an error. It spawns a // goroutine that waits for outerCtx to be done, after which the remote // file is deleted. The deadline applies to the upload performed inside // Consume, not to outerCtx. -func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) { +// SkipDelete optionaly call the Delete() function on the strategy once +// rails is done handling the upload request. 
+func (u *uploader) consume(outerCtx context.Context, reader io.Reader, deadLine time.Time, skipDelete bool) (_ int64, err error) { if u.metrics { objectStorageUploadsOpen.Inc() defer func(started time.Time) { @@ -59,17 +69,25 @@ func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline // "delete" them. if err != nil { u.strategy.Abort() + + if skipDelete { + // skipDelete avoided the object removal (see the goroutine below). Make + // here that the object is deleted if aborted. + u.strategy.Delete() + } } }() - go func() { - // Once gitlab-rails is done handling the request, we are supposed to - // delete the upload from its temporary location. - <-outerCtx.Done() - u.strategy.Delete() - }() + if !skipDelete { + go func() { + // Once gitlab-rails is done handling the request, we are supposed to + // delete the upload from its temporary location. + <-outerCtx.Done() + u.strategy.Delete() + }() + } - uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline) + uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadLine) defer cancelFn() var hasher hash.Hash diff --git a/workhorse/internal/upload/destination/upload_opts.go b/workhorse/internal/upload/destination/upload_opts.go index 58427b38b30..72efaebc16c 100644 --- a/workhorse/internal/upload/destination/upload_opts.go +++ b/workhorse/internal/upload/destination/upload_opts.go @@ -39,6 +39,8 @@ type UploadOpts struct { PresignedPut string // PresignedDelete is a presigned S3 DeleteObject compatible URL. PresignedDelete string + // Whether Workhorse needs to delete the temporary object or not. + SkipDelete bool // HTTP headers to be sent along with PUT request PutHeaders map[string]string // Whether to ignore Rails pre-signed URLs and have Workhorse directly access object storage provider @@ -61,6 +63,8 @@ type UploadOpts struct { PresignedCompleteMultipart string // PresignedAbortMultipart is a presigned URL for AbortMultipartUpload PresignedAbortMultipart string + // UploadHashFunctions contains a list of allowed hash functions (md5, sha1, etc.) 
+ UploadHashFunctions []string } // UseWorkhorseClientEnabled checks if the options require direct access to object storage @@ -90,16 +94,18 @@ func GetOpts(apiResponse *api.Response) (*UploadOpts, error) { } opts := UploadOpts{ - LocalTempPath: apiResponse.TempPath, - RemoteID: apiResponse.RemoteObject.ID, - RemoteURL: apiResponse.RemoteObject.GetURL, - PresignedPut: apiResponse.RemoteObject.StoreURL, - PresignedDelete: apiResponse.RemoteObject.DeleteURL, - PutHeaders: apiResponse.RemoteObject.PutHeaders, - UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient, - RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID, - Deadline: time.Now().Add(timeout), - MaximumSize: apiResponse.MaximumSize, + LocalTempPath: apiResponse.TempPath, + RemoteID: apiResponse.RemoteObject.ID, + RemoteURL: apiResponse.RemoteObject.GetURL, + PresignedPut: apiResponse.RemoteObject.StoreURL, + PresignedDelete: apiResponse.RemoteObject.DeleteURL, + SkipDelete: apiResponse.RemoteObject.SkipDelete, + PutHeaders: apiResponse.RemoteObject.PutHeaders, + UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient, + RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID, + Deadline: time.Now().Add(timeout), + MaximumSize: apiResponse.MaximumSize, + UploadHashFunctions: apiResponse.UploadHashFunctions, } if opts.LocalTempPath != "" && opts.RemoteID != "" { diff --git a/workhorse/internal/upload/destination/upload_opts_test.go b/workhorse/internal/upload/destination/upload_opts_test.go index fd9e56db194..0fda3bd2381 100644 --- a/workhorse/internal/upload/destination/upload_opts_test.go +++ b/workhorse/internal/upload/destination/upload_opts_test.go @@ -1,4 +1,4 @@ -package destination_test +package destination import ( "testing" @@ -8,7 +8,6 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" ) @@ -43,7 +42,7 @@ func TestUploadOptsLocalAndRemote(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - opts := destination.UploadOpts{ + opts := UploadOpts{ LocalTempPath: test.localTempPath, PresignedPut: test.presignedPut, PartSize: test.partSize, @@ -103,15 +102,17 @@ func TestGetOpts(t *testing.T) { MultipartUpload: test.multipart, CustomPutHeaders: test.customPutHeaders, PutHeaders: test.putHeaders, + SkipDelete: true, }, } deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) - opts, err := destination.GetOpts(apiResponse) + opts, err := GetOpts(apiResponse) require.NoError(t, err) require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) require.WithinDuration(t, deadline, opts.Deadline, time.Second) require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) + require.Equal(t, apiResponse.RemoteObject.SkipDelete, opts.SkipDelete) require.Equal(t, apiResponse.RemoteObject.GetURL, opts.RemoteURL) require.Equal(t, apiResponse.RemoteObject.StoreURL, opts.PresignedPut) require.Equal(t, apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete) @@ -155,22 +156,22 @@ func TestGetOptsFail(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - _, err := destination.GetOpts(tc.in) + _, err := GetOpts(tc.in) require.Error(t, err, "expect input to be rejected") }) } } func TestGetOptsDefaultTimeout(t *testing.T) { - deadline := time.Now().Add(destination.DefaultObjectStoreTimeout) 
- opts, err := destination.GetOpts(&api.Response{TempPath: "/foo/bar"}) + deadline := time.Now().Add(DefaultObjectStoreTimeout) + opts, err := GetOpts(&api.Response{TempPath: "/foo/bar"}) require.NoError(t, err) require.WithinDuration(t, deadline, opts.Deadline, time.Minute) } func TestUseWorkhorseClientEnabled(t *testing.T) { - cfg := destination.ObjectStorageConfig{ + cfg := ObjectStorageConfig{ Provider: "AWS", S3Config: config.S3Config{ Bucket: "test-bucket", @@ -195,7 +196,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) { name string UseWorkhorseClient bool remoteTempObjectID string - objectStorageConfig destination.ObjectStorageConfig + objectStorageConfig ObjectStorageConfig expected bool }{ { @@ -243,7 +244,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) { name: "missing S3 bucket", UseWorkhorseClient: true, remoteTempObjectID: "test-object", - objectStorageConfig: destination.ObjectStorageConfig{ + objectStorageConfig: ObjectStorageConfig{ Provider: "AWS", S3Config: config.S3Config{}, }, @@ -269,7 +270,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) { }, } deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) - opts, err := destination.GetOpts(apiResponse) + opts, err := GetOpts(apiResponse) require.NoError(t, err) opts.ObjectStorageConfig = test.objectStorageConfig @@ -322,7 +323,7 @@ func TestGoCloudConfig(t *testing.T) { }, } deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) - opts, err := destination.GetOpts(apiResponse) + opts, err := GetOpts(apiResponse) require.NoError(t, err) opts.ObjectStorageConfig.URLMux = mux diff --git a/workhorse/internal/upload/object_storage_preparer_test.go b/workhorse/internal/upload/object_storage_preparer_test.go index 56de6bbf7d6..b983d68f1ad 100644 --- a/workhorse/internal/upload/object_storage_preparer_test.go +++ b/workhorse/internal/upload/object_storage_preparer_test.go @@ -1,4 +1,4 @@ -package upload_test +package upload import ( "testing" @@ -7,7 +7,6 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload" "github.com/stretchr/testify/require" ) @@ -38,7 +37,7 @@ func TestPrepareWithS3Config(t *testing.T) { }, } - p := upload.NewObjectStoragePreparer(c) + p := NewObjectStoragePreparer(c) opts, err := p.Prepare(r) require.NoError(t, err) @@ -51,7 +50,7 @@ func TestPrepareWithS3Config(t *testing.T) { func TestPrepareWithNoConfig(t *testing.T) { c := config.Config{} r := &api.Response{RemoteObject: api.RemoteObject{ID: "id"}} - p := upload.NewObjectStoragePreparer(c) + p := NewObjectStoragePreparer(c) opts, err := p.Prepare(r) require.NoError(t, err) diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go index cc786079e36..69baa2dab6e 100644 --- a/workhorse/internal/upload/uploads_test.go +++ b/workhorse/internal/upload/uploads_test.go @@ -24,7 +24,6 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" ) @@ -111,13 +110,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { expectedLen := 12 - if 
destination.FIPSEnabled() { - expectedLen-- - require.Empty(t, r.FormValue("file.md5"), "file hash md5") - } else { - require.Equal(t, "098f6bcd4621d373cade4e832627b4f6", r.FormValue("file.md5"), "file hash md5") - } - + require.Equal(t, "098f6bcd4621d373cade4e832627b4f6", r.FormValue("file.md5"), "file hash md5") require.Len(t, r.MultipartForm.Value, expectedLen, "multipart form values") w.WriteHeader(202)
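The multi_hash.go change is easiest to verify in isolation. The sketch below uses only the standard library; permitted and newMultiWriter are renamed stand-ins for the diff's permittedHashFunction and newMultiHash, and the empty allow-list case (compute all four digests) matches the "default" and "blank" cases in multi_hash_test.go.

package main

import (
	"crypto/md5"
	"crypto/sha1"
	"crypto/sha256"
	"crypto/sha512"
	"fmt"
	"hash"
	"io"
	"strings"
)

// hashFactories matches the map kept in multi_hash.go after the patch.
var hashFactories = map[string]func() hash.Hash{
	"md5":    md5.New,
	"sha1":   sha1.New,
	"sha256": sha256.New,
	"sha512": sha512.New,
}

// permitted mirrors permittedHashFunction: an empty allow-list means
// "compute every digest", preserving the pre-patch default.
func permitted(allowed []string, name string) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == name {
			return true
		}
	}
	return false
}

// newMultiWriter mirrors newMultiHash: the upload stream is fanned out only
// to the permitted digests, so a FIPS deployment can simply omit "md5" from
// the allow-list instead of relying on the removed FIPSEnabled() switch.
func newMultiWriter(allowed []string) (io.Writer, map[string]hash.Hash) {
	hashes := make(map[string]hash.Hash)
	var writers []io.Writer
	for name, factory := range hashFactories {
		if !permitted(allowed, name) {
			continue
		}
		h := factory()
		hashes[name] = h
		writers = append(writers, h) // hash.Hash embeds io.Writer
	}
	return io.MultiWriter(writers...), hashes
}

func main() {
	w, hashes := newMultiWriter([]string{"sha256", "sha512"})
	io.Copy(w, strings.NewReader("content")) // error ignored in this sketch
	for name, h := range hashes {
		fmt.Printf("%s: %x\n", name, h.Sum(nil))
	}
}

With the allow-list in place, a FIPS instance can omit "md5" from UploadHashFunctions in the API response, which is why the FIPSEnabled() branches disappear from body_uploader_test.go and uploads_test.go above.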