summaryrefslogtreecommitdiff
path: root/libgo/go/netchan/common.go
blob: d2cd8efc559753cbc4190e6ee73a07dd4f89022f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package netchan

import (
	"gob"
	"io"
	"os"
	"reflect"
	"sync"
	"time"
)

// Dir is the direction of a connection, seen from the client's perspective.
type Dir int

// The two possible directions of a connection.
const (
	Recv Dir = iota // zero value of Dir
	Send
)

// String returns the name of the direction ("Recv" or "Send"),
// or "???" for any value that is not one of the declared constants.
func (dir Dir) String() string {
	if dir == Recv {
		return "Recv"
	}
	if dir == Send {
		return "Send"
	}
	return "???"
}

// Payload types. A value from this list is stored in header.PayloadType
// and tells the reader what, if anything, follows the header.
// The numeric values are spelled out because they travel in the header.
const (
	payRequest = 0 // request structure follows
	payError   = 1 // error structure follows
	payData    = 2 // user payload follows
	payAck     = 3 // acknowledgement; no payload
	payClosed  = 4 // channel is now closed
	payAckSend = 5 // payload has been delivered.
)

// A header is sent as a prefix to every transmission.  It will be followed by
// a request structure, an error structure, or an arbitrary user payload structure.
// The fields are exported because the struct is passed to the gob encoder.
type header struct {
	// Id is the identifier of the netChan the transmission belongs to
	// (sendAck fills it from netChan.id).
	Id          int
	// PayloadType is one of the pay* constants; encode overwrites it on
	// every call with the payloadType argument.
	PayloadType int
	// SeqNum is a message sequence number. It is not read or written in
	// this file — presumably maintained by the importer/exporter; TODO confirm.
	SeqNum      int64
}

// A request is sent with a header once per channel from importer to exporter
// to report that it wants to bind to a channel with the specified direction
// for count messages, with space for size buffered values.
// If count is -1, it means unlimited.
type request struct {
	// Name of the channel to bind to.
	Name  string
	// Count is the number of messages wanted; -1 means unlimited.
	Count int64
	// Size is the number of values that may be buffered.
	Size  int
	// Dir is the requested direction of the channel.
	Dir   Dir
}

// An error is sent with a header to report an error.
// NOTE(review): in Go 1 and later the name "error" shadows the predeclared
// error interface inside this package; this file predates that type.
type error struct {
	// Error is the text of the error being reported.
	Error string
}

// unackedCounter is used to unify management of acknowledgements for import
// and export. Values of this type are the keys of clientSet.clients and are
// polled by clientSet.drain and clientSet.sync.
type unackedCounter interface {
	// unackedCount reports how many messages are still unacknowledged;
	// drain treats any value greater than zero as pending work.
	unackedCount() int64
	// ack is compared by sync against a seq value sampled earlier:
	// the client is considered caught up once ack() >= that value.
	// Presumably the sequence number of the latest acknowledged message —
	// TODO confirm against the implementations.
	ack() int64
	// seq is sampled by sync on entry — presumably the sequence number of
	// the latest message sent; TODO confirm against the implementations.
	seq() int64
}

// A chanDir pairs a channel with its direction.
// It is embedded in netChan and stored by name in clientSet.names.
type chanDir struct {
	// ch is the reflective handle on the underlying Go channel.
	ch  *reflect.ChanValue
	// dir is the direction in which the channel is used (Recv or Send).
	dir Dir
}

// clientSet contains the objects and methods needed for tracking
// clients of an exporter and draining outstanding messages.
// Both maps must only be accessed while holding mu.
type clientSet struct {
	mu      sync.Mutex // protects access to channel and client maps
	// names maps a name to its channel and direction; drain checks every
	// entry for values still buffered in the channel.
	names   map[string]*chanDir
	// clients is the set of known clients. Only key presence is consulted
	// in this file; the bool value is never read here.
	clients map[unackedCounter]bool
}

// encDec is a mutex-protected encoder and decoder pair.
// The two directions are locked independently, so a decode in progress
// does not block an encode and vice versa.
type encDec struct {
	// decLock serializes calls to dec (held for the duration of decode).
	decLock sync.Mutex
	dec     *gob.Decoder
	// encLock serializes calls to enc, so that a header and its payload
	// are written as one uninterrupted unit by encode.
	encLock sync.Mutex
	enc     *gob.Encoder
}

// newEncDec returns an encDec whose gob decoder reads from conn and whose
// gob encoder writes to conn. Both mutexes start out unlocked.
func newEncDec(conn io.ReadWriter) *encDec {
	ed := new(encDec)
	ed.dec = gob.NewDecoder(conn)
	ed.enc = gob.NewEncoder(conn)
	return ed
}

// decode reads the next item from the connection into value.
// Calls are serialized by decLock. The lock is released with defer so that
// it is not left held if the decoder panics.
// It returns whatever error the gob decoder reports, or nil on success.
func (ed *encDec) decode(value reflect.Value) os.Error {
	ed.decLock.Lock()
	defer ed.decLock.Unlock()
	// TODO: tear down connection if there is an error?
	return ed.dec.DecodeValue(value)
}

// encode writes a header and, if payload is non-nil, the payload onto the
// connection. hdr.PayloadType is overwritten with payloadType before the
// header is written. The encoder lock is held across both writes so that a
// header and its payload are never interleaved with another transmission,
// and it is released with defer so that it is not left held if the encoder
// panics.
// It returns the first error reported by the gob encoder, or nil on success;
// if the header cannot be written the payload is not attempted.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
	ed.encLock.Lock()
	defer ed.encLock.Unlock()
	hdr.PayloadType = payloadType
	if err := ed.enc.Encode(hdr); err != nil {
		// TODO: tear down connection if there is an error?
		return err
	}
	if payload == nil {
		// Header-only transmission (e.g. an acknowledgement).
		return nil
	}
	// TODO: tear down connection if there is an error?
	return ed.enc.Encode(payload)
}

// drain blocks until no channel in cs.names holds buffered values and no
// client reports unacknowledged messages, polling every 100 milliseconds.
// timeout is in nanoseconds; a value <= 0 means wait indefinitely.
// It returns a "timeout" error if the deadline passes while work is still
// pending, and nil once everything has drained.
// See the comment for Exporter.Drain.
func (cs *clientSet) drain(timeout int64) os.Error {
	began := time.Nanoseconds()

	// busy reports, under cs.mu, whether anything is still outstanding.
	busy := func() bool {
		found := false
		cs.mu.Lock()
		// Any messages waiting for a client?
		for _, cd := range cs.names {
			if cd.ch.Len() > 0 {
				found = true
			}
		}
		// Any unacknowledged messages?
		for c := range cs.clients {
			// Check for > rather than != just to be safe.
			if c.unackedCount() > 0 {
				found = true
				break
			}
		}
		cs.mu.Unlock()
		return found
	}

	for busy() {
		if timeout > 0 && time.Nanoseconds()-began >= timeout {
			return os.ErrorString("timeout")
		}
		time.Sleep(100 * 1e6) // 100 milliseconds
	}
	return nil
}

// sync blocks until every client that was in the set on entry, and is still
// in the set, reports an ack() value at least as large as the seq() value it
// had on entry. It polls every 100 milliseconds.
// timeout is in nanoseconds; a value <= 0 means wait indefinitely.
// It returns a "timeout" error if the deadline passes while some client is
// still behind, and nil otherwise.
// See the comment for Exporter.Sync.
func (cs *clientSet) sync(timeout int64) os.Error {
	startTime := time.Nanoseconds()
	// seq remembers the clients and their seqNum at point of entry.
	// cs.clients is protected by cs.mu, so the snapshot must be taken with
	// the lock held; iterating the map unlocked would race with concurrent
	// additions and removals of clients.
	seq := make(map[unackedCounter]int64)
	cs.mu.Lock()
	for client := range cs.clients {
		seq[client] = client.seq()
	}
	cs.mu.Unlock()
	for {
		pending := false
		cs.mu.Lock()
		// Any unacknowledged messages?  Look only at clients that existed
		// when we started and are still in this client set.
		for client, want := range seq {
			if _, ok := cs.clients[client]; ok && client.ack() < want {
				pending = true
				break
			}
		}
		cs.mu.Unlock()
		if !pending {
			break
		}
		if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
			return os.ErrorString("timeout")
		}
		time.Sleep(100 * 1e6) // 100 milliseconds
	}
	return nil
}

// A netChan represents a channel imported or exported
// on a single connection. Flow is controlled by the receiving
// side by sending payAckSend messages when values
// are delivered into the local channel.
//
// Which group of fields is in use depends on the embedded dir:
// a Send netChan uses ackCh and space (see recv and acked), a Recv
// netChan uses sendCh, ed and count (see send, sender and sendAck).
type netChan struct {
	*chanDir
	name   string
	id     int
	size   int // buffer size of channel.
	closed bool // set by close; makes further close calls no-ops.

	// sender-specific state
	ackCh chan bool // buffered with space for all the acks we need
	space int       // available space.

	// receiver-specific state
	sendCh chan reflect.Value // buffered channel of values received from other end.
	ed     *encDec            // so that we can send acks.
	count  int64              // number of values still to receive.
}

// newNetChan builds a netChan for the channel ch. name is only used for
// messages; id, size (buffer size) and count (number of values still to
// receive) are stored as given. ed represents the connection to the other
// side. For a channel whose direction is Send, the ack channel is allocated
// with room for size acks and the available space starts at size.
func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
	nch := new(netChan)
	nch.chanDir = ch
	nch.name = name
	nch.id = id
	nch.size = size
	nch.ed = ed
	nch.count = count
	if nch.dir != Send {
		return nch
	}
	// Sender-specific flow-control state.
	nch.ackCh = make(chan bool, size)
	nch.space = size
	return nch
}

