summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp')
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp58
1 files changed, 36 insertions, 22 deletions
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index bed008e597e..f7c853b2a44 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -69,16 +70,13 @@ Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<Express
std::vector<BSONObj>&& objs,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
- auto writeResults = performInserts(
- expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ auto&& opCtx = expCtx->opCtx;
+ BatchedCommandRequest insertCommand(
+ buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ CommonProcessInterface::attachWriteConcern(&insertCommand, wc);
- // Need to check each result in the batch since the writes are unordered.
- for (const auto& result : writeResults.results) {
- if (result.getStatus() != Status::OK()) {
- return result.getStatus();
- }
- }
- return Status::OK();
+ return _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, insertCommand))
+ .getStatus();
}
StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update(
@@ -89,20 +87,21 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::
UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) {
- auto writeResults =
- performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
-
- // Need to check each result in the batch since the writes are unordered.
- UpdateResult updateResult;
- for (const auto& result : writeResults.results) {
- if (result.getStatus() != Status::OK()) {
- return result.getStatus();
- }
- updateResult.nMatched += result.getValue().getN();
- updateResult.nModified += result.getValue().getNModified();
+ auto&& opCtx = expCtx->opCtx;
+ BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+ CommonProcessInterface::attachWriteConcern(&updateCommand, wc);
+
+ auto result = _executeCommandOnPrimary(opCtx, ns, _buildCommandObject(opCtx, updateCommand));
+ if (!result.isOK()) {
+ return result.getStatus();
}
- return updateResult;
+
+ std::string errMsg;
+ BatchedCommandResponse response;
+ uassert(31450, errMsg, response.parseBSON(result.getValue(), &errMsg));
+
+ return UpdateResult{response.getN(), response.getNModified()};
}
std::list<BSONObj> ReplicaSetNodeProcessInterface::getIndexSpecs(OperationContext* opCtx,
@@ -192,7 +191,7 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
std::move(promise));
auto scheduleResult = _executor->scheduleRemoteCommand(
std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); });
- if (!scheduleResult.getStatus().isOK()) {
+ if (!scheduleResult.isOK()) {
// Since the command failed to be scheduled, the callback above did not and will not run.
// Thus, it is safe to fulfill the promise here without worrying about synchronizing access
// with the executor's thread.
@@ -227,4 +226,19 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
return rcr.response.data;
}
+BSONObj ReplicaSetNodeProcessInterface::_buildCommandObject(
+ OperationContext* opCtx, const BatchedCommandRequest& command) const {
+ BSONObjBuilder requestBuilder;
+ command.serialize(&requestBuilder);
+ {
+ OperationSessionInfo sessionInfo;
+ if (opCtx->getLogicalSessionId()) {
+ sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
+ }
+ sessionInfo.setTxnNumber(opCtx->getTxnNumber());
+ sessionInfo.serialize(&requestBuilder);
+ }
+ return requestBuilder.obj();
+}
+
} // namespace mongo