diff options
Diffstat (limited to 'src/mongo/gotools/common/intents')
-rw-r--r-- | src/mongo/gotools/common/intents/intent.go | 466 | ||||
-rw-r--r-- | src/mongo/gotools/common/intents/intent_prioritizer.go | 241 | ||||
-rw-r--r-- | src/mongo/gotools/common/intents/intent_prioritizer_test.go | 174 | ||||
-rw-r--r-- | src/mongo/gotools/common/intents/intent_test.go | 81 |
4 files changed, 962 insertions, 0 deletions
diff --git a/src/mongo/gotools/common/intents/intent.go b/src/mongo/gotools/common/intents/intent.go new file mode 100644 index 00000000000..42999806744 --- /dev/null +++ b/src/mongo/gotools/common/intents/intent.go @@ -0,0 +1,466 @@ +// Package intents provides utilities for performing dump/restore operations. +package intents + +import ( + "fmt" + "io" + + "github.com/mongodb/mongo-tools/common" + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/util" + "gopkg.in/mgo.v2/bson" +) + +type file interface { + io.ReadWriteCloser + Open() error + Pos() int64 +} + +// DestinationConflictError occurs when multiple namespaces map to the same +// destination. +type DestinationConflictError struct { + Src, Dst string +} + +func (e DestinationConflictError) Error() string { + return fmt.Sprintf("destination conflict: %s (src) => %s (dst)", e.Src, e.Dst) +} + +// FileNeedsIOBuffer is an interface that denotes that a struct needs +// an IO buffer that is managed by an outside control. This interface +// is used to both hand off a buffer to a struct and signal that it should +// release its buffer. Added to reduce memory usage as outlined in TOOLS-1088. +type FileNeedsIOBuffer interface { + TakeIOBuffer([]byte) + ReleaseIOBuffer() +} + +// mongorestore first scans the directory to generate a list +// of all files to restore and what they map to. TODO comments +type Intent struct { + // Destination namespace info + DB string + C string + + // File locations as absolute paths + BSONFile file + BSONSize int64 + MetadataFile file + + // Indicates where the intent will be read from or written to + Location string + MetadataLocation string + + // Collection options + Options *bson.D + + // File/collection size, for some prioritizer implementations. + // Units don't matter as long as they are consistent for a given use case. + Size int64 +} + +func (it *Intent) Namespace() string { + return it.DB + "." + it.C +} + +func (it *Intent) IsOplog() bool { + if it.DB == "" && it.C == "oplog" { + return true + } + return it.DB == "local" && (it.C == "oplog.rs" || it.C == "oplog.$main") +} + +func (it *Intent) IsUsers() bool { + if it.C == "$admin.system.users" { + return true + } + if it.DB == "admin" && it.C == "system.users" { + return true + } + return false +} + +func (it *Intent) IsRoles() bool { + if it.C == "$admin.system.roles" { + return true + } + if it.DB == "admin" && it.C == "system.roles" { + return true + } + return false +} + +func (it *Intent) IsAuthVersion() bool { + if it.C == "$admin.system.version" { + return true + } + if it.DB == "admin" && it.C == "system.version" { + return true + } + return false +} + +func (it *Intent) IsSystemIndexes() bool { + return it.C == "system.indexes" +} + +func (intent *Intent) IsSpecialCollection() bool { + return intent.IsSystemIndexes() || intent.IsUsers() || intent.IsRoles() || intent.IsAuthVersion() +} + +func (existing *Intent) MergeIntent(intent *Intent) { + // merge new intent into old intent + if existing.BSONFile == nil { + existing.BSONFile = intent.BSONFile + } + if existing.Size == 0 { + existing.Size = intent.Size + } + if existing.Location == "" { + existing.Location = intent.Location + } + if existing.MetadataFile == nil { + existing.MetadataFile = intent.MetadataFile + } + if existing.MetadataLocation == "" { + existing.MetadataLocation = intent.MetadataLocation + } + +} + +type Manager struct { + // intents are for all of the regular user created collections + intents map[string]*Intent + // special intents are for all of the collections that are created by mongod + // and require special handling + specialIntents map[string]*Intent + + // legacy mongorestore works in the order that paths are discovered, + // so we need an ordered data structure to preserve this behavior. + intentsByDiscoveryOrder []*Intent + + // we need different scheduling order depending on the target + // mongod/mongos and whether or not we are multi threading; + // the IntentPrioritizer interface encapsulates this. + prioritizer IntentPrioritizer + + // special cases that should be saved but not be part of the queue. + // used to deal with oplog and user/roles restoration, which are + // handled outside of the basic logic of the tool + oplogIntent *Intent + usersIntent *Intent + rolesIntent *Intent + versionIntent *Intent + indexIntents map[string]*Intent + + // Tells the manager if it should choose a single oplog when multiple are provided. + smartPickOplog bool + + // Indicates if an the manager has seen two conflicting oplogs. + oplogConflict bool + + // prevent conflicting destinations by checking which sources map to the + // same namespace + destinations map[string][]string +} + +func NewIntentManager() *Manager { + return &Manager{ + intents: map[string]*Intent{}, + specialIntents: map[string]*Intent{}, + intentsByDiscoveryOrder: []*Intent{}, + indexIntents: map[string]*Intent{}, + smartPickOplog: false, + oplogConflict: false, + destinations: map[string][]string{}, + } +} + +func (mgr *Manager) SetSmartPickOplog(smartPick bool) { + mgr.smartPickOplog = smartPick +} + +// HasConfigDBIntent returns a bool indicating if any of the intents refer to the "config" database. +// This can be used to check for possible unwanted conflicts before restoring to a sharded system. +func (mgr *Manager) HasConfigDBIntent() bool { + for _, intent := range mgr.intentsByDiscoveryOrder { + if intent.DB == "config" { + return true + } + } + return false +} + +// PutOplogIntent takes an intent for an oplog and stores it in the intent manager with the +// provided key. If the manager has smartPickOplog enabled, then it uses a priority system +// to determine which oplog intent to maintain as the actual oplog. +func (manager *Manager) PutOplogIntent(intent *Intent, managerKey string) { + if manager.smartPickOplog { + if existing := manager.specialIntents[managerKey]; existing != nil { + existing.MergeIntent(intent) + return + } + if manager.oplogIntent == nil { + // If there is no oplog intent, make this one the oplog. + manager.oplogIntent = intent + manager.specialIntents[managerKey] = intent + } else if intent.DB == "" { + // We already have an oplog and this is a top priority oplog. + if manager.oplogIntent.DB == "" { + // If the manager's current oplog is also top priority, we have a + // conflict and ignore this oplog. + manager.oplogConflict = true + } else { + // If the manager's current oplog is lower priority, replace it and + // move that one to be a normal intent. + manager.putNormalIntent(manager.oplogIntent) + delete(manager.specialIntents, manager.oplogIntent.Namespace()) + manager.oplogIntent = intent + manager.specialIntents[managerKey] = intent + } + } else { + // We already have an oplog and this is a low priority oplog. + if manager.oplogIntent.DB != "" { + // If the manager's current oplog is also low priority, set a conflict. + manager.oplogConflict = true + } + // No matter what, set this lower priority oplog to be a normal intent. + manager.putNormalIntent(intent) + } + } else { + if intent.DB == "" && intent.C == "oplog" { + // If this is a normal oplog, then add it as an oplog intent. + if existing := manager.specialIntents[managerKey]; existing != nil { + existing.MergeIntent(intent) + return + } + manager.oplogIntent = intent + manager.specialIntents[managerKey] = intent + } else { + manager.putNormalIntent(intent) + } + } +} + +func (manager *Manager) putNormalIntent(intent *Intent) { + manager.putNormalIntentWithNamespace(intent.Namespace(), intent) +} + +func (manager *Manager) putNormalIntentWithNamespace(ns string, intent *Intent) { + // BSON and metadata files for the same collection are merged + // into the same intent. This is done to allow for simple + // pairing of BSON + metadata without keeping track of the + // state of the filepath walker + if existing := manager.intents[ns]; existing != nil { + if existing.Namespace() != intent.Namespace() { + // remove old destination, add new one + dst := existing.Namespace() + dsts := manager.destinations[dst] + i := util.StringSliceIndex(dsts, ns) + manager.destinations[dst] = append(dsts[:i], dsts[i+1:]...) + + dsts = manager.destinations[intent.Namespace()] + manager.destinations[intent.Namespace()] = append(dsts, ns) + } + existing.MergeIntent(intent) + return + } + + // if key doesn't already exist, add it to the manager + manager.intents[ns] = intent + manager.intentsByDiscoveryOrder = append(manager.intentsByDiscoveryOrder, intent) + + manager.destinations[intent.Namespace()] = append(manager.destinations[intent.Namespace()], ns) +} + +// Put inserts an intent into the manager with the same source namespace as +// its destinations. +func (manager *Manager) Put(intent *Intent) { + manager.PutWithNamespace(intent.Namespace(), intent) +} + +// PutWithNamespace inserts an intent into the manager with the source set +// to the provided namespace. Intents for the same collection are merged +// together, so that BSON and metadata files for the same collection are +// returned in the same intent. +func (manager *Manager) PutWithNamespace(ns string, intent *Intent) { + if intent == nil { + panic("cannot insert nil *Intent into IntentManager") + } + db, _ := common.SplitNamespace(ns) + + // bucket special-case collections + if intent.IsOplog() { + manager.PutOplogIntent(intent, intent.Namespace()) + return + } + if intent.IsSystemIndexes() { + if intent.BSONFile != nil { + manager.indexIntents[db] = intent + manager.specialIntents[ns] = intent + } + return + } + if intent.IsUsers() { + if intent.BSONFile != nil { + manager.usersIntent = intent + manager.specialIntents[ns] = intent + } + return + } + if intent.IsRoles() { + if intent.BSONFile != nil { + manager.rolesIntent = intent + manager.specialIntents[ns] = intent + } + return + } + if intent.IsAuthVersion() { + if intent.BSONFile != nil { + manager.versionIntent = intent + manager.specialIntents[ns] = intent + } + return + } + + manager.putNormalIntentWithNamespace(ns, intent) +} + +func (manager *Manager) GetOplogConflict() bool { + return manager.oplogConflict +} + +func (manager *Manager) GetDestinationConflicts() (errs []DestinationConflictError) { + for dst, srcs := range manager.destinations { + if len(srcs) <= 1 { + continue + } + for _, src := range srcs { + errs = append(errs, DestinationConflictError{Dst: dst, Src: src}) + } + } + return +} + +// Intents returns a slice containing all of the intents in the manager. +// Intents is not thread safe +func (manager *Manager) Intents() []*Intent { + allIntents := []*Intent{} + for _, intent := range manager.intents { + allIntents = append(allIntents, intent) + } + for _, intent := range manager.indexIntents { + allIntents = append(allIntents, intent) + } + if manager.oplogIntent != nil { + allIntents = append(allIntents, manager.oplogIntent) + } + if manager.usersIntent != nil { + allIntents = append(allIntents, manager.usersIntent) + } + if manager.rolesIntent != nil { + allIntents = append(allIntents, manager.rolesIntent) + } + if manager.versionIntent != nil { + allIntents = append(allIntents, manager.versionIntent) + } + return allIntents +} + +func (manager *Manager) IntentForNamespace(ns string) *Intent { + intent := manager.intents[ns] + if intent != nil { + return intent + } + intent = manager.specialIntents[ns] + return intent +} + +// Pop returns the next available intent from the manager. If the manager is +// empty, it returns nil. Pop is thread safe. +func (manager *Manager) Pop() *Intent { + return manager.prioritizer.Get() +} + +// Peek returns a copy of a stored intent from the manager without removing +// the intent. This method is useful for edge cases that need to look ahead +// at what collections are in the manager before they are scheduled. +// +// NOTE: There are no guarantees that peek will return a usable +// intent after Finalize() is called. +func (manager *Manager) Peek() *Intent { + if len(manager.intentsByDiscoveryOrder) == 0 { + return nil + } + intentCopy := *manager.intentsByDiscoveryOrder[0] + return &intentCopy +} + +// Finish tells the prioritizer that mongorestore is done restoring +// the given collection intent. +func (manager *Manager) Finish(intent *Intent) { + manager.prioritizer.Finish(intent) +} + +// Oplog returns the intent representing the oplog, which isn't +// stored with the other intents, because it is dumped and restored in +// a very different way from other collections. +func (manager *Manager) Oplog() *Intent { + return manager.oplogIntent +} + +// SystemIndexes returns the system.indexes bson for a database +func (manager *Manager) SystemIndexes(dbName string) *Intent { + return manager.indexIntents[dbName] +} + +// SystemIndexes returns the databases for which there are system.indexes +func (manager *Manager) SystemIndexDBs() []string { + databases := []string{} + for dbname := range manager.indexIntents { + databases = append(databases, dbname) + } + return databases +} + +// Users returns the intent of the users collection to restore, a special case +func (manager *Manager) Users() *Intent { + return manager.usersIntent +} + +// Roles returns the intent of the user roles collection to restore, a special case +func (manager *Manager) Roles() *Intent { + return manager.rolesIntent +} + +// AuthVersion returns the intent of the version collection to restore, a special case +func (manager *Manager) AuthVersion() *Intent { + return manager.versionIntent +} + +// Finalize processes the intents for prioritization. Currently only two +// kinds of prioritizers are supported. No more "Put" operations may be done +// after finalize is called. +func (manager *Manager) Finalize(pType PriorityType) { + switch pType { + case Legacy: + log.Logv(log.DebugHigh, "finalizing intent manager with legacy prioritizer") + manager.prioritizer = NewLegacyPrioritizer(manager.intentsByDiscoveryOrder) + case LongestTaskFirst: + log.Logv(log.DebugHigh, "finalizing intent manager with longest task first prioritizer") + manager.prioritizer = NewLongestTaskFirstPrioritizer(manager.intentsByDiscoveryOrder) + case MultiDatabaseLTF: + log.Logv(log.DebugHigh, "finalizing intent manager with multi-database longest task first prioritizer") + manager.prioritizer = NewMultiDatabaseLTFPrioritizer(manager.intentsByDiscoveryOrder) + default: + panic("cannot initialize IntentPrioritizer with unknown type") + } + // release these for the garbage collector and to ensure code correctness + manager.intents = nil + manager.intentsByDiscoveryOrder = nil +} + +func (manager *Manager) UsePrioritizer(prioritizer IntentPrioritizer) { + manager.prioritizer = prioritizer +} diff --git a/src/mongo/gotools/common/intents/intent_prioritizer.go b/src/mongo/gotools/common/intents/intent_prioritizer.go new file mode 100644 index 00000000000..290a7c83d1e --- /dev/null +++ b/src/mongo/gotools/common/intents/intent_prioritizer.go @@ -0,0 +1,241 @@ +package intents + +import ( + "container/heap" + "sort" + "sync" +) + +type PriorityType int + +const ( + Legacy PriorityType = iota + LongestTaskFirst + MultiDatabaseLTF +) + +// IntentPrioritizer encapsulates the logic of scheduling intents +// for restoration. It can know about which intents are in the +// process of being restored through the "Finish" hook. +// +// Oplog entries and auth entries are not handled by the prioritizer, +// as these are special cases handled by the regular mongorestore code. +type IntentPrioritizer interface { + Get() *Intent + Finish(*Intent) +} + +//===== Legacy ===== + +// legacyPrioritizer processes the intents in the order they were read off the +// file system, keeping with legacy mongorestore behavior. +type legacyPrioritizer struct { + sync.Mutex + queue []*Intent +} + +func NewLegacyPrioritizer(intentList []*Intent) *legacyPrioritizer { + return &legacyPrioritizer{queue: intentList} +} + +func (legacy *legacyPrioritizer) Get() *Intent { + legacy.Lock() + defer legacy.Unlock() + + if len(legacy.queue) == 0 { + return nil + } + + var intent *Intent + intent, legacy.queue = legacy.queue[0], legacy.queue[1:] + return intent +} + +func (legacy *legacyPrioritizer) Finish(*Intent) { + // no-op + return +} + +//===== Longest Task First ===== + +// longestTaskFirstPrioritizer returns intents in the order of largest -> smallest, +// which is better at minimizing total runtime in parallel environments than +// other simple orderings. +type longestTaskFirstPrioritizer struct { + sync.Mutex + queue []*Intent +} + +// NewLongestTaskFirstPrioritizer returns an initialized LTP prioritizer +func NewLongestTaskFirstPrioritizer(intents []*Intent) *longestTaskFirstPrioritizer { + sort.Sort(BySize(intents)) + return &longestTaskFirstPrioritizer{ + queue: intents, + } +} + +func (ltf *longestTaskFirstPrioritizer) Get() *Intent { + ltf.Lock() + defer ltf.Unlock() + + if len(ltf.queue) == 0 { + return nil + } + + var intent *Intent + intent, ltf.queue = ltf.queue[0], ltf.queue[1:] + return intent +} + +func (ltf *longestTaskFirstPrioritizer) Finish(*Intent) { + // no-op + return +} + +// For sorting intents from largest to smallest size +type BySize []*Intent + +func (s BySize) Len() int { return len(s) } +func (s BySize) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s BySize) Less(i, j int) bool { return s[i].Size > s[j].Size } + +//===== Multi Database Longest Task First ===== + +// multiDatabaseLTF is designed to properly schedule intents with two constraints: +// 1. it is optimized to run in a multi-processor environment +// 2. it is optimized for parallelism against 2.6's db-level write lock +// These goals result in a design that attempts to have as many different +// database's intents being restored as possible and attempts to restore the +// largest collections first. +// +// If we can have a minimum number of collections in flight for a given db, +// we avoid lock contention in an optimal way on 2.6 systems. That is, +// it is better to have two restore jobs where +// job1 = "test.mycollection" +// job2 = "mydb2.othercollection" +// so that these collections do not compete for the db-level write lock. +// +// We also schedule the largest jobs first, in a greedy fashion, in order +// to minimize total restoration time. Each database's intents are sorted +// by decreasing file size at initialization, so that the largest jobs are +// run first. Admittedly, .bson file size is not a direct predictor of restore +// time, but there is certainly a strong correlation. Note that this attribute +// is secondary to the multi-db scheduling laid out above, since multi-db will +// get us bigger wins in terms of parallelism. +type multiDatabaseLTFPrioritizer struct { + sync.Mutex + dbHeap heap.Interface + counterMap map[string]*dbCounter +} + +// NewMultiDatabaseLTFPrioritizer takes in a list of intents and returns an +// initialized prioritizer. +func NewMultiDatabaseLTFPrioritizer(intents []*Intent) *multiDatabaseLTFPrioritizer { + prioritizer := &multiDatabaseLTFPrioritizer{ + counterMap: map[string]*dbCounter{}, + dbHeap: &DBHeap{}, + } + heap.Init(prioritizer.dbHeap) + // first, create all database counters + for _, intent := range intents { + counter, exists := prioritizer.counterMap[intent.DB] + if !exists { + // initialize a new counter if one doesn't exist for DB + counter = &dbCounter{} + prioritizer.counterMap[intent.DB] = counter + } + counter.collections = append(counter.collections, intent) + } + // then ensure that all the dbCounters have sorted intents + for _, counter := range prioritizer.counterMap { + counter.SortCollectionsBySize() + heap.Push(prioritizer.dbHeap, counter) + } + return prioritizer +} + +// Get returns the next prioritized intent and updates the count of active +// restores for the returned intent's DB. Get is not thread safe, and depends +// on the implementation of the intent manager to lock around it. +func (mdb *multiDatabaseLTFPrioritizer) Get() *Intent { + mdb.Lock() + defer mdb.Unlock() + + if mdb.dbHeap.Len() == 0 { + // we're out of things to return + return nil + } + optimalDB := heap.Pop(mdb.dbHeap).(*dbCounter) + optimalDB.active++ + nextIntent := optimalDB.PopIntent() + // only release the db counter if it's out of collections + if len(optimalDB.collections) > 0 { + heap.Push(mdb.dbHeap, optimalDB) + } + return nextIntent +} + +// Finish decreases the number of active restore jobs for the given intent's +// database, and reshuffles the heap accordingly. Finish is not thread safe, +// and depends on the implementation of the intent manager to lock around it. +func (mdb *multiDatabaseLTFPrioritizer) Finish(intent *Intent) { + mdb.Lock() + defer mdb.Unlock() + + counter := mdb.counterMap[intent.DB] + counter.active-- + // only fix up the heap if the counter is still in the heap + if len(counter.collections) > 0 { + // This is an O(n) operation on the heap. We could make all heap + // operations O(log(n)) if we set up dbCounters to track their own + // position in the heap, but in practice this overhead is likely negligible. + heap.Init(mdb.dbHeap) + } +} + +type dbCounter struct { + active int + collections []*Intent +} + +func (dbc *dbCounter) SortCollectionsBySize() { + sort.Sort(BySize(dbc.collections)) +} + +// PopIntent returns the largest intent remaining for the database +func (dbc *dbCounter) PopIntent() *Intent { + var intent *Intent + if len(dbc.collections) > 0 { + intent, dbc.collections = dbc.collections[0], dbc.collections[1:] + } + return intent +} + +// Returns the largest collection of the databases with the least active restores. +// Implements the container/heap interface. None of its methods are meant to be +// called directly. +type DBHeap []*dbCounter + +func (dbh DBHeap) Len() int { return len(dbh) } +func (dbh DBHeap) Swap(i, j int) { dbh[i], dbh[j] = dbh[j], dbh[i] } +func (dbh DBHeap) Less(i, j int) bool { + if dbh[i].active == dbh[j].active { + // prioritize the largest bson file if dbs have the same number + // of restorations in progress + return dbh[i].collections[0].Size > dbh[j].collections[0].Size + } + return dbh[i].active < dbh[j].active +} + +func (dbh *DBHeap) Push(x interface{}) { + *dbh = append(*dbh, x.(*dbCounter)) +} + +func (dbh *DBHeap) Pop() interface{} { + // for container/heap package: removes the top entry and resizes the heap array + old := *dbh + n := len(old) + toPop := old[n-1] + *dbh = old[0 : n-1] + return toPop +} diff --git a/src/mongo/gotools/common/intents/intent_prioritizer_test.go b/src/mongo/gotools/common/intents/intent_prioritizer_test.go new file mode 100644 index 00000000000..2abd79e5641 --- /dev/null +++ b/src/mongo/gotools/common/intents/intent_prioritizer_test.go @@ -0,0 +1,174 @@ +package intents + +import ( + "container/heap" + "github.com/mongodb/mongo-tools/common/testutil" + . "github.com/smartystreets/goconvey/convey" + "testing" +) + +func TestLegacyPrioritizer(t *testing.T) { + + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("With a legacyPrioritizer initialized with an ordered intent list", t, func() { + testList := []*Intent{ + &Intent{DB: "1"}, + &Intent{DB: "2"}, + &Intent{DB: "3"}, + } + legacy := NewLegacyPrioritizer(testList) + So(legacy, ShouldNotBeNil) + + Convey("the priority should be defined by 'first-in-first-out'", func() { + it0 := legacy.Get() + it1 := legacy.Get() + it2 := legacy.Get() + it3 := legacy.Get() + So(it3, ShouldBeNil) + So(it0.DB, ShouldBeLessThan, it1.DB) + So(it1.DB, ShouldBeLessThan, it2.DB) + }) + }) +} + +func TestBasicDBHeapBehavior(t *testing.T) { + var dbheap heap.Interface + + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("With an empty dbHeap", t, func() { + dbheap = &DBHeap{} + heap.Init(dbheap) + + Convey("when inserting unordered dbCounters with different active counts", func() { + heap.Push(dbheap, &dbCounter{75, nil}) + heap.Push(dbheap, &dbCounter{121, nil}) + heap.Push(dbheap, &dbCounter{76, nil}) + heap.Push(dbheap, &dbCounter{51, nil}) + heap.Push(dbheap, &dbCounter{82, nil}) + heap.Push(dbheap, &dbCounter{117, nil}) + heap.Push(dbheap, &dbCounter{49, nil}) + heap.Push(dbheap, &dbCounter{101, nil}) + heap.Push(dbheap, &dbCounter{122, nil}) + heap.Push(dbheap, &dbCounter{33, nil}) + heap.Push(dbheap, &dbCounter{0, nil}) + + Convey("they should pop in active order, least to greatest", func() { + prev := -1 + for dbheap.Len() > 0 { + popped := heap.Pop(dbheap).(*dbCounter) + So(popped.active, ShouldBeGreaterThan, prev) + prev = popped.active + } + }) + }) + + Convey("when inserting unordered dbCounters with different bson sizes", func() { + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 70}}}) + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 1024}}}) + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 97}}}) + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 3}}}) + heap.Push(dbheap, &dbCounter{0, []*Intent{&Intent{Size: 1024 * 1024}}}) + + Convey("they should pop in bson size order, greatest to least", func() { + prev := int64(1024*1024 + 1) // Maximum + for dbheap.Len() > 0 { + popped := heap.Pop(dbheap).(*dbCounter) + So(popped.collections[0].Size, ShouldBeLessThan, prev) + prev = popped.collections[0].Size + } + }) + }) + }) +} + +func TestDBCounterCollectionSorting(t *testing.T) { + + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("With a dbCounter and an unordered collection of intents", t, func() { + dbc := &dbCounter{ + collections: []*Intent{ + &Intent{Size: 100}, + &Intent{Size: 1000}, + &Intent{Size: 1}, + &Intent{Size: 10}, + }, + } + + Convey("popping the sorted intents should return in decreasing BSONSize", func() { + dbc.SortCollectionsBySize() + So(dbc.PopIntent().Size, ShouldEqual, 1000) + So(dbc.PopIntent().Size, ShouldEqual, 100) + So(dbc.PopIntent().Size, ShouldEqual, 10) + So(dbc.PopIntent().Size, ShouldEqual, 1) + So(dbc.PopIntent(), ShouldBeNil) + So(dbc.PopIntent(), ShouldBeNil) + }) + }) +} + +func TestSimulatedMultiDBJob(t *testing.T) { + var prioritizer IntentPrioritizer + + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("With a prioritizer initialized with a set of intents", t, func() { + intents := []*Intent{ + &Intent{C: "small", DB: "db2", Size: 32}, + &Intent{C: "medium", DB: "db2", Size: 128}, + &Intent{C: "giant", DB: "db1", Size: 1024}, + &Intent{C: "tiny", DB: "db1", Size: 2}, + } + prioritizer = NewMultiDatabaseLTFPrioritizer(intents) + So(prioritizer, ShouldNotBeNil) + + Convey("and a running simulation of two jobs threads:", func() { + Convey("the first two intents should be of different dbs", func() { + i0 := prioritizer.Get() + So(i0, ShouldNotBeNil) + i1 := prioritizer.Get() + So(i1, ShouldNotBeNil) + + Convey("the first intent should be the largest bson file", func() { + So(i0.C, ShouldEqual, "giant") + So(i0.DB, ShouldEqual, "db1") + }) + + Convey("the second intent should be the largest bson file of db2", func() { + So(i1.C, ShouldEqual, "medium") + So(i1.DB, ShouldEqual, "db2") + }) + + Convey("with the second job finishing the smaller intents", func() { + prioritizer.Finish(i1) + i2 := prioritizer.Get() + So(i2, ShouldNotBeNil) + prioritizer.Finish(i2) + i3 := prioritizer.Get() + So(i3, ShouldNotBeNil) + + Convey("the next job should be from db2", func() { + So(i2.C, ShouldEqual, "small") + So(i2.DB, ShouldEqual, "db2") + }) + + Convey("the final job should be from db1", func() { + So(i3.C, ShouldEqual, "tiny") + So(i3.DB, ShouldEqual, "db1") + + Convey("which means that there should be two active db1 jobs", func() { + counter := prioritizer.(*multiDatabaseLTFPrioritizer).counterMap["db1"] + So(counter.active, ShouldEqual, 2) + }) + }) + + Convey("the heap should now be empty", func() { + So(prioritizer.(*multiDatabaseLTFPrioritizer).dbHeap.Len(), ShouldEqual, 0) + }) + }) + }) + }) + }) +} diff --git a/src/mongo/gotools/common/intents/intent_test.go b/src/mongo/gotools/common/intents/intent_test.go new file mode 100644 index 00000000000..15b99f1af30 --- /dev/null +++ b/src/mongo/gotools/common/intents/intent_test.go @@ -0,0 +1,81 @@ +package intents + +import ( + "github.com/mongodb/mongo-tools/common/testutil" + . "github.com/smartystreets/goconvey/convey" + "testing" +) + +func TestIntentManager(t *testing.T) { + var manager *Manager + + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("With an empty IntentManager", t, func() { + manager = NewIntentManager() + So(manager, ShouldNotBeNil) + + Convey("putting a series of added bson intents", func() { + manager.Put(&Intent{DB: "1", C: "1", Location: "/b1/"}) + manager.Put(&Intent{DB: "1", C: "2", Location: "/b2/"}) + manager.Put(&Intent{DB: "1", C: "3", Location: "/b3/"}) + manager.Put(&Intent{DB: "2", C: "1", Location: "/b4/"}) + So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 4) + So(len(manager.intents), ShouldEqual, 4) + + Convey("and then some matching metadata intents", func() { + manager.Put(&Intent{DB: "2", C: "1", MetadataLocation: "/4m/"}) + manager.Put(&Intent{DB: "1", C: "3", MetadataLocation: "/3m/"}) + manager.Put(&Intent{DB: "1", C: "1", MetadataLocation: "/1m/"}) + manager.Put(&Intent{DB: "1", C: "2", MetadataLocation: "/2m/"}) + + Convey("the size of the queue should be unchanged", func() { + So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 4) + So(len(manager.intents), ShouldEqual, 4) + }) + + Convey("popping them from the IntentManager", func() { + manager.Finalize(Legacy) + it0 := manager.Pop() + it1 := manager.Pop() + it2 := manager.Pop() + it3 := manager.Pop() + it4 := manager.Pop() + So(it4, ShouldBeNil) + + Convey("should return them in insert order", func() { + So(*it0, ShouldResemble, + Intent{DB: "1", C: "1", Location: "/b1/", MetadataLocation: "/1m/"}) + So(*it1, ShouldResemble, + Intent{DB: "1", C: "2", Location: "/b2/", MetadataLocation: "/2m/"}) + So(*it2, ShouldResemble, + Intent{DB: "1", C: "3", Location: "/b3/", MetadataLocation: "/3m/"}) + So(*it3, ShouldResemble, + Intent{DB: "2", C: "1", Location: "/b4/", MetadataLocation: "/4m/"}) + }) + }) + }) + + Convey("but adding non-matching intents", func() { + manager.Put(&Intent{DB: "7", C: "49", MetadataLocation: "/5/"}) + manager.Put(&Intent{DB: "27", C: "B", MetadataLocation: "/6/"}) + + Convey("should increase the size, because they are not merged in", func() { + So(len(manager.intentsByDiscoveryOrder), ShouldEqual, 6) + So(len(manager.intents), ShouldEqual, 6) + }) + }) + + Convey("using the Peek() method", func() { + peeked := manager.Peek() + So(peeked, ShouldNotBeNil) + So(peeked, ShouldResemble, manager.intentsByDiscoveryOrder[0]) + + Convey("modifying the returned copy should not modify the original", func() { + peeked.DB = "SHINY NEW VALUE" + So(peeked, ShouldNotResemble, manager.intentsByDiscoveryOrder[0]) + }) + }) + }) + }) +} |