From a68508f9922ad163ff98f8fa13953b1efe9d57d0 Mon Sep 17 00:00:00 2001
From: Gregory Noma
Date: Thu, 13 Feb 2020 16:38:33 -0500
Subject: SERVER-46137 Implement ReplicaSetNodeProcessInterface methods needed
 for $merge

---
 jstests/noPassthrough/merge_on_secondary.js        | 48 ++++++++++++++++++
 src/mongo/db/db.cpp                                |  4 +-
 .../process_interface/common_process_interface.cpp |  7 +++
 .../process_interface/common_process_interface.h   |  8 +++
 .../replica_set_node_process_interface.cpp         | 58 ++++++++++++++--------
 .../replica_set_node_process_interface.h           |  4 ++
 .../shardsvr_process_interface.cpp                 | 16 +-----
 7 files changed, 108 insertions(+), 37 deletions(-)
 create mode 100644 jstests/noPassthrough/merge_on_secondary.js

diff --git a/jstests/noPassthrough/merge_on_secondary.js b/jstests/noPassthrough/merge_on_secondary.js
new file mode 100644
index 00000000000..94ecd8cc232
--- /dev/null
+++ b/jstests/noPassthrough/merge_on_secondary.js
@@ -0,0 +1,48 @@
+/**
+ * Tests the behavior of $merge being run on a secondary.
+ *
+ * @tags: [assumes_unsharded_collection, requires_replication, requires_spawning_own_processes]
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/merge_helpers.js");  // For withEachMergeMode.
+
+let replTest = new ReplSetTest({nodes: 2});
+replTest.startSet();
+replTest.initiate();
+replTest.awaitReplication();
+
+let primary = replTest.getPrimary().getDB('test');
+let secondary = replTest.getSecondary().getDB('test');
+secondary.setSlaveOk(true);
+
+const collPrimary = primary.getCollection('coll');
+const collSecondary = secondary.getCollection('coll');
+const outColl = primary.getCollection('outColl');
+
+assert.commandWorked(collPrimary.insert({_id: 0, a: 1}, {writeConcern: {w: 2}}));
+assert.commandWorked(collPrimary.insert({_id: 1, a: 2}, {writeConcern: {w: 2}}));
+
+// Make sure the $merge succeeds without any duplicate keys.
+withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
+    // Skip the combination of merge modes which will fail depending on the contents of the
+    // source and target collection, as this will cause the aggregation to fail.
+    if (whenMatchedMode == "fail" || whenNotMatchedMode == "fail") {
+        return;
+    }
+
+    collSecondary.aggregate([{
+        $merge: {
+            into: outColl.getName(),
+            whenMatched: whenMatchedMode,
+            whenNotMatched: whenNotMatchedMode
+        }
+    }]);
+
+    assert.eq(whenNotMatchedMode == "discard" ? 0 : 2, outColl.find().itcount());
+    outColl.drop();
+});
+
+replTest.stopSet();
+})();
\ No newline at end of file
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index b7f1c0945c8..cc005a763ab 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -968,9 +968,11 @@ void setUpReplication(ServiceContext* serviceContext) {
                                                            SecureRandom().nextInt64());
     // Only create a ReplicaSetNodeExecutor if sharding is disabled and replication is enabled.
     // Note that sharding sets up its own executors for scheduling work to remote nodes.
-    if (serverGlobalParams.clusterRole == ClusterRole::None && replCoord->isReplEnabled())
+    if (serverGlobalParams.clusterRole == ClusterRole::None && replCoord->isReplEnabled()) {
         ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
             serviceContext, makeReplicaSetNodeExecutor(serviceContext));
+        ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(serviceContext)->startup();
+    }
 
     repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
     repl::setOplogCollectionName(serviceContext);
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index ae19e6a7635..b8a76b0b915 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -200,4 +200,11 @@ std::string CommonProcessInterface::getHostAndPort(OperationContext* opCtx) cons
     return getHostNameCachedAndPort();
 }
 
+void CommonProcessInterface::attachWriteConcern(BatchedCommandRequest* request,
+                                                const WriteConcernOptions& writeConcern) {
+    if (!writeConcern.usedDefault) {
+        request->setWriteConcern(writeConcern.toBSON());
+    }
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index 7f68f8d201f..ca83c99a0e9 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -33,6 +33,7 @@
 
 #include "mongo/bson/bsonobj.h"
 #include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
+#include "mongo/s/write_ops/batched_command_request.h"
 
 namespace mongo {
 
@@ -52,6 +53,13 @@ public:
     static bool keyPatternNamesExactPaths(const BSONObj& keyPattern,
                                           const std::set<FieldPath>& uniqueKeyPaths);
 
+    /**
+     * Attaches the write concern to the given batch request. If 'writeConcern' has been default
+     * initialized to {w: 0, wtimeout: 0} then we do not bother attaching it.
+     */
+    static void attachWriteConcern(BatchedCommandRequest* request,
+                                   const WriteConcernOptions& writeConcern);
+
     std::vector<BSONObj> getCurrentOps(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                        CurrentOpConnectionsMode connMode,
                                        CurrentOpSessionsMode sessionMode,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index bed008e597e..f7c853b2a44 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/db_raii.h"
 #include "mongo/db/index_builds_coordinator.h"
 #include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/write_ops/batched_command_response.h"
 #include "mongo/util/future.h"
 
 namespace mongo {
@@ -69,16 +70,13 @@ Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                               std::vector<BSONObj>&& objs,
                                               const WriteConcernOptions& wc,
                                               boost::optional<OID> targetEpoch) {
-    auto writeResults = performInserts(
-        expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+    auto&& opCtx = expCtx->opCtx;
+    BatchedCommandRequest insertCommand(
+        buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+    CommonProcessInterface::attachWriteConcern(&insertCommand, wc);
 
-    // Need to check each result in the batch since the writes are unordered.
-    for (const auto& result : writeResults.results) {
-        if (result.getStatus() != Status::OK()) {
-            return result.getStatus();
-        }
-    }
-    return Status::OK();
+    return _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, insertCommand))
+        .getStatus();
 }
 
 StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update(
@@ -89,20 +87,21 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::
     UpsertType upsert,
     bool multi,
     boost::optional<OID> targetEpoch) {
-    auto writeResults =
-        performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
-
-    // Need to check each result in the batch since the writes are unordered.
-    UpdateResult updateResult;
-    for (const auto& result : writeResults.results) {
-        if (result.getStatus() != Status::OK()) {
-            return result.getStatus();
-        }
-        updateResult.nMatched += result.getValue().getN();
-        updateResult.nModified += result.getValue().getNModified();
+    auto&& opCtx = expCtx->opCtx;
+    BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+    CommonProcessInterface::attachWriteConcern(&updateCommand, wc);
+
+    auto result = _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, updateCommand));
+    if (!result.isOK()) {
+        return result.getStatus();
     }
-    return updateResult;
+
+    std::string errMsg;
+    BatchedCommandResponse response;
+    uassert(31450, errMsg, response.parseBSON(result.getValue(), &errMsg));
+
+    return UpdateResult{response.getN(), response.getNModified()};
 }
 
 std::list<BSONObj> ReplicaSetNodeProcessInterface::getIndexSpecs(OperationContext* opCtx,
                                                                  const NamespaceString& ns,
@@ -192,7 +191,7 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
         std::move(promise));
     auto scheduleResult = _executor->scheduleRemoteCommand(
         std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); });
-    if (!scheduleResult.getStatus().isOK()) {
+    if (!scheduleResult.isOK()) {
         // Since the command failed to be scheduled, the callback above did not and will not run.
         // Thus, it is safe to fulfill the promise here without worrying about synchronizing access
         // with the executor's thread.
@@ -227,4 +226,19 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
     return rcr.response.data;
 }
 
+BSONObj ReplicaSetNodeProcessInterface::_buildCommandObject(
+    OperationContext* opCtx, const BatchedCommandRequest& command) const {
+    BSONObjBuilder requestBuilder;
+    command.serialize(&requestBuilder);
+    {
+        OperationSessionInfo sessionInfo;
+        if (opCtx->getLogicalSessionId()) {
+            sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
+        }
+        sessionInfo.setTxnNumber(opCtx->getTxnNumber());
+        sessionInfo.serialize(&requestBuilder);
+    }
+    return requestBuilder.obj();
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 0729ff75866..87a7e5c03f2 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -32,6 +32,7 @@
 #include "mongo/db/ops/write_ops_gen.h"
 #include "mongo/db/pipeline/pipeline.h"
 #include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+#include "mongo/s/write_ops/batched_command_request.h"
 
 namespace mongo {
 
@@ -98,6 +99,9 @@ private:
                                                  const NamespaceString& ns,
                                                  const BSONObj& cmdObj) const;
 
+    BSONObj _buildCommandObject(OperationContext* opCtx,
+                                const BatchedCommandRequest& command) const;
+
     executor::TaskExecutor* _executor;
 };
 
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 087a258a3fa..24ad6c9acca 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -61,18 +61,6 @@ using write_ops::Insert;
 using write_ops::Update;
 using write_ops::UpdateOpEntry;
 
-namespace {
-
-// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has
-// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it.
-void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOptions& writeConcern) {
-    if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) {
-        request->setWriteConcern(writeConcern.toBSON());
-    }
-}
-
-}  // namespace
-
 bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
     Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
     Lock::CollectionLock collLock(opCtx, nss, MODE_IS);
@@ -127,7 +115,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<Expressio
         buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
 
     // If applicable, attach a write concern to the batched command request.
-    attachWriteConcern(&insertCommand, wc);
+    CommonProcessInterface::attachWriteConcern(&insertCommand, wc);
 
     ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch);
 
@@ -148,7 +136,7 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd
     BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
 
     // If applicable, attach a write concern to the batched command request.
-    attachWriteConcern(&updateCommand, wc);
+    CommonProcessInterface::attachWriteConcern(&updateCommand, wc);
 
     ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch);
 
-- 
cgit v1.2.1
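
For manual verification, a $merge issued against a secondary exercises the new
path end to end: the aggregation reads locally, while the insert and update
batches built by the stage are forwarded to the primary through the
ReplicaSetNodeExecutor started in db.cpp above. A minimal mongo shell sketch
(the host/port and collection names here are illustrative, not taken from the
patch):

    // Connect directly to a secondary member of the replica set
    // (port is an assumption for a locally running two-node set).
    const conn = new Mongo("localhost:27018");
    const secondaryDB = conn.getDB("test");
    secondaryDB.setSlaveOk(true);  // Permit reads on a secondary.

    // The pipeline reads 'coll' on this node; the writes produced by $merge
    // are sent to the primary via _executeCommandOnPrimary(), carrying the
    // session info attached by _buildCommandObject() and any non-default
    // write concern attached by attachWriteConcern().
    secondaryDB.coll.aggregate(
        [{$merge: {into: "outColl", whenMatched: "replace", whenNotMatched: "insert"}}]);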