diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2020-02-13 16:38:33 -0500 |
---|---|---|
committer | Gregory Noma <gregory.noma@gmail.com> | 2020-02-18 17:13:19 -0500 |
commit | a68508f9922ad163ff98f8fa13953b1efe9d57d0 (patch) | |
tree | 1ead40c94d7da611cbb6cd36a342cf76ffe2e452 /src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp | |
parent | 11640b2138d40777a4b45005628b4facfba7e6b2 (diff) | |
download | mongo-46137.tar.gz |
SERVER-46137 Implement ReplicaSetNodeProcessInterface methods needed for $merge46137
Diffstat (limited to 'src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp')
-rw-r--r-- | src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp | 58 |
1 files changed, 36 insertions, 22 deletions
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index bed008e597e..f7c853b2a44 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -40,6 +40,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/future.h" namespace mongo { @@ -69,16 +70,13 @@ Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<Express std::vector<BSONObj>&& objs, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { - auto writeResults = performInserts( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + auto&& opCtx = expCtx->opCtx; + BatchedCommandRequest insertCommand( + buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + CommonProcessInterface::attachWriteConcern(&insertCommand, wc); - // Need to check each result in the batch since the writes are unordered. - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - } - return Status::OK(); + return _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, insertCommand)) + .getStatus(); } StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update( @@ -89,20 +87,21 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface:: UpsertType upsert, bool multi, boost::optional<OID> targetEpoch) { - auto writeResults = - performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - - // Need to check each result in the batch since the writes are unordered. - UpdateResult updateResult; - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - updateResult.nMatched += result.getValue().getN(); - updateResult.nModified += result.getValue().getNModified(); + auto&& opCtx = expCtx->opCtx; + BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); + CommonProcessInterface::attachWriteConcern(&updateCommand, wc); + + auto result = _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, updateCommand)); + if (!result.isOK()) { + return result.getStatus(); } - return updateResult; + + std::string errMsg; + BatchedCommandResponse response; + uassert(31450, errMsg, response.parseBSON(result.getValue(), &errMsg)); + + return UpdateResult{response.getN(), response.getNModified()}; } std::list<BSONObj> ReplicaSetNodeProcessInterface::getIndexSpecs(OperationContext* opCtx, @@ -192,7 +191,7 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( std::move(promise)); auto scheduleResult = _executor->scheduleRemoteCommand( std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); }); - if (!scheduleResult.getStatus().isOK()) { + if (!scheduleResult.isOK()) { // Since the command failed to be scheduled, the callback above did not and will not run. // Thus, it is safe to fulfill the promise here without worrying about synchronizing access // with the executor's thread. @@ -227,4 +226,19 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( return rcr.response.data; } +BSONObj ReplicaSetNodeProcessInterface::_buildCommandObject( + OperationContext* opCtx, const BatchedCommandRequest& command) const { + BSONObjBuilder requestBuilder; + command.serialize(&requestBuilder); + { + OperationSessionInfo sessionInfo; + if (opCtx->getLogicalSessionId()) { + sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); + } + sessionInfo.setTxnNumber(opCtx->getTxnNumber()); + sessionInfo.serialize(&requestBuilder); + } + return requestBuilder.obj(); +} + } // namespace mongo |