summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/mongoreplay/filter.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/gotools/mongoreplay/filter.go')
-rw-r--r--src/mongo/gotools/mongoreplay/filter.go85
1 files changed, 69 insertions, 16 deletions
diff --git a/src/mongo/gotools/mongoreplay/filter.go b/src/mongo/gotools/mongoreplay/filter.go
index 8ba38e2de7f..de8927a50c4 100644
--- a/src/mongo/gotools/mongoreplay/filter.go
+++ b/src/mongo/gotools/mongoreplay/filter.go
@@ -21,13 +21,34 @@ type FilterCommand struct {
OutFile string `description:"path to the output file to write to" short:"o" long:"outputFile"`
SplitFilePrefix string `description:"prefix file name to use for the output files being written when splitting traffic" long:"outfilePrefix"`
StartTime string `description:"ISO 8601 timestamp to remove all operations before" long:"startAt"`
+ Duration string `description:"truncate the end of the file after a certain duration from the time of the first seen operation" long:"duration"`
Split int `description:"split the traffic into n files with roughly equal numbers of connecitons in each" default:"1" long:"split"`
RemoveDriverOps bool `description:"remove driver issued operations from the playback" long:"removeDriverOps"`
Gzip bool `long:"gzip" description:"decompress gzipped input"`
+ duration time.Duration
startTime time.Time
}
+type skipConfig struct {
+ firstOpTime, lastOpTime *time.Time
+ truncateDuration *time.Duration
+ removeDriverOps bool
+}
+
+func newSkipConfig(removeDriverOps bool, startTime time.Time, truncateDuration time.Duration) *skipConfig {
+ skipConf := &skipConfig{
+ removeDriverOps: removeDriverOps,
+ }
+ if !startTime.IsZero() {
+ skipConf.firstOpTime = &startTime
+ }
+ if truncateDuration.Nanoseconds() != 0 {
+ skipConf.truncateDuration = &truncateDuration
+ }
+ return skipConf
+}
+
// Execute runs the program for the 'filter' subcommand
func (filter *FilterCommand) Execute(args []string) error {
err := filter.ValidateParams(args)
@@ -65,7 +86,9 @@ func (filter *FilterCommand) Execute(args []string) error {
}
}
- if err := Filter(opChan, outfiles, filter.RemoveDriverOps, filter.startTime); err != nil {
+ skipConf := newSkipConfig(filter.RemoveDriverOps, filter.startTime, filter.duration)
+
+ if err := Filter(opChan, outfiles, skipConf); err != nil {
userInfoLogger.Logvf(Always, "Filter: %v\n", err)
}
@@ -79,8 +102,7 @@ func (filter *FilterCommand) Execute(args []string) error {
func Filter(opChan <-chan *RecordedOp,
outfiles []*PlaybackFileWriter,
- removeDriverOps bool,
- truncateTime time.Time) error {
+ skipConf *skipConfig) error {
opWriters := make([]chan<- *RecordedOp, len(outfiles))
errChan := make(chan error)
@@ -89,26 +111,19 @@ func Filter(opChan <-chan *RecordedOp,
for i := range outfiles {
opWriters[i] = newParallelPlaybackWriter(outfiles[i], errChan, wg)
}
+
for op := range opChan {
- // if specified, bypass driver operations
- if removeDriverOps {
- parsedOp, err := op.RawOp.Parse()
- if err != nil {
- return err
- }
- if IsDriverOp(parsedOp) {
- continue
- }
+ shouldSkip, err := skipConf.shouldFilterOp(op)
+ if err != nil {
+ return err
}
- // if specified, ignore ops before the given timestamp
- // if truncateTime not specified, it will be time zero and all
- // operation times will be greater than it
- if op.Seen.Time.Before(truncateTime) {
+ if shouldSkip {
continue
}
fileNum := op.SeenConnectionNum % int64(len(outfiles))
opWriters[fileNum] <- op
}
+
for _, opWriter := range opWriters {
close(opWriter)
}
@@ -175,5 +190,43 @@ func (filter *FilterCommand) ValidateParams(args []string) error {
}
filter.startTime = t
}
+
+ if filter.Duration != "" {
+ d, err := time.ParseDuration(filter.Duration)
+ if err != nil {
+ return fmt.Errorf("error parsing duration argument: %v", err)
+ }
+ filter.duration = d
+ }
+
return nil
}
+
+func (sc *skipConfig) shouldFilterOp(op *RecordedOp) (bool, error) {
+ // Skip ops until the target first time if specified
+ if sc.firstOpTime != nil && op.Seen.Before(*sc.firstOpTime) {
+ return true, nil
+ }
+
+ // Initialize target last op time based on first op kept after initial truncation
+ if sc.lastOpTime == nil && sc.truncateDuration != nil {
+ lastOpTime := op.Seen.Add(*sc.truncateDuration)
+ sc.lastOpTime = &lastOpTime
+ }
+
+ // Skip ops after a target last time if specified
+ if sc.lastOpTime != nil && op.Seen.After(*sc.lastOpTime) {
+ return true, nil
+ }
+
+ // Check if driver op
+ if sc.removeDriverOps {
+ parsedOp, err := op.RawOp.Parse()
+ if err != nil {
+ return true, err
+ }
+ return IsDriverOp(parsedOp), nil
+ }
+
+ return false, nil
+}