summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml | 1
-rw-r--r-- jstests/aggregation/sources/merge/mode_replace_insert.js | 1
-rw-r--r-- jstests/libs/override_methods/set_read_preference_secondary.js | 8
-rw-r--r-- src/mongo/db/pipeline/document_source_writer.h | 5
-rw-r--r-- src/mongo/db/pipeline/process_interface/common_process_interface.cpp | 22
-rw-r--r-- src/mongo/db/pipeline/process_interface/common_process_interface.h | 2
-rw-r--r-- src/mongo/db/pipeline/process_interface/mongo_process_interface.h | 7
-rw-r--r-- src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp | 16
-rw-r--r-- src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h | 2
-rw-r--r-- src/mongo/db/repl/repl_client_info.h | 22
-rw-r--r-- src/mongo/db/service_entry_point_common.cpp | 2
-rw-r--r-- src/mongo/db/service_entry_point_mongod.cpp | 2
12 files changed, 81 insertions, 9 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml b/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml
index b06657552a7..602b96b7024 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml
@@ -49,7 +49,6 @@ executor:
shell_options:
eval: >-
var testingReplication = true;
- load('jstests/libs/override_methods/set_read_preference_secondary.js');
load('jstests/libs/override_methods/enable_causal_consistency.js');
load('jstests/libs/override_methods/detect_spawning_own_mongod.js');
readMode: commands
diff --git a/jstests/aggregation/sources/merge/mode_replace_insert.js b/jstests/aggregation/sources/merge/mode_replace_insert.js
index e81ac857dc4..3458cb69df8 100644
--- a/jstests/aggregation/sources/merge/mode_replace_insert.js
+++ b/jstests/aggregation/sources/merge/mode_replace_insert.js
@@ -36,6 +36,7 @@ outColl.drop();
assert.commandWorked(coll.insert([{_id: 0, a: {b: 1}}, {_id: 1, a: {b: 1}, c: 1}]));
assert.commandWorked(outColl.createIndex({"a.b": 1, _id: 1}, {unique: true}));
coll.aggregate([
+ {$sort: {_id: 1}},
{$addFields: {_id: 0}},
{
$merge: {
diff --git a/jstests/libs/override_methods/set_read_preference_secondary.js b/jstests/libs/override_methods/set_read_preference_secondary.js
index a479dac391b..9bed1fdad44 100644
--- a/jstests/libs/override_methods/set_read_preference_secondary.js
+++ b/jstests/libs/override_methods/set_read_preference_secondary.js
@@ -107,11 +107,9 @@ function runCommandWithReadPreferenceSecondary(
}
let shouldForceReadPreference = kCommandsSupportingReadPreference.has(commandName);
- if (OverrideHelpers.isAggregationWithOutOrMergeStage(commandName, commandObjUnwrapped)) {
- // An aggregation with a $out stage must be sent to the primary.
- shouldForceReadPreference = false;
- } else if ((commandName === "mapReduce" || commandName === "mapreduce") &&
- !OverrideHelpers.isMapReduceWithInlineOutput(commandName, commandObjUnwrapped)) {
+
+ if ((commandName === "mapReduce" || commandName === "mapreduce") &&
+ !OverrideHelpers.isMapReduceWithInlineOutput(commandName, commandObjUnwrapped)) {
// A map-reduce operation with non-inline output must be sent to the primary.
shouldForceReadPreference = false;
} else if ((conn.isMongos() && kDatabasesOnConfigServers.has(dbName)) || conn._isConfigServer) {
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index 5110582faeb..8f89573aaf4 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -178,6 +178,11 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
_done = nextInput.getStatus() == GetNextResult::ReturnStatus::kEOF;
return nextInput;
} else {
+ // Ensure that the client's operationTime reflects the latest write even if the command
+ // fails.
+ ON_BLOCK_EXIT(
+ [&] { pExpCtx->mongoProcessInterface->updateClientOperationTime(pExpCtx->opCtx); });
+
if (!_initialized) {
initialize();
_initialized = true;
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index 376fb3c9af7..15fde6baff8 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -40,7 +40,10 @@
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/operation_time_tracker.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
@@ -146,6 +149,25 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
return {"_id"};
}
+void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
+ // In order to support causal consistency in a replica set or a sharded cluster when reading
+ // with secondary read preference, the secondary must propagate the primary's operation time
+ // to the client so that when the client attempts to read, the secondary will block until it
+ // has replicated the primary's writes. As such, the 'operationTime' returned from the
+ // primary is explicitly set on the given opCtx's client.
+ //
+ // Note that the operationTime is attached even when a command fails because writes may succeed
+ // while the command fails (such as in a $merge where 'whenMatched' is set to fail). This
+ // guarantees that the operation time returned to the client reflects the most recent
+ // successful write executed by this client.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord) {
+ auto operationTime = OperationTimeTracker::get(opCtx)->getMaxOperationTime();
+ repl::OpTime opTime(operationTime.asTimestamp(), replCoord->getTerm());
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastProxyWriteOpTimeForward(opTime);
+ }
+}
+
bool CommonProcessInterface::keyPatternNamesExactPaths(const BSONObj& keyPattern,
const std::set<FieldPath>& uniqueKeyPaths) {
size_t nFieldsMatched = 0;
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index ebe970987c8..e3d68b5bdf7 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -64,6 +64,8 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
+ virtual void updateClientOperationTime(OperationContext* opCtx) const final;
+
boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss) const override;
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 7b0be46f8aa..f7d8f8e0a2e 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -138,6 +138,13 @@ public:
virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0;
/**
+ * Advances the proxied write time associated with the client in ReplClientInfo to
+ * be at least as high as the one tracked by the OperationTimeTracker associated with the
+ * given operation context.
+ */
+ virtual void updateClientOperationTime(OperationContext* opCtx) const = 0;
+
+ /**
* Inserts 'objs' into 'ns' and returns an error Status if the insert fails. If 'targetEpoch' is
* set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or
* the epoch changes during the course of the insert.
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index cc8fa07f967..6d48014b8ed 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -37,9 +37,10 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/operation_time_tracker.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/future.h"
@@ -47,6 +48,8 @@
namespace mongo {
namespace {
+const char kOperationTimeFieldName[] = "operationTime";
+
const auto replicaSetNodeExecutor =
ServiceContext::declareDecoration<std::shared_ptr<executor::TaskExecutor>>();
} // namespace
@@ -191,6 +194,17 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
}
auto rcr = std::move(response.getValue());
+
+ // Update the OperationTimeTracker associated with 'opCtx' with the operation time from the
+ // primary's response.
+ auto operationTime = rcr.response.data[kOperationTimeFieldName];
+ if (operationTime) {
+ invariant(operationTime.type() == BSONType::bsonTimestamp);
+ LogicalTime logicalTime(operationTime.timestamp());
+ auto operationTimeTracker = OperationTimeTracker::get(opCtx);
+ operationTimeTracker->updateOperationTime(logicalTime);
+ }
+
if (!rcr.response.status.isOK()) {
return rcr.response.status;
}
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 4ec2bf63ca6..4a3b70dbe28 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -57,6 +57,8 @@ public:
return false;
}
+ void updateClientOperationTime(OperationContext* opCtx) const override {}
+
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
diff --git a/src/mongo/db/repl/repl_client_info.h b/src/mongo/db/repl/repl_client_info.h
index 232c95460fe..51b953d2875 100644
--- a/src/mongo/db/repl/repl_client_info.h
+++ b/src/mongo/db/repl/repl_client_info.h
@@ -58,6 +58,26 @@ public:
}
/**
+ * Stores the operation time of the latest proxy write, that is, a write that was forwarded
+ * to and executed on a different node instead of being executed locally.
+ */
+ void setLastProxyWriteOpTimeForward(const OpTime& opTime) {
+ // Only advance the operation time of the latest proxy write if it is greater than the one
+ // currently stored.
+ if (opTime > _lastProxyWriteOpTime) {
+ _lastProxyWriteOpTime = opTime;
+ }
+ }
+
+ /**
+ * Returns the greater of the times set by 'setLastOp()' and
+ * 'setLastProxyWriteOpTimeForward()'.
+ */
+ OpTime getMaxKnownOpTime() const {
+ return _lastOp > _lastProxyWriteOpTime ? _lastOp : _lastProxyWriteOpTime;
+ }
+
+ /**
* Returns true when either setLastOp() or setLastOpToSystemLastOpTime() was called to set the
* opTime under the current OperationContext.
*/
@@ -88,6 +108,8 @@ private:
static const long long kUninitializedTerm = -1;
OpTime _lastOp = OpTime();
+
+ OpTime _lastProxyWriteOpTime;
};
} // namespace repl
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 7c1008d0623..5aaed4169bd 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -396,7 +396,7 @@ LogicalTime getClientOperationTime(OperationContext* opCtx) {
}
return LogicalTime(
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp());
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getMaxKnownOpTime().getTimestamp());
}
/**
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 6cb373d91b4..137f921bb6e 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -242,7 +242,7 @@ public:
if (isReplSet) {
// Attach our own last opTime.
repl::OpTime lastOpTimeFromClient =
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getMaxKnownOpTime();
replCoord->prepareReplMetadata(request.body, lastOpTimeFromClient, metadataBob);
// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18236