author    Kyle Erf <erf@mongodb.com>    2014-10-15 11:28:13 -0400
committer Kyle Erf <erf@mongodb.com>    2014-10-17 13:39:49 -0400
commit    330cd0eaefdbc2e0af6c9bbd61aff933a60fb96f (patch)
tree      2c26148c0ce27a38fd2720587bd0d36952e4ef86
parent    0cf93b8b8f4741983e6f058a70e7b561a697e0f4 (diff)
download  mongo-330cd0eaefdbc2e0af6c9bbd61aff933a60fb96f.tar.gz
TOOLS-283 restore multiple collections in parallel
* prioritizes based on available databases and collection size

Former-commit-id: 1121529476f36cd32ed58a297401eb8cece1e51f
-rw-r--r--  mongorestore/filepath.go                   3
-rw-r--r--  mongorestore/filepath_test.go              4
-rw-r--r--  mongorestore/intent.go                    75
-rw-r--r--  mongorestore/intent_prioritizer.go       193
-rw-r--r--  mongorestore/intent_prioritizer_test.go  176
-rw-r--r--  mongorestore/intent_test.go                7
-rw-r--r--  mongorestore/main/mongorestore.go          6
-rw-r--r--  mongorestore/metadata.go                   5
-rw-r--r--  mongorestore/mongorestore.go               6
-rw-r--r--  mongorestore/oplog.go                      1
-rw-r--r--  mongorestore/options/options.go            1
-rw-r--r--  mongorestore/restore.go                   47
12 files changed, 502 insertions, 22 deletions
diff --git a/mongorestore/filepath.go b/mongorestore/filepath.go
index ce84ec52ace..e14b8c3cf52 100644
--- a/mongorestore/filepath.go
+++ b/mongorestore/filepath.go
@@ -63,6 +63,7 @@ func (restore *MongoRestore) CreateAllIntents(fullpath string) error {
restore.manager.Put(&Intent{
C: "oplog", //TODO make this a helper in intent
BSONPath: filepath.Join(fullpath, entry.Name()),
+ BSONSize: entry.Size(),
})
} else {
log.Logf(0, `don't know what to do with file "%v", skipping...`,
@@ -104,6 +105,7 @@ func (restore *MongoRestore) CreateIntentsForDB(db, fullpath string) error {
intent := &Intent{
DB: db,
C: collection,
+ BSONSize: entry.Size(),
BSONPath: filepath.Join(fullpath, entry.Name()),
}
log.Logf(1, "found collection %v bson to restore", intent.Key())
@@ -160,6 +162,7 @@ func (restore *MongoRestore) CreateIntentForCollection(
DB: db,
C: collection,
BSONPath: fullpath,
+ BSONSize: file.Size(),
}
// finally, check if it has a .metadata.json file in its folder
diff --git a/mongorestore/filepath_test.go b/mongorestore/filepath_test.go
index 4ba7cd67529..2f505967d0b 100644
--- a/mongorestore/filepath_test.go
+++ b/mongorestore/filepath_test.go
@@ -47,6 +47,7 @@ func TestCreateAllIntents(t *testing.T) {
Convey("running CreateAllIntents should succeed", func() {
So(mr.CreateAllIntents("testdata/testdirs/"), ShouldBeNil)
+ mr.manager.Finalize(Legacy)
//TODO handle oplog!
Convey("and reading the intents should show alphabetical order", func() {
@@ -112,6 +113,7 @@ func TestCreateIntentsForDB(t *testing.T) {
Convey("running CreateIntentsForDB should succeed", func() {
err := mr.CreateIntentsForDB("myDB", "testdata/testdirs/db1")
So(err, ShouldBeNil)
+ mr.manager.Finalize(Legacy)
Convey("and reading the intents should show alphabetical order", func() {
i0 := mr.manager.Pop()
@@ -164,6 +166,7 @@ func TestCreateIntentsForCollection(t *testing.T) {
err := mr.CreateIntentForCollection(
"myDB", "myC", util.ToUniversalPath("testdata/testdirs/db1/c2.bson"))
So(err, ShouldBeNil)
+ mr.manager.Finalize(Legacy)
Convey("should create one intent with 'myDb' and 'myC' fields", func() {
i0 := mr.manager.Pop()
@@ -186,6 +189,7 @@ func TestCreateIntentsForCollection(t *testing.T) {
err := mr.CreateIntentForCollection(
"myDB", "myC", util.ToUniversalPath("testdata/testdirs/db1/c1.bson"))
So(err, ShouldBeNil)
+ mr.manager.Finalize(Legacy)
Convey("should create one intent with 'myDb' and 'myC' fields", func() {
i0 := mr.manager.Pop()
diff --git a/mongorestore/intent.go b/mongorestore/intent.go
index b7944ab0441..332ce6b6e0f 100644
--- a/mongorestore/intent.go
+++ b/mongorestore/intent.go
@@ -1,5 +1,10 @@
package mongorestore
+import (
+ "github.com/mongodb/mongo-tools/common/log"
+ "sync"
+)
+
// TODO: make this reusable for dump?
// mongorestore first scans the directory to generate a list
@@ -12,6 +17,9 @@ type Intent struct {
// File locations as absolute paths
BSONPath string
MetadataPath string
+
+ // File size, for some prioritizer implementations.
+ BSONSize int64
}
func (it *Intent) Key() string {
@@ -47,7 +55,6 @@ func (it *Intent) IsSystemIndexes() bool {
}
// Intent Manager
-// TODO make this an interface, for testing ease
type IntentManager struct {
// map for merging metadata with BSON intents
@@ -55,7 +62,13 @@ type IntentManager struct {
// legacy mongorestore works in the order that paths are discovered,
// so we need an ordered data structure to preserve this behavior.
- queue []*Intent
+ intentsByDiscoveryOrder []*Intent
+
+ // we need different scheduling order depending on the target
+ // mongod/mongos and whether or not we are multi threading;
+ // the IntentPrioritizer interface encapsulates this.
+ prioritizer IntentPrioritizer
+ prioritizerLock *sync.Mutex
// special cases that should be saved but not be part of the queue.
// used to deal with oplog and user/roles restoration, which are
@@ -68,16 +81,16 @@ type IntentManager struct {
func NewIntentManager() *IntentManager {
return &IntentManager{
- intents: map[string]*Intent{},
- queue: []*Intent{},
- indexIntents: map[string]*Intent{},
+ intents: map[string]*Intent{},
+ intentsByDiscoveryOrder: []*Intent{},
+ indexIntents: map[string]*Intent{},
+ prioritizerLock:         &sync.Mutex{},
}
}
// Put inserts an intent into the manager. Intents for the same collection
// are merged together, so that BSON and metadata files for the same collection
-// are returned in the same intent. Not currently thread safe, but could be made
-// so very easily.
+// are returned in the same intent.
func (manager *IntentManager) Put(intent *Intent) {
if intent == nil {
panic("cannot insert nil *Intent into IntentManager")
@@ -113,6 +126,9 @@ func (manager *IntentManager) Put(intent *Intent) {
if existing.BSONPath == "" {
existing.BSONPath = intent.BSONPath
}
+ if existing.BSONSize == 0 {
+ existing.BSONSize = intent.BSONSize
+ }
if existing.MetadataPath == "" {
existing.MetadataPath = intent.MetadataPath
}
@@ -121,25 +137,27 @@ func (manager *IntentManager) Put(intent *Intent) {
// if key doesn't already exist, add it to the manager
manager.intents[intent.Key()] = intent
- manager.queue = append(manager.queue, intent)
+ manager.intentsByDiscoveryOrder = append(manager.intentsByDiscoveryOrder, intent)
}
// Pop returns the next available intent from the manager. If the manager is
-// empty, it returns nil. Not currently thread safe, but could be made
-// so very easily.
+// empty, it returns nil. Pop is thread safe.
func (manager *IntentManager) Pop() *Intent {
- var intent *Intent
-
- if len(manager.queue) == 0 {
- return nil
- }
-
- intent, manager.queue = manager.queue[0], manager.queue[1:]
- delete(manager.intents, intent.Key())
+ manager.prioritizerLock.Lock()
+ defer manager.prioritizerLock.Unlock()
+ intent := manager.prioritizer.Get()
return intent
}
+// Finish tells the prioritizer that mongorestore is done restoring
+// the given collection intent.
+func (manager *IntentManager) Finish(intent *Intent) {
+ manager.prioritizerLock.Lock()
+ defer manager.prioritizerLock.Unlock()
+ manager.prioritizer.Finish(intent)
+}
+
// Oplog returns the intent representing the oplog, which isn't
// stored with the other intents, because it is dumped and restored in
// a very different way from other collections.
@@ -152,10 +170,31 @@ func (manager *IntentManager) SystemIndexes(dbName string) *Intent {
return manager.indexIntents[dbName]
}
+// Users returns the intent of the users collection to restore, a special case
func (manager *IntentManager) Users() *Intent {
return manager.usersIntent
}
+// Roles returns the intent of the user roles collection to restore, a special case
func (manager *IntentManager) Roles() *Intent {
return manager.rolesIntent
}
+
+// Finalize processes the intents for prioritization. Currently only two
+// kinds of prioritizers are supported. No more "Put" operations may be done
+// after Finalize is called.
+func (manager *IntentManager) Finalize(pType PriorityType) {
+ switch pType {
+ case Legacy:
+ log.Log(3, "finalizing intent manager with legacy prioritizer")
+ manager.prioritizer = NewLegacyPrioritizer(manager.intentsByDiscoveryOrder)
+ case MultiDatabaseLTF:
+ log.Log(3, "finalizing intent manager with multi-database largest task first prioritizer")
+ manager.prioritizer = NewMultiDatabaseLTFPrioritizer(manager.intentsByDiscoveryOrder)
+ default:
+ panic("cannot initialize IntentPrioritizer with unknown type")
+ }
+ // release these for the garbage collector and to ensure code correctness
+ manager.intents = nil
+ manager.intentsByDiscoveryOrder = nil
+}
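
A minimal sketch of the manager's new lifecycle, using the types and methods
from the diff above (error handling and the actual restore work are elided):

    manager := NewIntentManager()

    // 1. Discovery: Put intents as dump files are found on disk.
    manager.Put(&Intent{DB: "db1", C: "big", BSONPath: "db1/big.bson", BSONSize: 1024})
    manager.Put(&Intent{DB: "db2", C: "tiny", BSONPath: "db2/tiny.bson", BSONSize: 2})

    // 2. Freeze: choose a prioritizer; no more Put calls after this.
    manager.Finalize(MultiDatabaseLTF)

    // 3. Restore: each worker loops Pop/Finish until the manager drains.
    for intent := manager.Pop(); intent != nil; intent = manager.Pop() {
        // ... restore the collection described by intent ...
        manager.Finish(intent) // lets the prioritizer rebalance
    }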
diff --git a/mongorestore/intent_prioritizer.go b/mongorestore/intent_prioritizer.go
new file mode 100644
index 00000000000..529263376c5
--- /dev/null
+++ b/mongorestore/intent_prioritizer.go
@@ -0,0 +1,193 @@
+package mongorestore
+
+import (
+ "container/heap"
+ "sort"
+)
+
+type PriorityType int
+
+const (
+ Legacy PriorityType = iota
+ MultiDatabaseLTF
+)
+
+// IntentPrioritizer encapsulates the logic of scheduling intents
+// for restoration. It keeps track of which intents are currently
+// being restored via the "Finish" hook.
+//
+// Oplog entries and auth entries are not handled by the prioritizer,
+// as these are special cases handled by the regular mongorestore code.
+type IntentPrioritizer interface {
+ Get() *Intent
+ Finish(*Intent)
+}
+
+//===== Legacy =====
+
+// legacyPrioritizer processes the intents in the order they were read off the
+// file system, keeping with legacy mongorestore behavior.
+type legacyPrioritizer struct {
+ queue []*Intent
+}
+
+func NewLegacyPrioritizer(intentList []*Intent) *legacyPrioritizer {
+ return &legacyPrioritizer{queue: intentList}
+}
+
+func (legacy *legacyPrioritizer) Get() *Intent {
+ var intent *Intent
+
+ if len(legacy.queue) == 0 {
+ return nil
+ }
+
+ intent, legacy.queue = legacy.queue[0], legacy.queue[1:]
+ return intent
+}
+
+func (legacy *legacyPrioritizer) Finish(*Intent) {
+ // no-op
+ return
+}
+
+//===== Multi Database Longest Task First =====
+
+// multiDatabaseLTF is designed to schedule intents with two goals:
+// 1. it is optimized to run in a multi-processor environment
+// 2. it is optimized for parallelism against 2.6's db-level write lock
+// These goals result in a design that attempts to have intents from as many
+// different databases as possible being restored concurrently, while also
+// attempting to restore the largest collections first.
+//
+// If we keep the number of collections in flight for any given db to a
+// minimum, we minimize lock contention on 2.6 systems. That is,
+// it is better to have two restore jobs where
+// job1 = "test.mycollection"
+// job2 = "mydb2.othercollection"
+// so that these collections do not compete for the db-level write lock.
+//
+// We also schedule the largest jobs first, in a greedy fashion, in order
+// to minimize total restoration time. Each database's intents are sorted
+// by decreasing file size at initialization, so that the largest jobs are
+// run first. Admittedly, .bson file size is not a direct predictor of restore
+// time, but there is certainly a strong correlation. Note that this attribute
+// is secondary to the multi-db scheduling laid out above, since multi-db will
+// get us bigger wins in terms of parallelism.
+type multiDatabaseLTFPrioritizer struct {
+ dbHeap heap.Interface
+ counterMap map[string]*dbCounter
+}
+
+// NewMultiDatabaseLTFPrioritizer takes in a list of intents and returns an
+// initialized prioritizer.
+func NewMultiDatabaseLTFPrioritizer(intents []*Intent) *multiDatabaseLTFPrioritizer {
+ prioritizer := &multiDatabaseLTFPrioritizer{
+ counterMap: map[string]*dbCounter{},
+ dbHeap: &DBHeap{},
+ }
+ heap.Init(prioritizer.dbHeap)
+ // first, create all database counters
+ for _, intent := range intents {
+ counter, exists := prioritizer.counterMap[intent.DB]
+ if !exists {
+ // initialize a new counter if one doesn't exist for DB
+ counter = &dbCounter{}
+ prioritizer.counterMap[intent.DB] = counter
+ }
+ counter.collections = append(counter.collections, intent)
+ }
+ // then ensure that all the dbCounters have sorted intents
+ for _, counter := range prioritizer.counterMap {
+ counter.SortCollectionsBySize()
+ heap.Push(prioritizer.dbHeap, counter)
+ }
+ return prioritizer
+}
+
+// Get returns the next prioritized intent and updates the count of active
+// restores for the returned intent's DB. Get is not thread safe, and depends
+// on the implementation of the intent manager to lock around it.
+func (mdb *multiDatabaseLTFPrioritizer) Get() *Intent {
+ if mdb.dbHeap.Len() == 0 {
+ // we're out of things to return
+ return nil
+ }
+ optimalDB := heap.Pop(mdb.dbHeap).(*dbCounter)
+ optimalDB.active++
+ nextIntent := optimalDB.PopIntent()
+ // only push the db counter back onto the heap if it still has collections
+ if len(optimalDB.collections) > 0 {
+ heap.Push(mdb.dbHeap, optimalDB)
+ }
+ return nextIntent
+}
+
+// Finish decreases the number of active restore jobs for the given intent's
+// database, and reshuffles the heap accordingly. Finish is not thread safe,
+// and depends on the implementation of the intent manager to lock around it.
+func (mdb *multiDatabaseLTFPrioritizer) Finish(intent *Intent) {
+ counter := mdb.counterMap[intent.DB]
+ counter.active--
+ // only fix up the heap if the counter is still in the heap
+ if len(counter.collections) > 0 {
+ // This is an O(n) operation on the heap. We could make all heap
+ // operations O(log(n)) if we set up dbCounters to track their own
+ // position in the heap, but in practice this overhead is likely negligible.
+ heap.Init(mdb.dbHeap)
+ }
+}
+
+type dbCounter struct {
+ active int
+ collections []*Intent
+}
+
+func (dbc *dbCounter) SortCollectionsBySize() {
+ sort.Sort(ByFilesize(dbc.collections))
+}
+
+// PopIntent returns the largest intent remaining for the database
+func (dbc *dbCounter) PopIntent() *Intent {
+ var intent *Intent
+ if len(dbc.collections) > 0 {
+ intent, dbc.collections = dbc.collections[0], dbc.collections[1:]
+ }
+ return intent
+}
+
+// For sorting intents from largest to smallest BSON filesize
+type ByFilesize []*Intent
+
+func (s ByFilesize) Len() int { return len(s) }
+func (s ByFilesize) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s ByFilesize) Less(i, j int) bool { return s[i].BSONSize > s[j].BSONSize }
+
+// DBHeap orders databases by fewest active restores, breaking ties in favor
+// of the database with the largest remaining collection. It implements the
+// container/heap interface. None of its methods are meant to be called directly.
+type DBHeap []*dbCounter
+
+func (dbh DBHeap) Len() int { return len(dbh) }
+func (dbh DBHeap) Swap(i, j int) { dbh[i], dbh[j] = dbh[j], dbh[i] }
+func (dbh DBHeap) Less(i, j int) bool {
+ if dbh[i].active == dbh[j].active {
+ // prioritize the largest bson file if dbs have the same number
+ // of restorations in progress
+ return dbh[i].collections[0].BSONSize > dbh[j].collections[0].BSONSize
+ }
+ return dbh[i].active < dbh[j].active
+}
+
+func (dbh *DBHeap) Push(x interface{}) {
+ *dbh = append(*dbh, x.(*dbCounter))
+}
+
+func (dbh *DBHeap) Pop() interface{} {
+ // for container/heap: removes the last entry, which heap.Pop has already swapped to the end, and resizes the heap array
+ old := *dbh
+ n := len(old)
+ toPop := old[n-1]
+ *dbh = old[0 : n-1]
+ return toPop
+}
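
To make the scheduling concrete, here is a hedged walk-through using the same
sample intents as the test file below; the comments trace why Get returns
each intent in this order:

    intents := []*Intent{
        &Intent{C: "small", DB: "db2", BSONSize: 32},
        &Intent{C: "medium", DB: "db2", BSONSize: 128},
        &Intent{C: "giant", DB: "db1", BSONSize: 1024},
        &Intent{C: "tiny", DB: "db1", BSONSize: 2},
    }
    p := NewMultiDatabaseLTFPrioritizer(intents)

    first := p.Get()  // db1."giant": both dbs are idle, so the largest file wins
    second := p.Get() // db2."medium": db1 now has an active restore, db2 does not
    p.Finish(second)
    third := p.Get()  // db2."small": db2 is idle again and still has collections
    p.Finish(third)
    fourth := p.Get() // db1."tiny": db2 is exhausted, so db1 takes a second job
    _, _ = first, fourth

    p.Get() // nil: the heap is empty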
diff --git a/mongorestore/intent_prioritizer_test.go b/mongorestore/intent_prioritizer_test.go
new file mode 100644
index 00000000000..474a2c26aab
--- /dev/null
+++ b/mongorestore/intent_prioritizer_test.go
@@ -0,0 +1,176 @@
+package mongorestore
+
+import (
+ "container/heap"
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "testing"
+)
+
+func TestLegacyPrioritizer(t *testing.T) {
+
+ testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE)
+
+ Convey("With a legacyPrioritizer initialized with an ordered intent list", t, func() {
+ testList := []*Intent{
+ &Intent{DB: "1"},
+ &Intent{DB: "2"},
+ &Intent{DB: "3"},
+ }
+ legacy := NewLegacyPrioritizer(testList)
+ So(legacy, ShouldNotBeNil)
+
+ Convey("the priority should be defined by 'first-in-first-out'", func() {
+ it0 := legacy.Get()
+ it1 := legacy.Get()
+ it2 := legacy.Get()
+ it3 := legacy.Get()
+ So(it3, ShouldBeNil)
+ So(it0.DB, ShouldBeLessThan, it1.DB)
+ So(it1.DB, ShouldBeLessThan, it2.DB)
+ })
+ })
+}
+
+//TODO test the hell out of the heap
+
+func TestBasicDBHeapBehavior(t *testing.T) {
+ var dbheap heap.Interface
+
+ testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE)
+
+ Convey("With an empty dbHeap", t, func() {
+ dbheap = &DBHeap{}
+ heap.Init(dbheap)
+
+ Convey("when inserting unordered dbCounters with different active counts", func() {
+ heap.Push(dbheap, &dbCounter{75, nil})
+ heap.Push(dbheap, &dbCounter{121, nil})
+ heap.Push(dbheap, &dbCounter{76, nil})
+ heap.Push(dbheap, &dbCounter{51, nil})
+ heap.Push(dbheap, &dbCounter{82, nil})
+ heap.Push(dbheap, &dbCounter{117, nil})
+ heap.Push(dbheap, &dbCounter{49, nil})
+ heap.Push(dbheap, &dbCounter{101, nil})
+ heap.Push(dbheap, &dbCounter{122, nil})
+ heap.Push(dbheap, &dbCounter{33, nil})
+ heap.Push(dbheap, &dbCounter{0, nil})
+
+ Convey("they should pop in active order, least to greatest", func() {
+ prev := -1
+ for dbheap.Len() > 0 {
+ popped := heap.Pop(dbheap).(*dbCounter)
+ So(popped.active, ShouldBeGreaterThan, prev)
+ prev = popped.active
+ }
+ })
+ })
+
+ Convey("when inserting unordered dbCounters with different bson sizes", func() {
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 70}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 1024}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 97}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 3}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{BSONSize: 1024 * 1024}}})
+
+ Convey("they should pop in bson size order, greatest to least", func() {
+ prev := int64(1024*1024 + 1) // Maximum
+ for dbheap.Len() > 0 {
+ popped := heap.Pop(dbheap).(*dbCounter)
+ So(popped.collections[0].BSONSize, ShouldBeLessThan, prev)
+ prev = popped.collections[0].BSONSize
+ }
+ })
+ })
+ })
+}
+
+func TestDBCounterCollectionSorting(t *testing.T) {
+
+ testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE)
+
+ Convey("With a dbCounter and an unordered collection of intents", t, func() {
+ dbc := &dbCounter{
+ collections: []*Intent{
+ &Intent{BSONSize: 100},
+ &Intent{BSONSize: 1000},
+ &Intent{BSONSize: 1},
+ &Intent{BSONSize: 10},
+ },
+ }
+
+ Convey("popping the sorted intents should return in decreasing BSONSize", func() {
+ dbc.SortCollectionsBySize()
+ So(dbc.PopIntent().BSONSize, ShouldEqual, 1000)
+ So(dbc.PopIntent().BSONSize, ShouldEqual, 100)
+ So(dbc.PopIntent().BSONSize, ShouldEqual, 10)
+ So(dbc.PopIntent().BSONSize, ShouldEqual, 1)
+ So(dbc.PopIntent(), ShouldBeNil)
+ So(dbc.PopIntent(), ShouldBeNil)
+ })
+ })
+}
+
+func TestSimulatedMultiDBJob(t *testing.T) {
+ var prioritizer IntentPrioritizer
+
+ testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE)
+
+ Convey("With a prioritizer initialized with a set of intents", t, func() {
+ intents := []*Intent{
+ &Intent{C: "small", DB: "db2", BSONSize: 32},
+ &Intent{C: "medium", DB: "db2", BSONSize: 128},
+ &Intent{C: "giant", DB: "db1", BSONSize: 1024},
+ &Intent{C: "tiny", DB: "db1", BSONSize: 2},
+ }
+ prioritizer = NewMultiDatabaseLTFPrioritizer(intents)
+ So(prioritizer, ShouldNotBeNil)
+
+ Convey("and a running simulation of two jobs threads:", func() {
+ Convey("the first two intents should be of different dbs", func() {
+ i0 := prioritizer.Get()
+ So(i0, ShouldNotBeNil)
+ i1 := prioritizer.Get()
+ So(i1, ShouldNotBeNil)
+
+ Convey("the first intent should be the largest bson file", func() {
+ So(i0.C, ShouldEqual, "giant")
+ So(i0.DB, ShouldEqual, "db1")
+ })
+
+ Convey("the second intent should be the largest bson file of db2", func() {
+ So(i1.C, ShouldEqual, "medium")
+ So(i1.DB, ShouldEqual, "db2")
+ })
+
+ Convey("with the second job finishing the smaller intents", func() {
+ prioritizer.Finish(i1)
+ i2 := prioritizer.Get()
+ So(i2, ShouldNotBeNil)
+ prioritizer.Finish(i2)
+ i3 := prioritizer.Get()
+ So(i3, ShouldNotBeNil)
+
+ Convey("the next job should be from db2", func() {
+ So(i2.C, ShouldEqual, "small")
+ So(i2.DB, ShouldEqual, "db2")
+ })
+
+ Convey("the final job should be from db1", func() {
+ So(i3.C, ShouldEqual, "tiny")
+ So(i3.DB, ShouldEqual, "db1")
+
+ Convey("which means that there should be two active db1 jobs", func() {
+ counter := prioritizer.(*multiDatabaseLTFPrioritizer).counterMap["db1"]
+ So(counter.active, ShouldEqual, 2)
+ })
+ })
+
+ Convey("the heap should now be empty", func() {
+ So(prioritizer.(*multiDatabaseLTFPrioritizer).dbHeap.Len(), ShouldEqual, 0)
+ })
+ })
+ })
+ })
+ })
+}
diff --git a/mongorestore/intent_test.go b/mongorestore/intent_test.go
index 221f6470b6e..f2d3e3f2850 100644
--- a/mongorestore/intent_test.go
+++ b/mongorestore/intent_test.go
@@ -20,7 +20,7 @@ func TestIntentManager(t *testing.T) {
manager.Put(&Intent{DB: "1", C: "2", BSONPath: "/b2/"})
manager.Put(&Intent{DB: "1", C: "3", BSONPath: "/b3/"})
manager.Put(&Intent{DB: "2", C: "1", BSONPath: "/b4/"})
- So(len(manager.queue), ShouldEqual, 4)
+ So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 4)
So(len(manager.intents), ShouldEqual, 4)
Convey("and then some matching metadata intents", func() {
@@ -30,11 +30,12 @@ func TestIntentManager(t *testing.T) {
manager.Put(&Intent{DB: "1", C: "2", MetadataPath: "/2m/"})
Convey("the size of the queue should be unchanged", func() {
- So(len(manager.queue), ShouldEqual, 4)
+ So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 4)
So(len(manager.intents), ShouldEqual, 4)
})
Convey("popping them from the IntentManager", func() {
+ manager.Finalize(Legacy)
it0 := manager.Pop()
it1 := manager.Pop()
it2 := manager.Pop()
@@ -60,7 +61,7 @@ func TestIntentManager(t *testing.T) {
manager.Put(&Intent{DB: "27", C: "B", MetadataPath: "/6/"})
Convey("should increase the size, because they are not merged in", func() {
- So(len(manager.queue), ShouldEqual, 6)
+ So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 6)
So(len(manager.intents), ShouldEqual, 6)
})
})
diff --git a/mongorestore/main/mongorestore.go b/mongorestore/main/mongorestore.go
index a13d6839d41..e11796ee69a 100644
--- a/mongorestore/main/mongorestore.go
+++ b/mongorestore/main/mongorestore.go
@@ -9,6 +9,7 @@ import (
"github.com/mongodb/mongo-tools/mongorestore"
"github.com/mongodb/mongo-tools/mongorestore/options"
"os"
+ "runtime"
)
func main() {
@@ -36,6 +37,11 @@ func main() {
log.SetVerbosity(opts.Verbosity)
+ if outputOpts.JobThreads > 0 {
+ log.Logf(1, "running mongorestore with %v job threads", outputOpts.JobThreads)
+ runtime.GOMAXPROCS(outputOpts.JobThreads)
+ }
+
targetDir := ""
if inputOpts.Directory != "" {
targetDir = inputOpts.Directory
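
One note on the GOMAXPROCS call above: on the Go releases current when this
commit landed (pre-1.5), GOMAXPROCS defaulted to 1, so raising it to match
--jobThreads is what actually lets the restore goroutines run on multiple CPUs.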
diff --git a/mongorestore/metadata.go b/mongorestore/metadata.go
index 0da67823e22..2351ab4ba34 100644
--- a/mongorestore/metadata.go
+++ b/mongorestore/metadata.go
@@ -100,6 +100,7 @@ func (restore *MongoRestore) DBHasCollection(intent *Intent) (bool, error) {
if err != nil {
return false, fmt.Errorf("error establishing connection: %v", err)
}
+ session.SetSocketTimeout(0)
defer session.Close()
err = session.DB(intent.DB).C("system.namespaces").Find(bson.M{"name": collectionNS}).One(&result)
if err != nil {
@@ -173,6 +174,7 @@ func (restore *MongoRestore) LegacyInsertIndex(intent *Intent, index IndexDocume
if err != nil {
return fmt.Errorf("error establishing connection: %v", err)
}
+ session.SetSocketTimeout(0)
defer session.Close()
// overwrite safety to make sure we catch errors
@@ -199,6 +201,7 @@ func (restore *MongoRestore) CreateCollection(intent *Intent, options bson.D) er
if err != nil {
return fmt.Errorf("error establishing connection: %v", err)
}
+ session.SetSocketTimeout(0)
defer session.Close()
res := bson.M{}
@@ -258,6 +261,7 @@ func (restore *MongoRestore) RestoreUsersOrRoles(collectionType string, intent *
log.Logf(0, "error establishing connection to drop temporary collection %v: %v", tempCol, err)
return
}
+ session.SetSocketTimeout(0)
defer session.Close()
log.Logf(3, "dropping temporary collection %v", tempCol)
err = session.DB("admin").C(tempCol).DropCollection()
@@ -281,6 +285,7 @@ func (restore *MongoRestore) RestoreUsersOrRoles(collectionType string, intent *
}
session, err := restore.SessionProvider.GetSession()
if err != nil {
return fmt.Errorf("error establishing connection: %v", err)
}
+ session.SetSocketTimeout(0)
diff --git a/mongorestore/mongorestore.go b/mongorestore/mongorestore.go
index b735ba85793..0f5c8b7d452 100644
--- a/mongorestore/mongorestore.go
+++ b/mongorestore/mongorestore.go
@@ -101,6 +101,12 @@ func (restore *MongoRestore) Restore() error {
}
// 2. Restore them...
+ if restore.OutputOptions.JobThreads > 0 {
+ restore.manager.Finalize(MultiDatabaseLTF)
+ } else {
+ // use legacy restoration order if we are single-threaded
+ restore.manager.Finalize(Legacy)
+ }
err = restore.RestoreIntents()
if err != nil {
return fmt.Errorf("restore error: %v", err)
diff --git a/mongorestore/oplog.go b/mongorestore/oplog.go
index 03e7add4740..a911895f1a9 100644
--- a/mongorestore/oplog.go
+++ b/mongorestore/oplog.go
@@ -66,6 +66,7 @@ func (restore *MongoRestore) RestoreOplog() error {
if err != nil {
return fmt.Errorf("error establishing connection: %v", err)
}
+ session.SetSocketTimeout(0)
defer session.Close()
// To restore the oplog, we iterate over the oplog entries,
diff --git a/mongorestore/options/options.go b/mongorestore/options/options.go
index 1a05db5cae7..e121a54b6c3 100644
--- a/mongorestore/options/options.go
+++ b/mongorestore/options/options.go
@@ -19,6 +19,7 @@ type OutputOptions struct {
NoIndexRestore bool `long:"noIndexRestore" description:"Don't restore indexes"`
NoOptionsRestore bool `long:"noOptionsRestore" description:"Don't restore options"`
KeepIndexVersion bool `long:"keepIndexVersion" description:"Don't update index version"`
+ JobThreads int `long:"jobThreads" short:"j" description:"TODO"`
}
func (self *OutputOptions) Name() string {
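
Usage sketch for the new flag (a hypothetical invocation; the dump directory
name is illustrative): mongorestore --jobThreads=4 dump/ or, using the short
form, mongorestore -j 4 dump/. The restore.go changes below feed this value
into the pool of restore goroutines.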
diff --git a/mongorestore/restore.go b/mongorestore/restore.go
index d9e8930d7d5..6c313ff6cad 100644
--- a/mongorestore/restore.go
+++ b/mongorestore/restore.go
@@ -17,11 +17,54 @@ const ProgressBarLength = 24
// RestoreIntents iterates through all of the normal intents
// stored in the IntentManager, and restores them.
func (restore *MongoRestore) RestoreIntents() error {
+
+ if restore.OutputOptions.JobThreads > 0 {
+ errChan := make(chan error)
+ doneChan := make(chan struct{})
+ doneCount := 0
+
+ // start a goroutine for each job thread
+ for i := 0; i < restore.OutputOptions.JobThreads; i++ {
+ go func(id int) {
+ log.Logf(3, "starting restore routine with id=%v", id)
+ for {
+ intent := restore.manager.Pop()
+ if intent == nil {
+ break
+ }
+ err := restore.RestoreIntent(intent)
+ if err != nil {
+ errChan <- err
+ return
+ }
+ restore.manager.Finish(intent)
+ }
+ log.Logf(3, "ending restore routine with id=%v, no more work to do", id)
+ doneChan <- struct{}{}
+ }(i)
+ }
+
+ // wait until all goroutines are done or one of them errors out
+ for {
+ select {
+ case err := <-errChan:
+ return err
+ case <-doneChan:
+ doneCount++
+ if doneCount == restore.OutputOptions.JobThreads {
+ return nil
+ }
+ }
+ }
+ }
+
+ // single-threaded
for intent := restore.manager.Pop(); intent != nil; intent = restore.manager.Pop() {
err := restore.RestoreIntent(intent)
if err != nil {
return err
}
+ restore.manager.Finish(intent)
}
return nil
}
@@ -49,6 +92,7 @@ func (restore *MongoRestore) RestoreIntent(intent *Intent) error {
log.Logf(1, "dropping collection %v before restoring", intent.Key())
// TODO(erf) maybe encapsulate this so that the session is closed sooner
session, err := restore.SessionProvider.GetSession()
if err != nil {
return fmt.Errorf("error establishing connection: %v", err)
}
+ session.SetSocketTimeout(0)
@@ -172,12 +216,13 @@ func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
bar.Start()
defer bar.Stop()
+ //TODO use a goroutine here
doc := &bson.Raw{}
for bsonSource.Next(doc) {
bytesRead += len(doc.Data)
if restore.objCheck {
//TODO encapsulate to reuse bson obj??
- err := bson.Unmarshal(doc.Data, &bson.M{})
+ err := bson.Unmarshal(doc.Data, &bson.D{})
if err != nil {
return err
break