author    | Randolph Tan <randolph@10gen.com> | 2017-09-26 17:45:15 -0400
committer | Randolph Tan <randolph@10gen.com> | 2017-10-04 16:43:58 -0400
commit    | 0ab7000e04e16813c1e1e3f131f02de102ddffba (patch)
tree      | 07c771aa1229bc85755f952dcc9a157a8d4e2dd2 /src/mongo/db
parent    | d6267ee66b997af73fcfb095f03f655bb61c06dc (diff)
download  | mongo-0ab7000e04e16813c1e1e3f131f02de102ddffba.tar.gz
SERVER-31030 Use full OpTime instead of just Timestamps to refer to oplog entries
Diffstat (limited to 'src/mongo/db')
30 files changed, 589 insertions, 505 deletions
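The gist of the change: code paths that previously identified oplog entries by a bare `Timestamp` (`ts`) now carry the full `repl::OpTime`, i.e. the timestamp plus the election term, and look entries up with both fields through the new `OpTime::appendAsQuery()`/`asQuery()` helpers added in `optime.cpp` below. The following sketch is a minimal, standalone illustration of that query shape only; it is not MongoDB code. The `Timestamp` and `OpTime` structs and the string output are stand-ins for the real BSON types, and only the field names (`ts`, `t`) and the uninitialized-term special case mirror the diff.

```cpp
// Minimal sketch of the "query the oplog by full OpTime" idea from this commit.
// BSONObjBuilder is replaced by plain string formatting so the example compiles
// standalone; only the "ts"/"t" field names and the kUninitializedTerm handling
// follow the real OpTime::appendAsQuery() shown in the diff.
#include <iostream>
#include <sstream>
#include <string>

struct Timestamp {
    unsigned secs = 0;
    unsigned inc = 0;
};

struct OpTime {
    static constexpr long long kUninitializedTerm = -1;  // pv0 entries carry no term

    Timestamp timestamp;
    long long term = kUninitializedTerm;

    // Match on the timestamp, and either on the exact term or on the absence of
    // the "t" field for protocol-version-0 oplog entries.
    std::string asQuery() const {
        std::ostringstream out;
        out << "{ ts: Timestamp(" << timestamp.secs << ", " << timestamp.inc << "), ";
        if (term == kUninitializedTerm) {
            out << "t: { $exists: false } }";
        } else {
            out << "t: " << term << " }";
        }
        return out.str();
    }
};

int main() {
    OpTime pv1{{120, 3}, 1};
    OpTime pv0{{120, 3}};                 // no term recorded
    std::cout << pv1.asQuery() << "\n";   // { ts: Timestamp(120, 3), t: 1 }
    std::cout << pv0.asQuery() << "\n";   // { ts: Timestamp(120, 3), t: { $exists: false } }
}
```

The `$exists: false` branch corresponds to the comment in the real helper ("pv0 oplogs don't actually have the term field so don't query for {t: -1}"): an entry written without a term must not be queried with the sentinel term value.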
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index b466b70c3bb..79906de7f82 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -75,13 +75,12 @@ void onWriteOpCompleted(OperationContext* opCtx, Session* session, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { - const auto lastStmtIdWriteTs = lastStmtIdWriteOpTime.getTimestamp(); - if (lastStmtIdWriteTs.isNull()) + if (lastStmtIdWriteOpTime.isNull()) return; if (session) { session->onWriteOpCompletedOnPrimary( - opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteTs); + opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime); } } @@ -140,7 +139,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, if (session) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); } OpTimeBundle opTimes; @@ -160,9 +159,9 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, opTimes.prePostImageOpTime = noteUpdateOpTime; if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { - oplogLink.preImageTs = noteUpdateOpTime.getTimestamp(); + oplogLink.preImageOpTime = noteUpdateOpTime; } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) { - oplogLink.postImageTs = noteUpdateOpTime.getTimestamp(); + oplogLink.postImageOpTime = noteUpdateOpTime; } } @@ -197,7 +196,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, if (session) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); } OpTimeBundle opTimes; @@ -206,7 +205,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, auto noteOplog = repl::logOp( opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {}); opTimes.prePostImageOpTime = noteOplog; - oplogLink.preImageTs = noteOplog.getTimestamp(); + oplogLink.preImageOpTime = noteOplog; } opTimes.writeOpTime = repl::logOp(opCtx, @@ -280,10 +279,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - const size_t count = end - begin; - auto timestamps = stdx::make_unique<Timestamp[]>(count); - const auto lastOpTime = - repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate); + const auto opTimeList = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate); auto css = CollectionShardingState::get(opCtx, nss.ns()); @@ -292,10 +288,12 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", nss, it->doc, nullptr); if (!fromMigrate) { - css->onInsertOp(opCtx, it->doc, timestamps[index]); + auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index]; + css->onInsertOp(opCtx, it->doc, opTime); } } + auto lastOpTime = opTimeList.empty() ? 
repl::OpTime() : opTimeList.back(); if (nss.coll() == "system.js") { Scope::storedFuncMod(opCtx); } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { @@ -336,8 +334,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg args.criteria, args.update, args.updatedDoc, - opTime.writeOpTime.getTimestamp(), - opTime.prePostImageOpTime.getTimestamp()); + opTime.writeOpTime, + opTime.prePostImageOpTime); } if (args.nss.coll() == "system.js") { @@ -351,7 +349,6 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - onWriteOpCompleted( opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime); } @@ -383,10 +380,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, nss.ns()); if (!fromMigrate) { - css->onDeleteOp(opCtx, - deleteState, - opTime.writeOpTime.getTimestamp(), - opTime.prePostImageOpTime.getTimestamp()); + css->onDeleteOp(opCtx, deleteState, opTime.writeOpTime, opTime.prePostImageOpTime); } if (nss.coll() == "system.js") { diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 3033daa68f9..3d4fc90fb7e 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -67,7 +67,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, uassert(40607, str::stream() << "No pre-image available for findAndModify retry request:" << redact(request.toBSON()), - oplogWithCorrectLinks.getPreImageTs()); + oplogWithCorrectLinks.getPreImageOpTime()); } else if (opType == repl::OpTypeEnum::kInsert) { uassert( 40608, @@ -99,7 +99,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), - oplogWithCorrectLinks.getPostImageTs()); + oplogWithCorrectLinks.getPostImageOpTime()); } else { uassert(40612, str::stream() << "findAndModify retry request: " << redact(request.toBSON()) @@ -108,7 +108,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), - oplogWithCorrectLinks.getPreImageTs()); + oplogWithCorrectLinks.getPreImageOpTime()); } } } @@ -118,21 +118,21 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, * oplog. */ BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& oplog) { - invariant(oplog.getPreImageTs() || oplog.getPostImageTs()); - auto ts = - oplog.getPreImageTs() ? oplog.getPreImageTs().value() : oplog.getPostImageTs().value(); + invariant(oplog.getPreImageOpTime() || oplog.getPostImageOpTime()); + auto opTime = oplog.getPreImageOpTime() ? 
oplog.getPreImageOpTime().value() + : oplog.getPostImageOpTime().value(); DBDirectClient client(opCtx); - auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), BSON("ts" << ts)); + auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery()); uassert(40613, str::stream() << "oplog no longer contains the complete write history of this " - "transaction, log with ts " - << ts.toString() + "transaction, log with opTime " + << opTime.toString() << " cannot be found", !oplogDoc.isEmpty()); - auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc)); + auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc)); return oplogEntry.getObject().getOwned(); } diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index a50c51eba58..41774472760 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -287,9 +287,9 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPostImageButOplogHasPre) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(true); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); insertOplogEntry(noteOplog); @@ -299,7 +299,7 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPostImageButOplogHasPre) { kNs, BSON("x" << 1 << "y" << 1), BSON("x" << 1)); - updateOplog.setPreImageTs(imageTs); + updateOplog.setPreImageOpTime(imageOpTime); ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog), AssertionException); @@ -309,14 +309,14 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpdateButOplogIsDelete) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(true); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); insertOplogEntry(noteOplog); repl::OplogEntry oplog(repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 1)); - oplog.setPreImageTs(imageTs); + oplog.setPreImageOpTime(imageOpTime); ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException); } @@ -325,9 +325,9 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPreImageButOplogHasPost) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(false); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); insertOplogEntry(noteOplog); @@ -337,7 +337,7 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPreImageButOplogHasPost) { kNs, BSON("x" << 1 << "y" << 1), BSON("x" << 1)); - updateOplog.setPostImageTs(imageTs); + updateOplog.setPostImageOpTime(imageOpTime); ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog), AssertionException); @@ -347,9 +347,9 @@ TEST_F(FindAndModifyRetryability, UpdateWithPreImage) { auto request = 
FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(false); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); insertOplogEntry(noteOplog); @@ -359,7 +359,7 @@ TEST_F(FindAndModifyRetryability, UpdateWithPreImage) { kNs, BSON("x" << 1 << "y" << 1), BSON("x" << 1)); - updateOplog.setPreImageTs(imageTs); + updateOplog.setPreImageOpTime(imageOpTime); auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog); @@ -375,9 +375,9 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPreImage) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(false); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); insertOplogEntry(noteOplog); @@ -394,7 +394,7 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPreImage) { kNs, kNestedOplog, innerOplog.toBSON()); - updateOplog.setPreImageTs(imageTs); + updateOplog.setPreImageOpTime(imageOpTime); auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog); @@ -410,9 +410,9 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImage) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(true); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1)); insertOplogEntry(noteOplog); @@ -422,7 +422,7 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImage) { kNs, BSON("x" << 1 << "y" << 1), BSON("x" << 1)); - updateOplog.setPostImageTs(imageTs); + updateOplog.setPostImageOpTime(imageOpTime); auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog); @@ -438,9 +438,9 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPostImage) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(true); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1)); insertOplogEntry(noteOplog); @@ -457,7 +457,7 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPostImage) { kNs, kNestedOplog, innerOplog.toBSON()); - updateOplog.setPostImageTs(imageTs); + updateOplog.setPostImageOpTime(imageOpTime); auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog); @@ -473,14 +473,14 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImageButOplogDoesNotExistShouldE auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setShouldReturnNew(true); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry updateOplog(repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1 << "y" << 1), BSON("x" << 1)); - updateOplog.setPostImageTs(imageTs); + updateOplog.setPostImageOpTime(imageOpTime); 
ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog), AssertionException); @@ -489,15 +489,15 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImageButOplogDoesNotExistShouldE TEST_F(FindAndModifyRetryability, BasicRemove) { auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj()); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1)); insertOplogEntry(noteOplog); repl::OplogEntry removeOplog( repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 20)); - removeOplog.setPreImageTs(imageTs); + removeOplog.setPreImageOpTime(imageOpTime); auto result = parseOplogEntryForFindAndModify(opCtx(), request, removeOplog); @@ -511,9 +511,9 @@ TEST_F(FindAndModifyRetryability, BasicRemove) { TEST_F(FindAndModifyRetryability, NestedRemove) { auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj()); - Timestamp imageTs(120, 3); + repl::OpTime imageOpTime(Timestamp(120, 3), 1); repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1)); + imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1)); insertOplogEntry(noteOplog); @@ -526,7 +526,7 @@ TEST_F(FindAndModifyRetryability, NestedRemove) { kNs, kNestedOplog, innerOplog.toBSON()); - removeOplog.setPreImageTs(imageTs); + removeOplog.setPreImageOpTime(imageOpTime); auto result = parseOplogEntryForFindAndModify(opCtx(), request, removeOplog); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b80dd5e0301..b2928a96b8e 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -300,14 +300,17 @@ void appendSessionInfo(OperationContext* opCtx, sessionInfo.serialize(builder); builder->append(OplogEntryBase::kStatementIdFieldName, statementId); - builder->append(OplogEntryBase::kPrevWriteTsInTransactionFieldName, oplogLink.prevTs); + oplogLink.prevOpTime.append(builder, + OplogEntryBase::kPrevWriteOpTimeInTransactionFieldName.toString()); - if (!oplogLink.preImageTs.isNull()) { - builder->append(OplogEntryBase::kPreImageTsFieldName, oplogLink.preImageTs); + if (!oplogLink.preImageOpTime.isNull()) { + oplogLink.preImageOpTime.append(builder, + OplogEntryBase::kPreImageOpTimeFieldName.toString()); } - if (!oplogLink.postImageTs.isNull()) { - builder->append(OplogEntryBase::kPostImageTsFieldName, oplogLink.postImageTs); + if (!oplogLink.postImageOpTime.isNull()) { + oplogLink.postImageOpTime.append(builder, + OplogEntryBase::kPostImageOpTimeFieldName.toString()); } } @@ -446,14 +449,13 @@ OpTime logOp(OperationContext* opCtx, return slot.opTime; } -repl::OpTime logInsertOps(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - Session* session, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, - Timestamp timestamps[], - bool fromMigrate) { +std::vector<OpTime> logInsertOps(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + Session* session, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) { invariant(begin != end); auto replCoord = ReplicationCoordinator::get(opCtx); @@ -478,10 +480,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx, if (session) { 
sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); } - OpTime lastOpTime; + auto timestamps = stdx::make_unique<Timestamp[]>(count); + std::vector<OpTime> opTimes; for (size_t i = 0; i < count; i++) { // Make a mutable copy. auto insertStatementOplogSlot = begin[i].oplogSlot; @@ -502,19 +505,22 @@ repl::OpTime logInsertOps(OperationContext* opCtx, sessionInfo, begin[i].stmtId, oplogLink)); - oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp(); - timestamps[i] = oplogLink.prevTs; - lastOpTime = insertStatementOplogSlot.opTime; + oplogLink.prevOpTime = insertStatementOplogSlot.opTime; + timestamps[i] = oplogLink.prevOpTime.getTimestamp(); + opTimes.push_back(insertStatementOplogSlot.opTime); } std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]); for (size_t i = 0; i < count; i++) { basePtrs[i] = &writers[i]; } + + invariant(!opTimes.empty()); + auto lastOpTime = opTimes.back(); invariant(!lastOpTime.isNull()); - _logOpsInner(opCtx, nss, basePtrs.get(), timestamps, count, oplog, lastOpTime); + _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime); wuow.commit(); - return lastOpTime; + return opTimes; } namespace { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 34a96b23b05..78f1c4f8103 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -78,9 +78,9 @@ class ReplSettings; struct OplogLink { OplogLink() = default; - Timestamp prevTs; - Timestamp preImageTs; - Timestamp postImageTs; + OpTime prevOpTime; + OpTime preImageOpTime; + OpTime postImageOpTime; }; /** @@ -102,18 +102,15 @@ extern int OPLOG_VERSION; /** * Log insert(s) to the local oplog. - * Returns the OpTime of the last insert. - * The timestamps parameter can also be modified and contain the individual timestamps for each - * insert after the oplog entries were created. + * Returns the OpTime of every insert. */ -OpTime logInsertOps(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - Session* session, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, - Timestamp timestamps[], - bool fromMigrate); +std::vector<OpTime> logInsertOps(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + Session* session, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate); /** * @param opstr diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index a01b38f2ee0..3f1e7cee003 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -32,6 +32,7 @@ global: imports: - "mongo/idl/basic_types.idl" - "mongo/db/logical_session_id.idl" + - "mongo/db/repl/replication_types.idl" enums: OpType: @@ -110,18 +111,18 @@ structs: # must exist. description: "Identifier of the transaction statement which generated this oplog entry" - prevTs: - cpp_name: prevWriteTsInTransaction - type: timestamp + prevOpTime: + cpp_name: prevWriteOpTimeInTransaction + type: optime optional: true # Only for writes that are part of a transaction - description: "The oplog timestamp of the previous write with the same transaction." 
- preImageTs: - type: timestamp + description: "The opTime of the previous write with the same transaction." + preImageOpTime: + type: optime optional: true - description: "The oplog timestamp of another oplog entry that contains the document + description: "The optime of another oplog entry that contains the document before an update/remove was applied." - postImageTs: - type: timestamp + postImageOpTime: + type: optime optional: true - description: "The oplog timestamp of another oplog entry that contains the document + description: "The optime of another oplog entry that contains the document after an update was applied." diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp index 9c63e0a7e88..2957f218374 100644 --- a/src/mongo/db/repl/optime.cpp +++ b/src/mongo/db/repl/optime.cpp @@ -91,6 +91,22 @@ std::ostream& operator<<(std::ostream& out, const OpTime& opTime) { return out << opTime.toString(); } +void OpTime::appendAsQuery(BSONObjBuilder* builder) const { + builder->append(kTimestampFieldName, _timestamp); + if (_term == kUninitializedTerm) { + // pv0 oplogs don't actually have the term field so don't query for {t: -1}. + builder->append(kTermFieldName, BSON("$exists" << false)); + } else { + builder->append(kTermFieldName, _term); + } +} + +BSONObj OpTime::asQuery() const { + BSONObjBuilder builder; + appendAsQuery(&builder); + return builder.obj(); +} + } // namespace repl BSONObjBuilder& operator<<(BSONObjBuilderValueStream& builder, const repl::OpTime& value) { diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h index 05ee64df110..be16e923289 100644 --- a/src/mongo/db/repl/optime.h +++ b/src/mongo/db/repl/optime.h @@ -149,6 +149,9 @@ public: friend std::ostream& operator<<(std::ostream& out, const OpTime& opTime); + void appendAsQuery(BSONObjBuilder* builder) const; + BSONObj asQuery() const; + private: Timestamp _timestamp; long long _term = kInitialTerm; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 13b5df16db6..b25bc8a547b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -544,16 +544,16 @@ SessionRecordMap computeLatestTransactionTableRecords(const MultiApplier::Operat LogicalSessionId lsid(*sessionInfo.getSessionId()); auto txnNumber = *sessionInfo.getTxnNumber(); - auto opTimeTs = op.getOpTime().getTimestamp(); + auto opTime = op.getOpTime(); auto it = latestRecords.find(lsid); if (it != latestRecords.end()) { - auto record = makeSessionTxnRecord(lsid, txnNumber, opTimeTs); + auto record = makeSessionTxnRecord(lsid, txnNumber, opTime); if (record > it->second) { latestRecords[lsid] = std::move(record); } } else { - latestRecords.emplace(lsid, makeSessionTxnRecord(lsid, txnNumber, opTimeTs)); + latestRecords.emplace(lsid, makeSessionTxnRecord(lsid, txnNumber, opTime)); } } return latestRecords; diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 343fe15b1cb..783917ea969 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -541,31 +541,40 @@ TEST_F(SyncTailTest, MultiApplyUpdatesTheTransactionTable) { DBDirectClient client(_opCtx.get()); // The txnNum and optime of the only write were saved. 
- auto resultSingle = + auto resultSingleDoc = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON())); - ASSERT_TRUE(!resultSingle.isEmpty()); - ASSERT_EQ(resultSingle[SessionTxnRecord::kTxnNumFieldName].numberLong(), 5LL); - ASSERT_EQ(resultSingle[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(), - Timestamp(Seconds(1), 0)); + ASSERT_TRUE(!resultSingleDoc.isEmpty()); + + auto resultSingle = + SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc); + + ASSERT_EQ(resultSingle.getTxnNum(), 5LL); + ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1)); // The txnNum and optime of the write with the larger txnNum were saved. - auto resultDiffTxn = + auto resultDiffTxnDoc = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON())); - ASSERT_TRUE(!resultDiffTxn.isEmpty()); - ASSERT_EQ(resultDiffTxn[SessionTxnRecord::kTxnNumFieldName].numberLong(), 20LL); - ASSERT_EQ(resultDiffTxn[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(), - Timestamp(Seconds(3), 0)); + ASSERT_TRUE(!resultDiffTxnDoc.isEmpty()); + + auto resultDiffTxn = + SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc); + + ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL); + ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1)); // The txnNum and optime of the write with the later optime were saved. - auto resultSameTxn = + auto resultSameTxnDoc = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON())); - ASSERT_TRUE(!resultSameTxn.isEmpty()); - ASSERT_EQ(resultSameTxn[SessionTxnRecord::kTxnNumFieldName].numberLong(), 30LL); - ASSERT_EQ(resultSameTxn[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(), - Timestamp(Seconds(6), 0)); + ASSERT_TRUE(!resultSameTxnDoc.isEmpty()); + + auto resultSameTxn = + SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc); + + ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL); + ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1)); // There is no entry for the write with no txnNumber. 
auto resultNoTxn = diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 575c442ac74..0c4e6a1b70d 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -269,7 +269,7 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, - const Timestamp& oplogTs) { + const repl::OpTime& opTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -293,7 +293,7 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs); + _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, opTime); } } @@ -301,8 +301,8 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) { + const repl::OpTime& opTime, + const repl::OpTime& prePostImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -319,7 +319,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs); + _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, opTime, prePostImageOpTime); } } @@ -330,8 +330,8 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState, - const Timestamp& oplogTs, - const Timestamp& preImageTs) { + const repl::OpTime& opTime, + const repl::OpTime& preImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -372,7 +372,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { - _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs); + _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, opTime, preImageOpTime); } } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 6f9722dceb9..c70a032918b 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -224,17 +224,19 @@ public: * * The global exclusive lock is expected to be held by the caller of any of these functions. 
*/ - void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs); + void onInsertOp(OperationContext* opCtx, + const BSONObj& insertedDoc, + const repl::OpTime& opTime); void onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs); + const repl::OpTime& opTime, + const repl::OpTime& prePostImageOpTime); void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState, - const Timestamp& oplogTs, - const Timestamp& preImageTs); + const repl::OpTime& opTime, + const repl::OpTime& preImageOpTime); void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); private: diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 9d9a1407f1d..ff3d31132fa 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -38,6 +38,10 @@ class OperationContext; class Status; class Timestamp; +namespace repl { +class OpTime; +} // namespace repl + /** * This class is responsible for producing chunk documents to be moved from donor to a recipient * shard and its methods represent cloning stages. Its lifetime is owned and controlled by a single @@ -121,7 +125,7 @@ public: */ virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, - const Timestamp& oplogTs) = 0; + const repl::OpTime& opTime) = 0; /** * Notifies this cloner that an update happened to the collection, which it owns. It is up to @@ -132,8 +136,8 @@ public: */ virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) = 0; + const repl::OpTime& opTime, + const repl::OpTime& prePostImageOpTime) = 0; /** * Notifies this cloner that a delede happened to the collection, which it owns. 
It is up to the @@ -144,8 +148,8 @@ public: */ virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) = 0; + const repl::OpTime& opTime, + const repl::OpTime& preImageOpTime) = 0; protected: MigrationChunkClonerSource(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 5d0db5dea0b..e3438030741 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -42,6 +42,7 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/query/internal_plans.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/s/start_chunk_clone_request.h" #include "mongo/db/service_context.h" #include "mongo/executor/remote_command_request.h" @@ -136,13 +137,13 @@ public: LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, const BSONObj& idObj, const char op, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) + const repl::OpTime& opTime, + const repl::OpTime& prePostImageOpTime) : _cloner(cloner), _idObj(idObj.getOwned()), _op(op), - _oplogTs(oplogTs), - _prePostImageTs(prePostImageTs) {} + _opTime(opTime), + _prePostImageOpTime(prePostImageOpTime) {} void commit() override { switch (_op) { @@ -163,12 +164,12 @@ public: MONGO_UNREACHABLE; } - if (!_prePostImageTs.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs); + if (!_prePostImageOpTime.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_prePostImageOpTime); } - if (!_oplogTs.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs); + if (!_opTime.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_opTime); } } @@ -178,8 +179,8 @@ private: MigrationChunkClonerSourceLegacy* const _cloner; const BSONObj _idObj; const char _op; - const Timestamp _oplogTs; - const Timestamp _prePostImageTs; + const repl::OpTime _opTime; + const repl::OpTime _prePostImageOpTime; }; MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, @@ -371,7 +372,7 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, - const Timestamp& oplogTs) { + const repl::OpTime& opTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; @@ -387,7 +388,7 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, if (opCtx->getTxnNumber()) { opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {})); + new LogOpForShardingHandler(this, idElement.wrap(), 'i', opTime, {})); } else { opCtx->recoveryUnit()->registerChange( new LogOpForShardingHandler(this, idElement.wrap(), 'i', {}, {})); @@ -396,8 +397,8 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) { + const repl::OpTime& opTime, + const repl::OpTime& prePostImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; @@ -413,7 +414,7 @@ void 
MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, if (opCtx->getTxnNumber()) { opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs)); + new LogOpForShardingHandler(this, idElement.wrap(), 'u', opTime, prePostImageOpTime)); } else { opCtx->recoveryUnit()->registerChange( new LogOpForShardingHandler(this, idElement.wrap(), 'u', {}, {})); @@ -422,8 +423,8 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) { + const repl::OpTime& opTime, + const repl::OpTime& preImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; @@ -435,7 +436,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, if (opCtx->getTxnNumber()) { opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs)); + new LogOpForShardingHandler(this, idElement.wrap(), 'd', opTime, preImageOpTime)); } else { opCtx->recoveryUnit()->registerChange( new LogOpForShardingHandler(this, idElement.wrap(), 'd', {}, {})); @@ -731,14 +732,16 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder) { while (_sessionCatalogSource.hasMoreOplog()) { - auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog(); + auto oplog = _sessionCatalogSource.getLastFetchedOplog(); - if (oplogDoc.isEmpty()) { + if (!oplog) { // Last fetched turned out empty, try to see if there are more _sessionCatalogSource.fetchNextOplog(opCtx); continue; } + auto oplogDoc = oplog->toBSON(); + // Use the builder size instead of accumulating the document sizes directly so that we // take into consideration the overhead of BSONArray indices. 
if (arrBuilder->arrSize() && diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 72d299b58e4..5bd20b0907d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -75,17 +75,17 @@ public: void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, - const Timestamp& oplogTs) override; + const repl::OpTime& opTime) override; void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) override; + const repl::OpTime& opTime, + const repl::OpTime& prePostImageOpTime) override; void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) override; + const repl::OpTime& opTime, + const repl::OpTime& preImageOpTime) override; // Legacy cloner specific functionality diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 6f6865d5f15..05aec22193b 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -90,16 +90,15 @@ repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult, uassert(40628, str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << " to not have " - << repl::OplogEntryBase::kPreImageTsFieldName + << repl::OplogEntryBase::kPreImageOpTimeFieldName << " or " - << repl::OplogEntryBase::kPostImageTsFieldName, - !entry.getPreImageTs() && !entry.getPostImageTs()); + << repl::OplogEntryBase::kPostImageOpTimeFieldName, + !entry.getPreImageOpTime() && !entry.getPostImageOpTime()); return oplogLink; } - const auto ts = lastResult.oplogTime.getTimestamp(); - invariant(!ts.isNull()); + invariant(!lastResult.oplogTime.isNull()); const auto& sessionInfo = entry.getOperationSessionInfo(); const auto sessionId = *sessionInfo.getSessionId(); @@ -118,19 +117,19 @@ repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult, << lastResult.txnNum, lastResult.txnNum == txnNum); - if (entry.getPreImageTs()) { - oplogLink.preImageTs = ts; - } else if (entry.getPostImageTs()) { - oplogLink.postImageTs = ts; + if (entry.getPreImageOpTime()) { + oplogLink.preImageOpTime = lastResult.oplogTime; + } else if (entry.getPostImageOpTime()) { + oplogLink.postImageOpTime = lastResult.oplogTime; } else { uasserted(40631, - str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() + str::stream() << "expected oplog with opTime: " << entry.getOpTime().toString() << ": " << redact(entry.toBSON()) << " to have either " - << repl::OplogEntryBase::kPreImageTsFieldName + << repl::OplogEntryBase::kPreImageOpTimeFieldName << " or " - << repl::OplogEntryBase::kPostImageTsFieldName); + << repl::OplogEntryBase::kPostImageOpTimeFieldName); } return oplogLink; @@ -185,11 +184,13 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, Shard::RetryPolicy::kNoRetry); uassertStatusOK(responseStatus.getStatus()); + uassertStatusOK(responseStatus.getValue().commandStatus); + auto result = responseStatus.getValue().response; auto oplogElement = result[kOplogField]; uassert(ErrorCodes::FailedToParse, - "_migrateSession response does not have the 'oplog' field as array", + "_getNextSessionMods response does not have the 'oplog' field as array", oplogElement.type() == Array); return result; @@ -252,49 +253,53 @@ 
ProcessOplogResult processSessionOplog(OperationContext* opCtx, ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); - oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum); - - writeConflictRetry( - opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - // Need to take global lock here so repl::logOp will not unlock it and trigger the - // invariant that disallows unlocking global lock while inside a WUOW. - // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush - // lock will be lock/unlocked correctly. Take the transaction table db lock to - // ensure the same lock ordering with normal replicated updates to the table. - Lock::DBLock lk( - opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); - WriteUnitOfWork wunit(opCtx); - - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNamespace(), - oplogEntry.getUuid(), - object, - &object2, - true, - sessionInfo, - stmtId, - oplogLink); - - auto oplogTs = result.oplogTime.getTimestamp(); - uassert(40633, - str::stream() << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() - << ": " - << redact(oplogBSON), - !oplogTs.isNull()); - - // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because - // the next oplog will contain the real operation. - if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs); - } - - wunit.commit(); - }); + oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum); + + writeConflictRetry(opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + // Need to take global lock here so repl::logOp will not unlock it and + // trigger the invariant that disallows unlocking global lock while + // inside a WUOW. Grab a DBLock here instead of plain GlobalLock to make + // sure the MMAPV1 flush lock will be lock/unlocked correctly. Take the + // transaction table db lock to ensure the same lock ordering with normal + // replicated updates to the table. + Lock::DBLock lk(opCtx, + NamespaceString::kSessionTransactionsTableNamespace.db(), + MODE_IX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + sessionInfo, + stmtId, + oplogLink); + + auto oplogOpTime = result.oplogTime; + uassert(40633, + str::stream() + << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogOpTime.isNull()); + + // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post + // image, because + // the next oplog will contain the real operation. 
+ if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary( + opCtx, result.txnNum, {stmtId}, oplogOpTime); + } + + wunit.commit(); + }); return result; } @@ -327,8 +332,10 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { void SessionCatalogMigrationDestination::finish() { stdx::lock_guard<stdx::mutex> lk(_mutex); - _state = State::Committing; - _isStateChanged.notify_all(); + if (_state != State::ErrorOccurred) { + _state = State::Committing; + _isStateChanged.notify_all(); + } } void SessionCatalogMigrationDestination::join() { diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 71c63721c6f..3dc34514061 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -128,10 +128,9 @@ public: }); } - repl::OplogEntry getOplog(OperationContext* opCtx, const Timestamp& ts) { + repl::OplogEntry getOplog(OperationContext* opCtx, const repl::OpTime& opTime) { DBDirectClient client(opCtx); - auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), - BSON(repl::OplogEntryBase::kTimestampFieldName << ts)); + auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery()); ASSERT_FALSE(oplogBSON.isEmpty()); auto parseStatus = repl::OplogEntry::parse(oplogBSON); @@ -297,7 +296,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -350,7 +349,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, txnNum); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(txnNum)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(txnNum)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -396,7 +395,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -453,7 +452,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) { auto session = getSessionWithTxn(opCtx, sessionId1, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); @@ -461,7 +460,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) { auto session = getSessionWithTxn(opCtx, sessionId2, 42); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(42)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(42)); ASSERT_TRUE(historyIter.hasNext()); 
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -526,7 +525,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplog(oplog2, historyIter.next(opCtx)); @@ -565,7 +564,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); - updateOplog.setPreImageTs(Timestamp(100, 2)); + updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog, updateOplog}); // migration always fetches at least twice to transition from committing to done. @@ -578,7 +577,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); @@ -606,13 +605,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA ASSERT_FALSE(historyIter.hasNext()); - ASSERT_TRUE(nextOplog.getPreImageTs()); - ASSERT_FALSE(nextOplog.getPostImageTs()); + ASSERT_TRUE(nextOplog.getPreImageOpTime()); + ASSERT_FALSE(nextOplog.getPostImageOpTime()); // Check preImage oplog - auto preImageTs = nextOplog.getPreImageTs().value(); - auto newPreImageOplog = getOplog(opCtx, preImageTs); + auto preImageOpTime = nextOplog.getPreImageOpTime().value(); + auto newPreImageOplog = getOplog(opCtx, preImageOpTime); ASSERT_TRUE(newPreImageOplog.getStatementId()); ASSERT_EQ(45, newPreImageOplog.getStatementId().value()); @@ -656,7 +655,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); - updateOplog.setPostImageTs(Timestamp(100, 2)); + updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({postImageOplog, updateOplog}); // migration always fetches at least twice to transition from committing to done. 
@@ -669,7 +668,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); @@ -697,13 +696,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind ASSERT_FALSE(historyIter.hasNext()); - ASSERT_FALSE(nextOplog.getPreImageTs()); - ASSERT_TRUE(nextOplog.getPostImageTs()); + ASSERT_FALSE(nextOplog.getPreImageOpTime()); + ASSERT_TRUE(nextOplog.getPostImageOpTime()); // Check preImage oplog - auto postImageTs = nextOplog.getPostImageTs().value(); - auto newPostImageOplog = getOplog(opCtx, postImageTs); + auto postImageOpTime = nextOplog.getPostImageOpTime().value(); + auto newPostImageOplog = getOplog(opCtx, postImageOpTime); ASSERT_TRUE(newPostImageOplog.getStatementId()); ASSERT_EQ(45, newPostImageOplog.getStatementId().value()); @@ -747,7 +746,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); - updateOplog.setPreImageTs(Timestamp(100, 2)); + updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog}); returnOplog({updateOplog}); @@ -761,7 +760,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); @@ -789,13 +788,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify ASSERT_FALSE(historyIter.hasNext()); - ASSERT_TRUE(nextOplog.getPreImageTs()); - ASSERT_FALSE(nextOplog.getPostImageTs()); + ASSERT_TRUE(nextOplog.getPreImageOpTime()); + ASSERT_FALSE(nextOplog.getPostImageOpTime()); // Check preImage oplog - auto preImageTs = nextOplog.getPreImageTs().value(); - auto newPreImageOplog = getOplog(opCtx, preImageTs); + auto preImageOpTime = nextOplog.getPreImageOpTime().value(); + auto newPreImageOplog = getOplog(opCtx, preImageOpTime); ASSERT_TRUE(newPreImageOplog.getStatementId()); ASSERT_EQ(45, newPreImageOplog.getStatementId().value()); @@ -856,7 +855,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); auto session = getSessionWithTxn(opCtx, sessionId, 20); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20)); ASSERT_TRUE(historyIter.hasNext()); auto oplog = historyIter.next(opCtx); @@ -912,7 +911,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); auto session = getSessionWithTxn(opCtx, sessionId, 20); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20)); ASSERT_TRUE(historyIter.hasNext()); auto oplog = historyIter.next(opCtx); @@ -1079,7 +1078,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, 
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); @@ -1157,7 +1156,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, sessionInfo.setSessionId(makeLogicalSessionIdForTest()); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); - updateOplog.setPreImageTs(Timestamp(100, 2)); + updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog, updateOplog}); @@ -1197,7 +1196,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo sessionInfo.setTxnNumber(56); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); - updateOplog.setPreImageTs(Timestamp(100, 2)); + updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog, updateOplog}); @@ -1276,7 +1275,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); - updateOplog.setPreImageTs(Timestamp(100, 2)); + updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({oplog1, updateOplog}); @@ -1316,7 +1315,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); - updateOplog.setPostImageTs(Timestamp(100, 2)); + updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({oplog1, updateOplog}); @@ -1365,7 +1364,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); auto session = getSessionWithTxn(opCtx, sessionId, 19); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(19)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 829d862b370..ac835502952 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -41,20 +41,23 @@ namespace mongo { namespace { -BSONObj fetchPrePostImageOplog(OperationContext* opCtx, const repl::OplogEntry& oplog) { - auto tsToFetch = oplog.getPreImageTs(); +boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx, + const repl::OplogEntry& oplog) { + auto opTimeToFetch = oplog.getPreImageOpTime(); - if (!tsToFetch) { - tsToFetch = oplog.getPostImageTs(); + if (!opTimeToFetch) { + opTimeToFetch = oplog.getPostImageOpTime(); } - if (!tsToFetch) { - return BSONObj(); + if (!opTimeToFetch) { + return boost::none; } + auto opTime = opTimeToFetch.value(); DBDirectClient client(opCtx); - return client.findOne(NamespaceString::kRsOplogNamespace.ns(), - Query(BSON("ts" << tsToFetch.value()))); + auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery()); + + return uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); } } // unnamed namespace @@ -66,10 +69,10 @@ bool 
SessionCatalogMigrationSource::hasMoreOplog() { return _hasMoreOplogFromSessionCatalog() || _hasNewWrites(); } -BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() { +boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex); - if (!_lastFetchedOplog.isEmpty()) { + if (_lastFetchedOplog) { return _lastFetchedOplog; } } @@ -100,13 +103,12 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte return false; } - auto nextOplogBSON = nextOplog.toBSON().getOwned(); auto doc = fetchPrePostImageOplog(opCtx, nextOplog); - if (!doc.isEmpty()) { - _lastFetchedOplogBuffer.push_back(nextOplogBSON); - _lastFetchedOplog = doc; + if (doc) { + _lastFetchedOplogBuffer.push_back(nextOplog); + _lastFetchedOplog = *doc; } else { - _lastFetchedOplog = nextOplogBSON; + _lastFetchedOplog = nextOplog; } return true; @@ -120,18 +122,18 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() { stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex); - return !_lastFetchedOplog.isEmpty() || !_lastFetchedOplogBuffer.empty(); + return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty(); } // Important: The no-op oplog entry for findAndModify should always be returned first before the // actual operation. -BSONObj SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionCatalog() { +repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionCatalog() { stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex); return _lastFetchedOplogBuffer.back(); } bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex); + stdx::unique_lock<stdx::mutex> lk(_sessionCloneMutex); if (!_lastFetchedOplogBuffer.empty()) { _lastFetchedOplog = _lastFetchedOplogBuffer.back(); @@ -139,25 +141,20 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC return true; } - _lastFetchedOplog = BSONObj(); + _lastFetchedOplog.reset(); if (_handleWriteHistory(lk, opCtx)) { return true; } - if (!_sessionCatalogCursor) { - DBDirectClient client(opCtx); - Query query; - query.sort(BSON("_id" << 1)); // strictly not required, but helps make test deterministic. 
- _sessionCatalogCursor = - client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query); - } + _initIfNotYet(lk, opCtx); - while (_sessionCatalogCursor->more()) { - auto nextSession = SessionTxnRecord::parse( - IDLParserErrorContext("Session migration cloning"), _sessionCatalogCursor->next()); - _writeHistoryIterator = - stdx::make_unique<TransactionHistoryIterator>(nextSession.getLastWriteOpTimeTs()); + while (!_sessionLastWriteOpTimes.empty()) { + auto lowestOpTimeIter = _sessionLastWriteOpTimes.begin(); + auto nextOpTime = *lowestOpTimeIter; + _sessionLastWriteOpTimes.erase(lowestOpTimeIter); + + _writeHistoryIterator = stdx::make_unique<TransactionHistoryIterator>(nextOpTime); if (_handleWriteHistory(lk, opCtx)) { return true; } @@ -168,48 +165,67 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC bool SessionCatalogMigrationSource::_hasNewWrites() { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); - return !_lastFetchedNewWriteOplog.isEmpty() || !_newWriteTsList.empty(); + return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty(); } -BSONObj SessionCatalogMigrationSource::_getLastFetchedNewWriteOplog() { +repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedNewWriteOplog() { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); - return _lastFetchedNewWriteOplog; + invariant(_lastFetchedNewWriteOplog); + return *_lastFetchedNewWriteOplog; } bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* opCtx) { - Timestamp nextOplogTsToFetch; + repl::OpTime nextOpTimeToFetch; { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); - if (_newWriteTsList.empty()) { - _lastFetchedNewWriteOplog = BSONObj(); + if (_newWriteOpTimeList.empty()) { + _lastFetchedNewWriteOplog.reset(); return false; } - nextOplogTsToFetch = _newWriteTsList.front(); - _newWriteTsList.pop_front(); + nextOpTimeToFetch = _newWriteOpTimeList.front(); } DBDirectClient client(opCtx); - auto newWriteOplog = client.findOne(NamespaceString::kRsOplogNamespace.ns(), - Query(BSON("ts" << nextOplogTsToFetch))); + auto newWriteOplog = + client.findOne(NamespaceString::kRsOplogNamespace.ns(), nextOpTimeToFetch.asQuery()); uassert(40620, - str::stream() << "Unable to fetch oplog entry with ts: " << nextOplogTsToFetch.toBSON(), + str::stream() << "Unable to fetch oplog entry with opTime: " + << nextOpTimeToFetch.toBSON(), !newWriteOplog.isEmpty()); { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); - _lastFetchedNewWriteOplog = newWriteOplog; + _lastFetchedNewWriteOplog = uassertStatusOK(repl::OplogEntry::parse(newWriteOplog)); + _newWriteOpTimeList.pop_front(); } return true; } -void SessionCatalogMigrationSource::notifyNewWriteTS(Timestamp opTimestamp) { +void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); - _newWriteTsList.push_back(opTimestamp); + _newWriteOpTimeList.push_back(opTime); +} + +void SessionCatalogMigrationSource::_initIfNotYet(WithLock, OperationContext* opCtx) { + if (_alreadyInitialized) { + return; + } + + DBDirectClient client(opCtx); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {}); + + while (cursor->more()) { + auto nextSession = SessionTxnRecord::parse( + IDLParserErrorContext("Session migration cloning"), cursor->next()); + _sessionLastWriteOpTimes.insert(nextSession.getLastWriteOpTime()); + } + + _alreadyInitialized = true; } } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source.h 
b/src/mongo/db/s/session_catalog_migration_source.h index 5e44531d31c..c1e6729aca3 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -28,11 +28,13 @@ #pragma once +#include <boost/optional.hpp> #include <memory> #include "mongo/base/disallow_copying.h" #include "mongo/client/dbclientcursor.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/with_lock.h" @@ -70,17 +72,19 @@ public: * Returns the oplog document that was last fetched by the fetchNextOplog call. * Returns an empty object if there are no oplog to fetch. */ - BSONObj getLastFetchedOplog(); + boost::optional<repl::OplogEntry> getLastFetchedOplog(); /** * Remembers the oplog timestamp of a new write that just occurred. */ - void notifyNewWriteTS(Timestamp opTimestamp); + void notifyNewWriteOpTime(repl::OpTime opTimestamp); private: /////////////////////////////////////////////////////////////////////////// // Methods for extracting the oplog entries from session information. + void _initIfNotYet(WithLock, OperationContext* opCtx); + /** * If this returns false, it just means that there are no more oplog entry in the buffer that * needs to be moved over. However, there can still be new incoming operations that can add @@ -97,7 +101,7 @@ private: /** * Returns the document that was last fetched by fetchNextOplogFromSessionCatalog. */ - BSONObj _getLastFetchedOplogFromSessionCatalog(); + repl::OplogEntry _getLastFetchedOplogFromSessionCatalog(); /** * Extracts oplog information from the current writeHistoryIterator to _lastFetchedOplog. This @@ -124,16 +128,17 @@ private: /** * Returns the oplog that was last fetched by fetchNextNewWriteOplog. */ - BSONObj _getLastFetchedNewWriteOplog(); + repl::OplogEntry _getLastFetchedNewWriteOplog(); const NamespaceString _ns; - // Protects _sessionCatalogCursor, _writeHistoryIterator, + // Protects _alreadyInitialized, _sessionCatalogCursor, _writeHistoryIterator // _lastFetchedOplogBuffer, _lastFetchedOplog stdx::mutex _sessionCloneMutex; - // Cursor for iterating over the session catalog. - std::unique_ptr<DBClientCursor> _sessionCatalogCursor; + bool _alreadyInitialized = false; + + std::set<repl::OpTime> _sessionLastWriteOpTimes; // Iterator for oplog entries for a specific transaction. std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator; @@ -141,19 +146,19 @@ private: // Used for temporarily storing oplog entries for operations that has more than one entry. // For example, findAndModify generates one for the actual operation and another for the // pre/post image. - std::vector<BSONObj> _lastFetchedOplogBuffer; + std::vector<repl::OplogEntry> _lastFetchedOplogBuffer; // Used to store the last fetched oplog. This enables calling get multiple times. - BSONObj _lastFetchedOplog; + boost::optional<repl::OplogEntry> _lastFetchedOplog; // Protects _newWriteTsList, _lastFetchedNewWriteOplog stdx::mutex _newOplogMutex; - // Stores oplog timestamps of new writes that are coming in. - std::list<Timestamp> _newWriteTsList; + // Stores oplog opTime of new writes that are coming in. + std::list<repl::OpTime> _newWriteOpTimeList; // Used to store the last fetched oplog from _newWriteTsList. 
- BSONObj _lastFetchedNewWriteOplog; + boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog; }; } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index ff43cce5d0b..f7051904a9a 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -56,18 +56,18 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); - entry1.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry1); repl::OplogEntry entry2( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); - entry2.setPrevWriteTsInTransaction(entry1.getTimestamp()); + entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime()); insertOplogEntry(entry2); SessionTxnRecord sessionRecord; sessionRecord.setSessionId(makeLogicalSessionIdForTest()); sessionRecord.setTxnNum(1); - sessionRecord.setLastWriteOpTimeTs(entry2.getTimestamp()); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); @@ -77,15 +77,19 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON()); } ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); @@ -97,34 +101,33 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { repl::OplogEntry entry1a( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); - entry1a.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); repl::OplogEntry entry1b( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); - entry1b.setPrevWriteTsInTransaction(entry1a.getTimestamp()); + entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime()); SessionTxnRecord sessionRecord1; sessionRecord1.setSessionId(makeLogicalSessionIdForTest()); sessionRecord1.setTxnNum(1); - sessionRecord1.setLastWriteOpTimeTs(entry1b.getTimestamp()); + sessionRecord1.setLastWriteOpTime(entry1b.getOpTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord1.toBSON()); - repl::OplogEntry entry2a( repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); - entry2a.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); repl::OplogEntry entry2b( 
repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50)); - entry2b.setPrevWriteTsInTransaction(entry2a.getTimestamp()); + entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime()); SessionTxnRecord sessionRecord2; sessionRecord2.setSessionId(makeLogicalSessionIdForTest()); sessionRecord2.setTxnNum(1); - sessionRecord2.setLastWriteOpTimeTs(entry2b.getTimestamp()); + sessionRecord2.setLastWriteOpTime(entry2b.getOpTime()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord2.toBSON()); @@ -141,34 +144,27 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { const repl::OplogEntry& secondExpectedOplog) { { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextOplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextOplog->toBSON()); } }; - if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) < - 0) { - checkNextBatch(entry1b, entry1a); - - ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - ASSERT_TRUE(migrationSource.hasMoreOplog()); - - checkNextBatch(entry2b, entry2a); - } else { - checkNextBatch(entry2b, entry2a); + checkNextBatch(entry1b, entry1a); - ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - ASSERT_TRUE(migrationSource.hasMoreOplog()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); - checkNextBatch(entry1b, entry1a); - } + checkNextBatch(entry2b, entry2a); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -181,18 +177,18 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30)); - entry1.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry1); repl::OplogEntry entry2( repl::OpTime(Timestamp(52, 346), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50)); - entry2.setPrevWriteTsInTransaction(Timestamp(0, 0)); - entry2.setPreImageTs(entry1.getTimestamp()); + entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry2.setPreImageOpTime(entry1.getOpTime()); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20)); - entry3.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry3); repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2), @@ -201,14 +197,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd kNs, BSON("x" << 19), BSON("$inc" << BSON("x" << 1))); - entry4.setPrevWriteTsInTransaction(entry2.getTimestamp()); - entry4.setPostImageTs(entry3.getTimestamp()); + 
entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime()); + entry4.setPostImageOpTime(entry3.getOpTime()); insertOplogEntry(entry4); SessionTxnRecord sessionRecord; sessionRecord.setSessionId(makeLogicalSessionIdForTest()); sessionRecord.setTxnNum(1); - sessionRecord.setLastWriteOpTimeTs(entry4.getTimestamp()); + sessionRecord.setLastWriteOpTime(entry4.getOpTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); @@ -216,12 +212,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd SessionCatalogMigrationSource migrationSource(kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()}; + auto expectedSequece = {entry3, entry4, entry1, entry2}; - for (auto oplogDoc : expectedSequece) { + for (auto oplog : expectedSequece) { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(oplogDoc, nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplog->toBSON()); migrationSource.fetchNextOplog(opCtx()); } @@ -233,13 +231,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); - entry1.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; sessionRecord1.setSessionId(makeLogicalSessionIdForTest()); sessionRecord1.setTxnNum(1); - sessionRecord1.setLastWriteOpTimeTs(entry1.getTimestamp()); + sessionRecord1.setLastWriteOpTime(entry1.getOpTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -251,13 +249,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { repl::OpTypeEnum::kDelete, NamespaceString("x.y"), BSON("x" << 30)); - entry2.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry2); SessionTxnRecord sessionRecord2; sessionRecord2.setSessionId(makeLogicalSessionIdForTest()); sessionRecord2.setTxnNum(1); - sessionRecord2.setLastWriteOpTimeTs(entry2.getTimestamp()); + sessionRecord2.setLastWriteOpTime(entry2.getOpTime()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord2.toBSON()); @@ -266,8 +264,10 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -278,13 +278,13 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); - 
entry1.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; sessionRecord1.setSessionId(makeLogicalSessionIdForTest()); sessionRecord1.setTxnNum(1); - sessionRecord1.setLastWriteOpTimeTs(entry1.getTimestamp()); + sessionRecord1.setLastWriteOpTime(entry1.getOpTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -292,38 +292,42 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OplogEntry entry2( repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); - entry2.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40)); - entry3.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry3); SessionCatalogMigrationSource migrationSource(kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - migrationSource.notifyNewWriteTS(entry2.getTimestamp()); - migrationSource.notifyNewWriteTS(entry3.getTimestamp()); + migrationSource.notifyNewWriteOpTime(entry2.getOpTime()); + migrationSource.notifyNewWriteOpTime(entry3.getOpTime()); { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextOplog->toBSON()); } ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); @@ -336,7 +340,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { SessionCatalogMigrationSource migrationSource(kNs); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); - migrationSource.notifyNewWriteTS(Timestamp(100, 3)); + migrationSource.notifyNewWriteOpTime(repl::OpTime(Timestamp(100, 3), 1)); ASSERT_TRUE(migrationSource.hasMoreOplog()); ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException); } @@ -353,15 +357,17 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); - entry.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry); - migrationSource.notifyNewWriteTS(entry.getTimestamp()); + migrationSource.notifyNewWriteOpTime(entry.getOpTime()); ASSERT_TRUE(migrationSource.hasMoreOplog()); 
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -370,15 +376,16 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer { repl::OplogEntry entry( repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); - entry.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry); - migrationSource.notifyNewWriteTS(entry.getTimestamp()); + migrationSource.notifyNewWriteOpTime(entry.getOpTime()); ASSERT_TRUE(migrationSource.hasMoreOplog()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -387,15 +394,16 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer { repl::OplogEntry entry( repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40)); - entry.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); insertOplogEntry(entry); - migrationSource.notifyNewWriteTS(entry.getTimestamp()); + migrationSource.notifyNewWriteOpTime(entry.getOpTime()); ASSERT_TRUE(migrationSource.hasMoreOplog()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto nextDoc = migrationSource.getLastFetchedOplog(); - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextDoc); + auto nextOplog = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplog); + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 13edd3d5535..c2ac4731b87 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -81,11 +81,11 @@ void fassertOnRepeatedExecution(OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId, - Timestamp firstTs, - Timestamp secondTs) { + const repl::OpTime& firstOpTime, + const repl::OpTime& secondOpTime) { severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":" - << txnNumber << " ] was committed once with timestamp " << firstTs - << " and a second time with timestamp " << secondTs + << txnNumber << " ] was committed once with opTime " << firstOpTime + << " and a second time with opTime " << secondOpTime << ". 
This indicates possible data corruption or server bug and the process will be " "terminated."; fassertFailed(40526); @@ -132,20 +132,20 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { CommittedStatementTimestampMap activeTxnCommittedStatements; if (lastWrittenTxnRecord) { - auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTimeTs()); + auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTime()); while (it.hasNext()) { const auto entry = it.next(opCtx); invariant(entry.getStatementId()); const auto insertRes = activeTxnCommittedStatements.emplace(*entry.getStatementId(), - entry.getTimestamp()); + entry.getOpTime()); if (!insertRes.second) { - const auto& existingTs = insertRes.first->second; + const auto& existingOpTime = insertRes.first->second; fassertOnRepeatedExecution(opCtx, _sessionId, lastWrittenTxnRecord->getTxnNum(), *entry.getStatementId(), - existingTs, - entry.getTimestamp()); + existingOpTime, + entry.getOpTime()); } } } @@ -177,28 +177,29 @@ void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) { void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - Timestamp lastStmtIdWriteTs) { + const repl::OpTime& lastStmtIdWriteOpTime) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); stdx::unique_lock<stdx::mutex> ul(_mutex); // Sanity check that we don't double-execute statements for (const auto stmtId : stmtIdsWritten) { - const auto stmtTimestamp = _checkStatementExecuted(ul, txnNumber, stmtId); - if (stmtTimestamp) { + const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId); + if (stmtOpTime) { fassertOnRepeatedExecution( - opCtx, _sessionId, txnNumber, stmtId, *stmtTimestamp, lastStmtIdWriteTs); + opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); } } - const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteTs); + const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime); ul.unlock(); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); updateSessionEntry(opCtx, updateRequest); - _registerUpdateCacheOnCommit(opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteTs); + _registerUpdateCacheOnCommit( + opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); } void Session::updateSessionRecordOnSecondary(OperationContext* opCtx, @@ -233,15 +234,15 @@ void Session::invalidate() { _activeTxnCommittedStatements.clear(); } -Timestamp Session::getLastWriteOpTimeTs(TxnNumber txnNumber) const { +repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const { stdx::lock_guard<stdx::mutex> lg(_mutex); _checkValid(lg); _checkIsActiveTransaction(lg, txnNumber); if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) - return Timestamp(); + return {}; - return _lastWrittenSessionRecord->getLastWriteOpTimeTs(); + return _lastWrittenSessionRecord->getLastWriteOpTime(); } boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx, @@ -303,9 +304,9 @@ void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const { txnNumber == _activeTxnNumber); } -boost::optional<Timestamp> Session::_checkStatementExecuted(WithLock wl, - TxnNumber txnNumber, - StmtId stmtId) const { +boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl, + TxnNumber txnNumber, + StmtId stmtId) const { _checkValid(wl); _checkIsActiveTransaction(wl, txnNumber); @@ 
-321,22 +322,21 @@ boost::optional<Timestamp> Session::_checkStatementExecuted(WithLock wl, UpdateRequest Session::_makeUpdateRequest(WithLock, TxnNumber newTxnNumber, - Timestamp newLastWriteTs) const { + const repl::OpTime& newLastWriteOpTime) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); if (_lastWrittenSessionRecord) { updateRequest.setQuery(_lastWrittenSessionRecord->toBSON()); - updateRequest.setUpdates( - BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName - << newTxnNumber - << SessionTxnRecord::kLastWriteOpTimeTsFieldName - << newLastWriteTs))); + updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName + << newTxnNumber + << SessionTxnRecord::kLastWriteOpTimeFieldName + << newLastWriteOpTime))); } else { const auto updateBSON = [&] { SessionTxnRecord newTxnRecord; newTxnRecord.setSessionId(_sessionId); newTxnRecord.setTxnNum(newTxnNumber); - newTxnRecord.setLastWriteOpTimeTs(newLastWriteTs); + newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); return newTxnRecord.toBSON(); }(); @@ -351,13 +351,13 @@ UpdateRequest Session::_makeUpdateRequest(WithLock, void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector<StmtId> stmtIdsWritten, - Timestamp lastStmtIdWriteTs) { + const repl::OpTime& lastStmtIdWriteOpTime) { opCtx->recoveryUnit()->onCommit([ this, opCtx, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), - lastStmtIdWriteTs + lastStmtIdWriteOpTime ] { stdx::lock_guard<stdx::mutex> lg(_mutex); @@ -371,13 +371,13 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, _lastWrittenSessionRecord->setSessionId(_sessionId); _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - _lastWrittenSessionRecord->setLastWriteOpTimeTs(lastStmtIdWriteTs); + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); } else { if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - if (lastStmtIdWriteTs > _lastWrittenSessionRecord->getLastWriteOpTimeTs()) - _lastWrittenSessionRecord->setLastWriteOpTimeTs(lastStmtIdWriteTs); + if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); } if (newTxnNumber > _activeTxnNumber) { @@ -392,11 +392,15 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, if (newTxnNumber == _activeTxnNumber) { for (const auto stmtId : stmtIdsWritten) { const auto insertRes = - _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteTs); + _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); if (!insertRes.second) { - const auto& existingTs = insertRes.first->second; - fassertOnRepeatedExecution( - opCtx, _sessionId, newTxnNumber, stmtId, existingTs, lastStmtIdWriteTs); + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(opCtx, + _sessionId, + newTxnNumber, + stmtId, + existingOpTime, + lastStmtIdWriteOpTime); } } } diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 68501f6da15..21d8d618a9a 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -87,7 +87,7 @@ public: * Called after a write under the specified transaction completes while the node is a primary * and specifies the statement ids which were written. Must be called while the caller is still * in the write's WUOW. 
Updates the on-disk state of the session to match the specified - * transaction/timestamp and keeps the cached state in sync. + * transaction/opTime and keeps the cached state in sync. * * Must only be called with the session checked-out. * @@ -96,7 +96,7 @@ public: void onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - Timestamp lastStmtIdWriteTs); + const repl::OpTime& lastStmtIdWriteOpTime); /** * Called after a replication batch has been applied on a secondary node. Keeps the session @@ -120,7 +120,7 @@ public: * * Throws if the session has been invalidated or the active transaction number doesn't match. */ - Timestamp getLastWriteOpTimeTs(TxnNumber txnNumber) const; + repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const; /** * Returns the oplog entry with the given statementId for the specified transaction, if it @@ -142,18 +142,18 @@ private: void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const; - boost::optional<Timestamp> _checkStatementExecuted(WithLock, - TxnNumber txnNumber, - StmtId stmtId) const; + boost::optional<repl::OpTime> _checkStatementExecuted(WithLock, + TxnNumber txnNumber, + StmtId stmtId) const; UpdateRequest _makeUpdateRequest(WithLock, TxnNumber newTxnNumber, - Timestamp newLastWriteTs) const; + const repl::OpTime& newLastWriteTs) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector<StmtId> stmtIdsWritten, - Timestamp lastStmtIdWriteTs); + const repl::OpTime& lastStmtIdWriteTs); const LogicalSessionId _sessionId; @@ -176,9 +176,9 @@ private: TxnNumber _activeTxnNumber{kUninitializedTxnNumber}; // For the active txn, tracks which statement ids have been committed and at which oplog - // timestamp. Used for fast retryability check and retrieving the previous write's data without + // opTime. Used for fast retryability check and retrieving the previous write's data without // having to scan through the oplog. 
- using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, Timestamp>; + using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>; CommittedStatementTimestampMap _activeTxnCommittedStatements; }; diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index df163a97d7b..168232a2c25 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -68,7 +68,7 @@ protected: const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId) { - return logOp(opCtx, nss, lsid, txnNumber, stmtId, Timestamp()); + return logOp(opCtx, nss, lsid, txnNumber, stmtId, {}); } static repl::OpTime logOp(OperationContext* opCtx, @@ -76,13 +76,13 @@ protected: const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId, - Timestamp prevTs) { + repl::OpTime prevOpTime) { OperationSessionInfo osi; osi.setSessionId(lsid); osi.setTxnNumber(txnNumber); repl::OplogLink link; - link.prevTs = prevTs; + link.prevOpTime = prevOpTime; return repl::logOp( opCtx, "n", nss, kUUID, BSON("TestValue" << 0), nullptr, false, osi, stmtId, link); @@ -98,7 +98,7 @@ TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) { session.beginTxn(opCtx(), txnNum); ASSERT_EQ(sessionId, session.getSessionId()); - ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull()); + ASSERT(session.getLastWriteOpTime(txnNum).isNull()); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -119,7 +119,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); wuow.commit(); return opTime; @@ -136,8 +136,8 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { ASSERT(!cursor->more()); ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_EQ(opTime.getTimestamp(), txnRecord.getLastWriteOpTimeTs()); - ASSERT_EQ(opTime.getTimestamp(), session.getLastWriteOpTimeTs(txnNum)); + ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); + ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum)); } TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { @@ -145,20 +145,20 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); - const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, Timestamp prevTs) { + const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime) { session.beginTxn(opCtx(), txnNum); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp()); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime); wuow.commit(); - return opTime.getTimestamp(); + return opTime; }; - const auto firstTs = writeTxnRecordFn(100, 0, Timestamp()); - const auto secondTs = writeTxnRecordFn(200, 1, firstTs); + const auto firstOpTime = writeTxnRecordFn(100, 0, {}); + const auto secondOpTime = writeTxnRecordFn(200, 1, firstOpTime); DBDirectClient client(opCtx()); auto 
cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -171,12 +171,12 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { ASSERT(!cursor->more()); ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(200, txnRecord.getTxnNum()); - ASSERT_EQ(secondTs, txnRecord.getLastWriteOpTimeTs()); - ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200)); + ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime()); + ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); session.invalidate(); session.refreshFromStorageIfNeeded(opCtx()); - ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200)); + ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); } TEST_F(SessionTest, StartingOldTxnShouldAssert) { @@ -189,7 +189,7 @@ TEST_F(SessionTest, StartingOldTxnShouldAssert) { ASSERT_THROWS_CODE( session.beginTxn(opCtx(), txnNum - 1), AssertionException, ErrorCodes::TransactionTooOld); - ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull()); + ASSERT(session.getLastWriteOpTime(txnNum).isNull()); } TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { @@ -209,7 +209,7 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()), + ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime), AssertionException); } @@ -221,22 +221,22 @@ TEST_F(SessionTest, CheckStatementExecuted) { const TxnNumber txnNum = 100; session.beginTxn(opCtx(), txnNum); - const auto writeTxnRecordFn = [&](StmtId stmtId, Timestamp prevTs) { + const auto writeTxnRecordFn = [&](StmtId stmtId, repl::OpTime prevOpTime) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp()); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime); wuow.commit(); - return opTime.getTimestamp(); + return opTime; }; ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000)); - const auto firstTs = writeTxnRecordFn(1000, Timestamp()); + const auto firstOpTime = writeTxnRecordFn(1000, {}); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000)); - writeTxnRecordFn(2000, firstTs); + writeTxnRecordFn(2000, firstOpTime); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); // Invalidate the session and ensure the statements still check out @@ -282,7 +282,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); wuow.commit(); } @@ -290,10 +290,9 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0); - ASSERT_THROWS_CODE( - session.onWriteOpCompletedOnPrimary(opCtx(), 
txnNum - 1, {0}, opTime.getTimestamp()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } } @@ -311,10 +310,9 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { session.invalidate(); - ASSERT_THROWS_CODE( - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { @@ -329,7 +327,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); session.invalidate(); diff --git a/src/mongo/db/session_txn_record.cpp b/src/mongo/db/session_txn_record.cpp index b5b92a644eb..89b627e7dce 100644 --- a/src/mongo/db/session_txn_record.cpp +++ b/src/mongo/db/session_txn_record.cpp @@ -34,12 +34,14 @@ namespace mongo { -SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, Timestamp ts) { +SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, + TxnNumber txnNum, + repl::OpTime opTime) { SessionTxnRecord record; record.setSessionId(lsid); record.setTxnNum(txnNum); - record.setLastWriteOpTimeTs(ts); + record.setLastWriteOpTime(std::move(opTime)); return record; } diff --git a/src/mongo/db/session_txn_record.h b/src/mongo/db/session_txn_record.h index d6425a7076d..b738e9b1dc3 100644 --- a/src/mongo/db/session_txn_record.h +++ b/src/mongo/db/session_txn_record.h @@ -30,13 +30,14 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/session_txn_record_gen.h" namespace mongo { inline bool operator==(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) { return (lhs.getSessionId() == rhs.getSessionId()) && (lhs.getTxnNum() == rhs.getTxnNum()) && - (lhs.getLastWriteOpTimeTs() == rhs.getLastWriteOpTimeTs()); + (lhs.getLastWriteOpTime() == rhs.getLastWriteOpTime()); } inline bool operator!=(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) { @@ -52,10 +53,9 @@ inline bool operator>(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) invariant(lhs.getSessionId() == rhs.getSessionId()); return (lhs.getTxnNum() > rhs.getTxnNum()) || - (lhs.getTxnNum() == rhs.getTxnNum() && - lhs.getLastWriteOpTimeTs() > rhs.getLastWriteOpTimeTs()); + (lhs.getTxnNum() == rhs.getTxnNum() && lhs.getLastWriteOpTime() > rhs.getLastWriteOpTime()); } -SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, Timestamp ts); +SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, repl::OpTime opTime); } // namespace mongo diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl index 3a9dc02328d..fa81319225b 100644 --- a/src/mongo/db/session_txn_record.idl +++ b/src/mongo/db/session_txn_record.idl @@ -30,11 +30,21 @@ global: cpp_namespace: "mongo" cpp_includes: - "mongo/db/logical_session_id.h" + - 
"mongo/db/repl/optime.h" imports: - "mongo/idl/basic_types.idl" - "mongo/db/logical_session_id.idl" +# TODO: SERVER-31278 import from repl/replication_types.idl instead +types: + optime: + bson_serialization_type: object + description: A document representing an OpTime. + cpp_type: "repl::OpTime" + serializer: repl::OpTime::toBSON + deserializer: repl::OpTime::parse + structs: sessionTxnRecord: description: "A document used for storing session transaction states." @@ -46,8 +56,8 @@ structs: txnNum: type: TxnNumber description: "The id representing this transaction." - lastWriteOpTimeTs: - type: timestamp + lastWriteOpTime: + type: optime description: "The optime timestamp component of the last write on this transaction." diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp index 4d66c0eadde..3220597dd61 100644 --- a/src/mongo/db/transaction_history_iterator.cpp +++ b/src/mongo/db/transaction_history_iterator.cpp @@ -38,11 +38,11 @@ namespace mongo { -TransactionHistoryIterator::TransactionHistoryIterator(Timestamp startingOpTimeTs) - : _nextOpTimeTs(std::move(startingOpTimeTs)) {} +TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime) + : _nextOpTime(std::move(startingOpTime)) {} bool TransactionHistoryIterator::hasNext() const { - return !_nextOpTimeTs.isNull(); + return !_nextOpTime.isNull(); } repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) { @@ -50,26 +50,24 @@ repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) { DBDirectClient client(opCtx); // TODO: SERVER-29843 oplogReplay option might be needed to activate fast ts search. - auto oplogBSON = - client.findOne(NamespaceString::kRsOplogNamespace.ns(), - BSON(repl::OplogEntryBase::kTimestampFieldName << _nextOpTimeTs)); + auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), _nextOpTime.asQuery()); uassert(ErrorCodes::IncompleteTransactionHistory, str::stream() << "oplog no longer contains the complete write history of this " - "transaction, log with ts " - << _nextOpTimeTs.toBSON() + "transaction, log with opTime " + << _nextOpTime.toBSON() << " cannot be found", !oplogBSON.isEmpty()); auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); - const auto& oplogPrevTsOption = oplogEntry.getPrevWriteTsInTransaction(); + const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction(); uassert( ErrorCodes::FailedToParse, str::stream() << "Missing prevTs field on oplog entry of previous write in transcation: " << redact(oplogBSON), oplogPrevTsOption); - _nextOpTimeTs = oplogPrevTsOption.value(); + _nextOpTime = oplogPrevTsOption.value(); return oplogEntry; } diff --git a/src/mongo/db/transaction_history_iterator.h b/src/mongo/db/transaction_history_iterator.h index 3b6dee175d1..4e455ec3ced 100644 --- a/src/mongo/db/transaction_history_iterator.h +++ b/src/mongo/db/transaction_history_iterator.h @@ -28,7 +28,6 @@ #pragma once -#include "mongo/bson/timestamp.h" #include "mongo/db/repl/oplog_entry.h" namespace mongo { @@ -44,7 +43,7 @@ public: /** * Creates a new iterator starting with an oplog entry with the given start opTime. */ - TransactionHistoryIterator(Timestamp startingOpTimeTs); + TransactionHistoryIterator(repl::OpTime startingOpTime); /** * Returns false if there are no more entries to iterate. 
@@ -60,7 +59,7 @@ public: repl::OplogEntry next(OperationContext* opCtx); private: - Timestamp _nextOpTimeTs; + repl::OpTime _nextOpTime; }; } // namespace mongo diff --git a/src/mongo/db/transaction_history_iterator_test.cpp b/src/mongo/db/transaction_history_iterator_test.cpp index 687ced1b8cf..45ef909d8eb 100644 --- a/src/mongo/db/transaction_history_iterator_test.cpp +++ b/src/mongo/db/transaction_history_iterator_test.cpp @@ -58,7 +58,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) { repl::OpTypeEnum::kInsert, NamespaceString("a.b"), BSON("x" << 30)); - entry1.setPrevWriteTsInTransaction(Timestamp(0, 0)); + entry1.setPrevWriteOpTimeInTransaction(repl::OpTime()); insertOplogEntry(entry1); repl::OplogEntry entry2(repl::OpTime(Timestamp(67, 54801), 2), @@ -66,7 +66,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) { repl::OpTypeEnum::kInsert, NamespaceString("a.b"), BSON("y" << 50)); - entry2.setPrevWriteTsInTransaction(Timestamp(52, 345)); + entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(52, 345), 2)); insertOplogEntry(entry2); // Insert an unrelated entry in between @@ -75,7 +75,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) { repl::OpTypeEnum::kInsert, NamespaceString("a.b"), BSON("z" << 40)); - entry3.setPrevWriteTsInTransaction(Timestamp(22, 67)); + entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(22, 67), 2)); insertOplogEntry(entry3); repl::OplogEntry entry4(repl::OpTime(Timestamp(97, 2472), 2), @@ -83,29 +83,29 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) { repl::OpTypeEnum::kInsert, NamespaceString("a.b"), BSON("a" << 3)); - entry4.setPrevWriteTsInTransaction(Timestamp(67, 54801)); + entry4.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(67, 54801), 2)); insertOplogEntry(entry4); - TransactionHistoryIterator iter(Timestamp(97, 2472)); + TransactionHistoryIterator iter(repl::OpTime(Timestamp(97, 2472), 2)); { ASSERT_TRUE(iter.hasNext()); auto nextEntry = iter.next(opCtx()); - ASSERT_EQ(Timestamp(97, 2472), nextEntry.getTimestamp()); + ASSERT_EQ(repl::OpTime(Timestamp(97, 2472), 2), nextEntry.getOpTime()); ASSERT_BSONOBJ_EQ(BSON("a" << 3), nextEntry.getObject()); } { ASSERT_TRUE(iter.hasNext()); auto nextEntry = iter.next(opCtx()); - ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp()); + ASSERT_EQ(repl::OpTime(Timestamp(67, 54801), 2), nextEntry.getOpTime()); ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject()); } { ASSERT_TRUE(iter.hasNext()); auto nextEntry = iter.next(opCtx()); - ASSERT_EQ(Timestamp(52, 345), nextEntry.getTimestamp()); + ASSERT_EQ(repl::OpTime(Timestamp(52, 345), 2), nextEntry.getOpTime()); ASSERT_BSONOBJ_EQ(BSON("x" << 30), nextEntry.getObject()); } @@ -118,10 +118,10 @@ TEST_F(SessionHistoryIteratorTest, StartAtZeroTSShouldNotBeAbleToIterate) { repl::OpTypeEnum::kInsert, NamespaceString("a.b"), BSON("y" << 50)); - entry.setPrevWriteTsInTransaction(Timestamp(52, 345)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(52, 345), 1)); insertOplogEntry(entry); - TransactionHistoryIterator iter(Timestamp(0, 0)); + TransactionHistoryIterator iter({}); ASSERT_FALSE(iter.hasNext()); } @@ -131,14 +131,15 @@ TEST_F(SessionHistoryIteratorTest, NextShouldAssertIfHistoryIsTruncated) { repl::OpTypeEnum::kInsert, NamespaceString("a.b"), BSON("y" << 50)); - entry.setPrevWriteTsInTransaction(Timestamp(52, 345)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(52, 345), 1)); insertOplogEntry(entry); - TransactionHistoryIterator iter(Timestamp(67, 54801)); + repl::OpTime 
opTime(Timestamp(67, 54801), 2); + TransactionHistoryIterator iter(opTime); ASSERT_TRUE(iter.hasNext()); auto nextEntry = iter.next(opCtx()); - ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp()); + ASSERT_EQ(opTime, nextEntry.getOpTime()); ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject()); ASSERT_TRUE(iter.hasNext()); @@ -154,7 +155,7 @@ TEST_F(SessionHistoryIteratorTest, OplogInWriteHistoryChainWithMissingPrevTSShou BSON("y" << 50)); insertOplogEntry(entry); - TransactionHistoryIterator iter(Timestamp(67, 54801)); + TransactionHistoryIterator iter(repl::OpTime(Timestamp(67, 54801), 2)); ASSERT_TRUE(iter.hasNext()); ASSERT_THROWS_CODE(iter.next(opCtx()), AssertionException, ErrorCodes::FailedToParse); } diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index 7257a56f29a..efdbb4bc3d2 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -75,6 +75,8 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes, const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); +const auto kLastWriteTimestampFieldName = + SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName; /** * Makes the query we'll use to scan the transactions table. @@ -89,7 +91,7 @@ Query makeQuery(Date_t now) { 0); BSONObjBuilder bob; { - BSONObjBuilder subbob(bob.subobjStart(SessionTxnRecord::kLastWriteOpTimeTsFieldName)); + BSONObjBuilder subbob(bob.subobjStart(kLastWriteTimestampFieldName)); subbob.append("$lt", possiblyExpired); } Query query(bob.obj()); |
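The reaper hunk above switches from the flat lastWriteOpTimeTs field to the timestamp component nested inside the lastWriteOpTime subdocument. A minimal sketch of the resulting filter shape follows; only the dotted field-path construction is taken from the diff, while the helper name and cutoff parameter are illustrative assumptions.

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/session_txn_record_gen.h"

namespace mongo {

// Illustrative helper (assumed, not part of this commit): build the reaper's
// expiration filter against the timestamp component nested inside the
// lastWriteOpTime subdocument, i.e. {"lastWriteOpTime.ts": {$lt: <cutoff>}}.
BSONObj makeSessionExpiryFilter(Timestamp possiblyExpired) {
    // Same dotted field path as kLastWriteTimestampFieldName in
    // transaction_reaper.cpp above.
    const auto lastWriteTimestampFieldName = SessionTxnRecord::kLastWriteOpTimeFieldName + "." +
        repl::OpTime::kTimestampFieldName;

    BSONObjBuilder bob;
    {
        BSONObjBuilder subbob(bob.subobjStart(lastWriteTimestampFieldName));
        subbob.append("$lt", possiblyExpired);
    }
    return bob.obj();
}

}  // namespace mongo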
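Similarly, since getLastFetchedOplog() now hands back boost::optional<repl::OplogEntry> instead of a possibly empty BSONObj, a consumer of SessionCatalogMigrationSource tests the optional rather than calling isEmpty(). A rough drain-once sketch, under the assumption that no new writes are notified while draining; the helper name is invented for the example.

#include <vector>

#include "mongo/db/operation_context.h"
#include "mongo/db/s/session_catalog_migration_source.h"

namespace mongo {

// Illustrative helper (assumed, not part of this commit): pull everything the
// migration source currently has buffered.
std::vector<BSONObj> drainSessionOplogOnce(OperationContext* opCtx,
                                           SessionCatalogMigrationSource* source) {
    std::vector<BSONObj> batch;

    // The scan of the session transactions table
    // (NamespaceString::kSessionTransactionsTableNamespace) is initialized
    // lazily inside fetchNextOplog(), so prime the source before reading.
    bool fetched = source->fetchNextOplog(opCtx);

    while (fetched) {
        // For findAndModify, the pre/post image no-op entry is returned before
        // the actual operation, as noted in session_catalog_migration_source.cpp.
        if (auto nextOplog = source->getLastFetchedOplog()) {
            batch.push_back(nextOplog->toBSON());  // re-serialized for illustration only
        }
        fetched = source->fetchNextOplog(opCtx);
    }

    return batch;
}

}  // namespace mongo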