summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2020-02-13 16:38:33 -0500
committerGregory Noma <gregory.noma@gmail.com>2020-02-18 17:13:19 -0500
commita68508f9922ad163ff98f8fa13953b1efe9d57d0 (patch)
tree1ead40c94d7da611cbb6cd36a342cf76ffe2e452
parent11640b2138d40777a4b45005628b4facfba7e6b2 (diff)
downloadmongo-46137.tar.gz
SERVER-46137 Implement ReplicaSetNodeProcessInterface methods needed for $merge46137
-rw-r--r--jstests/noPassthrough/merge_on_secondary.js48
-rw-r--r--src/mongo/db/db.cpp4
-rw-r--r--src/mongo/db/pipeline/process_interface/common_process_interface.cpp7
-rw-r--r--src/mongo/db/pipeline/process_interface/common_process_interface.h8
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp58
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp16
7 files changed, 108 insertions, 37 deletions
diff --git a/jstests/noPassthrough/merge_on_secondary.js b/jstests/noPassthrough/merge_on_secondary.js
new file mode 100644
index 00000000000..94ecd8cc232
--- /dev/null
+++ b/jstests/noPassthrough/merge_on_secondary.js
@@ -0,0 +1,48 @@
+/**
+ * Tests the behavior of $merge being run on a secondary.
+ *
+ * @tags: [assumes_unsharded_collection, requires_replication, requires_spawning_own_processes]
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode.
+
+let replTest = new ReplSetTest({nodes: 2});
+replTest.startSet();
+replTest.initiate();
+replTest.awaitReplication();
+
+let primary = replTest.getPrimary().getDB('test');
+let secondary = replTest.getSecondary().getDB('test');
+secondary.setSlaveOk(true);
+
+const collPrimary = primary.getCollection('coll');
+const collSecondary = secondary.getCollection('coll');
+const outColl = primary.getCollection('outColl');
+
+assert.commandWorked(collPrimary.insert({_id: 0, a: 1}, {writeConcern: {w: 2}}));
+assert.commandWorked(collPrimary.insert({_id: 1, a: 2}, {writeConcern: {w: 2}}));
+
+// Make sure the $merge succeeds without any duplicate keys.
+withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
+ // Skip the combination of merge modes which will fail depending on the contents of the
+ // source and target collection, as this will cause the aggregation to fail.
+ if (whenMatchedMode == "fail" || whenNotMatchedMode == "fail") {
+ return;
+ }
+
+ collSecondary.aggregate([{
+ $merge: {
+ into: outColl.getName(),
+ whenMatched: whenMatchedMode,
+ whenNotMatched: whenNotMatchedMode
+ }
+ }]);
+
+ assert.eq(whenNotMatchedMode == "discard" ? 0 : 2, outColl.find().itcount());
+ outColl.drop();
+});
+
+replTest.stopSet();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index b7f1c0945c8..cc005a763ab 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -968,9 +968,11 @@ void setUpReplication(ServiceContext* serviceContext) {
SecureRandom().nextInt64());
// Only create a ReplicaSetNodeExecutor if sharding is disabled and replication is enabled.
// Note that sharding sets up its own executors for scheduling work to remote nodes.
- if (serverGlobalParams.clusterRole == ClusterRole::None && replCoord->isReplEnabled())
+ if (serverGlobalParams.clusterRole == ClusterRole::None && replCoord->isReplEnabled()) {
ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
serviceContext, makeReplicaSetNodeExecutor(serviceContext));
+ ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(serviceContext)->startup();
+ }
repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
repl::setOplogCollectionName(serviceContext);
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index ae19e6a7635..b8a76b0b915 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -200,4 +200,11 @@ std::string CommonProcessInterface::getHostAndPort(OperationContext* opCtx) cons
return getHostNameCachedAndPort();
}
+void CommonProcessInterface::attachWriteConcern(BatchedCommandRequest* request,
+ const WriteConcernOptions& writeConcern) {
+ if (!writeConcern.usedDefault) {
+ request->setWriteConcern(writeConcern.toBSON());
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index 7f68f8d201f..ca83c99a0e9 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
+#include "mongo/s/write_ops/batched_command_request.h"
namespace mongo {
@@ -52,6 +53,13 @@ public:
static bool keyPatternNamesExactPaths(const BSONObj& keyPattern,
const std::set<FieldPath>& uniqueKeyPaths);
+ /**
+ * Attaches the write concern to the given batch request. If 'writeConcern' has been default
+ * initialized to {w: 0, wtimeout: 0} then we do not bother attaching it.
+ */
+ static void attachWriteConcern(BatchedCommandRequest* request,
+ const WriteConcernOptions& writeConcern);
+
std::vector<BSONObj> getCurrentOps(const boost::intrusive_ptr<ExpressionContext>& expCtx,
CurrentOpConnectionsMode connMode,
CurrentOpSessionsMode sessionMode,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index bed008e597e..f7c853b2a44 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -69,16 +70,13 @@ Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<Express
std::vector<BSONObj>&& objs,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
- auto writeResults = performInserts(
- expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ auto&& opCtx = expCtx->opCtx;
+ BatchedCommandRequest insertCommand(
+ buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ CommonProcessInterface::attachWriteConcern(&insertCommand, wc);
- // Need to check each result in the batch since the writes are unordered.
- for (const auto& result : writeResults.results) {
- if (result.getStatus() != Status::OK()) {
- return result.getStatus();
- }
- }
- return Status::OK();
+ return _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, insertCommand))
+ .getStatus();
}
StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update(
@@ -89,20 +87,21 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::
UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) {
- auto writeResults =
- performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
-
- // Need to check each result in the batch since the writes are unordered.
- UpdateResult updateResult;
- for (const auto& result : writeResults.results) {
- if (result.getStatus() != Status::OK()) {
- return result.getStatus();
- }
- updateResult.nMatched += result.getValue().getN();
- updateResult.nModified += result.getValue().getNModified();
+ auto&& opCtx = expCtx->opCtx;
+ BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+ CommonProcessInterface::attachWriteConcern(&updateCommand, wc);
+
+ auto result = _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, updateCommand));
+ if (!result.isOK()) {
+ return result.getStatus();
}
- return updateResult;
+
+ std::string errMsg;
+ BatchedCommandResponse response;
+ uassert(31450, errMsg, response.parseBSON(result.getValue(), &errMsg));
+
+ return UpdateResult{response.getN(), response.getNModified()};
}
std::list<BSONObj> ReplicaSetNodeProcessInterface::getIndexSpecs(OperationContext* opCtx,
@@ -192,7 +191,7 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
std::move(promise));
auto scheduleResult = _executor->scheduleRemoteCommand(
std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); });
- if (!scheduleResult.getStatus().isOK()) {
+ if (!scheduleResult.isOK()) {
// Since the command failed to be scheduled, the callback above did not and will not run.
// Thus, it is safe to fulfill the promise here without worrying about synchronizing access
// with the executor's thread.
@@ -227,4 +226,19 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
return rcr.response.data;
}
+BSONObj ReplicaSetNodeProcessInterface::_buildCommandObject(
+ OperationContext* opCtx, const BatchedCommandRequest& command) const {
+ BSONObjBuilder requestBuilder;
+ command.serialize(&requestBuilder);
+ {
+ OperationSessionInfo sessionInfo;
+ if (opCtx->getLogicalSessionId()) {
+ sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
+ }
+ sessionInfo.setTxnNumber(opCtx->getTxnNumber());
+ sessionInfo.serialize(&requestBuilder);
+ }
+ return requestBuilder.obj();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 0729ff75866..87a7e5c03f2 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -32,6 +32,7 @@
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+#include "mongo/s/write_ops/batched_command_request.h"
namespace mongo {
@@ -98,6 +99,9 @@ private:
const NamespaceString& ns,
const BSONObj& cmdObj) const;
+ BSONObj _buildCommandObject(OperationContext* opCtx,
+ const BatchedCommandRequest& command) const;
+
executor::TaskExecutor* _executor;
};
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 087a258a3fa..24ad6c9acca 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -61,18 +61,6 @@ using write_ops::Insert;
using write_ops::Update;
using write_ops::UpdateOpEntry;
-namespace {
-
-// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has
-// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it.
-void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOptions& writeConcern) {
- if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) {
- request->setWriteConcern(writeConcern.toBSON());
- }
-}
-
-} // namespace
-
bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
Lock::CollectionLock collLock(opCtx, nss, MODE_IS);
@@ -127,7 +115,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<Expression
buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
// If applicable, attach a write concern to the batched command request.
- attachWriteConcern(&insertCommand, wc);
+ CommonProcessInterface::attachWriteConcern(&insertCommand, wc);
ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch);
@@ -148,7 +136,7 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd
BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
// If applicable, attach a write concern to the batched command request.
- attachWriteConcern(&updateCommand, wc);
+ CommonProcessInterface::attachWriteConcern(&updateCommand, wc);
ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch);