summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/common/archive/multiplexer.go
blob: fe52b3d520d9e65e4f250fc6c4a8b516811c32c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package archive

import (
	"fmt"
	"hash"
	"hash/crc64"
	"io"
	"reflect"

	"github.com/mongodb/mongo-tools/common/db"
	"github.com/mongodb/mongo-tools/common/intents"
	"github.com/mongodb/mongo-tools/common/log"
	"gopkg.in/mgo.v2/bson"
)

const (
	// bufferWrites enables or disables the MuxIn buffering: when true, MuxIn
	// accumulates writes in its own buffer and hands them to the Multiplexer
	// in batches; when false, every Write is handed over immediately.
	// TODO: remove this constant and the non-buffered MuxIn implementations
	bufferWrites = true

	// bufferSize is the capacity of each MuxIn's write buffer used when
	// bufferWrites is set.
	bufferSize = db.MaxBSONSize
)

type (
	// Multiplexer is what one uses to create interleaved intents in an
	// archive: it serializes the data of many MuxIns into a single stream
	// written to Out.
	Multiplexer struct {
		// Out receives the multiplexed archive bytes.
		Out io.WriteCloser
		// Control delivers newly opened MuxIns to Run; closing it ends Run.
		Control chan *MuxIn
		// Completed receives Run's final result after Control is closed.
		Completed chan error
		// shutdownInputs allows the mux to tell the intent dumping worker
		// goroutines to shut down, so that the mux itself can shut down.
		shutdownInputs notifier
		// ins and selectCases are correlating slices: selectCases[i] receives
		// from ins[i].writeChan, except index 0, which is the Control channel
		// and has a nil entry in ins.
		ins         []*MuxIn
		selectCases []reflect.SelectCase
		// currentNamespace names the namespace whose block is currently open
		// in the archive, or "" when no block is open.
		currentNamespace string
	}

	// notifier is whatever the Multiplexer can ask to shut down its inputs.
	notifier interface {
		Notify()
	}
)

// NewMultiplexer returns a Multiplexer whose inputs will be multiplexed onto
// out. Its Control and Completed chans are created unbuffered and ready to use.
// shutdownInputs is the notifier the Multiplexer uses to ask for the shutdown
// of its inputs.
func NewMultiplexer(out io.WriteCloser, shutdownInputs notifier) *Multiplexer {
	control := make(chan *MuxIn)
	// Slot 0 of both ins and selectCases belongs to the Control chan, which
	// has no MuxIn of its own.
	return &Multiplexer{
		Out:            out,
		Control:        control,
		Completed:      make(chan error),
		shutdownInputs: shutdownInputs,
		ins:            []*MuxIn{nil},
		selectCases: []reflect.SelectCase{{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(control),
		}},
	}
}

