summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/mongoreplay/playbackfile.go
blob: a5224d27f56fa57cfd7de93011a3cfab750a80a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongoreplay

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
	"runtime"
	"time"

	"github.com/10gen/llmgo/bson"
	"github.com/mongodb/mongo-tools/common/util"
)

const PlaybackFileVersion = 1

type PlaybackFileMetadata struct {
	PlaybackFileVersion int
	DriverOpsFiltered   bool
}

// PlaybackFileReader stores the necessary information for a playback source,
// which is just an io.ReadCloser.
type PlaybackFileReader struct {
	io.ReadSeeker
	fname                   string
	parallelFileReadManager *parallelFileReadManager
	metadata                PlaybackFileMetadata
}

// PlaybackFileWriter stores the necessary information for a playback destination,
// which is an io.WriteCloser and its location.
type PlaybackFileWriter struct {
	io.WriteCloser
	fname string

	metadata PlaybackFileMetadata
}

// GzipReadSeeker wraps an io.ReadSeeker for gzip reading
type GzipReadSeeker struct {
	readSeeker io.ReadSeeker
	*gzip.Reader
}

// NewPlaybackFileReader initializes a new PlaybackFileReader
func NewPlaybackFileReader(filename string, gzip bool) (*PlaybackFileReader, error) {
	var readSeeker io.ReadSeeker

	readSeeker, err := os.Open(filename)
	if err != nil {
		return nil, err
	}

	if gzip {
		readSeeker, err = NewGzipReadSeeker(readSeeker)
		if err != nil {
			return nil, err
		}
	}

	return playbackFileReaderFromReadSeeker(readSeeker, filename)
}

func playbackFileReaderFromReadSeeker(rs io.ReadSeeker, filename string) (*PlaybackFileReader, error) {
	// read the metadata from the file
	metadata := new(PlaybackFileMetadata)
	err := bsonFromReader(rs, metadata)
	if err != nil {
		return nil, fmt.Errorf("error reading metadata: %v", err)
	}

	return &PlaybackFileReader{
		ReadSeeker: rs,
		fname:      filename,

		metadata: *metadata,
	}, nil
}

func (pfReader *PlaybackFileReader) beginParallelRead() {
	pfReader.parallelFileReadManager = &parallelFileReadManager{}
	numWorkers := runtime.NumCPU()
	pfReader.parallelFileReadManager.begin(numWorkers, pfReader.ReadSeeker)
}

// NextRecordedOp iterates through the PlaybackFileReader to yield the next
// RecordedOp. It returns io.EOF when successfully complete.
func (file *PlaybackFileReader) NextRecordedOp() (*RecordedOp, error) {
	return file.parallelFileReadManager.next()
}

// NewPlaybackFileWriter initializes a new PlaybackFileWriter
func NewPlaybackFileWriter(playbackFileName string, driverOpsFiltered, isGzipWriter bool) (*PlaybackFileWriter, error) {
	metadata := PlaybackFileMetadata{
		PlaybackFileVersion: PlaybackFileVersion,
		DriverOpsFiltered:   driverOpsFiltered,
	}

	toolDebugLogger.Logvf(DebugLow, "Opening playback file %v", playbackFileName)
	file, err := os.Create(playbackFileName)
	if err != nil {
		return nil, fmt.Errorf("error opening playback file to write to: %v", err)
	}

	var wc io.WriteCloser
	wc = file

	if isGzipWriter {
		wc = &util.WrappedWriteCloser{gzip.NewWriter(file), file}
	}

	return playbackFileWriterFromWriteCloser(wc, playbackFileName, metadata)
}

func playbackFileWriterFromWriteCloser(wc io.WriteCloser, filename string,
	metadata PlaybackFileMetadata) (*PlaybackFileWriter, error) {

	bsonBytes, err := bson.Marshal(metadata)
	if err != nil {
		return nil, fmt.Errorf("error writing metadata: %v", err)
	}

	_, err = wc.Write(bsonBytes)
	if err != nil {
		return nil, fmt.Errorf("error writing metadata: %v", err)
	}

	return &PlaybackFileWriter{
		WriteCloser: wc,
		fname:       filename,

		metadata: metadata,
	}, nil

}

// NewGzipReadSeeker initializes a new GzipReadSeeker
func NewGzipReadSeeker(rs io.ReadSeeker) (*GzipReadSeeker, error) {
	gzipReader, err := gzip.NewReader(rs)
	if err != nil {
		return nil, err
	}
	return &GzipReadSeeker{rs, gzipReader}, nil
}

// Seek sets the offset for the next Read, and can only seek to the
// beginning of the file.
func (g *GzipReadSeeker) Seek(offset int64, whence int) (int64, error) {
	if whence != 0 || offset != 0 {
		return 0, fmt.Errorf("GzipReadSeeker can only seek to beginning of file")
	}
	_, err := g.readSeeker.Seek(offset, whence)
	if err != nil {
		return 0, err
	}
	g.Reset(g.readSeeker)
	return 0, nil
}

// OpChan runs a goroutine that will read and unmarshal recorded ops
// from a file and push them in to a recorded op chan. Any errors encountered
// are pushed to an error chan. Both the recorded op chan and the error chan are
// returned by the function.
// The error chan won't be readable until the recorded op chan gets closed.
func (pfReader *PlaybackFileReader) OpChan(repeat int) (<-chan *RecordedOp, <-chan error) {
	ch := make(chan *RecordedOp)
	e := make(chan error)

	var last time.Time
	var first time.Time
	var loopDelta time.Duration
	go func() {
		defer close(e)
		e <- func() error {
			defer close(ch)
			toolDebugLogger.Logv(Info, "Beginning playback file read")
			for generation := 0; generation < repeat; generation++ {
				_, err := pfReader.Seek(0, 0)
				if err != nil {
					return fmt.Errorf("PlaybackFile Seek: %v", err)
				}

				// Must read the metadata since file was seeked to 0
				metadata := new(PlaybackFileMetadata)
				err = bsonFromReader(pfReader, metadata)
				if err != nil {
					return fmt.Errorf("bson read error: %v", err)
				}

				pfReader.beginParallelRead()
				var order int64
				for {
					if err = pfReader.parallelFileReadManager.err(); err != nil {
						return err
					}
					recordedOp, err := pfReader.NextRecordedOp()
					if err != nil {
						if err == io.EOF {
							break
						}
						return err
					}
					last = recordedOp.Seen.Time
					if first.IsZero() {
						first = recordedOp.Seen.Time
					}
					recordedOp.Seen.Time = recordedOp.Seen.Add(loopDelta)
					recordedOp.Generation = generation
					recordedOp.Order = order
					// We want to suppress EOF's unless you're in the last
					// generation because all of the ops for one connection
					// across different generations get executed in the same
					// session. We don't want to close the session until the
					// connection closes in the last generation.
					if !recordedOp.EOF || generation == repeat-1 {
						ch <- recordedOp
					}
					order++
				}
				toolDebugLogger.Logvf(DebugHigh, "generation: %v", generation)
				loopDelta += last.Sub(first)
				first = time.Time{}
				continue
			}
			return io.EOF
		}()
	}()
	return ch, e
}