From 81c6113198d2f5debf3da38a42bf61d7a079de2e Mon Sep 17 00:00:00 2001 From: Kevin Pulo Date: Thu, 20 Feb 2020 21:42:07 +1100 Subject: SERVER-45692 add explicit RWC to inter-node commands (even if merely kImplicitDefault) (cherry picked from commit 747ff353cbc819d032fa727d4bd7ffad16ea0437) --- jstests/libs/kill_sessions.js | 2 + jstests/multiVersion/config_chunks_tags_set_fcv.js | 15 +- ...tion_between_mixed_FCV_mixed_version_mongods.js | 16 +- jstests/multiVersion/migrations_with_mixed_fcv.js | 17 +- .../isMaster_feature_compatibility_version.js | 17 +- jstests/replsets/awaitable_ismaster_fcv_change.js | 17 +- .../read_write_concern_defaults_application.js | 135 ++++++++------ src/mongo/client/dbclient_base.cpp | 193 +++++++++++++++------ src/mongo/client/dbclient_base.h | 129 +++++++++----- src/mongo/client/dbclient_connection.cpp | 10 +- src/mongo/client/dbclient_connection.h | 29 ++-- src/mongo/client/dbclient_cursor.cpp | 21 ++- src/mongo/client/dbclient_cursor.h | 7 +- src/mongo/client/dbclient_rs.cpp | 59 +++++-- src/mongo/client/dbclient_rs.h | 44 +++-- src/mongo/db/cloner.cpp | 5 +- src/mongo/db/dbdirectclient.cpp | 15 +- src/mongo/db/dbdirectclient.h | 19 +- .../shardsvr_process_interface.cpp | 46 ++--- src/mongo/db/pipeline/sharded_agg_helpers.cpp | 17 +- src/mongo/db/query/query_request.cpp | 4 + src/mongo/db/read_concern_mongod.cpp | 4 +- src/mongo/db/repl/SConscript | 1 + src/mongo/db/repl/collection_cloner.cpp | 10 +- src/mongo/db/repl/initial_syncer.cpp | 3 +- src/mongo/db/repl/oplog_interface_remote.cpp | 11 +- src/mongo/db/repl/read_concern_args.cpp | 5 + src/mongo/db/repl/read_concern_args.h | 2 + src/mongo/db/repl/replication_info.cpp | 10 +- .../db/repl/roll_back_local_operations_test.cpp | 3 +- src/mongo/db/repl/rollback_source_impl.cpp | 9 +- src/mongo/db/repl/sync_source_resolver.cpp | 8 +- src/mongo/db/s/add_shard_cmd.cpp | 2 +- src/mongo/db/s/balancer/migration_manager.cpp | 11 ++ .../db/s/migration_chunk_cloner_source_legacy.cpp | 22 +++ src/mongo/db/s/migration_util.cpp | 4 +- src/mongo/db/s/shard_key_util.cpp | 3 +- src/mongo/db/sessions_collection.cpp | 12 +- src/mongo/db/write_concern_options.cpp | 6 + src/mongo/db/write_concern_options.h | 1 + .../dbtests/mock/mock_dbclient_connection.cpp | 27 ++- src/mongo/dbtests/mock/mock_dbclient_connection.h | 34 ++-- src/mongo/dbtests/mock/mock_remote_db_server.cpp | 3 +- src/mongo/dbtests/mock/mock_remote_db_server.h | 3 +- src/mongo/s/commands/cluster_find_cmd.cpp | 2 +- src/mongo/s/commands/cluster_write_cmd.cpp | 24 +++ src/mongo/s/query/cluster_aggregation_planner.cpp | 10 +- .../s/request_types/merge_chunk_request_test.cpp | 6 +- .../s/request_types/merge_chunk_request_type.cpp | 3 +- .../migration_secondary_throttle_options.cpp | 8 +- .../migration_secondary_throttle_options_test.cpp | 32 ++-- .../s/request_types/split_chunk_request_test.cpp | 6 +- .../s/request_types/split_chunk_request_type.cpp | 3 +- src/mongo/s/transaction_router.cpp | 18 +- src/mongo/s/transaction_router_test.cpp | 7 +- src/mongo/shell/encrypted_dbclient_base.cpp | 26 ++- src/mongo/shell/encrypted_dbclient_base.h | 16 +- src/mongo/shell/feature_compatibility_version.js | 8 +- src/mongo/shell/mongo.js | 16 ++ src/mongo/shell/replsettest.js | 61 +++---- 60 files changed, 852 insertions(+), 405 deletions(-) diff --git a/jstests/libs/kill_sessions.js b/jstests/libs/kill_sessions.js index c05dae370a6..8fcb114e7a2 100644 --- a/jstests/libs/kill_sessions.js +++ b/jstests/libs/kill_sessions.js @@ -290,6 +290,8 @@ var _kill_sessions_api_module = 
(function() { "$readPreference": { mode: "primaryPreferred", }, + readConcern: {}, + writeConcern: {w: 1}, }; var cursors = {}; diff --git a/jstests/multiVersion/config_chunks_tags_set_fcv.js b/jstests/multiVersion/config_chunks_tags_set_fcv.js index 66189823b59..14d14f663f4 100644 --- a/jstests/multiVersion/config_chunks_tags_set_fcv.js +++ b/jstests/multiVersion/config_chunks_tags_set_fcv.js @@ -16,9 +16,18 @@ function verifyChunkOperationsFailDuringSetFCV(st, ns) { ErrorCodes.ConflictingOperationInProgress); verifyChunkDistribution(st, ns, [2, 1]); - assert.commandFailedWithCode( - st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard0.shardName}), - ErrorCodes.ConflictingOperationInProgress); + // Shards running with old FCV won't automatically add writeConcern when running moveChunk or + // _recvChunkStart on shards, which shards running FCV find objectionable. So we pass explicit + // writeConcern to the mongos moveChunk command (which also requires secondaryThrottle: true), + // which causes it to be passed through to the shard commands. + assert.commandFailedWithCode(st.s.adminCommand({ + moveChunk: ns, + find: {_id: 0}, + to: st.shard0.shardName, + secondaryThrottle: true, + writeConcern: {w: 1} + }), + ErrorCodes.ConflictingOperationInProgress); verifyChunkDistribution(st, ns, [2, 1]); assert.commandFailedWithCode( diff --git a/jstests/multiVersion/genericSetFCVUsage/migration_between_mixed_FCV_mixed_version_mongods.js b/jstests/multiVersion/genericSetFCVUsage/migration_between_mixed_FCV_mixed_version_mongods.js index 94fdb1172f3..fb8764c68eb 100644 --- a/jstests/multiVersion/genericSetFCVUsage/migration_between_mixed_FCV_mixed_version_mongods.js +++ b/jstests/multiVersion/genericSetFCVUsage/migration_between_mixed_FCV_mixed_version_mongods.js @@ -30,11 +30,17 @@ checkFCV(st.shard0.getDB("admin"), latestFCV); checkFCV(st.shard1.getDB("admin"), lastStableFCV); // It is not possible to move a chunk from a latestFCV shard to a last-stable binary version -// shard. -assert.commandFailedWithCode( - st.s.adminCommand( - {moveChunk: testDB.coll.getFullName(), find: {a: 1}, to: st.shard1.shardName}), - ErrorCodes.IncompatibleServerVersion); +// shard. Pass explicit writeConcern (which requires secondaryThrottle: true) to avoid problems +// if the last-stable doesn't automatically include writeConcern when running _recvChunkStart on the +// newer shard. +assert.commandFailedWithCode(st.s.adminCommand({ + moveChunk: testDB.coll.getFullName(), + find: {a: 1}, + to: st.shard1.shardName, + secondaryThrottle: true, + writeConcern: {w: 1} +}), + ErrorCodes.IncompatibleServerVersion); st.stop(); })(); diff --git a/jstests/multiVersion/migrations_with_mixed_fcv.js b/jstests/multiVersion/migrations_with_mixed_fcv.js index 3b6c9178439..5285053b711 100644 --- a/jstests/multiVersion/migrations_with_mixed_fcv.js +++ b/jstests/multiVersion/migrations_with_mixed_fcv.js @@ -181,10 +181,19 @@ function testMigrateFromLastStableToLatest() { checkFCV(st.shard0.getDB("admin"), lastStableFCV); checkFCV(st.shard1.getDB("admin"), latestFCV); - // Move chunk [50, inf) to shard1 should fail. - assert.commandFailedWithCode( - st.s.adminCommand({moveChunk: ns, find: {x: 50}, to: st.shard1.shardName}), - ErrorCodes.ConflictingOperationInProgress); + // Move chunk [50, inf) to shard1 should fail. Since shard1 is running FCV 4.4, it expects + // _recvChunkStart to include explicit writeConcern. Since shard0 is running FCV 4.2, it will + // not add it automatically. 
So we pass explicit writeConcern to the mongos moveChunk command + // (which also requires secondaryThrottle: true), which causes it to be passed through + // explicitly to shard0, which will use it when calling _recvChunkStart on shard1 + assert.commandFailedWithCode(st.s.adminCommand({ + moveChunk: ns, + find: {x: 50}, + to: st.shard1.shardName, + secondaryThrottle: true, + writeConcern: {w: 1} + }), + ErrorCodes.ConflictingOperationInProgress); st.stop(); } diff --git a/jstests/noPassthroughWithMongod/isMaster_feature_compatibility_version.js b/jstests/noPassthroughWithMongod/isMaster_feature_compatibility_version.js index 62ba469ba73..7dcc280c066 100644 --- a/jstests/noPassthroughWithMongod/isMaster_feature_compatibility_version.js +++ b/jstests/noPassthroughWithMongod/isMaster_feature_compatibility_version.js @@ -7,6 +7,11 @@ "use strict"; const adminDB = db.getSiblingDB("admin"); + +// This test manually runs isMaster with internalClient, which means that to the mongod, the +// connection appears to be from another server. Since mongod expects other cluster members to +// always include explicit read/write concern (on commands that accept read/write concern), this +// test must be careful to mimic this behavior. const isMasterCommand = { isMaster: 1, internalClient: {minWireVersion: NumberInt(0), maxWireVersion: NumberInt(7)} @@ -23,7 +28,8 @@ assert.eq(res.minWireVersion, res.maxWireVersion, tojson(res)); // returns minWireVersion == maxWireVersion. assert.commandWorked( adminDB.system.version.update({_id: "featureCompatibilityVersion"}, - {$set: {version: lastStableFCV, targetVersion: latestFCV}})); + {$set: {version: lastStableFCV, targetVersion: latestFCV}}, + {writeConcern: {w: 1}})); res = adminDB.runCommand(isMasterCommand); assert.commandWorked(res); assert.eq(res.minWireVersion, res.maxWireVersion, tojson(res)); @@ -32,14 +38,16 @@ assert.eq(res.minWireVersion, res.maxWireVersion, tojson(res)); // returns minWireVersion == maxWireVersion. assert.commandWorked( adminDB.system.version.update({_id: "featureCompatibilityVersion"}, - {$set: {version: lastStableFCV, targetVersion: lastStableFCV}})); + {$set: {version: lastStableFCV, targetVersion: lastStableFCV}}, + {writeConcern: {w: 1}})); res = adminDB.runCommand(isMasterCommand); assert.commandWorked(res); assert.eq(res.minWireVersion, res.maxWireVersion, tojson(res)); // When the featureCompatibilityVersion is equal to the downgrade version, running isMaster with // internalClient returns minWireVersion + 1 == maxWireVersion. -assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: lastStableFCV})); +assert.commandWorked( + adminDB.runCommand({setFeatureCompatibilityVersion: lastStableFCV, writeConcern: {w: 1}})); res = adminDB.runCommand(isMasterCommand); assert.commandWorked(res); assert.eq(res.minWireVersion + 1, res.maxWireVersion, tojson(res)); @@ -47,7 +55,8 @@ assert.eq(res.minWireVersion + 1, res.maxWireVersion, tojson(res)); // When the internalClient field is missing from the isMaster command, the response returns the // full wire version range from minWireVersion == 0 to maxWireVersion == latest version, even if // the featureCompatibilityVersion is equal to the upgrade version. 
-assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: latestFCV})); +assert.commandWorked( + adminDB.runCommand({setFeatureCompatibilityVersion: latestFCV, writeConcern: {w: 1}})); res = adminDB.runCommand({isMaster: 1}); assert.commandWorked(res); assert.eq(res.minWireVersion, 0, tojson(res)); diff --git a/jstests/replsets/awaitable_ismaster_fcv_change.js b/jstests/replsets/awaitable_ismaster_fcv_change.js index 5099acf462d..b89c7300b59 100644 --- a/jstests/replsets/awaitable_ismaster_fcv_change.js +++ b/jstests/replsets/awaitable_ismaster_fcv_change.js @@ -19,6 +19,11 @@ const secondary = rst.getSecondary(); const primaryAdminDB = primary.getDB("admin"); const secondaryAdminDB = secondary.getDB("admin"); +// This test manually runs isMaster with internalClient, which means that to the mongod, the +// connection appears to be from another server. Since mongod expects other cluster members to +// always include explicit read/write concern (on commands that accept read/write concern), this +// test must be careful to mimic this behavior. + // Get the server topologyVersion, minWireVersion, and maxWireversion. const primaryResult = assert.commandWorked(primaryAdminDB.runCommand( {isMaster: 1, internalClient: {minWireVersion: NumberInt(0), maxWireVersion: NumberInt(9)}})); @@ -109,13 +114,15 @@ primaryFailPoint.wait(); secondaryFailPoint.wait(); // Setting the FCV to the same version will not trigger an isMaster response. -assert.commandWorked(primaryAdminDB.runCommand({setFeatureCompatibilityVersion: latestFCV})); +assert.commandWorked( + primaryAdminDB.runCommand({setFeatureCompatibilityVersion: latestFCV, writeConcern: {w: 1}})); checkFCV(primaryAdminDB, latestFCV); checkFCV(secondaryAdminDB, latestFCV); jsTestLog("Downgrade the featureCompatibilityVersion."); // Downgrading the FCV will cause the isMaster requests to respond on both primary and secondary. -assert.commandWorked(primaryAdminDB.runCommand({setFeatureCompatibilityVersion: lastStableFCV})); +assert.commandWorked(primaryAdminDB.runCommand( + {setFeatureCompatibilityVersion: lastStableFCV, writeConcern: {w: 1}})); awaitIsMasterBeforeDowngradeFCVOnPrimary(); awaitIsMasterBeforeDowngradeFCVOnSecondary(); // Ensure the featureCompatibilityVersion document update has been replicated. @@ -162,13 +169,15 @@ primaryFailPoint.wait(); secondaryFailPoint.wait(); // Setting the FCV to the same version will not trigger an isMaster response. -assert.commandWorked(primaryAdminDB.runCommand({setFeatureCompatibilityVersion: lastStableFCV})); +assert.commandWorked(primaryAdminDB.runCommand( + {setFeatureCompatibilityVersion: lastStableFCV, writeConcern: {w: 1}})); checkFCV(primaryAdminDB, lastStableFCV); checkFCV(secondaryAdminDB, lastStableFCV); jsTestLog("Upgrade the featureCompatibilityVersion."); // Upgrading the FCV will cause the isMaster requests to respond on both primary and secondary. -assert.commandWorked(primaryAdminDB.runCommand({setFeatureCompatibilityVersion: latestFCV})); +assert.commandWorked( + primaryAdminDB.runCommand({setFeatureCompatibilityVersion: latestFCV, writeConcern: {w: 1}})); awaitIsMasterBeforeUpgradeFCVOnPrimary(); awaitIsMasterBeforeUpgradeFCVOnSecondary(); // Ensure the featureCompatibilityVersion document update has been replicated. 
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js index 72fec834018..d53cce7a65d 100644 --- a/jstests/sharding/read_write_concern_defaults_application.js +++ b/jstests/sharding/read_write_concern_defaults_application.js @@ -4,10 +4,12 @@ * The following fields are required for each command that is not skipped: * * - setUp: [OPTIONAL] A function that does any set up (inserts, etc.) needed to check the command's - * results. + * results. These operations will run with the default RWC set, so ensure they use appropriate + * explicit RWC if necessary. * - command: The command to run, with all required options. If a function, is called with the * connection as the argument, and the returned object is used. The readConcern/writeConcern - * fields may be appended to this object. + * fields may be appended to this object. The command object cannot include its own explicit + * readConcern/writeConcern fields. * - checkReadConcern: Boolean that controls whether to check the application of readConcern. * - checkWriteConcern: Boolean that controls whether to check the application of writeConcern. * - db: [OPTIONAL] The database to run the command against. @@ -117,10 +119,11 @@ let testCases = { _transferMods: {skip: "internal command"}, abortTransaction: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); // Ensure that the dbVersion is known. - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); - assert.eq(1, conn.getCollection(nss).findOne({x: 1}).x); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); + assert.eq(1, + conn.getCollection(nss).find({x: 1}).readConcern("local").limit(1).next().x); // Start the transaction. assert.commandWorked(conn.getDB(db).runCommand({ insert: coll, @@ -143,7 +146,7 @@ let testCases = { addShardToZone: {skip: "does not accept read or write concern"}, aggregate: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {aggregate: coll, pipeline: [{$match: {x: 1}}, {$out: "out"}], cursor: {}}, checkReadConcern: true, @@ -173,7 +176,7 @@ let testCases = { clone: {skip: "deprecated"}, cloneCollectionAsCapped: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); }, command: {cloneCollectionAsCapped: coll, toCollection: coll + "2", size: 10 * 1024 * 1024}, checkReadConcern: false, @@ -181,7 +184,7 @@ let testCases = { }, collMod: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); }, command: {collMod: coll, validator: {}}, checkReadConcern: false, @@ -190,10 +193,11 @@ let testCases = { collStats: {skip: "does not accept read or write concern"}, commitTransaction: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); // Ensure that the dbVersion is known. 
- assert.commandWorked(conn.getCollection(nss).insert({x: 1})); - assert.eq(1, conn.getCollection(nss).findOne({x: 1}).x); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); + assert.eq(1, + conn.getCollection(nss).find({x: 1}).readConcern("local").limit(1).next().x); // Start the transaction. assert.commandWorked(conn.getDB(db).runCommand({ insert: coll, @@ -219,7 +223,7 @@ let testCases = { connectionStatus: {skip: "does not accept read or write concern"}, convertToCapped: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); }, command: {convertToCapped: coll, size: 10 * 1024 * 1024}, checkReadConcern: false, @@ -228,7 +232,7 @@ let testCases = { coordinateCommitTransaction: {skip: "internal command"}, count: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {count: coll, query: {x: 1}}, checkReadConcern: true, @@ -242,7 +246,7 @@ let testCases = { }, createIndexes: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {createIndexes: coll, indexes: [{key: {x: 1}, name: "foo"}]}, checkReadConcern: false, @@ -269,7 +273,7 @@ let testCases = { dbStats: {skip: "does not accept read or write concern"}, delete: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {delete: coll, deletes: [{q: {x: 1}, limit: 1}]}, checkReadConcern: false, @@ -280,7 +284,7 @@ let testCases = { }, distinct: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {distinct: coll, key: "x"}, checkReadConcern: true, @@ -289,7 +293,7 @@ let testCases = { driverOIDTest: {skip: "internal command"}, drop: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); }, command: {drop: coll}, checkReadConcern: false, @@ -297,8 +301,8 @@ let testCases = { }, dropAllRolesFromDatabase: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "foo", privileges: [], roles: []})); + assert.commandWorked(conn.getDB(db).runCommand( + {createRole: "foo", privileges: [], roles: [], writeConcern: {w: 1}})); }, command: {dropAllRolesFromDatabase: 1}, checkReadConcern: false, @@ -308,8 +312,8 @@ let testCases = { }, dropAllUsersFromDatabase: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createUser: "foo", pwd: "bar", roles: []})); + assert.commandWorked(conn.getDB(db).runCommand( + {createUser: "foo", pwd: "bar", roles: [], writeConcern: {w: 1}})); }, command: {dropAllUsersFromDatabase: 1}, checkReadConcern: false, @@ -321,9 +325,12 @@ let testCases = { dropDatabase: {skip: "not profiled or logged"}, dropIndexes: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); - assert.commandWorked(conn.getDB(db).runCommand( - {createIndexes: coll, indexes: [{key: {x: 1}, name: "foo"}]})); + 
assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); + assert.commandWorked(conn.getDB(db).runCommand({ + createIndexes: coll, + indexes: [{key: {x: 1}, name: "foo"}], + writeConcern: {w: 1} + })); }, command: {dropIndexes: coll, index: "foo"}, checkReadConcern: false, @@ -331,8 +338,8 @@ let testCases = { }, dropRole: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "foo", privileges: [], roles: []})); + assert.commandWorked(conn.getDB(db).runCommand( + {createRole: "foo", privileges: [], roles: [], writeConcern: {w: 1}})); }, command: {dropRole: "foo"}, checkReadConcern: false, @@ -342,8 +349,8 @@ let testCases = { }, dropUser: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createUser: "foo", pwd: "bar", roles: []})); + assert.commandWorked(conn.getDB(db).runCommand( + {createUser: "foo", pwd: "bar", roles: [], writeConcern: {w: 1}})); }, command: {dropUser: "foo"}, checkReadConcern: false, @@ -360,7 +367,7 @@ let testCases = { filemd5: {skip: "does not accept read or write concern"}, find: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {find: coll, filter: {x: 1}}, checkReadConcern: true, @@ -368,7 +375,7 @@ let testCases = { }, findAndModify: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {findAndModify: coll, query: {x: 1}, update: {$set: {x: 2}}}, checkReadConcern: false, @@ -380,8 +387,11 @@ let testCases = { fsyncUnlock: {skip: "does not accept read or write concern"}, geoSearch: { setUp: function(conn) { - assert.commandWorked( - conn.getCollection(nss).createIndex({loc: "geoHaystack", foo: 1}, {bucketSize: 1})); + assert.commandWorked(conn.getDB(db).runCommand({ + createIndexes: coll, + indexes: [{key: {loc: "geoHaystack", foo: 1}, bucketSize: 1, name: "foo"}], + writeConcern: {w: 1} + })); }, command: {geoSearch: coll, search: {}, near: [0, 0], maxDistance: 1}, checkReadConcern: true, @@ -402,8 +412,8 @@ let testCases = { godinsert: {skip: "for testing only"}, grantPrivilegesToRole: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "foo", privileges: [], roles: []})); + assert.commandWorked(conn.getDB(db).runCommand( + {createRole: "foo", privileges: [], roles: [], writeConcern: {w: 1}})); }, command: { grantPrivilegesToRole: "foo", @@ -416,10 +426,10 @@ let testCases = { }, grantRolesToRole: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "foo", privileges: [], roles: []})); - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "bar", privileges: [], roles: []})); + assert.commandWorked(conn.getDB(db).runCommand( + {createRole: "foo", privileges: [], roles: [], writeConcern: {w: 1}})); + assert.commandWorked(conn.getDB(db).runCommand( + {createRole: "bar", privileges: [], roles: [], writeConcern: {w: 1}})); }, command: {grantRolesToRole: "foo", roles: [{role: "bar", db: db}]}, checkReadConcern: false, @@ -429,10 +439,10 @@ let testCases = { }, grantRolesToUser: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "foo", privileges: [], roles: []})); - assert.commandWorked( - conn.getDB(db).runCommand({createUser: "foo", pwd: "bar", roles: []})); + 
assert.commandWorked(conn.getDB(db).runCommand( + {createRole: "foo", privileges: [], roles: [], writeConcern: {w: 1}})); + assert.commandWorked(conn.getDB(db).runCommand( + {createUser: "foo", pwd: "bar", roles: [], writeConcern: {w: 1}})); }, command: {grantRolesToUser: "foo", roles: [{role: "foo", db: db}]}, checkReadConcern: false, @@ -445,7 +455,7 @@ let testCases = { httpClientRequest: {skip: "does not accept read or write concern"}, insert: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); }, command: {insert: coll, documents: [{_id: ObjectId()}]}, checkReadConcern: false, @@ -497,7 +507,7 @@ let testCases = { removeShardFromZone: {skip: "does not accept read or write concern"}, renameCollection: { setUp: function(conn) { - assert.commandWorked(conn.getDB(db).runCommand({create: coll})); + assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); }, command: {renameCollection: nss, to: nss + "2"}, db: "admin", @@ -528,7 +538,8 @@ let testCases = { assert.commandWorked(conn.getDB(db).runCommand({ createRole: "foo", privileges: [{resource: {db: db, collection: coll}, actions: ["find"]}], - roles: [] + roles: [], + writeConcern: {w: 1} })); }, command: { @@ -542,10 +553,14 @@ let testCases = { }, revokeRolesFromRole: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "bar", privileges: [], roles: []})); assert.commandWorked(conn.getDB(db).runCommand( - {createRole: "foo", privileges: [], roles: [{role: "bar", db: db}]})); + {createRole: "bar", privileges: [], roles: [], writeConcern: {w: 1}})); + assert.commandWorked(conn.getDB(db).runCommand({ + createRole: "foo", + privileges: [], + roles: [{role: "bar", db: db}], + writeConcern: {w: 1} + })); }, command: {revokeRolesFromRole: "foo", roles: [{role: "foo", db: db}]}, checkReadConcern: false, @@ -555,10 +570,14 @@ let testCases = { }, revokeRolesFromUser: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "foo", privileges: [], roles: []})); assert.commandWorked(conn.getDB(db).runCommand( - {createUser: "foo", pwd: "bar", roles: [{role: "foo", db: db}]})); + {createRole: "foo", privileges: [], roles: [], writeConcern: {w: 1}})); + assert.commandWorked(conn.getDB(db).runCommand({ + createUser: "foo", + pwd: "bar", + roles: [{role: "foo", db: db}], + writeConcern: {w: 1} + })); }, command: {revokeRolesFromUser: "foo", roles: [{role: "foo", db: db}]}, checkReadConcern: false, @@ -593,7 +612,7 @@ let testCases = { unsetSharding: {skip: "internal command"}, update: { setUp: function(conn) { - assert.commandWorked(conn.getCollection(nss).insert({x: 1})); + assert.commandWorked(conn.getCollection(nss).insert({x: 1}, {writeConcern: {w: 1}})); }, command: {update: coll, updates: [{q: {x: 1}, u: {x: 2}}]}, checkReadConcern: false, @@ -604,8 +623,8 @@ let testCases = { }, updateRole: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createRole: "foo", privileges: [], roles: []})); + assert.commandWorked(conn.getDB(db).runCommand( + {createRole: "foo", privileges: [], roles: [], writeConcern: {w: 1}})); }, command: {updateRole: "foo", privileges: []}, checkReadConcern: false, @@ -615,8 +634,8 @@ let testCases = { }, updateUser: { setUp: function(conn) { - assert.commandWorked( - conn.getDB(db).runCommand({createUser: "foo", pwd: "bar", roles: []})); + 
assert.commandWorked(conn.getDB(db).runCommand( + {createUser: "foo", pwd: "bar", roles: [], writeConcern: {w: 1}})); }, command: {updateUser: "foo", pwd: "bar2"}, checkReadConcern: false, @@ -788,13 +807,15 @@ function runScenario( // Do any test-specific setup. if (typeof (test.setUp) === "function") { - test.setUp(conn); + conn._runWithForcedReadMode("commands", test.setUp); } // Get the command from the test case. let actualCmd = (typeof (test.command) === "function") ? test.command(conn) : Object.assign({}, test.command, {}); + assert.eq("undefined", typeof (actualCmd.readConcern)); + assert.eq("undefined", typeof (actualCmd.writeConcern)); // Add extra fields for RWC if necessary, and an identifying comment. // When sharded, the field order is: comment, readConcern, writeConcern. diff --git a/src/mongo/client/dbclient_base.cpp b/src/mongo/client/dbclient_base.cpp index caad3d6cc98..4e2d1b36a19 100644 --- a/src/mongo/client/dbclient_base.cpp +++ b/src/mongo/client/dbclient_base.cpp @@ -319,10 +319,14 @@ bool DBClientBase::runPseudoCommand(StringData db, return success; } -long long DBClientBase::count( - const NamespaceStringOrUUID nsOrUuid, const BSONObj& query, int options, int limit, int skip) { +long long DBClientBase::count(const NamespaceStringOrUUID nsOrUuid, + const BSONObj& query, + int options, + int limit, + int skip, + boost::optional readConcernObj) { auto dbName = (nsOrUuid.uuid() ? nsOrUuid.dbname() : (*nsOrUuid.nss()).db().toString()); - BSONObj cmd = _countCmd(nsOrUuid, query, options, limit, skip); + BSONObj cmd = _countCmd(nsOrUuid, query, options, limit, skip, readConcernObj); BSONObj res; if (!runCommand(dbName, cmd, res, options)) { auto status = getStatusFromCommandResult(res); @@ -332,8 +336,12 @@ long long DBClientBase::count( return res["n"].numberLong(); } -BSONObj DBClientBase::_countCmd( - const NamespaceStringOrUUID nsOrUuid, const BSONObj& query, int options, int limit, int skip) { +BSONObj DBClientBase::_countCmd(const NamespaceStringOrUUID nsOrUuid, + const BSONObj& query, + int options, + int limit, + int skip, + boost::optional readConcernObj) { BSONObjBuilder b; if (nsOrUuid.uuid()) { const auto uuid = *nsOrUuid.uuid(); @@ -346,6 +354,9 @@ BSONObj DBClientBase::_countCmd( b.append("limit", limit); if (skip) b.append("skip", skip); + if (readConcernObj) { + b.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcernObj); + } return b.obj(); } @@ -541,8 +552,12 @@ bool DBClientBase::isMaster(bool& isMaster, BSONObj* info) { return ok; } -bool DBClientBase::createCollection( - const string& ns, long long size, bool capped, int max, BSONObj* info) { +bool DBClientBase::createCollection(const string& ns, + long long size, + bool capped, + int max, + BSONObj* info, + boost::optional writeConcernObj) { verify(!capped || size); BSONObj o; if (info == nullptr) @@ -556,6 +571,9 @@ bool DBClientBase::createCollection( b.append("capped", true); if (max) b.append("max", max); + if (writeConcernObj) { + b.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } return runCommand(db.c_str(), b.done(), *info); } @@ -644,11 +662,18 @@ void DBClientBase::findN(vector& out, int nToReturn, int nToSkip, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + boost::optional readConcernObj) { out.reserve(nToReturn); - unique_ptr c = - this->query(NamespaceString(ns), query, nToReturn, nToSkip, fieldsToReturn, queryOptions); + unique_ptr c = this->query(NamespaceString(ns), + query, + nToReturn, + nToSkip, + fieldsToReturn, + 
queryOptions, + 0 /* batchSize */, + readConcernObj); // query() throws on network error so OK to uassert with numeric code here. uassert(10276, @@ -672,15 +697,18 @@ void DBClientBase::findN(vector& out, BSONObj DBClientBase::findOne(const string& ns, const Query& query, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + boost::optional readConcernObj) { vector v; - findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions); + findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions, readConcernObj); return v.empty() ? BSONObj() : v[0]; } -std::pair DBClientBase::findOneByUUID(const std::string& db, - UUID uuid, - const BSONObj& filter) { +std::pair DBClientBase::findOneByUUID( + const std::string& db, + UUID uuid, + const BSONObj& filter, + boost::optional readConcernObj) { list results; BSONObj res; @@ -689,6 +717,9 @@ std::pair DBClientBase::findOneByUUID(const std::strin cmdBuilder.append("filter", filter); cmdBuilder.append("limit", 1); cmdBuilder.append("singleBatch", true); + if (readConcernObj) { + cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcernObj); + } BSONObj cmd = cmdBuilder.obj(); @@ -721,9 +752,17 @@ unique_ptr DBClientBase::query(const NamespaceStringOrUUID& nsOr int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { - unique_ptr c(new DBClientCursor( - this, nsOrUuid, query.obj, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize)); + int batchSize, + boost::optional readConcernObj) { + unique_ptr c(new DBClientCursor(this, + nsOrUuid, + query.obj, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj)); if (c->init()) return c; return nullptr; @@ -754,11 +793,13 @@ unsigned long long DBClientBase::query(std::function f, Query query, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional readConcernObj) { DBClientFunConvertor fun; fun._f = f; std::function ptr(fun); - return this->query(ptr, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); + return this->query( + ptr, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize, readConcernObj); } unsigned long long DBClientBase::query(std::function f, @@ -766,12 +807,13 @@ unsigned long long DBClientBase::query(std::function readConcernObj) { // mask options queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk); - unique_ptr c( - this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize)); + unique_ptr c(this->query( + nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize, readConcernObj)); // query() throws on network error so OK to uassert with numeric code here. 
uassert(16090, "socket error for mapping query", c.get()); @@ -785,34 +827,63 @@ unsigned long long DBClientBase::query(std::function{obj}, flags); +void DBClientBase::insert(const string& ns, + BSONObj obj, + int flags, + boost::optional writeConcernObj) { + insert(ns, std::vector{obj}, flags, writeConcernObj); } -void DBClientBase::insert(const string& ns, const vector& v, int flags) { +void DBClientBase::insert(const string& ns, + const vector& v, + int flags, + boost::optional writeConcernObj) { bool ordered = !(flags & InsertOption_ContinueOnError); auto nss = NamespaceString(ns); - auto request = - OpMsgRequest::fromDBAndBody(nss.db(), BSON("insert" << nss.coll() << "ordered" << ordered)); + BSONObjBuilder cmdBuilder; + cmdBuilder.append("insert", nss.coll()); + cmdBuilder.append("ordered", ordered); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } + auto request = OpMsgRequest::fromDBAndBody(nss.db(), cmdBuilder.obj()); request.sequences.push_back({"documents", v}); runFireAndForgetCommand(std::move(request)); } -void DBClientBase::remove(const string& ns, Query obj, int flags) { +void DBClientBase::remove(const string& ns, + Query obj, + int flags, + boost::optional writeConcernObj) { int limit = (flags & RemoveOption_JustOne) ? 1 : 0; auto nss = NamespaceString(ns); - auto request = OpMsgRequest::fromDBAndBody(nss.db(), BSON("delete" << nss.coll())); + BSONObjBuilder cmdBuilder; + cmdBuilder.append("delete", nss.coll()); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } + auto request = OpMsgRequest::fromDBAndBody(nss.db(), cmdBuilder.obj()); request.sequences.push_back({"deletes", {BSON("q" << obj.obj << "limit" << limit)}}); runFireAndForgetCommand(std::move(request)); } -void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upsert, bool multi) { +void DBClientBase::update(const string& ns, + Query query, + BSONObj obj, + bool upsert, + bool multi, + boost::optional writeConcernObj) { auto nss = NamespaceString(ns); - auto request = OpMsgRequest::fromDBAndBody(nss.db(), BSON("update" << nss.coll())); + BSONObjBuilder cmdBuilder; + cmdBuilder.append("update", nss.coll()); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } + auto request = OpMsgRequest::fromDBAndBody(nss.db(), cmdBuilder.obj()); request.sequences.push_back( {"updates", {BSON("q" << query.obj << "u" << obj << "upsert" << upsert << "multi" << multi)}}); @@ -820,12 +891,17 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upser runFireAndForgetCommand(std::move(request)); } -void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) { +void DBClientBase::update(const string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional writeConcernObj) { update(ns, std::move(query), std::move(obj), flags & UpdateOption_Upsert, - flags & UpdateOption_Multi); + flags & UpdateOption_Multi, + writeConcernObj); } void DBClientBase::killCursor(const NamespaceString& ns, long long cursorId) { @@ -914,16 +990,24 @@ std::list DBClientBase::_getIndexSpecs(const NamespaceStringOrUUID& nsO } -void DBClientBase::dropIndex(const string& ns, BSONObj keys) { - dropIndex(ns, genIndexName(keys)); +void DBClientBase::dropIndex(const string& ns, + BSONObj keys, + boost::optional writeConcernObj) { + dropIndex(ns, genIndexName(keys), writeConcernObj); } -void DBClientBase::dropIndex(const 
string& ns, const string& indexName) { +void DBClientBase::dropIndex(const string& ns, + const string& indexName, + boost::optional writeConcernObj) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append("dropIndexes", nsToCollectionSubstring(ns)); + cmdBuilder.append("index", indexName); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } BSONObj info; - if (!runCommand(nsToDatabase(ns), - BSON("dropIndexes" << nsToCollectionSubstring(ns) << "index" << indexName), - info)) { + if (!runCommand(nsToDatabase(ns), cmdBuilder.obj(), info)) { LOGV2_DEBUG(20118, logSeverityV1toV2(_logLevel).toInt(), "dropIndex failed: {info}", @@ -932,14 +1016,15 @@ void DBClientBase::dropIndex(const string& ns, const string& indexName) { } } -void DBClientBase::dropIndexes(const string& ns) { +void DBClientBase::dropIndexes(const string& ns, boost::optional writeConcernObj) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append("dropIndexes", nsToCollectionSubstring(ns)); + cmdBuilder.append("index", "*"); + if (writeConcernObj) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } BSONObj info; - uassert(10008, - "dropIndexes failed", - runCommand(nsToDatabase(ns), - BSON("dropIndexes" << nsToCollectionSubstring(ns) << "index" - << "*"), - info)); + uassert(10008, "dropIndexes failed", runCommand(nsToDatabase(ns), cmdBuilder.obj(), info)); } void DBClientBase::reIndex(const string& ns) { @@ -971,7 +1056,9 @@ string DBClientBase::genIndexName(const BSONObj& keys) { return ss.str(); } -void DBClientBase::createIndexes(StringData ns, const std::vector& descriptors) { +void DBClientBase::createIndexes(StringData ns, + const std::vector& descriptors, + boost::optional writeConcernObj) { BSONObjBuilder command; command.append("createIndexes", nsToCollectionSubstring(ns)); { @@ -980,6 +1067,9 @@ void DBClientBase::createIndexes(StringData ns, const std::vectortoBSON()); } } + if (writeConcernObj) { + command.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } const BSONObj commandObj = command.done(); BSONObj infoObj; @@ -990,7 +1080,9 @@ void DBClientBase::createIndexes(StringData ns, const std::vector& specs) { +void DBClientBase::createIndexes(StringData ns, + const std::vector& specs, + boost::optional writeConcernObj) { BSONObjBuilder command; command.append("createIndexes", nsToCollectionSubstring(ns)); { @@ -999,6 +1091,9 @@ void DBClientBase::createIndexes(StringData ns, const std::vector& spec indexes.append(spec); } } + if (writeConcernObj) { + command.append(WriteConcernOptions::kWriteConcernField, *writeConcernObj); + } const BSONObj commandObj = command.done(); BSONObj infoObj; diff --git a/src/mongo/client/dbclient_base.h b/src/mongo/client/dbclient_base.h index 5a3dfcfc766..5db09d9412e 100644 --- a/src/mongo/client/dbclient_base.h +++ b/src/mongo/client/dbclient_base.h @@ -73,27 +73,31 @@ std::string nsGetCollection(const std::string& ns); * them as "final" or "override" as appropriate. 
*/ class DBClientQueryInterface { - virtual std::unique_ptr query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) = 0; + virtual std::unique_ptr query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional readConcernObj = boost::none) = 0; virtual unsigned long long query(std::function f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, - int batchSize = 0) = 0; + int batchSize = 0, + boost::optional readConcernObj = boost::none) = 0; virtual unsigned long long query(std::function f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, - int batchSize = 0) = 0; + int batchSize = 0, + boost::optional readConcernObj = boost::none) = 0; }; /** @@ -119,7 +123,8 @@ public: virtual BSONObj findOne(const std::string& ns, const Query& query, const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0); + int queryOptions = 0, + boost::optional readConcernObj = boost::none); /** query N objects from the database into an array. makes sense mostly when you want a small * number of results. if a huge number, use query() and iterate the cursor. @@ -130,7 +135,8 @@ public: int nToReturn, int nToSkip = 0, const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0); + int queryOptions = 0, + boost::optional readConcernObj = boost::none); /** * @return a pair with a single object that matches the filter within the collection specified @@ -140,9 +146,11 @@ public: * the query, an empty BSONObj is returned. * @throws AssertionException */ - virtual std::pair findOneByUUID(const std::string& db, - UUID uuid, - const BSONObj& filter); + virtual std::pair findOneByUUID( + const std::string& db, + UUID uuid, + const BSONObj& filter, + boost::optional readConcernObj = boost::none); virtual std::string getServerAddress() const = 0; @@ -351,11 +359,12 @@ public: /** count number of objects in collection ns that match the query criteria specified throws UserAssertion if database returns an error */ - virtual long long count(NamespaceStringOrUUID nsOrUuid, + virtual long long count(const NamespaceStringOrUUID nsOrUuid, const BSONObj& query = BSONObj(), int options = 0, int limit = 0, - int skip = 0); + int skip = 0, + boost::optional readConcernObj = boost::none); static std::string createPasswordDigest(const std::string& username, const std::string& clearTextPassword); @@ -390,7 +399,8 @@ public: long long size = 0, bool capped = false, int max = 0, - BSONObj* info = nullptr); + BSONObj* info = nullptr, + boost::optional writeConcernObj = boost::none); /** Get error result from the last write operation (insert/update/delete) on this connection. db doesn't change the command's behavior - it is just for auth checks. @@ -483,8 +493,10 @@ public: * @param keys Document describing keys and index types. You must provide at least one * field and its direction. 
*/ - void createIndex(StringData ns, const BSONObj& keys) { - return createIndex(ns, IndexSpec().addKeys(keys)); + void createIndex(StringData ns, + const BSONObj& keys, + boost::optional writeConcernObj = boost::none) { + return createIndex(ns, IndexSpec().addKeys(keys), writeConcernObj); } /** Create an index on the collection 'ns' as described by the given @@ -495,20 +507,26 @@ public: * @param descriptor Configuration object describing the index to create. The * descriptor must describe at least one key and index type. */ - virtual void createIndex(StringData ns, const IndexSpec& descriptor) { + virtual void createIndex(StringData ns, + const IndexSpec& descriptor, + boost::optional writeConcernObj = boost::none) { std::vector toBuild; toBuild.push_back(&descriptor); - createIndexes(ns, toBuild); + createIndexes(ns, toBuild, writeConcernObj); } - virtual void createIndexes(StringData ns, const std::vector& descriptor); + virtual void createIndexes(StringData ns, + const std::vector& descriptor, + boost::optional writeConcernObj = boost::none); /** * Creates indexes on the collection 'ns' as described by 'specs'. * * Failure to construct the indexes is reported by throwing an AssertionException. */ - virtual void createIndexes(StringData ns, const std::vector& specs); + virtual void createIndexes(StringData ns, + const std::vector& specs, + boost::optional writeConcernObj = boost::none); /** * Lists indexes on the collection 'nsOrUuid'. @@ -523,13 +541,18 @@ public: virtual std::list getReadyIndexSpecs(const NamespaceStringOrUUID& nsOrUuid, int options = 0); - virtual void dropIndex(const std::string& ns, BSONObj keys); - virtual void dropIndex(const std::string& ns, const std::string& indexName); + virtual void dropIndex(const std::string& ns, + BSONObj keys, + boost::optional writeConcernObj = boost::none); + virtual void dropIndex(const std::string& ns, + const std::string& indexName, + boost::optional writeConcernObj = boost::none); /** drops all indexes for the collection */ - virtual void dropIndexes(const std::string& ns); + virtual void dropIndexes(const std::string& ns, + boost::optional writeConcernObj = boost::none); virtual void reIndex(const std::string& ns); @@ -600,13 +623,15 @@ public: @return cursor. 0 if error (connection failure) @throws AssertionException */ - std::unique_ptr query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) override; + std::unique_ptr query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional readConcernObj = boost::none) override; /** Uses QueryOption_Exhaust, when available and specified in 'queryOptions'. 
@@ -628,14 +653,16 @@ public: Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = QueryOption_Exhaust, - int batchSize = 0) final; + int batchSize = 0, + boost::optional readConcernObj = boost::none) final; unsigned long long query(std::function f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = nullptr, int queryOptions = QueryOption_Exhaust, - int batchSize = 0) override; + int batchSize = 0, + boost::optional readConcernObj = boost::none) override; /** don't use this - called automatically by DBClientCursor for you @@ -651,22 +678,39 @@ public: /** insert an object into the database */ - virtual void insert(const std::string& ns, BSONObj obj, int flags = 0); + virtual void insert(const std::string& ns, + BSONObj obj, + int flags = 0, + boost::optional writeConcernObj = boost::none); /** insert a vector of objects into the database */ - virtual void insert(const std::string& ns, const std::vector& v, int flags = 0); + virtual void insert(const std::string& ns, + const std::vector& v, + int flags = 0, + boost::optional writeConcernObj = boost::none); /** updates objects matching query */ - virtual void update( - const std::string& ns, Query query, BSONObj obj, bool upsert = false, bool multi = false); + virtual void update(const std::string& ns, + Query query, + BSONObj obj, + bool upsert = false, + bool multi = false, + boost::optional writeConcernObj = boost::none); - virtual void update(const std::string& ns, Query query, BSONObj obj, int flags); + virtual void update(const std::string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional writeConcernObj = boost::none); - virtual void remove(const std::string& ns, Query query, int flags = 0); + virtual void remove(const std::string& ns, + Query query, + int flags = 0, + boost::optional writeConcernObj = boost::none); virtual bool isFailed() const = 0; @@ -716,7 +760,8 @@ protected: const BSONObj& query, int options, int limit, - int skip); + int skip, + boost::optional readConcernObj); /** * Look up the options available on this client. Caches the answer from diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp index c7170733b1b..a5a6f3005ce 100644 --- a/src/mongo/client/dbclient_connection.cpp +++ b/src/mongo/client/dbclient_connection.cpp @@ -625,16 +625,18 @@ unsigned long long DBClientConnection::query(std::function readConcernObj) { if (!(queryOptions & QueryOption_Exhaust) || !(availableOptions() & QueryOption_Exhaust)) { - return DBClientBase::query(f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); + return DBClientBase::query( + f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize, readConcernObj); } // mask options queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk | QueryOption_Exhaust); - unique_ptr c( - this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize)); + unique_ptr c(this->query( + nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize, readConcernObj)); // Note that this->query will throw for network errors, so it is OK to return a numeric // error code here. 
uassert(13386, "socket error for mapping query", c.get()); diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h index 5d76701bc59..6c7a59b4995 100644 --- a/src/mongo/client/dbclient_connection.h +++ b/src/mongo/client/dbclient_connection.h @@ -151,16 +151,24 @@ public: */ void logout(const std::string& dbname, BSONObj& info) override; - std::unique_ptr query(const NamespaceStringOrUUID& nsOrUuid, - Query query = Query(), - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) override { + std::unique_ptr query( + const NamespaceStringOrUUID& nsOrUuid, + Query query = Query(), + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional readConcernObj = boost::none) override { checkConnection(); - return DBClientBase::query( - nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + return DBClientBase::query(nsOrUuid, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj); } unsigned long long query(std::function f, @@ -168,7 +176,8 @@ public: Query query, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize = 0) override; + int batchSize = 0, + boost::optional readConcernObj = boost::none) override; using DBClientBase::runCommandWithTarget; std::pair runCommandWithTarget(OpMsgRequest request) override; diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp index 77d5c802eb7..60034c05514 100644 --- a/src/mongo/client/dbclient_cursor.cpp +++ b/src/mongo/client/dbclient_cursor.cpp @@ -153,6 +153,12 @@ Message DBClientCursor::_assembleInit() { // QueryRequest doesn't handle $readPreference. cmd = BSONObjBuilder(std::move(cmd)).append(readPref).obj(); } + if (!cmd.hasField(repl::ReadConcernArgs::kReadConcernFieldName) && _readConcernObj) { + cmd = BSONObjBuilder(std::move(cmd)) + .append(repl::ReadConcernArgs::kReadConcernFieldName, *_readConcernObj) + .obj(); + } + return assembleCommandRequest(_client, ns.db(), opts, std::move(cmd)); } // else use legacy OP_QUERY request. 
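The hunk above shows the core mechanism: when a caller supplies an explicit readConcern, DBClientCursor::_assembleInit() splices it into the command body before the request is assembled, and the same conditional-append pattern recurs for writeConcern throughout dbclient_base.cpp. Below is a minimal standalone sketch of that pattern; the helper name appendConcernIfSet is hypothetical and only illustrates the shape of the change, it is not part of the patch.

    // Illustrative sketch only (not part of the patch): the recurring
    // "append the concern object only when the caller supplied one" pattern.
    #include <boost/optional.hpp>
    #include "mongo/base/string_data.h"
    #include "mongo/bson/bsonobj.h"
    #include "mongo/bson/bsonobjbuilder.h"

    namespace {
    // Hypothetical helper: append a readConcern/writeConcern sub-document to a
    // command under construction when, and only when, an explicit value was given.
    void appendConcernIfSet(mongo::BSONObjBuilder& cmdBuilder,
                            mongo::StringData fieldName,
                            const boost::optional<mongo::BSONObj>& concernObj) {
        if (concernObj) {
            cmdBuilder.append(fieldName, *concernObj);
        }
    }
    }  // namespace

In the patch itself this logic is inlined at each call site (for example _countCmd, insert, update, remove, dropIndexes, createIndexes), with the field name taken from repl::ReadConcernArgs::kReadConcernFieldName or WriteConcernOptions::kWriteConcernField.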
@@ -516,7 +522,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) + int batchSize, + boost::optional readConcernObj) : DBClientCursor(client, nsOrUuid, query, @@ -526,7 +533,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, fieldsToReturn, queryOptions, batchSize, - {}) {} + {}, + readConcernObj) {} DBClientCursor::DBClientCursor(DBClientBase* client, const NamespaceStringOrUUID& nsOrUuid, @@ -543,7 +551,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, nullptr, // fieldsToReturn queryOptions, 0, - std::move(initialBatch)) {} // batchSize + std::move(initialBatch), // batchSize + boost::none) {} DBClientCursor::DBClientCursor(DBClientBase* client, const NamespaceStringOrUUID& nsOrUuid, @@ -554,7 +563,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, const BSONObj* fieldsToReturn, int queryOptions, int batchSize, - std::vector initialBatch) + std::vector initialBatch, + boost::optional readConcernObj) : batch{std::move(initialBatch)}, _client(client), _originalHost(_client->getServerAddress()), @@ -572,7 +582,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client, cursorId(cursorId), _ownCursor(true), wasError(false), - _enabledBSONVersion(Validator::enabledBSONVersion()) { + _enabledBSONVersion(Validator::enabledBSONVersion()), + _readConcernObj(readConcernObj) { if (queryOptions & QueryOptionLocal_forceOpQuery) { // Legacy OP_QUERY does not support UUIDs. invariant(!_nsOrUuid.uuid()); diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index b0e1551802b..33fb9f2e3c9 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -152,7 +152,8 @@ public: int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int bs); + int bs, + boost::optional readConcernObj = boost::none); DBClientCursor(DBClientBase* client, const NamespaceStringOrUUID& nsOrUuid, @@ -273,7 +274,8 @@ private: const BSONObj* fieldsToReturn, int queryOptions, int bs, - std::vector initialBatch); + std::vector initialBatch, + boost::optional readConcernObj); int nextBatchSize(); @@ -307,6 +309,7 @@ private: boost::optional _term; boost::optional _lastKnownCommittedOpTime; boost::optional _postBatchResumeToken; + boost::optional _readConcernObj; void dataReceived(const Message& reply) { bool retry; diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 126b7d5f03b..d7744d2c841 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -522,20 +522,33 @@ void DBClientReplicaSet::logout(const string& dbname, BSONObj& info) { // ------------- simple functions ----------------- -void DBClientReplicaSet::insert(const string& ns, BSONObj obj, int flags) { - checkMaster()->insert(ns, obj, flags); +void DBClientReplicaSet::insert(const string& ns, + BSONObj obj, + int flags, + boost::optional writeConcernObj) { + checkMaster()->insert(ns, obj, flags, writeConcernObj); } -void DBClientReplicaSet::insert(const string& ns, const vector& v, int flags) { - checkMaster()->insert(ns, v, flags); +void DBClientReplicaSet::insert(const string& ns, + const vector& v, + int flags, + boost::optional writeConcernObj) { + checkMaster()->insert(ns, v, flags, writeConcernObj); } -void DBClientReplicaSet::remove(const string& ns, Query obj, int flags) { - checkMaster()->remove(ns, obj, flags); +void DBClientReplicaSet::remove(const string& ns, + Query obj, + int flags, + boost::optional writeConcernObj) { + 
checkMaster()->remove(ns, obj, flags, writeConcernObj); } -void DBClientReplicaSet::update(const string& ns, Query query, BSONObj obj, int flags) { - return checkMaster()->update(ns, query, obj, flags); +void DBClientReplicaSet::update(const string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional writeConcernObj) { + return checkMaster()->update(ns, query, obj, flags, writeConcernObj); } unique_ptr DBClientReplicaSet::query(const NamespaceStringOrUUID& nsOrUuid, @@ -544,7 +557,8 @@ unique_ptr DBClientReplicaSet::query(const NamespaceStringOrUUID int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional readConcernObj) { shared_ptr readPref(_extractReadPref(query.obj, queryOptions)); invariant(nsOrUuid.nss()); const string ns = nsOrUuid.nss()->ns(); @@ -573,8 +587,14 @@ unique_ptr DBClientReplicaSet::query(const NamespaceStringOrUUID break; } - unique_ptr cursor = conn->query( - nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + unique_ptr cursor = conn->query(nsOrUuid, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj); return checkSlaveQueryResult(std::move(cursor)); } catch (const DBException& ex) { @@ -599,14 +619,21 @@ unique_ptr DBClientReplicaSet::query(const NamespaceStringOrUUID "dbclient_rs query to primary node in {getMonitor_getName}", "getMonitor_getName"_attr = _getMonitor()->getName()); - return checkMaster()->query( - nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + return checkMaster()->query(nsOrUuid, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj); } BSONObj DBClientReplicaSet::findOne(const string& ns, const Query& query, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + boost::optional readConcernObj) { shared_ptr readPref(_extractReadPref(query.obj, queryOptions)); if (_isSecondaryQuery(ns, query.obj, *readPref)) { LOGV2_DEBUG(20135, @@ -633,7 +660,7 @@ BSONObj DBClientReplicaSet::findOne(const string& ns, break; } - return conn->findOne(ns, query, fieldsToReturn, queryOptions); + return conn->findOne(ns, query, fieldsToReturn, queryOptions, readConcernObj); } catch (const DBException& ex) { const Status status = ex.toStatus(str::stream() << "can't findone replica set node " << _lastSlaveOkHost.toString()); @@ -656,7 +683,7 @@ BSONObj DBClientReplicaSet::findOne(const string& ns, "dbclient_rs findOne to primary node in {getMonitor_getName}", "getMonitor_getName"_attr = _getMonitor()->getName()); - return checkMaster()->findOne(ns, query, fieldsToReturn, queryOptions); + return checkMaster()->findOne(ns, query, fieldsToReturn, queryOptions, readConcernObj); } void DBClientReplicaSet::killCursor(const NamespaceString& ns, long long cursorID) { diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h index 48bb2c01c7e..9603842f7d9 100644 --- a/src/mongo/client/dbclient_rs.h +++ b/src/mongo/client/dbclient_rs.h @@ -89,29 +89,45 @@ public: // ----------- simple functions -------------- /** throws userassertion "no master found" */ - std::unique_ptr query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) override; + std::unique_ptr query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = 
nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional readConcernObj = boost::none) override; /** throws userassertion "no master found" */ BSONObj findOne(const std::string& ns, const Query& query, const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0) override; + int queryOptions = 0, + boost::optional readConcernObj = boost::none) override; - void insert(const std::string& ns, BSONObj obj, int flags = 0) override; + void insert(const std::string& ns, + BSONObj obj, + int flags = 0, + boost::optional writeConcernObj = boost::none) override; /** insert multiple objects. Note that single object insert is asynchronous, so this version is only nominally faster and not worth a special effort to try to use. */ - void insert(const std::string& ns, const std::vector& v, int flags = 0) override; - - void remove(const std::string& ns, Query obj, int flags) override; - - void update(const std::string& ns, Query query, BSONObj obj, int flags) override; + void insert(const std::string& ns, + const std::vector& v, + int flags = 0, + boost::optional writeConcernObj = boost::none) override; + + void remove(const std::string& ns, + Query obj, + int flags, + boost::optional writeConcernObj = boost::none) override; + + void update(const std::string& ns, + Query query, + BSONObj obj, + int flags, + boost::optional writeConcernObj = boost::none) override; void killCursor(const NamespaceString& ns, long long cursorID) override; diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index 4f444b9f755..c76f86163b3 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -58,6 +58,7 @@ #include "mongo/db/op_observer.h" #include "mongo/db/ops/insert.h" #include "mongo/db/repl/isself.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/durable_catalog.h" @@ -311,7 +312,9 @@ void Cloner::copy(OperationContext* opCtx, from_collection, query, nullptr, - options); + options, + 0 /* batchSize */, + repl::ReadConcernArgs::kImplicitDefault); } uassert(ErrorCodes::PrimarySteppedDown, diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp index 84818dcf8c4..6c8a8d3d43e 100644 --- a/src/mongo/db/dbdirectclient.cpp +++ b/src/mongo/db/dbdirectclient.cpp @@ -166,15 +166,22 @@ unique_ptr DBDirectClient::query(const NamespaceStringOrUUID& ns int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional readConcernObj) { + invariant(!readConcernObj, "passing readConcern to DBDirectClient functions is not supported"); return DBClientBase::query( nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); } -long long DBDirectClient::count( - const NamespaceStringOrUUID nsOrUuid, const BSONObj& query, int options, int limit, int skip) { +long long DBDirectClient::count(const NamespaceStringOrUUID nsOrUuid, + const BSONObj& query, + int options, + int limit, + int skip, + boost::optional readConcernObj) { + invariant(!readConcernObj, "passing readConcern to DBDirectClient functions is not supported"); DirectClientScope directClientScope(_opCtx); - BSONObj cmdObj = _countCmd(nsOrUuid, query, options, limit, skip); + BSONObj cmdObj = _countCmd(nsOrUuid, query, options, limit, skip, boost::none); auto dbName = (nsOrUuid.uuid() ? 
nsOrUuid.dbname() : (*nsOrUuid.nss()).db().toString()); diff --git a/src/mongo/db/dbdirectclient.h b/src/mongo/db/dbdirectclient.h index 9ff0dc9cd80..0554334d6fd 100644 --- a/src/mongo/db/dbdirectclient.h +++ b/src/mongo/db/dbdirectclient.h @@ -58,13 +58,15 @@ public: // XXX: is this valid or useful? void setOpCtx(OperationContext* opCtx); - virtual std::unique_ptr query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0); + virtual std::unique_ptr query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn = 0, + int nToSkip = 0, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional readConcernObj = boost::none); virtual bool isFailed() const; @@ -85,7 +87,8 @@ public: const BSONObj& query = BSONObj(), int options = 0, int limit = 0, - int skip = 0); + int skip = 0, + boost::optional readConcernObj = boost::none); virtual ConnectionString::ConnectionType type() const; diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 671529d5cac..5bde04b25a8 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -59,18 +59,6 @@ using write_ops::Insert; using write_ops::Update; using write_ops::UpdateOpEntry; -namespace { - -// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has -// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it. -void attachWriteConcern(const WriteConcernOptions& writeConcern, BatchedCommandRequest* request) { - if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) { - request->setWriteConcern(writeConcern.toBSON()); - } -} - -} // namespace - bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); Lock::CollectionLock collLock(opCtx, nss, MODE_IS); @@ -123,8 +111,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptrbypassDocumentValidation)); - // If applicable, attach a write concern to the batched command request. - attachWriteConcern(wc, &insertCommand); + insertCommand.setWriteConcern(wc.toBSON()); ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch); @@ -144,8 +131,7 @@ StatusWith ShardServerProcessInterface::upd BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - // If applicable, attach a write concern to the batched command request. 
- attachWriteConcern(wc, &updateCommand); + updateCommand.setWriteConcern(wc.toBSON()); ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch); @@ -176,10 +162,8 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( auto newCmdObj = CommonMongodProcessInterface::_convertRenameToInternalRename( opCtx, renameCommandObj, originalCollectionOptions, originalIndexes); BSONObjBuilder newCmdWithWriteConcernBuilder(std::move(newCmdObj)); - if (!opCtx->getWriteConcern().usedDefault) { - newCmdWithWriteConcernBuilder.append(WriteConcernOptions::kWriteConcernField, - opCtx->getWriteConcern().toBSON()); - } + newCmdWithWriteConcernBuilder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); newCmdObj = newCmdWithWriteConcernBuilder.done(); auto cachedDbInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, destinationNs.db())); @@ -273,12 +257,10 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx, const BSONObj& cmdObj) { auto cachedDbInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - BSONObj finalCmdObj = cmdObj; - if (!opCtx->getWriteConcern().usedDefault) { - auto writeObj = - BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON()); - finalCmdObj = cmdObj.addField(writeObj.getField(WriteConcernOptions::kWriteConcernField)); - } + BSONObjBuilder finalCmdBuilder(cmdObj); + finalCmdBuilder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); + BSONObj finalCmdObj = finalCmdBuilder.obj(); auto response = executeCommandAgainstDatabasePrimary(opCtx, dbName, @@ -303,10 +285,8 @@ void ShardServerProcessInterface::createIndexesOnEmptyCollection( BSONObjBuilder newCmdBuilder; newCmdBuilder.append("createIndexes", ns.coll()); newCmdBuilder.append("indexes", indexSpecs); - if (!opCtx->getWriteConcern().usedDefault) { - newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, - opCtx->getWriteConcern().toBSON()); - } + newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); auto cmdObj = newCmdBuilder.done(); auto response = executeCommandAgainstDatabasePrimary(opCtx, @@ -332,10 +312,8 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx, uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db())); BSONObjBuilder newCmdBuilder; newCmdBuilder.append("drop", ns.coll()); - if (!opCtx->getWriteConcern().usedDefault) { - newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, - opCtx->getWriteConcern().toBSON()); - } + newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); auto cmdObj = newCmdBuilder.done(); auto response = executeCommandAgainstDatabasePrimary(opCtx, diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 00bef143bb2..eb656343911 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -108,13 +108,16 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptropCtx, - expCtx->mongoProcessInterface->taskExecutor, - aggReq.getNamespaceString(), - ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, - {{configShard->getId(), aggReq.serializeToCommandObj().toBson()}}, - false); + auto cmdObjWithRWC = applyReadWriteConcern(expCtx->opCtx, + true, /* appendRC */ + !expCtx->explain, /* appendWC */ + 
aggReq.serializeToCommandObj().toBson()); + auto configCursor = establishCursors(expCtx->opCtx, + expCtx->mongoProcessInterface->taskExecutor, + aggReq.getNamespaceString(), + ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, + {{configShard->getId(), cmdObjWithRWC}}, + false); invariant(configCursor.size() == 1); return std::move(*configCursor.begin()); } diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index cd308aae77a..dd80516535c 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -835,6 +835,10 @@ Status QueryRequest::init(int ntoskip, } else { _filter = queryObj.getOwned(); } + // It's not possible to specify readConcern in a legacy query message, so initialize it to + // an empty readConcern object, ie. equivalent to `readConcern: {}`. This ensures that + // mongos passes this empty readConcern to shards. + _readConcern = BSONObj(); } else { // This is the debugging code path. _filter = queryObj.getOwned(); diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp index 54588856e58..dbe4ba193cf 100644 --- a/src/mongo/db/read_concern_mongod.cpp +++ b/src/mongo/db/read_concern_mongod.cpp @@ -180,7 +180,9 @@ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) { "admin", BSON("appendOplogNote" << 1 << "maxClusterTime" << clusterTime.asTimestamp() << "data" - << BSON("noop write for afterClusterTime read concern" << 1)), + << BSON("noop write for afterClusterTime read concern" << 1) + << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions::kImplicitDefault), Shard::RetryPolicy::kIdempotent); status = swRes.getStatus(); std::get<1>(myWriteRequest)->set(status); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 3460b58a465..ad051fa0a83 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -896,6 +896,7 @@ env.Library( LIBDEPS=[ 'oplog_entry', 'optime', + 'read_concern_args', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/client/fetcher', '$BUILD_DIR/mongo/db/namespace_string', diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 458487ab754..2cb89055f4b 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -132,7 +132,12 @@ BaseCloner::AfterStageBehavior CollectionCloner::CollectionClonerStage::run() { } BaseCloner::AfterStageBehavior CollectionCloner::countStage() { - auto count = getClient()->count(_sourceDbAndUuid, {} /* Query */, QueryOption_SlaveOk); + auto count = getClient()->count(_sourceDbAndUuid, + {} /* Query */, + QueryOption_SlaveOk, + 0 /* limit */, + 0 /* skip */, + ReadConcernArgs::kImplicitDefault); // The count command may return a negative value after an unclean shutdown, // so we set it to zero here to avoid aborting the collection clone. @@ -233,7 +238,8 @@ void CollectionCloner::runQuery() { nullptr /* fieldsToReturn */, QueryOption_NoCursorTimeout | QueryOption_SlaveOk | (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), - _collectionClonerBatchSize); + _collectionClonerBatchSize, + ReadConcernArgs::kImplicitDefault); } catch (...) 
{ auto status = exceptionToStatus(); diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 6bc07e723b7..39ad438a288 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -1736,7 +1736,8 @@ void InitialSyncer::_finishCallback(StatusWith lastApplied) { Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock( Fetcher::CallbackFn callback, LastOplogEntryFetcherRetryStrategy retryStrategy) { BSONObj query = BSON("find" << _opts.remoteOplogNS.coll() << "sort" << BSON("$natural" << -1) - << "limit" << 1); + << "limit" << 1 << ReadConcernArgs::kReadConcernFieldName + << ReadConcernArgs::kImplicitDefault); _lastOplogEntryFetcher = std::make_unique( _exec, diff --git a/src/mongo/db/repl/oplog_interface_remote.cpp b/src/mongo/db/repl/oplog_interface_remote.cpp index a71aae7cb15..883596318b9 100644 --- a/src/mongo/db/repl/oplog_interface_remote.cpp +++ b/src/mongo/db/repl/oplog_interface_remote.cpp @@ -34,6 +34,7 @@ #include "mongo/client/dbclient_base.h" #include "mongo/client/dbclient_cursor.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/util/str.h" namespace mongo { @@ -83,8 +84,14 @@ std::unique_ptr OplogInterfaceRemote::makeIterator() c const Query query = Query().sort(BSON("$natural" << -1)); const BSONObj fields = BSON("ts" << 1 << "t" << 1); return std::unique_ptr( - new OplogIteratorRemote(_getConnection()->query( - NamespaceString(_collectionName), query, 0, 0, &fields, 0, _batchSize))); + new OplogIteratorRemote(_getConnection()->query(NamespaceString(_collectionName), + query, + 0, + 0, + &fields, + 0, + _batchSize, + ReadConcernArgs::kImplicitDefault))); } std::unique_ptr diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp index 2ef071a38cc..0c446ce4780 100644 --- a/src/mongo/db/repl/read_concern_args.cpp +++ b/src/mongo/db/repl/read_concern_args.cpp @@ -57,6 +57,11 @@ const ReadConcernArgs& ReadConcernArgs::get(const OperationContext* opCtx) { } +// The "kImplicitDefault" read concern, used by internal operations, is deliberately empty (no +// 'level' specified). This allows internal operations to specify a read concern, while still +// allowing it to be either local or available on sharded secondaries. +const BSONObj ReadConcernArgs::kImplicitDefault; + ReadConcernArgs::ReadConcernArgs() : _specified(false) {} ReadConcernArgs::ReadConcernArgs(boost::optional level) diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h index 1adba9777fc..46353040528 100644 --- a/src/mongo/db/repl/read_concern_args.h +++ b/src/mongo/db/repl/read_concern_args.h @@ -55,6 +55,8 @@ public: static constexpr StringData kAtClusterTimeFieldName = "atClusterTime"_sd; static constexpr StringData kLevelFieldName = "level"_sd; + static const BSONObj kImplicitDefault; + /** * Represents the internal mechanism an operation uses to satisfy 'majority' read concern. 
*/ diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index fd8ce22e8d0..bf33e4334d7 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -185,9 +185,15 @@ TopologyVersion appendReplicationInfo(OperationContext* opCtx, DBClientConnection* cliConn = dynamic_cast(&conn.conn()); if (cliConn && replAuthenticate(cliConn).isOK()) { BSONObj first = conn->findOne((string) "local.oplog.$" + sourcename, - Query().sort(BSON("$natural" << 1))); + Query().sort(BSON("$natural" << 1)), + nullptr /* fieldsToReturn */, + 0 /* queryOptions */, + ReadConcernArgs::kImplicitDefault); BSONObj last = conn->findOne((string) "local.oplog.$" + sourcename, - Query().sort(BSON("$natural" << -1))); + Query().sort(BSON("$natural" << -1)), + nullptr /* fieldsToReturn */, + 0 /* queryOptions */, + ReadConcernArgs::kImplicitDefault); bb.appendDate("masterFirst", first["ts"].timestampTime()); bb.appendDate("masterLast", last["ts"].timestampTime()); const auto lag = (last["ts"].timestampTime() - s["syncedTo"].timestampTime()); diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp index f51e5404d4b..6862dbf20bb 100644 --- a/src/mongo/db/repl/roll_back_local_operations_test.cpp +++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp @@ -326,7 +326,8 @@ public: int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) override { + int batchSize, + boost::optional readConcernObj) override { if (_initFailuresLeft > 0) { _initFailuresLeft--; LOGV2(21657, diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp index 8d06503b597..bc111c75397 100644 --- a/src/mongo/db/repl/rollback_source_impl.cpp +++ b/src/mongo/db/repl/rollback_source_impl.cpp @@ -35,6 +35,7 @@ #include "mongo/db/cloner.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/replication_auth.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" @@ -68,19 +69,21 @@ int RollbackSourceImpl::getRollbackId() const { BSONObj RollbackSourceImpl::getLastOperation() const { const Query query = Query().sort(BSON("$natural" << -1)); - return _getConnection()->findOne(_collectionName, query, nullptr, QueryOption_SlaveOk); + return _getConnection()->findOne( + _collectionName, query, nullptr, QueryOption_SlaveOk, ReadConcernArgs::kImplicitDefault); } BSONObj RollbackSourceImpl::findOne(const NamespaceString& nss, const BSONObj& filter) const { return _getConnection() - ->findOne(nss.toString(), filter, nullptr, QueryOption_SlaveOk) + ->findOne( + nss.toString(), filter, nullptr, QueryOption_SlaveOk, ReadConcernArgs::kImplicitDefault) .getOwned(); } std::pair RollbackSourceImpl::findOneByUUID(const std::string& db, UUID uuid, const BSONObj& filter) const { - return _getConnection()->findOneByUUID(db, uuid, filter); + return _getConnection()->findOneByUUID(db, uuid, filter, ReadConcernArgs::kImplicitDefault); } void RollbackSourceImpl::copyCollectionFromRemote(OperationContext* opCtx, diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index a7e289e3996..e75c1782192 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -173,9 +173,7 @@ std::unique_ptr SyncSourceResolver::_makeFirstOplogEntryFetcher( << "projection" << BSON(OplogEntryBase::kTimestampFieldName 
<< 1 << OplogEntryBase::kTermFieldName << 1) - << "readConcern" - << BSON("level" - << "local")), + << ReadConcernArgs::kReadConcernFieldName << ReadConcernArgs::kImplicitDefault), [=](const StatusWith& response, Fetcher::NextAction*, BSONObjBuilder*) { @@ -198,9 +196,7 @@ std::unique_ptr SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP BSON("find" << kLocalOplogNss.coll() << "oplogReplay" << true << "filter" << BSON("ts" << BSON("$gte" << _requiredOpTime.getTimestamp() << "$lte" << _requiredOpTime.getTimestamp())) - << "readConcern" - << BSON("level" - << "local")), + << ReadConcernArgs::kReadConcernFieldName << ReadConcernArgs::kImplicitDefault), [=](const StatusWith& response, Fetcher::NextAction*, BSONObjBuilder*) { diff --git a/src/mongo/db/s/add_shard_cmd.cpp b/src/mongo/db/s/add_shard_cmd.cpp index c2e8e3684a2..aab1f4d7b79 100644 --- a/src/mongo/db/s/add_shard_cmd.cpp +++ b/src/mongo/db/s/add_shard_cmd.cpp @@ -79,7 +79,7 @@ public: private: bool supportsWriteConcern() const override { - return true; + return false; } // The command parameter happens to be string so it's historically been interpreted diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 4a1d614d34d..100855905b7 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -469,6 +469,17 @@ shared_ptr> MigrationManager::_schedule( waitForDelete, migrateInfo.forceJumbo); + // In 4.4, commands sent to shards that accept writeConcern, must always have writeConcern. + // So if the MoveChunkRequest didn't add writeConcern (from secondaryThrottle), then we add + // the implicit server default writeConcern. + if (!builder.hasField(WriteConcernOptions::kWriteConcernField) && + serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) { + builder.append(WriteConcernOptions::kWriteConcernField, + WriteConcernOptions::kImplicitDefault); + } + stdx::lock_guard lock(_mutex); if (_state != State::kEnabled && _state != State::kRecovering) { diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 3f4976497fa..00a63ba4f49 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -296,6 +296,17 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx, _shardKeyPattern.toBSON(), _args.getSecondaryThrottle()); + if (serverGlobalParams.featureCompatibility.isVersion( + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44)) { + // In 4.4, commands sent to shards that accept writeConcern, must always have writeConcern. + // So if the StartChunkCloneRequest didn't add writeConcern (from secondaryThrottle), then + // we add the implicit server default writeConcern. 
+ if (!cmdBuilder.hasField(WriteConcernOptions::kWriteConcernField)) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, + WriteConcernOptions::kImplicitDefault); + } + } + auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj()); if (!startChunkCloneResponseStatus.isOK()) { return startChunkCloneResponseStatus.getStatus(); @@ -367,6 +378,17 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { _shardKeyPattern.toBSON(), _args.getSecondaryThrottle()); + if (serverGlobalParams.featureCompatibility.isVersion( + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44)) { + // In 4.4, commands sent to shards that accept writeConcern, must always have writeConcern. + // So if the StartChunkCloneRequest didn't add writeConcern (from secondaryThrottle), then + // we add the implicit server default writeConcern. + if (!cmdBuilder.hasField(WriteConcernOptions::kWriteConcernField)) { + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, + WriteConcernOptions::kImplicitDefault); + } + } + auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj()); if (!startChunkCloneResponseStatus.isOK()) { return startChunkCloneResponseStatus.getStatus(); diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 5752e748879..e97ac91342f 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -41,6 +41,7 @@ #include "mongo/client/query.h" #include "mongo/db/catalog/collection_catalog_helper.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/logical_session_cache.h" #include "mongo/db/namespace_string.h" @@ -693,7 +694,8 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx, newOpCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", - ensureChunkVersionIsGreaterThanRequestBSON, + CommandHelpers::appendMajorityWriteConcern( + ensureChunkVersionIsGreaterThanRequestBSON), Shard::RetryPolicy::kIdempotent); const auto ensureChunkVersionIsGreaterThanStatus = Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse); diff --git a/src/mongo/db/s/shard_key_util.cpp b/src/mongo/db/s/shard_key_util.cpp index 906757d353c..62c67614e2e 100644 --- a/src/mongo/db/s/shard_key_util.cpp +++ b/src/mongo/db/s/shard_key_util.cpp @@ -167,7 +167,8 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx, } } - auto countCmd = BSON("count" << nss.coll()); + auto countCmd = BSON("count" << nss.coll() << repl::ReadConcernArgs::kReadConcernFieldName + << repl::ReadConcernArgs::kImplicitDefault); auto countRes = uassertStatusOK(primaryShard->runCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 4e0e121abaf..b883bebf76c 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -40,6 +40,7 @@ #include "mongo/db/create_indexes_gen.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -234,7 +235,10 @@ LogicalSessionIdSet SessionsCollection::_doFindRemoved( LogicalSessionIdSet removed{sessions.begin(), sessions.end()}; auto wrappedSend = [&](BSONObj batch) { - auto swBatchResult = send(batch); + BSONObjBuilder 
batchWithReadConcernLocal(batch); + batchWithReadConcernLocal.append(repl::ReadConcernArgs::kReadConcernFieldName, + repl::ReadConcernArgs::kImplicitDefault); + auto swBatchResult = send(batchWithReadConcernLocal.obj()); auto result = SessionsCollectionFetchResult::parse("SessionsCollectionFetchResult"_sd, swBatchResult); @@ -279,7 +283,9 @@ BSONObj SessionsCollection::generateCreateIndexesCmd() { createIndexes.setCreateIndexes(NamespaceString::kLogicalSessionsNamespace.coll()); createIndexes.setIndexes(std::move(indexes)); - return createIndexes.toBSON(); + return BSONObjBuilder(createIndexes.toBSON()) + .append(WriteConcernOptions::kWriteConcernField, WriteConcernOptions::kImplicitDefault) + .obj(); } BSONObj SessionsCollection::generateCollModCmd() { @@ -292,6 +298,8 @@ BSONObj SessionsCollection::generateCollModCmd() { indexBuilder << "expireAfterSeconds" << localLogicalSessionTimeoutMinutes * 60; indexBuilder.done(); + collModCmdBuilder.append(WriteConcernOptions::kWriteConcernField, + WriteConcernOptions::kImplicitDefault); collModCmdBuilder.done(); return collModCmdBuilder.obj(); diff --git a/src/mongo/db/write_concern_options.cpp b/src/mongo/db/write_concern_options.cpp index 298941fbdef..7ff06c9d224 100644 --- a/src/mongo/db/write_concern_options.cpp +++ b/src/mongo/db/write_concern_options.cpp @@ -72,6 +72,12 @@ const BSONObj WriteConcernOptions::Default = BSONObj(); const BSONObj WriteConcernOptions::Acknowledged(BSON("w" << W_NORMAL)); const BSONObj WriteConcernOptions::Unacknowledged(BSON("w" << W_NONE)); const BSONObj WriteConcernOptions::Majority(BSON("w" << WriteConcernOptions::kMajority)); + +// The "kImplicitDefault" write concern, used by internal operations, is deliberately empty (no +// 'w' or 'wtimeout' specified). This allows internal operations to specify a write concern, while +// still allowing it to be either w:1 or automatically upconverted to w:majority on configsvrs. 
+const BSONObj WriteConcernOptions::kImplicitDefault; + constexpr Seconds WriteConcernOptions::kWriteConcernTimeoutSystem; constexpr Seconds WriteConcernOptions::kWriteConcernTimeoutMigration; constexpr Seconds WriteConcernOptions::kWriteConcernTimeoutSharding; diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h index 07cd6a6e1d7..0d9cca66889 100644 --- a/src/mongo/db/write_concern_options.h +++ b/src/mongo/db/write_concern_options.h @@ -54,6 +54,7 @@ public: static const BSONObj Unacknowledged; static const BSONObj Majority; static const BSONObj ConfigMajority; + static const BSONObj kImplicitDefault; static constexpr StringData kWriteConcernField = "writeConcern"_sd; static const char kMajority[]; // = "majority" diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp index 3bb3a845d3e..b360a283e2d 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp @@ -105,7 +105,8 @@ std::unique_ptr MockDBClientConnection::query( int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional readConcernObj) { checkConnection(); try { @@ -117,7 +118,8 @@ std::unique_ptr MockDBClientConnection::query( nToSkip, fieldsToReturn, queryOptions, - batchSize)); + batchSize, + readConcernObj)); BSONArray resultsInCursor; @@ -189,26 +191,37 @@ unsigned long long MockDBClientConnection::query( mongo::Query query, const mongo::BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { - return DBClientBase::query(f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); + int batchSize, + boost::optional readConcernObj) { + return DBClientBase::query( + f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize, readConcernObj); } uint64_t MockDBClientConnection::getSockCreationMicroSec() const { return _sockCreationTime; } -void MockDBClientConnection::insert(const string& ns, BSONObj obj, int flags) { +void MockDBClientConnection::insert(const string& ns, + BSONObj obj, + int flags, + boost::optional writeConcernObj) { invariant(_remoteServer); _remoteServer->insert(ns, obj, flags); } -void MockDBClientConnection::insert(const string& ns, const vector& objList, int flags) { +void MockDBClientConnection::insert(const string& ns, + const vector& objList, + int flags, + boost::optional writeConcernObj) { for (vector::const_iterator iter = objList.begin(); iter != objList.end(); ++iter) { insert(ns, *iter, flags); } } -void MockDBClientConnection::remove(const string& ns, Query query, int flags) { +void MockDBClientConnection::remove(const string& ns, + Query query, + int flags, + boost::optional writeConcernObj) { invariant(_remoteServer); _remoteServer->remove(ns, query, flags); } diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h index 5a8d9fa727e..4a7c4891db6 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.h +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h @@ -118,21 +118,32 @@ public: using DBClientBase::runCommandWithTarget; std::pair runCommandWithTarget(OpMsgRequest request) override; - std::unique_ptr query(const NamespaceStringOrUUID& nsOrUuid, - mongo::Query query = mongo::Query(), - int nToReturn = 0, - int nToSkip = 0, - const mongo::BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0) override; + std::unique_ptr query( + const NamespaceStringOrUUID& nsOrUuid, + 
mongo::Query query = mongo::Query(), + int nToReturn = 0, + int nToSkip = 0, + const mongo::BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + int batchSize = 0, + boost::optional readConcernObj = boost::none) override; uint64_t getSockCreationMicroSec() const override; - void insert(const std::string& ns, BSONObj obj, int flags = 0) override; + void insert(const std::string& ns, + BSONObj obj, + int flags = 0, + boost::optional writeConcernObj = boost::none) override; - void insert(const std::string& ns, const std::vector& objList, int flags = 0) override; + void insert(const std::string& ns, + const std::vector& objList, + int flags = 0, + boost::optional writeConcernObj = boost::none) override; - void remove(const std::string& ns, Query query, int flags = 0) override; + void remove(const std::string& ns, + Query query, + int flags = 0, + boost::optional writeConcernObj = boost::none) override; bool call(mongo::Message& toSend, mongo::Message& response, @@ -175,7 +186,8 @@ public: mongo::Query query, const mongo::BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, - int batchSize = 0) override; + int batchSize = 0, + boost::optional readConcernObj = boost::none) override; // // Unsupported methods (these are pure virtuals in the base class) diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.cpp b/src/mongo/dbtests/mock/mock_remote_db_server.cpp index 0f8c0f69d4d..9e37c54c23e 100644 --- a/src/mongo/dbtests/mock/mock_remote_db_server.cpp +++ b/src/mongo/dbtests/mock/mock_remote_db_server.cpp @@ -181,7 +181,8 @@ mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, - int batchSize) { + int batchSize, + boost::optional readConcernObj) { checkIfUp(id); if (_delayMilliSec > 0) { diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.h b/src/mongo/dbtests/mock/mock_remote_db_server.h index db6361ac915..e346923acf2 100644 --- a/src/mongo/dbtests/mock/mock_remote_db_server.h +++ b/src/mongo/dbtests/mock/mock_remote_db_server.h @@ -168,7 +168,8 @@ public: int nToSkip = 0, const mongo::BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, - int batchSize = 0); + int batchSize = 0, + boost::optional readConcernObj = boost::none); // // Getters diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index ff5c2d8ef4a..4e2bdcbac76 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -69,7 +69,7 @@ std::unique_ptr parseCmdObjectToQueryRequest(OperationContext* opC // operation in a transaction, or not running in a transaction, then use the readConcern // from the opCtx (which may be a cluster-wide default). const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - qr->setReadConcern(readConcernArgs.toBSON()["readConcern"].Obj()); + qr->setReadConcern(readConcernArgs.toBSONInner()); } } uassert( diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 823360ac898..d7fc3224347 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -467,6 +467,30 @@ private: BSONObjBuilder& result) const { BatchWriteExecStats stats; BatchedCommandResponse response; + + // The batched request will only have WC if it was supplied by the client. Otherwise, the + // batched request should use the WC from the opCtx. 
+ if (!batchedRequest.hasWriteConcern()) { + if (opCtx->getWriteConcern().usedDefault) { + // Pass writeConcern: {}, rather than {w: 1, wtimeout: 0}, so as to not override the + // configsvr w:majority upconvert. + batchedRequest.setWriteConcern(BSONObj()); + } else { + batchedRequest.setWriteConcern(opCtx->getWriteConcern().toBSON()); + } + } + + // Write ops are never allowed to have writeConcern inside transactions. Normally + // disallowing WC on non-terminal commands in a transaction is handled earlier, during + // command dispatch. However, if this is a regular write operation being automatically + // retried inside a transaction (such as changing a document's shard key across shards), + // then batchedRequest will have a writeConcern (added by the if() above) from when it was + // initially run outside a transaction. Thus it's necessary to unconditionally clear the + // writeConcern when in a transaction. + if (TransactionRouter::get(opCtx)) { + batchedRequest.unsetWriteConcern(); + } + ClusterWriter::write(opCtx, batchedRequest, &stats, &response); bool updatedShardKey = false; diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 0988e6fff7f..aeeeb965b33 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -146,7 +146,10 @@ BSONObj createCommandForMergingShard(Document serializedCommand, mergeCmd.remove("readConcern"); } - return mergeCmd.freeze().toBson(); + return applyReadWriteConcern(mergeCtx->opCtx, + !(txnRouter && mergingShardContributesData), /* appendRC */ + !mergeCtx->explain, /* appendWC */ + mergeCmd.freeze().toBson()); } Status dispatchMergingPipeline(const boost::intrusive_ptr& expCtx, @@ -389,7 +392,10 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( expCtx, serializedCommand, consumerPipelines.back(), boost::none, false); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], - consumerCmdObj); + applyReadWriteConcern(opCtx, + true, /* appendRC */ + !expCtx->explain, /* appendWC */ + consumerCmdObj)); } auto cursors = establishCursors(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), diff --git a/src/mongo/s/request_types/merge_chunk_request_test.cpp b/src/mongo/s/request_types/merge_chunk_request_test.cpp index 94a7bf68511..88d5d165148 100644 --- a/src/mongo/s/request_types/merge_chunk_request_test.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_test.cpp @@ -61,13 +61,13 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) { << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) << "shard" << "shard0000" << "validAfter" << Timestamp{100}); - BSONObj writeConcernObj = BSON("writeConcern" << BSON("w" - << "majority")); + BSONObj writeConcernObj = BSON("w" + << "majority"); BSONObjBuilder cmdBuilder; { cmdBuilder.appendElements(serializedRequest); - cmdBuilder.appendElements(writeConcernObj); + cmdBuilder.append("writeConcern", writeConcernObj); } auto request = assertGet(MergeChunkRequest::parseFromConfigCommand(serializedRequest)); diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp index 4bd9844578b..387bacc16b0 100644 --- a/src/mongo/s/request_types/merge_chunk_request_type.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/db/write_concern_options.h" 
namespace mongo { namespace { @@ -131,7 +132,7 @@ BSONObj MergeChunkRequest::toConfigCommandBSON(const BSONObj& writeConcern) { appendAsConfigCommand(&cmdBuilder); // Tack on passed-in writeConcern - cmdBuilder.appendElements(writeConcern); + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, writeConcern); return cmdBuilder.obj(); } diff --git a/src/mongo/s/request_types/migration_secondary_throttle_options.cpp b/src/mongo/s/request_types/migration_secondary_throttle_options.cpp index d739671b1e3..c143fc9ea60 100644 --- a/src/mongo/s/request_types/migration_secondary_throttle_options.cpp +++ b/src/mongo/s/request_types/migration_secondary_throttle_options.cpp @@ -98,8 +98,12 @@ StatusWith MigrationSecondaryThrottleOptions: } if (secondaryThrottle != kOn) { - return Status(ErrorCodes::UnsupportedFormat, - "Cannot specify write concern when secondaryThrottle is not set"); + // Ignore the specified writeConcern, since it won't be used. This is necessary + // to normalize the otherwise non-standard way that moveChunk uses writeConcern (ie. + // only using it when secondaryThrottle: true), so that shardsvrs can enforce always + // receiving writeConcern on internalClient connections (at the ServiceEntryPoint + // layer). + return MigrationSecondaryThrottleOptions(secondaryThrottle, boost::none); } writeConcernBSON = writeConcernElem.Obj().getOwned(); diff --git a/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp b/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp index b295e3f0b3d..59c3fb130bd 100644 --- a/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp +++ b/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp @@ -168,20 +168,24 @@ TEST(MigrationSecondaryThrottleOptions, DisabledInBalancerConfig) { ASSERT_EQ(MigrationSecondaryThrottleOptions::kOff, options.getSecondaryThrottle()); } -TEST(MigrationSecondaryThrottleOptions, ParseFailsDisabledInCommandBSONWriteConcernSpecified) { - auto status = MigrationSecondaryThrottleOptions::createFromCommand( - BSON("someOtherField" << 1 << "secondaryThrottle" << false << "writeConcern" - << BSON("w" - << "majority"))); - ASSERT_EQ(ErrorCodes::UnsupportedFormat, status.getStatus().code()); -} - -TEST(MigrationSecondaryThrottleOptions, ParseFailsNotSpecifiedInCommandBSONWriteConcernSpecified) { - auto status = MigrationSecondaryThrottleOptions::createFromCommand( - BSON("someOtherField" << 1 << "writeConcern" - << BSON("w" - << "majority"))); - ASSERT_EQ(ErrorCodes::UnsupportedFormat, status.getStatus().code()); +TEST(MigrationSecondaryThrottleOptions, IgnoreWriteConcernWhenSecondaryThrottleOff) { + MigrationSecondaryThrottleOptions options = + assertGet(MigrationSecondaryThrottleOptions::createFromCommand( + BSON("someOtherField" << 1 << "_secondaryThrottle" << false << "writeConcern" + << BSON("w" + << "majority")))); + ASSERT_EQ(MigrationSecondaryThrottleOptions::kOff, options.getSecondaryThrottle()); + ASSERT(!options.isWriteConcernSpecified()); +} + +TEST(MigrationSecondaryThrottleOptions, IgnoreWriteConcernWhenSecondaryThrottleAbsent) { + MigrationSecondaryThrottleOptions options = + assertGet(MigrationSecondaryThrottleOptions::createFromCommand( + BSON("someOtherField" << 1 << "writeConcern" + << BSON("w" + << "majority")))); + ASSERT_EQ(MigrationSecondaryThrottleOptions::kDefault, options.getSecondaryThrottle()); + ASSERT(!options.isWriteConcernSpecified()); } TEST(MigrationSecondaryThrottleOptions, EqualityOperatorSameValue) { diff --git 
a/src/mongo/s/request_types/split_chunk_request_test.cpp b/src/mongo/s/request_types/split_chunk_request_test.cpp index 1727c3aa792..5759519a2b4 100644 --- a/src/mongo/s/request_types/split_chunk_request_test.cpp +++ b/src/mongo/s/request_types/split_chunk_request_test.cpp @@ -77,13 +77,13 @@ TEST(SplitChunkRequest, ConfigCommandtoBSON) { << "collEpoch" << OID("7fffffff0000000000000001") << "min" << BSON("a" << 1) << "max" << BSON("a" << 10) << "splitPoints" << BSON_ARRAY(BSON("a" << 5)) << "shard" << "shard0000"); - BSONObj writeConcernObj = BSON("writeConcern" << BSON("w" - << "majority")); + BSONObj writeConcernObj = BSON("w" + << "majority"); BSONObjBuilder cmdBuilder; { cmdBuilder.appendElements(serializedRequest); - cmdBuilder.appendElements(writeConcernObj); + cmdBuilder.append("writeConcern", writeConcernObj); } auto request = assertGet(SplitChunkRequest::parseFromConfigCommand(serializedRequest)); diff --git a/src/mongo/s/request_types/split_chunk_request_type.cpp b/src/mongo/s/request_types/split_chunk_request_type.cpp index 6773e413197..20e826c1400 100644 --- a/src/mongo/s/request_types/split_chunk_request_type.cpp +++ b/src/mongo/s/request_types/split_chunk_request_type.cpp @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/db/write_concern_options.h" namespace mongo { @@ -120,7 +121,7 @@ BSONObj SplitChunkRequest::toConfigCommandBSON(const BSONObj& writeConcern) { appendAsConfigCommand(&cmdBuilder); // Tack on passed-in writeConcern - cmdBuilder.appendElements(writeConcern); + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, writeConcern); return cmdBuilder.obj(); } diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 5d5b2ee0417..a37ccd808b8 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -129,9 +129,9 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd, repl::ReadConcernArgs readConcernArgs, boost::optional atClusterTime, bool doAppendStartTransaction) { - auto cmdWithReadConcern = !readConcernArgs.isEmpty() - ? appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime) - : std::move(cmd); + // startTransaction: true always requires readConcern, even if it's empty. + auto cmdWithReadConcern = + appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime); BSONObjBuilder bob(std::move(cmdWithReadConcern)); @@ -668,8 +668,8 @@ void TransactionRouter::Router::_assertAbortStatusIsOkOrNoSuchTransaction( << " from shard: " << response.shardId, status.isOK() || status.code() == ErrorCodes::NoSuchTransaction); - // abortTransaction is sent with no write concern, so there's no need to check for a write - // concern error. + // abortTransaction is sent with "local" write concern (w: 1), so there's no need to check for a + // write concern error. } std::vector TransactionRouter::Router::_getPendingParticipants() const { @@ -689,7 +689,10 @@ void TransactionRouter::Router::_clearPendingParticipants(OperationContext* opCt // transactions will be left open if the retry does not re-target any of these shards. 
std::vector abortRequests; for (const auto& participant : pendingParticipants) { - abortRequests.emplace_back(participant, BSON("abortTransaction" << 1)); + abortRequests.emplace_back(participant, + BSON("abortTransaction" + << 1 << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions().toBSON())); } auto responses = gatherResponses(opCtx, NamespaceString::kAdminDb, @@ -1224,7 +1227,8 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC p().terminationInitiated = true; - auto abortCmd = BSON("abortTransaction" << 1); + auto abortCmd = BSON("abortTransaction" << 1 << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions().toBSON()); std::vector abortRequests; for (const auto& participantEntry : o().participants) { abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd); diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 18bc6833570..b5c313c4299 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -733,7 +733,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFir ErrorCodes::InvalidOptions); } -TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToParticipants) { +TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughEmptyReadConcernToParticipants) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); TxnNumber txnNum{3}; @@ -745,8 +745,9 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToPart BSONObj expectedNewObj = BSON("insert" << "test" - << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum); + << "readConcern" << BSONObj() << "startTransaction" << true + << "coordinator" << true << "autocommit" << false << "txnNumber" + << txnNum); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, diff --git a/src/mongo/shell/encrypted_dbclient_base.cpp b/src/mongo/shell/encrypted_dbclient_base.cpp index 883453e11db..f08af53e152 100644 --- a/src/mongo/shell/encrypted_dbclient_base.cpp +++ b/src/mongo/shell/encrypted_dbclient_base.cpp @@ -500,15 +500,23 @@ JS::Value EncryptedDBClientBase::getCollection() const { } -std::unique_ptr EncryptedDBClientBase::query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) { - return _conn->query( - nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); +std::unique_ptr EncryptedDBClientBase::query( + const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize, + boost::optional readConcernObj) { + return _conn->query(nsOrUuid, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + readConcernObj); } bool EncryptedDBClientBase::isFailed() const { diff --git a/src/mongo/shell/encrypted_dbclient_base.h b/src/mongo/shell/encrypted_dbclient_base.h index 518c3631a5f..406c5884391 100644 --- a/src/mongo/shell/encrypted_dbclient_base.h +++ b/src/mongo/shell/encrypted_dbclient_base.h @@ -118,13 +118,15 @@ public: void trace(JSTracer* trc) final; using DBClientBase::query; - std::unique_ptr query(const NamespaceStringOrUUID& nsOrUuid, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) final; + std::unique_ptr query( 
+ const NamespaceStringOrUUID& nsOrUuid, + Query query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize, + boost::optional readConcernObj = boost::none) final; bool isFailed() const final; diff --git a/src/mongo/shell/feature_compatibility_version.js b/src/mongo/shell/feature_compatibility_version.js index 0894a2a0fd2..599e7a4c79f 100644 --- a/src/mongo/shell/feature_compatibility_version.js +++ b/src/mongo/shell/feature_compatibility_version.js @@ -25,7 +25,13 @@ function checkFCV(adminDB, version, targetVersion) { assert.eq(res.featureCompatibilityVersion.version, version, tojson(res)); assert.eq(res.featureCompatibilityVersion.targetVersion, targetVersion, tojson(res)); - let doc = adminDB.system.version.findOne({_id: "featureCompatibilityVersion"}); + // This query specifies an explicit readConcern because some FCV tests pass a connection that + // has manually run isMaster with internalClient, and mongod expects internalClients (ie. other + // cluster members) to include read/write concern (on commands that accept read/write concern). + let doc = adminDB.system.version.find({_id: "featureCompatibilityVersion"}) + .limit(1) + .readConcern("local") + .next(); assert.eq(doc.version, version, tojson(doc)); assert.eq(doc.targetVersion, targetVersion, tojson(doc)); } diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index 3e9de7cb996..db7733a1c79 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -505,6 +505,22 @@ Mongo.prototype.readMode = function() { return this._readMode; }; +/** + * Run a function while forcing a certain readMode, and then return the readMode to its original + * setting afterwards. Passes this connection to the given function, and returns the function's + * result. + */ +Mongo.prototype._runWithForcedReadMode = function(forcedReadMode, fn) { + let origReadMode = this.readMode(); + this.forceReadMode(forcedReadMode); + try { + var res = fn(this); + } finally { + this.forceReadMode(origReadMode); + } + return res; +}; + // // Write Concern can be set at the connection level, and is used for all write operations unless // overridden at the collection level. diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index a5489f7e24b..b83991f7eeb 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -1672,17 +1672,6 @@ var ReplSetTest = function(opts) { "established on " + id); }; - function _runInCommandReadMode(conn, fn) { - let origReadMode = conn.readMode(); - conn.forceReadMode("commands"); - try { - var res = fn(); - } finally { - conn.forceReadMode(origReadMode); - } - return res; - } - // Wait until the optime of the specified type reaches the primary's last applied optime. Blocks // on all secondary nodes or just 'slaves', if specified. The timeout will reset if any of the // secondaries makes progress. 
@@ -1747,13 +1736,13 @@ var ReplSetTest = function(opts) { var slaveName = slave.host; var slaveConfigVersion = - _runInCommandReadMode(slave, - () => slave.getDB("local")['system.replset'] - .find() - .readConcern("local") - .limit(1) - .next() - .version); + slave._runWithForcedReadMode("commands", + () => slave.getDB("local")['system.replset'] + .find() + .readConcern("local") + .limit(1) + .next() + .version); if (masterConfigVersion != slaveConfigVersion) { print("ReplSetTest awaitReplication: secondary #" + secondaryCount + ", " + @@ -1763,13 +1752,13 @@ var ReplSetTest = function(opts) { if (slaveConfigVersion > masterConfigVersion) { master = self.getPrimary(); masterConfigVersion = - _runInCommandReadMode(master, - () => master.getDB("local")['system.replset'] - .find() - .readConcern("local") - .limit(1) - .next() - .version); + master._runWithForcedReadMode("commands", + () => master.getDB("local")['system.replset'] + .find() + .readConcern("local") + .limit(1) + .next() + .version); masterName = master.host; print("ReplSetTest awaitReplication: optime for primary, " + masterName + @@ -2459,7 +2448,8 @@ var ReplSetTest = function(opts) { } try { - return _runInCommandReadMode(this.mongo, () => operation(this.cursor)); + return this.mongo._runWithForcedReadMode("commands", + () => operation(this.cursor)); } catch (err) { print("Error: " + name + " threw '" + err.message + "' on " + this.mongo.host); // Occasionally, the capped collection will get truncated while we are iterating @@ -2495,20 +2485,21 @@ var ReplSetTest = function(opts) { // changed "cursorTimeoutMillis" to a short time period. this._cursorExhausted = false; // Although this line sets the read concern, it does not need to be called via - // _runInCommandReadMode() because it only creates the client-side cursor. It's not - // until next()/hasNext() are called that the find command gets sent to the server. + // _runWithForcedReadMode() because it only creates the client-side cursor. It's + // not until next()/hasNext() are called that the find command gets sent to the + // server. this.cursor = coll.find(query).sort({$natural: -1}).noCursorTimeout().readConcern("local"); }; this.getFirstDoc = function() { - return _runInCommandReadMode(this.mongo, - () => this.getOplogColl() - .find() - .sort({$natural: 1}) - .readConcern("local") - .limit(-1) - .next()); + return this.mongo._runWithForcedReadMode("commands", + () => this.getOplogColl() + .find() + .sort({$natural: 1}) + .readConcern("local") + .limit(-1) + .next()); }; this.getOplogColl = function() { -- cgit v1.2.1
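
The client-side core of this change threads an optional read concern through DBClientBase, DBClientReplicaSet, DBDirectClient and the mock connections. A minimal sketch of an internal caller opting in, assuming the new parameter is a boost::optional<BSONObj> (as the boost::none defaults and the BSONObj constants passed throughout this patch imply); the helper name is illustrative and not part of the patch:

    #include "mongo/client/dbclient_connection.h"
    #include "mongo/client/query.h"
    #include "mongo/db/jsobj.h"
    #include "mongo/db/repl/read_concern_args.h"

    namespace mongo {
    // Illustrative helper: fetch the newest oplog entry while sending an explicit
    // (empty) readConcern, so a 4.4 node accepts the query on an internalClient
    // connection that is expected to carry read/write concern.
    BSONObj fetchLastOplogEntry(DBClientConnection& conn) {
        const Query query = Query().sort(BSON("$natural" << -1));
        return conn.findOne("local.oplog.rs",
                            query,
                            nullptr /* fieldsToReturn */,
                            QueryOption_SlaveOk,
                            repl::ReadConcernArgs::kImplicitDefault);
    }
    }  // namespace mongo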
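
Both new constants are deliberately empty objects, so on the wire they appear as readConcern: {} and writeConcern: {} (the same shape the QueryRequest legacy path now initializes). A short sketch of the resulting command shapes, following the patterns this patch uses in initial_syncer.cpp and read_concern_mongod.cpp; the function name is illustrative only:

    #include <utility>

    #include "mongo/db/jsobj.h"
    #include "mongo/db/repl/read_concern_args.h"
    #include "mongo/db/write_concern_options.h"

    namespace mongo {
    // Sketch: the wire-level shape produced by the two implicit-default constants.
    std::pair<BSONObj, BSONObj> exampleInternalCommands() {
        // readConcern: {} attached to a find, as in initial_syncer.cpp:
        BSONObj findCmd = BSON("find" << "oplog.rs" << "limit" << 1
                                      << repl::ReadConcernArgs::kReadConcernFieldName
                                      << repl::ReadConcernArgs::kImplicitDefault);
        // -> { find: "oplog.rs", limit: 1, readConcern: {} }

        // writeConcern: {} attached to appendOplogNote, as in read_concern_mongod.cpp:
        BSONObj noteCmd = BSON("appendOplogNote" << 1
                                                 << WriteConcernOptions::kWriteConcernField
                                                 << WriteConcernOptions::kImplicitDefault);
        // -> { appendOplogNote: 1, writeConcern: {} }

        return {findCmd, noteCmd};
    }
    }  // namespace mongo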
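
The write helpers (insert, remove, update) gain a matching optional write concern that DBClientReplicaSet simply forwards to the primary connection. A sketch of a caller passing an explicit {w: 1}, again assuming the parameter is boost::optional<BSONObj>; the function name and namespace string are illustrative:

    #include "mongo/client/dbclient_connection.h"
    #include "mongo/db/jsobj.h"

    namespace mongo {
    // Illustrative: insert a document while attaching an explicit w:1 write concern,
    // rather than relying on the receiving node's default.
    void insertWithExplicitWriteConcern(DBClientConnection& conn) {
        conn.insert("test.coll",
                    BSON("_id" << 1),
                    0 /* flags */,
                    BSON("w" << 1) /* writeConcernObj */);
    }
    }  // namespace mongo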