// Run multiplexes until it receives an EOF on its Control chan.
//
// Index 0 of mux.selectCases receives from Control; every other index i
// receives from mux.ins[i].writeChan. Each loop iteration services one ready
// channel:
//   - a *MuxIn on Control is registered as a new input;
//   - a []byte on an input's writeChan is written to Out by formatBody;
//   - a closed input writeChan is acknowledged on writeCloseFinishedChan,
//     finished by formatEOF, and unregistered;
//   - a closed Control closes Out and reports the outcome on Completed.
//
// On the first write error the inputs are told to shut down, the real Out is
// closed, and Out is replaced by a discarding writer so the remaining inputs
// can drain without blocking; that first error is what Completed reports.
// On an otherwise clean run, an error from closing Out is reported on
// Completed instead of being dropped.
func (mux *Multiplexer) Run() {
	var completionErr error

	// fail handles an error writing to the archive.
	fail := func(err error) {
		mux.shutdownInputs.Notify()
		if completionErr != nil {
			// Out is already the discarding writer; keep the first error.
			return
		}
		completionErr = err
		// Once Out is swapped below, the Close at the end of Run only reaches
		// the discarding writer, so close the real output here. Its error is
		// secondary to err, which is what gets reported.
		_ = mux.Out.Close()
		mux.Out = &nopCloseNopWriter{}
	}

	for {
		index, value, recvOK := reflect.Select(mux.selectCases)
		EOF := !recvOK

		if index == 0 { // Control index
			if EOF {
				log.Logvf(log.DebugLow, "Mux finish")
				closeErr := mux.Out.Close()
				switch {
				case completionErr != nil:
					mux.Completed <- completionErr
				case len(mux.selectCases) != 1:
					mux.Completed <- fmt.Errorf("Mux ending but selectCases still open %v",
						len(mux.selectCases))
				case closeErr != nil:
					mux.Completed <- closeErr
				default:
					mux.Completed <- nil
				}
				return
			}
			muxIn, ok := value.Interface().(*MuxIn)
			if !ok {
				mux.Completed <- fmt.Errorf("non MuxIn received on Control chan")
				return
			}
			log.Logvf(log.DebugLow, "Mux open namespace %v", muxIn.Intent.Namespace())
			mux.selectCases = append(mux.selectCases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(muxIn.writeChan),
			})
			mux.ins = append(mux.ins, muxIn)
			continue
		}

		in := mux.ins[index]
		if EOF {
			// We need to let the MuxIn know that we've noticed this close.
			// This fixes a race where the intent processing goroutines
			// finish, then the main goroutine closes the mux's Control chan
			// and the mux processes the close on the Control chan before it
			// processes the close on the MuxIn chan.
			in.writeCloseFinishedChan <- struct{}{}

			if err := mux.formatEOF(index, in); err != nil {
				fail(err)
			}
			log.Logvf(log.DebugLow, "Mux close namespace %v", in.Intent.Namespace())
			mux.currentNamespace = ""
			mux.selectCases = append(mux.selectCases[:index], mux.selectCases[index+1:]...)
			mux.ins = append(mux.ins[:index], mux.ins[index+1:]...)
			continue
		}

		bsonBytes, ok := value.Interface().([]byte)
		if !ok {
			mux.Completed <- fmt.Errorf("multiplexer received a value that wasn't a []byte")
			return
		}
		if err := mux.formatBody(in, bsonBytes); err != nil {
			fail(err)
		}
	}
}

// nopCloseNopWriter discards everything written to it and has a no-op Close.
// The Multiplexer substitutes it for Out after a write error.
type nopCloseNopWriter struct{}

// Write reports all of p as written without storing any of it.
func (w *nopCloseNopWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	return n, nil
}

// Close does nothing and always succeeds.
func (w *nopCloseNopWriter) Close() error {
	return nil
}

// formatBody writes bsonBytes for in to the archive. When in's namespace
// differs from mux.currentNamespace, the currently open block (if any) is
// first terminated with terminatorBytes, and a NamespaceHeader naming in's
// database and collection is written to open a new block.
//
// The number of bsonBytes written is always sent back on in.writeLenChan,
// including on error paths (where it may be 0).
func (mux *Multiplexer) formatBody(in *MuxIn, bsonBytes []byte) error {
	written := 0
	// The MuxIn waits on writeLenChan after every send, so always answer.
	defer func() {
		in.writeLenChan <- written
	}()

	// writeFull writes b to the archive, treating a partial write as an error.
	writeFull := func(b []byte) error {
		n, err := mux.Out.Write(b)
		switch {
		case err != nil:
			return err
		case n != len(b):
			return io.ErrShortWrite
		}
		return nil
	}

	if in.Intent.Namespace() != mux.currentNamespace {
		// Another namespace owns the open block, if there is one: close it.
		if mux.currentNamespace != "" {
			if err := writeFull(terminatorBytes); err != nil {
				return err
			}
		}
		header, err := bson.Marshal(NamespaceHeader{
			Database:   in.Intent.DB,
			Collection: in.Intent.C,
		})
		if err != nil {
			return err
		}
		if err = writeFull(header); err != nil {
			return err
		}
	}
	mux.currentNamespace = in.Intent.Namespace()

	var err error
	written, err = mux.Out.Write(bsonBytes)
	return err
}

