summaryrefslogtreecommitdiff
path: root/distribution/xfer/upload_test.go
blob: 9150a63d7a5cab532ef3672e1edcf9e9b349716f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package xfer // import "github.com/docker/docker/distribution/xfer"

import (
	"context"
	"errors"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/distribution"
	"github.com/docker/docker/layer"
	"github.com/docker/docker/pkg/progress"
)

const maxUploadConcurrency = 3

type mockUploadDescriptor struct {
	currentUploads  *int32
	diffID          layer.DiffID
	simulateRetries int
}

// Key returns the key used to deduplicate downloads.
func (u *mockUploadDescriptor) Key() string {
	return u.diffID.String()
}

// ID returns the ID for display purposes.
func (u *mockUploadDescriptor) ID() string {
	return u.diffID.String()
}

// DiffID should return the DiffID for this layer.
func (u *mockUploadDescriptor) DiffID() layer.DiffID {
	return u.diffID
}

// SetRemoteDescriptor is not used in the mock.
func (u *mockUploadDescriptor) SetRemoteDescriptor(remoteDescriptor distribution.Descriptor) {
}

// Upload is called to perform the upload.
func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
	if u.currentUploads != nil {
		defer atomic.AddInt32(u.currentUploads, -1)

		if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency {
			return distribution.Descriptor{}, errors.New("concurrency limit exceeded")
		}
	}

	// Sleep a bit to simulate a time-consuming upload.
	for i := int64(0); i <= 10; i++ {
		select {
		case <-ctx.Done():
			return distribution.Descriptor{}, ctx.Err()
		case <-time.After(10 * time.Millisecond):
			progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10})
		}
	}

	if u.simulateRetries != 0 {
		u.simulateRetries--
		return distribution.Descriptor{}, errors.New("simulating retry")
	}

	return distribution.Descriptor{}, nil
}

func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
	return []UploadDescriptor{
		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:1515325234325236634634608943609283523908626098235490238423902343"},
		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:6929356290463485374960346430698374523437683470934634534953453453"},
		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:8159352387436803946235346346368745389534789534897538734598734987", simulateRetries: 1},
		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:4637863963478346897346987346987346789346789364879364897364987346"},
	}
}

func TestSuccessfulUpload(t *testing.T) {
	lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })

	progressChan := make(chan progress.Progress)
	progressDone := make(chan struct{})
	receivedProgress := make(map[string]int64)

	go func() {
		for p := range progressChan {
			receivedProgress[p.ID] = p.Current
		}
		close(progressDone)
	}()

	var currentUploads int32
	descriptors := uploadDescriptors(&currentUploads)

	err := lum.Upload(context.Background(), descriptors, progress.ChanOutput(progressChan))
	if err != nil {
		t.Fatalf("upload error: %v", err)
	}

	close(progressChan)
	<-progressDone
}

func TestCancelledUpload(t *testing.T) {
	lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })

	progressChan := make(chan progress.Progress)
	progressDone := make(chan struct{})

	go func() {
		for range progressChan {
		}
		close(progressDone)
	}()

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		<-time.After(time.Millisecond)
		cancel()
	}()

	descriptors := uploadDescriptors(nil)
	err := lum.Upload(ctx, descriptors, progress.ChanOutput(progressChan))
	if err != context.Canceled {
		t.Fatal("expected upload to be cancelled")
	}

	close(progressChan)
	<-progressDone
}