summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2021-02-07 17:50:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-07 19:23:55 +0000
commit4b88a9534a31fc78c5a3a388999e1f8469a1786f (patch)
tree24ffb18959bc68734ebffd15544fc250e4d3e1bf
parent96a417cbdf1d064e1361ffb1c11b645365dd06e5 (diff)
downloadmongo-4b88a9534a31fc78c5a3a388999e1f8469a1786f.tar.gz
SERVER-51623 Convert insert, update and delete commands output to IDL
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp401
-rw-r--r--src/mongo/db/ops/SConscript1
-rw-r--r--src/mongo/db/ops/write_ops.idl74
-rw-r--r--src/mongo/db/ops/write_ops_parsers.cpp20
-rw-r--r--src/mongo/db/ops/write_ops_parsers.h13
-rw-r--r--src/mongo/db/ops/write_ops_parsers_test.cpp79
6 files changed, 410 insertions, 178 deletions
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index e666f0daf70..e746a09f803 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -235,14 +235,6 @@ BSONArray makeTimeseriesInsertDocument(const BucketCatalog::BucketId& bucketId,
return builder.arr();
}
-void appendOpTime(const repl::OpTime& opTime, BSONObjBuilder* out) {
- if (opTime.getTerm() == repl::OpTime::kUninitializedTerm) {
- out->append("opTime", opTime.getTimestamp());
- } else {
- opTime.append(out, "opTime");
- }
-}
-
/**
* Returns true if the time-series write is retryable.
*/
@@ -258,50 +250,6 @@ bool isTimeseriesWriteRetryable(OperationContext* opCtx) {
return true;
}
-/**
- * Returns true if the retryable time-series write has been executed.
- */
-bool isRetryableTimeseriesWriteExecuted(OperationContext* opCtx,
- const write_ops::Insert& insert,
- BSONObjBuilder* result) {
-
- if (!isTimeseriesWriteRetryable(opCtx)) {
- return false;
- }
-
- if (insert.getDocuments().empty()) {
- return false;
- }
-
- auto txnParticipant = TransactionParticipant::get(opCtx);
- const auto& writeCommandBase = insert.getWriteCommandBase();
-
- uassert(ErrorCodes::OperationFailed,
- str::stream() << "Retryable time-series insert operations are limited to one document "
- "per command request",
- insert.getDocuments().size() == 1U);
-
- auto stmtId = write_ops::getStmtIdForWriteAt(writeCommandBase, 0);
- if (!txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) {
- return false;
- }
-
- // This retryable write has been executed previously. Fill in command result before returning.
- result->appendNumber("n", 1);
-
- auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
- if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
- appendOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), result);
- result->append("electionId", replCoord->getElectionId());
- }
-
- auto retryStats = RetryableWritesStats::get(opCtx);
- retryStats->incrementRetriedStatementsCount();
- retryStats->incrementRetriedCommandsCount();
-
- return true;
-}
-
bool checkFailTimeseriesInsertFailPoint(const BSONObj& metadata) {
bool shouldFailInsert = false;
failTimeseriesInsert.executeIf(
@@ -390,87 +338,7 @@ boost::optional<BSONObj> generateError(OperationContext* opCtx,
return error.obj();
}
-enum class ReplyStyle { kUpdate, kNotUpdate }; // update has extra fields.
-void serializeReply(OperationContext* opCtx,
- ReplyStyle replyStyle,
- bool continueOnError,
- size_t opsInBatch,
- write_ops_exec::WriteResult result,
- BSONObjBuilder* out) {
- if (shouldSkipOutput(opCtx))
- return;
-
- if (continueOnError) {
- invariant(!result.results.empty());
- const auto& lastResult = result.results.back();
-
- if (lastResult == ErrorCodes::StaleDbVersion ||
- ErrorCodes::isStaleShardVersionError(lastResult.getStatus()) ||
- ErrorCodes::isTenantMigrationError(lastResult.getStatus())) {
- // For ordered:false commands we need to duplicate these error results for all ops after
- // we stopped. See handleError() in write_ops_exec.cpp for more info.
- //
- // Omit the reason from the duplicate unordered responses so it doesn't consume BSON
- // object space
- result.results.resize(opsInBatch, lastResult.getStatus().withReason(""));
- }
- }
-
- long long nVal = 0;
- long long nModified = 0;
- std::vector<BSONObj> upsertInfo;
- std::vector<BSONObj> errors;
- BSONSizeTracker upsertInfoSizeTracker;
-
- for (size_t i = 0; i < result.results.size(); i++) {
- if (auto error = generateError(opCtx, result.results[i], i, errors.size())) {
- errors.push_back(*error);
- continue;
- }
-
- const auto& opResult = result.results[i].getValue();
- nVal += opResult.getN(); // Always there.
- if (replyStyle == ReplyStyle::kUpdate) {
- nModified += opResult.getNModified();
- if (auto idElement = opResult.getUpsertedId().firstElement()) {
- BSONObjBuilder upsertedId(upsertInfoSizeTracker);
- upsertedId.append("index", int(i));
- upsertedId.appendAs(idElement, "_id");
- upsertInfo.push_back(upsertedId.obj());
- }
- }
- }
-
- out->appendNumber("n", nVal);
-
- if (replyStyle == ReplyStyle::kUpdate) {
- out->appendNumber("nModified", nModified);
- if (!upsertInfo.empty()) {
- out->append("upserted", upsertInfo);
- }
- }
-
- if (!errors.empty()) {
- out->append("writeErrors", errors);
- }
-
- // writeConcernError field is handled by command processor.
-
- {
- // Undocumented repl fields that mongos depends on.
- auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
- const auto replMode = replCoord->getReplicationMode();
- if (replMode != repl::ReplicationCoordinator::modeNone) {
- appendOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), out);
-
- if (replMode == repl::ReplicationCoordinator::modeReplSet) {
- out->append("electionId", replCoord->getElectionId());
- }
- }
- }
-}
-
-template <typename DerivedT>
+template <typename CommandReplyType>
class WriteCommand : public Command {
public:
explicit WriteCommand(StringData name) : Command(name) {}
@@ -496,17 +364,147 @@ private:
}
};
-template <typename DerivedT>
-class WriteCommand<DerivedT>::InvocationBase : public CommandInvocation {
+template <typename CommandReplyType>
+class WriteCommand<CommandReplyType>::InvocationBase : public CommandInvocation {
public:
- using DerivedInvocation = typename DerivedT::Invocation;
InvocationBase(const WriteCommand* writeCommand, const OpMsgRequest& request)
: CommandInvocation(writeCommand), _request(&request) {}
- bool getBypass() const {
- return checked_cast<const DerivedInvocation*>(this)
- ->request()
- .getBypassDocumentValidation();
+protected:
+ /**
+ * Contains hooks that are used by 'populateReply' method.
+ */
+ struct PopulateReplyHooks {
+ // Called for each 'SingleWriteResult' processed by 'populateReply' method.
+ std::function<void(const SingleWriteResult&, int)> singleWriteResultHandler;
+
+ // Called after all 'SingleWriteResult' processing is completed by 'populateReply' method.
+ // This is called as the last method.
+ std::function<void()> postProcessHandler;
+ };
+
+ /**
+ * Method to populate a write command reply message. It takes 'result' parameter as an input
+ * source and populate the fields of 'cmdReply'.
+ */
+ void populateReply(OperationContext* opCtx,
+ bool continueOnError,
+ size_t opsInBatch,
+ write_ops_exec::WriteResult result,
+ CommandReplyType* cmdReply,
+ boost::optional<PopulateReplyHooks> hooks = boost::none) {
+
+ invariant(cmdReply);
+
+ if (shouldSkipOutput(opCtx))
+ return;
+
+ if (continueOnError) {
+ invariant(!result.results.empty());
+ const auto& lastResult = result.results.back();
+
+ if (lastResult == ErrorCodes::StaleDbVersion ||
+ ErrorCodes::isStaleShardVersionError(lastResult.getStatus()) ||
+ ErrorCodes::isTenantMigrationError(lastResult.getStatus())) {
+ // For ordered:false commands we need to duplicate these error results for all ops
+ // after we stopped. See handleError() in write_ops_exec.cpp for more info.
+ //
+ // Omit the reason from the duplicate unordered responses so it doesn't consume BSON
+ // object space
+ result.results.resize(opsInBatch, lastResult.getStatus().withReason(""));
+ }
+ }
+
+ long long nVal = 0;
+ std::vector<BSONObj> errors;
+
+ for (size_t i = 0; i < result.results.size(); i++) {
+ if (auto error = generateError(opCtx, result.results[i], i, errors.size())) {
+ errors.push_back(*error);
+ continue;
+ }
+
+ const auto& opResult = result.results[i].getValue();
+ nVal += opResult.getN(); // Always there.
+
+ // Handle custom processing of each result.
+ if (hooks && hooks->singleWriteResultHandler)
+ hooks->singleWriteResultHandler(opResult, i);
+ }
+
+ auto& replyBase = cmdReply->getWriteReplyBase();
+ replyBase.setN(nVal);
+
+ if (!errors.empty()) {
+ replyBase.setWriteErrors(errors);
+ }
+
+ // writeConcernError field is handled by command processor.
+
+ {
+ // Undocumented repl fields that mongos depends on.
+ auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
+ const auto replMode = replCoord->getReplicationMode();
+ if (replMode != repl::ReplicationCoordinator::modeNone) {
+ replyBase.setOpTime(
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
+
+ if (replMode == repl::ReplicationCoordinator::modeReplSet) {
+ replyBase.setElectionId(replCoord->getElectionId());
+ }
+ }
+ }
+
+        // Call the caller-defined post-processing handler.
+ if (hooks && hooks->postProcessHandler)
+ hooks->postProcessHandler();
+ }
+
+ /**
+ * Returns true if the retryable time-series write has been executed.
+ */
+ bool isRetryableTimeseriesWriteExecuted(OperationContext* opCtx,
+ const write_ops::Insert& insert,
+ CommandReplyType* reply) const {
+ if (!isTimeseriesWriteRetryable(opCtx)) {
+ return false;
+ }
+
+ if (insert.getDocuments().empty()) {
+ return false;
+ }
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const auto& writeCommandBase = insert.getWriteCommandBase();
+
+ uassert(
+ ErrorCodes::OperationFailed,
+ str::stream() << "Retryable time-series insert operations are limited to one document "
+ "per command request",
+ insert.getDocuments().size() == 1U);
+
+ auto stmtId = write_ops::getStmtIdForWriteAt(writeCommandBase, 0);
+ if (!txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) {
+ return false;
+ }
+
+ auto& baseReply = reply->getWriteReplyBase();
+
+ // This retryable write has been executed previously. Fill in command reply before
+ // returning.
+ baseReply.setN(1);
+
+ auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
+ if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
+ baseReply.setOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
+ baseReply.setElectionId(replCoord->getElectionId());
+ }
+
+ auto retryStats = RetryableWritesStats::get(opCtx);
+ retryStats->incrementRetriedStatementsCount();
+ retryStats->incrementRetriedCommandsCount();
+
+ return true;
}
private:
@@ -514,13 +512,14 @@ private:
virtual void doCheckAuthorizationImpl(AuthorizationSession* authzSession) const = 0;
// Customization point for 'run'.
- virtual void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const = 0;
+ virtual CommandReplyType runImpl(OperationContext* opCtx) = 0;
void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) final {
try {
_transactionChecks(opCtx);
BSONObjBuilder bob = result->getBodyBuilder();
- runImpl(opCtx, bob);
+ CommandReplyType cmdReply = runImpl(opCtx);
+ cmdReply.serialize(&bob);
CommandHelpers::extractOrAppendOk(bob);
} catch (const DBException& ex) {
LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.reason());
@@ -558,7 +557,7 @@ private:
const OpMsgRequest* _request;
};
-class CmdInsert final : public WriteCommand<CmdInsert> {
+class CmdInsert final : public WriteCommand<write_ops::InsertReply> {
public:
CmdInsert() : WriteCommand("insert") {}
@@ -575,6 +574,11 @@ public:
return _batch;
}
+ bool getBypass() const {
+ return request().getBypassDocumentValidation();
+ }
+
+
private:
NamespaceString ns() const override {
return _batch.getNamespace();
@@ -809,8 +813,10 @@ public:
<< redact(_batch.toBSON({})));
}
- void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder* result) const {
- if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, result)) {
+ void _performTimeseriesWrites(OperationContext* opCtx,
+ write_ops::InsertReply* insertReply) const {
+
+ if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, insertReply)) {
return;
}
@@ -818,50 +824,57 @@ public:
boost::optional<repl::OpTime> opTime;
boost::optional<OID> electionId;
+ auto& baseReply = insertReply->getWriteReplyBase();
+
if (_batch.getOrdered()) {
for (size_t i = 0; i < _batch.getDocuments().size(); ++i) {
_performTimeseriesWritesSubset(opCtx, i, 1, &errors, &opTime, &electionId);
if (!errors.empty()) {
- result->appendNumber("n", i);
+ baseReply.setN(i);
break;
}
}
} else {
_performTimeseriesWritesSubset(
opCtx, 0, _batch.getDocuments().size(), &errors, &opTime, &electionId);
- result->appendNumber("n", _batch.getDocuments().size() - errors.size());
+ baseReply.setN(_batch.getDocuments().size() - errors.size());
}
if (!errors.empty()) {
- result->append("writeErrors", errors);
+ baseReply.setWriteErrors(errors);
}
if (opTime) {
- appendOpTime(*opTime, result);
+ baseReply.setOpTime(*opTime);
}
if (electionId) {
- result->append("electionId", *electionId);
+ baseReply.setElectionId(*electionId);
}
}
- void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
+ write_ops::InsertReply runImpl(OperationContext* opCtx) override {
+ write_ops::InsertReply insertReply;
+
if (isTimeseries(opCtx, ns())) {
// Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
// constructor.
try {
- _performTimeseriesWrites(opCtx, &result);
+ _performTimeseriesWrites(opCtx, &insertReply);
} catch (DBException& ex) {
ex.addContext(str::stream() << "time-series insert failed: " << ns().ns());
throw;
}
- return;
+
+ return insertReply;
}
auto reply = write_ops_exec::performInserts(opCtx, _batch);
- serializeReply(opCtx,
- ReplyStyle::kNotUpdate,
- !_batch.getWriteCommandBase().getOrdered(),
- _batch.getDocuments().size(),
- std::move(reply),
- &result);
+
+ populateReply(opCtx,
+ !_batch.getWriteCommandBase().getOrdered(),
+ _batch.getDocuments().size(),
+ std::move(reply),
+ &insertReply);
+
+ return insertReply;
}
write_ops::Insert _batch;
@@ -881,7 +894,7 @@ public:
}
} cmdInsert;
-class CmdUpdate final : public WriteCommand<CmdUpdate> {
+class CmdUpdate final : public WriteCommand<write_ops::UpdateReply> {
public:
CmdUpdate() : WriteCommand("update"), _updateMetrics{"update"} {}
@@ -917,6 +930,10 @@ public:
return _batch;
}
+ bool getBypass() const {
+ return request().getBypassDocumentValidation();
+ }
+
bool supportsReadMirroring() const override {
return true;
}
@@ -955,14 +972,38 @@ public:
auth::checkAuthForUpdateCommand(authzSession, getBypass(), _batch);
}
- void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
+ write_ops::UpdateReply runImpl(OperationContext* opCtx) override {
+ write_ops::UpdateReply updateReply;
+ long long nModified = 0;
+
+ // Tracks the upserted information. The memory of this variable gets moved in the
+ // 'postProcessHandler' and should not be accessed afterwards.
+ std::vector<write_ops::Upserted> upsertedInfoVec;
+
auto reply = write_ops_exec::performUpdates(opCtx, _batch);
- serializeReply(opCtx,
- ReplyStyle::kUpdate,
- !_batch.getWriteCommandBase().getOrdered(),
- _batch.getUpdates().size(),
- std::move(reply),
- &result);
+
+ // Handler to process each 'SingleWriteResult'.
+ auto singleWriteHandler = [&](const SingleWriteResult& opResult, int index) {
+ nModified += opResult.getNModified();
+ BSONSizeTracker upsertInfoSizeTracker;
+
+ if (auto idElement = opResult.getUpsertedId().firstElement())
+ upsertedInfoVec.emplace_back(write_ops::Upserted(index, idElement));
+ };
+
+ // Handler to do the post-processing.
+ auto postProcessHandler = [&]() {
+ updateReply.setNModified(nModified);
+ if (!upsertedInfoVec.empty())
+ updateReply.setUpserted(std::move(upsertedInfoVec));
+ };
+
+ populateReply(opCtx,
+ !_batch.getWriteCommandBase().getOrdered(),
+ _batch.getUpdates().size(),
+ std::move(reply),
+ &updateReply,
+ PopulateReplyHooks{singleWriteHandler, postProcessHandler});
// Collect metrics.
for (auto&& update : _batch.getUpdates()) {
@@ -981,6 +1022,8 @@ public:
_updateMetrics->incrementExecutedWithArrayFilters();
}
}
+
+ return updateReply;
}
void explain(OperationContext* opCtx,
@@ -1047,7 +1090,7 @@ public:
UpdateMetrics _updateMetrics;
} cmdUpdate;
-class CmdDelete final : public WriteCommand<CmdDelete> {
+class CmdDelete final : public WriteCommand<write_ops::DeleteReply> {
public:
CmdDelete() : WriteCommand("delete") {}
@@ -1066,6 +1109,11 @@ public:
return _batch;
}
+ bool getBypass() const {
+ return request().getBypassDocumentValidation();
+ }
+
+
private:
NamespaceString ns() const override {
return _batch.getNamespace();
@@ -1075,14 +1123,17 @@ public:
auth::checkAuthForDeleteCommand(authzSession, getBypass(), _batch);
}
- void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
+ write_ops::DeleteReply runImpl(OperationContext* opCtx) override {
+ write_ops::DeleteReply deleteReply;
+
auto reply = write_ops_exec::performDeletes(opCtx, _batch);
- serializeReply(opCtx,
- ReplyStyle::kNotUpdate,
- !_batch.getWriteCommandBase().getOrdered(),
- _batch.getDeletes().size(),
- std::move(reply),
- &result);
+ populateReply(opCtx,
+ !_batch.getWriteCommandBase().getOrdered(),
+ _batch.getDeletes().size(),
+ std::move(reply),
+ &deleteReply);
+
+ return deleteReply;
}
void explain(OperationContext* opCtx,
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index 78a7a1e0e47..18960b1c810 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -38,6 +38,7 @@ env.Library(
'$BUILD_DIR/mongo/db/dbmessage',
'$BUILD_DIR/mongo/db/pipeline/runtime_constants_idl',
'$BUILD_DIR/mongo/db/query/hint_parser',
+ '$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/idl/idl_parser',
'$BUILD_DIR/mongo/rpc/command_status',
],
diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl
index c3b0eed8032..a79e3ca58f3 100644
--- a/src/mongo/db/ops/write_ops.idl
+++ b/src/mongo/db/ops/write_ops.idl
@@ -30,6 +30,7 @@ global:
cpp_namespace: "mongo::write_ops"
cpp_includes:
- "mongo/db/ops/write_ops_parsers.h"
+ - "mongo/db/repl/optime.h"
imports:
- "mongo/db/logical_session_id.idl"
@@ -55,8 +56,72 @@ types:
serializer: "mongo::write_ops::UpdateModification::serializeToBSON"
deserializer: "mongo::write_ops::UpdateModification::parseFromBSON"
+ write_cmd_optime:
+ bson_serialization_type: any
+ description: "Holds the operation-time for write commands."
+ cpp_type: "mongo::repl::OpTime"
+ serializer: "::mongo::write_ops::opTimeSerializerWithTermCheck"
+ deserializer: "::mongo::write_ops::opTimeParser"
+
structs:
+ WriteReplyBase:
+        description: "Contains fields that are common to all write command replies."
+ fields:
+ n:
+ description: "For insert: number of documents inserted.
+ For update: number of documents that matched the query predicate.
+ For delete: number of documents deleted."
+ type: int
+ default: 0
+ electionId:
+ description: "Replication coordinator election id."
+ type: objectid
+ optional: true
+ opTime:
+ description: "Operation time for the command."
+ type: write_cmd_optime
+ optional: true
+ writeErrors:
+                description: "Contains all the errors encountered."
+ type: array<object>
+ optional: true
+
+ InsertReply:
+ description: "Contains information related to insert command reply."
+ chained_structs:
+ WriteReplyBase: writeReplyBase
+
+ Upserted:
+ description: "Contains documents that have been upserted."
+ fields:
+ index:
+ description: "Index of the document."
+ type: int
+ _id:
+ description: "ID of the document."
+ type: IDLAnyTypeOwned
+
+ UpdateReply:
+ description: "Contains information related to update command reply."
+ fields:
+ upserted:
+                description: "An array containing information about upserted documents."
+ type: array<Upserted>
+ optional: true
+ nModified:
+ description: "Number of updated documents."
+ type: int
+ default: 0
+
+ chained_structs:
+ WriteReplyBase: writeReplyBase
+
+ DeleteReply:
+ description: "Contains information related to delete command reply."
+ chained_structs:
+ WriteReplyBase: writeReplyBase
+
WriteCommandBase:
description: "Contains basic information included by all write commands"
strict: false
@@ -184,7 +249,8 @@ commands:
command_name: insert
strict: true
namespace: concatenate_with_db
- api_version: ""
+ api_version: "1"
+ reply_type: InsertReply
chained_structs:
WriteCommandBase: writeCommandBase
fields:
@@ -198,7 +264,8 @@ commands:
command_name: update
strict: true
namespace: concatenate_with_db
- api_version: ""
+ api_version: "1"
+ reply_type: UpdateReply
chained_structs:
WriteCommandBase: writeCommandBase
fields:
@@ -223,7 +290,8 @@ commands:
command_name: delete
strict: true
namespace: concatenate_with_db
- api_version: ""
+ api_version: "1"
+ reply_type: DeleteReply
chained_structs:
WriteCommandBase: writeCommandBase
fields:
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index 3dafb9dc4df..8642ecdf82f 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -100,6 +100,26 @@ void writeMultiDeleteProperty(bool isMulti, StringData fieldName, BSONObjBuilder
builder->append(fieldName, isMulti ? 0 : 1);
}
+void opTimeSerializerWithTermCheck(repl::OpTime opTime, StringData fieldName, BSONObjBuilder* bob) {
+ if (opTime.getTerm() == repl::OpTime::kUninitializedTerm) {
+ bob->append(fieldName, opTime.getTimestamp());
+ } else {
+ opTime.append(bob, fieldName.toString());
+ }
+}
+
+repl::OpTime opTimeParser(BSONElement elem) {
+ if (elem.type() == BSONType::Object) {
+ return repl::OpTime::parse(elem.Obj());
+ } else if (elem.type() == BSONType::bsonTimestamp) {
+ return repl::OpTime(elem.timestamp(), repl::OpTime::kUninitializedTerm);
+ }
+
+ uasserted(ErrorCodes::TypeMismatch,
+ str::stream() << "Expected BSON type " << BSONType::Object << " or "
+ << BSONType::bsonTimestamp << ", but found " << elem.type());
+}
+
int32_t getStmtIdForWriteAt(const WriteCommandBase& writeCommandBase, size_t writePos) {
const auto& stmtIds = writeCommandBase.getStmtIds();
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index 8de2835d193..20b9f543bc7 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/update/document_diff_serialization.h"
#include "mongo/stdx/variant.h"
#include "mongo/util/visit_helper.h"
@@ -58,6 +59,18 @@ bool readMultiDeleteProperty(const BSONElement& limitElement);
*/
void writeMultiDeleteProperty(bool isMulti, StringData fieldName, BSONObjBuilder* builder);
+/**
+ * Serializes the opTime fields to specified BSON builder. A 'term' field will be included only
+ * when it is initialized.
+ */
+void opTimeSerializerWithTermCheck(repl::OpTime opTime, StringData fieldName, BSONObjBuilder* bob);
+
+/**
+ * Method to deserialize the specified BSON element to opTime. This method is used by the IDL
+ * parser to generate the deserializer code.
+ */
+repl::OpTime opTimeParser(BSONElement elem);
+
class UpdateModification {
public:
enum class Type { kClassic, kPipeline, kDelta };
diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp
index b5074350ef4..7dd244071d7 100644
--- a/src/mongo/db/ops/write_ops_parsers_test.cpp
+++ b/src/mongo/db/ops/write_ops_parsers_test.cpp
@@ -514,5 +514,84 @@ TEST(LegacyWriteOpsParsers, Remove) {
}
}
+/**
+ * Test OpTime serializer and deserializer when OpTime does not have term initialized.
+ */
+TEST(OpTimeSerdes, OpTimeWithoutTerm) {
+ const auto fieldName = "opTime";
+ repl::OpTime opTime(Timestamp(10, 20), repl::OpTime::kUninitializedTerm);
+ BSONObjBuilder bob;
+
+ write_ops::opTimeSerializerWithTermCheck(opTime, "opTime", &bob);
+
+ auto bsonObj = bob.done();
+ auto bsonElem = bsonObj[fieldName];
+
+ ASSERT_FALSE(bsonElem.eoo());
+
+ repl::OpTime retOpTime = write_ops::opTimeParser(bsonElem);
+
+ ASSERT_EQ(opTime.getTimestamp(), retOpTime.getTimestamp());
+ ASSERT_EQ(opTime.getTerm(), retOpTime.getTerm());
+}
+
+/**
+ * Test OpTime serializer and deserializer when OpTime has term initialized.
+ */
+TEST(OpTimeSerdes, OpTimeWithTerm) {
+ const auto fieldName = "opTime";
+ repl::OpTime opTime(Timestamp(10, 20), 10);
+ BSONObjBuilder bob;
+
+ write_ops::opTimeSerializerWithTermCheck(opTime, "opTime", &bob);
+
+ auto bsonObj = bob.done();
+ auto bsonElem = bsonObj[fieldName];
+
+ ASSERT_FALSE(bsonElem.eoo());
+
+ repl::OpTime retOpTime = write_ops::opTimeParser(bsonElem);
+
+ ASSERT_EQ(opTime.getTimestamp(), retOpTime.getTimestamp());
+ ASSERT_EQ(opTime.getTerm(), retOpTime.getTerm());
+}
+
+/**
+ * Test OpTime deserializer by directly passing Timestamp to the OpTime deserializer.
+ */
+TEST(OpTimeSerdes, DeserializeWithTimestamp) {
+ const auto fieldName = "opTime";
+ Timestamp timestamp(10, 20);
+ BSONObjBuilder bob;
+
+ bob.append(fieldName, timestamp);
+
+ auto bsonObj = bob.done();
+ auto bsonElem = bsonObj[fieldName];
+
+ ASSERT_FALSE(bsonElem.eoo());
+
+ repl::OpTime retOpTime = write_ops::opTimeParser(bsonElem);
+
+ ASSERT_EQ(timestamp, retOpTime.getTimestamp());
+ ASSERT_EQ(repl::OpTime::kUninitializedTerm, retOpTime.getTerm());
+}
+
+/**
+ * Test OpTime deserializer by passing invalid BSON type.
+ */
+TEST(OpTimeSerdes, DeserializeWithInvalidBSONType) {
+ const auto fieldName = "opTime";
+ BSONObjBuilder bob;
+
+ bob.append(fieldName, 100);
+
+ auto bsonObj = bob.done();
+ auto bsonElem = bsonObj[fieldName];
+
+ ASSERT_FALSE(bsonElem.eoo());
+ ASSERT_THROWS_CODE(write_ops::opTimeParser(bsonElem), DBException, ErrorCodes::TypeMismatch);
+}
+
} // namespace
} // namespace mongo