// formatEOF writes the EOF header for in into the archive. Any currently open
// block is terminated first; then a NamespaceHeader with EOF set and CRC taken
// from in.hash.Sum64() is written, followed by terminatorBytes. A partial write
// is reported as io.ErrShortWrite. index is accepted but not used.
func (mux *Multiplexer) formatEOF(index int, in *MuxIn) error {
	// writeFull writes b to the archive, treating a partial write as an error.
	writeFull := func(b []byte) error {
		n, err := mux.Out.Write(b)
		switch {
		case err != nil:
			return err
		case n != len(b):
			return io.ErrShortWrite
		}
		return nil
	}

	if mux.currentNamespace != "" {
		if err := writeFull(terminatorBytes); err != nil {
			return err
		}
	}
	eofHeader, err := bson.Marshal(NamespaceHeader{
		Database:   in.Intent.DB,
		Collection: in.Intent.C,
		EOF:        true,
		CRC:        int64(in.hash.Sum64()),
	})
	if err != nil {
		return err
	}
	if err = writeFull(eofHeader); err != nil {
		return err
	}
	return writeFull(terminatorBytes)
}

// MuxIn is an implementation of the intents.file interface.
// MuxIns live in the intents, and are potentially owned by different
// goroutines than the one owning the Multiplexer.
// They are how the intents write data to the multiplexer.
//
// Fields:
//   - writeChan carries payloads to Multiplexer.Run; closing it (in Close)
//     tells the Multiplexer that this input is finished.
//   - writeLenChan carries back, for each payload, the number of bytes the
//     Multiplexer wrote.
//   - writeCloseFinishedChan is signalled by the Multiplexer once it has
//     noticed the close of writeChan; Close waits on it.
//   - buf accumulates written data when bufferWrites is set.
//   - hash accumulates a checksum of everything passed to Write; its Sum64
//     is stored in the EOF header.
//   - Intent identifies the namespace (DB and collection) this input carries.
//   - Mux is the Multiplexer this input registers with in Open.
type MuxIn struct {
	writeChan              chan []byte
	writeLenChan           chan int
	writeCloseFinishedChan chan struct{}
	buf                    []byte
	hash                   hash.Hash64
	Intent                 *intents.Intent
	Mux                    *Multiplexer
}

// Read is a no-op for MuxIns, which only carry data toward the Multiplexer:
// it never fills p and always reports 0 bytes read and a nil error.
func (muxIn *MuxIn) Read(p []byte) (n int, err error) {
	return n, err
}

// Pos always reports position 0; a MuxIn does not track an offset.
func (muxIn *MuxIn) Pos() int64 {
	const pos int64 = 0
	return pos
}

// Close closes the chans in the MuxIn.
// If bufferWrites is set, any buffered data is first handed to the
// Multiplexer. Closing writeChan makes the Multiplexer detect the close and
// perform a formatEOF for this MuxIn. Close then waits until the Multiplexer
// acknowledges the close on writeCloseFinishedChan.
//
// The chans are closed, and the acknowledgement awaited, even when the final
// flush comes back short. This way the MuxIn is always unregistered from the
// Multiplexer and its buffer released. The short write is then reported as
// io.ErrShortWrite.
func (muxIn *MuxIn) Close() error {
	// the mux side of this gets closed in the mux when it gets an eof on the read
	log.Logvf(log.DebugHigh, "MuxIn close %v", muxIn.Intent.Namespace())
	var flushErr error
	if bufferWrites {
		muxIn.writeChan <- muxIn.buf
		if length := <-muxIn.writeLenChan; length != len(muxIn.buf) {
			flushErr = io.ErrShortWrite
		}
		muxIn.buf = nil
	}
	close(muxIn.writeChan)
	close(muxIn.writeLenChan)
	// We need to wait for the close on the writeChan to be processed before
	// proceeding. Otherwise we might assume that all work is finished and exit
	// the program before the mux finishes writing the end of the archive.
	<-muxIn.writeCloseFinishedChan
	return flushErr
}

