summaryrefslogtreecommitdiff
path: root/workhorse/internal/upload/destination/objectstore/object_test.go
blob: 24117891b6da3ff4211e8f39f5210b3bbf8d578f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package objectstore_test

import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)

// testTimeout is how far in the future the tests place the deadline they pass to Consume.
const testTimeout = 10 * time.Second

// osFactory starts an object store for a test and returns the stub together
// with the HTTP test server in front of it; the caller closes the server.
type osFactory func() (*test.ObjectstoreStub, *httptest.Server)

// testObjectUploadNoErrors uploads test.ObjectContent to an object store
// started by startObjectStore and verifies the happy path: the byte count,
// the Content-Type header seen by the store, the ETag, and the number of PUT
// and DELETE requests observed after the upload context is cancelled.
//
// When useDeleteURL is true the object URL doubles as the delete URL and
// exactly one DELETE is expected; otherwise no DELETE is expected.
func testObjectUploadNoErrors(t *testing.T, startObjectStore osFactory, useDeleteURL bool, contentType string) {
	stub, server := startObjectStore()
	defer server.Close()

	// Decide up front which delete URL to hand over and how many deletes
	// the stub should end up recording.
	target := server.URL + test.ObjectPath
	removeURL := ""
	wantDeletes := 0
	if useDeleteURL {
		removeURL = target
		wantDeletes = 1
	}

	headers := map[string]string{"Content-Type": contentType}

	uploadCtx, stopUpload := context.WithCancel(context.Background())
	defer stopUpload()

	uploadDeadline := time.Now().Add(testTimeout)
	object, err := objectstore.NewObject(target, removeURL, headers, test.ObjectSize)
	require.NoError(t, err)

	// Stream the fixture content into the object.
	written, err := object.Consume(uploadCtx, strings.NewReader(test.ObjectContent), uploadDeadline)
	require.NoError(t, err)
	require.Equal(t, test.ObjectSize, written, "Uploaded file mismatch")

	// The store must have seen the requested content type.
	require.Equal(t, contentType, stub.GetHeader(test.ObjectPath, "Content-Type"))

	// The object's ETag must match the MD5 the store reports.
	require.Equal(t, stub.GetObjectMD5(test.ObjectPath), object.ETag())

	// Cancel the upload context, then inspect what the store recorded.
	stopUpload()
	require.Equal(t, 1, stub.PutsCnt(), "Object hasn't been uploaded")

	// Poll for up to a second until the delete counter reaches the expected value.
	deletesSettled := func() bool { return stub.DeletesCnt() == wantDeletes }
	require.Eventually(t, deletesSettled, time.Second, time.Millisecond)

	if !useDeleteURL {
		require.Equal(t, 0, stub.DeletesCnt(), "Object has been deleted")
		return
	}
	require.Equal(t, 1, stub.DeletesCnt(), "Object hasn't been deleted")
}

func TestObjectUpload(t *testing.T) {
	t.Run("with delete URL", func(t *testing.T) {
		testObjectUploadNoErrors(t, test.StartObjectStore, true, "application/octet-stream")
	})
	t.Run("without delete URL", func(t *testing.T) {
		testObjectUploadNoErrors(t, test.StartObjectStore, false, "application/octet-stream")
	})
	t.Run("with custom content type", func(t *testing.T) {
		testObjectUploadNoErrors(t, test.StartObjectStore, false, "image/jpeg")
	})
	t.Run("with upcase ETAG", func(t *testing.T) {
		factory := func() (*test.ObjectstoreStub, *httptest.Server) {
			md5s := map[string]string{
				test.ObjectPath: strings.ToUpper(test.ObjectMD5),
			}

			return test.StartObjectStoreWithCustomMD5(md5s)
		}

		testObjectUploadNoErrors(t, factory, false, "application/octet-stream")
	})
}

// TestObjectUpload404 uploads to a server that answers every request with
// 404 Not Found and checks that Consume returns a StatusCodeError whose
// message mentions the status code.
func TestObjectUpload404(t *testing.T) {
	notFound := httptest.NewServer(http.NotFoundHandler())
	defer notFound.Close()

	uploadCtx, stopUpload := context.WithCancel(context.Background())
	defer stopUpload()

	uploadDeadline := time.Now().Add(testTimeout)
	object, err := objectstore.NewObject(notFound.URL+test.ObjectPath, "", map[string]string{}, test.ObjectSize)
	require.NoError(t, err)

	_, err = object.Consume(uploadCtx, strings.NewReader(test.ObjectContent), uploadDeadline)
	require.Error(t, err)

	_, ok := err.(objectstore.StatusCodeError)
	require.True(t, ok, "Should fail with StatusCodeError")
	require.Contains(t, err.Error(), "404")
}

// endlessReader is a reader that never reports EOF: every Read fills the
// whole buffer with '*' bytes.
type endlessReader struct{}

// Read overwrites every byte of p with '*' and reports len(p) bytes read
// with a nil error.
func (e *endlessReader) Read(p []byte) (n int, err error) {
	for idx := range p {
		p[idx] = '*'
	}

	n = len(p)
	return n, nil
}

// TestObjectUploadBrokenConnection purpose is to ensure that errors caused by the upload destination get propagated back correctly.
// This is important for troubleshooting in production.
func TestObjectUploadBrokenConnection(t *testing.T) {
	// This test server closes connection immediately
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		hj, ok := w.(http.Hijacker)
		if !ok {
			require.FailNow(t, "webserver doesn't support hijacking")
		}
		conn, _, err := hj.Hijack()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		conn.Close()
	}))
	defer ts.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	deadline := time.Now().Add(testTimeout)
	objectURL := ts.URL + test.ObjectPath
	object, err := objectstore.NewObject(objectURL, "", map[string]string{}, -1)
	require.NoError(t, err)

	_, copyErr := object.Consume(ctx, &endlessReader{}, deadline)
	require.Error(t, copyErr)
	require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error")
}