summaryrefslogtreecommitdiff
path: root/workhorse/internal/objectstore/s3_object.go
blob: ce0cccc7eb1e24213760464e5dab407229495dca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package objectstore

import (
	"context"
	"io"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"gitlab.com/gitlab-org/labkit/log"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
)

// S3Object represents a single object in an S3 bucket that can be uploaded
// with Upload and removed again with Delete.
type S3Object struct {
	// credentials and config are handed to setupS3Session whenever a session
	// is needed; config also supplies the bucket and encryption settings.
	credentials config.S3Credentials
	config      config.S3Config
	// objectName is used as the S3 key for both upload and delete requests.
	objectName  string
	// uploaded is set to true once Upload succeeds; Delete is a no-op until then.
	uploaded    bool

	// The embedded uploader is created by newUploader in NewS3Object.
	*uploader
}

// NewS3Object builds an S3Object for the given object name, credentials and
// configuration, and attaches an uploader created by newUploader.
//
// The returned error is always nil.
func NewS3Object(objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config) (*S3Object, error) {
	obj := new(S3Object)
	obj.objectName = objectName
	obj.credentials = s3Credentials
	obj.config = s3Config

	// The uploader is constructed from the object itself, so it has to be
	// wired up after the object exists.
	obj.uploader = newUploader(obj)

	return obj, nil
}

// setEncryptionOptions copies the server-side encryption settings from
// s3Config onto the upload input.
//
// Nothing is changed when no server-side encryption mode is configured. The
// KMS key ID is only set when the mode is aws:kms and a key ID is present.
func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) {
	mode := s3Config.ServerSideEncryption
	if mode == "" {
		return
	}

	input.ServerSideEncryption = aws.String(mode)

	keyID := s3Config.SSEKMSKeyID
	if mode != s3.ServerSideEncryptionAwsKms || keyID == "" {
		return
	}

	input.SSEKMSKeyId = aws.String(keyID)
}

// Upload streams r into the configured bucket under the object's name.
//
// A session is created with setupS3Session on every call and the transfer is
// performed by an s3manager uploader bound to ctx. Server-side encryption
// options from the configuration are applied to the request.
//
// On success the object is marked as uploaded so that a later Delete removes
// it. On failure the error is logged and returned; upload errors are passed
// through unwrapAWSError first so callers see the underlying cause.
func (s *S3Object) Upload(ctx context.Context, r io.Reader) error {
	sess, err := setupS3Session(s.credentials, s.config)
	if err != nil {
		log.WithError(err).Error("error creating S3 session")
		return err
	}

	uploader := s3manager.NewUploader(sess)

	input := &s3manager.UploadInput{
		Bucket: aws.String(s.config.Bucket),
		Key:    aws.String(s.objectName),
		Body:   r,
	}

	setEncryptionOptions(input, s.config)

	_, err = uploader.UploadWithContext(ctx, input)
	if err != nil {
		// The session was created successfully above; what failed here is the
		// object upload itself.
		log.WithError(err).Error("error uploading S3 object")
		// Get the root cause, such as ErrEntityTooLarge, so we can return the proper HTTP status code
		return unwrapAWSError(err)
	}

	s.uploaded = true

	return nil
}

// ETag always returns an empty string for S3 objects.
func (s *S3Object) ETag() string {
	return ""
}

// Abort delegates to Delete, which removes the object only if it has already
// been uploaded.
func (s *S3Object) Abort() {
	s.Delete()
}

// Delete removes the object from the bucket.
//
// It does nothing unless a previous Upload succeeded. Failures to create the
// session or to delete the object are logged and otherwise ignored, since the
// method has no error return.
func (s *S3Object) Delete() {
	if !s.uploaded {
		return
	}

	session, err := setupS3Session(s.credentials, s.config)
	if err != nil {
		log.WithError(err).Error("error setting up S3 session in delete")
		return
	}

	svc := s3.New(session)
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(s.config.Bucket),
		Key:    aws.String(s.objectName),
	}

	// Note we can't use the request context because in a successful
	// case, the original request has already completed.
	deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
	defer cancel()

	_, err = svc.DeleteObjectWithContext(deleteCtx, input)
	if err != nil {
		// The error is already attached as a structured field by WithError;
		// passing it to Error as well would append its text directly to the
		// message with no separator.
		log.WithError(err).Error("error deleting S3 object")
	}
}

// unwrapAWSError walks the chain of awserr.Error values and returns the
// innermost non-nil error.
//
// If e is not an awserr.Error (including when e is nil) it is returned
// unchanged. If an awserr.Error in the chain has no original error, that
// awserr.Error itself is returned rather than nil, so a real failure is never
// reported to the caller as success.
//
// This is needed until https://github.com/aws/aws-sdk-go/issues/2820 is closed.
func unwrapAWSError(e error) error {
	for {
		awsErr, ok := e.(awserr.Error)
		if !ok {
			return e
		}

		cause := awsErr.OrigErr()
		if cause == nil {
			// Nothing further to unwrap: keep the AWS error instead of
			// discarding it.
			return e
		}

		e = cause
	}
}