// close shuts the netChan down. Calling it again after the first call
// has no effect.
//
// For a Send channel the underlying channel and the ack channel are both
// closed. For a Recv channel the underlying channel is closed directly
// unless a sender goroutine has been started, in which case only its queue
// is closed and the goroutine closes the underlying channel when it can.
func (nch *netChan) close() {
	if nch.closed {
		return
	}
	switch {
	case nch.dir != Recv:
		nch.ch.Close()
		close(nch.ackCh)
	case nch.sendCh != nil:
		// The sender goroutine is active: close the channel to it.
		// It will close nch.ch when it can.
		close(nch.sendCh)
	default:
		nch.ch.Close()
	}
	nch.closed = true
}

// send delivers a value that arrived from the remote side to the local
// receiver. It panics if the netChan's direction is not Recv.
//
// As long as no sender goroutine exists, a non-blocking send on the local
// channel is tried first and acknowledged at once if it succeeds. The first
// time that fails, a queue of capacity nch.size and a sender goroutine are
// created; from then on every value goes through the queue. A full queue
// means the remote side exceeded its allowance, which causes a panic.
func (nch *netChan) send(val reflect.Value) {
	if nch.dir != Recv {
		panic("send on wrong direction of channel")
	}
	queue := nch.sendCh
	if queue == nil {
		// If possible, do local send directly and ack immediately.
		if nch.ch.TrySend(val) {
			nch.sendAck()
			return
		}
		// Start sender goroutine to manage delayed delivery of values.
		queue = make(chan reflect.Value, nch.size)
		nch.sendCh = queue
		go nch.sender()
	}
	select {
	case queue <- val:
		// Queued for the sender goroutine.
	default:
		// TODO: should this be more resilient?
		panic("netchan: remote sender sent more values than allowed")
	}
}

// sendAck records that a message has left the channel's buffer and, when
// necessary, tells the remote side so with a payAckSend header.
// No ack is sent when the messages remaining to be sent fit in the
// channel's buffer (0 <= count <= size); a negative count always acks.
// A positive count is decremented afterwards. Any error from the encoder
// is ignored.
func (nch *netChan) sendAck() {
	needAck := nch.count < 0 || nch.count > int64(nch.size)
	if needAck {
		nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
	}
	if nch.count <= 0 {
		return
	}
	nch.count--
}

// The sender process forwards items from the sending queue
// to the destination channel, acknowledging each item.
// It is started as a goroutine by send, runs until nch.sendCh is closed
// (see close), and then closes the destination channel itself.
// It panics if the netChan's direction is not Recv; that check runs before
// the recover handler below is installed, so it is not swallowed.
func (nch *netChan) sender() {
	if nch.dir != Recv {
		panic("sender on wrong direction of channel")
	}
	// When Exporter.Hangup is called, the underlying channel is closed,
	// and so we may get a "too many operations on closed channel" error
	// if there are outstanding messages in sendCh.
	// Make sure that this doesn't panic the whole program.
	// NOTE(review): as written, every panic raised below is discarded,
	// not only the one described above.
	defer func() {
		if r := recover(); r != nil {
			// TODO check that r is "too many operations", otherwise re-panic.
		}
	}()
	// Deliver queued values in order; each delivery is acknowledged only
	// after the (possibly blocking) send on the destination has completed.
	for v := range nch.sendCh {
		nch.ch.Send(v)
		nch.sendAck()
	}
	// The queue was closed: finish the shutdown that close deferred to us.
	nch.ch.Close()
}

// recv takes the next value from the local channel so that it can be sent
// to the remote side. It panics if the netChan's direction is not Send.
//
// Each call uses up one unit of buffer space. When none is left it first
// waits on ackCh; the unit gained by that ack is spent immediately, so
// space stays at zero.
// The results are those of the underlying channel's Recv.
func (nch *netChan) recv() (val reflect.Value, ok bool) {
	if nch.dir != Send {
		panic("recv on wrong direction of channel")
	}

	if nch.space == 0 {
		// Wait for buffer space.
		<-nch.ackCh
	} else {
		nch.space--
	}
	val, ok = nch.ch.Recv()
	return val, ok
}

// acked is called when the remote side indicates that
// a value has been delivered. It posts one token on ackCh, which recv
// consumes when it has run out of buffer space.
// It panics if the netChan's direction is not Send, or if ackCh is already
// full, which means the remote receiver sent more acks than values it was
// given.
func (nch *netChan) acked() {
	if nch.dir != Send {
		// The message names this function; it previously said "recv",
		// copied from netChan.recv, which pointed at the wrong call site.
		panic("acked on wrong direction of channel")
	}
	select {
	case nch.ackCh <- true:
		// ok
	default:
		// TODO: should this be more resilient?
		panic("netchan: remote receiver sent too many acks")
	}
}