diff options
author | Kevin Pulo <kevin.pulo@mongodb.com> | 2020-02-20 21:42:07 +1100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-05 13:03:43 +0000 |
commit | 747ff353cbc819d032fa727d4bd7ffad16ea0437 (patch) | |
tree | d9b3d7e9af26138d7b74e0416a93d6110e326af0 /src/mongo/client | |
parent | 7c4b875a8858c5bd5efc9bf4f285f7f440fdfdc0 (diff) | |
download | mongo-747ff353cbc819d032fa727d4bd7ffad16ea0437.tar.gz |
SERVER-45692 add explicit RWC to inter-node commands (even if merely kImplicitDefault)
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/dbclient_base.cpp | 193 | ||||
-rw-r--r-- | src/mongo/client/dbclient_base.h | 129 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection.cpp | 10 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection.h | 29 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.cpp | 21 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.h | 7 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.cpp | 59 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.h | 44 |
8 files changed, 350 insertions, 142 deletions
diff --git a/src/mongo/client/dbclient_base.cpp b/src/mongo/client/dbclient_base.cpp index caad3d6cc98..4e2d1b36a19 100644 --- a/src/mongo/client/dbclient_base.cpp +++ b/src/mongo/client/dbclient_base.cpp @@ -319,10 +319,14 @@ bool DBClientBase::runPseudoCommand(StringData db, return success; } -long long DBClientBase::count( - const NamespaceStringOrUUID nsOrUuid, const BSONObj& query, int options, int limit, int skip) { +long long DBClientBase::count(const NamespaceStringOrUUID nsOrUuid, + const BSONObj& query, + int options, + int limit, + int skip, + boost::optional<BSONObj> readConcernObj) { auto dbName = (nsOrUuid.uuid() ? nsOrUuid.dbname() : (*nsOrUuid.nss()).db().toString()); - BSONObj cmd = _countCmd(nsOrUuid, query, options, limit, skip); + BSONObj cmd = _countCmd(nsOrUuid, query, options, limit, skip, readConcernObj); BSONObj res; if (!runCommand(dbName, cmd, res, options)) { auto status = getStatusFromCommandResult(res); @@ -332,8 +336,12 @@ long long DBClientBase::count( return res["n"].numberLong(); } -BSONObj DBClientBase::_countCmd( - const NamespaceStringOrUUID nsOrUuid, const BSONObj& query, int options, int limit, int skip) { +BSONObj DBClientBase::_countCmd(const NamespaceStringOrUUID nsOrUuid, + const BSONObj& query, + int options, + int limit, + int skip, + boost::optional<BSONObj> readConcernObj) { BSONObjBuilder b; if (nsOrUuid.uuid()) { const auto uuid = *nsOrUuid.uuid(); @@ -346,6 +354,9 @@ BSONObj DBClientBase::_countCmd( b.append("limit", limit); if (skip) b.append("skip", skip); + if (readConcernObj) { + b.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcernObj); + } return b.obj(); } @@ -541,8 +552,12 @@ bool DBClientBase::isMaster(bool& isMaster, BSONObj* info) { return ok; } -bool DBClientBase::createCollection( - const string& ns, long long size, bool capped, int max, BSONObj* info) { +bool DBClientBase::createCollection(const string& ns, + long long size, + bool capped, + int max, + BSONObj* info, + boost::optional<BSONObj> writeConcernObj) { verify(!capped || size); BSONObj o; if (info == nullptr) @@ -556,6 +571,9 @@ bool DBClientBase::createCollection( b.append("capped", true); if (max) b.append("max", max); + if (writeConcernObj) { + b.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } return runCommand(db.c_str(), b.done(), *info); } @@ -644,11 +662,18 @@ void DBClientBase::findN(vector<BSONObj>& out, int nToReturn, int nToSkip, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + boost::optional<BSONObj> readConcernObj) { out.reserve(nToReturn); - unique_ptr<DBClientCursor> c = - this->query(NamespaceString(ns), query, nToReturn, nToSkip, fieldsToReturn, queryOptions); + unique_ptr<DBClientCursor> c = this->query(NamespaceString(ns), + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + 0 /* batchSize */, + readConcernObj); // query() throws on network error so OK to uassert with numeric code here. uassert(10276, @@ -672,15 +697,18 @@ void DBClientBase::findN(vector<BSONObj>& out, BSONObj DBClientBase::findOne(const string& ns, const Query& query, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + boost::optional<BSONObj> readConcernObj) { vector<BSONObj> v; - findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions); + findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions, readConcernObj); return v.empty() ? BSONObj() : v[0]; } -std::pair<BSONObj, NamespaceString> DBClientBase::findOneByUUID(const std::string& db, - UUID uuid, - const BSONObj& filter) { +std::pair<BSONObj, NamespaceString> DBClientBase::findOneByUUID( + const std::string& db, + UUID uuid, + const BSONObj& filter, + boost::optional<BSONObj> readConcernObj) { list<BSONObj> results; BSONObj res; @@ -689,6 +717,9 @@ std::pair<BSONObj, NamespaceString> DBClientBase::findOneByUUID(const std::strin cmdBuilder.append("filter", filter); cmdBuilder.append("limit", 1); cmdBuilder.append("singleBatch", true); + if (readConcernObj) { + cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcernObj); + } BSONObj cmd = cmdBuilder.obj(); @@ -721,9 +752,17 @@ unique_ptr<DBClientCursor> DBClientBase::query(const NamespaceStringOrUUID& nsOr int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { - unique_ptr<DBClientCursor> c(new DBClientCursor( - this, nsOrUuid, query.obj, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize)); + int batchSize, + boost::optional<BSONObj> readConcernObj) { + unique_ptr<DBClientCursor> c(new DBClientCursor(this, + nsOrUuid, + query.obj, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj)); if (c->init()) return c; return nullptr; @@ -754,11 +793,13 @@ unsigned long long DBClientBase::query(std::function<void(const BSONObj&)> f, Query query, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional<BSONObj> readConcernObj) { DBClientFunConvertor fun; fun._f = f; std::function<void(DBClientCursorBatchIterator&)> ptr(fun); - return this->query(ptr, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); + return this->query( + ptr, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize, readConcernObj); } unsigned long long DBClientBase::query(std::function<void(DBClientCursorBatchIterator&)> f, @@ -766,12 +807,13 @@ unsigned long long DBClientBase::query(std::function<void(DBClientCursorBatchIte Query query, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional<BSONObj> readConcernObj) { // mask options queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk); - unique_ptr<DBClientCursor> c( - this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize)); + unique_ptr<DBClientCursor> c(this->query( + nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize, readConcernObj)); // query() throws on network error so OK to uassert with numeric code here. uassert(16090, "socket error for mapping query", c.get()); @@ -785,34 +827,63 @@ unsigned long long DBClientBase::query(std::function<void(DBClientCursorBatchIte return n; } -void DBClientBase::insert(const string& ns, BSONObj obj, int flags) { - insert(ns, std::vector<BSONObj>{obj}, flags); +void DBClientBase::insert(const string& ns, + BSONObj obj, + int flags, + boost::optional<BSONObj> writeConcernObj) { + insert(ns, std::vector<BSONObj>{obj}, flags, writeConcernObj); } -void DBClientBase::insert(const string& ns, const vector<BSONObj>& v, int flags) { +void DBClientBase::insert(const string& ns, + const vector<BSONObj>& v, + int flags, + boost::optional<BSONObj> writeConcernObj) { bool ordered = !(flags & InsertOption_ContinueOnError); auto nss = NamespaceString(ns); - auto request = - OpMsgRequest::fromDBAndBody(nss.db(), BSON("insert" << nss.coll() << "ordered" << ordered)); + BSONObjBuilder cmdBuilder; + cmdBuilder.append("insert", nss.coll()); + cmdBuilder.append("ordered", ordered); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } + auto request = OpMsgRequest::fromDBAndBody(nss.db(), cmdBuilder.obj()); request.sequences.push_back({"documents", v}); runFireAndForgetCommand(std::move(request)); } -void DBClientBase::remove(const string& ns, Query obj, int flags) { +void DBClientBase::remove(const string& ns, + Query obj, + int flags, + boost::optional<BSONObj> writeConcernObj) { int limit = (flags & RemoveOption_JustOne) ? 1 : 0; auto nss = NamespaceString(ns); - auto request = OpMsgRequest::fromDBAndBody(nss.db(), BSON("delete" << nss.coll())); + BSONObjBuilder cmdBuilder; + cmdBuilder.append("delete", nss.coll()); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } + auto request = OpMsgRequest::fromDBAndBody(nss.db(), cmdBuilder.obj()); request.sequences.push_back({"deletes", {BSON("q" << obj.obj << "limit" << limit)}}); runFireAndForgetCommand(std::move(request)); } -void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upsert, bool multi) { +void DBClientBase::update(const string& ns, + Query query, + BSONObj obj, + bool upsert, + bool multi, + boost::optional<BSONObj> writeConcernObj) { auto nss = NamespaceString(ns); - auto request = OpMsgRequest::fromDBAndBody(nss.db(), BSON("update" << nss.coll())); + BSONObjBuilder cmdBuilder; + cmdBuilder.append("update", nss.coll()); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } + auto request = OpMsgRequest::fromDBAndBody(nss.db(), cmdBuilder.obj()); request.sequences.push_back( {"updates", {BSON("q" << query.obj << "u" << obj << "upsert" << upsert << "multi" << multi)}}); @@ -820,12 +891,17 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upser runFireAndForgetCommand(std::move(request)); } -void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) { +void DBClientBase::update(const string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional<BSONObj> writeConcernObj) { update(ns, std::move(query), std::move(obj), flags & UpdateOption_Upsert, - flags & UpdateOption_Multi); + flags & UpdateOption_Multi, + writeConcernObj); } void DBClientBase::killCursor(const NamespaceString& ns, long long cursorId) { @@ -914,16 +990,24 @@ std::list<BSONObj> DBClientBase::_getIndexSpecs(const NamespaceStringOrUUID& nsO } -void DBClientBase::dropIndex(const string& ns, BSONObj keys) { - dropIndex(ns, genIndexName(keys)); +void DBClientBase::dropIndex(const string& ns, + BSONObj keys, + boost::optional<BSONObj> writeConcernObj) { + dropIndex(ns, genIndexName(keys), writeConcernObj); } -void DBClientBase::dropIndex(const string& ns, const string& indexName) { +void DBClientBase::dropIndex(const string& ns, + const string& indexName, + boost::optional<BSONObj> writeConcernObj) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append("dropIndexes", nsToCollectionSubstring(ns)); + cmdBuilder.append("index", indexName); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } BSONObj info; - if (!runCommand(nsToDatabase(ns), - BSON("dropIndexes" << nsToCollectionSubstring(ns) << "index" << indexName), - info)) { + if (!runCommand(nsToDatabase(ns), cmdBuilder.obj(), info)) { LOGV2_DEBUG(20118, logSeverityV1toV2(_logLevel).toInt(), "dropIndex failed: {info}", @@ -932,14 +1016,15 @@ void DBClientBase::dropIndex(const string& ns, const string& indexName) { } } -void DBClientBase::dropIndexes(const string& ns) { +void DBClientBase::dropIndexes(const string& ns, boost::optional<BSONObj> writeConcernObj) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append("dropIndexes", nsToCollectionSubstring(ns)); + cmdBuilder.append("index", "*"); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } BSONObj info; - uassert(10008, - "dropIndexes failed", - runCommand(nsToDatabase(ns), - BSON("dropIndexes" << nsToCollectionSubstring(ns) << "index" - << "*"), - info)); + uassert(10008, "dropIndexes failed", runCommand(nsToDatabase(ns), cmdBuilder.obj(), info)); } void DBClientBase::reIndex(const string& ns) { @@ -971,7 +1056,9 @@ string DBClientBase::genIndexName(const BSONObj& keys) { return ss.str(); } -void DBClientBase::createIndexes(StringData ns, const std::vector<const IndexSpec*>& descriptors) { +void DBClientBase::createIndexes(StringData ns, + const std::vector<const IndexSpec*>& descriptors, + boost::optional<BSONObj> writeConcernObj) { BSONObjBuilder command; command.append("createIndexes", nsToCollectionSubstring(ns)); { @@ -980,6 +1067,9 @@ void DBClientBase::createIndexes(StringData ns, const std::vector<const IndexSpe indexes.append(desc->toBSON()); } } + if (writeConcernObj) { + command.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } const BSONObj commandObj = command.done(); BSONObj infoObj; @@ -990,7 +1080,9 @@ void DBClientBase::createIndexes(StringData ns, const std::vector<const IndexSpe } } -void DBClientBase::createIndexes(StringData ns, const std::vector<BSONObj>& specs) { +void DBClientBase::createIndexes(StringData ns, + const std::vector<BSONObj>& specs, + boost::optional<BSONObj> writeConcernObj) { BSONObjBuilder command; command.append("createIndexes", nsToCollectionSubstring(ns)); { @@ -999,6 +1091,9 @@ void DBClientBase::createIndexes(StringData ns, const std::vector<BSONObj>& spec indexes.append(spec); } } + if (writeConcernObj) { + command.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } const BSONObj commandObj = command.done(); BSONObj infoObj; diff --git a/src/mongo/client/dbclient_base.h b/src/mongo/client/dbclient_base.h index 5a3dfcfc766..5db09d9412e 100644 --- a/src/mongo/client/dbclient_base.h +++ b/src/mongo/client/dbclient_base.h @@ -73,27 +73,31 @@ std::string nsGetCollection(const std::string& ns); * them as "final" or "override" as appropriate. */ class DBClientQueryInterface { - virtual std::unique_ptr<DBClientCursor> query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) = 0; + virtual std::unique_ptr<DBClientCursor> query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) = 0; virtual unsigned long long query(std::function<void(const BSONObj&)> f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, - int batchSize = 0) = 0; + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) = 0; virtual unsigned long long query(std::function<void(DBClientCursorBatchIterator&)> f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, - int batchSize = 0) = 0; + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) = 0; }; /** @@ -119,7 +123,8 @@ public: virtual BSONObj findOne(const std::string& ns, const Query& query, const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0); + int queryOptions = 0, + boost::optional<BSONObj> readConcernObj = boost::none); /** query N objects from the database into an array. makes sense mostly when you want a small * number of results. if a huge number, use query() and iterate the cursor. @@ -130,7 +135,8 @@ public: int nToReturn, int nToSkip = 0, const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0); + int queryOptions = 0, + boost::optional<BSONObj> readConcernObj = boost::none); /** * @return a pair with a single object that matches the filter within the collection specified @@ -140,9 +146,11 @@ public: * the query, an empty BSONObj is returned. * @throws AssertionException */ - virtual std::pair<BSONObj, NamespaceString> findOneByUUID(const std::string& db, - UUID uuid, - const BSONObj& filter); + virtual std::pair<BSONObj, NamespaceString> findOneByUUID( + const std::string& db, + UUID uuid, + const BSONObj& filter, + boost::optional<BSONObj> readConcernObj = boost::none); virtual std::string getServerAddress() const = 0; @@ -351,11 +359,12 @@ public: /** count number of objects in collection ns that match the query criteria specified throws UserAssertion if database returns an error */ - virtual long long count(NamespaceStringOrUUID nsOrUuid, + virtual long long count(const NamespaceStringOrUUID nsOrUuid, const BSONObj& query = BSONObj(), int options = 0, int limit = 0, - int skip = 0); + int skip = 0, + boost::optional<BSONObj> readConcernObj = boost::none); static std::string createPasswordDigest(const std::string& username, const std::string& clearTextPassword); @@ -390,7 +399,8 @@ public: long long size = 0, bool capped = false, int max = 0, - BSONObj* info = nullptr); + BSONObj* info = nullptr, + boost::optional<BSONObj> writeConcernObj = boost::none); /** Get error result from the last write operation (insert/update/delete) on this connection. db doesn't change the command's behavior - it is just for auth checks. @@ -483,8 +493,10 @@ public: * @param keys Document describing keys and index types. You must provide at least one * field and its direction. */ - void createIndex(StringData ns, const BSONObj& keys) { - return createIndex(ns, IndexSpec().addKeys(keys)); + void createIndex(StringData ns, + const BSONObj& keys, + boost::optional<BSONObj> writeConcernObj = boost::none) { + return createIndex(ns, IndexSpec().addKeys(keys), writeConcernObj); } /** Create an index on the collection 'ns' as described by the given @@ -495,20 +507,26 @@ public: * @param descriptor Configuration object describing the index to create. The * descriptor must describe at least one key and index type. */ - virtual void createIndex(StringData ns, const IndexSpec& descriptor) { + virtual void createIndex(StringData ns, + const IndexSpec& descriptor, + boost::optional<BSONObj> writeConcernObj = boost::none) { std::vector<const IndexSpec*> toBuild; toBuild.push_back(&descriptor); - createIndexes(ns, toBuild); + createIndexes(ns, toBuild, writeConcernObj); } - virtual void createIndexes(StringData ns, const std::vector<const IndexSpec*>& descriptor); + virtual void createIndexes(StringData ns, + const std::vector<const IndexSpec*>& descriptor, + boost::optional<BSONObj> writeConcernObj = boost::none); /** * Creates indexes on the collection 'ns' as described by 'specs'. * * Failure to construct the indexes is reported by throwing an AssertionException. */ - virtual void createIndexes(StringData ns, const std::vector<BSONObj>& specs); + virtual void createIndexes(StringData ns, + const std::vector<BSONObj>& specs, + boost::optional<BSONObj> writeConcernObj = boost::none); /** * Lists indexes on the collection 'nsOrUuid'. @@ -523,13 +541,18 @@ public: virtual std::list<BSONObj> getReadyIndexSpecs(const NamespaceStringOrUUID& nsOrUuid, int options = 0); - virtual void dropIndex(const std::string& ns, BSONObj keys); - virtual void dropIndex(const std::string& ns, const std::string& indexName); + virtual void dropIndex(const std::string& ns, + BSONObj keys, + boost::optional<BSONObj> writeConcernObj = boost::none); + virtual void dropIndex(const std::string& ns, + const std::string& indexName, + boost::optional<BSONObj> writeConcernObj = boost::none); /** drops all indexes for the collection */ - virtual void dropIndexes(const std::string& ns); + virtual void dropIndexes(const std::string& ns, + boost::optional<BSONObj> writeConcernObj = boost::none); virtual void reIndex(const std::string& ns); @@ -600,13 +623,15 @@ public: @return cursor. 0 if error (connection failure) @throws AssertionException */ - std::unique_ptr<DBClientCursor> query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) override; + std::unique_ptr<DBClientCursor> query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) override; /** Uses QueryOption_Exhaust, when available and specified in 'queryOptions'. @@ -628,14 +653,16 @@ public: Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = QueryOption_Exhaust, - int batchSize = 0) final; + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) final; unsigned long long query(std::function<void(DBClientCursorBatchIterator&)> f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = QueryOption_Exhaust, - int batchSize = 0) override; + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) override; /** don't use this - called automatically by DBClientCursor for you @@ -651,22 +678,39 @@ public: /** insert an object into the database */ - virtual void insert(const std::string& ns, BSONObj obj, int flags = 0); + virtual void insert(const std::string& ns, + BSONObj obj, + int flags = 0, + boost::optional<BSONObj> writeConcernObj = boost::none); /** insert a vector of objects into the database */ - virtual void insert(const std::string& ns, const std::vector<BSONObj>& v, int flags = 0); + virtual void insert(const std::string& ns, + const std::vector<BSONObj>& v, + int flags = 0, + boost::optional<BSONObj> writeConcernObj = boost::none); /** updates objects matching query */ - virtual void update( - const std::string& ns, Query query, BSONObj obj, bool upsert = false, bool multi = false); + virtual void update(const std::string& ns, + Query query, + BSONObj obj, + bool upsert = false, + bool multi = false, + boost::optional<BSONObj> writeConcernObj = boost::none); - virtual void update(const std::string& ns, Query query, BSONObj obj, int flags); + virtual void update(const std::string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional<BSONObj> writeConcernObj = boost::none); - virtual void remove(const std::string& ns, Query query, int flags = 0); + virtual void remove(const std::string& ns, + Query query, + int flags = 0, + boost::optional<BSONObj> writeConcernObj = boost::none); virtual bool isFailed() const = 0; @@ -716,7 +760,8 @@ protected: const BSONObj& query, int options, int limit, - int skip); + int skip, + boost::optional<BSONObj> readConcernObj); /** * Look up the options available on this client. Caches the answer from diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp index c7170733b1b..a5a6f3005ce 100644 --- a/src/mongo/client/dbclient_connection.cpp +++ b/src/mongo/client/dbclient_connection.cpp @@ -625,16 +625,18 @@ unsigned long long DBClientConnection::query(std::function<void(DBClientCursorBa Query query, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional<BSONObj> readConcernObj) { if (!(queryOptions & QueryOption_Exhaust) || !(availableOptions() & QueryOption_Exhaust)) { - return DBClientBase::query(f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); + return DBClientBase::query( + f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize, readConcernObj); } // mask options queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk | QueryOption_Exhaust); - unique_ptr<DBClientCursor> c( - this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize)); + unique_ptr<DBClientCursor> c(this->query( + nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize, readConcernObj)); // Note that this->query will throw for network errors, so it is OK to return a numeric // error code here. uassert(13386, "socket error for mapping query", c.get()); diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h index 5d76701bc59..6c7a59b4995 100644 --- a/src/mongo/client/dbclient_connection.h +++ b/src/mongo/client/dbclient_connection.h @@ -151,16 +151,24 @@ public: */ void logout(const std::string& dbname, BSONObj& info) override; - std::unique_ptr<DBClientCursor> query(const NamespaceStringOrUUID& nsOrUuid, - Query query = Query(), - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) override { + std::unique_ptr<DBClientCursor> query( + const NamespaceStringOrUUID& nsOrUuid, + Query query = Query(), + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) override { checkConnection(); - return DBClientBase::query( - nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + return DBClientBase::query(nsOrUuid, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj); } unsigned long long query(std::function<void(DBClientCursorBatchIterator&)> f, @@ -168,7 +176,8 @@ public: Query query, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize = 0) override; + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) override; using DBClientBase::runCommandWithTarget; std::pair<rpc::UniqueReply, DBClientBase*> runCommandWithTarget(OpMsgRequest request) override; diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp index 77d5c802eb7..60034c05514 100644 --- a/src/mongo/client/dbclient_cursor.cpp +++ b/src/mongo/client/dbclient_cursor.cpp @@ -153,6 +153,12 @@ Message DBClientCursor::_assembleInit() { // QueryRequest doesn't handle $readPreference. cmd = BSONObjBuilder(std::move(cmd)).append(readPref).obj(); } + if (!cmd.hasField(repl::ReadConcernArgs::kReadConcernFieldName) && _readConcernObj) { + cmd = BSONObjBuilder(std::move(cmd)) + .append(repl::ReadConcernArgs::kReadConcernFieldName, *_readConcernObj) + .obj(); + } + return assembleCommandRequest(_client, ns.db(), opts, std::move(cmd)); } // else use legacy OP_QUERY request. @@ -516,7 +522,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) + int batchSize, + boost::optional<BSONObj> readConcernObj) : DBClientCursor(client, nsOrUuid, query, @@ -526,7 +533,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, fieldsToReturn, queryOptions, batchSize, - {}) {} + {}, + readConcernObj) {} DBClientCursor::DBClientCursor(DBClientBase* client, const NamespaceStringOrUUID& nsOrUuid, @@ -543,7 +551,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, nullptr, // fieldsToReturn queryOptions, 0, - std::move(initialBatch)) {} // batchSize + std::move(initialBatch), // batchSize + boost::none) {} DBClientCursor::DBClientCursor(DBClientBase* client, const NamespaceStringOrUUID& nsOrUuid, @@ -554,7 +563,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, const BSONObj* fieldsToReturn, int queryOptions, int batchSize, - std::vector<BSONObj> initialBatch) + std::vector<BSONObj> initialBatch, + boost::optional<BSONObj> readConcernObj) : batch{std::move(initialBatch)}, _client(client), _originalHost(_client->getServerAddress()), @@ -572,7 +582,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, cursorId(cursorId), _ownCursor(true), wasError(false), - _enabledBSONVersion(Validator<BSONObj>::enabledBSONVersion()) { + _enabledBSONVersion(Validator<BSONObj>::enabledBSONVersion()), + _readConcernObj(readConcernObj) { if (queryOptions & QueryOptionLocal_forceOpQuery) { // Legacy OP_QUERY does not support UUIDs. invariant(!_nsOrUuid.uuid()); diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index b0e1551802b..33fb9f2e3c9 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -152,7 +152,8 @@ public: int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int bs); + int bs, + boost::optional<BSONObj> readConcernObj = boost::none); DBClientCursor(DBClientBase* client, const NamespaceStringOrUUID& nsOrUuid, @@ -273,7 +274,8 @@ private: const BSONObj* fieldsToReturn, int queryOptions, int bs, - std::vector<BSONObj> initialBatch); + std::vector<BSONObj> initialBatch, + boost::optional<BSONObj> readConcernObj); int nextBatchSize(); @@ -307,6 +309,7 @@ private: boost::optional<long long> _term; boost::optional<repl::OpTime> _lastKnownCommittedOpTime; boost::optional<BSONObj> _postBatchResumeToken; + boost::optional<BSONObj> _readConcernObj; void dataReceived(const Message& reply) { bool retry; diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 126b7d5f03b..d7744d2c841 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -522,20 +522,33 @@ void DBClientReplicaSet::logout(const string& dbname, BSONObj& info) { // ------------- simple functions ----------------- -void DBClientReplicaSet::insert(const string& ns, BSONObj obj, int flags) { - checkMaster()->insert(ns, obj, flags); +void DBClientReplicaSet::insert(const string& ns, + BSONObj obj, + int flags, + boost::optional<BSONObj> writeConcernObj) { + checkMaster()->insert(ns, obj, flags, writeConcernObj); } -void DBClientReplicaSet::insert(const string& ns, const vector<BSONObj>& v, int flags) { - checkMaster()->insert(ns, v, flags); +void DBClientReplicaSet::insert(const string& ns, + const vector<BSONObj>& v, + int flags, + boost::optional<BSONObj> writeConcernObj) { + checkMaster()->insert(ns, v, flags, writeConcernObj); } -void DBClientReplicaSet::remove(const string& ns, Query obj, int flags) { - checkMaster()->remove(ns, obj, flags); +void DBClientReplicaSet::remove(const string& ns, + Query obj, + int flags, + boost::optional<BSONObj> writeConcernObj) { + checkMaster()->remove(ns, obj, flags, writeConcernObj); } -void DBClientReplicaSet::update(const string& ns, Query query, BSONObj obj, int flags) { - return checkMaster()->update(ns, query, obj, flags); +void DBClientReplicaSet::update(const string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional<BSONObj> writeConcernObj) { + return checkMaster()->update(ns, query, obj, flags, writeConcernObj); } unique_ptr<DBClientCursor> DBClientReplicaSet::query(const NamespaceStringOrUUID& nsOrUuid, @@ -544,7 +557,8 @@ unique_ptr<DBClientCursor> DBClientReplicaSet::query(const NamespaceStringOrUUID int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional<BSONObj> readConcernObj) { shared_ptr<ReadPreferenceSetting> readPref(_extractReadPref(query.obj, queryOptions)); invariant(nsOrUuid.nss()); const string ns = nsOrUuid.nss()->ns(); @@ -573,8 +587,14 @@ unique_ptr<DBClientCursor> DBClientReplicaSet::query(const NamespaceStringOrUUID break; } - unique_ptr<DBClientCursor> cursor = conn->query( - nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + unique_ptr<DBClientCursor> cursor = conn->query(nsOrUuid, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj); return checkSlaveQueryResult(std::move(cursor)); } catch (const DBException& ex) { @@ -599,14 +619,21 @@ unique_ptr<DBClientCursor> DBClientReplicaSet::query(const NamespaceStringOrUUID "dbclient_rs query to primary node in {getMonitor_getName}", "getMonitor_getName"_attr = _getMonitor()->getName()); - return checkMaster()->query( - nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + return checkMaster()->query(nsOrUuid, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj); } BSONObj DBClientReplicaSet::findOne(const string& ns, const Query& query, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + boost::optional<BSONObj> readConcernObj) { shared_ptr<ReadPreferenceSetting> readPref(_extractReadPref(query.obj, queryOptions)); if (_isSecondaryQuery(ns, query.obj, *readPref)) { LOGV2_DEBUG(20135, @@ -633,7 +660,7 @@ BSONObj DBClientReplicaSet::findOne(const string& ns, break; } - return conn->findOne(ns, query, fieldsToReturn, queryOptions); + return conn->findOne(ns, query, fieldsToReturn, queryOptions, readConcernObj); } catch (const DBException& ex) { const Status status = ex.toStatus(str::stream() << "can't findone replica set node " << _lastSlaveOkHost.toString()); @@ -656,7 +683,7 @@ BSONObj DBClientReplicaSet::findOne(const string& ns, "dbclient_rs findOne to primary node in {getMonitor_getName}", "getMonitor_getName"_attr = _getMonitor()->getName()); - return checkMaster()->findOne(ns, query, fieldsToReturn, queryOptions); + return checkMaster()->findOne(ns, query, fieldsToReturn, queryOptions, readConcernObj); } void DBClientReplicaSet::killCursor(const NamespaceString& ns, long long cursorID) { diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h index 48bb2c01c7e..9603842f7d9 100644 --- a/src/mongo/client/dbclient_rs.h +++ b/src/mongo/client/dbclient_rs.h @@ -89,29 +89,45 @@ public: // ----------- simple functions -------------- /** throws userassertion "no master found" */ - std::unique_ptr<DBClientCursor> query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) override; + std::unique_ptr<DBClientCursor> query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none) override; /** throws userassertion "no master found" */ BSONObj findOne(const std::string& ns, const Query& query, const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0) override; + int queryOptions = 0, + boost::optional<BSONObj> readConcernObj = boost::none) override; - void insert(const std::string& ns, BSONObj obj, int flags = 0) override; + void insert(const std::string& ns, + BSONObj obj, + int flags = 0, + boost::optional<BSONObj> writeConcernObj = boost::none) override; /** insert multiple objects. Note that single object insert is asynchronous, so this version is only nominally faster and not worth a special effort to try to use. */ - void insert(const std::string& ns, const std::vector<BSONObj>& v, int flags = 0) override; - - void remove(const std::string& ns, Query obj, int flags) override; - - void update(const std::string& ns, Query query, BSONObj obj, int flags) override; + void insert(const std::string& ns, + const std::vector<BSONObj>& v, + int flags = 0, + boost::optional<BSONObj> writeConcernObj = boost::none) override; + + void remove(const std::string& ns, + Query obj, + int flags, + boost::optional<BSONObj> writeConcernObj = boost::none) override; + + void update(const std::string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional<BSONObj> writeConcernObj = boost::none) override; void killCursor(const NamespaceString& ns, long long cursorID) override; |