diff options
Diffstat (limited to 'src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go')
-rw-r--r-- | src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go b/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go index 4bc9b5405de..b356cb6e5ae 100644 --- a/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go +++ b/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go @@ -654,11 +654,10 @@ func aggregate(a aggregateParams) (*Cursor, error) { sess = nil } - defaultSelector := a.readSelector - if hasOutputStage { - defaultSelector = a.writeSelector + selector := makePinnedSelector(sess, a.writeSelector) + if !hasOutputStage { + selector = makeReadPrefSelector(sess, a.readSelector, a.client.localThreshold) } - selector := makePinnedSelector(sess, defaultSelector) ao := options.MergeAggregateOptions(a.opts...) cursorOpts := driver.CursorOptions{ @@ -756,8 +755,7 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{}, rc = nil } - selector := makePinnedSelector(sess, coll.readSelector) - + selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference). CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name). Collection(coll.name).Deployment(coll.client.topology) @@ -832,8 +830,7 @@ func (coll *Collection) EstimatedDocumentCount(ctx context.Context, rc = nil } - selector := makePinnedSelector(sess, coll.readSelector) - + selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) op := operation.NewCount().Session(sess).ClusterClock(coll.client.clock). Database(coll.db.name).Collection(coll.name).CommandMonitor(coll.client.monitor). Deployment(coll.client.topology).ReadConcern(rc).ReadPreference(coll.readPreference). @@ -888,8 +885,7 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i rc = nil } - selector := makePinnedSelector(sess, coll.readSelector) - + selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) option := options.MergeDistinctOptions(opts...) op := operation.NewDistinct(fieldName, bsoncore.Document(f)). @@ -971,8 +967,7 @@ func (coll *Collection) Find(ctx context.Context, filter interface{}, rc = nil } - selector := makePinnedSelector(sess, coll.readSelector) - + selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) op := operation.NewFind(f). Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference). CommandMonitor(coll.client.monitor).ServerSelector(selector). @@ -1421,3 +1416,14 @@ func makePinnedSelector(sess *session.Client, defaultSelector description.Server return defaultSelector.SelectServer(t, svrs) } } + +func makeReadPrefSelector(sess *session.Client, selector description.ServerSelector, localThreshold time.Duration) description.ServerSelectorFunc { + if sess != nil && sess.TransactionRunning() { + selector = description.CompositeSelector([]description.ServerSelector{ + description.ReadPrefSelector(sess.CurrentRp), + description.LatencySelector(localThreshold), + }) + } + + return makePinnedSelector(sess, selector) +} |