summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go')
-rw-r--r--src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go30
1 files changed, 18 insertions, 12 deletions
diff --git a/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go b/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go
index 4bc9b5405de..b356cb6e5ae 100644
--- a/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go
+++ b/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/go.mongodb.org/mongo-driver/mongo/collection.go
@@ -654,11 +654,10 @@ func aggregate(a aggregateParams) (*Cursor, error) {
sess = nil
}
- defaultSelector := a.readSelector
- if hasOutputStage {
- defaultSelector = a.writeSelector
+ selector := makePinnedSelector(sess, a.writeSelector)
+ if !hasOutputStage {
+ selector = makeReadPrefSelector(sess, a.readSelector, a.client.localThreshold)
}
- selector := makePinnedSelector(sess, defaultSelector)
ao := options.MergeAggregateOptions(a.opts...)
cursorOpts := driver.CursorOptions{
@@ -756,8 +755,7 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
rc = nil
}
- selector := makePinnedSelector(sess, coll.readSelector)
-
+ selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name).
Collection(coll.name).Deployment(coll.client.topology)
@@ -832,8 +830,7 @@ func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
rc = nil
}
- selector := makePinnedSelector(sess, coll.readSelector)
-
+ selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
op := operation.NewCount().Session(sess).ClusterClock(coll.client.clock).
Database(coll.db.name).Collection(coll.name).CommandMonitor(coll.client.monitor).
Deployment(coll.client.topology).ReadConcern(rc).ReadPreference(coll.readPreference).
@@ -888,8 +885,7 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i
rc = nil
}
- selector := makePinnedSelector(sess, coll.readSelector)
-
+ selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
option := options.MergeDistinctOptions(opts...)
op := operation.NewDistinct(fieldName, bsoncore.Document(f)).
@@ -971,8 +967,7 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
rc = nil
}
- selector := makePinnedSelector(sess, coll.readSelector)
-
+ selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold)
op := operation.NewFind(f).
Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
CommandMonitor(coll.client.monitor).ServerSelector(selector).
@@ -1421,3 +1416,14 @@ func makePinnedSelector(sess *session.Client, defaultSelector description.Server
return defaultSelector.SelectServer(t, svrs)
}
}
+
+func makeReadPrefSelector(sess *session.Client, selector description.ServerSelector, localThreshold time.Duration) description.ServerSelectorFunc {
+ if sess != nil && sess.TransactionRunning() {
+ selector = description.CompositeSelector([]description.ServerSelector{
+ description.ReadPrefSelector(sess.CurrentRp),
+ description.LatencySelector(localThreshold),
+ })
+ }
+
+ return makePinnedSelector(sess, selector)
+}