Diffstat (limited to 'mongorestore/restore.go')
-rw-r--r--  mongorestore/restore.go  91
1 file changed, 79 insertions(+), 12 deletions(-)
diff --git a/mongorestore/restore.go b/mongorestore/restore.go
index 6c313ff6cad..ff4cb8d0ef2 100644
--- a/mongorestore/restore.go
+++ b/mongorestore/restore.go
@@ -204,9 +204,11 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
collection := session.DB(dbName).C(colName)
- //progress bar handler
+ // progress bar handlers
bytesRead := 0
+
bar := progress.ProgressBar{
+ Name: fmt.Sprintf("%v.%v", dbName, colName),
Max: int(fileSize),
CounterPtr: &bytesRead,
WaitTime: 3 * time.Second,
@@ -216,23 +218,88 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
bar.Start()
defer bar.Stop()
- //TODO use a goroutine here
- doc := &bson.Raw{}
- for bsonSource.Next(doc) {
- bytesRead += len(doc.Data)
- if restore.objCheck {
- //TODO encapsulate to reuse bson obj??
- err := bson.Unmarshal(doc.Data, &bson.D{})
- if err != nil {
- return err
- break
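+ // set up a pool of insert workers; a single worker is forced when
+ // document order must be preserved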
+ MaxInsertThreads := restore.OutputOptions.BulkWriters
+ if restore.OutputOptions.PreserveDocOrder {
+ MaxInsertThreads = 1
+ }
+ if MaxInsertThreads == 0 {
+ MaxInsertThreads = 4 // default number of insert workers
+ }
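+ // each worker batches up to BulkBufferSize documents per bulk insert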
+ BulkBufferSize := restore.OutputOptions.BulkBufferSize
+ if BulkBufferSize == 0 {
+ BulkBufferSize = 2048
+ }
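+
+ // docChan fans documents out to the insert workers, resultChan
+ // collects one final error (possibly nil) per worker, and killChan
+ // aborts the remaining workers when one of them fails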
+ docChan := make(chan bson.Raw, BulkBufferSize*MaxInsertThreads)
+ resultChan := make(chan error, MaxInsertThreads)
+ killChan := make(chan struct{})
+
+ // start a goroutine that tallies bytes read into bytesRead for the progress bar
+ bytesReadChan := make(chan int, MaxInsertThreads*16)
+ go func() {
+ for {
+ size, alive := <-bytesReadChan
+ if !alive {
+ return
}
+ bytesRead += size
+ }
+ }()
+ defer close(bytesReadChan)
+
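+ // a single reader goroutine decodes documents off the BSON source
+ // and feeds the insert workers through docChan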
+ go func() {
+ doc := bson.Raw{}
+ for bsonSource.Next(&doc) {
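+ // copy the raw bytes before sending, since the source may reuse
+ // doc.Data's backing array on the next call to Next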
+ rawBytes := make([]byte, len(doc.Data))
+ copy(rawBytes, doc.Data)
+ docChan <- bson.Raw{Data: rawBytes}
}
- err := collection.Insert(doc)
+ close(docChan)
+ }()
+
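+ // start the insert workers; each drains docChan into its own
+ // buffered bulk inserter, flushing whatever remains once the
+ // reader closes docChan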
+ for i := 0; i < MaxInsertThreads; i++ {
+ go func() {
+ bulk := db.NewBufferedBulkInserter(collection, BulkBufferSize, false)
+ for {
+ select {
+ case rawDoc, alive := <-docChan:
+ if !alive {
+ resultChan <- bulk.Flush()
+ return
+ }
+ if restore.objCheck {
+ // TODO: encapsulate validation so the scratch bson.D can be reused
+ err := bson.Unmarshal(rawDoc.Data, &bson.D{})
+ if err != nil {
+ resultChan <- fmt.Errorf("invalid object: %v", err)
+ return
+ }
+ }
+ err := bulk.Insert(rawDoc)
+ if err != nil {
+ resultChan <- err
+ return
+ }
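+ // credit the document's bytes to the progress bar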
+ bytesReadChan <- len(rawDoc.Data)
+ case <-killChan:
+ return
+ }
+ }
+ }()
+
+ // stagger worker start-up so the buffered inserters don't all
+ // flush at the same moment
+ time.Sleep(time.Duration(i) * 10 * time.Millisecond) // FIXME: magic numbers
+ }
+
+ // wait until all insert jobs finish
+ for done := 0; done < MaxInsertThreads; done++ {
+ err := <-resultChan
if err != nil {
+ close(killChan)
+ // TODO: wait for the remaining workers to exit before returning
return err
}
}
+ // surface any read error from the BSON source itself
if err = bsonSource.Err(); err != nil {
return err
}