1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netchan
import (
"encoding/gob"
"errors"
"io"
"reflect"
"sync"
"time"
)
// The direction of a connection from the client's perspective.
type Dir int
const (
Recv Dir = iota
Send
)
func (dir Dir) String() string {
switch dir {
case Recv:
return "Recv"
case Send:
return "Send"
}
return "???"
}
// Payload types
const (
payRequest = iota // request structure follows
payError // error structure follows
payData // user payload follows
payAck // acknowledgement; no payload
payClosed // channel is now closed
payAckSend // payload has been delivered.
)
// A header is sent as a prefix to every transmission. It will be followed by
// a request structure, an error structure, or an arbitrary user payload structure.
type header struct {
Id int
PayloadType int
SeqNum int64
}
// Sent with a header once per channel from importer to exporter to report
// that it wants to bind to a channel with the specified direction for count
// messages, with space for size buffered values. If count is -1, it means unlimited.
type request struct {
Name string
Count int64
Size int
Dir Dir
}
// Sent with a header to report an error.
type error_ struct {
Error string
}
// Used to unify management of acknowledgements for import and export.
type unackedCounter interface {
unackedCount() int64
ack() int64
seq() int64
}
// A channel and its direction.
type chanDir struct {
ch reflect.Value
dir Dir
}
// clientSet contains the objects and methods needed for tracking
// clients of an exporter and draining outstanding messages.
type clientSet struct {
mu sync.Mutex // protects access to channel and client maps
names map[string]*chanDir
clients map[unackedCounter]bool
}
// Mutex-protected encoder and decoder pair.
type encDec struct {
decLock sync.Mutex
dec *gob.Decoder
encLock sync.Mutex
enc *gob.Encoder
}
func newEncDec(conn io.ReadWriter) *encDec {
return &encDec{
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(conn),
}
}
// Decode an item from the connection.
func (ed *encDec) decode(value reflect.Value) error {
ed.decLock.Lock()
err := ed.dec.DecodeValue(value)
if err != nil {
// TODO: tear down connection?
}
ed.decLock.Unlock()
return err
}
// Encode a header and payload onto the connection.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
ed.encLock.Lock()
hdr.PayloadType = payloadType
err := ed.enc.Encode(hdr)
if err == nil {
if payload != nil {
err = ed.enc.Encode(payload)
}
}
if err != nil {
// TODO: tear down connection if there is an error?
}
ed.encLock.Unlock()
return err
}
// See the comment for Exporter.Drain.
func (cs *clientSet) drain(timeout int64) error {
startTime := time.Nanoseconds()
for {
pending := false
cs.mu.Lock()
// Any messages waiting for a client?
for _, chDir := range cs.names {
if chDir.ch.Len() > 0 {
pending = true
}
}
// Any unacknowledged messages?
for client := range cs.clients {
n := client.unackedCount()
if n > 0 { // Check for > rather than != just to be safe.
pending = true
break
}
}
cs.mu.Unlock()
if !pending {
break
}
if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
return errors.New("timeout")
}
time.Sleep(100 * 1e6) // 100 milliseconds
}
return nil
}
// See the comment for Exporter.Sync.
func (cs *clientSet) sync(timeout int64) error {
startTime := time.Nanoseconds()
// seq remembers the clients and their seqNum at point of entry.
seq := make(map[unackedCounter]int64)
for client := range cs.clients {
seq[client] = client.seq()
}
for {
pending := false
cs.mu.Lock()
// Any unacknowledged messages? Look only at clients that existed
// when we started and are still in this client set.
for client := range seq {
if _, ok := cs.clients[client]; ok {
if client.ack() < seq[client] {
pending = true
break
}
}
}
cs.mu.Unlock()
if !pending {
break
}
if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
return errors.New("timeout")
}
time.Sleep(100 * 1e6) // 100 milliseconds
}
return nil
}
// A netChan represents a channel imported or exported
// on a single connection. Flow is controlled by the receiving
// side by sending payAckSend messages when values
// are delivered into the local channel.
type netChan struct {
*chanDir
name string
id int
size int // buffer size of channel.
closed bool
// sender-specific state
ackCh chan bool // buffered with space for all the acks we need
space int // available space.
// receiver-specific state
sendCh chan reflect.Value // buffered channel of values received from other end.
ed *encDec // so that we can send acks.
count int64 // number of values still to receive.
}
// Create a new netChan with the given name (only used for
// messages), id, direction, buffer size, and count.
// The connection to the other side is represented by ed.
func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
if c.dir == Send {
c.ackCh = make(chan bool, size)
c.space = size
}
return c
}
// Close the channel.
func (nch *netChan) close() {
if nch.closed {
return
}
if nch.dir == Recv {
if nch.sendCh != nil {
// If the sender goroutine is active, close the channel to it.
// It will close nch.ch when it can.
close(nch.sendCh)
} else {
nch.ch.Close()
}
} else {
nch.ch.Close()
close(nch.ackCh)
}
nch.closed = true
}
// Send message from remote side to local receiver.
func (nch *netChan) send(val reflect.Value) {
if nch.dir != Recv {
panic("send on wrong direction of channel")
}
if nch.sendCh == nil {
// If possible, do local send directly and ack immediately.
if nch.ch.TrySend(val) {
nch.sendAck()
return
}
// Start sender goroutine to manage delayed delivery of values.
nch.sendCh = make(chan reflect.Value, nch.size)
go nch.sender()
}
select {
case nch.sendCh <- val:
// ok
default:
// TODO: should this be more resilient?
panic("netchan: remote sender sent more values than allowed")
}
}
// sendAck sends an acknowledgment that a message has left
// the channel's buffer. If the messages remaining to be sent
// will fit in the channel's buffer, then we don't
// need to send an ack.
func (nch *netChan) sendAck() {
if nch.count < 0 || nch.count > int64(nch.size) {
nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
}
if nch.count > 0 {
nch.count--
}
}
// The sender process forwards items from the sending queue
// to the destination channel, acknowledging each item.
func (nch *netChan) sender() {
if nch.dir != Recv {
panic("sender on wrong direction of channel")
}
// When Exporter.Hangup is called, the underlying channel is closed,
// and so we may get a "too many operations on closed channel" error
// if there are outstanding messages in sendCh.
// Make sure that this doesn't panic the whole program.
defer func() {
if r := recover(); r != nil {
// TODO check that r is "too many operations", otherwise re-panic.
}
}()
for v := range nch.sendCh {
nch.ch.Send(v)
nch.sendAck()
}
nch.ch.Close()
}
// Receive value from local side for sending to remote side.
func (nch *netChan) recv() (val reflect.Value, ok bool) {
if nch.dir != Send {
panic("recv on wrong direction of channel")
}
if nch.space == 0 {
// Wait for buffer space.
<-nch.ackCh
nch.space++
}
nch.space--
return nch.ch.Recv()
}
// acked is called when the remote side indicates that
// a value has been delivered.
func (nch *netChan) acked() {
if nch.dir != Send {
panic("recv on wrong direction of channel")
}
select {
case nch.ackCh <- true:
// ok
default:
// TODO: should this be more resilient?
panic("netchan: remote receiver sent too many acks")
}
}
|