summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyle Erf <erf@mongodb.com>2014-11-03 14:31:19 -0500
committerKyle Erf <erf@mongodb.com>2014-11-04 10:54:20 -0500
commit65d742d5f97a0c05a0a6871a61416bfbe4dafcd7 (patch)
treef719381c0e213ef9ec168f6961ba2c22a6587339
parent5132aab1afb99b285b541deded2de41bc3997dab (diff)
downloadmongo-65d742d5f97a0c05a0a6871a61416bfbe4dafcd7.tar.gz
TOOLS-307 implement --oplogLimit in mongorestore
Former-commit-id: 7a74946c2b2b0f8e286ccef8a2858efb6f45529a
-rw-r--r--mongorestore/mongorestore.go13
-rw-r--r--mongorestore/oplog.go76
-rw-r--r--mongorestore/oplog_test.go113
3 files changed, 191 insertions, 11 deletions
diff --git a/mongorestore/mongorestore.go b/mongorestore/mongorestore.go
index 515457f582c..f44332f378b 100644
--- a/mongorestore/mongorestore.go
+++ b/mongorestore/mongorestore.go
@@ -9,6 +9,7 @@ import (
"github.com/mongodb/mongo-tools/common/progress"
"github.com/mongodb/mongo-tools/mongorestore/options"
"gopkg.in/mgo.v2"
+ "gopkg.in/mgo.v2/bson"
)
type MongoRestore struct {
@@ -28,6 +29,7 @@ type MongoRestore struct {
safety *mgo.Safe
progressManager *progress.Manager
objCheck bool
+ oplogLimit bson.MongoTimestamp
}
func (restore *MongoRestore) ParseAndValidateOptions() error {
@@ -53,6 +55,17 @@ func (restore *MongoRestore) ParseAndValidateOptions() error {
return fmt.Errorf("cannot dump a collection without a specified database")
}
+ if restore.InputOptions.OplogLimit != "" {
+ if !restore.InputOptions.OplogReplay {
+ return fmt.Errorf("cannot use --oplogLimit without --oplogReplay enabled")
+ }
+ var err error
+ restore.oplogLimit, err = ParseTimestampFlag(restore.InputOptions.OplogLimit)
+ if err != nil {
+ return fmt.Errorf("error parsing timestamp argument to --oplogLimit: %v", err) //TODO help text?
+ }
+ }
+
if restore.OutputOptions.WriteConcern > 0 {
restore.safety = &mgo.Safe{W: restore.OutputOptions.WriteConcern} //TODO, audit extra steps
log.Logf(log.DebugHigh, "\tdumping with w=%v", restore.safety.W)
diff --git a/mongorestore/oplog.go b/mongorestore/oplog.go
index 0247b0ad3a5..eac84c02900 100644
--- a/mongorestore/oplog.go
+++ b/mongorestore/oplog.go
@@ -9,20 +9,21 @@ import (
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"os"
+ "strconv"
+ "strings"
"time"
)
const OplogMaxCommandSize = 1024 * 1024 * 16.5
-//TODO move this to common if any of the other tools need it
type Oplog struct {
- Timestamp bson.MongoTimestamp `bson:"ts"`
- HistoryID int64 `bson:"h"`
- Version int `bson:"v"`
- Operation string `bson:"op"`
- Namespace string `bson:"ns"`
- Object bson.M `bson:"o"`
- Query bson.M `bson:"o2"`
+ Timestamp bson.MongoTimestamp `bson:"ts,omitempty"`
+ HistoryID int64 `bson:"h,omitempty"`
+ Version int `bson:"v,omitempty"`
+ Operation string `bson:"op,omitempty"`
+ Namespace string `bson:"ns,omitempty"`
+ Object bson.M `bson:"o,omitempty"`
+ Query bson.M `bson:"o2,omitempty"`
}
func (restore *MongoRestore) RestoreOplog() error {
@@ -53,6 +54,7 @@ func (restore *MongoRestore) RestoreOplog() error {
var totalBytes, entrySize, bufferedBytes int
bar := progress.ProgressBar{
+ Name: "oplog",
Max: int(size),
CounterPtr: &totalBytes,
WaitTime: 3 * time.Second,
@@ -83,15 +85,24 @@ func (restore *MongoRestore) RestoreOplog() error {
bufferedBytes = 0
}
- entryAsD := bson.D{}
- err = bson.Unmarshal(rawOplogEntry.Data, &entryAsD)
+ entryAsOplog := Oplog{}
+ err = bson.Unmarshal(rawOplogEntry.Data, &entryAsOplog)
if err != nil {
return fmt.Errorf("error reading oplog: %v", err)
}
+ if !restore.TimestampBeforeLimit(entryAsOplog.Timestamp) {
+ log.Logf(
+ log.DebugLow,
+ "timestamp %v is not below limit of %v; ending oplog restoration",
+ entryAsOplog.Timestamp,
+ restore.oplogLimit,
+ )
+ break
+ }
bufferedBytes += entrySize
totalBytes += entrySize
- entryArray = append(entryArray, entryAsD)
+ entryArray = append(entryArray, entryAsOplog)
}
// finally, flush the remaining entries
if len(entryArray) > 0 {
@@ -119,3 +130,46 @@ func (restore *MongoRestore) ApplyOps(session *mgo.Session, entries []interface{
return nil
}
+
+// OplogTimestampIsValid returns whether the given timestamp is allowed to be
+// applied to mongorestore's target database.
+func (restore *MongoRestore) TimestampBeforeLimit(ts bson.MongoTimestamp) bool {
+ if restore.oplogLimit == 0 {
+ // always valid if there is no --oplogLimit set
+ return true
+ }
+ return ts < restore.oplogLimit
+}
+
+// ParseTimestampFlag takes in a string the form of <time_t>:<ordinal>,
+// where <time_t> is the seconds since the UNIX epoch, and <ordinal> represents
+// a counter of operations in the oplog that occurred in the specified second.
+// It parses this timestamp string and returns a bson.MongoTimestamp type.
+func ParseTimestampFlag(ts string) (bson.MongoTimestamp, error) {
+ var seconds, increment int
+ timestampFields := strings.Split(ts, ":")
+ if len(timestampFields) > 2 {
+ return 0, fmt.Errorf("too many : characters")
+ }
+
+ seconds, err := strconv.Atoi(timestampFields[0])
+ if err != nil {
+ return 0, fmt.Errorf("error parsing timestamp seconds: %v", err)
+ }
+
+ // parse the increment field if it exists
+ if len(timestampFields) == 2 {
+ if len(timestampFields[1]) > 0 {
+ increment, err = strconv.Atoi(timestampFields[1])
+ if err != nil {
+ return 0, fmt.Errorf("error parsing timestamp increment: %v", err)
+ }
+ } else {
+ // handle the case where the user writes "<time_t>:" with no ordinal
+ increment = 0
+ }
+ }
+
+ timestamp := (int64(seconds) << 32) | int64(increment)
+ return bson.MongoTimestamp(timestamp), nil
+}
diff --git a/mongorestore/oplog_test.go b/mongorestore/oplog_test.go
new file mode 100644
index 00000000000..10a320ec96d
--- /dev/null
+++ b/mongorestore/oplog_test.go
@@ -0,0 +1,113 @@
+package mongorestore
+
+import (
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+ "testing"
+)
+
+func TestTimestampStringParsing(t *testing.T) {
+
+ testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE)
+
+ Convey("Testing some possible timestamp strings:", t, func() {
+ Convey("123:456 [should pass]", func() {
+ ts, err := ParseTimestampFlag("123:456")
+ So(err, ShouldBeNil)
+ So(ts, ShouldEqual, (int64(123)<<32 | int64(456)))
+ })
+
+ Convey("123 [should pass]", func() {
+ ts, err := ParseTimestampFlag("123")
+ So(err, ShouldBeNil)
+ So(ts, ShouldEqual, int64(123)<<32)
+ })
+
+ Convey("123: [should pass]", func() {
+ ts, err := ParseTimestampFlag("123:")
+ So(err, ShouldBeNil)
+ So(ts, ShouldEqual, int64(123)<<32)
+ })
+
+ Convey("123.123 [should fail]", func() {
+ ts, err := ParseTimestampFlag("123.123")
+ So(err, ShouldNotBeNil)
+ So(ts, ShouldEqual, 0)
+ })
+
+ Convey(": [should fail]", func() {
+ ts, err := ParseTimestampFlag(":")
+ So(err, ShouldNotBeNil)
+ So(ts, ShouldEqual, 0)
+ })
+
+ Convey("1:1:1 [should fail]", func() {
+ ts, err := ParseTimestampFlag("1:1:1")
+ So(err, ShouldNotBeNil)
+ So(ts, ShouldEqual, 0)
+ })
+
+ Convey("cats [should fail]", func() {
+ ts, err := ParseTimestampFlag("cats")
+ So(err, ShouldNotBeNil)
+ So(ts, ShouldEqual, 0)
+ })
+
+ Convey("[empty string] [should fail]", func() {
+ ts, err := ParseTimestampFlag("")
+ So(err, ShouldNotBeNil)
+ So(ts, ShouldEqual, 0)
+ })
+ })
+}
+
+func TestValidOplogLimitChecking(t *testing.T) {
+
+ Convey("With a MongoRestore instance with oplogLimit of 5:0", t, func() {
+ mr := &MongoRestore{
+ oplogLimit: bson.MongoTimestamp(int64(5) << 32),
+ }
+
+ Convey("an oplog entry with ts=1000:0 should be invalid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(1000)<<32)), ShouldBeFalse)
+ })
+
+ Convey("an oplog entry with ts=5:1 should be invalid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32|1)), ShouldBeFalse)
+ })
+
+ Convey("an oplog entry with ts=5:0 should be invalid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32)), ShouldBeFalse)
+ })
+
+ Convey("an oplog entry with ts=4:9 should be valid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(4)<<32|9)), ShouldBeTrue)
+ })
+
+ Convey("an oplog entry with ts=4:0 should be valid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(4)<<32)), ShouldBeTrue)
+ })
+
+ Convey("an oplog entry with ts=0:1 should be valid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(1)), ShouldBeTrue)
+ })
+ })
+
+ Convey("With a MongoRestore instance with no oplogLimit", t, func() {
+ mr := &MongoRestore{}
+
+ Convey("an oplog entry with ts=1000:0 should be valid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(1000)<<32)), ShouldBeTrue)
+ })
+
+ Convey("an oplog entry with ts=5:1 should be valid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32|1)), ShouldBeTrue)
+ })
+
+ Convey("an oplog entry with ts=5:0 should be valid", func() {
+ So(mr.TimestampBeforeLimit(bson.MongoTimestamp(int64(5)<<32)), ShouldBeTrue)
+ })
+ })
+
+}