diff options
author | Mihai Andrei <mihai.andrei@10gen.com> | 2020-03-09 14:48:58 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-02 14:47:39 +0000 |
commit | 7bcc4a0963d059ddfa7053ba54c768db3101ab69 (patch) | |
tree | 1a20311735415f4d384660a3abe472203f10f655 | |
parent | aaa0011c67807d5e70f3f06a434f115438295cdb (diff) | |
download | mongo-7bcc4a0963d059ddfa7053ba54c768db3101ab69.tar.gz |
SERVER-46665 Fix causal consistency for $out/$merge running on secondaries
(cherry picked from commit 526e5bcf9e8cd0e659123c553538e005ac5ab29a)
12 files changed, 80 insertions, 9 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml b/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml index b06657552a7..602b96b7024 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml @@ -49,7 +49,6 @@ executor: shell_options: eval: >- var testingReplication = true; - load('jstests/libs/override_methods/set_read_preference_secondary.js'); load('jstests/libs/override_methods/enable_causal_consistency.js'); load('jstests/libs/override_methods/detect_spawning_own_mongod.js'); readMode: commands diff --git a/jstests/aggregation/sources/merge/mode_replace_insert.js b/jstests/aggregation/sources/merge/mode_replace_insert.js index e81ac857dc4..3458cb69df8 100644 --- a/jstests/aggregation/sources/merge/mode_replace_insert.js +++ b/jstests/aggregation/sources/merge/mode_replace_insert.js @@ -36,6 +36,7 @@ outColl.drop(); assert.commandWorked(coll.insert([{_id: 0, a: {b: 1}}, {_id: 1, a: {b: 1}, c: 1}])); assert.commandWorked(outColl.createIndex({"a.b": 1, _id: 1}, {unique: true})); coll.aggregate([ + {$sort: {_id: 1}}, {$addFields: {_id: 0}}, { $merge: { diff --git a/jstests/libs/override_methods/set_read_preference_secondary.js b/jstests/libs/override_methods/set_read_preference_secondary.js index 788a53f5f14..49f0ea262a4 100644 --- a/jstests/libs/override_methods/set_read_preference_secondary.js +++ b/jstests/libs/override_methods/set_read_preference_secondary.js @@ -107,11 +107,9 @@ function runCommandWithReadPreferenceSecondary( } let shouldForceReadPreference = kCommandsSupportingReadPreference.has(commandName); - if (OverrideHelpers.isAggregationWithOutOrMergeStage(commandName, commandObjUnwrapped)) { - // An aggregation with a $out stage must be sent to the primary. - shouldForceReadPreference = false; - } else if ((commandName === "mapReduce" || commandName === "mapreduce") && - !OverrideHelpers.isMapReduceWithInlineOutput(commandName, commandObjUnwrapped)) { + + if ((commandName === "mapReduce" || commandName === "mapreduce") && + !OverrideHelpers.isMapReduceWithInlineOutput(commandName, commandObjUnwrapped)) { // A map-reduce operation with non-inline output must be sent to the primary. shouldForceReadPreference = false; } else if (conn.isMongos() && kDatabasesOnConfigServers.has(dbName)) { diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index cdeb64221fa..a3d4e7f471a 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -183,6 +183,11 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { _done = nextInput.getStatus() == GetNextResult::ReturnStatus::kEOF; return nextInput; } else { + // Ensure that the client's operationTime reflects the latest write even if the command + // fails. + ON_BLOCK_EXIT( + [&] { pExpCtx->mongoProcessInterface->updateClientOperationTime(pExpCtx->opCtx); }); + if (!_initialized) { initialize(); _initialized = true; diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index fe01db30903..021fc820a04 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -40,7 +40,10 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/operation_context.h" +#include "mongo/db/operation_time_tracker.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" @@ -146,6 +149,25 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR return {"_id"}; } +void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const { + // In order to support causal consistency in a replica set or a sharded cluster when reading + // with secondary read preference, the secondary must propagate the primary's operation time + // to the client so that when the client attempts to read, the secondary will block until it + // has replicated the primary's writes. As such, the 'operationTime' returned from the + // primary is explicitly set on the given opCtx's client. + // + // Note that the operationTime is attached even when a command fails because writes may succeed + // while the command fails (such as in a $merge where 'whenMatched' is set to fail). This + // guarantees that the operation time returned to the client reflects the most recent + // successful write executed by this client. + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord) { + auto operationTime = OperationTimeTracker::get(opCtx)->getMaxOperationTime(); + repl::OpTime opTime(operationTime.asTimestamp(), replCoord->getTerm()); + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastProxyWriteOpTimeForward(opTime); + } +} + bool CommonProcessInterface::keyPatternNamesExactPaths(const BSONObj& keyPattern, const std::set<FieldPath>& uniqueKeyPaths) { size_t nFieldsMatched = 0; diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index 93a92626e3b..75c3e01c513 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -64,6 +64,8 @@ public: virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const override; + virtual void updateClientOperationTime(OperationContext* opCtx) const final; + boost::optional<ChunkVersion> refreshAndGetCollectionVersion( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const override; diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index f65a3fb5218..1d720645bb4 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -143,6 +143,12 @@ public: */ virtual bool supportsReadPreferenceForWriteOp( const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0; + /** + * Advances the proxied write time associated with the client in ReplClientInfo to + * be at least as high as the one tracked by the OperationTimeTracker associated with the + * given operation context. + */ + virtual void updateClientOperationTime(OperationContext* opCtx) const = 0; /** * Inserts 'objs' into 'ns' and returns an error Status if the insert fails. If 'targetEpoch' is diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index cc8fa07f967..6d48014b8ed 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -37,9 +37,10 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/operation_time_tracker.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/future.h" @@ -47,6 +48,8 @@ namespace mongo { namespace { +const char kOperationTimeFieldName[] = "operationTime"; + const auto replicaSetNodeExecutor = ServiceContext::declareDecoration<std::shared_ptr<executor::TaskExecutor>>(); } // namespace @@ -191,6 +194,17 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( } auto rcr = std::move(response.getValue()); + + // Update the OperationTimeTracker associated with 'opCtx' with the operation time from the + // primary's response. + auto operationTime = rcr.response.data[kOperationTimeFieldName]; + if (operationTime) { + invariant(operationTime.type() == BSONType::bsonTimestamp); + LogicalTime logicalTime(operationTime.timestamp()); + auto operationTimeTracker = OperationTimeTracker::get(opCtx); + operationTimeTracker->updateOperationTime(logicalTime); + } + if (!rcr.response.status.isOK()) { return rcr.response.status; } diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index c46fb010084..9b3f6f9105f 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -62,6 +62,8 @@ public: return true; } + void updateClientOperationTime(OperationContext* opCtx) const override {} + Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& objs, diff --git a/src/mongo/db/repl/repl_client_info.h b/src/mongo/db/repl/repl_client_info.h index 232c95460fe..51b953d2875 100644 --- a/src/mongo/db/repl/repl_client_info.h +++ b/src/mongo/db/repl/repl_client_info.h @@ -58,6 +58,26 @@ public: } /** + * Stores the operation time of the latest proxy write, that is, a write that was forwarded + * to and executed on a different node instead of being executed locally. + */ + void setLastProxyWriteOpTimeForward(const OpTime& opTime) { + // Only advance the operation time of the latest proxy write if it is greater than the one + // currently stored. + if (opTime > _lastProxyWriteOpTime) { + _lastProxyWriteOpTime = opTime; + } + } + + /** + * Returns the greater of the times set by 'setLastOp()' and + * 'setLastProxiedWriteOpTimeForward()'. + */ + OpTime getMaxKnownOpTime() const { + return _lastOp > _lastProxyWriteOpTime ? _lastOp : _lastProxyWriteOpTime; + } + + /** * Returns true when either setLastOp() or setLastOpToSystemLastOpTime() was called to set the * opTime under the current OperationContext. */ @@ -88,6 +108,8 @@ private: static const long long kUninitializedTerm = -1; OpTime _lastOp = OpTime(); + + OpTime _lastProxyWriteOpTime; }; } // namespace repl diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index f4497a8ccdf..3f1914537e0 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -392,7 +392,7 @@ LogicalTime getClientOperationTime(OperationContext* opCtx) { } return LogicalTime( - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp()); + repl::ReplClientInfo::forClient(opCtx->getClient()).getMaxKnownOpTime().getTimestamp()); } /** diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 6dfc27e8d3d..8c3cb00f70c 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -254,7 +254,7 @@ public: if (isReplSet) { // Attach our own last opTime. repl::OpTime lastOpTimeFromClient = - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + repl::ReplClientInfo::forClient(opCtx->getClient()).getMaxKnownOpTime(); replCoord->prepareReplMetadata(request.body, lastOpTimeFromClient, metadataBob); // For commands from mongos, append some info to help getLastError(w) work. // TODO: refactor out of here as part of SERVER-18236 |