summaryrefslogtreecommitdiff
path: root/distribution
diff options
context:
space:
mode:
authorSebastiaan van Stijn <github@gone.nl>2022-01-22 12:19:42 +0100
committerSebastiaan van Stijn <github@gone.nl>2022-02-18 16:58:22 +0100
commitd746a836fc810d66ba31695c07821eee29c91a9a (patch)
tree02d53123ca2e4304815e1c8795d5860c325466fb /distribution
parentcf31aa0fa0d1b058bd138569d29d570081481c91 (diff)
downloaddocker-d746a836fc810d66ba31695c07821eee29c91a9a.tar.gz
distribution/xfer: remove TransferManager interface, un-export newTransferManager
The `TransferManager` interface only had a single implementation, and neither `LayerDownloadManager`, nor `LayerUploadManager` currently had an option to provide a custom implementation, so we can un-export this. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Diffstat (limited to 'distribution')
-rw-r--r--distribution/xfer/download.go4
-rw-r--r--distribution/xfer/transfer.go19
-rw-r--r--distribution/xfer/transfer_test.go12
-rw-r--r--distribution/xfer/upload.go4
4 files changed, 15 insertions, 24 deletions
diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go
index 6eddfac0dd..eb457830ff 100644
--- a/distribution/xfer/download.go
+++ b/distribution/xfer/download.go
@@ -23,7 +23,7 @@ const maxDownloadAttempts = 5
// layers.
type LayerDownloadManager struct {
layerStore layer.Store
- tm TransferManager
+ tm *transferManager
waitDuration time.Duration
maxDownloadAttempts int
}
@@ -37,7 +37,7 @@ func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
manager := LayerDownloadManager{
layerStore: layerStore,
- tm: NewTransferManager(concurrencyLimit),
+ tm: newTransferManager(concurrencyLimit),
waitDuration: time.Second,
maxDownloadAttempts: maxDownloadAttempts,
}
diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go
index 82df793ebe..8f06f22e89 100644
--- a/distribution/xfer/transfer.go
+++ b/distribution/xfer/transfer.go
@@ -271,18 +271,9 @@ func (t *transfer) Close() {
// This prevents it from taking up a slot.
type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer
-// TransferManager is used by LayerDownloadManager and LayerUploadManager to
-// schedule and deduplicate transfers. It is up to the TransferManager
-// implementation to make the scheduling and concurrency decisions.
-type TransferManager interface {
- // Transfer checks if a transfer with the given key is in progress. If
- // so, it returns progress and error output from that transfer.
- // Otherwise, it will call xferFunc to initiate the transfer.
- Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher)
- // SetConcurrency set the concurrencyLimit so that it is adjustable daemon reload
- SetConcurrency(concurrency int)
-}
-
+// transferManager is used by LayerDownloadManager and LayerUploadManager to
+// schedule and deduplicate transfers. It is up to the transferManager
+// to make the scheduling and concurrency decisions.
type transferManager struct {
mu sync.Mutex
@@ -292,8 +283,8 @@ type transferManager struct {
waitingTransfers []chan struct{}
}
-// NewTransferManager returns a new TransferManager.
-func NewTransferManager(concurrencyLimit int) TransferManager {
+// newTransferManager returns a new transferManager.
+func newTransferManager(concurrencyLimit int) *transferManager {
return &transferManager{
concurrencyLimit: concurrencyLimit,
transfers: make(map[string]Transfer),
diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go
index dca5a815bc..f63de0d78e 100644
--- a/distribution/xfer/transfer_test.go
+++ b/distribution/xfer/transfer_test.go
@@ -29,7 +29,7 @@ func TestTransfer(t *testing.T) {
}
}
- tm := NewTransferManager(5)
+ tm := newTransferManager(5)
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
receivedProgress := make(map[string]int64)
@@ -91,7 +91,7 @@ func TestConcurrencyLimit(t *testing.T) {
}
}
- tm := NewTransferManager(concurrencyLimit)
+ tm := newTransferManager(concurrencyLimit)
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
receivedProgress := make(map[string]int64)
@@ -152,7 +152,7 @@ func TestInactiveJobs(t *testing.T) {
}
}
- tm := NewTransferManager(concurrencyLimit)
+ tm := newTransferManager(concurrencyLimit)
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
receivedProgress := make(map[string]int64)
@@ -211,7 +211,7 @@ func TestWatchRelease(t *testing.T) {
}
}
- tm := NewTransferManager(5)
+ tm := newTransferManager(5)
type watcherInfo struct {
watcher *Watcher
@@ -290,7 +290,7 @@ func TestWatchFinishedTransfer(t *testing.T) {
}
}
- tm := NewTransferManager(5)
+ tm := newTransferManager(5)
// Start a transfer
watchers := make([]*Watcher, 3)
@@ -343,7 +343,7 @@ func TestDuplicateTransfer(t *testing.T) {
}
}
- tm := NewTransferManager(5)
+ tm := newTransferManager(5)
type transferInfo struct {
xfer Transfer
diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go
index 33b45ad747..98b6799e15 100644
--- a/distribution/xfer/upload.go
+++ b/distribution/xfer/upload.go
@@ -16,7 +16,7 @@ const maxUploadAttempts = 5
// LayerUploadManager provides task management and progress reporting for
// uploads.
type LayerUploadManager struct {
- tm TransferManager
+ tm *transferManager
waitDuration time.Duration
}
@@ -28,7 +28,7 @@ func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
// NewLayerUploadManager returns a new LayerUploadManager.
func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
manager := LayerUploadManager{
- tm: NewTransferManager(concurrencyLimit),
+ tm: newTransferManager(concurrencyLimit),
waitDuration: time.Second,
}
for _, option := range options {