diff options
Diffstat (limited to 'mongorestore/restore.go')
-rw-r--r-- | mongorestore/restore.go | 91 |
1 files changed, 79 insertions, 12 deletions
diff --git a/mongorestore/restore.go b/mongorestore/restore.go index 6c313ff6cad..ff4cb8d0ef2 100644 --- a/mongorestore/restore.go +++ b/mongorestore/restore.go @@ -204,9 +204,11 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string, collection := session.DB(dbName).C(colName) - //progress bar handler + // progress bar handlers bytesRead := 0 + bar := progress.ProgressBar{ + Name: fmt.Sprintf("%v.%v", dbName, colName), Max: int(fileSize), CounterPtr: &bytesRead, WaitTime: 3 * time.Second, @@ -216,23 +218,88 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string, bar.Start() defer bar.Stop() - //TODO use a goroutine here - doc := &bson.Raw{} - for bsonSource.Next(doc) { - bytesRead += len(doc.Data) - if restore.objCheck { - //TODO encapsulate to reuse bson obj?? - err := bson.Unmarshal(doc.Data, &bson.D{}) - if err != nil { - return err - break + MaxInsertThreads := restore.OutputOptions.BulkWriters + if restore.OutputOptions.PreserveDocOrder { + MaxInsertThreads = 1 + } + if MaxInsertThreads == 0 { + MaxInsertThreads = 4 //default + } + BulkBufferSize := restore.OutputOptions.BulkBufferSize + if BulkBufferSize == 0 { + BulkBufferSize = 2048 + } + docChan := make(chan bson.Raw, BulkBufferSize*MaxInsertThreads) + resultChan := make(chan error, MaxInsertThreads) + killChan := make(chan struct{}) + + // start a goroutine for adding up the number of bytes read + bytesReadChan := make(chan int, MaxInsertThreads*16) + go func() { + for { + size, alive := <-bytesReadChan + if !alive { + return } + bytesRead += size + } + }() + defer close(bytesReadChan) + + go func() { + doc := bson.Raw{} + for bsonSource.Next(&doc) { + rawBytes := make([]byte, len(doc.Data)) + copy(rawBytes, doc.Data) + docChan <- bson.Raw{Data: rawBytes} } - err := collection.Insert(doc) + close(docChan) + }() + + for i := 0; i < MaxInsertThreads; i++ { + go func() { + bulk := db.NewBufferedBulkInserter(collection, BulkBufferSize, false) + for { + select { + case rawDoc, alive := <-docChan: + if !alive { + resultChan <- bulk.Flush() + return + } + if restore.objCheck { + //TODO encapsulate to reuse bson obj?? + err := bson.Unmarshal(rawDoc.Data, &bson.D{}) + if err != nil { + resultChan <- fmt.Errorf("invalid object: %v", err) + return + } + } + err := bulk.Insert(rawDoc) + if err != nil { + resultChan <- err + return + } + bytesReadChan <- len(rawDoc.Data) + case <-killChan: + return + } + } + }() + + // sleep to prevent all threads from inserting at the same time at start + time.Sleep(time.Duration(i) * 10 * time.Millisecond) //FIXME magic numbers + } + + // wait until all insert jobs finish + for done := 0; done < MaxInsertThreads; done++ { + err := <-resultChan if err != nil { + close(killChan) + //TODO actually wait for things to end? return err } } + // final error check if err = bsonSource.Err(); err != nil { return err } |