1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
package plugin
import (
"context"
"fmt"
"sync"
"github.com/docker/docker/pkg/plugingetter"
"github.com/moby/swarmkit/v2/api"
)
const (
// DockerCSIPluginCap is the capability name of the plugins we use with the
// PluginGetter to get only the plugins we need. The full name of the
// plugin interface is "docker.csinode/1.0". This gets only plugins with
// Node capabilities.
DockerCSIPluginCap = "csinode"
)
// PluginManager manages the multiple CSI plugins that may be in use on the
// node. PluginManager should be thread-safe.
type PluginManager interface {
// Get gets the plugin with the given name
Get(name string) (NodePlugin, error)
// NodeInfo returns the NodeCSIInfo for every active plugin.
NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error)
}
type pluginManager struct {
plugins map[string]NodePlugin
pluginsMu sync.Mutex
// newNodePluginFunc usually points to NewNodePlugin. However, for testing,
// NewNodePlugin can be swapped out with a function that creates fake node
// plugins
newNodePluginFunc func(string, plugingetter.CompatPlugin, plugingetter.PluginAddr, SecretGetter) NodePlugin
// secrets is a SecretGetter for use by node plugins.
secrets SecretGetter
pg plugingetter.PluginGetter
}
func NewPluginManager(pg plugingetter.PluginGetter, secrets SecretGetter) PluginManager {
return &pluginManager{
plugins: map[string]NodePlugin{},
newNodePluginFunc: NewNodePlugin,
secrets: secrets,
pg: pg,
}
}
func (pm *pluginManager) Get(name string) (NodePlugin, error) {
pm.pluginsMu.Lock()
defer pm.pluginsMu.Unlock()
plugin, err := pm.getPlugin(name)
if err != nil {
return nil, fmt.Errorf("cannot get plugin %v: %v", name, err)
}
return plugin, nil
}
func (pm *pluginManager) NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error) {
// TODO(dperny): do not acquire this lock for the duration of the the
// function call. that's too long and too blocking.
pm.pluginsMu.Lock()
defer pm.pluginsMu.Unlock()
// first, we should make sure all of the plugins are initialized. do this
// by looking up all the current plugins with DockerCSIPluginCap.
plugins := pm.pg.GetAllManagedPluginsByCap(DockerCSIPluginCap)
for _, plugin := range plugins {
// TODO(dperny): use this opportunity to drop plugins that we're
// tracking but which no longer exist.
// we don't actually need the plugin returned, we just need it loaded
// as a side effect.
pm.getPlugin(plugin.Name())
}
nodeInfo := []*api.NodeCSIInfo{}
for _, plugin := range pm.plugins {
info, err := plugin.NodeGetInfo(ctx)
if err != nil {
// skip any plugin that returns an error
continue
}
nodeInfo = append(nodeInfo, info)
}
return nodeInfo, nil
}
// getPlugin looks up the plugin with the specified name. Loads the plugin if
// not yet loaded.
//
// pm.pluginsMu must be obtained before calling this method.
func (pm *pluginManager) getPlugin(name string) (NodePlugin, error) {
if p, ok := pm.plugins[name]; ok {
return p, nil
}
pc, err := pm.pg.Get(name, DockerCSIPluginCap, plugingetter.Lookup)
if err != nil {
return nil, err
}
pa, ok := pc.(plugingetter.PluginAddr)
if !ok {
return nil, fmt.Errorf("plugin does not implement PluginAddr interface")
}
p := pm.newNodePluginFunc(name, pc, pa, pm.secrets)
pm.plugins[name] = p
return p, nil
}
|