Diffstat (limited to 'src/mongo/gotools/common/intents')
 src/mongo/gotools/common/intents/intent.go                   | 466
 src/mongo/gotools/common/intents/intent_prioritizer.go       | 241
 src/mongo/gotools/common/intents/intent_prioritizer_test.go  | 174
 src/mongo/gotools/common/intents/intent_test.go              |  81
 4 files changed, 962 insertions(+), 0 deletions(-)
diff --git a/src/mongo/gotools/common/intents/intent.go b/src/mongo/gotools/common/intents/intent.go
new file mode 100644
index 00000000000..42999806744
--- /dev/null
+++ b/src/mongo/gotools/common/intents/intent.go
@@ -0,0 +1,466 @@
+// Package intents provides utilities for performing dump/restore operations.
+package intents
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/mongodb/mongo-tools/common"
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/util"
+ "gopkg.in/mgo.v2/bson"
+)
+
+type file interface {
+ io.ReadWriteCloser
+ Open() error
+ Pos() int64
+}
+
+// DestinationConflictError occurs when multiple namespaces map to the same
+// destination.
+type DestinationConflictError struct {
+ Src, Dst string
+}
+
+func (e DestinationConflictError) Error() string {
+ return fmt.Sprintf("destination conflict: %s (src) => %s (dst)", e.Src, e.Dst)
+}
+
+// FileNeedsIOBuffer is an interface that denotes that a struct needs an IO
+// buffer that is managed externally. This interface is used both to hand a
+// buffer off to a struct and to signal that the struct should release its
+// buffer. Added to reduce memory usage, as outlined in TOOLS-1088.
+type FileNeedsIOBuffer interface {
+ TakeIOBuffer([]byte)
+ ReleaseIOBuffer()
+}
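+
+// Editorial sketch (not part of the original source): a file type whose
+// writes go through a pooled buffer might satisfy FileNeedsIOBuffer roughly
+// as follows, where bufferedFile is hypothetical:
+//
+//	type bufferedFile struct {
+//		buf []byte // scratch space owned by an external pool
+//	}
+//
+//	func (f *bufferedFile) TakeIOBuffer(b []byte) { f.buf = b }
+//	func (f *bufferedFile) ReleaseIOBuffer()      { f.buf = nil }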
+
+// Intent represents a single collection to dump or restore, pairing its
+// destination namespace with the files that back it. mongorestore first
+// scans the dump directory to generate a list of all files to restore and
+// the namespaces they map to; each such mapping becomes an Intent.
+type Intent struct {
+ // Destination namespace info
+ DB string
+ C string
+
+ // File locations as absolute paths
+ BSONFile file
+ BSONSize int64
+ MetadataFile file
+
+ // Indicates where the intent will be read from or written to
+ Location string
+ MetadataLocation string
+
+ // Collection options
+ Options *bson.D
+
+ // File/collection size, for some prioritizer implementations.
+ // Units don't matter as long as they are consistent for a given use case.
+ Size int64
+}
+
+func (it *Intent) Namespace() string {
+ return it.DB + "." + it.C
+}
+
+func (it *Intent) IsOplog() bool {
+ if it.DB == "" && it.C == "oplog" {
+ return true
+ }
+ return it.DB == "local" && (it.C == "oplog.rs" || it.C == "oplog.$main")
+}
+
+func (it *Intent) IsUsers() bool {
+ if it.C == "$admin.system.users" {
+ return true
+ }
+ if it.DB == "admin" && it.C == "system.users" {
+ return true
+ }
+ return false
+}
+
+func (it *Intent) IsRoles() bool {
+ if it.C == "$admin.system.roles" {
+ return true
+ }
+ if it.DB == "admin" && it.C == "system.roles" {
+ return true
+ }
+ return false
+}
+
+func (it *Intent) IsAuthVersion() bool {
+ if it.C == "$admin.system.version" {
+ return true
+ }
+ if it.DB == "admin" && it.C == "system.version" {
+ return true
+ }
+ return false
+}
+
+func (it *Intent) IsSystemIndexes() bool {
+ return it.C == "system.indexes"
+}
+
+func (intent *Intent) IsSpecialCollection() bool {
+ return intent.IsSystemIndexes() || intent.IsUsers() || intent.IsRoles() || intent.IsAuthVersion()
+}
+
+func (existing *Intent) MergeIntent(intent *Intent) {
+ // merge new intent into old intent
+ if existing.BSONFile == nil {
+ existing.BSONFile = intent.BSONFile
+ }
+ if existing.Size == 0 {
+ existing.Size = intent.Size
+ }
+ if existing.Location == "" {
+ existing.Location = intent.Location
+ }
+ if existing.MetadataFile == nil {
+ existing.MetadataFile = intent.MetadataFile
+ }
+ if existing.MetadataLocation == "" {
+ existing.MetadataLocation = intent.MetadataLocation
+ }
+}
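+
+// Illustrative example (editorial): an intent holding only a BSON file and
+// an intent holding only the matching metadata file merge into a single
+// intent that records both locations:
+//
+//	a := &Intent{DB: "test", C: "foo", Location: "test/foo.bson"}
+//	b := &Intent{DB: "test", C: "foo", MetadataLocation: "test/foo.metadata.json"}
+//	a.MergeIntent(b) // a now carries both Location and MetadataLocation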
+
+type Manager struct {
+ // intents are for all of the regular user-created collections
+ intents map[string]*Intent
+ // special intents are for all of the collections that are created by mongod
+ // and require special handling
+ specialIntents map[string]*Intent
+
+ // legacy mongorestore works in the order that paths are discovered,
+ // so we need an ordered data structure to preserve this behavior.
+ intentsByDiscoveryOrder []*Intent
+
+ // we need different scheduling order depending on the target
+ // mongod/mongos and whether or not we are multi threading;
+ // the IntentPrioritizer interface encapsulates this.
+ prioritizer IntentPrioritizer
+
+ // special cases that should be saved but not be part of the queue.
+ // Used to deal with oplog and user/roles restoration, which are
+ // handled outside of the basic logic of the tool
+ oplogIntent *Intent
+ usersIntent *Intent
+ rolesIntent *Intent
+ versionIntent *Intent
+ indexIntents map[string]*Intent
+
+ // Tells the manager if it should choose a single oplog when multiple are provided.
+ smartPickOplog bool
+
+ // Indicates whether the manager has seen two conflicting oplogs.
+ oplogConflict bool
+
+ // prevent conflicting destinations by checking which sources map to the
+ // same namespace
+ destinations map[string][]string
+}
+
+func NewIntentManager() *Manager {
+ return &Manager{
+ intents: map[string]*Intent{},
+ specialIntents: map[string]*Intent{},
+ intentsByDiscoveryOrder: []*Intent{},
+ indexIntents: map[string]*Intent{},
+ smartPickOplog: false,
+ oplogConflict: false,
+ destinations: map[string][]string{},
+ }
+}
+
+func (mgr *Manager) SetSmartPickOplog(smartPick bool) {
+ mgr.smartPickOplog = smartPick
+}
+
+// HasConfigDBIntent returns a bool indicating if any of the intents refer to the "config" database.
+// This can be used to check for possible unwanted conflicts before restoring to a sharded system.
+func (mgr *Manager) HasConfigDBIntent() bool {
+ for _, intent := range mgr.intentsByDiscoveryOrder {
+ if intent.DB == "config" {
+ return true
+ }
+ }
+ return false
+}
+
+// PutOplogIntent takes an intent for an oplog and stores it in the intent manager with the
+// provided key. If the manager has smartPickOplog enabled, then it uses a priority system
+// to determine which oplog intent to maintain as the actual oplog.
+func (manager *Manager) PutOplogIntent(intent *Intent, managerKey string) {
+ if manager.smartPickOplog {
+ if existing := manager.specialIntents[managerKey]; existing != nil {
+ existing.MergeIntent(intent)
+ return
+ }
+ if manager.oplogIntent == nil {
+ // If there is no oplog intent, make this one the oplog.
+ manager.oplogIntent = intent
+ manager.specialIntents[managerKey] = intent
+ } else if intent.DB == "" {
+ // We already have an oplog and this is a top priority oplog.
+ if manager.oplogIntent.DB == "" {
+ // If the manager's current oplog is also top priority, we have a
+ // conflict and ignore this oplog.
+ manager.oplogConflict = true
+ } else {
+ // If the manager's current oplog is lower priority, replace it and
+ // move that one to be a normal intent.
+ manager.putNormalIntent(manager.oplogIntent)
+ delete(manager.specialIntents, manager.oplogIntent.Namespace())
+ manager.oplogIntent = intent
+ manager.specialIntents[managerKey] = intent
+ }
+ } else {
+ // We already have an oplog and this is a low priority oplog.
+ if manager.oplogIntent.DB != "" {
+ // If the manager's current oplog is also low priority, set a conflict.
+ manager.oplogConflict = true
+ }
+ // No matter what, set this lower priority oplog to be a normal intent.
+ manager.putNormalIntent(intent)
+ }
+ } else {
+ if intent.DB == "" && intent.C == "oplog" {
+ // If this is a normal oplog, then add it as an oplog intent.
+ if existing := manager.specialIntents[managerKey]; existing != nil {
+ existing.MergeIntent(intent)
+ return
+ }
+ manager.oplogIntent = intent
+ manager.specialIntents[managerKey] = intent
+ } else {
+ manager.putNormalIntent(intent)
+ }
+ }
+}
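+
+// Illustrative example (editorial): with smartPickOplog enabled, a
+// top-priority oplog (DB == "") displaces a previously stored lower-priority
+// one, which is re-filed as a normal intent:
+//
+//	mgr := NewIntentManager()
+//	mgr.SetSmartPickOplog(true)
+//	mgr.PutOplogIntent(&Intent{DB: "local", C: "oplog.rs"}, "local.oplog.rs")
+//	mgr.PutOplogIntent(&Intent{DB: "", C: "oplog"}, "oplog")
+//	// mgr.Oplog() now returns the top-priority oplog; local.oplog.rs has
+//	// been demoted to a normal collection intent.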
+
+func (manager *Manager) putNormalIntent(intent *Intent) {
+ manager.putNormalIntentWithNamespace(intent.Namespace(), intent)
+}
+
+func (manager *Manager) putNormalIntentWithNamespace(ns string, intent *Intent) {
+ // BSON and metadata files for the same collection are merged
+ // into the same intent. This is done to allow for simple
+ // pairing of BSON + metadata without keeping track of the
+ // state of the filepath walker
+ if existing := manager.intents[ns]; existing != nil {
+ if existing.Namespace() != intent.Namespace() {
+ // remove old destination, add new one
+ dst := existing.Namespace()
+ dsts := manager.destinations[dst]
+ i := util.StringSliceIndex(dsts, ns)
+ manager.destinations[dst] = append(dsts[:i], dsts[i+1:]...)
+
+ dsts = manager.destinations[intent.Namespace()]
+ manager.destinations[intent.Namespace()] = append(dsts, ns)
+ }
+ existing.MergeIntent(intent)
+ return
+ }
+
+ // if key doesn't already exist, add it to the manager
+ manager.intents[ns] = intent
+ manager.intentsByDiscoveryOrder = append(manager.intentsByDiscoveryOrder, intent)
+
+ manager.destinations[intent.Namespace()] = append(manager.destinations[intent.Namespace()], ns)
+}
+
+// Put inserts an intent into the manager, with the source namespace set to
+// the same value as its destination namespace.
+func (manager *Manager) Put(intent *Intent) {
+ manager.PutWithNamespace(intent.Namespace(), intent)
+}
+
+// PutWithNamespace inserts an intent into the manager with the source set
+// to the provided namespace. Intents for the same collection are merged
+// together, so that BSON and metadata files for the same collection are
+// returned in the same intent.
+func (manager *Manager) PutWithNamespace(ns string, intent *Intent) {
+ if intent == nil {
+ panic("cannot insert nil *Intent into IntentManager")
+ }
+ db, _ := common.SplitNamespace(ns)
+
+ // bucket special-case collections
+ if intent.IsOplog() {
+ manager.PutOplogIntent(intent, intent.Namespace())
+ return
+ }
+ if intent.IsSystemIndexes() {
+ if intent.BSONFile != nil {
+ manager.indexIntents[db] = intent
+ manager.specialIntents[ns] = intent
+ }
+ return
+ }
+ if intent.IsUsers() {
+ if intent.BSONFile != nil {
+ manager.usersIntent = intent
+ manager.specialIntents[ns] = intent
+ }
+ return
+ }
+ if intent.IsRoles() {
+ if intent.BSONFile != nil {
+ manager.rolesIntent = intent
+ manager.specialIntents[ns] = intent
+ }
+ return
+ }
+ if intent.IsAuthVersion() {
+ if intent.BSONFile != nil {
+ manager.versionIntent = intent
+ manager.specialIntents[ns] = intent
+ }
+ return
+ }
+
+ manager.putNormalIntentWithNamespace(ns, intent)
+}
+
+func (manager *Manager) GetOplogConflict() bool {
+ return manager.oplogConflict
+}
+
+func (manager *Manager) GetDestinationConflicts() (errs []DestinationConflictError) {
+ for dst, srcs := range manager.destinations {
+ if len(srcs) <= 1 {
+ continue
+ }
+ for _, src := range srcs {
+ errs = append(errs, DestinationConflictError{Dst: dst, Src: src})
+ }
+ }
+ return
+}
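+
+// Illustrative example (editorial): two source namespaces restored to the
+// same destination yield one DestinationConflictError per source:
+//
+//	mgr.PutWithNamespace("db1.a", &Intent{DB: "db2", C: "c"})
+//	mgr.PutWithNamespace("db1.b", &Intent{DB: "db2", C: "c"})
+//	errs := mgr.GetDestinationConflicts() // one error each for db1.a and db1.b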
+
+// Intents returns a slice containing all of the intents in the manager.
+// Intents is not thread safe.
+func (manager *Manager) Intents() []*Intent {
+ allIntents := []*Intent{}
+ for _, intent := range manager.intents {
+ allIntents = append(allIntents, intent)
+ }
+ for _, intent := range manager.indexIntents {
+ allIntents = append(allIntents, intent)
+ }
+ if manager.oplogIntent != nil {
+ allIntents = append(allIntents, manager.oplogIntent)
+ }
+ if manager.usersIntent != nil {
+ allIntents = append(allIntents, manager.usersIntent)
+ }
+ if manager.rolesIntent != nil {
+ allIntents = append(allIntents, manager.rolesIntent)
+ }
+ if manager.versionIntent != nil {
+ allIntents = append(allIntents, manager.versionIntent)
+ }
+ return allIntents
+}
+
+func (manager *Manager) IntentForNamespace(ns string) *Intent {
+ intent := manager.intents[ns]
+ if intent != nil {
+ return intent
+ }
+ intent = manager.specialIntents[ns]
+ return intent
+}
+
+// Pop returns the next available intent from the manager. If the manager is
+// empty, it returns nil. Pop is thread safe.
+func (manager *Manager) Pop() *Intent {
+ return manager.prioritizer.Get()
+}
+
+// Peek returns a copy of a stored intent from the manager without removing
+// the intent. This method is useful for edge cases that need to look ahead
+// at what collections are in the manager before they are scheduled.
+//
+// NOTE: There are no guarantees that Peek will return a usable
+// intent after Finalize() is called.
+func (manager *Manager) Peek() *Intent {
+ if len(manager.intentsByDiscoveryOrder) == 0 {
+ return nil
+ }
+ intentCopy := *manager.intentsByDiscoveryOrder[0]
+ return &intentCopy
+}
+
+// Finish tells the prioritizer that mongorestore is done restoring
+// the given collection intent.
+func (manager *Manager) Finish(intent *Intent) {
+ manager.prioritizer.Finish(intent)
+}
+
+// Oplog returns the intent representing the oplog, which isn't
+// stored with the other intents, because it is dumped and restored in
+// a very different way from other collections.
+func (manager *Manager) Oplog() *Intent {
+ return manager.oplogIntent
+}
+
+// SystemIndexes returns the system.indexes bson for a database
+func (manager *Manager) SystemIndexes(dbName string) *Intent {
+ return manager.indexIntents[dbName]
+}
+
+// SystemIndexDBs returns the databases for which there are system.indexes.
+func (manager *Manager) SystemIndexDBs() []string {
+ databases := []string{}
+ for dbname := range manager.indexIntents {
+ databases = append(databases, dbname)
+ }
+ return databases
+}
+
+// Users returns the intent of the users collection to restore, a special case
+func (manager *Manager) Users() *Intent {
+ return manager.usersIntent
+}
+
+// Roles returns the intent of the user roles collection to restore, a special case
+func (manager *Manager) Roles() *Intent {
+ return manager.rolesIntent
+}
+
+// AuthVersion returns the intent of the version collection to restore, a special case
+func (manager *Manager) AuthVersion() *Intent {
+ return manager.versionIntent
+}
+
+// Finalize processes the intents for prioritization. Currently three
+// kinds of prioritizers are supported. No more "Put" operations may be done
+// after finalize is called.
+func (manager *Manager) Finalize(pType PriorityType) {
+ switch pType {
+ case Legacy:
+ log.Logv(log.DebugHigh, "finalizing intent manager with legacy prioritizer")
+ manager.prioritizer = NewLegacyPrioritizer(manager.intentsByDiscoveryOrder)
+ case LongestTaskFirst:
+ log.Logv(log.DebugHigh, "finalizing intent manager with longest task first prioritizer")
+ manager.prioritizer = NewLongestTaskFirstPrioritizer(manager.intentsByDiscoveryOrder)
+ case MultiDatabaseLTF:
+ log.Logv(log.DebugHigh, "finalizing intent manager with multi-database longest task first prioritizer")
+ manager.prioritizer = NewMultiDatabaseLTFPrioritizer(manager.intentsByDiscoveryOrder)
+ default:
+ panic("cannot initialize IntentPrioritizer with unknown type")
+ }
+ // release these for the garbage collector and to ensure code correctness
+ manager.intents = nil
+ manager.intentsByDiscoveryOrder = nil
+}
+
+func (manager *Manager) UsePrioritizer(prioritizer IntentPrioritizer) {
+ manager.prioritizer = prioritizer
+}
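+
+// Typical lifecycle, as an editorial usage sketch (intents would normally be
+// discovered by walking a dump directory; error handling omitted):
+//
+//	mgr := NewIntentManager()
+//	mgr.Put(&Intent{DB: "test", C: "foo", Location: "test/foo.bson", Size: 1024})
+//	mgr.Put(&Intent{DB: "test", C: "bar", Location: "test/bar.bson", Size: 64})
+//	mgr.Finalize(LongestTaskFirst) // no more Puts after this point
+//	for intent := mgr.Pop(); intent != nil; intent = mgr.Pop() {
+//		// ... restore the collection described by intent ...
+//		mgr.Finish(intent)
+//	}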
diff --git a/src/mongo/gotools/common/intents/intent_prioritizer.go b/src/mongo/gotools/common/intents/intent_prioritizer.go
new file mode 100644
index 00000000000..290a7c83d1e
--- /dev/null
+++ b/src/mongo/gotools/common/intents/intent_prioritizer.go
@@ -0,0 +1,241 @@
+package intents
+
+import (
+ "container/heap"
+ "sort"
+ "sync"
+)
+
+type PriorityType int
+
+const (
+ Legacy PriorityType = iota
+ LongestTaskFirst
+ MultiDatabaseLTF
+)
+
+// IntentPrioritizer encapsulates the logic of scheduling intents
+// for restoration. It can know about which intents are in the
+// process of being restored through the "Finish" hook.
+//
+// Oplog entries and auth entries are not handled by the prioritizer,
+// as these are special cases handled by the regular mongorestore code.
+type IntentPrioritizer interface {
+ Get() *Intent
+ Finish(*Intent)
+}
+
+//===== Legacy =====
+
+// legacyPrioritizer processes the intents in the order they were read off the
+// file system, in keeping with legacy mongorestore behavior.
+type legacyPrioritizer struct {
+ sync.Mutex
+ queue []*Intent
+}
+
+func NewLegacyPrioritizer(intentList []*Intent) *legacyPrioritizer {
+ return &legacyPrioritizer{queue: intentList}
+}
+
+func (legacy *legacyPrioritizer) Get() *Intent {
+ legacy.Lock()
+ defer legacy.Unlock()
+
+ if len(legacy.queue) == 0 {
+ return nil
+ }
+
+ var intent *Intent
+ intent, legacy.queue = legacy.queue[0], legacy.queue[1:]
+ return intent
+}
+
+func (legacy *legacyPrioritizer) Finish(*Intent) {
+ // no-op
+ return
+}
+
+//===== Longest Task First =====
+
+// longestTaskFirstPrioritizer returns intents in the order of largest -> smallest,
+// which is better at minimizing total runtime in parallel environments than
+// other simple orderings.
+type longestTaskFirstPrioritizer struct {
+ sync.Mutex
+ queue []*Intent
+}
+
+// NewLongestTaskFirstPrioritizer returns an initialized LTF prioritizer.
+func NewLongestTaskFirstPrioritizer(intents []*Intent) *longestTaskFirstPrioritizer {
+ sort.Sort(BySize(intents))
+ return &longestTaskFirstPrioritizer{
+ queue: intents,
+ }
+}
+
+func (ltf *longestTaskFirstPrioritizer) Get() *Intent {
+ ltf.Lock()
+ defer ltf.Unlock()
+
+ if len(ltf.queue) == 0 {
+ return nil
+ }
+
+ var intent *Intent
+ intent, ltf.queue = ltf.queue[0], ltf.queue[1:]
+ return intent
+}
+
+func (ltf *longestTaskFirstPrioritizer) Finish(*Intent) {
+ // no-op
+ return
+}
+
+// BySize implements sort.Interface to order intents from largest to smallest size.
+type BySize []*Intent
+
+func (s BySize) Len() int { return len(s) }
+func (s BySize) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s BySize) Less(i, j int) bool { return s[i].Size > s[j].Size }
+
+//===== Multi Database Longest Task First =====
+
+// multiDatabaseLTF is designed to properly schedule intents with two constraints:
+// 1. it is optimized to run in a multi-processor environment
+// 2. it is optimized for parallelism against 2.6's db-level write lock
+// These goals result in a design that attempts to have as many different
+// databases' intents being restored as possible and attempts to restore the
+// largest collections first.
+//
+// If we can have a minimum number of collections in flight for a given db,
+// we avoid lock contention in an optimal way on 2.6 systems. That is,
+// it is better to have two restore jobs where
+// job1 = "test.mycollection"
+// job2 = "mydb2.othercollection"
+// so that these collections do not compete for the db-level write lock.
+//
+// We also schedule the largest jobs first, in a greedy fashion, in order
+// to minimize total restoration time. Each database's intents are sorted
+// by decreasing file size at initialization, so that the largest jobs are
+// run first. Admittedly, .bson file size is not a direct predictor of restore
+// time, but there is certainly a strong correlation. Note that this attribute
+// is secondary to the multi-db scheduling laid out above, since multi-db will
+// get us bigger wins in terms of parallelism.
+type multiDatabaseLTFPrioritizer struct {
+ sync.Mutex
+ dbHeap heap.Interface
+ counterMap map[string]*dbCounter
+}
+
+// NewMultiDatabaseLTFPrioritizer takes in a list of intents and returns an
+// initialized prioritizer.
+func NewMultiDatabaseLTFPrioritizer(intents []*Intent) *multiDatabaseLTFPrioritizer {
+ prioritizer := &multiDatabaseLTFPrioritizer{
+ counterMap: map[string]*dbCounter{},
+ dbHeap: &DBHeap{},
+ }
+ heap.Init(prioritizer.dbHeap)
+ // first, create all database counters
+ for _, intent := range intents {
+ counter, exists := prioritizer.counterMap[intent.DB]
+ if !exists {
+ // initialize a new counter if one doesn't exist for DB
+ counter = &dbCounter{}
+ prioritizer.counterMap[intent.DB] = counter
+ }
+ counter.collections = append(counter.collections, intent)
+ }
+ // then ensure that all the dbCounters have sorted intents
+ for _, counter := range prioritizer.counterMap {
+ counter.SortCollectionsBySize()
+ heap.Push(prioritizer.dbHeap, counter)
+ }
+ return prioritizer
+}
+
+// Get returns the next prioritized intent and updates the count of active
+// restores for the returned intent's DB. Get is thread safe; it locks the
+// prioritizer around its heap operations.
+func (mdb *multiDatabaseLTFPrioritizer) Get() *Intent {
+ mdb.Lock()
+ defer mdb.Unlock()
+
+ if mdb.dbHeap.Len() == 0 {
+ // we're out of things to return
+ return nil
+ }
+ optimalDB := heap.Pop(mdb.dbHeap).(*dbCounter)
+ optimalDB.active++
+ nextIntent := optimalDB.PopIntent()
+ // only release the db counter if it's out of collections
+ if len(optimalDB.collections) > 0 {
+ heap.Push(mdb.dbHeap, optimalDB)
+ }
+ return nextIntent
+}
+
+// Finish decreases the number of active restore jobs for the given intent's
+// database, and reshuffles the heap accordingly. Finish is thread safe; it
+// locks the prioritizer while it updates the heap.
+func (mdb *multiDatabaseLTFPrioritizer) Finish(intent *Intent) {
+ mdb.Lock()
+ defer mdb.Unlock()
+
+ counter := mdb.counterMap[intent.DB]
+ counter.active--
+ // only fix up the heap if the counter is still in the heap
+ if len(counter.collections) > 0 {
+ // This is an O(n) operation on the heap. We could make all heap
+ // operations O(log(n)) if we set up dbCounters to track their own
+ // position in the heap, but in practice this overhead is likely negligible.
+ heap.Init(mdb.dbHeap)
+ }
+}
+
+type dbCounter struct {
+ active int
+ collections []*Intent
+}
+
+func (dbc *dbCounter) SortCollectionsBySize() {
+ sort.Sort(BySize(dbc.collections))
+}
+
+// PopIntent returns the largest intent remaining for the database
+func (dbc *dbCounter) PopIntent() *Intent {
+ var intent *Intent
+ if len(dbc.collections) > 0 {
+ intent, dbc.collections = dbc.collections[0], dbc.collections[1:]
+ }
+ return intent
+}
+
+// DBHeap is a min-heap of dbCounters, surfacing the database with the fewest
+// active restores (ties broken by largest remaining collection) first.
+// It implements the container/heap interface; none of its methods are meant
+// to be called directly.
+type DBHeap []*dbCounter
+
+func (dbh DBHeap) Len() int { return len(dbh) }
+func (dbh DBHeap) Swap(i, j int) { dbh[i], dbh[j] = dbh[j], dbh[i] }
+func (dbh DBHeap) Less(i, j int) bool {
+ if dbh[i].active == dbh[j].active {
+ // prioritize the largest bson file if dbs have the same number
+ // of restorations in progress
+ return dbh[i].collections[0].Size > dbh[j].collections[0].Size
+ }
+ return dbh[i].active < dbh[j].active
+}
+
+func (dbh *DBHeap) Push(x interface{}) {
+ *dbh = append(*dbh, x.(*dbCounter))
+}
+
+func (dbh *DBHeap) Pop() interface{} {
+ // for container/heap package: removes the top entry and resizes the heap array
+ old := *dbh
+ n := len(old)
+ toPop := old[n-1]
+ *dbh = old[0 : n-1]
+ return toPop
+}
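+
+// Editorial usage sketch: a prioritizer is meant to be shared by several
+// restore workers, each looping on Get/Finish until the queue drains
+// (intents and numWorkers are placeholders):
+//
+//	prioritizer := NewMultiDatabaseLTFPrioritizer(intents)
+//	var wg sync.WaitGroup
+//	for i := 0; i < numWorkers; i++ {
+//		wg.Add(1)
+//		go func() {
+//			defer wg.Done()
+//			for it := prioritizer.Get(); it != nil; it = prioritizer.Get() {
+//				// ... restore it ...
+//				prioritizer.Finish(it)
+//			}
+//		}()
+//	}
+//	wg.Wait()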
diff --git a/src/mongo/gotools/common/intents/intent_prioritizer_test.go b/src/mongo/gotools/common/intents/intent_prioritizer_test.go
new file mode 100644
index 00000000000..2abd79e5641
--- /dev/null
+++ b/src/mongo/gotools/common/intents/intent_prioritizer_test.go
@@ -0,0 +1,174 @@
+package intents
+
+import (
+ "container/heap"
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "testing"
+)
+
+func TestLegacyPrioritizer(t *testing.T) {
+
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("With a legacyPrioritizer initialized with an ordered intent list", t, func() {
+ testList := []*Intent{
+ &Intent{DB: "1"},
+ &Intent{DB: "2"},
+ &Intent{DB: "3"},
+ }
+ legacy := NewLegacyPrioritizer(testList)
+ So(legacy, ShouldNotBeNil)
+
+ Convey("the priority should be defined by 'first-in-first-out'", func() {
+ it0 := legacy.Get()
+ it1 := legacy.Get()
+ it2 := legacy.Get()
+ it3 := legacy.Get()
+ So(it3, ShouldBeNil)
+ So(it0.DB, ShouldBeLessThan, it1.DB)
+ So(it1.DB, ShouldBeLessThan, it2.DB)
+ })
+ })
+}
+
+func TestBasicDBHeapBehavior(t *testing.T) {
+ var dbheap heap.Interface
+
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("With an empty dbHeap", t, func() {
+ dbheap = &DBHeap{}
+ heap.Init(dbheap)
+
+ Convey("when inserting unordered dbCounters with different active counts", func() {
+ heap.Push(dbheap, &dbCounter{75, nil})
+ heap.Push(dbheap, &dbCounter{121, nil})
+ heap.Push(dbheap, &dbCounter{76, nil})
+ heap.Push(dbheap, &dbCounter{51, nil})
+ heap.Push(dbheap, &dbCounter{82, nil})
+ heap.Push(dbheap, &dbCounter{117, nil})
+ heap.Push(dbheap, &dbCounter{49, nil})
+ heap.Push(dbheap, &dbCounter{101, nil})
+ heap.Push(dbheap, &dbCounter{122, nil})
+ heap.Push(dbheap, &dbCounter{33, nil})
+ heap.Push(dbheap, &dbCounter{0, nil})
+
+ Convey("they should pop in active order, least to greatest", func() {
+ prev := -1
+ for dbheap.Len() > 0 {
+ popped := heap.Pop(dbheap).(*dbCounter)
+ So(popped.active, ShouldBeGreaterThan, prev)
+ prev = popped.active
+ }
+ })
+ })
+
+ Convey("when inserting unordered dbCounters with different bson sizes", func() {
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 70}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 1024}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 97}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 3}}})
+ heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 1024 * 1024}}})
+
+ Convey("they should pop in bson size order, greatest to least", func() {
+ prev := int64(1024*1024 + 1) // Maximum
+ for dbheap.Len() > 0 {
+ popped := heap.Pop(dbheap).(*dbCounter)
+ So(popped.collections[0].Size, ShouldBeLessThan, prev)
+ prev = popped.collections[0].Size
+ }
+ })
+ })
+ })
+}
+
+func TestDBCounterCollectionSorting(t *testing.T) {
+
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("With a dbCounter and an unordered collection of intents", t, func() {
+ dbc := &dbCounter{
+ collections: []*Intent{
+ &Intent{Size: 100},
+ &Intent{Size: 1000},
+ &Intent{Size: 1},
+ &Intent{Size: 10},
+ },
+ }
+
+ Convey("popping the sorted intents should return in decreasing BSONSize", func() {
+ dbc.SortCollectionsBySize()
+ So(dbc.PopIntent().Size, ShouldEqual, 1000)
+ So(dbc.PopIntent().Size, ShouldEqual, 100)
+ So(dbc.PopIntent().Size, ShouldEqual, 10)
+ So(dbc.PopIntent().Size, ShouldEqual, 1)
+ So(dbc.PopIntent(), ShouldBeNil)
+ So(dbc.PopIntent(), ShouldBeNil)
+ })
+ })
+}
+
+func TestSimulatedMultiDBJob(t *testing.T) {
+ var prioritizer IntentPrioritizer
+
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("With a prioritizer initialized with a set of intents", t, func() {
+ intents := []*Intent{
+ &Intent{C: "small", DB: "db2", Size: 32},
+ &Intent{C: "medium", DB: "db2", Size: 128},
+ &Intent{C: "giant", DB: "db1", Size: 1024},
+ &Intent{C: "tiny", DB: "db1", Size: 2},
+ }
+ prioritizer = NewMultiDatabaseLTFPrioritizer(intents)
+ So(prioritizer, ShouldNotBeNil)
+
+ Convey("and a running simulation of two jobs threads:", func() {
+ Convey("the first two intents should be of different dbs", func() {
+ i0 := prioritizer.Get()
+ So(i0, ShouldNotBeNil)
+ i1 := prioritizer.Get()
+ So(i1, ShouldNotBeNil)
+
+ Convey("the first intent should be the largest bson file", func() {
+ So(i0.C, ShouldEqual, "giant")
+ So(i0.DB, ShouldEqual, "db1")
+ })
+
+ Convey("the second intent should be the largest bson file of db2", func() {
+ So(i1.C, ShouldEqual, "medium")
+ So(i1.DB, ShouldEqual, "db2")
+ })
+
+ Convey("with the second job finishing the smaller intents", func() {
+ prioritizer.Finish(i1)
+ i2 := prioritizer.Get()
+ So(i2, ShouldNotBeNil)
+ prioritizer.Finish(i2)
+ i3 := prioritizer.Get()
+ So(i3, ShouldNotBeNil)
+
+ Convey("the next job should be from db2", func() {
+ So(i2.C, ShouldEqual, "small")
+ So(i2.DB, ShouldEqual, "db2")
+ })
+
+ Convey("the final job should be from db1", func() {
+ So(i3.C, ShouldEqual, "tiny")
+ So(i3.DB, ShouldEqual, "db1")
+
+ Convey("which means that there should be two active db1 jobs", func() {
+ counter := prioritizer.(*multiDatabaseLTFPrioritizer).counterMap["db1"]
+ So(counter.active, ShouldEqual, 2)
+ })
+ })
+
+ Convey("the heap should now be empty", func() {
+ So(prioritizer.(*multiDatabaseLTFPrioritizer).dbHeap.Len(), ShouldEqual, 0)
+ })
+ })
+ })
+ })
+ })
+}
diff --git a/src/mongo/gotools/common/intents/intent_test.go b/src/mongo/gotools/common/intents/intent_test.go
new file mode 100644
index 00000000000..15b99f1af30
--- /dev/null
+++ b/src/mongo/gotools/common/intents/intent_test.go
@@ -0,0 +1,81 @@
+package intents
+
+import (
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "testing"
+)
+
+func TestIntentManager(t *testing.T) {
+ var manager *Manager
+
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("With an empty IntentManager", t, func() {
+ manager = NewIntentManager()
+ So(manager, ShouldNotBeNil)
+
+ Convey("putting a series of added bson intents", func() {
+ manager.Put(&Intent{DB: "1", C: "1", Location: "/b1/"})
+ manager.Put(&Intent{DB: "1", C: "2", Location: "/b2/"})
+ manager.Put(&Intent{DB: "1", C: "3", Location: "/b3/"})
+ manager.Put(&Intent{DB: "2", C: "1", Location: "/b4/"})
+ So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 4)
+ So(len(manager.intents), ShouldEqual, 4)
+
+ Convey("and then some matching metadata intents", func() {
+ manager.Put(&Intent{DB: "2", C: "1", MetadataLocation: "/4m/"})
+ manager.Put(&Intent{DB: "1", C: "3", MetadataLocation: "/3m/"})
+ manager.Put(&Intent{DB: "1", C: "1", MetadataLocation: "/1m/"})
+ manager.Put(&Intent{DB: "1", C: "2", MetadataLocation: "/2m/"})
+
+ Convey("the size of the queue should be unchanged", func() {
+ So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 4)
+ So(len(manager.intents), ShouldEqual, 4)
+ })
+
+ Convey("popping them from the IntentManager", func() {
+ manager.Finalize(Legacy)
+ it0 := manager.Pop()
+ it1 := manager.Pop()
+ it2 := manager.Pop()
+ it3 := manager.Pop()
+ it4 := manager.Pop()
+ So(it4, ShouldBeNil)
+
+ Convey("should return them in insert order", func() {
+ So(*it0, ShouldResemble,
+ Intent{DB: "1", C: "1", Location: "/b1/", MetadataLocation: "/1m/"})
+ So(*it1, ShouldResemble,
+ Intent{DB: "1", C: "2", Location: "/b2/", MetadataLocation: "/2m/"})
+ So(*it2, ShouldResemble,
+ Intent{DB: "1", C: "3", Location: "/b3/", MetadataLocation: "/3m/"})
+ So(*it3, ShouldResemble,
+ Intent{DB: "2", C: "1", Location: "/b4/", MetadataLocation: "/4m/"})
+ })
+ })
+ })
+
+ Convey("but adding non-matching intents", func() {
+ manager.Put(&Intent{DB: "7", C: "49", MetadataLocation: "/5/"})
+ manager.Put(&Intent{DB: "27", C: "B", MetadataLocation: "/6/"})
+
+ Convey("should increase the size, because they are not merged in", func() {
+ So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 6)
+ So(len(manager.intents), ShouldEqual, 6)
+ })
+ })
+
+ Convey("using the Peek() method", func() {
+ peeked := manager.Peek()
+ So(peeked, ShouldNotBeNil)
+ So(peeked, ShouldResemble, manager.intentsByDiscoveryOrder[0])
+
+ Convey("modifying the returned copy should not modify the original", func() {
+ peeked.DB = "SHINY NEW VALUE"
+ So(peeked, ShouldNotResemble, manager.intentsByDiscoveryOrder[0])
+ })
+ })
+ })
+ })
+}