Diffstat (limited to 'workhorse/internal/objectstore')
19 files changed, 2025 insertions, 0 deletions
diff --git a/workhorse/internal/objectstore/gocloud_object.go b/workhorse/internal/objectstore/gocloud_object.go new file mode 100644 index 00000000000..38545086994 --- /dev/null +++ b/workhorse/internal/objectstore/gocloud_object.go @@ -0,0 +1,100 @@ +package objectstore + +import ( + "context" + "io" + "time" + + "gitlab.com/gitlab-org/labkit/log" + "gocloud.dev/blob" + "gocloud.dev/gcerrors" +) + +type GoCloudObject struct { + bucket *blob.Bucket + mux *blob.URLMux + bucketURL string + objectName string + *uploader +} + +type GoCloudObjectParams struct { + Ctx context.Context + Mux *blob.URLMux + BucketURL string + ObjectName string +} + +func NewGoCloudObject(p *GoCloudObjectParams) (*GoCloudObject, error) { + bucket, err := p.Mux.OpenBucket(p.Ctx, p.BucketURL) + if err != nil { + return nil, err + } + + o := &GoCloudObject{ + bucket: bucket, + mux: p.Mux, + bucketURL: p.BucketURL, + objectName: p.ObjectName, + } + + o.uploader = newUploader(o) + return o, nil +} + +func (o *GoCloudObject) Upload(ctx context.Context, r io.Reader) error { + defer o.bucket.Close() + + writer, err := o.bucket.NewWriter(ctx, o.objectName, nil) + if err != nil { + log.ContextLogger(ctx).WithError(err).Error("error creating GoCloud bucket") + return err + } + + if _, err = io.Copy(writer, r); err != nil { + log.ContextLogger(ctx).WithError(err).Error("error writing to GoCloud bucket") + writer.Close() + return err + } + + if err := writer.Close(); err != nil { + log.ContextLogger(ctx).WithError(err).Error("error closing GoCloud bucket") + return err + } + + return nil +} + +func (o *GoCloudObject) ETag() string { + return "" +} + +func (o *GoCloudObject) Abort() { + o.Delete() +} + +// Delete will always attempt to delete the temporary file. +// According to https://github.com/google/go-cloud/blob/7818961b5c9a112f7e092d3a2d8479cbca80d187/blob/azureblob/azureblob.go#L881-L883, +// if the writer is closed before any Write is called, Close will create an empty file. +func (o *GoCloudObject) Delete() { + if o.bucketURL == "" || o.objectName == "" { + return + } + + // Note we can't use the request context because in a successful + // case, the original request has already completed. 
+ deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background + defer cancel() + + bucket, err := o.mux.OpenBucket(deleteCtx, o.bucketURL) + if err != nil { + log.WithError(err).Error("error opening bucket for delete") + return + } + + if err := bucket.Delete(deleteCtx, o.objectName); err != nil { + if gcerrors.Code(err) != gcerrors.NotFound { + log.WithError(err).Error("error deleting object") + } + } +} diff --git a/workhorse/internal/objectstore/gocloud_object_test.go b/workhorse/internal/objectstore/gocloud_object_test.go new file mode 100644 index 00000000000..4dc9d2d75cc --- /dev/null +++ b/workhorse/internal/objectstore/gocloud_object_test.go @@ -0,0 +1,56 @@ +package objectstore_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper" +) + +func TestGoCloudObjectUpload(t *testing.T) { + mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azuretest") + defer cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + deadline := time.Now().Add(testTimeout) + + objectName := "test.png" + testURL := "azuretest://azure.example.com/test-container" + p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName} + object, err := objectstore.NewGoCloudObject(p) + require.NotNil(t, object) + require.NoError(t, err) + + // copy data + n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch") + + bucket, err := mux.OpenBucket(ctx, testURL) + require.NoError(t, err) + + // Verify the data was copied correctly. + received, err := bucket.ReadAll(ctx, objectName) + require.NoError(t, err) + require.Equal(t, []byte(test.ObjectContent), received) + + cancel() + + testhelper.Retry(t, 5*time.Second, func() error { + exists, err := bucket.Exists(ctx, objectName) + require.NoError(t, err) + + if exists { + return fmt.Errorf("file %s is still present", objectName) + } + + return nil + }) +} diff --git a/workhorse/internal/objectstore/multipart.go b/workhorse/internal/objectstore/multipart.go new file mode 100644 index 00000000000..fd1c0ed487d --- /dev/null +++ b/workhorse/internal/objectstore/multipart.go @@ -0,0 +1,188 @@ +package objectstore + +import ( + "bytes" + "context" + "encoding/xml" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + + "gitlab.com/gitlab-org/labkit/log" + "gitlab.com/gitlab-org/labkit/mask" +) + +// ErrNotEnoughParts is returned when the data to upload is larger than partSize * len(PartURLs) +var ErrNotEnoughParts = errors.New("not enough Parts") + +// Multipart represents a MultipartUpload on an S3 compatible Object Store service. +// An object is uploaded by passing an io.Reader to its Consume method +type Multipart struct { + PartURLs []string + // CompleteURL is a presigned URL for CompleteMultipartUpload + CompleteURL string + // AbortURL is a presigned URL for AbortMultipartUpload + AbortURL string + // DeleteURL is a presigned URL for RemoveObject + DeleteURL string + PutHeaders map[string]string + partSize int64 + etag string + + *uploader +} + +// NewMultipart provides a Multipart pointer that can be used for uploading. Data is buffered on disk in chunks of up to partSize bytes, +// each of which is then uploaded with S3 Upload Part.
Once all parts are uploaded, a final call to CompleteMultipartUpload is sent. +// In case of any error, a call to AbortMultipartUpload is made to clean up all the resources +func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, partSize int64) (*Multipart, error) { + m := &Multipart{ + PartURLs: partURLs, + CompleteURL: completeURL, + AbortURL: abortURL, + DeleteURL: deleteURL, + PutHeaders: putHeaders, + partSize: partSize, + } + + m.uploader = newUploader(m) + return m, nil +} + +func (m *Multipart) Upload(ctx context.Context, r io.Reader) error { + cmu := &CompleteMultipartUpload{} + for i, partURL := range m.PartURLs { + src := io.LimitReader(r, m.partSize) + part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1) + if err != nil { + return err + } + if part == nil { + break + } + + cmu.Part = append(cmu.Part, part) + } + + n, err := io.Copy(ioutil.Discard, r) + if err != nil { + return fmt.Errorf("drain pipe: %v", err) + } + if n > 0 { + return ErrNotEnoughParts + } + + if err := m.complete(ctx, cmu); err != nil { + return err + } + + return nil +} + +func (m *Multipart) ETag() string { + return m.etag +} +func (m *Multipart) Abort() { + deleteURL(m.AbortURL) +} + +func (m *Multipart) Delete() { + deleteURL(m.DeleteURL) +} + +func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { + file, err := ioutil.TempFile("", "part-buffer") + if err != nil { + return nil, fmt.Errorf("create temporary buffer file: %v", err) + } + defer func(path string) { + if err := os.Remove(path); err != nil { + log.WithError(err).WithField("file", path).Warning("Unable to delete temporary file") + } + }(file.Name()) + + n, err := io.Copy(file, src) + if err != nil { + return nil, err + } + if n == 0 { + return nil, nil + } + + if _, err = file.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("rewind part %d temporary dump: %v", partNumber, err) + } + + etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n) + if err != nil { + return nil, fmt.Errorf("upload part %d: %v", partNumber, err) + } + return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil +} + +func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) { + deadline, ok := ctx.Deadline() + if !ok { + return "", fmt.Errorf("missing deadline") + } + + part, err := newObject(url, "", headers, size, false) + if err != nil { + return "", err + } + + if n, err := part.Consume(ctx, io.LimitReader(body, size), deadline); err != nil || n < size { + if err == nil { + err = io.ErrUnexpectedEOF + } + return "", err + } + + return part.ETag(), nil +} + +func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error { + body, err := xml.Marshal(cmu) + if err != nil { + return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err) + } + + req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create CompleteMultipartUpload request: %v", err) + } + req.ContentLength = int64(len(body)) + req.Header.Set("Content-Type", "application/xml") + req = req.WithContext(ctx) + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) + } + defer resp.Body.Close() + + if resp.StatusCode !=
http.StatusOK { + return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) + } + + result := &compoundCompleteMultipartUploadResult{} + decoder := xml.NewDecoder(resp.Body) + if err := decoder.Decode(&result); err != nil { + return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) + } + + if result.isError() { + return result + } + + if result.CompleteMultipartUploadResult == nil { + return fmt.Errorf("empty CompleteMultipartUploadResult") + } + + m.etag = extractETag(result.ETag) + + return nil +} diff --git a/workhorse/internal/objectstore/multipart_test.go b/workhorse/internal/objectstore/multipart_test.go new file mode 100644 index 00000000000..00d6efc0982 --- /dev/null +++ b/workhorse/internal/objectstore/multipart_test.go @@ -0,0 +1,64 @@ +package objectstore_test + +import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test" +) + +func TestMultipartUploadWithUpcaseETags(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var putCnt, postCnt int + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + defer r.Body.Close() + + // Part upload request + if r.Method == "PUT" { + putCnt++ + + w.Header().Set("ETag", strings.ToUpper(test.ObjectMD5)) + } + + // POST with CompleteMultipartUpload request + if r.Method == "POST" { + completeBody := `<CompleteMultipartUploadResult> + <Bucket>test-bucket</Bucket> + <ETag>No Longer Checked</ETag> + </CompleteMultipartUploadResult>` + postCnt++ + + w.Write([]byte(completeBody)) + } + })) + defer ts.Close() + + deadline := time.Now().Add(testTimeout) + + m, err := objectstore.NewMultipart( + []string{ts.URL}, // a single presigned part URL + ts.URL, // the complete multipart upload URL + "", // no abort + "", // no delete + map[string]string{}, // no custom headers + test.ObjectSize) // parts size equal to the whole content. 
Only 1 part + require.NoError(t, err) + + _, err = m.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, 1, putCnt, "1 part expected") + require.Equal(t, 1, postCnt, "1 complete multipart upload expected") +} diff --git a/workhorse/internal/objectstore/object.go b/workhorse/internal/objectstore/object.go new file mode 100644 index 00000000000..eaf3bfb2e36 --- /dev/null +++ b/workhorse/internal/objectstore/object.go @@ -0,0 +1,114 @@ +package objectstore + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "time" + + "gitlab.com/gitlab-org/labkit/correlation" + "gitlab.com/gitlab-org/labkit/mask" + "gitlab.com/gitlab-org/labkit/tracing" +) + +// httpTransport defines an http.Transport with values +// that are more restrictive than those of http.DefaultTransport: +// a shorter TLS handshake timeout and more aggressive connection closing, +// to prevent connections from hanging and to reduce FD usage +var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 10 * time.Second, + }).DialContext, + MaxIdleConns: 2, + IdleConnTimeout: 30 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 10 * time.Second, + ResponseHeaderTimeout: 30 * time.Second, +})) + +var httpClient = &http.Client{ + Transport: httpTransport, +} + +// Object represents an object on an S3 compatible Object Store service. +// An object is uploaded by passing an io.Reader to its Consume method +type Object struct { + // putURL is a presigned URL for PutObject + putURL string + // deleteURL is a presigned URL for RemoveObject + deleteURL string + putHeaders map[string]string + size int64 + etag string + metrics bool + + *uploader +} + +type StatusCodeError error + +// NewObject opens an HTTP connection to the Object Store and returns an Object pointer that can be used for uploading.
+func NewObject(putURL, deleteURL string, putHeaders map[string]string, size int64) (*Object, error) { + return newObject(putURL, deleteURL, putHeaders, size, true) +} + +func newObject(putURL, deleteURL string, putHeaders map[string]string, size int64, metrics bool) (*Object, error) { + o := &Object{ + putURL: putURL, + deleteURL: deleteURL, + putHeaders: putHeaders, + size: size, + metrics: metrics, + } + + o.uploader = newETagCheckUploader(o, metrics) + return o, nil +} + +func (o *Object) Upload(ctx context.Context, r io.Reader) error { + // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err) + req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r)) + + if err != nil { + return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err) + } + req.ContentLength = o.size + + for k, v := range o.putHeaders { + req.Header.Set(k, v) + } + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + if o.metrics { + objectStorageUploadRequestsInvalidStatus.Inc() + } + return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status)) + } + + o.etag = extractETag(resp.Header.Get("ETag")) + + return nil +} + +func (o *Object) ETag() string { + return o.etag +} + +func (o *Object) Abort() { + o.Delete() +} + +func (o *Object) Delete() { + deleteURL(o.deleteURL) +} diff --git a/workhorse/internal/objectstore/object_test.go b/workhorse/internal/objectstore/object_test.go new file mode 100644 index 00000000000..2ec45520e97 --- /dev/null +++ b/workhorse/internal/objectstore/object_test.go @@ -0,0 +1,155 @@ +package objectstore_test + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test" +) + +const testTimeout = 10 * time.Second + +type osFactory func() (*test.ObjectstoreStub, *httptest.Server) + +func testObjectUploadNoErrors(t *testing.T, startObjectStore osFactory, useDeleteURL bool, contentType string) { + osStub, ts := startObjectStore() + defer ts.Close() + + objectURL := ts.URL + test.ObjectPath + var deleteURL string + if useDeleteURL { + deleteURL = objectURL + } + + putHeaders := map[string]string{"Content-Type": contentType} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + deadline := time.Now().Add(testTimeout) + object, err := objectstore.NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize) + require.NoError(t, err) + + // copy data + n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch") + + require.Equal(t, contentType, osStub.GetHeader(test.ObjectPath, "Content-Type")) + + // Checking MD5 extraction + require.Equal(t, osStub.GetObjectMD5(test.ObjectPath), object.ETag()) + + // Checking cleanup + cancel() + require.Equal(t, 1, osStub.PutsCnt(), "Object hasn't been uploaded") + + var expectedDeleteCnt int + if useDeleteURL { + expectedDeleteCnt = 1 + } + // Poll because the object removal is async + for i := 0; i < 100; i++ { + if osStub.DeletesCnt() == expectedDeleteCnt { + break + } + time.Sleep(10 * time.Millisecond) + } + + if useDeleteURL { + require.Equal(t, 1, osStub.DeletesCnt(), "Object hasn't been 
deleted") + } else { + require.Equal(t, 0, osStub.DeletesCnt(), "Object has been deleted") + } +} + +func TestObjectUpload(t *testing.T) { + t.Run("with delete URL", func(t *testing.T) { + testObjectUploadNoErrors(t, test.StartObjectStore, true, "application/octet-stream") + }) + t.Run("without delete URL", func(t *testing.T) { + testObjectUploadNoErrors(t, test.StartObjectStore, false, "application/octet-stream") + }) + t.Run("with custom content type", func(t *testing.T) { + testObjectUploadNoErrors(t, test.StartObjectStore, false, "image/jpeg") + }) + t.Run("with upcase ETAG", func(t *testing.T) { + factory := func() (*test.ObjectstoreStub, *httptest.Server) { + md5s := map[string]string{ + test.ObjectPath: strings.ToUpper(test.ObjectMD5), + } + + return test.StartObjectStoreWithCustomMD5(md5s) + } + + testObjectUploadNoErrors(t, factory, false, "application/octet-stream") + }) +} + +func TestObjectUpload404(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + deadline := time.Now().Add(testTimeout) + objectURL := ts.URL + test.ObjectPath + object, err := objectstore.NewObject(objectURL, "", map[string]string{}, test.ObjectSize) + require.NoError(t, err) + _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + + require.Error(t, err) + _, isStatusCodeError := err.(objectstore.StatusCodeError) + require.True(t, isStatusCodeError, "Should fail with StatusCodeError") + require.Contains(t, err.Error(), "404") +} + +type endlessReader struct{} + +func (e *endlessReader) Read(p []byte) (n int, err error) { + for i := 0; i < len(p); i++ { + p[i] = '*' + } + + return len(p), nil +} + +// TestObjectUploadBrokenConnection purpose is to ensure that errors caused by the upload destination get propagated back correctly. +// This is important for troubleshooting in production. 
+func TestObjectUploadBrokenConnection(t *testing.T) { + // This test server closes the connection immediately + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hj, ok := w.(http.Hijacker) + if !ok { + require.FailNow(t, "webserver doesn't support hijacking") + } + conn, _, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + conn.Close() + })) + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + deadline := time.Now().Add(testTimeout) + objectURL := ts.URL + test.ObjectPath + object, err := objectstore.NewObject(objectURL, "", map[string]string{}, -1) + require.NoError(t, err) + + _, copyErr := object.Consume(ctx, &endlessReader{}, deadline) + require.Error(t, copyErr) + require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error") +} diff --git a/workhorse/internal/objectstore/prometheus.go b/workhorse/internal/objectstore/prometheus.go new file mode 100644 index 00000000000..20762fb52bc --- /dev/null +++ b/workhorse/internal/objectstore/prometheus.go @@ -0,0 +1,39 @@ +package objectstore + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + objectStorageUploadRequests = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_object_storage_upload_requests", + Help: "How many object storage requests have been processed", + }, + []string{"status"}, + ) + objectStorageUploadsOpen = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "gitlab_workhorse_object_storage_upload_open", + Help: "Describes how many object storage requests are open now", + }, + ) + objectStorageUploadBytes = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_object_storage_upload_bytes", + Help: "How many bytes were sent to object storage", + }, + ) + objectStorageUploadTime = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "gitlab_workhorse_object_storage_upload_time", + Help: "How long it took to upload objects", + Buckets: objectStorageUploadTimeBuckets, + }) + + objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed") + objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status") + + objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100} +) diff --git a/workhorse/internal/objectstore/s3_complete_multipart_api.go b/workhorse/internal/objectstore/s3_complete_multipart_api.go new file mode 100644 index 00000000000..b84f5757f49 --- /dev/null +++ b/workhorse/internal/objectstore/s3_complete_multipart_api.go @@ -0,0 +1,51 @@ +package objectstore + +import ( + "encoding/xml" + "fmt" +) + +// CompleteMultipartUpload is the S3 CompleteMultipartUpload body +type CompleteMultipartUpload struct { + Part []*completeMultipartUploadPart +} + +type completeMultipartUploadPart struct { + PartNumber int + ETag string +} + +// CompleteMultipartUploadResult is the S3 response to a CompleteMultipartUpload request +type CompleteMultipartUploadResult struct { + Location string + Bucket string + Key string + ETag string +} + +// CompleteMultipartUploadError is the in-body error structure +// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples +// the response contains other fields we are not using +type CompleteMultipartUploadError struct { + XMLName xml.Name `xml:"Error"` + Code string +
Message string +} + +func (c *CompleteMultipartUploadError) Error() string { + return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message) +} + +// compoundCompleteMultipartUploadResult holds both CompleteMultipartUploadResult and CompleteMultipartUploadError +// this allows us to deserialize the response body where the root element can be either Error or CompleteMultipartUploadResult +type compoundCompleteMultipartUploadResult struct { + *CompleteMultipartUploadResult + *CompleteMultipartUploadError + + // XMLName overrides CompleteMultipartUploadError.XMLName tags + XMLName xml.Name +} + +func (c *compoundCompleteMultipartUploadResult) isError() bool { + return c.CompleteMultipartUploadError != nil +} diff --git a/workhorse/internal/objectstore/s3_object.go b/workhorse/internal/objectstore/s3_object.go new file mode 100644 index 00000000000..1f79f88224f --- /dev/null +++ b/workhorse/internal/objectstore/s3_object.go @@ -0,0 +1,119 @@ +package objectstore + +import ( + "context" + "io" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "gitlab.com/gitlab-org/labkit/log" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" +) + +type S3Object struct { + credentials config.S3Credentials + config config.S3Config + objectName string + uploaded bool + + *uploader +} + +func NewS3Object(objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config) (*S3Object, error) { + o := &S3Object{ + credentials: s3Credentials, + config: s3Config, + objectName: objectName, + } + + o.uploader = newUploader(o) + return o, nil +} + +func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) { + if s3Config.ServerSideEncryption != "" { + input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption) + + if s3Config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms && s3Config.SSEKMSKeyID != "" { + input.SSEKMSKeyId = aws.String(s3Config.SSEKMSKeyID) + } + } +} + +func (s *S3Object) Upload(ctx context.Context, r io.Reader) error { + sess, err := setupS3Session(s.credentials, s.config) + if err != nil { + log.WithError(err).Error("error creating S3 session") + return err + } + + uploader := s3manager.NewUploader(sess) + + input := &s3manager.UploadInput{ + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), + Body: r, + } + + setEncryptionOptions(input, s.config) + + _, err = uploader.UploadWithContext(ctx, input) + if err != nil { + log.WithError(err).Error("error uploading S3 object") + // Get the root cause, such as ErrEntityTooLarge, so we can return the proper HTTP status code + return unwrapAWSError(err) + } + + s.uploaded = true + + return nil +} + +func (s *S3Object) ETag() string { + return "" +} + +func (s *S3Object) Abort() { + s.Delete() +} + +func (s *S3Object) Delete() { + if !s.uploaded { + return + } + + session, err := setupS3Session(s.credentials, s.config) + if err != nil { + log.WithError(err).Error("error setting up S3 session in delete") + return + } + + svc := s3.New(session) + input := &s3.DeleteObjectInput{ + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), + } + + // Note we can't use the request context because in a successful + // case, the original request has already completed.
+ deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background + defer cancel() + + _, err = svc.DeleteObjectWithContext(deleteCtx, input) + if err != nil { + log.WithError(err).Error("error deleting S3 object") + } +} + +// This is needed until https://github.com/aws/aws-sdk-go/issues/2820 is closed. +func unwrapAWSError(e error) error { + if awsErr, ok := e.(awserr.Error); ok { + return unwrapAWSError(awsErr.OrigErr()) + } + + return e +} diff --git a/workhorse/internal/objectstore/s3_object_test.go b/workhorse/internal/objectstore/s3_object_test.go new file mode 100644 index 00000000000..d9ebbd7f979 --- /dev/null +++ b/workhorse/internal/objectstore/s3_object_test.go @@ -0,0 +1,174 @@ +package objectstore_test + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper" +) + +type failedReader struct { + io.Reader +} + +func (r *failedReader) Read(p []byte) (int, error) { + origErr := fmt.Errorf("entity is too large") + return 0, awserr.New("Read", "read failed", origErr) +} + +func TestS3ObjectUpload(t *testing.T) { + testCases := []struct { + encryption string + }{ + {encryption: ""}, + {encryption: s3.ServerSideEncryptionAes256}, + {encryption: s3.ServerSideEncryptionAwsKms}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("encryption=%v", tc.encryption), func(t *testing.T) { + creds, config, sess, ts := test.SetupS3(t, tc.encryption) + defer ts.Close() + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + objectName := filepath.Join(tmpDir, "s3-test-data") + ctx, cancel := context.WithCancel(context.Background()) + + object, err := objectstore.NewS3Object(objectName, creds, config) + require.NoError(t, err) + + // copy data + n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch") + + test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent) + test.CheckS3Metadata(t, sess, config, objectName) + + cancel() + + testhelper.Retry(t, 5*time.Second, func() error { + if test.S3ObjectDoesNotExist(t, sess, config, objectName) { + return nil + } + + return fmt.Errorf("file is still present") + }) + }) + } +} + +func TestConcurrentS3ObjectUpload(t *testing.T) { + creds, uploadsConfig, uploadsSession, uploadServer := test.SetupS3WithBucket(t, "uploads", "") + defer uploadServer.Close() + + // This will return a separate S3 endpoint + _, artifactsConfig, artifactsSession, artifactsServer := test.SetupS3WithBucket(t, "artifacts", "") + defer artifactsServer.Close() + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + var wg sync.WaitGroup + + for i := 0; i < 4; i++ { + wg.Add(1) + + go func(index int) { + var sess *session.Session + var config config.S3Config + + if index%2 == 0 { + sess = uploadsSession + config = uploadsConfig + }
else { + sess = artifactsSession + config = artifactsConfig + } + + name := fmt.Sprintf("s3-test-data-%d", index) + objectName := filepath.Join(tmpDir, name) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + object, err := objectstore.NewS3Object(objectName, creds, config) + require.NoError(t, err) + + // copy data + n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch") + + test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent) + wg.Done() + }(i) + } + + wg.Wait() +} + +func TestS3ObjectUploadCancel(t *testing.T) { + creds, config, _, ts := test.SetupS3(t, "") + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + objectName := filepath.Join(tmpDir, "s3-test-data") + + object, err := objectstore.NewS3Object(objectName, creds, config) + + require.NoError(t, err) + + // Cancel the transfer before the data has been copied to ensure + // we handle this gracefully. + cancel() + + _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.Error(t, err) + require.Equal(t, "context canceled", err.Error()) +} + +func TestS3ObjectUploadLimitReached(t *testing.T) { + creds, config, _, ts := test.SetupS3(t, "") + defer ts.Close() + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + objectName := filepath.Join(tmpDir, "s3-test-data") + object, err := objectstore.NewS3Object(objectName, creds, config) + require.NoError(t, err) + + _, err = object.Consume(context.Background(), &failedReader{}, deadline) + require.Error(t, err) + require.Equal(t, "entity is too large", err.Error()) +} diff --git a/workhorse/internal/objectstore/s3_session.go b/workhorse/internal/objectstore/s3_session.go new file mode 100644 index 00000000000..ebc8daf534c --- /dev/null +++ b/workhorse/internal/objectstore/s3_session.go @@ -0,0 +1,94 @@ +package objectstore + +import ( + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" +) + +type s3Session struct { + session *session.Session + expiry time.Time +} + +type s3SessionCache struct { + // An S3 session is cached by its input configuration (e.g. region, + // endpoint, path style, etc.), but the bucket is actually + // determined by the type of object to be uploaded (e.g. CI + // artifact, LFS, etc.) during runtime. In practice, we should only + // need one session per Workhorse process if we only allow one + // configuration for many different buckets. However, using a map + // indexed by the config avoids potential pitfalls in case the + // bucket configuration is supplied at startup or we need to support + // multiple S3 endpoints. 
+ sessions map[config.S3Config]*s3Session + sync.Mutex +} + +func (s *s3Session) isExpired() bool { + return time.Now().After(s.expiry) +} + +func newS3SessionCache() *s3SessionCache { + return &s3SessionCache{sessions: make(map[config.S3Config]*s3Session)} +} + +var ( + // By default, it looks like IAM instance profiles may last 6 hours + // (via curl http://169.254.169.254/latest/meta-data/iam/security-credentials/<role_name>), + // but this may be configurable anywhere from 15 minutes to 12 + // hours. To be safe, refresh AWS sessions every 10 minutes. + sessionExpiration = 10 * time.Minute + sessionCache = newS3SessionCache() +) + +// setupS3Session initializes a new AWS S3 session and refreshes one if +// necessary. As recommended in https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html, +// sessions should be cached when possible. Sessions are safe to use +// concurrently as long as the session isn't modified. +func setupS3Session(s3Credentials config.S3Credentials, s3Config config.S3Config) (*session.Session, error) { + sessionCache.Lock() + defer sessionCache.Unlock() + + if s, ok := sessionCache.sessions[s3Config]; ok && !s.isExpired() { + return s.session, nil + } + + cfg := &aws.Config{ + Region: aws.String(s3Config.Region), + S3ForcePathStyle: aws.Bool(s3Config.PathStyle), + } + + // In case IAM profiles aren't being used, use the static credentials + if s3Credentials.AwsAccessKeyID != "" && s3Credentials.AwsSecretAccessKey != "" { + cfg.Credentials = credentials.NewStaticCredentials(s3Credentials.AwsAccessKeyID, s3Credentials.AwsSecretAccessKey, "") + } + + if s3Config.Endpoint != "" { + cfg.Endpoint = aws.String(s3Config.Endpoint) + } + + sess, err := session.NewSession(cfg) + if err != nil { + return nil, err + } + + sessionCache.sessions[s3Config] = &s3Session{ + expiry: time.Now().Add(sessionExpiration), + session: sess, + } + + return sess, nil +} + +func ResetS3Session(s3Config config.S3Config) { + sessionCache.Lock() + defer sessionCache.Unlock() + + delete(sessionCache.sessions, s3Config) +} diff --git a/workhorse/internal/objectstore/s3_session_test.go b/workhorse/internal/objectstore/s3_session_test.go new file mode 100644 index 00000000000..8601f305917 --- /dev/null +++ b/workhorse/internal/objectstore/s3_session_test.go @@ -0,0 +1,57 @@ +package objectstore + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" +) + +func TestS3SessionSetup(t *testing.T) { + credentials := config.S3Credentials{} + cfg := config.S3Config{Region: "us-west-1", PathStyle: true} + + sess, err := setupS3Session(credentials, cfg) + require.NoError(t, err) + + require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1") + require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle)) + + require.Equal(t, len(sessionCache.sessions), 1) + anotherConfig := cfg + _, err = setupS3Session(credentials, anotherConfig) + require.NoError(t, err) + require.Equal(t, len(sessionCache.sessions), 1) + + ResetS3Session(cfg) +} + +func TestS3SessionExpiry(t *testing.T) { + credentials := config.S3Credentials{} + cfg := config.S3Config{Region: "us-west-1", PathStyle: true} + + sess, err := setupS3Session(credentials, cfg) + require.NoError(t, err) + + require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1") + require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle)) + + firstSession, ok := sessionCache.sessions[cfg] + require.True(t, ok) +
require.False(t, firstSession.isExpired()) + + firstSession.expiry = time.Now().Add(-1 * time.Second) + require.True(t, firstSession.isExpired()) + + _, err = setupS3Session(credentials, cfg) + require.NoError(t, err) + + nextSession, ok := sessionCache.sessions[cfg] + require.True(t, ok) + require.False(t, nextSession.isExpired()) + + ResetS3Session(cfg) +} diff --git a/workhorse/internal/objectstore/test/consts.go b/workhorse/internal/objectstore/test/consts.go new file mode 100644 index 00000000000..7a1bcc28d45 --- /dev/null +++ b/workhorse/internal/objectstore/test/consts.go @@ -0,0 +1,19 @@ +package test + +// Some useful consts for testing purposes +const ( + // ObjectContent is an example textual content + ObjectContent = "TEST OBJECT CONTENT" + // ObjectSize is the ObjectContent size + ObjectSize = int64(len(ObjectContent)) + // ObjectPath is an example remote object path (including the bucket name) + ObjectPath = "/bucket/object" + // ObjectMD5 is the ObjectContent MD5 hash + ObjectMD5 = "42d000eea026ee0760677e506189cb33" + // ObjectSHA1 is the ObjectContent SHA1 hash + ObjectSHA1 = "173cfd58c6b60cb910f68a26cbb77e3fc5017a6d" + // ObjectSHA256 is the ObjectContent SHA256 hash + ObjectSHA256 = "b0257e9e657ef19b15eed4fbba975bd5238d651977564035ef91cb45693647aa" + // ObjectSHA512 is the ObjectContent SHA512 hash + ObjectSHA512 = "51af8197db2047f7894652daa7437927bf831d5aa63f1b0b7277c4800b06f5e3057251f0e4c2d344ca8c2daf1ffc08a28dd3b2f5fe0e316d3fd6c3af58c34b97" +) diff --git a/workhorse/internal/objectstore/test/gocloud_stub.go b/workhorse/internal/objectstore/test/gocloud_stub.go new file mode 100644 index 00000000000..cf22075e407 --- /dev/null +++ b/workhorse/internal/objectstore/test/gocloud_stub.go @@ -0,0 +1,47 @@ +package test + +import ( + "context" + "io/ioutil" + "net/url" + "os" + "testing" + + "github.com/stretchr/testify/require" + "gocloud.dev/blob" + "gocloud.dev/blob/fileblob" +) + +type dirOpener struct { + tmpDir string +} + +func (o *dirOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) { + return fileblob.OpenBucket(o.tmpDir, nil) +} + +func SetupGoCloudFileBucket(t *testing.T, scheme string) (m *blob.URLMux, bucketDir string, cleanup func()) { + tmpDir, err := ioutil.TempDir("", "") + require.NoError(t, err) + + mux := new(blob.URLMux) + fake := &dirOpener{tmpDir: tmpDir} + mux.RegisterBucket(scheme, fake) + cleanup = func() { + os.RemoveAll(tmpDir) + } + + return mux, tmpDir, cleanup +} + +func GoCloudObjectExists(t *testing.T, bucketDir string, objectName string) { + bucket, err := fileblob.OpenBucket(bucketDir, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) // lint:allow context.Background + defer cancel() + + exists, err := bucket.Exists(ctx, objectName) + require.NoError(t, err) + require.True(t, exists) +} diff --git a/workhorse/internal/objectstore/test/objectstore_stub.go b/workhorse/internal/objectstore/test/objectstore_stub.go new file mode 100644 index 00000000000..31ef4913305 --- /dev/null +++ b/workhorse/internal/objectstore/test/objectstore_stub.go @@ -0,0 +1,278 @@ +package test + +import ( + "crypto/md5" + "encoding/hex" + "encoding/xml" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore" +) + +type partsEtagMap map[int]string + +// ObjectstoreStub is a testing implementation of ObjectStore. +// Instead of storing objects it only saves their md5sums.
+type ObjectstoreStub struct { + // bucket contains md5sums of uploaded objects + bucket map[string]string + // overwriteMD5 contains overwrites for md5sums that should be returned instead of the regular hash + overwriteMD5 map[string]string + // multipart is a map of MultipartUploads + multipart map[string]partsEtagMap + // headers contains the HTTP headers sent along with each request + headers map[string]*http.Header + + puts int + deletes int + + m sync.Mutex +} + +// StartObjectStore will start an ObjectStore stub +func StartObjectStore() (*ObjectstoreStub, *httptest.Server) { + return StartObjectStoreWithCustomMD5(make(map[string]string)) +} + +// StartObjectStoreWithCustomMD5 will start an ObjectStore stub: md5Hashes contains overwrites for md5sums that should be returned on PutObject +func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStub, *httptest.Server) { + os := &ObjectstoreStub{ + bucket: make(map[string]string), + multipart: make(map[string]partsEtagMap), + overwriteMD5: make(map[string]string), + headers: make(map[string]*http.Header), + } + + for k, v := range md5Hashes { + os.overwriteMD5[k] = v + } + + return os, httptest.NewServer(os) +} + +// PutsCnt counts PutObject invocations +func (o *ObjectstoreStub) PutsCnt() int { + o.m.Lock() + defer o.m.Unlock() + + return o.puts +} + +// DeletesCnt counts DeleteObject invocations on valid objects +func (o *ObjectstoreStub) DeletesCnt() int { + o.m.Lock() + defer o.m.Unlock() + + return o.deletes +} + +// GetObjectMD5 returns the calculated MD5 of the object uploaded to path. +// It returns an empty string if no object has been uploaded to that path +func (o *ObjectstoreStub) GetObjectMD5(path string) string { + o.m.Lock() + defer o.m.Unlock() + + return o.bucket[path] +} + +// GetHeader returns a given HTTP header of the object uploaded to the path +func (o *ObjectstoreStub) GetHeader(path, key string) string { + o.m.Lock() + defer o.m.Unlock() + + if val, ok := o.headers[path]; ok { + return val.Get(key) + } + + return "" +} + +// InitiateMultipartUpload prepares the ObjectstoreStub to receive a MultipartUpload on path. +// It will return an error if a MultipartUpload is already in progress on that path. +// InitiateMultipartUpload is only used during test setup. +// Workhorse's production code does not know how to initiate a multipart upload. +// +// Real S3 multipart uploads are more complicated than what we do here, +// but this is enough to verify that workhorse's production code behaves as intended.
+func (o *ObjectstoreStub) InitiateMultipartUpload(path string) error { + o.m.Lock() + defer o.m.Unlock() + + if o.multipart[path] != nil { + return fmt.Errorf("MultipartUpload for %q already in progress", path) + } + + o.multipart[path] = make(partsEtagMap) + return nil +} + +// IsMultipartUpload check if the given path has a MultipartUpload in progress +func (o *ObjectstoreStub) IsMultipartUpload(path string) bool { + o.m.Lock() + defer o.m.Unlock() + + return o.isMultipartUpload(path) +} + +// isMultipartUpload is the lock free version of IsMultipartUpload +func (o *ObjectstoreStub) isMultipartUpload(path string) bool { + return o.multipart[path] != nil +} + +func (o *ObjectstoreStub) removeObject(w http.ResponseWriter, r *http.Request) { + o.m.Lock() + defer o.m.Unlock() + + objectPath := r.URL.Path + if o.isMultipartUpload(objectPath) { + o.deletes++ + delete(o.multipart, objectPath) + + w.WriteHeader(200) + } else if _, ok := o.bucket[objectPath]; ok { + o.deletes++ + delete(o.bucket, objectPath) + + w.WriteHeader(200) + } else { + w.WriteHeader(404) + } +} + +func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) { + o.m.Lock() + defer o.m.Unlock() + + objectPath := r.URL.Path + + etag, overwritten := o.overwriteMD5[objectPath] + if !overwritten { + hasher := md5.New() + io.Copy(hasher, r.Body) + + checksum := hasher.Sum(nil) + etag = hex.EncodeToString(checksum) + } + + o.headers[objectPath] = &r.Header + o.puts++ + if o.isMultipartUpload(objectPath) { + pNumber := r.URL.Query().Get("partNumber") + idx, err := strconv.Atoi(pNumber) + if err != nil { + http.Error(w, fmt.Sprintf("malformed partNumber: %v", err), 400) + return + } + + o.multipart[objectPath][idx] = etag + } else { + o.bucket[objectPath] = etag + } + + w.Header().Set("ETag", etag) + w.WriteHeader(200) +} + +func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError { + return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"} +} + +func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) { + o.m.Lock() + defer o.m.Unlock() + + objectPath := r.URL.Path + + multipart := o.multipart[objectPath] + if multipart == nil { + http.Error(w, "Unknown MultipartUpload", 404) + return + } + + buf, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + var msg objectstore.CompleteMultipartUpload + err = xml.Unmarshal(buf, &msg) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + for _, part := range msg.Part { + etag := multipart[part.PartNumber] + if etag != part.ETag { + msg := fmt.Sprintf("ETag mismatch on part %d. 
Expected %q got %q", part.PartNumber, etag, part.ETag) + http.Error(w, msg, 400) + return + } + } + + etag, overwritten := o.overwriteMD5[objectPath] + if !overwritten { + etag = "CompleteMultipartUploadETag" + } + + o.bucket[objectPath] = etag + delete(o.multipart, objectPath) + + w.Header().Set("ETag", etag) + split := strings.SplitN(objectPath[1:], "/", 2) + if len(split) < 2 { + encodeXMLAnswer(w, MultipartUploadInternalError()) + return + } + + bucket := split[0] + key := split[1] + answer := objectstore.CompleteMultipartUploadResult{ + Location: r.URL.String(), + Bucket: bucket, + Key: key, + ETag: etag, + } + encodeXMLAnswer(w, answer) +} + +func encodeXMLAnswer(w http.ResponseWriter, answer interface{}) { + w.Header().Set("Content-Type", "text/xml") + + enc := xml.NewEncoder(w) + if err := enc.Encode(answer); err != nil { + http.Error(w, err.Error(), 500) + } +} + +func (o *ObjectstoreStub) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Body != nil { + defer r.Body.Close() + } + + fmt.Println("ObjectStore Stub:", r.Method, r.URL.String()) + + if r.URL.Path == "" { + http.Error(w, "No path provided", 404) + return + } + + switch r.Method { + case "DELETE": + o.removeObject(w, r) + case "PUT": + o.putObject(w, r) + case "POST": + o.completeMultipartUpload(w, r) + default: + w.WriteHeader(404) + } +} diff --git a/workhorse/internal/objectstore/test/objectstore_stub_test.go b/workhorse/internal/objectstore/test/objectstore_stub_test.go new file mode 100644 index 00000000000..8c0d52a2d79 --- /dev/null +++ b/workhorse/internal/objectstore/test/objectstore_stub_test.go @@ -0,0 +1,167 @@ +package test + +import ( + "fmt" + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func doRequest(method, url string, body io.Reader) error { + req, err := http.NewRequest(method, url, body) + if err != nil { + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + return resp.Body.Close() +} + +func TestObjectStoreStub(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + require.Equal(t, 0, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + + objectURL := ts.URL + ObjectPath + + require.NoError(t, doRequest(http.MethodPut, objectURL, strings.NewReader(ObjectContent))) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.Equal(t, ObjectMD5, stub.GetObjectMD5(ObjectPath)) + + require.NoError(t, doRequest(http.MethodDelete, objectURL, nil)) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 1, stub.DeletesCnt()) +} + +func TestObjectStoreStubDelete404(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + objectURL := ts.URL + ObjectPath + + req, err := http.NewRequest(http.MethodDelete, objectURL, nil) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, 404, resp.StatusCode) + + require.Equal(t, 0, stub.DeletesCnt()) +} + +func TestObjectStoreInitiateMultipartUpload(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + path := "/my-multipart" + err := stub.InitiateMultipartUpload(path) + require.NoError(t, err) + + err = stub.InitiateMultipartUpload(path) + require.Error(t, err, "second attempt to open the same MultipartUpload") +} + +func TestObjectStoreCompleteMultipartUpload(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + objectURL := ts.URL + ObjectPath + parts := []struct { + number int + 
content string + contentMD5 string + }{ + { + number: 1, + content: "first part", + contentMD5: "550cf6b6e60f65a0e3104a26e70fea42", + }, { + number: 2, + content: "second part", + contentMD5: "920b914bca0a70780b40881b8f376135", + }, + } + + stub.InitiateMultipartUpload(ObjectPath) + + require.True(t, stub.IsMultipartUpload(ObjectPath)) + require.Equal(t, 0, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + + // Workhorse knows nothing about S3 MultipartUpload, it receives some URLs + // from GitLab-rails and PUTs chunk of data to each of them. + // Then it completes the upload with a final POST + partPutURLs := []string{ + fmt.Sprintf("%s?partNumber=%d", objectURL, 1), + fmt.Sprintf("%s?partNumber=%d", objectURL, 2), + } + completePostURL := objectURL + + for i, partPutURL := range partPutURLs { + part := parts[i] + + require.NoError(t, doRequest(http.MethodPut, partPutURL, strings.NewReader(part.content))) + + require.Equal(t, i+1, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.Equal(t, part.contentMD5, stub.multipart[ObjectPath][part.number], "Part %d was not uploaded into ObjectStorage", part.number) + require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part %d was mistakenly uploaded as a single object", part.number) + require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted") + } + + completeBody := fmt.Sprintf(`<CompleteMultipartUpload> + <Part> + <PartNumber>1</PartNumber> + <ETag>%s</ETag> + </Part> + <Part> + <PartNumber>2</PartNumber> + <ETag>%s</ETag> + </Part> + </CompleteMultipartUpload>`, parts[0].contentMD5, parts[1].contentMD5) + require.NoError(t, doRequest(http.MethodPost, completePostURL, strings.NewReader(completeBody))) + + require.Equal(t, len(parts), stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.False(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress") +} + +func TestObjectStoreAbortMultipartUpload(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + stub.InitiateMultipartUpload(ObjectPath) + + require.True(t, stub.IsMultipartUpload(ObjectPath)) + require.Equal(t, 0, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + + objectURL := ts.URL + ObjectPath + require.NoError(t, doRequest(http.MethodPut, fmt.Sprintf("%s?partNumber=%d", objectURL, 1), strings.NewReader(ObjectContent))) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.Equal(t, ObjectMD5, stub.multipart[ObjectPath][1], "Part was not uploaded into ObjectStorage") + require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part was mistakenly uploaded as a single object") + require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted") + + require.NoError(t, doRequest(http.MethodDelete, objectURL, nil)) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 1, stub.DeletesCnt()) + require.Empty(t, stub.GetObjectMD5(ObjectPath), "MultiUpload has been completed") + require.False(t, stub.IsMultipartUpload(ObjectPath), "MultiUpload is still in progress") +} diff --git a/workhorse/internal/objectstore/test/s3_stub.go b/workhorse/internal/objectstore/test/s3_stub.go new file mode 100644 index 00000000000..36514b3b887 --- /dev/null +++ b/workhorse/internal/objectstore/test/s3_stub.go @@ -0,0 +1,142 @@ +package test + +import ( + "io/ioutil" + "net/http/httptest" + "os" + "strings" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + 
"github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" + + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + + "github.com/johannesboyne/gofakes3" + "github.com/johannesboyne/gofakes3/backend/s3mem" +) + +func SetupS3(t *testing.T, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) { + return SetupS3WithBucket(t, "test-bucket", encryption) +} + +func SetupS3WithBucket(t *testing.T, bucket string, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) { + backend := s3mem.New() + faker := gofakes3.New(backend) + ts := httptest.NewServer(faker.Server()) + + creds := config.S3Credentials{ + AwsAccessKeyID: "YOUR-ACCESSKEYID", + AwsSecretAccessKey: "YOUR-SECRETACCESSKEY", + } + + config := config.S3Config{ + Bucket: bucket, + Endpoint: ts.URL, + Region: "eu-central-1", + PathStyle: true, + } + + if encryption != "" { + config.ServerSideEncryption = encryption + + if encryption == s3.ServerSideEncryptionAwsKms { + config.SSEKMSKeyID = "arn:aws:1234" + } + } + + sess, err := session.NewSession(&aws.Config{ + Credentials: credentials.NewStaticCredentials(creds.AwsAccessKeyID, creds.AwsSecretAccessKey, ""), + Endpoint: aws.String(ts.URL), + Region: aws.String(config.Region), + DisableSSL: aws.Bool(true), + S3ForcePathStyle: aws.Bool(true), + }) + require.NoError(t, err) + + // Create S3 service client + svc := s3.New(sess) + + _, err = svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + require.NoError(t, err) + + return creds, config, sess, ts +} + +// S3ObjectExists will fail the test if the file does not exist. +func S3ObjectExists(t *testing.T, sess *session.Session, config config.S3Config, objectName string, expectedBytes string) { + downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) { + require.NoError(t, err) + require.Equal(t, int64(len(expectedBytes)), numBytes) + + output, err := ioutil.ReadFile(tmpfile.Name()) + require.NoError(t, err) + + require.Equal(t, []byte(expectedBytes), output) + }) +} + +func CheckS3Metadata(t *testing.T, sess *session.Session, config config.S3Config, objectName string) { + // In a real S3 provider, s3crypto.NewDecryptionClient should probably be used + svc := s3.New(sess) + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(config.Bucket), + Key: aws.String(objectName), + }) + require.NoError(t, err) + + if config.ServerSideEncryption != "" { + require.Equal(t, aws.String(config.ServerSideEncryption), result.ServerSideEncryption) + + if config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms { + require.Equal(t, aws.String(config.SSEKMSKeyID), result.SSEKMSKeyId) + } else { + require.Nil(t, result.SSEKMSKeyId) + } + } else { + require.Nil(t, result.ServerSideEncryption) + require.Nil(t, result.SSEKMSKeyId) + } +} + +// S3ObjectDoesNotExist returns true if the object has been deleted, +// false otherwise. The return signature is different from +// S3ObjectExists because deletion may need to be retried since deferred +// clean up callsinternal/objectstore/test/s3_stub.go may cause the actual deletion to happen after the +// initial check. 
+func S3ObjectDoesNotExist(t *testing.T, sess *session.Session, config config.S3Config, objectName string) bool { + deleted := false + + downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) { + if err != nil && strings.Contains(err.Error(), "NoSuchKey") { + deleted = true + } + }) + + return deleted +} + +func downloadObject(t *testing.T, sess *session.Session, config config.S3Config, objectName string, handler func(tmpfile *os.File, numBytes int64, err error)) { + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + tmpfile, err := ioutil.TempFile(tmpDir, "s3-output") + require.NoError(t, err) + defer os.Remove(tmpfile.Name()) + + downloadSvc := s3manager.NewDownloader(sess) + numBytes, err := downloadSvc.Download(tmpfile, &s3.GetObjectInput{ + Bucket: aws.String(config.Bucket), + Key: aws.String(objectName), + }) + + handler(tmpfile, numBytes, err) +} diff --git a/workhorse/internal/objectstore/upload_strategy.go b/workhorse/internal/objectstore/upload_strategy.go new file mode 100644 index 00000000000..5707ba5f24e --- /dev/null +++ b/workhorse/internal/objectstore/upload_strategy.go @@ -0,0 +1,46 @@ +package objectstore + +import ( + "context" + "io" + "net/http" + + "gitlab.com/gitlab-org/labkit/log" + "gitlab.com/gitlab-org/labkit/mask" +) + +type uploadStrategy interface { + Upload(ctx context.Context, r io.Reader) error + ETag() string + Abort() + Delete() +} + +func deleteURL(url string) { + if url == "" { + return + } + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + // TODO: consider adding the context to the outgoing request for better instrumentation + + // here we are not using u.ctx because we must perform cleanup regardless of parent context + resp, err := httpClient.Do(req) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + resp.Body.Close() +} + +func extractETag(rawETag string) string { + if rawETag != "" && rawETag[0] == '"' { + rawETag = rawETag[1 : len(rawETag)-1] + } + + return rawETag +} diff --git a/workhorse/internal/objectstore/uploader.go b/workhorse/internal/objectstore/uploader.go new file mode 100644 index 00000000000..aedfbe55ead --- /dev/null +++ b/workhorse/internal/objectstore/uploader.go @@ -0,0 +1,115 @@ +package objectstore + +import ( + "context" + "crypto/md5" + "encoding/hex" + "fmt" + "hash" + "io" + "strings" + "time" + + "gitlab.com/gitlab-org/labkit/log" +) + +// uploader consumes an io.Reader and uploads it using a pluggable uploadStrategy. +type uploader struct { + strategy uploadStrategy + + // In the case of S3 uploads, we have a multipart upload which + // instantiates uploads for the individual parts. We don't want to + // increment metrics for the individual parts, so that is why we have + // this boolean flag. + metrics bool + + // With S3 we compare the MD5 of the data we sent with the ETag returned + // by the object storage server. + checkETag bool +} + +func newUploader(strategy uploadStrategy) *uploader { + return &uploader{strategy: strategy, metrics: true} +} + +func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader { + return &uploader{strategy: strategy, metrics: metrics, checkETag: true} +} + +func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) } + +// Consume reads the reader until it reaches EOF or an error. 
It spawns a +// goroutine that waits for outerCtx to be done, after which the remote +// file is deleted. The deadline applies to the upload performed inside +// Consume, not to outerCtx. +func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) { + if u.metrics { + objectStorageUploadsOpen.Inc() + defer func(started time.Time) { + objectStorageUploadsOpen.Dec() + objectStorageUploadTime.Observe(time.Since(started).Seconds()) + if err != nil { + objectStorageUploadRequestsRequestFailed.Inc() + } + }(time.Now()) + } + + defer func() { + // We do this mainly to abort S3 multipart uploads: it is not enough to + // "delete" them. + if err != nil { + u.strategy.Abort() + } + }() + + go func() { + // Once gitlab-rails is done handling the request, we are supposed to + // delete the upload from its temporary location. + <-outerCtx.Done() + u.strategy.Delete() + }() + + uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline) + defer cancelFn() + + var hasher hash.Hash + if u.checkETag { + hasher = md5.New() + reader = io.TeeReader(reader, hasher) + } + + cr := &countReader{r: reader} + if err := u.strategy.Upload(uploadCtx, cr); err != nil { + return cr.n, err + } + + if u.checkETag { + if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil { + log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum") + return cr.n, err + } + } + + objectStorageUploadBytes.Add(float64(cr.n)) + + return cr.n, nil +} + +func compareMD5(local, remote string) error { + if !strings.EqualFold(local, remote) { + return fmt.Errorf("ETag mismatch: expected %q got %q", local, remote) + } + + return nil +} + +type countReader struct { + r io.Reader + n int64 +} + +func (cr *countReader) Read(p []byte) (int, error) { + nRead, err := cr.r.Read(p) + cr.n += int64(nRead) + return nRead, err +}
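For reference, a minimal sketch of how a caller could drive Object with presigned URLs, using only the NewObject and Consume calls shown in this diff. The helper name, header, and deadline are illustrative assumptions, and since internal/objectstore cannot be imported from outside Workhorse, code like this would live inside the workhorse tree:

package example

import (
	"context"
	"strings"
	"time"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)

// uploadWithPresignedURLs is a hypothetical helper; putURL and deleteURL
// are presigned PutObject/RemoveObject URLs normally supplied by gitlab-rails.
func uploadWithPresignedURLs(ctx context.Context, putURL, deleteURL, body string) error {
	object, err := objectstore.NewObject(putURL, deleteURL,
		map[string]string{"Content-Type": "application/octet-stream"}, // extra PUT headers
		int64(len(body)))
	if err != nil {
		return err
	}

	// Consume uploads the reader and verifies the MD5 of the bytes sent
	// against the ETag returned by the server; once ctx is done, the
	// remote temporary file is deleted via deleteURL.
	_, err = object.Consume(ctx, strings.NewReader(body), time.Now().Add(10*time.Minute))
	return err
}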
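The Multipart call pattern is analogous. In this sketch the wrapper function, partSize, and the one-hour deadline are assumptions; the URLs again come presigned from gitlab-rails:

package example

import (
	"context"
	"io"
	"time"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)

// uploadMultipart is a hypothetical wrapper; every URL comes presigned
// from gitlab-rails and partSize must match what the part URLs were
// signed for.
func uploadMultipart(ctx context.Context, r io.Reader, partURLs []string, completeURL, abortURL, deleteURL string, partSize int64) error {
	m, err := objectstore.NewMultipart(partURLs, completeURL, abortURL, deleteURL,
		map[string]string{}, // extra headers sent with each part PUT
		partSize)
	if err != nil {
		return err
	}

	// Uploading more than partSize*len(partURLs) bytes fails with
	// ErrNotEnoughParts; on any error AbortMultipartUpload is invoked.
	_, err = m.Consume(ctx, r, time.Now().Add(time.Hour))
	return err
}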
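The GoCloud path only needs a blob.URLMux with an opener registered for the bucket URL scheme. The tests use their own dirOpener; the sketch below instead assumes gocloud.dev's fileblob.URLOpener and Scheme, which are not part of this diff, so a disk-backed bucket stands in for Azure:

package example

import (
	"context"
	"strings"
	"time"

	"gocloud.dev/blob"
	"gocloud.dev/blob/fileblob"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)

// uploadViaGoCloud registers a disk-backed opener for "file://" bucket
// URLs and uploads through GoCloudObject; bucketURL must point at an
// existing directory, e.g. "file:///tmp/test-bucket".
func uploadViaGoCloud(ctx context.Context, bucketURL, objectName, body string) error {
	mux := new(blob.URLMux)
	mux.RegisterBucket(fileblob.Scheme, &fileblob.URLOpener{})

	object, err := objectstore.NewGoCloudObject(&objectstore.GoCloudObjectParams{
		Ctx:        ctx,
		Mux:        mux,
		BucketURL:  bucketURL,
		ObjectName: objectName,
	})
	if err != nil {
		return err
	}

	_, err = object.Consume(ctx, strings.NewReader(body), time.Now().Add(time.Minute))
	return err
}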
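Finally, the ETag check in uploader.Consume reduces to hashing the streamed bytes and comparing hex digests case-insensitively. A tiny standalone illustration (the function is hypothetical, not code from this diff):

package example

import (
	"crypto/md5"
	"encoding/hex"
	"io"
	"io/ioutil"
	"strings"
)

// etagMatches mirrors what uploader.Consume does: tee the upload stream
// through an MD5 hasher, then compare the hex digest with the returned
// ETag case-insensitively (S3 may return upper-case ETags).
func etagMatches(body io.Reader, remoteETag string) (bool, error) {
	hasher := md5.New()
	// io.Copy stands in for the actual upload of the teed stream.
	if _, err := io.Copy(ioutil.Discard, io.TeeReader(body, hasher)); err != nil {
		return false, err
	}
	return strings.EqualFold(hex.EncodeToString(hasher.Sum(nil)), remoteETag), nil
}

With the constants from test/consts.go, etagMatches(strings.NewReader(ObjectContent), strings.ToUpper(ObjectMD5)) reports true, which is exactly the upcase-ETag case exercised by the stub tests above.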