author | Kyle Erf <erf@mongodb.com> | 2014-10-16 15:27:27 -0400
---|---|---
committer | Kyle Erf <erf@mongodb.com> | 2014-10-21 14:33:07 -0400
commit | 4a7aaad331daf81acf0b02de6c9b3a3b49320fd4 (patch) |
tree | 5359630febe9eafd3b1e6b454ffee50be2b2d141 |
parent | 71450fca9cf21b7bdf233341e37623958f37fc87 (diff) |
download | mongo-4a7aaad331daf81acf0b02de6c9b3a3b49320fd4.tar.gz |
TOOLS-282 Use bulk inserts in mongorestore
Former-commit-id: fddf3298588ff264c7b3e7062651b7955246124c
-rw-r--r-- | common/db/buffered_bulk.go | 82
-rw-r--r-- | common/db/buffered_bulk_test.go | 104
-rw-r--r-- | mongorestore/options/options.go | 6
-rw-r--r-- | mongorestore/restore.go | 91
4 files changed, 270 insertions, 13 deletions
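In outline, TOOLS-282 replaces mongorestore's one-insert-per-round-trip loop with mgo's bulk API. A minimal, self-contained sketch of that core idea, separate from the commit's actual code (the dial address, database, and collection names are placeholders):

```go
package main

import (
	"log"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	session, err := mgo.Dial("localhost") // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
	col := session.DB("test").C("bulk_demo") // placeholder namespace

	docs := make([]interface{}, 1000)
	for i := range docs {
		docs[i] = bson.M{"_id": i}
	}

	// Before this commit: one network round trip per document.
	//   for _, doc := range docs { col.Insert(doc) }

	// After: queue everything into one bulk operation and run it once.
	bulk := col.Bulk()
	bulk.Unordered() // keep going past individual failures, like continueOnError
	bulk.Insert(docs...)
	if _, err := bulk.Run(); err != nil {
		log.Fatal(err)
	}
}
```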
diff --git a/common/db/buffered_bulk.go b/common/db/buffered_bulk.go
new file mode 100644
index 00000000000..547adb05161
--- /dev/null
+++ b/common/db/buffered_bulk.go
@@ -0,0 +1,82 @@
+package db
+
+import (
+	"fmt"
+	"gopkg.in/mgo.v2"
+	"gopkg.in/mgo.v2/bson"
+)
+
+const (
+	MaxMessageSize = 2 * 16 * 1024 * 1024 // max size the drivers allow
+)
+
+// BufferedBulkInserter implements a bufio.Writer-like design for queuing up
+// documents and inserting them in bulk when the given doc limit (or max
+// message size) is reached. Must be flushed at the end to ensure that all
+// documents are written.
+type BufferedBulkInserter struct {
+	bulk            *mgo.Bulk
+	collection      *mgo.Collection
+	continueOnError bool
+	docLimit        int
+
+	byteCount int
+	docCount  int
+}
+
+// NewBufferedBulkInserter returns an initialized BufferedBulkInserter
+// for writing.
+func NewBufferedBulkInserter(collection *mgo.Collection, docLimit int,
+	continueOnError bool) *BufferedBulkInserter {
+
+	bb := &BufferedBulkInserter{
+		collection:      collection,
+		continueOnError: continueOnError,
+		docLimit:        docLimit,
+	}
+	bb.resetBulk()
+	return bb
+}
+
+// throw away the old bulk and init a new one
+func (bb *BufferedBulkInserter) resetBulk() {
+	bb.bulk = bb.collection.Bulk()
+	if bb.continueOnError {
+		bb.bulk.Unordered()
+	}
+	bb.byteCount = 0
+	bb.docCount = 0
+}
+
+// Insert buffers a document for bulk insertion. If the buffer is full, the bulk
+// insert is made, returning any errors that occur.
+func (bb *BufferedBulkInserter) Insert(doc interface{}) error {
+	rawBytes, err := bson.Marshal(doc)
+	if err != nil {
+		return fmt.Errorf("bson encoding error: %v", err)
+	}
+	// flush if we are full
+	if bb.docCount >= bb.docLimit || bb.byteCount+len(rawBytes) > MaxMessageSize {
+		if err := bb.Flush(); err != nil {
+			return fmt.Errorf("error writing bulk insert: %v", err)
+		}
+	}
+	// buffer the document
+	bb.docCount++
+	bb.byteCount += len(rawBytes)
+	bb.bulk.Insert(bson.Raw{Data: rawBytes})
+	return nil
+}
+
+// Flush sends all buffered documents in one bulk insert
+// then resets the bulk buffer
+func (bb *BufferedBulkInserter) Flush() error {
+	if bb.docCount == 0 {
+		return nil
+	}
+	if _, err := bb.bulk.Run(); err != nil {
+		return err
+	}
+	bb.resetBulk()
+	return nil
+}
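The type above behaves like a bufio.Writer for inserts: Insert buffers until the doc limit or the message-size cap forces a flush, and a final Flush is required to drain the remainder. A hedged usage sketch (insertAll is a hypothetical helper, not part of the commit; the import path follows the repo layout above):

```go
package example

import (
	"github.com/mongodb/mongo-tools/common/db"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// insertAll shows the intended call pattern: buffer every document,
// then flush once at the end so nothing is left sitting in the buffer.
func insertAll(col *mgo.Collection, docs []bson.M) error {
	// The doc limit of 2048 mirrors the default batch size restore.go picks below.
	bulk := db.NewBufferedBulkInserter(col, 2048, false)
	for _, doc := range docs {
		if err := bulk.Insert(doc); err != nil {
			return err
		}
	}
	// Mandatory: documents still buffered are not yet written.
	return bulk.Flush()
}
```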
"github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" + "testing" +) + +func TestBufferedBulkInserterInserts(t *testing.T) { + var bufBulk *BufferedBulkInserter + + testutil.VerifyTestType(t, "db") + + Convey("With a valid session", t, func() { + opts := options.ToolOptions{ + Connection: &options.Connection{}, + SSL: &options.SSL{}, + Auth: &options.Auth{}, + } + provider, err := InitSessionProvider(opts) + session, err := provider.GetSession() + So(session, ShouldNotBeNil) + So(err, ShouldBeNil) + + Convey("using a test collection and a doc limit of 3", func() { + testCol := session.DB("tools-test").C("bulk1") + bufBulk = NewBufferedBulkInserter(testCol, 3, false) + So(bufBulk, ShouldNotBeNil) + + Convey("inserting 10 documents into the BufferedBulkInserter", func() { + flushCount := 0 + for i := 0; i < 10; i++ { + So(bufBulk.Insert(bson.D{}), ShouldBeNil) + if bufBulk.docCount%3 == 0 { + flushCount++ + } + } + + Convey("should have flushed 3 times with one doc still buffered", func() { + So(flushCount, ShouldEqual, 3) + So(bufBulk.byteCount, ShouldBeGreaterThan, 0) + So(bufBulk.docCount, ShouldEqual, 1) + }) + }) + }) + + Convey("using a test collection and a doc limit of 1", func() { + testCol := session.DB("tools-test").C("bulk2") + bufBulk = NewBufferedBulkInserter(testCol, 1, false) + So(bufBulk, ShouldNotBeNil) + + Convey("inserting 10 documents into the BufferedBulkInserter and flushing", func() { + for i := 0; i < 10; i++ { + So(bufBulk.Insert(bson.D{}), ShouldBeNil) + } + So(bufBulk.Flush(), ShouldBeNil) + + Convey("should have no docs buffered", func() { + So(bufBulk.docCount, ShouldEqual, 0) + So(bufBulk.byteCount, ShouldEqual, 0) + }) + }) + }) + + Convey("using a test collection and a doc limit of 1000", func() { + testCol := session.DB("tools-test").C("bulk3") + bufBulk = NewBufferedBulkInserter(testCol, 100, false) + So(bufBulk, ShouldNotBeNil) + + Convey("inserting 1,000,000 documents into the BufferedBulkInserter and flushing", func() { + for i := 0; i < 1000000; i++ { + bufBulk.Insert(bson.M{"_id": i}) + } + So(bufBulk.Flush(), ShouldBeNil) + + Convey("should have inserted all of the documents", func() { + count, err := testCol.Count() + So(err, ShouldBeNil) + So(count, ShouldEqual, 1000000) + + // test values + testDoc := bson.M{} + err = testCol.Find(bson.M{"_id": 477232}).One(&testDoc) + So(err, ShouldBeNil) + So(testDoc["_id"], ShouldEqual, 477232) + err = testCol.Find(bson.M{"_id": 999999}).One(&testDoc) + So(err, ShouldBeNil) + So(testDoc["_id"], ShouldEqual, 999999) + err = testCol.Find(bson.M{"_id": 1}).One(&testDoc) + So(err, ShouldBeNil) + So(testDoc["_id"], ShouldEqual, 1) + + }) + }) + }) + + Reset(func() { + session.DB("tools-test").DropDatabase() + }) + }) + +} diff --git a/mongorestore/options/options.go b/mongorestore/options/options.go index e121a54b6c3..4ca9b277e50 100644 --- a/mongorestore/options/options.go +++ b/mongorestore/options/options.go @@ -19,7 +19,11 @@ type OutputOptions struct { NoIndexRestore bool `long:"noIndexRestore" description:"Don't restore indexes"` NoOptionsRestore bool `long:"noOptionsRestore" description:"Don't restore options"` KeepIndexVersion bool `long:"keepIndexVersion" description:"Don't update index version"` - JobThreads int `long:"jobThreads" short:"j" description:"TODO"` + + JobThreads int `long:"numProcessingThreads" short:"j" description:"Number of collections to restore in parallel"` + BulkWriters int `long:"numIngestionThreads" description:"Number of insert connections per collection"` + 
diff --git a/mongorestore/options/options.go b/mongorestore/options/options.go
index e121a54b6c3..4ca9b277e50 100644
--- a/mongorestore/options/options.go
+++ b/mongorestore/options/options.go
@@ -19,7 +19,11 @@ type OutputOptions struct {
 	NoIndexRestore   bool `long:"noIndexRestore" description:"Don't restore indexes"`
 	NoOptionsRestore bool `long:"noOptionsRestore" description:"Don't restore options"`
 	KeepIndexVersion bool `long:"keepIndexVersion" description:"Don't update index version"`
-	JobThreads       int  `long:"jobThreads" short:"j" description:"TODO"`
+
+	JobThreads       int  `long:"numProcessingThreads" short:"j" description:"Number of collections to restore in parallel"`
+	BulkWriters      int  `long:"numIngestionThreads" description:"Number of insert connections per collection"`
+	BulkBufferSize   int  `long:"batchSize" description:"Buffer size, in bytes, of each bulk buffer"`
+	PreserveDocOrder bool `long:"preserveOrder" description:"Preserve order of documents during restoration"`
 }
 
 func (self *OutputOptions) Name() string {
diff --git a/mongorestore/restore.go b/mongorestore/restore.go
index 6c313ff6cad..ff4cb8d0ef2 100644
--- a/mongorestore/restore.go
+++ b/mongorestore/restore.go
@@ -204,9 +204,11 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
 
 	collection := session.DB(dbName).C(colName)
 
-	//progress bar handler
+	// progress bar handlers
 	bytesRead := 0
+
 	bar := progress.ProgressBar{
+		Name:       fmt.Sprintf("%v.%v", dbName, colName),
 		Max:        int(fileSize),
 		CounterPtr: &bytesRead,
 		WaitTime:   3 * time.Second,
@@ -216,23 +218,88 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
 	bar.Start()
 	defer bar.Stop()
 
-	//TODO use a goroutine here
-	doc := &bson.Raw{}
-	for bsonSource.Next(doc) {
-		bytesRead += len(doc.Data)
-		if restore.objCheck {
-			//TODO encapsulate to reuse bson obj??
-			err := bson.Unmarshal(doc.Data, &bson.D{})
-			if err != nil {
-				return err
-				break
+	MaxInsertThreads := restore.OutputOptions.BulkWriters
+	if restore.OutputOptions.PreserveDocOrder {
+		MaxInsertThreads = 1
+	}
+	if MaxInsertThreads == 0 {
+		MaxInsertThreads = 4 //default
+	}
+	BulkBufferSize := restore.OutputOptions.BulkBufferSize
+	if BulkBufferSize == 0 {
+		BulkBufferSize = 2048
+	}
+	docChan := make(chan bson.Raw, BulkBufferSize*MaxInsertThreads)
+	resultChan := make(chan error, MaxInsertThreads)
+	killChan := make(chan struct{})
+
+	// start a goroutine for adding up the number of bytes read
+	bytesReadChan := make(chan int, MaxInsertThreads*16)
+	go func() {
+		for {
+			size, alive := <-bytesReadChan
+			if !alive {
+				return
 			}
+			bytesRead += size
+		}
+	}()
+	defer close(bytesReadChan)
+
+	go func() {
+		doc := bson.Raw{}
+		for bsonSource.Next(&doc) {
+			rawBytes := make([]byte, len(doc.Data))
+			copy(rawBytes, doc.Data)
+			docChan <- bson.Raw{Data: rawBytes}
 		}
-		err := collection.Insert(doc)
+		close(docChan)
+	}()
+
+	for i := 0; i < MaxInsertThreads; i++ {
+		go func() {
+			bulk := db.NewBufferedBulkInserter(collection, BulkBufferSize, false)
+			for {
+				select {
+				case rawDoc, alive := <-docChan:
+					if !alive {
+						resultChan <- bulk.Flush()
+						return
+					}
+					if restore.objCheck {
+						//TODO encapsulate to reuse bson obj??
+						err := bson.Unmarshal(rawDoc.Data, &bson.D{})
+						if err != nil {
+							resultChan <- fmt.Errorf("invalid object: %v", err)
+							return
+						}
+					}
+					err := bulk.Insert(rawDoc)
+					if err != nil {
+						resultChan <- err
+						return
+					}
+					bytesReadChan <- len(rawDoc.Data)
+				case <-killChan:
+					return
+				}
+			}
+		}()
+
+		// sleep to prevent all threads from inserting at the same time at start
+		time.Sleep(time.Duration(i) * 10 * time.Millisecond) //FIXME magic numbers
+	}
+
+	// wait until all insert jobs finish
+	for done := 0; done < MaxInsertThreads; done++ {
+		err := <-resultChan
 		if err != nil {
+			close(killChan)
+			//TODO actually wait for things to end?
 			return err
 		}
 	}
+
+	// final error check
 	if err = bsonSource.Err(); err != nil {
 		return err
 	}
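The rewritten restore loop is a standard Go fan-out: one producer goroutine feeds docChan, each of N workers drains it through its own BufferedBulkInserter, resultChan collects one result per worker, and killChan broadcasts early shutdown when any worker fails. A stripped-down sketch of just that shape, with ints standing in for BSON documents (fanOut and its names are illustrative, not from the commit):

```go
package example

import "fmt"

// fanOut mirrors the concurrency shape of the new restore loop: one
// producer, `workers` consumers, a result channel, and a kill channel.
func fanOut(items []int, workers int) error {
	docChan := make(chan int, workers)
	resultChan := make(chan error, workers)
	killChan := make(chan struct{})

	// Producer: feed every item, then close the channel to signal EOF.
	go func() {
		for _, it := range items {
			docChan <- it
		}
		close(docChan)
	}()

	// Consumers: drain until docChan closes or a kill is broadcast.
	for i := 0; i < workers; i++ {
		go func() {
			for {
				select {
				case it, alive := <-docChan:
					if !alive {
						resultChan <- nil // stands in for bulk.Flush()
						return
					}
					_ = it // stands in for bulk.Insert(it)
				case <-killChan:
					return
				}
			}
		}()
	}

	// Wait for every worker; on the first error, stop the rest.
	for done := 0; done < workers; done++ {
		if err := <-resultChan; err != nil {
			close(killChan)
			return fmt.Errorf("insert worker failed: %v", err)
		}
	}
	return nil
}
```

One design note visible in the diff: when a worker errors, close(killChan) tells the remaining workers to exit, but, as the in-line TODO admits, the function returns without waiting for them, so documents still buffered in other workers are not flushed on the error path.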