author     Kyle Erf <erf@mongodb.com>    2014-10-16 15:27:27 -0400
committer  Kyle Erf <erf@mongodb.com>    2014-10-21 14:33:07 -0400
commit     4a7aaad331daf81acf0b02de6c9b3a3b49320fd4 (patch)
tree       5359630febe9eafd3b1e6b454ffee50be2b2d141
parent     71450fca9cf21b7bdf233341e37623958f37fc87 (diff)
download   mongo-4a7aaad331daf81acf0b02de6c9b3a3b49320fd4.tar.gz
TOOLS-282 Use bulk inserts in mongorestore
Former-commit-id: fddf3298588ff264c7b3e7062651b7955246124c
-rw-r--r--  common/db/buffered_bulk.go        82
-rw-r--r--  common/db/buffered_bulk_test.go  104
-rw-r--r--  mongorestore/options/options.go    6
-rw-r--r--  mongorestore/restore.go           91
4 files changed, 270 insertions, 13 deletions
diff --git a/common/db/buffered_bulk.go b/common/db/buffered_bulk.go
new file mode 100644
index 00000000000..547adb05161
--- /dev/null
+++ b/common/db/buffered_bulk.go
@@ -0,0 +1,82 @@
+package db
+
+import (
+ "fmt"
+ "gopkg.in/mgo.v2"
+ "gopkg.in/mgo.v2/bson"
+)
+
+const (
+ MaxMessageSize = 2 * 16 * 1024 * 1024 // max size the drivers allow
+)
+
+// BufferedBulkInserter implements a bufio.Writer-like design for queuing up
+// documents and inserting them in bulk when the given doc limit (or max
+// message size) is reached. Must be flushed at the end to ensure that all
+// documents are written.
+type BufferedBulkInserter struct {
+ bulk *mgo.Bulk
+ collection *mgo.Collection
+ continueOnError bool
+ docLimit int
+
+ byteCount int
+ docCount int
+}
+
+// NewBufferedBulkInserter returns an initialized BufferedBulkInserter
+// for writing.
+func NewBufferedBulkInserter(collection *mgo.Collection, docLimit int,
+ continueOnError bool) *BufferedBulkInserter {
+
+ bb := &BufferedBulkInserter{
+ collection: collection,
+ continueOnError: continueOnError,
+ docLimit: docLimit,
+ }
+ bb.resetBulk()
+ return bb
+}
+
+// throw away the old bulk and init a new one
+func (bb *BufferedBulkInserter) resetBulk() {
+ bb.bulk = bb.collection.Bulk()
+ if bb.continueOnError {
+ bb.bulk.Unordered()
+ }
+ bb.byteCount = 0
+ bb.docCount = 0
+}
+
+// Insert buffers a document for bulk insertion. If the buffer is full, the bulk
+// insert is made, returning any errors that occur.
+func (bb *BufferedBulkInserter) Insert(doc interface{}) error {
+ rawBytes, err := bson.Marshal(doc)
+ if err != nil {
+ return fmt.Errorf("bson encoding error: %v", err)
+ }
+ // flush if we are full
+ if bb.docCount >= bb.docLimit || bb.byteCount+len(rawBytes) > MaxMessageSize {
+ if err := bb.Flush(); err != nil {
+ return fmt.Errorf("error writing bulk insert: %v", err)
+ }
+ }
+ // buffer the document
+ bb.docCount++
+ bb.byteCount += len(rawBytes)
+ bb.bulk.Insert(bson.Raw{Data: rawBytes})
+ return nil
+}
+
+// Flush sends all buffered documents in one bulk insert,
+// then resets the bulk buffer.
+func (bb *BufferedBulkInserter) Flush() error {
+ if bb.docCount == 0 {
+ return nil
+ }
+ if _, err := bb.bulk.Run(); err != nil {
+ return err
+ }
+ bb.resetBulk()
+ return nil
+}
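
As a usage note, here is a minimal sketch of how a caller outside mongorestore might drive the new BufferedBulkInserter; the connection string, database, collection name, and document counts below are illustrative only and not part of this change:

package main

import (
	"log"

	"github.com/mongodb/mongo-tools/common/db"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	// assumes a mongod is reachable on localhost; purely illustrative
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	coll := session.DB("test").C("example")
	// buffer up to 1000 documents per bulk insert; continueOnError=false
	// keeps the bulk ordered, so it stops at the first error
	bulk := db.NewBufferedBulkInserter(coll, 1000, false)
	for i := 0; i < 5000; i++ {
		if err := bulk.Insert(bson.M{"_id": i}); err != nil {
			log.Fatal(err)
		}
	}
	// flush the final partial batch
	if err := bulk.Flush(); err != nil {
		log.Fatal(err)
	}
}
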
diff --git a/common/db/buffered_bulk_test.go b/common/db/buffered_bulk_test.go
new file mode 100644
index 00000000000..788315fd9d7
--- /dev/null
+++ b/common/db/buffered_bulk_test.go
@@ -0,0 +1,104 @@
+package db
+
+import (
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+ "testing"
+)
+
+func TestBufferedBulkInserterInserts(t *testing.T) {
+ var bufBulk *BufferedBulkInserter
+
+ testutil.VerifyTestType(t, "db")
+
+ Convey("With a valid session", t, func() {
+ opts := options.ToolOptions{
+ Connection: &options.Connection{},
+ SSL: &options.SSL{},
+ Auth: &options.Auth{},
+ }
+ provider, err := InitSessionProvider(opts)
+ session, err := provider.GetSession()
+ So(session, ShouldNotBeNil)
+ So(err, ShouldBeNil)
+
+ Convey("using a test collection and a doc limit of 3", func() {
+ testCol := session.DB("tools-test").C("bulk1")
+ bufBulk = NewBufferedBulkInserter(testCol, 3, false)
+ So(bufBulk, ShouldNotBeNil)
+
+ Convey("inserting 10 documents into the BufferedBulkInserter", func() {
+ flushCount := 0
+ for i := 0; i < 10; i++ {
+ So(bufBulk.Insert(bson.D{}), ShouldBeNil)
+ if bufBulk.docCount%3 == 0 {
+ flushCount++
+ }
+ }
+
+ Convey("should have flushed 3 times with one doc still buffered", func() {
+ So(flushCount, ShouldEqual, 3)
+ So(bufBulk.byteCount, ShouldBeGreaterThan, 0)
+ So(bufBulk.docCount, ShouldEqual, 1)
+ })
+ })
+ })
+
+ Convey("using a test collection and a doc limit of 1", func() {
+ testCol := session.DB("tools-test").C("bulk2")
+ bufBulk = NewBufferedBulkInserter(testCol, 1, false)
+ So(bufBulk, ShouldNotBeNil)
+
+ Convey("inserting 10 documents into the BufferedBulkInserter and flushing", func() {
+ for i := 0; i < 10; i++ {
+ So(bufBulk.Insert(bson.D{}), ShouldBeNil)
+ }
+ So(bufBulk.Flush(), ShouldBeNil)
+
+ Convey("should have no docs buffered", func() {
+ So(bufBulk.docCount, ShouldEqual, 0)
+ So(bufBulk.byteCount, ShouldEqual, 0)
+ })
+ })
+ })
+
+ Convey("using a test collection and a doc limit of 1000", func() {
+ testCol := session.DB("tools-test").C("bulk3")
+ bufBulk = NewBufferedBulkInserter(testCol, 100, false)
+ So(bufBulk, ShouldNotBeNil)
+
+ Convey("inserting 1,000,000 documents into the BufferedBulkInserter and flushing", func() {
+ for i := 0; i < 1000000; i++ {
+ bufBulk.Insert(bson.M{"_id": i})
+ }
+ So(bufBulk.Flush(), ShouldBeNil)
+
+ Convey("should have inserted all of the documents", func() {
+ count, err := testCol.Count()
+ So(err, ShouldBeNil)
+ So(count, ShouldEqual, 1000000)
+
+ // test values
+ testDoc := bson.M{}
+ err = testCol.Find(bson.M{"_id": 477232}).One(&testDoc)
+ So(err, ShouldBeNil)
+ So(testDoc["_id"], ShouldEqual, 477232)
+ err = testCol.Find(bson.M{"_id": 999999}).One(&testDoc)
+ So(err, ShouldBeNil)
+ So(testDoc["_id"], ShouldEqual, 999999)
+ err = testCol.Find(bson.M{"_id": 1}).One(&testDoc)
+ So(err, ShouldBeNil)
+ So(testDoc["_id"], ShouldEqual, 1)
+
+ })
+ })
+ })
+
+ Reset(func() {
+ session.DB("tools-test").DropDatabase()
+ })
+ })
+
+}
diff --git a/mongorestore/options/options.go b/mongorestore/options/options.go
index e121a54b6c3..4ca9b277e50 100644
--- a/mongorestore/options/options.go
+++ b/mongorestore/options/options.go
@@ -19,7 +19,11 @@ type OutputOptions struct {
NoIndexRestore bool `long:"noIndexRestore" description:"Don't restore indexes"`
NoOptionsRestore bool `long:"noOptionsRestore" description:"Don't restore options"`
KeepIndexVersion bool `long:"keepIndexVersion" description:"Don't update index version"`
- JobThreads int `long:"jobThreads" short:"j" description:"TODO"`
+
+ JobThreads int `long:"numProcessingThreads" short:"j" description:"Number of collections to restore in parallel"`
+ BulkWriters int `long:"numIngestionThreads" description:"Number of insert connections per collection"`
+ BulkBufferSize int `long:"batchSize" description:"Number of documents to buffer before each bulk insert"`
+ PreserveDocOrder bool `long:"preserveOrder" description:"Preserve order of documents during restoration"`
}
func (self *OutputOptions) Name() string {
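
To show how the renamed and new flags fit together, an example invocation follows; the dump/ path and the numbers are illustrative. Per restore.go below, --preserveOrder forces a single ingestion connection, and --batchSize is effectively a per-bulk document count (default 2048):

    mongorestore -j 4 --numIngestionThreads 2 --batchSize 2048 dump/
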
diff --git a/mongorestore/restore.go b/mongorestore/restore.go
index 6c313ff6cad..ff4cb8d0ef2 100644
--- a/mongorestore/restore.go
+++ b/mongorestore/restore.go
@@ -204,9 +204,11 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
collection := session.DB(dbName).C(colName)
- //progress bar handler
+ // progress bar handlers
bytesRead := 0
+
bar := progress.ProgressBar{
+ Name: fmt.Sprintf("%v.%v", dbName, colName),
Max: int(fileSize),
CounterPtr: &bytesRead,
WaitTime: 3 * time.Second,
@@ -216,23 +218,88 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
bar.Start()
defer bar.Stop()
- //TODO use a goroutine here
- doc := &bson.Raw{}
- for bsonSource.Next(doc) {
- bytesRead += len(doc.Data)
- if restore.objCheck {
- //TODO encapsulate to reuse bson obj??
- err := bson.Unmarshal(doc.Data, &bson.D{})
- if err != nil {
- return err
- break
+ MaxInsertThreads := restore.OutputOptions.BulkWriters
+ if restore.OutputOptions.PreserveDocOrder {
+ MaxInsertThreads = 1
+ }
+ if MaxInsertThreads == 0 {
+ MaxInsertThreads = 4 //default
+ }
+ BulkBufferSize := restore.OutputOptions.BulkBufferSize
+ if BulkBufferSize == 0 {
+ BulkBufferSize = 2048
+ }
+ docChan := make(chan bson.Raw, BulkBufferSize*MaxInsertThreads)
+ resultChan := make(chan error, MaxInsertThreads)
+ killChan := make(chan struct{})
+
+ // start a goroutine for adding up the number of bytes read
+ bytesReadChan := make(chan int, MaxInsertThreads*16)
+ go func() {
+ for {
+ size, alive := <-bytesReadChan
+ if !alive {
+ return
}
+ bytesRead += size
+ }
+ }()
+ defer close(bytesReadChan)
+
+ go func() {
+ doc := bson.Raw{}
+ for bsonSource.Next(&doc) {
+ rawBytes := make([]byte, len(doc.Data))
+ copy(rawBytes, doc.Data)
+ docChan <- bson.Raw{Data: rawBytes}
}
- err := collection.Insert(doc)
+ close(docChan)
+ }()
+
+ for i := 0; i < MaxInsertThreads; i++ {
+ go func() {
+ bulk := db.NewBufferedBulkInserter(collection, BulkBufferSize, false)
+ for {
+ select {
+ case rawDoc, alive := <-docChan:
+ if !alive {
+ resultChan <- bulk.Flush()
+ return
+ }
+ if restore.objCheck {
+ //TODO encapsulate to reuse bson obj??
+ err := bson.Unmarshal(rawDoc.Data, &bson.D{})
+ if err != nil {
+ resultChan <- fmt.Errorf("invalid object: %v", err)
+ return
+ }
+ }
+ err := bulk.Insert(rawDoc)
+ if err != nil {
+ resultChan <- err
+ return
+ }
+ bytesReadChan <- len(rawDoc.Data)
+ case <-killChan:
+ return
+ }
+ }
+ }()
+
+ // sleep to stagger worker start times so they don't all insert at once
+ time.Sleep(time.Duration(i) * 10 * time.Millisecond) //FIXME magic numbers
+ }
+
+ // wait until all insert jobs finish
+ for done := 0; done < MaxInsertThreads; done++ {
+ err := <-resultChan
if err != nil {
+ close(killChan)
+ //TODO actually wait for things to end?
return err
}
}
+ // final error check
if err = bsonSource.Err(); err != nil {
return err
}
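
The restored collection is now written by a small fan-out pipeline: one goroutine feeds docChan, MaxInsertThreads workers drain it through BufferedBulkInserters, and the caller collects one result per worker. Below is a stripped-down sketch of that shape, with the MongoDB specifics removed and simplified payloads, purely for illustration:

package main

import "fmt"

func main() {
	const numWorkers = 4
	docChan := make(chan int, 64) // stands in for the buffered chan bson.Raw
	resultChan := make(chan error, numWorkers)

	// producer: push every document, then close the channel so the
	// workers know the input is exhausted
	go func() {
		for i := 0; i < 100; i++ {
			docChan <- i
		}
		close(docChan)
	}()

	// workers: drain the channel and report one result each once it closes
	for i := 0; i < numWorkers; i++ {
		go func() {
			for doc := range docChan {
				_ = doc // a real worker would call bulk.Insert(doc) here
			}
			resultChan <- nil // a real worker would return bulk.Flush()
		}()
	}

	// wait for every worker before declaring the work finished
	for done := 0; done < numWorkers; done++ {
		if err := <-resultChan; err != nil {
			fmt.Println("worker error:", err)
		}
	}
	fmt.Println("all workers finished")
}
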