package csi

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"

	"github.com/docker/docker/pkg/plugingetter"

	"github.com/moby/swarmkit/v2/agent/csi/plugin"
	"github.com/moby/swarmkit/v2/agent/exec"
	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/log"
	"github.com/moby/swarmkit/v2/volumequeue"
)

// CSI_CALL_TIMEOUT is the timeout applied to each CSI call made from this
// node. Because volumes are processed one at a time, a bounded per-call
// timeout keeps a single slow plugin from stalling the whole queue.
const CSI_CALL_TIMEOUT = 15 * time.Second

// volumeState keeps track of the state of a volume on this node.
type volumeState struct {
	// volume is the actual VolumeAssignment for this volume
	volume *api.VolumeAssignment
	// remove is true if the volume is to be removed, or false if it should be
	// active.
	remove bool
	// removeCallback is called when the volume is successfully removed.
	removeCallback func(id string)
}

// volumes keeps track of all volumes currently assigned to this agent,
// mapped by volume ID.
type volumes struct {
	// mu guards access to the volumes map.
	mu sync.RWMutex

	// volumes is a mapping of volume ID to volumeState
	volumes map[string]volumeState

	// plugins is the PluginManager, which provides translation to the CSI RPCs
	plugins plugin.PluginManager

	// pendingVolumes is a VolumeQueue which manages which volumes are
	// processed and when.
	pendingVolumes *volumequeue.VolumeQueue
}

// NewManager returns a VolumesManager that tracks the volumes assigned to
// this node and drives the CSI calls needed to publish and remove them.
func NewManager(pg plugingetter.PluginGetter, secrets exec.SecretGetter) exec.VolumesManager {
	r := &volumes{
		volumes:        map[string]volumeState{},
		plugins:        plugin.NewPluginManager(pg, secrets),
		pendingVolumes: volumequeue.NewVolumeQueue(),
	}
	go r.retryVolumes()

	return r
}
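
// A minimal wiring sketch for NewManager, from a hypothetical caller's point
// of view (where pg, secrets, and assignment come from is up to the
// surrounding node code):
//
//	vm := csi.NewManager(pg, secrets)
//	vm.Add(assignment)                 // begins staging/publishing the volume
//	path, err := vm.Get(assignment.ID) // succeeds once the volume is published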

// retryVolumes runs in a goroutine to retry failing volumes.
func (r *volumes) retryVolumes() {
	ctx := log.WithModule(context.Background(), "node/agent/csi")
	for {
		vid, attempt := r.pendingVolumes.Wait()

		dctx := log.WithFields(ctx, logrus.Fields{
			"volume.id": vid,
			"attempt":   fmt.Sprintf("%d", attempt),
		})

		// this case occurs when the Stop method has been called on
		// pendingVolumes, and means that we should pack up and exit.
		if vid == "" && attempt == 0 {
			break
		}
		r.tryVolume(dctx, vid, attempt)
	}
}
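
// For reference, a sketch of the queue protocol retryVolumes relies on. The
// method names all appear in this file; the backoff behavior described in
// the comments is an assumption about volumequeue, not a guarantee:
//
//	q := volumequeue.NewVolumeQueue()
//	q.Enqueue("volume-1", 0) // attempt 0: process as soon as possible
//	id, attempt := q.Wait()  // blocks until a volume is due; retried volumes
//	                         // carry a growing attempt counter
//	q.Stop()                 // after Stop, Wait returns ("", 0)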

// tryVolume synchronously tries one volume. It puts the volume back into the
// queue if the attempt fails.
func (r *volumes) tryVolume(ctx context.Context, id string, attempt uint) {
	r.mu.RLock()
	vs, ok := r.volumes[id]
	r.mu.RUnlock()

	if !ok {
		return
	}

	// create a sub-context with a timeout. because we can only process one
	// volume at a time, if we rely on the server-side or default timeout, we
	// may be waiting a very long time for a particular volume to fail.
	//
	// TODO(dperny): there is almost certainly a more intelligent way to do
	// this. For example, we could:
	//
	//   * Change code such that we can service volumes managed by different
	//     plugins at the same time.
	//   * Take longer timeouts when we don't have any other volumes in the
	//     queue
	//   * Have interruptible attempts, so that if we're taking longer
	//     timeouts, we can abort them to service new volumes.
	//
	// These are too complicated to be worth the engineering effort at this
	// time.

	timeoutCtx, cancel := context.WithTimeout(ctx, CSI_CALL_TIMEOUT)
	// the cancel function returned by WithTimeout must always be called,
	// even after the context has expired, to release its resources.
	defer cancel()

	if !vs.remove {
		if err := r.publishVolume(timeoutCtx, vs.volume); err != nil {
			log.G(timeoutCtx).WithError(err).Info("publishing volume failed")
			r.pendingVolumes.Enqueue(id, attempt+1)
		}
	} else {
		if err := r.unpublishVolume(timeoutCtx, vs.volume); err != nil {
			log.G(timeoutCtx).WithError(err).Info("unpublishing volume failed")
			r.pendingVolumes.Enqueue(id, attempt+1)
		} else {
			// if unpublishing was successful, then call the callback
			vs.removeCallback(id)
		}
	}
}

// Get returns the published path for the provided volume ID. If the volume
// does not exist, is being removed, or has not yet been published on this
// node, an error is returned instead.
func (r *volumes) Get(volumeID string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if vs, ok := r.volumes[volumeID]; ok {
		if vs.remove {
			// TODO(dperny): use a structured error
			return "", fmt.Errorf("volume being removed")
		}

		if p, err := r.plugins.Get(vs.volume.Driver.Name); err == nil {
			path := p.GetPublishedPath(volumeID)
			if path != "" {
				return path, nil
			}
			// deliberately no debug log for the not-yet-published case; Get
			// is called frequently enough that a log line here spams the
			// output:
			// log.L.WithField("method", "(*volumes).Get").Debugf("Path not published for volume:%v", volumeID)
		} else {
			return "", err
		}

	}
	return "", fmt.Errorf("%w: published path is unavailable", exec.ErrDependencyNotReady)
}
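
// Because the not-ready case wraps exec.ErrDependencyNotReady with %w, a
// hypothetical caller can distinguish "retry later" from hard failures:
//
//	path, err := vm.Get(volumeID)
//	if errors.Is(err, exec.ErrDependencyNotReady) {
//		// assigned but not yet published on this node; try again later
//	}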

// Add adds one or more volumes to the volume map.
func (r *volumes) Add(volumes ...api.VolumeAssignment) {
	r.mu.Lock()
	defer r.mu.Unlock()

	for _, volume := range volumes {
		// if we get an Add operation, then we will always restart the retries.
		v := volume.Copy()
		r.volumes[volume.ID] = volumeState{
			volume: v,
		}
		// enqueue the volume so that we process it
		r.pendingVolumes.Enqueue(volume.ID, 0)
		log.L.WithField("method", "(*volumes).Add").Debugf("Add Volume: %v", volume.VolumeID)
	}
}

// Remove removes one or more volumes from this manager. callback is called
// for each volume once its removal succeeds.
func (r *volumes) Remove(volumes []api.VolumeAssignment, callback func(id string)) {
	r.mu.Lock()
	defer r.mu.Unlock()

	for _, volume := range volumes {
		// if we get a Remove call, then we always restart the retries and
		// attempt removal.
		v := volume.Copy()
		r.volumes[volume.ID] = volumeState{
			volume:         v,
			remove:         true,
			removeCallback: callback,
		}
		r.pendingVolumes.Enqueue(volume.ID, 0)
	}
}
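
// A sketch of the intended Add/Remove lifecycle from a caller's point of
// view (the callback body is illustrative):
//
//	vm.Add(assignment)
//	// ... later, when the volume is no longer assigned to this node:
//	vm.Remove([]api.VolumeAssignment{assignment}, func(id string) {
//		// runs once unpublishing and unstaging have succeeded for id
//	})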

// publishVolume makes a volume available on this node by staging it
// (NodeStageVolume) and then publishing it (NodePublishVolume), in the order
// the CSI node flow requires.
func (r *volumes) publishVolume(ctx context.Context, assignment *api.VolumeAssignment) error {
	log.G(ctx).Info("attempting to publish volume")
	p, err := r.plugins.Get(assignment.Driver.Name)
	if err != nil {
		return err
	}

	// even though this may have succeeded already, the call to NodeStageVolume
	// is idempotent, so we can retry it every time.
	if err := p.NodeStageVolume(ctx, assignment); err != nil {
		return err
	}

	log.G(ctx).Debug("staging volume succeeded, attempting to publish volume")

	return p.NodePublishVolume(ctx, assignment)
}

// unpublishVolume reverses publishVolume: it unpublishes the volume
// (NodeUnpublishVolume) and then unstages it (NodeUnstageVolume).
func (r *volumes) unpublishVolume(ctx context.Context, assignment *api.VolumeAssignment) error {
	log.G(ctx).Info("attempting to unpublish volume")
	p, err := r.plugins.Get(assignment.Driver.Name)
	if err != nil {
		return err
	}

	if err := p.NodeUnpublishVolume(ctx, assignment); err != nil {
		return err
	}

	return p.NodeUnstageVolume(ctx, assignment)
}

// Plugins returns the PluginManager used by this VolumesManager.
func (r *volumes) Plugins() exec.VolumePluginManager {
	return r.plugins
}

// taskRestrictedVolumesProvider restricts volume access to the set of volume
// IDs referenced by a particular task.
type taskRestrictedVolumesProvider struct {
	volumes   exec.VolumeGetter
	volumeIDs map[string]struct{}
}

func (sp *taskRestrictedVolumesProvider) Get(volumeID string) (string, error) {
	if _, ok := sp.volumeIDs[volumeID]; !ok {
		return "", fmt.Errorf("task not authorized to access volume %s", volumeID)
	}

	return sp.volumes.Get(volumeID)
}

// Restrict provides a getter that only allows access to the volumes
// referenced by the task.
func Restrict(volumes exec.VolumeGetter, t *api.Task) exec.VolumeGetter {
	vids := map[string]struct{}{}

	for _, v := range t.Volumes {
		vids[v.ID] = struct{}{}
	}

	return &taskRestrictedVolumesProvider{volumes: volumes, volumeIDs: vids}
}
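
// A hypothetical use of Restrict during task setup (names illustrative):
//
//	getter := Restrict(volumesManager, task)
//	path, err := getter.Get(task.Volumes[0].ID) // allowed: referenced by task
//	_, err = getter.Get("other-volume-id")      // rejected: not in the task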