summaryrefslogtreecommitdiff
path: root/vendor/github.com/moby/swarmkit/v2/agent/reporter.go
blob: 0abb565a03dbe4362bc5284672374d608d9729b7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package agent

import (
	"context"
	"reflect"
	"sync"

	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/log"
)

// StatusReporter receives updates to task status. Its method may be called
// concurrently, so implementations should be goroutine-safe.
type StatusReporter interface {
	// UpdateTaskStatus reports status for the task identified by taskID.
	UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error
}

// Reporter receives updates to both task and volume status.
type Reporter interface {
	StatusReporter
	// ReportVolumeUnpublished reports the volume identified by volumeID as
	// unpublished.
	ReportVolumeUnpublished(ctx context.Context, volumeID string) error
}

// statusReporterFunc adapts a plain function to the StatusReporter interface.
type statusReporterFunc func(ctx context.Context, taskID string, status *api.TaskStatus) error

// UpdateTaskStatus implements StatusReporter by calling the function itself
// with the same arguments and returning its result unchanged.
func (f statusReporterFunc) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
	return f(ctx, taskID, status)
}

// volumeReporterFunc adapts a plain function so that it can be called through
// a ReportVolumeUnpublished method.
//
//nolint:unused // currently only used in tests.
type volumeReporterFunc func(ctx context.Context, volumeID string) error

// ReportVolumeUnpublished calls the function itself with the same arguments
// and returns its result unchanged.
//
//nolint:unused // currently only used in tests.
func (f volumeReporterFunc) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
	return f(ctx, volumeID)
}

// statusReporterCombined embeds a statusReporterFunc and a volumeReporterFunc,
// so a single value exposes both UpdateTaskStatus and ReportVolumeUnpublished.
//
//nolint:unused // currently only used in tests.
type statusReporterCombined struct {
	statusReporterFunc
	volumeReporterFunc
}

// statusReporter is a reliable reporter that queues task statuses and
// unpublished volumes and forwards them to an upstream Reporter from a
// background goroutine. Its own UpdateTaskStatus and ReportVolumeUnpublished
// methods only record the update and always return nil.
// It handles several tasks at once, keeping one pending status per task ID.
//
// A report that fails upstream is put back in the queue, so the reporter will
// continue reporting the current status until it succeeds or is closed.
//
// mu guards statuses, volumes and closed. cond uses mu as its Locker and is
// signalled whenever an update is queued or the reporter is closed.
type statusReporter struct {
	reporter Reporter
	statuses map[string]*api.TaskStatus
	// volumes is a set of volumes which are to be reported unpublished.
	volumes map[string]struct{}
	mu      sync.Mutex
	cond    sync.Cond
	closed  bool
}

// newStatusReporter builds a statusReporter that forwards queued updates to
// upstream, and starts its run goroutine with ctx before returning it.
func newStatusReporter(ctx context.Context, upstream Reporter) *statusReporter {
	sr := new(statusReporter)
	sr.reporter = upstream
	sr.statuses = map[string]*api.TaskStatus{}
	sr.volumes = map[string]struct{}{}

	// The condition variable shares the reporter's mutex, so waiters and
	// signallers synchronize on the same lock that guards the queues.
	sr.cond.L = &sr.mu

	go sr.run(ctx)
	return sr
}

// UpdateTaskStatus queues status as the pending update for taskID and signals
// the run loop. The update is dropped when it is deeply equal to the status
// already pending for that task, or when the pending status has a greater
// State. It always returns nil; ctx is not used.
func (sr *statusReporter) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if pending, queued := sr.statuses[taskID]; queued {
		switch {
		case reflect.DeepEqual(pending, status):
			// the very same update is already waiting to be sent
			return nil
		case pending.State > status.State:
			// ignore old updates
			return nil
		}
	}

	sr.statuses[taskID] = status
	sr.cond.Signal()
	return nil
}

// ReportVolumeUnpublished adds volumeID to the set of volumes waiting to be
// reported unpublished and signals the run loop. It always returns nil; ctx is
// not used.
func (sr *statusReporter) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
	var member struct{}

	sr.mu.Lock()
	defer sr.mu.Unlock()

	// volumes is a set, so queueing the same ID again is a no-op.
	sr.volumes[volumeID] = member
	sr.cond.Signal()
	return nil
}

// Close marks the reporter as closed and signals the run loop so that it can
// observe the flag and return. Updates that are still queued are not flushed.
// It always returns nil.
func (sr *statusReporter) Close() error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	sr.closed = true
	sr.cond.Signal()

	return nil
}

// run is the reporter's background loop. It holds sr.mu except while waiting
// on sr.cond or while calling the upstream reporter, and repeatedly drains the
// queued task statuses and unpublished volumes, forwarding each upstream.
//
// A report that fails is logged and put back in its queue so that it is sent
// again on a later pass. run returns once the reporter is closed, either by a
// call to Close or because ctx was cancelled; anything still queued at that
// point is dropped.
func (sr *statusReporter) run(ctx context.Context) {
	done := make(chan struct{})
	defer close(done)

	sr.mu.Lock() // released during wait, below.
	defer sr.mu.Unlock()

	// Turn cancellation of ctx into a Close, which wakes the loop below. The
	// done channel lets this helper goroutine exit when run returns first.
	go func() {
		select {
		case <-ctx.Done():
			sr.Close()
		case <-done:
			return
		}
	}()

	for {
		// closed must be part of the wait predicate: if Close ran before this
		// goroutine got here, its Signal had no waiter to wake, and waiting
		// unconditionally on empty queues would block with closed already set.
		for !sr.closed && len(sr.statuses) == 0 && len(sr.volumes) == 0 {
			sr.cond.Wait()
		}

		if sr.closed {
			// TODO(stevvooe): Add support here for waiting until all
			// statuses are flushed before shutting down.
			return
		}

		for taskID, status := range sr.statuses {
			delete(sr.statuses, taskID) // delete the entry, while trying to send.

			// Do not hold the lock across the upstream call, so new updates
			// can be queued while this one is being sent.
			sr.mu.Unlock()
			err := sr.reporter.UpdateTaskStatus(ctx, taskID, status)
			sr.mu.Lock()

			// reporter might be closed during UpdateTaskStatus call
			if sr.closed {
				return
			}

			if err != nil {
				log.G(ctx).WithError(err).Error("status reporter failed to report status to agent")

				// place it back in the map, if not there, allowing us to pick
				// the value if a new one came in when we were sending the last
				// update.
				if _, ok := sr.statuses[taskID]; !ok {
					sr.statuses[taskID] = status
				}
			}
		}

		for volumeID := range sr.volumes {
			delete(sr.volumes, volumeID)

			// As above, the upstream call is made without holding the lock.
			sr.mu.Unlock()
			err := sr.reporter.ReportVolumeUnpublished(ctx, volumeID)
			sr.mu.Lock()

			// reporter might be closed during ReportVolumeUnpublished call
			if sr.closed {
				return
			}

			if err != nil {
				log.G(ctx).WithError(err).Error("status reporter failed to report volume status to agent")
				// queue the volume again so it is retried on a later pass.
				sr.volumes[volumeID] = struct{}{}
			}
		}
	}
}