From 4b88a9534a31fc78c5a3a388999e1f8469a1786f Mon Sep 17 00:00:00 2001 From: Rishab Joshi Date: Sun, 7 Feb 2021 17:50:45 +0000 Subject: SERVER-51623 Convert insert, update and delete commands output to IDL --- .../db/commands/write_commands/write_commands.cpp | 401 ++++++++++++--------- src/mongo/db/ops/SConscript | 1 + src/mongo/db/ops/write_ops.idl | 74 +++- src/mongo/db/ops/write_ops_parsers.cpp | 20 + src/mongo/db/ops/write_ops_parsers.h | 13 + src/mongo/db/ops/write_ops_parsers_test.cpp | 79 ++++ 6 files changed, 410 insertions(+), 178 deletions(-) diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index e666f0daf70..e746a09f803 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -235,14 +235,6 @@ BSONArray makeTimeseriesInsertDocument(const BucketCatalog::BucketId& bucketId, return builder.arr(); } -void appendOpTime(const repl::OpTime& opTime, BSONObjBuilder* out) { - if (opTime.getTerm() == repl::OpTime::kUninitializedTerm) { - out->append("opTime", opTime.getTimestamp()); - } else { - opTime.append(out, "opTime"); - } -} - /** * Returns true if the time-series write is retryable. */ @@ -258,50 +250,6 @@ bool isTimeseriesWriteRetryable(OperationContext* opCtx) { return true; } -/** - * Returns true if the retryable time-series write has been executed. - */ -bool isRetryableTimeseriesWriteExecuted(OperationContext* opCtx, - const write_ops::Insert& insert, - BSONObjBuilder* result) { - - if (!isTimeseriesWriteRetryable(opCtx)) { - return false; - } - - if (insert.getDocuments().empty()) { - return false; - } - - auto txnParticipant = TransactionParticipant::get(opCtx); - const auto& writeCommandBase = insert.getWriteCommandBase(); - - uassert(ErrorCodes::OperationFailed, - str::stream() << "Retryable time-series insert operations are limited to one document " - "per command request", - insert.getDocuments().size() == 1U); - - auto stmtId = write_ops::getStmtIdForWriteAt(writeCommandBase, 0); - if (!txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) { - return false; - } - - // This retryable write has been executed previously. Fill in command result before returning. - result->appendNumber("n", 1); - - auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); - if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { - appendOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), result); - result->append("electionId", replCoord->getElectionId()); - } - - auto retryStats = RetryableWritesStats::get(opCtx); - retryStats->incrementRetriedStatementsCount(); - retryStats->incrementRetriedCommandsCount(); - - return true; -} - bool checkFailTimeseriesInsertFailPoint(const BSONObj& metadata) { bool shouldFailInsert = false; failTimeseriesInsert.executeIf( @@ -390,87 +338,7 @@ boost::optional generateError(OperationContext* opCtx, return error.obj(); } -enum class ReplyStyle { kUpdate, kNotUpdate }; // update has extra fields. -void serializeReply(OperationContext* opCtx, - ReplyStyle replyStyle, - bool continueOnError, - size_t opsInBatch, - write_ops_exec::WriteResult result, - BSONObjBuilder* out) { - if (shouldSkipOutput(opCtx)) - return; - - if (continueOnError) { - invariant(!result.results.empty()); - const auto& lastResult = result.results.back(); - - if (lastResult == ErrorCodes::StaleDbVersion || - ErrorCodes::isStaleShardVersionError(lastResult.getStatus()) || - ErrorCodes::isTenantMigrationError(lastResult.getStatus())) { - // For ordered:false commands we need to duplicate these error results for all ops after - // we stopped. See handleError() in write_ops_exec.cpp for more info. - // - // Omit the reason from the duplicate unordered responses so it doesn't consume BSON - // object space - result.results.resize(opsInBatch, lastResult.getStatus().withReason("")); - } - } - - long long nVal = 0; - long long nModified = 0; - std::vector upsertInfo; - std::vector errors; - BSONSizeTracker upsertInfoSizeTracker; - - for (size_t i = 0; i < result.results.size(); i++) { - if (auto error = generateError(opCtx, result.results[i], i, errors.size())) { - errors.push_back(*error); - continue; - } - - const auto& opResult = result.results[i].getValue(); - nVal += opResult.getN(); // Always there. - if (replyStyle == ReplyStyle::kUpdate) { - nModified += opResult.getNModified(); - if (auto idElement = opResult.getUpsertedId().firstElement()) { - BSONObjBuilder upsertedId(upsertInfoSizeTracker); - upsertedId.append("index", int(i)); - upsertedId.appendAs(idElement, "_id"); - upsertInfo.push_back(upsertedId.obj()); - } - } - } - - out->appendNumber("n", nVal); - - if (replyStyle == ReplyStyle::kUpdate) { - out->appendNumber("nModified", nModified); - if (!upsertInfo.empty()) { - out->append("upserted", upsertInfo); - } - } - - if (!errors.empty()) { - out->append("writeErrors", errors); - } - - // writeConcernError field is handled by command processor. - - { - // Undocumented repl fields that mongos depends on. - auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); - const auto replMode = replCoord->getReplicationMode(); - if (replMode != repl::ReplicationCoordinator::modeNone) { - appendOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), out); - - if (replMode == repl::ReplicationCoordinator::modeReplSet) { - out->append("electionId", replCoord->getElectionId()); - } - } - } -} - -template +template class WriteCommand : public Command { public: explicit WriteCommand(StringData name) : Command(name) {} @@ -496,17 +364,147 @@ private: } }; -template -class WriteCommand::InvocationBase : public CommandInvocation { +template +class WriteCommand::InvocationBase : public CommandInvocation { public: - using DerivedInvocation = typename DerivedT::Invocation; InvocationBase(const WriteCommand* writeCommand, const OpMsgRequest& request) : CommandInvocation(writeCommand), _request(&request) {} - bool getBypass() const { - return checked_cast(this) - ->request() - .getBypassDocumentValidation(); +protected: + /** + * Contains hooks that are used by 'populateReply' method. + */ + struct PopulateReplyHooks { + // Called for each 'SingleWriteResult' processed by 'populateReply' method. + std::function singleWriteResultHandler; + + // Called after all 'SingleWriteResult' processing is completed by 'populateReply' method. + // This is called as the last method. + std::function postProcessHandler; + }; + + /** + * Method to populate a write command reply message. It takes 'result' parameter as an input + * source and populate the fields of 'cmdReply'. + */ + void populateReply(OperationContext* opCtx, + bool continueOnError, + size_t opsInBatch, + write_ops_exec::WriteResult result, + CommandReplyType* cmdReply, + boost::optional hooks = boost::none) { + + invariant(cmdReply); + + if (shouldSkipOutput(opCtx)) + return; + + if (continueOnError) { + invariant(!result.results.empty()); + const auto& lastResult = result.results.back(); + + if (lastResult == ErrorCodes::StaleDbVersion || + ErrorCodes::isStaleShardVersionError(lastResult.getStatus()) || + ErrorCodes::isTenantMigrationError(lastResult.getStatus())) { + // For ordered:false commands we need to duplicate these error results for all ops + // after we stopped. See handleError() in write_ops_exec.cpp for more info. + // + // Omit the reason from the duplicate unordered responses so it doesn't consume BSON + // object space + result.results.resize(opsInBatch, lastResult.getStatus().withReason("")); + } + } + + long long nVal = 0; + std::vector errors; + + for (size_t i = 0; i < result.results.size(); i++) { + if (auto error = generateError(opCtx, result.results[i], i, errors.size())) { + errors.push_back(*error); + continue; + } + + const auto& opResult = result.results[i].getValue(); + nVal += opResult.getN(); // Always there. + + // Handle custom processing of each result. + if (hooks && hooks->singleWriteResultHandler) + hooks->singleWriteResultHandler(opResult, i); + } + + auto& replyBase = cmdReply->getWriteReplyBase(); + replyBase.setN(nVal); + + if (!errors.empty()) { + replyBase.setWriteErrors(errors); + } + + // writeConcernError field is handled by command processor. + + { + // Undocumented repl fields that mongos depends on. + auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); + const auto replMode = replCoord->getReplicationMode(); + if (replMode != repl::ReplicationCoordinator::modeNone) { + replyBase.setOpTime( + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); + + if (replMode == repl::ReplicationCoordinator::modeReplSet) { + replyBase.setElectionId(replCoord->getElectionId()); + } + } + } + + // Call the called-defined post processing handler. + if (hooks && hooks->postProcessHandler) + hooks->postProcessHandler(); + } + + /** + * Returns true if the retryable time-series write has been executed. + */ + bool isRetryableTimeseriesWriteExecuted(OperationContext* opCtx, + const write_ops::Insert& insert, + CommandReplyType* reply) const { + if (!isTimeseriesWriteRetryable(opCtx)) { + return false; + } + + if (insert.getDocuments().empty()) { + return false; + } + + auto txnParticipant = TransactionParticipant::get(opCtx); + const auto& writeCommandBase = insert.getWriteCommandBase(); + + uassert( + ErrorCodes::OperationFailed, + str::stream() << "Retryable time-series insert operations are limited to one document " + "per command request", + insert.getDocuments().size() == 1U); + + auto stmtId = write_ops::getStmtIdForWriteAt(writeCommandBase, 0); + if (!txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) { + return false; + } + + auto& baseReply = reply->getWriteReplyBase(); + + // This retryable write has been executed previously. Fill in command reply before + // returning. + baseReply.setN(1); + + auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); + if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { + baseReply.setOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); + baseReply.setElectionId(replCoord->getElectionId()); + } + + auto retryStats = RetryableWritesStats::get(opCtx); + retryStats->incrementRetriedStatementsCount(); + retryStats->incrementRetriedCommandsCount(); + + return true; } private: @@ -514,13 +512,14 @@ private: virtual void doCheckAuthorizationImpl(AuthorizationSession* authzSession) const = 0; // Customization point for 'run'. - virtual void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const = 0; + virtual CommandReplyType runImpl(OperationContext* opCtx) = 0; void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) final { try { _transactionChecks(opCtx); BSONObjBuilder bob = result->getBodyBuilder(); - runImpl(opCtx, bob); + CommandReplyType cmdReply = runImpl(opCtx); + cmdReply.serialize(&bob); CommandHelpers::extractOrAppendOk(bob); } catch (const DBException& ex) { LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.reason()); @@ -558,7 +557,7 @@ private: const OpMsgRequest* _request; }; -class CmdInsert final : public WriteCommand { +class CmdInsert final : public WriteCommand { public: CmdInsert() : WriteCommand("insert") {} @@ -575,6 +574,11 @@ public: return _batch; } + bool getBypass() const { + return request().getBypassDocumentValidation(); + } + + private: NamespaceString ns() const override { return _batch.getNamespace(); @@ -809,8 +813,10 @@ public: << redact(_batch.toBSON({}))); } - void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder* result) const { - if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, result)) { + void _performTimeseriesWrites(OperationContext* opCtx, + write_ops::InsertReply* insertReply) const { + + if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, insertReply)) { return; } @@ -818,50 +824,57 @@ public: boost::optional opTime; boost::optional electionId; + auto& baseReply = insertReply->getWriteReplyBase(); + if (_batch.getOrdered()) { for (size_t i = 0; i < _batch.getDocuments().size(); ++i) { _performTimeseriesWritesSubset(opCtx, i, 1, &errors, &opTime, &electionId); if (!errors.empty()) { - result->appendNumber("n", i); + baseReply.setN(i); break; } } } else { _performTimeseriesWritesSubset( opCtx, 0, _batch.getDocuments().size(), &errors, &opTime, &electionId); - result->appendNumber("n", _batch.getDocuments().size() - errors.size()); + baseReply.setN(_batch.getDocuments().size() - errors.size()); } if (!errors.empty()) { - result->append("writeErrors", errors); + baseReply.setWriteErrors(errors); } if (opTime) { - appendOpTime(*opTime, result); + baseReply.setOpTime(*opTime); } if (electionId) { - result->append("electionId", *electionId); + baseReply.setElectionId(*electionId); } } - void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { + write_ops::InsertReply runImpl(OperationContext* opCtx) override { + write_ops::InsertReply insertReply; + if (isTimeseries(opCtx, ns())) { // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's // constructor. try { - _performTimeseriesWrites(opCtx, &result); + _performTimeseriesWrites(opCtx, &insertReply); } catch (DBException& ex) { ex.addContext(str::stream() << "time-series insert failed: " << ns().ns()); throw; } - return; + + return insertReply; } auto reply = write_ops_exec::performInserts(opCtx, _batch); - serializeReply(opCtx, - ReplyStyle::kNotUpdate, - !_batch.getWriteCommandBase().getOrdered(), - _batch.getDocuments().size(), - std::move(reply), - &result); + + populateReply(opCtx, + !_batch.getWriteCommandBase().getOrdered(), + _batch.getDocuments().size(), + std::move(reply), + &insertReply); + + return insertReply; } write_ops::Insert _batch; @@ -881,7 +894,7 @@ public: } } cmdInsert; -class CmdUpdate final : public WriteCommand { +class CmdUpdate final : public WriteCommand { public: CmdUpdate() : WriteCommand("update"), _updateMetrics{"update"} {} @@ -917,6 +930,10 @@ public: return _batch; } + bool getBypass() const { + return request().getBypassDocumentValidation(); + } + bool supportsReadMirroring() const override { return true; } @@ -955,14 +972,38 @@ public: auth::checkAuthForUpdateCommand(authzSession, getBypass(), _batch); } - void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { + write_ops::UpdateReply runImpl(OperationContext* opCtx) override { + write_ops::UpdateReply updateReply; + long long nModified = 0; + + // Tracks the upserted information. The memory of this variable gets moved in the + // 'postProcessHandler' and should not be accessed afterwards. + std::vector upsertedInfoVec; + auto reply = write_ops_exec::performUpdates(opCtx, _batch); - serializeReply(opCtx, - ReplyStyle::kUpdate, - !_batch.getWriteCommandBase().getOrdered(), - _batch.getUpdates().size(), - std::move(reply), - &result); + + // Handler to process each 'SingleWriteResult'. + auto singleWriteHandler = [&](const SingleWriteResult& opResult, int index) { + nModified += opResult.getNModified(); + BSONSizeTracker upsertInfoSizeTracker; + + if (auto idElement = opResult.getUpsertedId().firstElement()) + upsertedInfoVec.emplace_back(write_ops::Upserted(index, idElement)); + }; + + // Handler to do the post-processing. + auto postProcessHandler = [&]() { + updateReply.setNModified(nModified); + if (!upsertedInfoVec.empty()) + updateReply.setUpserted(std::move(upsertedInfoVec)); + }; + + populateReply(opCtx, + !_batch.getWriteCommandBase().getOrdered(), + _batch.getUpdates().size(), + std::move(reply), + &updateReply, + PopulateReplyHooks{singleWriteHandler, postProcessHandler}); // Collect metrics. for (auto&& update : _batch.getUpdates()) { @@ -981,6 +1022,8 @@ public: _updateMetrics->incrementExecutedWithArrayFilters(); } } + + return updateReply; } void explain(OperationContext* opCtx, @@ -1047,7 +1090,7 @@ public: UpdateMetrics _updateMetrics; } cmdUpdate; -class CmdDelete final : public WriteCommand { +class CmdDelete final : public WriteCommand { public: CmdDelete() : WriteCommand("delete") {} @@ -1066,6 +1109,11 @@ public: return _batch; } + bool getBypass() const { + return request().getBypassDocumentValidation(); + } + + private: NamespaceString ns() const override { return _batch.getNamespace(); @@ -1075,14 +1123,17 @@ public: auth::checkAuthForDeleteCommand(authzSession, getBypass(), _batch); } - void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { + write_ops::DeleteReply runImpl(OperationContext* opCtx) override { + write_ops::DeleteReply deleteReply; + auto reply = write_ops_exec::performDeletes(opCtx, _batch); - serializeReply(opCtx, - ReplyStyle::kNotUpdate, - !_batch.getWriteCommandBase().getOrdered(), - _batch.getDeletes().size(), - std::move(reply), - &result); + populateReply(opCtx, + !_batch.getWriteCommandBase().getOrdered(), + _batch.getDeletes().size(), + std::move(reply), + &deleteReply); + + return deleteReply; } void explain(OperationContext* opCtx, diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index 78a7a1e0e47..18960b1c810 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -38,6 +38,7 @@ env.Library( '$BUILD_DIR/mongo/db/dbmessage', '$BUILD_DIR/mongo/db/pipeline/runtime_constants_idl', '$BUILD_DIR/mongo/db/query/hint_parser', + '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/idl/idl_parser', '$BUILD_DIR/mongo/rpc/command_status', ], diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index c3b0eed8032..a79e3ca58f3 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -30,6 +30,7 @@ global: cpp_namespace: "mongo::write_ops" cpp_includes: - "mongo/db/ops/write_ops_parsers.h" + - "mongo/db/repl/optime.h" imports: - "mongo/db/logical_session_id.idl" @@ -55,8 +56,72 @@ types: serializer: "mongo::write_ops::UpdateModification::serializeToBSON" deserializer: "mongo::write_ops::UpdateModification::parseFromBSON" + write_cmd_optime: + bson_serialization_type: any + description: "Holds the operation-time for write commands." + cpp_type: "mongo::repl::OpTime" + serializer: "::mongo::write_ops::opTimeSerializerWithTermCheck" + deserializer: "::mongo::write_ops::opTimeParser" + structs: + WriteReplyBase: + description: "Contains fields that is common in all the write commands reply." + fields: + n: + description: "For insert: number of documents inserted. + For update: number of documents that matched the query predicate. + For delete: number of documents deleted." + type: int + default: 0 + electionId: + description: "Replication coordinator election id." + type: objectid + optional: true + opTime: + description: "Operation time for the command." + type: write_cmd_optime + optional: true + writeErrors: + description: "Contains all the errors encoutered." + type: array + optional: true + + InsertReply: + description: "Contains information related to insert command reply." + chained_structs: + WriteReplyBase: writeReplyBase + + Upserted: + description: "Contains documents that have been upserted." + fields: + index: + description: "Index of the document." + type: int + _id: + description: "ID of the document." + type: IDLAnyTypeOwned + + UpdateReply: + description: "Contains information related to update command reply." + fields: + upserted: + description: "An array contains information about upserted documents." + type: array + optional: true + nModified: + description: "Number of updated documents." + type: int + default: 0 + + chained_structs: + WriteReplyBase: writeReplyBase + + DeleteReply: + description: "Contains information related to delete command reply." + chained_structs: + WriteReplyBase: writeReplyBase + WriteCommandBase: description: "Contains basic information included by all write commands" strict: false @@ -184,7 +249,8 @@ commands: command_name: insert strict: true namespace: concatenate_with_db - api_version: "" + api_version: "1" + reply_type: InsertReply chained_structs: WriteCommandBase: writeCommandBase fields: @@ -198,7 +264,8 @@ commands: command_name: update strict: true namespace: concatenate_with_db - api_version: "" + api_version: "1" + reply_type: UpdateReply chained_structs: WriteCommandBase: writeCommandBase fields: @@ -223,7 +290,8 @@ commands: command_name: delete strict: true namespace: concatenate_with_db - api_version: "" + api_version: "1" + reply_type: DeleteReply chained_structs: WriteCommandBase: writeCommandBase fields: diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp index 3dafb9dc4df..8642ecdf82f 100644 --- a/src/mongo/db/ops/write_ops_parsers.cpp +++ b/src/mongo/db/ops/write_ops_parsers.cpp @@ -100,6 +100,26 @@ void writeMultiDeleteProperty(bool isMulti, StringData fieldName, BSONObjBuilder builder->append(fieldName, isMulti ? 0 : 1); } +void opTimeSerializerWithTermCheck(repl::OpTime opTime, StringData fieldName, BSONObjBuilder* bob) { + if (opTime.getTerm() == repl::OpTime::kUninitializedTerm) { + bob->append(fieldName, opTime.getTimestamp()); + } else { + opTime.append(bob, fieldName.toString()); + } +} + +repl::OpTime opTimeParser(BSONElement elem) { + if (elem.type() == BSONType::Object) { + return repl::OpTime::parse(elem.Obj()); + } else if (elem.type() == BSONType::bsonTimestamp) { + return repl::OpTime(elem.timestamp(), repl::OpTime::kUninitializedTerm); + } + + uasserted(ErrorCodes::TypeMismatch, + str::stream() << "Expected BSON type " << BSONType::Object << " or " + << BSONType::bsonTimestamp << ", but found " << elem.type()); +} + int32_t getStmtIdForWriteAt(const WriteCommandBase& writeCommandBase, size_t writePos) { const auto& stmtIds = writeCommandBase.getStmtIds(); diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h index 8de2835d193..20b9f543bc7 100644 --- a/src/mongo/db/ops/write_ops_parsers.h +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -33,6 +33,7 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/exec/document_value/value.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/update/document_diff_serialization.h" #include "mongo/stdx/variant.h" #include "mongo/util/visit_helper.h" @@ -58,6 +59,18 @@ bool readMultiDeleteProperty(const BSONElement& limitElement); */ void writeMultiDeleteProperty(bool isMulti, StringData fieldName, BSONObjBuilder* builder); +/** + * Serializes the opTime fields to specified BSON builder. A 'term' field will be included only + * when it is intialized. + */ +void opTimeSerializerWithTermCheck(repl::OpTime opTime, StringData fieldName, BSONObjBuilder* bob); + +/** + * Method to deserialize the specified BSON element to opTime. This method is used by the IDL + * parser to generate the deserializer code. + */ +repl::OpTime opTimeParser(BSONElement elem); + class UpdateModification { public: enum class Type { kClassic, kPipeline, kDelta }; diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp index b5074350ef4..7dd244071d7 100644 --- a/src/mongo/db/ops/write_ops_parsers_test.cpp +++ b/src/mongo/db/ops/write_ops_parsers_test.cpp @@ -514,5 +514,84 @@ TEST(LegacyWriteOpsParsers, Remove) { } } +/** + * Test OpTime serializer and deserializer when OpTime does not have term initailized. + */ +TEST(OpTimeSerdes, OpTimeWithoutTerm) { + const auto fieldName = "opTime"; + repl::OpTime opTime(Timestamp(10, 20), repl::OpTime::kUninitializedTerm); + BSONObjBuilder bob; + + write_ops::opTimeSerializerWithTermCheck(opTime, "opTime", &bob); + + auto bsonObj = bob.done(); + auto bsonElem = bsonObj[fieldName]; + + ASSERT_FALSE(bsonElem.eoo()); + + repl::OpTime retOpTime = write_ops::opTimeParser(bsonElem); + + ASSERT_EQ(opTime.getTimestamp(), retOpTime.getTimestamp()); + ASSERT_EQ(opTime.getTerm(), retOpTime.getTerm()); +} + +/** + * Test OpTime serializer and deserializer when OpTime have term initailized. + */ +TEST(OpTimeSerdes, OpTimeWithTerm) { + const auto fieldName = "opTime"; + repl::OpTime opTime(Timestamp(10, 20), 10); + BSONObjBuilder bob; + + write_ops::opTimeSerializerWithTermCheck(opTime, "opTime", &bob); + + auto bsonObj = bob.done(); + auto bsonElem = bsonObj[fieldName]; + + ASSERT_FALSE(bsonElem.eoo()); + + repl::OpTime retOpTime = write_ops::opTimeParser(bsonElem); + + ASSERT_EQ(opTime.getTimestamp(), retOpTime.getTimestamp()); + ASSERT_EQ(opTime.getTerm(), retOpTime.getTerm()); +} + +/** + * Test OpTime deserializer by directly passing Timestamp to the OpTime deserializer. + */ +TEST(OpTimeSerdes, DeserializeWithTimestamp) { + const auto fieldName = "opTime"; + Timestamp timestamp(10, 20); + BSONObjBuilder bob; + + bob.append(fieldName, timestamp); + + auto bsonObj = bob.done(); + auto bsonElem = bsonObj[fieldName]; + + ASSERT_FALSE(bsonElem.eoo()); + + repl::OpTime retOpTime = write_ops::opTimeParser(bsonElem); + + ASSERT_EQ(timestamp, retOpTime.getTimestamp()); + ASSERT_EQ(repl::OpTime::kUninitializedTerm, retOpTime.getTerm()); +} + +/** + * Test OpTime deserializer by passing invalid BSON type. + */ +TEST(OpTimeSerdes, DeserializeWithInvalidBSONType) { + const auto fieldName = "opTime"; + BSONObjBuilder bob; + + bob.append(fieldName, 100); + + auto bsonObj = bob.done(); + auto bsonElem = bsonObj[fieldName]; + + ASSERT_FALSE(bsonElem.eoo()); + ASSERT_THROWS_CODE(write_ops::opTimeParser(bsonElem), DBException, ErrorCodes::TypeMismatch); +} + } // namespace } // namespace mongo -- cgit v1.2.1