diff options
author | Kyle Erf <erf@mongodb.com> | 2014-11-03 14:31:19 -0500 |
---|---|---|
committer | Kyle Erf <erf@mongodb.com> | 2014-11-04 10:54:20 -0500 |
commit | 65d742d5f97a0c05a0a6871a61416bfbe4dafcd7 (patch) | |
tree | f719381c0e213ef9ec168f6961ba2c22a6587339 | |
parent | 5132aab1afb99b285b541deded2de41bc3997dab (diff) | |
download | mongo-65d742d5f97a0c05a0a6871a61416bfbe4dafcd7.tar.gz |
TOOLS-307 implement --oplogLimit in mongorestore
Former-commit-id: 7a74946c2b2b0f8e286ccef8a2858efb6f45529a
-rw-r--r-- | mongorestore/mongorestore.go | 13 | ||||
-rw-r--r-- | mongorestore/oplog.go | 76 | ||||
-rw-r--r-- | mongorestore/oplog_test.go | 113 |
3 files changed, 191 insertions, 11 deletions
diff --git a/mongorestore/mongorestore.go b/mongorestore/mongorestore.go index 515457f582c..f44332f378b 100644 --- a/mongorestore/mongorestore.go +++ b/mongorestore/mongorestore.go @@ -9,6 +9,7 @@ import ( "github.com/mongodb/mongo-tools/common/progress" "github.com/mongodb/mongo-tools/mongorestore/options" "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" ) type MongoRestore struct { @@ -28,6 +29,7 @@ type MongoRestore struct { safety *mgo.Safe progressManager *progress.Manager objCheck bool + oplogLimit bson.MongoTimestamp } func (restore *MongoRestore) ParseAndValidateOptions() error { @@ -53,6 +55,17 @@ func (restore *MongoRestore) ParseAndValidateOptions() error { return fmt.Errorf("cannot dump a collection without a specified database") } + if restore.InputOptions.OplogLimit != "" { + if !restore.InputOptions.OplogReplay { + return fmt.Errorf("cannot use --oplogLimit without --oplogReplay enabled") + } + var err error + restore.oplogLimit, err = ParseTimestampFlag(restore.InputOptions.OplogLimit) + if err != nil { + return fmt.Errorf("error parsing timestamp argument to --oplogLimit: %v", err) //TODO help text? + } + } + if restore.OutputOptions.WriteConcern > 0 { restore.safety = &mgo.Safe{W: restore.OutputOptions.WriteConcern} //TODO, audit extra steps log.Logf(log.DebugHigh, "\tdumping with w=%v", restore.safety.W) diff --git a/mongorestore/oplog.go b/mongorestore/oplog.go index 0247b0ad3a5..eac84c02900 100644 --- a/mongorestore/oplog.go +++ b/mongorestore/oplog.go @@ -9,20 +9,21 @@ import ( "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "os" + "strconv" + "strings" "time" ) const OplogMaxCommandSize = 1024 * 1024 * 16.5 -//TODO move this to common if any of the other tools need it type Oplog struct { - Timestamp bson.MongoTimestamp `bson:"ts"` - HistoryID int64 `bson:"h"` - Version int `bson:"v"` - Operation string `bson:"op"` - Namespace string `bson:"ns"` - Object bson.M `bson:"o"` - Query bson.M `bson:"o2"` + Timestamp bson.MongoTimestamp `bson:"ts,omitempty"` + HistoryID int64 `bson:"h,omitempty"` + Version int `bson:"v,omitempty"` + Operation string `bson:"op,omitempty"` + Namespace string `bson:"ns,omitempty"` + Object bson.M `bson:"o,omitempty"` + Query bson.M `bson:"o2,omitempty"` } func (restore *MongoRestore) RestoreOplog() error { @@ -53,6 +54,7 @@ func (restore *MongoRestore) RestoreOplog() error { var totalBytes, entrySize, bufferedBytes int bar := progress.ProgressBar{ + Name: "oplog", Max: int(size), CounterPtr: &totalBytes, WaitTime: 3 * time.Second, @@ -83,15 +85,24 @@ func (restore *MongoRestore) RestoreOplog() error { bufferedBytes = 0 } - entryAsD := bson.D{} - err = bson.Unmarshal(rawOplogEntry.Data, &entryAsD) + entryAsOplog := Oplog{} + err = bson.Unmarshal(rawOplogEntry.Data, &entryAsOplog) if err != nil { return fmt.Errorf("error reading oplog: %v", err) } + if !restore.TimestampBeforeLimit(entryAsOplog.Timestamp) { + log.Logf( + log.DebugLow, + "timestamp %v is not below limit of %v; ending oplog restoration", + entryAsOplog.Timestamp, + restore.oplogLimit, + ) + break + } bufferedBytes += entrySize totalBytes += entrySize - entryArray = append(entryArray, entryAsD) + entryArray = append(entryArray, entryAsOplog) } // finally, flush the remaining entries if len(entryArray) > 0 { @@ -119,3 +130,46 @@ func (restore *MongoRestore) ApplyOps(session *mgo.Session, entries []interface{ return nil } + +// OplogTimestampIsValid returns whether the given timestamp is allowed to be +// applied to mongorestore's target database. +func (restore *MongoRestore) TimestampBeforeLimit(ts bson.MongoTimestamp) bool { + if restore.oplogLimit == 0 { + // always valid if there is no --oplogLimit set + return true + } + return ts < restore.oplogLimit +} + +// ParseTimestampFlag takes in a string the form of <time_t>:<ordinal>, +// where <time_t> is the seconds since the UNIX epoch, and <ordinal> represents +// a counter of operations in the oplog that occurred in the specified second. +// It parses this timestamp string and returns a bson.MongoTimestamp type. +func ParseTimestampFlag(ts string) (bson.MongoTimestamp, error) { + var seconds, increment int + timestampFields := strings.Split(ts, ":") + if len(timestampFields) > 2 { + return 0, fmt.Errorf("too many : characters") + } + + seconds, err := strconv.Atoi(timestampFields[0]) + if err != nil { + return 0, fmt.Errorf("error parsing timestamp seconds: %v", err) + } + + // parse the increment field if it exists + if len(timestampFields) == 2 { + if len(timestampFields[1]) > 0 { + increment, err = strconv.Atoi(timestampFields[1]) + if err != nil { + return 0, fmt.Errorf("error parsing timestamp increment: %v", err) + } + } else { + // handle the case where the user writes "<time_t>:" with no ordinal + increment = 0 + } + } + + timestamp := (int64(seconds) << 32) | int64(increment) + return bson.MongoTimestamp(timestamp), nil +} diff --git a/mongorestore/oplog_test.go b/mongorestore/oplog_test.go new file mode 100644 index 00000000000..10a320ec96d --- /dev/null +++ b/mongorestore/oplog_test.go @@ -0,0 +1,113 @@ +package mongorestore + +import ( + "github.com/mongodb/mongo-tools/common/testutil" + . "github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" + "testing" +) + +func TestTimestampStringParsing(t *testing.T) { + + testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE) + + Convey("Testing some possible timestamp strings:", t, func() { + Convey("123:456 [should pass]", func() { + ts, err := ParseTimestampFlag("123:456") + So(err, ShouldBeNil) + So(ts, ShouldEqual, (int64(123)<<32 | int64(456))) + }) + + Convey("123 [should pass]", func() { + ts, err := ParseTimestampFlag("123") + So(err, ShouldBeNil) + So(ts, ShouldEqual, int64(123)<<32) + }) + + Convey("123: [should pass]", func() { + ts, err := ParseTimestampFlag("123:") + So(err, ShouldBeNil) + So(ts, ShouldEqual, int64(123)<<32) + }) + + Convey("123.123 [should fail]", func() { + ts, err := ParseTimestampFlag("123.123") + So(err, ShouldNotBeNil) + So(ts, ShouldEqual, 0) + }) + + Convey(": [should fail]", func() { + ts, err := ParseTimestampFlag(":") + So(err, ShouldNotBeNil) + So(ts, ShouldEqual, 0) + }) + + Convey("1:1:1 [should fail]", func() { + ts, err := ParseTimestampFlag("1:1:1") + So(err, ShouldNotBeNil) + So(ts, ShouldEqual, 0) + }) + + Convey("cats [should fail]", func() { + ts, err := ParseTimestampFlag("cats") + So(err, ShouldNotBeNil) + So(ts, ShouldEqual, 0) + }) + + Convey("[empty string] [should fail]", func() { + ts, err := ParseTimestampFlag("") + So(err, ShouldNotBeNil) + So(ts, ShouldEqual, 0) + }) + }) +} + +func TestValidOplogLimitChecking(t *testing.T) { + + Convey("With a MongoRestore instance with oplogLimit of 5:0", t, func() { + mr := &MongoRestore{ + oplogLimit: bson.MongoTimestamp(int64(5) << 32), + } + + Convey("an oplog entry with ts=1000:0 should be invalid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(1000)<<32)), ShouldBeFalse) + }) + + Convey("an oplog entry with ts=5:1 should be invalid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32|1)), ShouldBeFalse) + }) + + Convey("an oplog entry with ts=5:0 should be invalid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32)), ShouldBeFalse) + }) + + Convey("an oplog entry with ts=4:9 should be valid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(4)<<32|9)), ShouldBeTrue) + }) + + Convey("an oplog entry with ts=4:0 should be valid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(4)<<32)), ShouldBeTrue) + }) + + Convey("an oplog entry with ts=0:1 should be valid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(1)), ShouldBeTrue) + }) + }) + + Convey("With a MongoRestore instance with no oplogLimit", t, func() { + mr := &MongoRestore{} + + Convey("an oplog entry with ts=1000:0 should be valid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(1000)<<32)), ShouldBeTrue) + }) + + Convey("an oplog entry with ts=5:1 should be valid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32|1)), ShouldBeTrue) + }) + + Convey("an oplog entry with ts=5:0 should be valid", func() { + So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32)), ShouldBeTrue) + }) + }) + +} |