diff options
Diffstat (limited to 'src/mongo/db/pipeline')
5 files changed, 57 insertions, 36 deletions
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index ae19e6a7635..b8a76b0b915 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -200,4 +200,11 @@ std::string CommonProcessInterface::getHostAndPort(OperationContext* opCtx) cons return getHostNameCachedAndPort(); } +void CommonProcessInterface::attachWriteConcern(BatchedCommandRequest* request, + const WriteConcernOptions& writeConcern) { + if (!writeConcern.usedDefault) { + request->setWriteConcern(writeConcern.toBSON()); + } +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index 7f68f8d201f..ca83c99a0e9 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/pipeline/process_interface/mongo_process_interface.h" +#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { @@ -52,6 +53,13 @@ public: static bool keyPatternNamesExactPaths(const BSONObj& keyPattern, const std::set<FieldPath>& uniqueKeyPaths); + /** + * Attaches the write concern to the given batch request. If 'writeConcern' has been default + * initialized to {w: 0, wtimeout: 0} then we do not bother attaching it. + */ + static void attachWriteConcern(BatchedCommandRequest* request, + const WriteConcernOptions& writeConcern); + std::vector<BSONObj> getCurrentOps(const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpConnectionsMode connMode, CurrentOpSessionsMode sessionMode, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index bed008e597e..f7c853b2a44 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -40,6 +40,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/future.h" namespace mongo { @@ -69,16 +70,13 @@ Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<Express std::vector<BSONObj>&& objs, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { - auto writeResults = performInserts( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + auto&& opCtx = expCtx->opCtx; + BatchedCommandRequest insertCommand( + buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + CommonProcessInterface::attachWriteConcern(&insertCommand, wc); - // Need to check each result in the batch since the writes are unordered. - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - } - return Status::OK(); + return _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, insertCommand)) + .getStatus(); } StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update( @@ -89,20 +87,21 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface:: UpsertType upsert, bool multi, boost::optional<OID> targetEpoch) { - auto writeResults = - performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - - // Need to check each result in the batch since the writes are unordered. - UpdateResult updateResult; - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - updateResult.nMatched += result.getValue().getN(); - updateResult.nModified += result.getValue().getNModified(); + auto&& opCtx = expCtx->opCtx; + BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); + CommonProcessInterface::attachWriteConcern(&updateCommand, wc); + + auto result = _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, updateCommand)); + if (!result.isOK()) { + return result.getStatus(); } - return updateResult; + + std::string errMsg; + BatchedCommandResponse response; + uassert(31450, errMsg, response.parseBSON(result.getValue(), &errMsg)); + + return UpdateResult{response.getN(), response.getNModified()}; } std::list<BSONObj> ReplicaSetNodeProcessInterface::getIndexSpecs(OperationContext* opCtx, @@ -192,7 +191,7 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( std::move(promise)); auto scheduleResult = _executor->scheduleRemoteCommand( std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); }); - if (!scheduleResult.getStatus().isOK()) { + if (!scheduleResult.isOK()) { // Since the command failed to be scheduled, the callback above did not and will not run. // Thus, it is safe to fulfill the promise here without worrying about synchronizing access // with the executor's thread. @@ -227,4 +226,19 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( return rcr.response.data; } +BSONObj ReplicaSetNodeProcessInterface::_buildCommandObject( + OperationContext* opCtx, const BatchedCommandRequest& command) const { + BSONObjBuilder requestBuilder; + command.serialize(&requestBuilder); + { + OperationSessionInfo sessionInfo; + if (opCtx->getLogicalSessionId()) { + sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); + } + sessionInfo.setTxnNumber(opCtx->getTxnNumber()); + sessionInfo.serialize(&requestBuilder); + } + return requestBuilder.obj(); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 0729ff75866..87a7e5c03f2 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -32,6 +32,7 @@ #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" +#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { @@ -98,6 +99,9 @@ private: const NamespaceString& ns, const BSONObj& cmdObj) const; + BSONObj _buildCommandObject(OperationContext* opCtx, + const BatchedCommandRequest& command) const; + executor::TaskExecutor* _executor; }; diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 087a258a3fa..24ad6c9acca 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -61,18 +61,6 @@ using write_ops::Insert; using write_ops::Update; using write_ops::UpdateOpEntry; -namespace { - -// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has -// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it. -void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOptions& writeConcern) { - if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) { - request->setWriteConcern(writeConcern.toBSON()); - } -} - -} // namespace - bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); Lock::CollectionLock collLock(opCtx, nss, MODE_IS); @@ -127,7 +115,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<Expression buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); // If applicable, attach a write concern to the batched command request. - attachWriteConcern(&insertCommand, wc); + CommonProcessInterface::attachWriteConcern(&insertCommand, wc); ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch); @@ -148,7 +136,7 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); // If applicable, attach a write concern to the batched command request. - attachWriteConcern(&updateCommand, wc); + CommonProcessInterface::attachWriteConcern(&updateCommand, wc); ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch); |