summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Mihai Andrei <mihai.andrei@10gen.com> 2020-03-09 14:48:58 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-04-02 14:47:39 +0000
commit: 7bcc4a0963d059ddfa7053ba54c768db3101ab69 (patch)
tree: 1a20311735415f4d384660a3abe472203f10f655
parent: aaa0011c67807d5e70f3f06a434f115438295cdb (diff)
download: mongo-7bcc4a0963d059ddfa7053ba54c768db3101ab69.tar.gz
SERVER-46665 Fix causal consistency for $out/$merge running on secondaries
(cherry picked from commit 526e5bcf9e8cd0e659123c553538e005ac5ab29a)
-rw-r--r-- buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml | 1
-rw-r--r-- jstests/aggregation/sources/merge/mode_replace_insert.js | 1
-rw-r--r-- jstests/libs/override_methods/set_read_preference_secondary.js | 8
-rw-r--r-- src/mongo/db/pipeline/document_source_writer.h | 5
-rw-r--r-- src/mongo/db/pipeline/process_interface/common_process_interface.cpp | 22
-rw-r--r-- src/mongo/db/pipeline/process_interface/common_process_interface.h | 2
-rw-r--r-- src/mongo/db/pipeline/process_interface/mongo_process_interface.h | 6
-rw-r--r-- src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp | 16
-rw-r--r-- src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h | 2
-rw-r--r-- src/mongo/db/repl/repl_client_info.h | 22
-rw-r--r-- src/mongo/db/service_entry_point_common.cpp | 2
-rw-r--r-- src/mongo/db/service_entry_point_mongod.cpp | 2
12 files changed, 80 insertions, 9 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml b/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml
index b06657552a7..602b96b7024 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_secondary_reads.yml
@@ -49,7 +49,6 @@ executor:
shell_options:
eval: >-
var testingReplication = true;
- load('jstests/libs/override_methods/set_read_preference_secondary.js');
load('jstests/libs/override_methods/enable_causal_consistency.js');
load('jstests/libs/override_methods/detect_spawning_own_mongod.js');
readMode: commands
diff --git a/jstests/aggregation/sources/merge/mode_replace_insert.js b/jstests/aggregation/sources/merge/mode_replace_insert.js
index e81ac857dc4..3458cb69df8 100644
--- a/jstests/aggregation/sources/merge/mode_replace_insert.js
+++ b/jstests/aggregation/sources/merge/mode_replace_insert.js
@@ -36,6 +36,7 @@ outColl.drop();
assert.commandWorked(coll.insert([{_id: 0, a: {b: 1}}, {_id: 1, a: {b: 1}, c: 1}]));
assert.commandWorked(outColl.createIndex({"a.b": 1, _id: 1}, {unique: true}));
coll.aggregate([
+ {$sort: {_id: 1}},
{$addFields: {_id: 0}},
{
$merge: {
diff --git a/jstests/libs/override_methods/set_read_preference_secondary.js b/jstests/libs/override_methods/set_read_preference_secondary.js
index 788a53f5f14..49f0ea262a4 100644
--- a/jstests/libs/override_methods/set_read_preference_secondary.js
+++ b/jstests/libs/override_methods/set_read_preference_secondary.js
@@ -107,11 +107,9 @@ function runCommandWithReadPreferenceSecondary(
}
let shouldForceReadPreference = kCommandsSupportingReadPreference.has(commandName);
- if (OverrideHelpers.isAggregationWithOutOrMergeStage(commandName, commandObjUnwrapped)) {
- // An aggregation with a $out stage must be sent to the primary.
- shouldForceReadPreference = false;
- } else if ((commandName === "mapReduce" || commandName === "mapreduce") &&
- !OverrideHelpers.isMapReduceWithInlineOutput(commandName, commandObjUnwrapped)) {
+
+ if ((commandName === "mapReduce" || commandName === "mapreduce") &&
+ !OverrideHelpers.isMapReduceWithInlineOutput(commandName, commandObjUnwrapped)) {
// A map-reduce operation with non-inline output must be sent to the primary.
shouldForceReadPreference = false;
} else if (conn.isMongos() && kDatabasesOnConfigServers.has(dbName)) {
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index cdeb64221fa..a3d4e7f471a 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -183,6 +183,11 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
_done = nextInput.getStatus() == GetNextResult::ReturnStatus::kEOF;
return nextInput;
} else {
+ // Ensure that the client's operationTime reflects the latest write even if the command
+ // fails.
+ ON_BLOCK_EXIT(
+ [&] { pExpCtx->mongoProcessInterface->updateClientOperationTime(pExpCtx->opCtx); });
+
if (!_initialized) {
initialize();
_initialized = true;
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index fe01db30903..021fc820a04 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -40,7 +40,10 @@
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/operation_time_tracker.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
@@ -146,6 +149,25 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
return {"_id"};
}
+void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
+ // In order to support causal consistency in a replica set or a sharded cluster when reading
+ // with secondary read preference, the secondary must propagate the primary's operation time
+ // to the client so that when the client attempts to read, the secondary will block until it
+ // has replicated the primary's writes. As such, the 'operationTime' returned from the
+ // primary is explicitly set on the given opCtx's client.
+ //
+ // Note that the operationTime is attached even when a command fails because writes may succeed
+ // while the command fails (such as in a $merge where 'whenMatched' is set to fail). This
+ // guarantees that the operation time returned to the client reflects the most recent successful
+ // write executed by this client. Nothing is recorded when 'replCoord' below is null.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord) {
+ auto operationTime = OperationTimeTracker::get(opCtx)->getMaxOperationTime();
+ repl::OpTime opTime(operationTime.asTimestamp(), replCoord->getTerm());
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastProxyWriteOpTimeForward(opTime);
+ }
+}
+
bool CommonProcessInterface::keyPatternNamesExactPaths(const BSONObj& keyPattern,
const std::set<FieldPath>& uniqueKeyPaths) {
size_t nFieldsMatched = 0;
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index 93a92626e3b..75c3e01c513 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -64,6 +64,8 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
+ virtual void updateClientOperationTime(OperationContext* opCtx) const final;
+
boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss) const override;
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index f65a3fb5218..1d720645bb4 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -143,6 +143,12 @@ public:
*/
virtual bool supportsReadPreferenceForWriteOp(
const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0;
+ /**
+ * Advances the proxied write time associated with the client in ReplClientInfo to
+ * be at least as high as the one tracked by the OperationTimeTracker associated with the
+ * given operation context.
+ */
+ virtual void updateClientOperationTime(OperationContext* opCtx) const = 0;
/**
* Inserts 'objs' into 'ns' and returns an error Status if the insert fails. If 'targetEpoch' is
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index cc8fa07f967..6d48014b8ed 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -37,9 +37,10 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/operation_time_tracker.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/future.h"
@@ -47,6 +48,8 @@
namespace mongo {
namespace {
+const char kOperationTimeFieldName[] = "operationTime";  // Key looked up in command responses.
+
const auto replicaSetNodeExecutor =
ServiceContext::declareDecoration<std::shared_ptr<executor::TaskExecutor>>();
} // namespace
@@ -191,6 +194,17 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
}
auto rcr = std::move(response.getValue());
+
+ // Update the OperationTimeTracker associated with 'opCtx' with the operation time from the
+ // primary's response.
+ auto operationTime = rcr.response.data[kOperationTimeFieldName];
+ if (operationTime) {
+ invariant(operationTime.type() == BSONType::bsonTimestamp);
+ LogicalTime logicalTime(operationTime.timestamp());
+ auto operationTimeTracker = OperationTimeTracker::get(opCtx);
+ operationTimeTracker->updateOperationTime(logicalTime);
+ }
+
if (!rcr.response.status.isOK()) {
return rcr.response.status;
}
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index c46fb010084..9b3f6f9105f 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -62,6 +62,8 @@ public:
return true;
}
+ void updateClientOperationTime(OperationContext* opCtx) const override {}  // No-op stub.
+
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
diff --git a/src/mongo/db/repl/repl_client_info.h b/src/mongo/db/repl/repl_client_info.h
index 232c95460fe..51b953d2875 100644
--- a/src/mongo/db/repl/repl_client_info.h
+++ b/src/mongo/db/repl/repl_client_info.h
@@ -58,6 +58,26 @@ public:
}
/**
+ * Records the operation time of the latest proxy write, that is, a write that was forwarded
+ * to and executed on a different node instead of being executed locally. The stored value is
+ * only ever moved forward: an 'opTime' that is not greater than the stored one is ignored.
+ */
+ void setLastProxyWriteOpTimeForward(const OpTime& opTime) {
+ // Only advance the operation time of the latest proxy write if it is greater than the one
+ // currently stored.
+ if (opTime > _lastProxyWriteOpTime) {
+ _lastProxyWriteOpTime = opTime;
+ }
+ }
+
+ /**
+ * Returns the greater of the times set by 'setLastOp()' and
+ * 'setLastProxyWriteOpTimeForward()'.
+ */
+ OpTime getMaxKnownOpTime() const {
+ return _lastOp > _lastProxyWriteOpTime ? _lastOp : _lastProxyWriteOpTime;
+ }
+
+ /**
* Returns true when either setLastOp() or setLastOpToSystemLastOpTime() was called to set the
* opTime under the current OperationContext.
*/
@@ -88,6 +108,8 @@ private:
static const long long kUninitializedTerm = -1;
OpTime _lastOp = OpTime();
+
+ OpTime _lastProxyWriteOpTime;
};
} // namespace repl
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index f4497a8ccdf..3f1914537e0 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -392,7 +392,7 @@ LogicalTime getClientOperationTime(OperationContext* opCtx) {
}
return LogicalTime(
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp());
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getMaxKnownOpTime().getTimestamp());
}
/**
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 6dfc27e8d3d..8c3cb00f70c 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -254,7 +254,7 @@ public:
if (isReplSet) {
// Attach our own last opTime.
repl::OpTime lastOpTimeFromClient =
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getMaxKnownOpTime();
replCoord->prepareReplMetadata(request.body, lastOpTimeFromClient, metadataBob);
// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18236