summaryrefslogtreecommitdiff
path: root/vendor/github.com/moby/swarmkit/v2/agent/csi/plugin/manager.go
blob: 39bb60f450abfd9a75f0d25103933427c955cfdd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package plugin

import (
	"context"
	"fmt"
	"sync"

	"github.com/docker/docker/pkg/plugingetter"

	"github.com/moby/swarmkit/v2/api"
)

const (
	// DockerCSIPluginCap is the capability name of the plugins we use with the
	// PluginGetter to get only the plugins we need. The full name of the
	// plugin interface is "docker.csinode/1.0". This gets only plugins with
	// Node capabilities.
	DockerCSIPluginCap = "csinode"
)

// PluginManager manages the multiple CSI plugins that may be in use on the
// node. PluginManager should be thread-safe.
type PluginManager interface {
	// Get gets the plugin with the given name
	Get(name string) (NodePlugin, error)

	// NodeInfo returns the NodeCSIInfo for every active plugin.
	NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error)
}

type pluginManager struct {
	plugins   map[string]NodePlugin
	pluginsMu sync.Mutex

	// newNodePluginFunc usually points to NewNodePlugin. However, for testing,
	// NewNodePlugin can be swapped out with a function that creates fake node
	// plugins
	newNodePluginFunc func(string, plugingetter.CompatPlugin, plugingetter.PluginAddr, SecretGetter) NodePlugin

	// secrets is a SecretGetter for use by node plugins.
	secrets SecretGetter

	pg plugingetter.PluginGetter
}

func NewPluginManager(pg plugingetter.PluginGetter, secrets SecretGetter) PluginManager {
	return &pluginManager{
		plugins:           map[string]NodePlugin{},
		newNodePluginFunc: NewNodePlugin,
		secrets:           secrets,
		pg:                pg,
	}
}

func (pm *pluginManager) Get(name string) (NodePlugin, error) {
	pm.pluginsMu.Lock()
	defer pm.pluginsMu.Unlock()

	plugin, err := pm.getPlugin(name)
	if err != nil {
		return nil, fmt.Errorf("cannot get plugin %v: %v", name, err)
	}

	return plugin, nil
}

func (pm *pluginManager) NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error) {
	// TODO(dperny): do not acquire this lock for the duration of the the
	// function call. that's too long and too blocking.
	pm.pluginsMu.Lock()
	defer pm.pluginsMu.Unlock()

	// first, we should make sure all of the plugins are initialized. do this
	// by looking up all the current plugins with DockerCSIPluginCap.
	plugins := pm.pg.GetAllManagedPluginsByCap(DockerCSIPluginCap)
	for _, plugin := range plugins {
		// TODO(dperny): use this opportunity to drop plugins that we're
		// tracking but which no longer exist.

		// we don't actually need the plugin returned, we just need it loaded
		// as a side effect.
		pm.getPlugin(plugin.Name())
	}

	nodeInfo := []*api.NodeCSIInfo{}
	for _, plugin := range pm.plugins {
		info, err := plugin.NodeGetInfo(ctx)
		if err != nil {
			// skip any plugin that returns an error
			continue
		}

		nodeInfo = append(nodeInfo, info)
	}
	return nodeInfo, nil
}

// getPlugin looks up the plugin with the specified name. Loads the plugin if
// not yet loaded.
//
// pm.pluginsMu must be obtained before calling this method.
func (pm *pluginManager) getPlugin(name string) (NodePlugin, error) {
	if p, ok := pm.plugins[name]; ok {
		return p, nil
	}

	pc, err := pm.pg.Get(name, DockerCSIPluginCap, plugingetter.Lookup)
	if err != nil {
		return nil, err
	}

	pa, ok := pc.(plugingetter.PluginAddr)
	if !ok {
		return nil, fmt.Errorf("plugin does not implement PluginAddr interface")
	}

	p := pm.newNodePluginFunc(name, pc, pa, pm.secrets)
	pm.plugins[name] = p
	return p, nil
}