diff options
author | James Wahlin <james@mongodb.com> | 2019-05-20 14:45:00 -0400 |
---|---|---|
committer | James Wahlin <james@mongodb.com> | 2019-05-23 10:05:59 -0400 |
commit | 358c0af2fe875d6a768cf87d7ddfaeb3181f804a (patch) | |
tree | 502be585e7d8389abb5b8dc7d032ea76ceaf7e85 | |
parent | 017f8b201baffabbd8a66f278a4fbcaf8baced79 (diff) | |
download | mongo-358c0af2fe875d6a768cf87d7ddfaeb3181f804a.tar.gz |
SERVER-33727 Do not wait for write concern if opTime didn't change during write
-rw-r--r-- | jstests/core/aggregation_accepts_write_concern.js | 31 | ||||
-rw-r--r-- | jstests/core/commands_that_do_not_write_do_not_accept_wc.js | 6 | ||||
-rw-r--r-- | jstests/core/explain_agg_write_concern.js | 4 | ||||
-rw-r--r-- | jstests/replsets/aggregation_write_concern.js | 49 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_client_info.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_client_info.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/txn_two_phase_commit_cmds.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.cpp | 38 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_pipeline_cmd.cpp | 4 |
14 files changed, 150 insertions, 25 deletions
diff --git a/jstests/core/aggregation_accepts_write_concern.js b/jstests/core/aggregation_accepts_write_concern.js new file mode 100644 index 00000000000..6db86a31411 --- /dev/null +++ b/jstests/core/aggregation_accepts_write_concern.js @@ -0,0 +1,31 @@ +/** + * Confirms that the aggregate command accepts writeConcern regardless of whether the pipeline + * writes or is read-only. + * @tags: [assumes_write_concern_unchanged, does_not_support_stepdowns] + */ +(function() { + "use strict"; + + const testDB = db.getSiblingDB("aggregation_accepts_write_concern"); + assert.commandWorked(testDB.dropDatabase()); + const collName = "test"; + + assert.commandWorked(testDB.runCommand( + {insert: collName, documents: [{_id: 1}], writeConcern: {w: "majority"}})); + + // A read-only aggregation accepts writeConcern. + assert.commandWorked(testDB.runCommand({ + aggregate: collName, + pipeline: [{$match: {_id: 1}}], + cursor: {}, + writeConcern: {w: "majority"} + })); + + // An aggregation pipeline that writes accepts writeConcern. + assert.commandWorked(testDB.runCommand({ + aggregate: collName, + pipeline: [{$match: {_id: 1}}, {$out: collName + "_out"}], + cursor: {}, + writeConcern: {w: "majority"} + })); +})(); diff --git a/jstests/core/commands_that_do_not_write_do_not_accept_wc.js b/jstests/core/commands_that_do_not_write_do_not_accept_wc.js index e2e4e55a51b..17396961a74 100644 --- a/jstests/core/commands_that_do_not_write_do_not_accept_wc.js +++ b/jstests/core/commands_that_do_not_write_do_not_accept_wc.js @@ -15,9 +15,11 @@ var commands = []; - commands.push({count: collName, query: {type: 'oak'}}); + commands.push({find: collName, query: {_id: 1}}); + + commands.push({distinct: collName, key: "_id"}); - commands.push({aggregate: collName, pipeline: [{$sort: {type: 1}}], cursor: {}}); + commands.push({count: collName, query: {type: 'oak'}}); commands.push({ mapReduce: collName, diff --git a/jstests/core/explain_agg_write_concern.js b/jstests/core/explain_agg_write_concern.js index 84f83cb0755..5377d0011c3 100644 --- a/jstests/core/explain_agg_write_concern.js +++ b/jstests/core/explain_agg_write_concern.js @@ -32,8 +32,8 @@ assert.eq(1, outColl.find().itcount()); outColl.drop(); - // Agg should reject writeConcern if the last stage is not an $out. - assert.commandFailed( + // Agg should accept writeConcern even if read-only. + assert.commandWorked( db.runCommand({aggregate: coll.getName(), pipeline: [], cursor: {}, writeConcern: {w: 1}})); // Agg should succeed if the last stage is an $out and the explain flag is set. diff --git a/jstests/replsets/aggregation_write_concern.js b/jstests/replsets/aggregation_write_concern.js new file mode 100644 index 00000000000..8622fc0f113 --- /dev/null +++ b/jstests/replsets/aggregation_write_concern.js @@ -0,0 +1,49 @@ +/** + * Confirms that the aggregate command accepts writeConcern and that a read-only aggregation will + * not wait for the writeConcern specified to be satisfied. + */ +(function() { + "use strict"; + + load("jstests/libs/write_concern_util.js"); // For stopReplicationOnSecondaries, + // restartReplicationOnSecondaries + const name = "aggregation_write_concern"; + + const replTest = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}]}); + + replTest.startSet(); + replTest.initiate(); + + const testDB = replTest.getPrimary().getDB(name); + const collectionName = "test"; + + // Stop replication and perform a w: 1 write. This will block subsequent 'writeConcern: + // majority' reads if the read command waits on writeConcern. + + stopReplicationOnSecondaries(replTest); + assert.commandWorked( + testDB.runCommand({insert: collectionName, documents: [{_id: 1}], writeConcern: {w: 1}})); + + // A read-only aggregation accepts the writeConcern option but does not wait for it. + let res = assert.commandWorked(testDB.runCommand({ + aggregate: collectionName, + pipeline: [{$match: {_id: 1}}], + cursor: {}, + writeConcern: {w: "majority"} + })); + assert(res.cursor.firstBatch.length); + assert.eq(res.cursor.firstBatch[0], {_id: 1}); + + // An aggregation pipeline that writes will block on writeConcern. + assert.commandFailedWithCode(testDB.runCommand({ + aggregate: collectionName, + pipeline: [{$match: {_id: 1}}, {$out: collectionName + "_out"}], + cursor: {}, + writeConcern: {w: "majority", wtimeout: 1000} + }), + ErrorCodes.WriteConcernFailed); + + restartReplicationOnSecondaries(replTest); + replTest.awaitLastOpCommitted(); + replTest.stopSet(); +})(); diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index b11daed69f0..0715354ba07 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -605,8 +605,7 @@ public: * field and wait for that write concern to be satisfied after the command runs. * * @param cmd is a BSONObj representation of the command that is used to determine if the - * the command supports a write concern. Ex. aggregate only supports write concern - * when $out is provided. + * the command supports a write concern. */ virtual bool supportsWriteConcern(const BSONObj& cmdObj) const = 0; diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 65703ffec1d..4550b138626 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -73,7 +73,7 @@ public: private: bool supportsWriteConcern() const override { - return Pipeline::aggSupportsWriteConcern(this->_request.body); + return true; } bool canIgnorePrepareConflicts() const override { @@ -102,7 +102,7 @@ public: void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { CommandHelpers::handleMarkKillOnClientDisconnect( - opCtx, !Pipeline::aggSupportsWriteConcern(_request.body)); + opCtx, !Pipeline::aggHasWriteStage(_request.body)); const auto aggregationRequest = uassertStatusOK( AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none)); diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 0addbb6bf8a..08392b4c0de 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -297,7 +297,7 @@ void Pipeline::optimizePipeline() { stitch(); } -bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) { +bool Pipeline::aggHasWriteStage(const BSONObj& cmd) { auto pipelineElement = cmd["pipeline"]; if (pipelineElement.type() != BSONType::Array) { return false; diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 71d94b800de..06c35e36c1c 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -131,7 +131,7 @@ public: /** * Returns true if the provided aggregation command has an $out or $merge stage. */ - static bool aggSupportsWriteConcern(const BSONObj& cmd); + static bool aggHasWriteStage(const BSONObj& cmd); /** * Given 'pathsOfInterest' which describes a set of paths which the caller is interested in, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 2e9190198bf..9e7572e1fc1 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -476,7 +476,7 @@ void _logOpsInner(OperationContext* opCtx, // We set the last op on the client to 'finalOpTime', because that contains the // timestamp of the operation that the client actually performed. - ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime); + ReplClientInfo::forClient(opCtx->getClient()).setLastOp(opCtx, finalOpTime); }); } diff --git a/src/mongo/db/repl/repl_client_info.cpp b/src/mongo/db/repl/repl_client_info.cpp index 331c54c7ea6..dc4c04342c7 100644 --- a/src/mongo/db/repl/repl_client_info.cpp +++ b/src/mongo/db/repl/repl_client_info.cpp @@ -45,9 +45,25 @@ namespace repl { const Client::Decoration<ReplClientInfo> ReplClientInfo::forClient = Client::declareDecoration<ReplClientInfo>(); -void ReplClientInfo::setLastOp(const OpTime& ot) { +namespace { +// We use a struct to wrap lastOpSetExplicitly here in order to give the boolean a default value +// when initially constructed for the associated OperationContext. +struct LastOpInfo { + bool lastOpSetExplicitly = false; +}; +static const OperationContext::Decoration<LastOpInfo> lastOpInfo = + OperationContext::declareDecoration<LastOpInfo>(); +} // namespace + +bool ReplClientInfo::lastOpWasSetExplicitlyByClientForCurrentOperation( + OperationContext* opCtx) const { + return lastOpInfo(opCtx).lastOpSetExplicitly; +} + +void ReplClientInfo::setLastOp(OperationContext* opCtx, const OpTime& ot) { invariant(ot >= _lastOp); _lastOp = ot; + lastOpInfo(opCtx).lastOpSetExplicitly = true; } void ReplClientInfo::setLastOpToSystemLastOpTime(OperationContext* opCtx) { @@ -66,6 +82,8 @@ void ReplClientInfo::setLastOpToSystemLastOpTime(OperationContext* opCtx) { << " as that would be moving the OpTime backwards. This should only happen if " "there was a rollback recently"; } + + lastOpInfo(opCtx).lastOpSetExplicitly = true; } } diff --git a/src/mongo/db/repl/repl_client_info.h b/src/mongo/db/repl/repl_client_info.h index ca6651ef120..5f8e19cc6be 100644 --- a/src/mongo/db/repl/repl_client_info.h +++ b/src/mongo/db/repl/repl_client_info.h @@ -46,12 +46,18 @@ class ReplClientInfo { public: static const Client::Decoration<ReplClientInfo> forClient; - void setLastOp(const OpTime& op); + void setLastOp(OperationContext* opCtx, const OpTime& op); OpTime getLastOp() const { return _lastOp; } + /** + * Returns true when either setLastOp() or setLastOpToSystemLastOpTime() was called to set the + * opTime under the current OperationContext. + */ + bool lastOpWasSetExplicitlyByClientForCurrentOperation(OperationContext* opCtx) const; + // Resets the last op on this client; should only be used in testing. void clearLastOp_forTest() { _lastOp = OpTime(); diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 3d4db04e8f8..eef32a087a5 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -115,7 +115,7 @@ public: // OpTime and the prepare OpTime. const auto systemLastOpTime = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime(); - replClient.setLastOp(std::max(prepareOpTime, systemLastOpTime)); + replClient.setLastOp(opCtx, std::max(prepareOpTime, systemLastOpTime)); } invariant(opCtx->recoveryUnit()->getPrepareTimestamp() == diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index e8e9029caf2..0269a633e68 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -105,6 +105,20 @@ public: BSONObjBuilder& commandResponseBuilder) const override { auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + auto waitForWriteConcernAndAppendStatus = [&]() { + WriteConcernResult res; + auto waitForWCStatus = + mongo::waitForWriteConcern(opCtx, lastOpAfterRun, opCtx->getWriteConcern(), &res); + + CommandHelpers::appendCommandWCStatus(commandResponseBuilder, waitForWCStatus, res); + }; + + if (lastOpAfterRun != lastOpBeforeRun) { + invariant(lastOpAfterRun > lastOpBeforeRun); + waitForWriteConcernAndAppendStatus(); + return; + } + // Ensures that if we tried to do a write, we wait for write concern, even if that write was // a noop. // @@ -116,18 +130,24 @@ public: // concern on operations the transaction observed. As a result, "abortTransaction" only ever // waits on an oplog entry it wrote (and has already set lastOp to) or previous writes on // the same client. - if ((lastOpAfterRun == lastOpBeforeRun) && - opCtx->lockState()->wasGlobalLockTakenForWrite() && - (invocation->definition()->getName() != "abortTransaction")) { - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + if (opCtx->lockState()->wasGlobalLockTakenForWrite()) { + if (invocation->definition()->getName() != "abortTransaction") { + repl::ReplClientInfo::forClient(opCtx->getClient()) + .setLastOpToSystemLastOpTime(opCtx); + lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + } + waitForWriteConcernAndAppendStatus(); + return; } - WriteConcernResult res; - auto waitForWCStatus = - mongo::waitForWriteConcern(opCtx, lastOpAfterRun, opCtx->getWriteConcern(), &res); + if (repl::ReplClientInfo::forClient(opCtx->getClient()) + .lastOpWasSetExplicitlyByClientForCurrentOperation(opCtx)) { + waitForWriteConcernAndAppendStatus(); + return; + } - CommandHelpers::appendCommandWCStatus(commandResponseBuilder, waitForWCStatus, res); + // If no write was attempted and the client's lastOp was not changed by the current network + // operation then we skip waiting for writeConcern. } void waitForLinearizableReadConcern(OperationContext* opCtx) const override { diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index c05275fbe28..25dbc52d9fc 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -67,7 +67,7 @@ public: private: bool supportsWriteConcern() const override { - return Pipeline::aggSupportsWriteConcern(_request.body); + return true; } bool supportsReadConcern(repl::ReadConcernLevel level) const override { @@ -104,7 +104,7 @@ public: void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { CommandHelpers::handleMarkKillOnClientDisconnect( - opCtx, !Pipeline::aggSupportsWriteConcern(_request.body)); + opCtx, !Pipeline::aggHasWriteStage(_request.body)); auto bob = reply->getBodyBuilder(); _runAggCommand(opCtx, _dbName, _request.body, boost::none, &bob); |