diff options
author | Kyle Erf <erf@mongodb.com> | 2014-10-15 11:28:13 -0400 |
---|---|---|
committer | Kyle Erf <erf@mongodb.com> | 2014-10-17 13:39:49 -0400 |
commit | 330cd0eaefdbc2e0af6c9bbd61aff933a60fb96f (patch) | |
tree | 2c26148c0ce27a38fd2720587bd0d36952e4ef86 | |
parent | 0cf93b8b8f4741983e6f058a70e7b561a697e0f4 (diff) | |
download | mongo-330cd0eaefdbc2e0af6c9bbd61aff933a60fb96f.tar.gz |
TOOLS-283 restore multiple collections in parallel
* prioritizes based on available databases and collection size
Former-commit-id: 1121529476f36cd32ed58a297401eb8cece1e51f
-rw-r--r-- | mongorestore/filepath.go | 3 | ||||
-rw-r--r-- | mongorestore/filepath_test.go | 4 | ||||
-rw-r--r-- | mongorestore/intent.go | 75 | ||||
-rw-r--r-- | mongorestore/intent_prioritizer.go | 193 | ||||
-rw-r--r-- | mongorestore/intent_prioritizer_test.go | 176 | ||||
-rw-r--r-- | mongorestore/intent_test.go | 7 | ||||
-rw-r--r-- | mongorestore/main/mongorestore.go | 6 | ||||
-rw-r--r-- | mongorestore/metadata.go | 5 | ||||
-rw-r--r-- | mongorestore/mongorestore.go | 6 | ||||
-rw-r--r-- | mongorestore/oplog.go | 1 | ||||
-rw-r--r-- | mongorestore/options/options.go | 1 | ||||
-rw-r--r-- | mongorestore/restore.go | 47 |
12 files changed, 502 insertions, 22 deletions
diff --git a/mongorestore/filepath.go b/mongorestore/filepath.go index ce84ec52ace..e14b8c3cf52 100644 --- a/mongorestore/filepath.go +++ b/mongorestore/filepath.go @@ -63,6 +63,7 @@ func (restore *MongoRestore) CreateAllIntents(fullpath string) error { restore.manager.Put(&Intent{ C: "oplog", //TODO make this a helper in intent BSONPath: filepath.Join(fullpath, entry.Name()), + BSONSize: entry.Size(), }) } else { log.Logf(0, `don't know what to do with file "%v", skipping...`, @@ -104,6 +105,7 @@ func (restore *MongoRestore) CreateIntentsForDB(db, fullpath string) error { intent := &Intent{ DB: db, C: collection, + BSONSize: entry.Size(), BSONPath: filepath.Join(fullpath, entry.Name()), } log.Logf(1, "found collection %v bson to restore", intent.Key()) @@ -160,6 +162,7 @@ func (restore *MongoRestore) CreateIntentForCollection( DB: db, C: collection, BSONPath: fullpath, + BSONSize: file.Size(), } // finally, check if it has a .metadata.json file in its folder diff --git a/mongorestore/filepath_test.go b/mongorestore/filepath_test.go index 4ba7cd67529..2f505967d0b 100644 --- a/mongorestore/filepath_test.go +++ b/mongorestore/filepath_test.go @@ -47,6 +47,7 @@ func TestCreateAllIntents(t *testing.T) { Convey("running CreateAllIntents should succeed", func() { So(mr.CreateAllIntents("testdata/testdirs/"), ShouldBeNil) + mr.manager.Finalize(Legacy) //TODO handle oplog! 
Convey("and reading the intents should show alphabetical order", func() { @@ -112,6 +113,7 @@ func TestCreateIntentsForDB(t *testing.T) { Convey("running CreateIntentsForDB should succeed", func() { err := mr.CreateIntentsForDB("myDB", "testdata/testdirs/db1") So(err, ShouldBeNil) + mr.manager.Finalize(Legacy) Convey("and reading the intents should show alphabetical order", func() { i0 := mr.manager.Pop() @@ -164,6 +166,7 @@ func TestCreateIntentsForCollection(t *testing.T) { err := mr.CreateIntentForCollection( "myDB", "myC", util.ToUniversalPath("testdata/testdirs/db1/c2.bson")) So(err, ShouldBeNil) + mr.manager.Finalize(Legacy) Convey("should create one intent with 'myDb' and 'myC' fields", func() { i0 := mr.manager.Pop() @@ -186,6 +189,7 @@ func TestCreateIntentsForCollection(t *testing.T) { err := mr.CreateIntentForCollection( "myDB", "myC", util.ToUniversalPath("testdata/testdirs/db1/c1.bson")) So(err, ShouldBeNil) + mr.manager.Finalize(Legacy) Convey("should create one intent with 'myDb' and 'myC' fields", func() { i0 := mr.manager.Pop() diff --git a/mongorestore/intent.go b/mongorestore/intent.go index b7944ab0441..332ce6b6e0f 100644 --- a/mongorestore/intent.go +++ b/mongorestore/intent.go @@ -1,5 +1,10 @@ package mongorestore +import ( + "github.com/mongodb/mongo-tools/common/log" + "sync" +) + // TODO: make this reusable for dump? // mongorestore first scans the directory to generate a list @@ -12,6 +17,9 @@ type Intent struct { // File locations as absolute paths BSONPath string MetadataPath string + + // File size, for some prioritizer implementations. 
+ BSONSize int64 } func (it *Intent) Key() string { @@ -47,7 +55,6 @@ func (it *Intent) IsSystemIndexes() bool { } // Intent Manager -// TODO make this an interface, for testing ease type IntentManager struct { // map for merging metadata with BSON intents @@ -55,7 +62,13 @@ type IntentManager struct { // legacy mongorestore works in the order that paths are discovered, // so we need an ordered data structure to preserve this behavior. - queue []*Intent + intentsByDiscoveryOrder []*Intent + + // we need different scheduling order depending on the target + // mongod/mongos and whether or not we are multi threading; + // the IntentPrioritizer interface encapsulates this. + prioritizer IntentPrioritizer + priotitizerLock *sync.Mutex // special cases that should be saved but not be part of the queue. // used to deal with oplog and user/roles restoration, which are @@ -68,16 +81,16 @@ type IntentManager struct { func NewIntentManager() *IntentManager { return &IntentManager{ - intents: map[string]*Intent{}, - queue: []*Intent{}, - indexIntents: map[string]*Intent{}, + intents: map[string]*Intent{}, + intentsByDiscoveryOrder: []*Intent{}, + indexIntents: map[string]*Intent{}, + priotitizerLock: &sync.Mutex{}, } } // Put inserts an intent into the manager. Intents for the same collection // are merged together, so that BSON and metadata files for the same collection -// are returned in the same intent. Not currently thread safe, but could be made -// so very easily. +// are returned in the same intent. 
func (manager *IntentManager) Put(intent *Intent) { if intent == nil { panic("cannot insert nil *Intent into IntentManager") @@ -113,6 +126,9 @@ func (manager *IntentManager) Put(intent *Intent) { if existing.BSONPath == "" { existing.BSONPath = intent.BSONPath } + if existing.BSONSize == 0 { + existing.BSONSize = intent.BSONSize + } if existing.MetadataPath == "" { existing.MetadataPath = intent.MetadataPath } @@ -121,25 +137,27 @@ func (manager *IntentManager) Put(intent *Intent) { // if key doesn't already exist, add it to the manager manager.intents[intent.Key()] = intent - manager.queue = append(manager.queue, intent) + manager.intentsByDiscoveryOrder = append(manager.intentsByDiscoveryOrder, intent) } // Pop returns the next available intent from the manager. If the manager is -// empty, it returns nil. Not currently thread safe, but could be made -// so very easily. +// empty, it returns nil. Pop is thread safe. func (manager *IntentManager) Pop() *Intent { - var intent *Intent - - if len(manager.queue) == 0 { - return nil - } - - intent, manager.queue = manager.queue[0], manager.queue[1:] - delete(manager.intents, intent.Key()) + manager.priotitizerLock.Lock() + defer manager.priotitizerLock.Unlock() + intent := manager.prioritizer.Get() return intent } +// Finish tells the prioritizer that mongorestore is done restoring +// the given collection intent. +func (manager *IntentManager) Finish(intent *Intent) { + manager.priotitizerLock.Lock() + defer manager.priotitizerLock.Unlock() + manager.prioritizer.Finish(intent) +} + // Oplog returns the intent representing the oplog, which isn't // stored with the other intents, because it is dumped and restored in // a very different way from other collections. 
@@ -152,10 +170,31 @@ func (manager *IntentManager) SystemIndexes(dbName string) *Intent { return manager.indexIntents[dbName] } +// Users returns the intent of the users collection to restore, a special case func (manager *IntentManager) Users() *Intent { return manager.usersIntent } +// Roles returns the intent of the user roles collection to restore, a special case func (manager *IntentManager) Roles() *Intent { return manager.rolesIntent } + +// Finalize processes the intents for prioritization. Currently only two +// kinds of prioritizers are supported. No more "Put" operations may be done +// after finalize is called. +func (manager *IntentManager) Finalize(pType PriorityType) { + switch pType { + case Legacy: + log.Log(3, "finalizing intent manager with legacy prioritizer") + manager.prioritizer = NewLegacyPrioritizer(manager.intentsByDiscoveryOrder) + case MultiDatabaseLTF: + log.Log(3, "finalizing intent manager with multi-database largest task first prioritizer") + manager.prioritizer = NewMultiDatabaseLTFPrioritizer(manager.intentsByDiscoveryOrder) + default: + panic("cannot initialize IntentPrioritizer with unknown type") + } + // release these for the garbage collector and to ensure code correctness + manager.intents = nil + manager.intentsByDiscoveryOrder = nil +} diff --git a/mongorestore/intent_prioritizer.go b/mongorestore/intent_prioritizer.go new file mode 100644 index 00000000000..529263376c5 --- /dev/null +++ b/mongorestore/intent_prioritizer.go @@ -0,0 +1,193 @@ +package mongorestore + +import ( + "container/heap" + "sort" +) + +type PriorityType int + +const ( + Legacy PriorityType = iota + MultiDatabaseLTF +) + +// IntentPrioritizer encapsulates the logic of scheduling intents +// for restoration. It can know about which intents are in the +// process of being restored through the "Finish" hook. +// +// Oplog entries and auth entries are not handled by the prioritizer, +// as these are special cases handled by the regular mongorestore code. 
+type IntentPrioritizer interface { + Get() *Intent + Finish(*Intent) +} + +//===== Legacy ===== + +// legacyPrioritizer processes the intents in the order they were read off the +// file system, keeping with legacy mongorestore behavior. +type legacyPrioritizer struct { + queue []*Intent +} + +func NewLegacyPrioritizer(intentList []*Intent) *legacyPrioritizer { + return &legacyPrioritizer{queue: intentList} +} + +func (legacy *legacyPrioritizer) Get() *Intent { + var intent *Intent + + if len(legacy.queue) == 0 { + return nil + } + + intent, legacy.queue = legacy.queue[0], legacy.queue[1:] + return intent +} + +func (legacy *legacyPrioritizer) Finish(*Intent) { + // no-op + return +} + +//===== Multi Database Longest Task First ===== + +// multiDatabaseLTF is designed to properly schedule intents with two constraints: +// 1. it is optimized to run in a multi-processor environment +// 2. it is optimized for parallelism against 2.6's db-level write lock +// These goals result in a design that attempts to have as many different +// database's intents being restored as possible and attempts to restore the +// largest collections first. +// +// If we can have a minimum number of collections in flight for a given db, +// we avoid lock contention in an optimal way on 2.6 systems. That is, +// it is better to have two restore jobs where +// job1 = "test.mycollection" +// job2 = "mydb2.othercollection" +// so that these collections do not compete for the db-level write lock. +// +// We also schedule the largest jobs first, in a greedy fashion, in order +// to minimize total restoration time. Each database's intents are sorted +// by decreasing file size at initialization, so that the largest jobs are +// run first. Admittedly, .bson file size is not a direct predictor of restore +// time, but there is certainly a strong correlation. 
Note that this attribute +// is secondary to the multi-db scheduling laid out above, since multi-db will +// get us bigger wins in terms of parallelism. +type multiDatabaseLTFPrioritizer struct { + dbHeap heap.Interface + counterMap map[string]*dbCounter +} + +// NewMultiDatabaseLTFPrioritizer takes in a list of intents and returns an +// initialized prioritizer. +func NewMultiDatabaseLTFPrioritizer(intents []*Intent) *multiDatabaseLTFPrioritizer { + prioritizer := &multiDatabaseLTFPrioritizer{ + counterMap: map[string]*dbCounter{}, + dbHeap: &DBHeap{}, + } + heap.Init(prioritizer.dbHeap) + // first, create all database counters + for _, intent := range intents { + counter, exists := prioritizer.counterMap[intent.DB] + if !exists { + // initialize a new counter if one doesn't exist for DB + counter = &dbCounter{} + prioritizer.counterMap[intent.DB] = counter + } + counter.collections = append(counter.collections, intent) + } + // then ensure that all the dbCounters have sorted intents + for _, counter := range prioritizer.counterMap { + counter.SortCollectionsBySize() + heap.Push(prioritizer.dbHeap, counter) + } + return prioritizer +} + +// Get returns the next prioritized intent and updates the count of active +// restores for the returned intent's DB. Get is not thread safe, and depends +// on the implementation of the intent manager to lock around it. +func (mdb *multiDatabaseLTFPrioritizer) Get() *Intent { + if mdb.dbHeap.Len() == 0 { + // we're out of things to return + return nil + } + optimalDB := heap.Pop(mdb.dbHeap).(*dbCounter) + optimalDB.active++ + nextIntent := optimalDB.PopIntent() + // only release the db counter if it's out of collections + if len(optimalDB.collections) > 0 { + heap.Push(mdb.dbHeap, optimalDB) + } + return nextIntent +} + +// Finish decreases the number of active restore jobs for the given intent's +// database, and reshuffles the heap accordingly. 
Finish is not thread safe, +// and depends on the implementation of the intent manager to lock around it. +func (mdb *multiDatabaseLTFPrioritizer) Finish(intent *Intent) { + counter := mdb.counterMap[intent.DB] + counter.active-- + // only fix up the heap if the counter is still in the heap + if len(counter.collections) > 0 { + // This is an O(n) operation on the heap. We could make all heap + // operations O(log(n)) if we set up dbCounters to track their own + // position in the heap, but in practice this overhead is likely negligible. + heap.Init(mdb.dbHeap) + } +} + +type dbCounter struct { + active int + collections []*Intent +} + +func (dbc *dbCounter) SortCollectionsBySize() { + sort.Sort(ByFilesize(dbc.collections)) +} + +// PopIntent returns the largest intent remaining for the database +func (dbc *dbCounter) PopIntent() *Intent { + var intent *Intent + if len(dbc.collections) > 0 { + intent, dbc.collections = dbc.collections[0], dbc.collections[1:] + } + return intent +} + +// For sorting intents from largest to smallest BSON filesize +type ByFilesize []*Intent + +func (s ByFilesize) Len() int { return len(s) } +func (s ByFilesize) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s ByFilesize) Less(i, j int) bool { return s[i].BSONSize > s[j].BSONSize } + +// Returns the largest collection of the databases with the least active restores. +// Implements the container/heap interface. None of its methods are meant to be +// called directly. 
+type DBHeap []*dbCounter + +func (dbh DBHeap) Len() int { return len(dbh) } +func (dbh DBHeap) Swap(i, j int) { dbh[i], dbh[j] = dbh[j], dbh[i] } +func (dbh DBHeap) Less(i, j int) bool { + if dbh[i].active == dbh[j].active { + // prioritize the largest bson file if dbs have the same number + // of restorations in progress + return dbh[i].collections[0].BSONSize > dbh[j].collections[0].BSONSize + } + return dbh[i].active < dbh[j].active +} + +func (dbh *DBHeap) Push(x interface{}) { + *dbh = append(*dbh, x.(*dbCounter)) +} + +func (dbh *DBHeap) Pop() interface{} { + // for container/heap package: removes the top entry and resizes the heap array + old := *dbh + n := len(old) + toPop := old[n-1] + *dbh = old[0 : n-1] + return toPop +} diff --git a/mongorestore/intent_prioritizer_test.go b/mongorestore/intent_prioritizer_test.go new file mode 100644 index 00000000000..474a2c26aab --- /dev/null +++ b/mongorestore/intent_prioritizer_test.go @@ -0,0 +1,176 @@ +package mongorestore + +import ( + "container/heap" + "github.com/mongodb/mongo-tools/common/testutil" + . 
"github.com/smartystreets/goconvey/convey" + "testing" +) + +func TestLegacyPrioritizer(t *testing.T) { + + testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE) + + Convey("With a legacyPrioritizer initialized with an ordered intent list", t, func() { + testList := []*Intent{ + &Intent{DB: "1"}, + &Intent{DB: "2"}, + &Intent{DB: "3"}, + } + legacy := NewLegacyPrioritizer(testList) + So(legacy, ShouldNotBeNil) + + Convey("the priority should be defined by 'first-in-first-out'", func() { + it0 := legacy.Get() + it1 := legacy.Get() + it2 := legacy.Get() + it3 := legacy.Get() + So(it3, ShouldBeNil) + So(it0.DB, ShouldBeLessThan, it1.DB) + So(it1.DB, ShouldBeLessThan, it2.DB) + }) + }) +} + +//TODO test the hell out of the heap + +func TestBasicDBHeapBehavior(t *testing.T) { + var dbheap heap.Interface + + testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE) + + Convey("With an empty dbHeap", t, func() { + dbheap = &DBHeap{} + heap.Init(dbheap) + + Convey("when inserting unordered dbCounters with different active counts", func() { + heap.Push(dbheap, &dbCounter{75, nil}) + heap.Push(dbheap, &dbCounter{121, nil}) + heap.Push(dbheap, &dbCounter{76, nil}) + heap.Push(dbheap, &dbCounter{51, nil}) + heap.Push(dbheap, &dbCounter{82, nil}) + heap.Push(dbheap, &dbCounter{117, nil}) + heap.Push(dbheap, &dbCounter{49, nil}) + heap.Push(dbheap, &dbCounter{101, nil}) + heap.Push(dbheap, &dbCounter{122, nil}) + heap.Push(dbheap, &dbCounter{33, nil}) + heap.Push(dbheap, &dbCounter{0, nil}) + + Convey("they should pop in active order, least to greatest", func() { + prev := -1 + for dbheap.Len() > 0 { + popped := heap.Pop(dbheap).(*dbCounter) + So(popped.active, ShouldBeGreaterThan, prev) + prev = popped.active + } + }) + }) + + Convey("when inserting unordered dbCounters with different bson sizes", func() { + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 70}}}) + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 1024}}}) + heap.Push(dbheap, &dbCounter{0, 
[]*Intent{&Intent{BSONSize: 97}}}) + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 3}}}) + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 1024 * 1024}}}) + + Convey("they should pop in bson size order, greatest to least", func() { + prev := int64(1024*1024 + 1) // Maximum + for dbheap.Len() > 0 { + popped := heap.Pop(dbheap).(*dbCounter) + So(popped.collections[0].BSONSize, ShouldBeLessThan, prev) + prev = popped.collections[0].BSONSize + } + }) + }) + }) +} + +func TestDBCounterCollectionSorting(t *testing.T) { + + testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE) + + Convey("With a dbCounter and an unordered collection of intents", t, func() { + dbc := &dbCounter{ + collections: []*Intent{ + &Intent{BSONSize: 100}, + &Intent{BSONSize: 1000}, + &Intent{BSONSize: 1}, + &Intent{BSONSize: 10}, + }, + } + + Convey("popping the sorted intents should return in decreasing BSONSize", func() { + dbc.SortCollectionsBySize() + So(dbc.PopIntent().BSONSize, ShouldEqual, 1000) + So(dbc.PopIntent().BSONSize, ShouldEqual, 100) + So(dbc.PopIntent().BSONSize, ShouldEqual, 10) + So(dbc.PopIntent().BSONSize, ShouldEqual, 1) + So(dbc.PopIntent(), ShouldBeNil) + So(dbc.PopIntent(), ShouldBeNil) + }) + }) +} + +func TestSimulatedMultiDBJob(t *testing.T) { + var prioritizer IntentPrioritizer + + testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE) + + Convey("With a prioritizer initialized with a set of intents", t, func() { + intents := []*Intent{ + &Intent{C: "small", DB: "db2", BSONSize: 32}, + &Intent{C: "medium", DB: "db2", BSONSize: 128}, + &Intent{C: "giant", DB: "db1", BSONSize: 1024}, + &Intent{C: "tiny", DB: "db1", BSONSize: 2}, + } + prioritizer = NewMultiDatabaseLTFPrioritizer(intents) + So(prioritizer, ShouldNotBeNil) + + Convey("and a running simulation of two jobs threads:", func() { + Convey("the first two intents should be of different dbs", func() { + i0 := prioritizer.Get() + So(i0, ShouldNotBeNil) + i1 := prioritizer.Get() + So(i1, 
ShouldNotBeNil) + + Convey("the first intent should be the largest bson file", func() { + So(i0.C, ShouldEqual, "giant") + So(i0.DB, ShouldEqual, "db1") + }) + + Convey("the second intent should be the largest bson file of db2", func() { + So(i1.C, ShouldEqual, "medium") + So(i1.DB, ShouldEqual, "db2") + }) + + Convey("with the second job finishing the smaller intents", func() { + prioritizer.Finish(i1) + i2 := prioritizer.Get() + So(i2, ShouldNotBeNil) + prioritizer.Finish(i2) + i3 := prioritizer.Get() + So(i3, ShouldNotBeNil) + + Convey("the next job should be from db2", func() { + So(i2.C, ShouldEqual, "small") + So(i2.DB, ShouldEqual, "db2") + }) + + Convey("the final job should be from db1", func() { + So(i3.C, ShouldEqual, "tiny") + So(i3.DB, ShouldEqual, "db1") + + Convey("which means that there should be two active db1 jobs", func() { + counter := prioritizer.(*multiDatabaseLTFPrioritizer).counterMap["db1"] + So(counter.active, ShouldEqual, 2) + }) + }) + + Convey("the heap should now be empty", func() { + So(prioritizer.(*multiDatabaseLTFPrioritizer).dbHeap.Len(), ShouldEqual, 0) + }) + }) + }) + }) + }) +} diff --git a/mongorestore/intent_test.go b/mongorestore/intent_test.go index 221f6470b6e..f2d3e3f2850 100644 --- a/mongorestore/intent_test.go +++ b/mongorestore/intent_test.go @@ -20,7 +20,7 @@ func TestIntentManager(t *testing.T) { manager.Put(&Intent{DB: "1", C: "2", BSONPath: "/b2/"}) manager.Put(&Intent{DB: "1", C: "3", BSONPath: "/b3/"}) manager.Put(&Intent{DB: "2", C: "1", BSONPath: "/b4/"}) - So(len(manager.queue), ShouldEqual, 4) + So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 4) So(len(manager.intents), ShouldEqual, 4) Convey("and then some matching metadata intents", func() { @@ -30,11 +30,12 @@ func TestIntentManager(t *testing.T) { manager.Put(&Intent{DB: "1", C: "2", MetadataPath: "/2m/"}) Convey("the size of the queue should be unchanged", func() { - So(len(manager.queue), ShouldEqual, 4) + So(len(manager.intentsByDiscoveryOrder), 
ShouldEqual, 4) So(len(manager.intents), ShouldEqual, 4) }) Convey("popping them from the IntentManager", func() { + manager.Finalize(Legacy) it0 := manager.Pop() it1 := manager.Pop() it2 := manager.Pop() @@ -60,7 +61,7 @@ func TestIntentManager(t *testing.T) { manager.Put(&Intent{DB: "27", C: "B", MetadataPath: "/6/"}) Convey("should increase the size, because they are not merged in", func() { - So(len(manager.queue), ShouldEqual, 6) + So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 6) So(len(manager.intents), ShouldEqual, 6) }) }) diff --git a/mongorestore/main/mongorestore.go b/mongorestore/main/mongorestore.go index a13d6839d41..e11796ee69a 100644 --- a/mongorestore/main/mongorestore.go +++ b/mongorestore/main/mongorestore.go @@ -9,6 +9,7 @@ import ( "github.com/mongodb/mongo-tools/mongorestore" "github.com/mongodb/mongo-tools/mongorestore/options" "os" + "runtime" ) func main() { @@ -36,6 +37,11 @@ func main() { log.SetVerbosity(opts.Verbosity) + if outputOpts.JobThreads > 0 { + log.Logf(1, "running mongorestore with %v job threads", outputOpts.JobThreads) + runtime.GOMAXPROCS(outputOpts.JobThreads) + } + targetDir := "" if inputOpts.Directory != "" { targetDir = inputOpts.Directory diff --git a/mongorestore/metadata.go b/mongorestore/metadata.go index 0da67823e22..2351ab4ba34 100644 --- a/mongorestore/metadata.go +++ b/mongorestore/metadata.go @@ -100,6 +100,7 @@ func (restore *MongoRestore) DBHasCollection(intent *Intent) (bool, error) { if err != nil { return false, fmt.Errorf("error establishing connection: %v", err) } + session.SetSocketTimeout(0) defer session.Close() err = session.DB(intent.DB).C("system.namespaces").Find(bson.M{"name": collectionNS}).One(&result) if err != nil { @@ -173,6 +174,7 @@ func (restore *MongoRestore) LegacyInsertIndex(intent *Intent, index IndexDocume if err != nil { return fmt.Errorf("error establishing connection: %v", err) } + session.SetSocketTimeout(0) defer session.Close() // overwrite safety to make sure we catch 
errors @@ -199,6 +201,7 @@ func (restore *MongoRestore) CreateCollection(intent *Intent, options bson.D) er if err != nil { return fmt.Errorf("error establishing connection: %v", err) } + session.SetSocketTimeout(0) defer session.Close() res := bson.M{} @@ -258,6 +261,7 @@ func (restore *MongoRestore) RestoreUsersOrRoles(collectionType string, intent * log.Logf(0, "error establishing connection to drop temporary collection %v: %v", tempCol, err) return } + session.SetSocketTimeout(0) defer session.Close() log.Logf(3, "dropping temporary collection %v", tempCol) err = session.DB("admin").C(tempCol).DropCollection() @@ -281,6 +285,7 @@ func (restore *MongoRestore) RestoreUsersOrRoles(collectionType string, intent * } session, err := restore.SessionProvider.GetSession() if err != nil { return fmt.Errorf("error establishing connection: %v", err) } + session.SetSocketTimeout(0) // only touch the session once GetSession is known to have succeeded diff --git a/mongorestore/mongorestore.go b/mongorestore/mongorestore.go index b735ba85793..0f5c8b7d452 100644 --- a/mongorestore/mongorestore.go +++ b/mongorestore/mongorestore.go @@ -101,6 +101,12 @@ func (restore *MongoRestore) Restore() error { } // 2. Restore them... 
+ if restore.OutputOptions.JobThreads > 0 { + restore.manager.Finalize(MultiDatabaseLTF) + } else { + // use legacy restoration order if we are single-threaded + restore.manager.Finalize(Legacy) + } err = restore.RestoreIntents() if err != nil { return fmt.Errorf("restore error: %v", err) diff --git a/mongorestore/oplog.go b/mongorestore/oplog.go index 03e7add4740..a911895f1a9 100644 --- a/mongorestore/oplog.go +++ b/mongorestore/oplog.go @@ -66,6 +66,7 @@ func (restore *MongoRestore) RestoreOplog() error { if err != nil { return fmt.Errorf("error establishing connection: %v", err) } + session.SetSocketTimeout(0) defer session.Close() // To restore the oplog, we iterate over the oplog entries, diff --git a/mongorestore/options/options.go b/mongorestore/options/options.go index 1a05db5cae7..e121a54b6c3 100644 --- a/mongorestore/options/options.go +++ b/mongorestore/options/options.go @@ -19,6 +19,7 @@ type OutputOptions struct { NoIndexRestore bool `long:"noIndexRestore" description:"Don't restore indexes"` NoOptionsRestore bool `long:"noOptionsRestore" description:"Don't restore options"` KeepIndexVersion bool `long:"keepIndexVersion" description:"Don't update index version"` + JobThreads int `long:"jobThreads" short:"j" description:"TODO"` } func (self *OutputOptions) Name() string { diff --git a/mongorestore/restore.go b/mongorestore/restore.go index d9e8930d7d5..6c313ff6cad 100644 --- a/mongorestore/restore.go +++ b/mongorestore/restore.go @@ -17,11 +17,54 @@ const ProgressBarLength = 24 // RestoreIntents iterates through all of the normal intents // stored in the IntentManager, and restores them. 
func (restore *MongoRestore) RestoreIntents() error { + + if restore.OutputOptions.JobThreads > 0 { + errChan := make(chan error, restore.OutputOptions.JobThreads) // buffered: each worker sends at most once, so no send blocks after an early return + doneChan := make(chan struct{}, restore.OutputOptions.JobThreads) + doneCount := 0 + + // start a goroutine for each job thread + for i := 0; i < restore.OutputOptions.JobThreads; i++ { + go func(id int) { + log.Logf(3, "starting restore routine with id=%v", id) + for { + intent := restore.manager.Pop() + if intent == nil { + break + } + err := restore.RestoreIntent(intent) + if err != nil { + errChan <- err + return + } + restore.manager.Finish(intent) + } + log.Logf(3, "ending restore routine with id=%v, no more work to do", id) + doneChan <- struct{}{} + }(i) + } + + // wait until all goroutines are done or one of them errors out + for { + select { + case err := <-errChan: + return err + case <-doneChan: + doneCount++ + if doneCount == restore.OutputOptions.JobThreads { + return nil + } + } + } + } + + // single-threaded for intent := restore.manager.Pop(); intent != nil; intent = restore.manager.Pop() { err := restore.RestoreIntent(intent) if err != nil { return err } + restore.manager.Finish(intent) } return nil } @@ -49,6 +92,7 @@ func (restore *MongoRestore) RestoreIntent(intent *Intent) error { log.Logf(1, "dropping collection %v before restoring", intent.Key()) // TODO(erf) maybe encapsulate this so that the session is closed sooner session, err := restore.SessionProvider.GetSession() if err != nil { return fmt.Errorf("error establishing connection: %v", err) } + session.SetSocketTimeout(0) // only touch the session once GetSession is known to have succeeded @@ -172,12 +216,13 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string, bar.Start() defer bar.Stop() + //TODO use a goroutine here doc := &bson.Raw{} for bsonSource.Next(doc) { bytesRead += len(doc.Data) if restore.objCheck { //TODO encapsulate to reuse bson obj?? - err := bson.Unmarshal(doc.Data, &bson.M{}) + err := bson.Unmarshal(doc.Data, &bson.D{}) if err != nil { return err break |