// Open prepares the MuxIn and registers it with its Multiplexer. It creates
// the chans the Multiplexer uses to talk to this input and a fresh CRC-64
// (ECMA) hash, and allocates the write buffer only when bufferWrites is set.
// It then sends the MuxIn on Mux.Control, and Multiplexer.Run adds a select
// case for it. With the unbuffered Control chan made by NewMultiplexer, this
// send waits until Run receives it. Open always returns nil.
func (muxIn *MuxIn) Open() error {
	log.Logvf(log.DebugHigh, "MuxIn open %v", muxIn.Intent.Namespace())
	muxIn.writeChan = make(chan []byte)
	muxIn.writeLenChan = make(chan int)
	muxIn.writeCloseFinishedChan = make(chan struct{})
	muxIn.hash = crc64.New(crc64.MakeTable(crc64.ECMA))
	if bufferWrites {
		// A single allocation of bufferSize: the unbuffered Write/Close paths
		// never touch buf, so there is no need to allocate it otherwise.
		muxIn.buf = make([]byte, 0, bufferSize)
	}
	muxIn.Mux.Control <- muxIn
	return nil
}

// Write hands a buffer to the Multiplexer and receives a written length from
// the multiplexer. After Write returns, the buffer is free to be reused.
//
// buf must begin with a complete BSON document: a little-endian int32 length
// prefix covering the whole document, ending in a 0x00 byte. Malformed input
// is rejected with an error instead of a panic. A zero-length buf is a no-op.
//
// With bufferWrites set, data is accumulated in muxIn.buf. It is handed to
// the Multiplexer only when the next write would exceed the buffer's capacity
// (the remainder is flushed by Close). Otherwise each call hands buf over
// directly. A length mismatch reported back by the Multiplexer yields
// io.ErrShortWrite. Accepted data is added to muxIn.hash.
func (muxIn *MuxIn) Write(buf []byte) (int, error) {
	if len(buf) == 0 {
		// io.Writer permits empty writes; there is nothing to hand over.
		return 0, nil
	}
	if len(buf) < 4 {
		return 0, fmt.Errorf("corrupt bson in MuxIn.Write (size prefix truncated, %v bytes)", len(buf))
	}
	size := int(
		(uint32(buf[0]) << 0) |
			(uint32(buf[1]) << 8) |
			(uint32(buf[2]) << 16) |
			(uint32(buf[3]) << 24),
	)
	// The smallest valid BSON document is 5 bytes (length prefix + 0x00).
	// Checking size < 5 also guards buf[size-1] below against size 0 and
	// against negative sizes where int is 32 bits.
	if size < 5 || len(buf) < size {
		return 0, fmt.Errorf("corrupt bson in MuxIn.Write (size %v/%v)", size, len(buf))
	}
	if buf[size-1] != 0 {
		return 0, fmt.Errorf("corrupt bson in MuxIn.Write bson has no-zero terminator %v, (size %v/%v)", buf[size-1], size, len(buf))
	}
	if bufferWrites {
		if len(muxIn.buf)+len(buf) > cap(muxIn.buf) {
			// Flush what is buffered before it would overflow.
			muxIn.writeChan <- muxIn.buf
			length := <-muxIn.writeLenChan
			if length != len(muxIn.buf) {
				return 0, io.ErrShortWrite
			}
			muxIn.buf = muxIn.buf[:0]
		}
		muxIn.buf = append(muxIn.buf, buf...)
	} else {
		muxIn.writeChan <- buf
		length := <-muxIn.writeLenChan
		if length != len(buf) {
			return 0, io.ErrShortWrite
		}
	}
	muxIn.hash.Write(buf)
	return len(buf), nil
}