package objectstore import ( "bytes" "context" "encoding/xml" "errors" "fmt" "io" "io/ioutil" "net/http" "os" "gitlab.com/gitlab-org/labkit/mask" ) // ErrNotEnoughParts will be used when writing more than size * len(partURLs) var ErrNotEnoughParts = errors.New("not enough Parts") // Multipart represents a MultipartUpload on a S3 compatible Object Store service. // It can be used as io.WriteCloser for uploading an object type Multipart struct { PartURLs []string // CompleteURL is a presigned URL for CompleteMultipartUpload CompleteURL string // AbortURL is a presigned URL for AbortMultipartUpload AbortURL string // DeleteURL is a presigned URL for RemoveObject DeleteURL string PutHeaders map[string]string partSize int64 etag string *uploader } // NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes // then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. // In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, partSize int64) (*Multipart, error) { m := &Multipart{ PartURLs: partURLs, CompleteURL: completeURL, AbortURL: abortURL, DeleteURL: deleteURL, PutHeaders: putHeaders, partSize: partSize, } m.uploader = newUploader(m) return m, nil } func (m *Multipart) Upload(ctx context.Context, r io.Reader) error { cmu := &CompleteMultipartUpload{} for i, partURL := range m.PartURLs { src := io.LimitReader(r, m.partSize) part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1) if err != nil { return err } if part == nil { break } else { cmu.Part = append(cmu.Part, part) } } n, err := io.Copy(ioutil.Discard, r) if err != nil { return fmt.Errorf("drain pipe: %v", err) } if n > 0 { return ErrNotEnoughParts } if err := m.complete(ctx, cmu); err != nil { return err } return nil } func (m *Multipart) ETag() string { return m.etag } func (m *Multipart) Abort() { deleteURL(m.AbortURL) } func (m *Multipart) Delete() { deleteURL(m.DeleteURL) } func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { file, err := ioutil.TempFile("", "part-buffer") if err != nil { return nil, fmt.Errorf("create temporary buffer file: %v", err) } defer file.Close() if err := os.Remove(file.Name()); err != nil { return nil, err } n, err := io.Copy(file, src) if err != nil { return nil, err } if n == 0 { return nil, nil } if _, err = file.Seek(0, io.SeekStart); err != nil { return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err) } etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n) if err != nil { return nil, fmt.Errorf("upload part %d: %v", partNumber, err) } return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil } func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) { deadline, ok := ctx.Deadline() if !ok { return "", fmt.Errorf("missing deadline") } part, err := newObject(url, "", headers, size, false) if err != nil { return "", err } if n, err := part.Consume(ctx, io.LimitReader(body, size), deadline); err != nil || n < size { if err == nil { err = io.ErrUnexpectedEOF } return "", err } return part.ETag(), nil } func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error { body, err := xml.Marshal(cmu) if err != nil { return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err) } req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) if err != nil { return fmt.Errorf("create CompleteMultipartUpload request: %v", err) } req.ContentLength = int64(len(body)) req.Header.Set("Content-Type", "application/xml") req = req.WithContext(ctx) resp, err := httpClient.Do(req) if err != nil { return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) } result := &compoundCompleteMultipartUploadResult{} decoder := xml.NewDecoder(resp.Body) if err := decoder.Decode(&result); err != nil { return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) } if result.isError() { return result } if result.CompleteMultipartUploadResult == nil { return fmt.Errorf("empty CompleteMultipartUploadResult") } m.etag = extractETag(result.ETag) return nil }