summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-09-26 17:45:15 -0400
committerRandolph Tan <randolph@10gen.com>2017-10-04 16:43:58 -0400
commit0ab7000e04e16813c1e1e3f131f02de102ddffba (patch)
tree07c771aa1229bc85755f952dcc9a157a8d4e2dd2 /src/mongo/db
parentd6267ee66b997af73fcfb095f03f655bb61c06dc (diff)
downloadmongo-0ab7000e04e16813c1e1e3f131f02de102ddffba.tar.gz
SERVER-31030 Use full OpTime instead of just Timestamps to refer to oplog entries
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/op_observer_impl.cpp34
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp20
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp58
-rw-r--r--src/mongo/db/repl/oplog.cpp46
-rw-r--r--src/mongo/db/repl/oplog.h25
-rw-r--r--src/mongo/db/repl/oplog_entry.idl21
-rw-r--r--src/mongo/db/repl/optime.cpp16
-rw-r--r--src/mongo/db/repl/optime.h3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp6
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp39
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp16
-rw-r--r--src/mongo/db/s/collection_sharding_state.h12
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source.h14
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp43
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h10
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp123
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp69
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp106
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h29
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp156
-rw-r--r--src/mongo/db/session.cpp78
-rw-r--r--src/mongo/db/session.h20
-rw-r--r--src/mongo/db/session_test.cpp66
-rw-r--r--src/mongo/db/session_txn_record.cpp6
-rw-r--r--src/mongo/db/session_txn_record.h8
-rw-r--r--src/mongo/db/session_txn_record.idl14
-rw-r--r--src/mongo/db/transaction_history_iterator.cpp18
-rw-r--r--src/mongo/db/transaction_history_iterator.h5
-rw-r--r--src/mongo/db/transaction_history_iterator_test.cpp29
-rw-r--r--src/mongo/db/transaction_reaper.cpp4
30 files changed, 589 insertions, 505 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index b466b70c3bb..79906de7f82 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -75,13 +75,12 @@ void onWriteOpCompleted(OperationContext* opCtx,
Session* session,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime) {
- const auto lastStmtIdWriteTs = lastStmtIdWriteOpTime.getTimestamp();
- if (lastStmtIdWriteTs.isNull())
+ if (lastStmtIdWriteOpTime.isNull())
return;
if (session) {
session->onWriteOpCompletedOnPrimary(
- opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteTs);
+ opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
}
@@ -140,7 +139,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
}
OpTimeBundle opTimes;
@@ -160,9 +159,9 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
opTimes.prePostImageOpTime = noteUpdateOpTime;
if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
- oplogLink.preImageTs = noteUpdateOpTime.getTimestamp();
+ oplogLink.preImageOpTime = noteUpdateOpTime;
} else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
- oplogLink.postImageTs = noteUpdateOpTime.getTimestamp();
+ oplogLink.postImageOpTime = noteUpdateOpTime;
}
}
@@ -197,7 +196,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
}
OpTimeBundle opTimes;
@@ -206,7 +205,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
auto noteOplog = repl::logOp(
opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {});
opTimes.prePostImageOpTime = noteOplog;
- oplogLink.preImageTs = noteOplog.getTimestamp();
+ oplogLink.preImageOpTime = noteOplog;
}
opTimes.writeOpTime = repl::logOp(opCtx,
@@ -280,10 +279,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- const size_t count = end - begin;
- auto timestamps = stdx::make_unique<Timestamp[]>(count);
- const auto lastOpTime =
- repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate);
+ const auto opTimeList = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate);
auto css = CollectionShardingState::get(opCtx, nss.ns());
@@ -292,10 +288,12 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (!fromMigrate) {
- css->onInsertOp(opCtx, it->doc, timestamps[index]);
+ auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
+ css->onInsertOp(opCtx, it->doc, opTime);
}
}
+ auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
if (nss.coll() == "system.js") {
Scope::storedFuncMod(opCtx);
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
@@ -336,8 +334,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
args.criteria,
args.update,
args.updatedDoc,
- opTime.writeOpTime.getTimestamp(),
- opTime.prePostImageOpTime.getTimestamp());
+ opTime.writeOpTime,
+ opTime.prePostImageOpTime);
}
if (args.nss.coll() == "system.js") {
@@ -351,7 +349,6 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
-
onWriteOpCompleted(
opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
}
@@ -383,10 +380,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, nss.ns());
if (!fromMigrate) {
- css->onDeleteOp(opCtx,
- deleteState,
- opTime.writeOpTime.getTimestamp(),
- opTime.prePostImageOpTime.getTimestamp());
+ css->onDeleteOp(opCtx, deleteState, opTime.writeOpTime, opTime.prePostImageOpTime);
}
if (nss.coll() == "system.js") {
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 3033daa68f9..3d4fc90fb7e 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -67,7 +67,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
uassert(40607,
str::stream() << "No pre-image available for findAndModify retry request:"
<< redact(request.toBSON()),
- oplogWithCorrectLinks.getPreImageTs());
+ oplogWithCorrectLinks.getPreImageOpTime());
} else if (opType == repl::OpTypeEnum::kInsert) {
uassert(
40608,
@@ -99,7 +99,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ts.toString()
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
- oplogWithCorrectLinks.getPostImageTs());
+ oplogWithCorrectLinks.getPostImageOpTime());
} else {
uassert(40612,
str::stream() << "findAndModify retry request: " << redact(request.toBSON())
@@ -108,7 +108,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ts.toString()
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
- oplogWithCorrectLinks.getPreImageTs());
+ oplogWithCorrectLinks.getPreImageOpTime());
}
}
}
@@ -118,21 +118,21 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
* oplog.
*/
BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& oplog) {
- invariant(oplog.getPreImageTs() || oplog.getPostImageTs());
- auto ts =
- oplog.getPreImageTs() ? oplog.getPreImageTs().value() : oplog.getPostImageTs().value();
+ invariant(oplog.getPreImageOpTime() || oplog.getPostImageOpTime());
+ auto opTime = oplog.getPreImageOpTime() ? oplog.getPreImageOpTime().value()
+ : oplog.getPostImageOpTime().value();
DBDirectClient client(opCtx);
- auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), BSON("ts" << ts));
+ auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery());
uassert(40613,
str::stream() << "oplog no longer contains the complete write history of this "
- "transaction, log with ts "
- << ts.toString()
+ "transaction, log with opTime "
+ << opTime.toString()
<< " cannot be found",
!oplogDoc.isEmpty());
- auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc));
+ auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc));
return oplogEntry.getObject().getOwned();
}
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index a50c51eba58..41774472760 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -287,9 +287,9 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPostImageButOplogHasPre) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(true);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
insertOplogEntry(noteOplog);
@@ -299,7 +299,7 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPostImageButOplogHasPre) {
kNs,
BSON("x" << 1 << "y" << 1),
BSON("x" << 1));
- updateOplog.setPreImageTs(imageTs);
+ updateOplog.setPreImageOpTime(imageOpTime);
ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog),
AssertionException);
@@ -309,14 +309,14 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpdateButOplogIsDelete) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(true);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
insertOplogEntry(noteOplog);
repl::OplogEntry oplog(repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 1));
- oplog.setPreImageTs(imageTs);
+ oplog.setPreImageOpTime(imageOpTime);
ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException);
}
@@ -325,9 +325,9 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPreImageButOplogHasPost) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(false);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
insertOplogEntry(noteOplog);
@@ -337,7 +337,7 @@ TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPreImageButOplogHasPost) {
kNs,
BSON("x" << 1 << "y" << 1),
BSON("x" << 1));
- updateOplog.setPostImageTs(imageTs);
+ updateOplog.setPostImageOpTime(imageOpTime);
ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog),
AssertionException);
@@ -347,9 +347,9 @@ TEST_F(FindAndModifyRetryability, UpdateWithPreImage) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(false);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
insertOplogEntry(noteOplog);
@@ -359,7 +359,7 @@ TEST_F(FindAndModifyRetryability, UpdateWithPreImage) {
kNs,
BSON("x" << 1 << "y" << 1),
BSON("x" << 1));
- updateOplog.setPreImageTs(imageTs);
+ updateOplog.setPreImageOpTime(imageOpTime);
auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog);
@@ -375,9 +375,9 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPreImage) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(false);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
insertOplogEntry(noteOplog);
@@ -394,7 +394,7 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPreImage) {
kNs,
kNestedOplog,
innerOplog.toBSON());
- updateOplog.setPreImageTs(imageTs);
+ updateOplog.setPreImageOpTime(imageOpTime);
auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog);
@@ -410,9 +410,9 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImage) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(true);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1));
insertOplogEntry(noteOplog);
@@ -422,7 +422,7 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImage) {
kNs,
BSON("x" << 1 << "y" << 1),
BSON("x" << 1));
- updateOplog.setPostImageTs(imageTs);
+ updateOplog.setPostImageOpTime(imageOpTime);
auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog);
@@ -438,9 +438,9 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPostImage) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(true);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1));
insertOplogEntry(noteOplog);
@@ -457,7 +457,7 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPostImage) {
kNs,
kNestedOplog,
innerOplog.toBSON());
- updateOplog.setPostImageTs(imageTs);
+ updateOplog.setPostImageOpTime(imageOpTime);
auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog);
@@ -473,14 +473,14 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImageButOplogDoesNotExistShouldE
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setShouldReturnNew(true);
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry updateOplog(repl::OpTime(),
0,
repl::OpTypeEnum::kUpdate,
kNs,
BSON("x" << 1 << "y" << 1),
BSON("x" << 1));
- updateOplog.setPostImageTs(imageTs);
+ updateOplog.setPostImageOpTime(imageOpTime);
ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog),
AssertionException);
@@ -489,15 +489,15 @@ TEST_F(FindAndModifyRetryability, UpdateWithPostImageButOplogDoesNotExistShouldE
TEST_F(FindAndModifyRetryability, BasicRemove) {
auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj());
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1));
insertOplogEntry(noteOplog);
repl::OplogEntry removeOplog(
repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 20));
- removeOplog.setPreImageTs(imageTs);
+ removeOplog.setPreImageOpTime(imageOpTime);
auto result = parseOplogEntryForFindAndModify(opCtx(), request, removeOplog);
@@ -511,9 +511,9 @@ TEST_F(FindAndModifyRetryability, BasicRemove) {
TEST_F(FindAndModifyRetryability, NestedRemove) {
auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj());
- Timestamp imageTs(120, 3);
+ repl::OpTime imageOpTime(Timestamp(120, 3), 1);
repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1));
+ imageOpTime, 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1));
insertOplogEntry(noteOplog);
@@ -526,7 +526,7 @@ TEST_F(FindAndModifyRetryability, NestedRemove) {
kNs,
kNestedOplog,
innerOplog.toBSON());
- removeOplog.setPreImageTs(imageTs);
+ removeOplog.setPreImageOpTime(imageOpTime);
auto result = parseOplogEntryForFindAndModify(opCtx(), request, removeOplog);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index b80dd5e0301..b2928a96b8e 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -300,14 +300,17 @@ void appendSessionInfo(OperationContext* opCtx,
sessionInfo.serialize(builder);
builder->append(OplogEntryBase::kStatementIdFieldName, statementId);
- builder->append(OplogEntryBase::kPrevWriteTsInTransactionFieldName, oplogLink.prevTs);
+ oplogLink.prevOpTime.append(builder,
+ OplogEntryBase::kPrevWriteOpTimeInTransactionFieldName.toString());
- if (!oplogLink.preImageTs.isNull()) {
- builder->append(OplogEntryBase::kPreImageTsFieldName, oplogLink.preImageTs);
+ if (!oplogLink.preImageOpTime.isNull()) {
+ oplogLink.preImageOpTime.append(builder,
+ OplogEntryBase::kPreImageOpTimeFieldName.toString());
}
- if (!oplogLink.postImageTs.isNull()) {
- builder->append(OplogEntryBase::kPostImageTsFieldName, oplogLink.postImageTs);
+ if (!oplogLink.postImageOpTime.isNull()) {
+ oplogLink.postImageOpTime.append(builder,
+ OplogEntryBase::kPostImageOpTimeFieldName.toString());
}
}
@@ -446,14 +449,13 @@ OpTime logOp(OperationContext* opCtx,
return slot.opTime;
}
-repl::OpTime logInsertOps(OperationContext* opCtx,
- const NamespaceString& nss,
- OptionalCollectionUUID uuid,
- Session* session,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
- Timestamp timestamps[],
- bool fromMigrate) {
+std::vector<OpTime> logInsertOps(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ Session* session,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) {
invariant(begin != end);
auto replCoord = ReplicationCoordinator::get(opCtx);
@@ -478,10 +480,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
}
- OpTime lastOpTime;
+ auto timestamps = stdx::make_unique<Timestamp[]>(count);
+ std::vector<OpTime> opTimes;
for (size_t i = 0; i < count; i++) {
// Make a mutable copy.
auto insertStatementOplogSlot = begin[i].oplogSlot;
@@ -502,19 +505,22 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
sessionInfo,
begin[i].stmtId,
oplogLink));
- oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp();
- timestamps[i] = oplogLink.prevTs;
- lastOpTime = insertStatementOplogSlot.opTime;
+ oplogLink.prevOpTime = insertStatementOplogSlot.opTime;
+ timestamps[i] = oplogLink.prevOpTime.getTimestamp();
+ opTimes.push_back(insertStatementOplogSlot.opTime);
}
std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
}
+
+ invariant(!opTimes.empty());
+ auto lastOpTime = opTimes.back();
invariant(!lastOpTime.isNull());
- _logOpsInner(opCtx, nss, basePtrs.get(), timestamps, count, oplog, lastOpTime);
+ _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime);
wuow.commit();
- return lastOpTime;
+ return opTimes;
}
namespace {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 34a96b23b05..78f1c4f8103 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -78,9 +78,9 @@ class ReplSettings;
struct OplogLink {
OplogLink() = default;
- Timestamp prevTs;
- Timestamp preImageTs;
- Timestamp postImageTs;
+ OpTime prevOpTime;
+ OpTime preImageOpTime;
+ OpTime postImageOpTime;
};
/**
@@ -102,18 +102,15 @@ extern int OPLOG_VERSION;
/**
* Log insert(s) to the local oplog.
- * Returns the OpTime of the last insert.
- * The timestamps parameter can also be modified and contain the individual timestamps for each
- * insert after the oplog entries were created.
+ * Returns the OpTime of every insert.
*/
-OpTime logInsertOps(OperationContext* opCtx,
- const NamespaceString& nss,
- OptionalCollectionUUID uuid,
- Session* session,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
- Timestamp timestamps[],
- bool fromMigrate);
+std::vector<OpTime> logInsertOps(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ Session* session,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate);
/**
* @param opstr
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index a01b38f2ee0..3f1e7cee003 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -32,6 +32,7 @@ global:
imports:
- "mongo/idl/basic_types.idl"
- "mongo/db/logical_session_id.idl"
+ - "mongo/db/repl/replication_types.idl"
enums:
OpType:
@@ -110,18 +111,18 @@ structs:
# must exist.
description: "Identifier of the transaction statement which generated this oplog
entry"
- prevTs:
- cpp_name: prevWriteTsInTransaction
- type: timestamp
+ prevOpTime:
+ cpp_name: prevWriteOpTimeInTransaction
+ type: optime
optional: true # Only for writes that are part of a transaction
- description: "The oplog timestamp of the previous write with the same transaction."
- preImageTs:
- type: timestamp
+ description: "The opTime of the previous write with the same transaction."
+ preImageOpTime:
+ type: optime
optional: true
- description: "The oplog timestamp of another oplog entry that contains the document
+ description: "The optime of another oplog entry that contains the document
before an update/remove was applied."
- postImageTs:
- type: timestamp
+ postImageOpTime:
+ type: optime
optional: true
- description: "The oplog timestamp of another oplog entry that contains the document
+ description: "The optime of another oplog entry that contains the document
after an update was applied."
diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp
index 9c63e0a7e88..2957f218374 100644
--- a/src/mongo/db/repl/optime.cpp
+++ b/src/mongo/db/repl/optime.cpp
@@ -91,6 +91,22 @@ std::ostream& operator<<(std::ostream& out, const OpTime& opTime) {
return out << opTime.toString();
}
+void OpTime::appendAsQuery(BSONObjBuilder* builder) const {
+ builder->append(kTimestampFieldName, _timestamp);
+ if (_term == kUninitializedTerm) {
+ // pv0 oplogs don't actually have the term field so don't query for {t: -1}.
+ builder->append(kTermFieldName, BSON("$exists" << false));
+ } else {
+ builder->append(kTermFieldName, _term);
+ }
+}
+
+BSONObj OpTime::asQuery() const {
+ BSONObjBuilder builder;
+ appendAsQuery(&builder);
+ return builder.obj();
+}
+
} // namespace repl
BSONObjBuilder& operator<<(BSONObjBuilderValueStream& builder, const repl::OpTime& value) {
diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h
index 05ee64df110..be16e923289 100644
--- a/src/mongo/db/repl/optime.h
+++ b/src/mongo/db/repl/optime.h
@@ -149,6 +149,9 @@ public:
friend std::ostream& operator<<(std::ostream& out, const OpTime& opTime);
+ void appendAsQuery(BSONObjBuilder* builder) const;
+ BSONObj asQuery() const;
+
private:
Timestamp _timestamp;
long long _term = kInitialTerm;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 13b5df16db6..b25bc8a547b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -544,16 +544,16 @@ SessionRecordMap computeLatestTransactionTableRecords(const MultiApplier::Operat
LogicalSessionId lsid(*sessionInfo.getSessionId());
auto txnNumber = *sessionInfo.getTxnNumber();
- auto opTimeTs = op.getOpTime().getTimestamp();
+ auto opTime = op.getOpTime();
auto it = latestRecords.find(lsid);
if (it != latestRecords.end()) {
- auto record = makeSessionTxnRecord(lsid, txnNumber, opTimeTs);
+ auto record = makeSessionTxnRecord(lsid, txnNumber, opTime);
if (record > it->second) {
latestRecords[lsid] = std::move(record);
}
} else {
- latestRecords.emplace(lsid, makeSessionTxnRecord(lsid, txnNumber, opTimeTs));
+ latestRecords.emplace(lsid, makeSessionTxnRecord(lsid, txnNumber, opTime));
}
}
return latestRecords;
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 343fe15b1cb..783917ea969 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -541,31 +541,40 @@ TEST_F(SyncTailTest, MultiApplyUpdatesTheTransactionTable) {
DBDirectClient client(_opCtx.get());
// The txnNum and optime of the only write were saved.
- auto resultSingle =
+ auto resultSingleDoc =
client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON()));
- ASSERT_TRUE(!resultSingle.isEmpty());
- ASSERT_EQ(resultSingle[SessionTxnRecord::kTxnNumFieldName].numberLong(), 5LL);
- ASSERT_EQ(resultSingle[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(),
- Timestamp(Seconds(1), 0));
+ ASSERT_TRUE(!resultSingleDoc.isEmpty());
+
+ auto resultSingle =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc);
+
+ ASSERT_EQ(resultSingle.getTxnNum(), 5LL);
+ ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1));
// The txnNum and optime of the write with the larger txnNum were saved.
- auto resultDiffTxn =
+ auto resultDiffTxnDoc =
client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON()));
- ASSERT_TRUE(!resultDiffTxn.isEmpty());
- ASSERT_EQ(resultDiffTxn[SessionTxnRecord::kTxnNumFieldName].numberLong(), 20LL);
- ASSERT_EQ(resultDiffTxn[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(),
- Timestamp(Seconds(3), 0));
+ ASSERT_TRUE(!resultDiffTxnDoc.isEmpty());
+
+ auto resultDiffTxn =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc);
+
+ ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL);
+ ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1));
// The txnNum and optime of the write with the later optime were saved.
- auto resultSameTxn =
+ auto resultSameTxnDoc =
client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON()));
- ASSERT_TRUE(!resultSameTxn.isEmpty());
- ASSERT_EQ(resultSameTxn[SessionTxnRecord::kTxnNumFieldName].numberLong(), 30LL);
- ASSERT_EQ(resultSameTxn[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(),
- Timestamp(Seconds(6), 0));
+ ASSERT_TRUE(!resultSameTxnDoc.isEmpty());
+
+ auto resultSameTxn =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc);
+
+ ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL);
+ ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1));
// There is no entry for the write with no txnNumber.
auto resultNoTxn =
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 575c442ac74..0c4e6a1b70d 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -269,7 +269,7 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co
void CollectionShardingState::onInsertOp(OperationContext* opCtx,
const BSONObj& insertedDoc,
- const Timestamp& oplogTs) {
+ const repl::OpTime& opTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -293,7 +293,7 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs);
+ _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, opTime);
}
}
@@ -301,8 +301,8 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) {
+ const repl::OpTime& opTime,
+ const repl::OpTime& prePostImageOpTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -319,7 +319,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs);
+ _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, opTime, prePostImageOpTime);
}
}
@@ -330,8 +330,8 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState
void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
const DeleteState& deleteState,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) {
+ const repl::OpTime& opTime,
+ const repl::OpTime& preImageOpTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -372,7 +372,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr && deleteState.isMigrating) {
- _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs);
+ _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, opTime, preImageOpTime);
}
}
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 6f9722dceb9..c70a032918b 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -224,17 +224,19 @@ public:
*
* The global exclusive lock is expected to be held by the caller of any of these functions.
*/
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs);
+ void onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const repl::OpTime& opTime);
void onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs);
+ const repl::OpTime& opTime,
+ const repl::OpTime& prePostImageOpTime);
void onDeleteOp(OperationContext* opCtx,
const DeleteState& deleteState,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs);
+ const repl::OpTime& opTime,
+ const repl::OpTime& preImageOpTime);
void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);
private:
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 9d9a1407f1d..ff3d31132fa 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -38,6 +38,10 @@ class OperationContext;
class Status;
class Timestamp;
+namespace repl {
+class OpTime;
+} // namespace repl
+
/**
* This class is responsible for producing chunk documents to be moved from donor to a recipient
* shard and its methods represent cloning stages. Its lifetime is owned and controlled by a single
@@ -121,7 +125,7 @@ public:
*/
virtual void onInsertOp(OperationContext* opCtx,
const BSONObj& insertedDoc,
- const Timestamp& oplogTs) = 0;
+ const repl::OpTime& opTime) = 0;
/**
* Notifies this cloner that an update happened to the collection, which it owns. It is up to
@@ -132,8 +136,8 @@ public:
*/
virtual void onUpdateOp(OperationContext* opCtx,
const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) = 0;
+ const repl::OpTime& opTime,
+ const repl::OpTime& prePostImageOpTime) = 0;
/**
     * Notifies this cloner that a delete happened to the collection, which it owns. It is up to the
@@ -144,8 +148,8 @@ public:
*/
virtual void onDeleteOp(OperationContext* opCtx,
const BSONObj& deletedDocId,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) = 0;
+ const repl::OpTime& opTime,
+ const repl::OpTime& preImageOpTime) = 0;
protected:
MigrationChunkClonerSource();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 5d0db5dea0b..e3438030741 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/s/start_chunk_clone_request.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/remote_command_request.h"
@@ -136,13 +137,13 @@ public:
LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
const BSONObj& idObj,
const char op,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs)
+ const repl::OpTime& opTime,
+ const repl::OpTime& prePostImageOpTime)
: _cloner(cloner),
_idObj(idObj.getOwned()),
_op(op),
- _oplogTs(oplogTs),
- _prePostImageTs(prePostImageTs) {}
+ _opTime(opTime),
+ _prePostImageOpTime(prePostImageOpTime) {}
void commit() override {
switch (_op) {
@@ -163,12 +164,12 @@ public:
MONGO_UNREACHABLE;
}
- if (!_prePostImageTs.isNull()) {
- _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs);
+ if (!_prePostImageOpTime.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_prePostImageOpTime);
}
- if (!_oplogTs.isNull()) {
- _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs);
+ if (!_opTime.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_opTime);
}
}
@@ -178,8 +179,8 @@ private:
MigrationChunkClonerSourceLegacy* const _cloner;
const BSONObj _idObj;
const char _op;
- const Timestamp _oplogTs;
- const Timestamp _prePostImageTs;
+ const repl::OpTime _opTime;
+ const repl::OpTime _prePostImageOpTime;
};
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
@@ -371,7 +372,7 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj&
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
const BSONObj& insertedDoc,
- const Timestamp& oplogTs) {
+ const repl::OpTime& opTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
@@ -387,7 +388,7 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
if (opCtx->getTxnNumber()) {
opCtx->recoveryUnit()->registerChange(
- new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {}));
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', opTime, {}));
} else {
opCtx->recoveryUnit()->registerChange(
new LogOpForShardingHandler(this, idElement.wrap(), 'i', {}, {}));
@@ -396,8 +397,8 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) {
+ const repl::OpTime& opTime,
+ const repl::OpTime& prePostImageOpTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = updatedDoc["_id"];
@@ -413,7 +414,7 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
if (opCtx->getTxnNumber()) {
opCtx->recoveryUnit()->registerChange(
- new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs));
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', opTime, prePostImageOpTime));
} else {
opCtx->recoveryUnit()->registerChange(
new LogOpForShardingHandler(this, idElement.wrap(), 'u', {}, {}));
@@ -422,8 +423,8 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
const BSONObj& deletedDocId,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) {
+ const repl::OpTime& opTime,
+ const repl::OpTime& preImageOpTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
@@ -435,7 +436,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
if (opCtx->getTxnNumber()) {
opCtx->recoveryUnit()->registerChange(
- new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs));
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', opTime, preImageOpTime));
} else {
opCtx->recoveryUnit()->registerChange(
new LogOpForShardingHandler(this, idElement.wrap(), 'd', {}, {}));
@@ -731,14 +732,16 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx,
BSONArrayBuilder* arrBuilder) {
while (_sessionCatalogSource.hasMoreOplog()) {
- auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog();
+ auto oplog = _sessionCatalogSource.getLastFetchedOplog();
- if (oplogDoc.isEmpty()) {
+ if (!oplog) {
// Last fetched turned out empty, try to see if there are more
_sessionCatalogSource.fetchNextOplog(opCtx);
continue;
}
+ auto oplogDoc = oplog->toBSON();
+
// Use the builder size instead of accumulating the document sizes directly so that we
// take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 72d299b58e4..5bd20b0907d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -75,17 +75,17 @@ public:
void onInsertOp(OperationContext* opCtx,
const BSONObj& insertedDoc,
- const Timestamp& oplogTs) override;
+ const repl::OpTime& opTime) override;
void onUpdateOp(OperationContext* opCtx,
const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) override;
+ const repl::OpTime& opTime,
+ const repl::OpTime& prePostImageOpTime) override;
void onDeleteOp(OperationContext* opCtx,
const BSONObj& deletedDocId,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) override;
+ const repl::OpTime& opTime,
+ const repl::OpTime& preImageOpTime) override;
// Legacy cloner specific functionality
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 6f6865d5f15..05aec22193b 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -90,16 +90,15 @@ repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult,
uassert(40628,
str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString()
<< " to not have "
- << repl::OplogEntryBase::kPreImageTsFieldName
+ << repl::OplogEntryBase::kPreImageOpTimeFieldName
<< " or "
- << repl::OplogEntryBase::kPostImageTsFieldName,
- !entry.getPreImageTs() && !entry.getPostImageTs());
+ << repl::OplogEntryBase::kPostImageOpTimeFieldName,
+ !entry.getPreImageOpTime() && !entry.getPostImageOpTime());
return oplogLink;
}
- const auto ts = lastResult.oplogTime.getTimestamp();
- invariant(!ts.isNull());
+ invariant(!lastResult.oplogTime.isNull());
const auto& sessionInfo = entry.getOperationSessionInfo();
const auto sessionId = *sessionInfo.getSessionId();
@@ -118,19 +117,19 @@ repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult,
<< lastResult.txnNum,
lastResult.txnNum == txnNum);
- if (entry.getPreImageTs()) {
- oplogLink.preImageTs = ts;
- } else if (entry.getPostImageTs()) {
- oplogLink.postImageTs = ts;
+ if (entry.getPreImageOpTime()) {
+ oplogLink.preImageOpTime = lastResult.oplogTime;
+ } else if (entry.getPostImageOpTime()) {
+ oplogLink.postImageOpTime = lastResult.oplogTime;
} else {
uasserted(40631,
- str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString()
+ str::stream() << "expected oplog with opTime: " << entry.getOpTime().toString()
<< ": "
<< redact(entry.toBSON())
<< " to have either "
- << repl::OplogEntryBase::kPreImageTsFieldName
+ << repl::OplogEntryBase::kPreImageOpTimeFieldName
<< " or "
- << repl::OplogEntryBase::kPostImageTsFieldName);
+ << repl::OplogEntryBase::kPostImageOpTimeFieldName);
}
return oplogLink;
@@ -185,11 +184,13 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
Shard::RetryPolicy::kNoRetry);
uassertStatusOK(responseStatus.getStatus());
+ uassertStatusOK(responseStatus.getValue().commandStatus);
+
auto result = responseStatus.getValue().response;
auto oplogElement = result[kOplogField];
uassert(ErrorCodes::FailedToParse,
- "_migrateSession response does not have the 'oplog' field as array",
+ "_getNextSessionMods response does not have the 'oplog' field as array",
oplogElement.type() == Array);
return result;
@@ -252,49 +253,53 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
- oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum);
-
- writeConflictRetry(
- opCtx,
- "SessionOplogMigration",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- // Need to take global lock here so repl::logOp will not unlock it and trigger the
- // invariant that disallows unlocking global lock while inside a WUOW.
- // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush
- // lock will be lock/unlocked correctly. Take the transaction table db lock to
- // ensure the same lock ordering with normal replicated updates to the table.
- Lock::DBLock lk(
- opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
- WriteUnitOfWork wunit(opCtx);
-
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNamespace(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- sessionInfo,
- stmtId,
- oplogLink);
-
- auto oplogTs = result.oplogTime.getTimestamp();
- uassert(40633,
- str::stream() << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString()
- << ": "
- << redact(oplogBSON),
- !oplogTs.isNull());
-
- // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because
- // the next oplog will contain the real operation.
- if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs);
- }
-
- wunit.commit();
- });
+ oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum);
+
+ writeConflictRetry(opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ // Need to take global lock here so repl::logOp will not unlock it and
+ // trigger the invariant that disallows unlocking global lock while
+ // inside a WUOW. Grab a DBLock here instead of plain GlobalLock to make
+ // sure the MMAPV1 flush lock will be lock/unlocked correctly. Take the
+ // transaction table db lock to ensure the same lock ordering with normal
+ // replicated updates to the table.
+ Lock::DBLock lk(opCtx,
+ NamespaceString::kSessionTransactionsTableNamespace.db(),
+ MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+
+ auto oplogOpTime = result.oplogTime;
+ uassert(40633,
+ str::stream()
+ << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogOpTime.isNull());
+
+                           // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post
+                           // image, because the next oplog will contain the real operation.
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(
+ opCtx, result.txnNum, {stmtId}, oplogOpTime);
+ }
+
+ wunit.commit();
+ });
return result;
}
@@ -327,8 +332,10 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
void SessionCatalogMigrationDestination::finish() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _state = State::Committing;
- _isStateChanged.notify_all();
+ if (_state != State::ErrorOccurred) {
+ _state = State::Committing;
+ _isStateChanged.notify_all();
+ }
}
void SessionCatalogMigrationDestination::join() {
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 71c63721c6f..3dc34514061 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -128,10 +128,9 @@ public:
});
}
- repl::OplogEntry getOplog(OperationContext* opCtx, const Timestamp& ts) {
+ repl::OplogEntry getOplog(OperationContext* opCtx, const repl::OpTime& opTime) {
DBDirectClient client(opCtx);
- auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(),
- BSON(repl::OplogEntryBase::kTimestampFieldName << ts));
+ auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery());
ASSERT_FALSE(oplogBSON.isEmpty());
auto parseStatus = repl::OplogEntry::parse(oplogBSON);
@@ -297,7 +296,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -350,7 +349,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, txnNum);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(txnNum));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(txnNum));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -396,7 +395,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -453,7 +452,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
{
auto session = getSessionWithTxn(opCtx, sessionId1, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
@@ -461,7 +460,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
{
auto session = getSessionWithTxn(opCtx, sessionId2, 42);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(42));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(42));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -526,7 +525,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplog(oplog2, historyIter.next(opCtx));
@@ -565,7 +564,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
- updateOplog.setPreImageTs(Timestamp(100, 2));
+ updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
// migration always fetches at least twice to transition from committing to done.
@@ -578,7 +577,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
@@ -606,13 +605,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
ASSERT_FALSE(historyIter.hasNext());
- ASSERT_TRUE(nextOplog.getPreImageTs());
- ASSERT_FALSE(nextOplog.getPostImageTs());
+ ASSERT_TRUE(nextOplog.getPreImageOpTime());
+ ASSERT_FALSE(nextOplog.getPostImageOpTime());
// Check preImage oplog
- auto preImageTs = nextOplog.getPreImageTs().value();
- auto newPreImageOplog = getOplog(opCtx, preImageTs);
+ auto preImageOpTime = nextOplog.getPreImageOpTime().value();
+ auto newPreImageOplog = getOplog(opCtx, preImageOpTime);
ASSERT_TRUE(newPreImageOplog.getStatementId());
ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
@@ -656,7 +655,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
- updateOplog.setPostImageTs(Timestamp(100, 2));
+ updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({postImageOplog, updateOplog});
// migration always fetches at least twice to transition from committing to done.
@@ -669,7 +668,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
@@ -697,13 +696,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
ASSERT_FALSE(historyIter.hasNext());
- ASSERT_FALSE(nextOplog.getPreImageTs());
- ASSERT_TRUE(nextOplog.getPostImageTs());
+ ASSERT_FALSE(nextOplog.getPreImageOpTime());
+ ASSERT_TRUE(nextOplog.getPostImageOpTime());
    // Check postImage oplog
- auto postImageTs = nextOplog.getPostImageTs().value();
- auto newPostImageOplog = getOplog(opCtx, postImageTs);
+ auto postImageOpTime = nextOplog.getPostImageOpTime().value();
+ auto newPostImageOplog = getOplog(opCtx, postImageOpTime);
ASSERT_TRUE(newPostImageOplog.getStatementId());
ASSERT_EQ(45, newPostImageOplog.getStatementId().value());
@@ -747,7 +746,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
- updateOplog.setPreImageTs(Timestamp(100, 2));
+ updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog});
returnOplog({updateOplog});
@@ -761,7 +760,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
@@ -789,13 +788,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
ASSERT_FALSE(historyIter.hasNext());
- ASSERT_TRUE(nextOplog.getPreImageTs());
- ASSERT_FALSE(nextOplog.getPostImageTs());
+ ASSERT_TRUE(nextOplog.getPreImageOpTime());
+ ASSERT_FALSE(nextOplog.getPostImageOpTime());
// Check preImage oplog
- auto preImageTs = nextOplog.getPreImageTs().value();
- auto newPreImageOplog = getOplog(opCtx, preImageTs);
+ auto preImageOpTime = nextOplog.getPreImageOpTime().value();
+ auto newPreImageOplog = getOplog(opCtx, preImageOpTime);
ASSERT_TRUE(newPreImageOplog.getStatementId());
ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
@@ -856,7 +855,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 20);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20));
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
@@ -912,7 +911,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 20);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20));
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
@@ -1079,7 +1078,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
@@ -1157,7 +1156,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
- updateOplog.setPreImageTs(Timestamp(100, 2));
+ updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
@@ -1197,7 +1196,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo
sessionInfo.setTxnNumber(56);
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
- updateOplog.setPreImageTs(Timestamp(100, 2));
+ updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
@@ -1276,7 +1275,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
- updateOplog.setPreImageTs(Timestamp(100, 2));
+ updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({oplog1, updateOplog});
@@ -1316,7 +1315,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
- updateOplog.setPostImageTs(Timestamp(100, 2));
+ updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({oplog1, updateOplog});
@@ -1365,7 +1364,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 19);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(19));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 829d862b370..ac835502952 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -41,20 +41,23 @@ namespace mongo {
namespace {
-BSONObj fetchPrePostImageOplog(OperationContext* opCtx, const repl::OplogEntry& oplog) {
- auto tsToFetch = oplog.getPreImageTs();
+boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx,
+ const repl::OplogEntry& oplog) {
+ auto opTimeToFetch = oplog.getPreImageOpTime();
- if (!tsToFetch) {
- tsToFetch = oplog.getPostImageTs();
+ if (!opTimeToFetch) {
+ opTimeToFetch = oplog.getPostImageOpTime();
}
- if (!tsToFetch) {
- return BSONObj();
+ if (!opTimeToFetch) {
+ return boost::none;
}
+ auto opTime = opTimeToFetch.value();
DBDirectClient client(opCtx);
- return client.findOne(NamespaceString::kRsOplogNamespace.ns(),
- Query(BSON("ts" << tsToFetch.value())));
+ auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery());
+
+ return uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
}
} // unnamed namespace
@@ -66,10 +69,10 @@ bool SessionCatalogMigrationSource::hasMoreOplog() {
return _hasMoreOplogFromSessionCatalog() || _hasNewWrites();
}
-BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() {
+boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::getLastFetchedOplog() {
{
stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex);
- if (!_lastFetchedOplog.isEmpty()) {
+ if (_lastFetchedOplog) {
return _lastFetchedOplog;
}
}
@@ -100,13 +103,12 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
return false;
}
- auto nextOplogBSON = nextOplog.toBSON().getOwned();
auto doc = fetchPrePostImageOplog(opCtx, nextOplog);
- if (!doc.isEmpty()) {
- _lastFetchedOplogBuffer.push_back(nextOplogBSON);
- _lastFetchedOplog = doc;
+ if (doc) {
+ _lastFetchedOplogBuffer.push_back(nextOplog);
+ _lastFetchedOplog = *doc;
} else {
- _lastFetchedOplog = nextOplogBSON;
+ _lastFetchedOplog = nextOplog;
}
return true;
@@ -120,18 +122,18 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() {
stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex);
- return !_lastFetchedOplog.isEmpty() || !_lastFetchedOplogBuffer.empty();
+ return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty();
}
// Important: The no-op oplog entry for findAndModify should always be returned first before the
// actual operation.
-BSONObj SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionCatalog() {
+repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionCatalog() {
stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex);
return _lastFetchedOplogBuffer.back();
}
bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex);
+ stdx::unique_lock<stdx::mutex> lk(_sessionCloneMutex);
if (!_lastFetchedOplogBuffer.empty()) {
_lastFetchedOplog = _lastFetchedOplogBuffer.back();
@@ -139,25 +141,20 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
return true;
}
- _lastFetchedOplog = BSONObj();
+ _lastFetchedOplog.reset();
if (_handleWriteHistory(lk, opCtx)) {
return true;
}
- if (!_sessionCatalogCursor) {
- DBDirectClient client(opCtx);
- Query query;
- query.sort(BSON("_id" << 1)); // strictly not required, but helps make test deterministic.
- _sessionCatalogCursor =
- client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query);
- }
+ _initIfNotYet(lk, opCtx);
- while (_sessionCatalogCursor->more()) {
- auto nextSession = SessionTxnRecord::parse(
- IDLParserErrorContext("Session migration cloning"), _sessionCatalogCursor->next());
- _writeHistoryIterator =
- stdx::make_unique<TransactionHistoryIterator>(nextSession.getLastWriteOpTimeTs());
+ while (!_sessionLastWriteOpTimes.empty()) {
+ auto lowestOpTimeIter = _sessionLastWriteOpTimes.begin();
+ auto nextOpTime = *lowestOpTimeIter;
+ _sessionLastWriteOpTimes.erase(lowestOpTimeIter);
+
+ _writeHistoryIterator = stdx::make_unique<TransactionHistoryIterator>(nextOpTime);
if (_handleWriteHistory(lk, opCtx)) {
return true;
}
@@ -168,48 +165,67 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
bool SessionCatalogMigrationSource::_hasNewWrites() {
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
- return !_lastFetchedNewWriteOplog.isEmpty() || !_newWriteTsList.empty();
+ return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty();
}
-BSONObj SessionCatalogMigrationSource::_getLastFetchedNewWriteOplog() {
+repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedNewWriteOplog() {
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
- return _lastFetchedNewWriteOplog;
+ invariant(_lastFetchedNewWriteOplog);
+ return *_lastFetchedNewWriteOplog;
}
bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* opCtx) {
- Timestamp nextOplogTsToFetch;
+ repl::OpTime nextOpTimeToFetch;
{
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
- if (_newWriteTsList.empty()) {
- _lastFetchedNewWriteOplog = BSONObj();
+ if (_newWriteOpTimeList.empty()) {
+ _lastFetchedNewWriteOplog.reset();
return false;
}
- nextOplogTsToFetch = _newWriteTsList.front();
- _newWriteTsList.pop_front();
+ nextOpTimeToFetch = _newWriteOpTimeList.front();
}
DBDirectClient client(opCtx);
- auto newWriteOplog = client.findOne(NamespaceString::kRsOplogNamespace.ns(),
- Query(BSON("ts" << nextOplogTsToFetch)));
+ auto newWriteOplog =
+ client.findOne(NamespaceString::kRsOplogNamespace.ns(), nextOpTimeToFetch.asQuery());
uassert(40620,
- str::stream() << "Unable to fetch oplog entry with ts: " << nextOplogTsToFetch.toBSON(),
+ str::stream() << "Unable to fetch oplog entry with opTime: "
+ << nextOpTimeToFetch.toBSON(),
!newWriteOplog.isEmpty());
{
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
- _lastFetchedNewWriteOplog = newWriteOplog;
+ _lastFetchedNewWriteOplog = uassertStatusOK(repl::OplogEntry::parse(newWriteOplog));
+ _newWriteOpTimeList.pop_front();
}
return true;
}
-void SessionCatalogMigrationSource::notifyNewWriteTS(Timestamp opTimestamp) {
+void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
- _newWriteTsList.push_back(opTimestamp);
+ _newWriteOpTimeList.push_back(opTime);
+}
+
+void SessionCatalogMigrationSource::_initIfNotYet(WithLock, OperationContext* opCtx) {
+ if (_alreadyInitialized) {
+ return;
+ }
+
+ DBDirectClient client(opCtx);
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {});
+
+ while (cursor->more()) {
+ auto nextSession = SessionTxnRecord::parse(
+ IDLParserErrorContext("Session migration cloning"), cursor->next());
+ _sessionLastWriteOpTimes.insert(nextSession.getLastWriteOpTime());
+ }
+
+ _alreadyInitialized = true;
}
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 5e44531d31c..c1e6729aca3 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -28,11 +28,13 @@
#pragma once
+#include <boost/optional.hpp>
#include <memory>
#include "mongo/base/disallow_copying.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -70,17 +72,19 @@ public:
* Returns the oplog entry that was last fetched by the fetchNextOplog call.
* Returns boost::none if there is no oplog entry to fetch.
*/
- BSONObj getLastFetchedOplog();
+ boost::optional<repl::OplogEntry> getLastFetchedOplog();
/**
* Remembers the opTime of a new write that just occurred.
*/
- void notifyNewWriteTS(Timestamp opTimestamp);
+ void notifyNewWriteOpTime(repl::OpTime opTimestamp);
private:
///////////////////////////////////////////////////////////////////////////
// Methods for extracting the oplog entries from session information.
+ void _initIfNotYet(WithLock, OperationContext* opCtx);
+
/**
* If this returns false, it just means that there are no more oplog entries in the buffer that
* need to be moved over. However, there can still be new incoming operations that can add
@@ -97,7 +101,7 @@ private:
/**
* Returns the document that was last fetched by fetchNextOplogFromSessionCatalog.
*/
- BSONObj _getLastFetchedOplogFromSessionCatalog();
+ repl::OplogEntry _getLastFetchedOplogFromSessionCatalog();
/**
* Extracts oplog information from the current writeHistoryIterator to _lastFetchedOplog. This
@@ -124,16 +128,17 @@ private:
/**
* Returns the oplog that was last fetched by fetchNextNewWriteOplog.
*/
- BSONObj _getLastFetchedNewWriteOplog();
+ repl::OplogEntry _getLastFetchedNewWriteOplog();
const NamespaceString _ns;
- // Protects _sessionCatalogCursor, _writeHistoryIterator,
+ // Protects _alreadyInitialized, _sessionLastWriteOpTimes, _writeHistoryIterator,
// _lastFetchedOplogBuffer, _lastFetchedOplog
stdx::mutex _sessionCloneMutex;
- // Cursor for iterating over the session catalog.
- std::unique_ptr<DBClientCursor> _sessionCatalogCursor;
+ bool _alreadyInitialized = false;
+
+ std::set<repl::OpTime> _sessionLastWriteOpTimes;
// Iterator for oplog entries for a specific transaction.
std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
@@ -141,19 +146,19 @@ private:
// Used for temporarily storing oplog entries for operations that have more than one entry.
// For example, findAndModify generates one for the actual operation and another for the
// pre/post image.
- std::vector<BSONObj> _lastFetchedOplogBuffer;
+ std::vector<repl::OplogEntry> _lastFetchedOplogBuffer;
// Used to store the last fetched oplog. This enables calling get multiple times.
- BSONObj _lastFetchedOplog;
+ boost::optional<repl::OplogEntry> _lastFetchedOplog;
// Protects _newWriteOpTimeList, _lastFetchedNewWriteOplog
stdx::mutex _newOplogMutex;
- // Stores oplog timestamps of new writes that are coming in.
- std::list<Timestamp> _newWriteTsList;
+ // Stores oplog opTime of new writes that are coming in.
+ std::list<repl::OpTime> _newWriteOpTimeList;
// Used to store the last fetched oplog from _newWriteOpTimeList.
- BSONObj _lastFetchedNewWriteOplog;
+ boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog;
};
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index ff43cce5d0b..f7051904a9a 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -56,18 +56,18 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
- entry1.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry1);
repl::OplogEntry entry2(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
- entry2.setPrevWriteTsInTransaction(entry1.getTimestamp());
+ entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime());
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord;
sessionRecord.setSessionId(makeLogicalSessionIdForTest());
sessionRecord.setTxnNum(1);
- sessionRecord.setLastWriteOpTimeTs(entry2.getTimestamp());
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
@@ -77,15 +77,19 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON());
}
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
@@ -97,34 +101,33 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
repl::OplogEntry entry1a(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
- entry1a.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
repl::OplogEntry entry1b(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
- entry1b.setPrevWriteTsInTransaction(entry1a.getTimestamp());
+ entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime());
SessionTxnRecord sessionRecord1;
sessionRecord1.setSessionId(makeLogicalSessionIdForTest());
sessionRecord1.setTxnNum(1);
- sessionRecord1.setLastWriteOpTimeTs(entry1b.getTimestamp());
+ sessionRecord1.setLastWriteOpTime(entry1b.getOpTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord1.toBSON());
-
repl::OplogEntry entry2a(
repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
- entry2a.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
repl::OplogEntry entry2b(
repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50));
- entry2b.setPrevWriteTsInTransaction(entry2a.getTimestamp());
+ entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime());
SessionTxnRecord sessionRecord2;
sessionRecord2.setSessionId(makeLogicalSessionIdForTest());
sessionRecord2.setTxnNum(1);
- sessionRecord2.setLastWriteOpTimeTs(entry2b.getTimestamp());
+ sessionRecord2.setLastWriteOpTime(entry2b.getOpTime());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord2.toBSON());
@@ -141,34 +144,27 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
const repl::OplogEntry& secondExpectedOplog) {
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextOplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextOplog->toBSON());
}
};
- if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) <
- 0) {
- checkNextBatch(entry1b, entry1a);
-
- ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- ASSERT_TRUE(migrationSource.hasMoreOplog());
-
- checkNextBatch(entry2b, entry2a);
- } else {
- checkNextBatch(entry2b, entry2a);
+ checkNextBatch(entry1b, entry1a);
- ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- ASSERT_TRUE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
- checkNextBatch(entry1b, entry1a);
- }
+ checkNextBatch(entry2b, entry2a);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -181,18 +177,18 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30));
- entry1.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry1);
repl::OplogEntry entry2(
repl::OpTime(Timestamp(52, 346), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50));
- entry2.setPrevWriteTsInTransaction(Timestamp(0, 0));
- entry2.setPreImageTs(entry1.getTimestamp());
+ entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry2.setPreImageOpTime(entry1.getOpTime());
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20));
- entry3.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry3);
repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2),
@@ -201,14 +197,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
kNs,
BSON("x" << 19),
BSON("$inc" << BSON("x" << 1)));
- entry4.setPrevWriteTsInTransaction(entry2.getTimestamp());
- entry4.setPostImageTs(entry3.getTimestamp());
+ entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime());
+ entry4.setPostImageOpTime(entry3.getOpTime());
insertOplogEntry(entry4);
SessionTxnRecord sessionRecord;
sessionRecord.setSessionId(makeLogicalSessionIdForTest());
sessionRecord.setTxnNum(1);
- sessionRecord.setLastWriteOpTimeTs(entry4.getTimestamp());
+ sessionRecord.setLastWriteOpTime(entry4.getOpTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
@@ -216,12 +212,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
SessionCatalogMigrationSource migrationSource(kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()};
+ auto expectedSequece = {entry3, entry4, entry1, entry2};
- for (auto oplogDoc : expectedSequece) {
+ for (auto oplog : expectedSequece) {
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(oplogDoc, nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplog->toBSON());
migrationSource.fetchNextOplog(opCtx());
}
@@ -233,13 +231,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
- entry1.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
sessionRecord1.setSessionId(makeLogicalSessionIdForTest());
sessionRecord1.setTxnNum(1);
- sessionRecord1.setLastWriteOpTimeTs(entry1.getTimestamp());
+ sessionRecord1.setLastWriteOpTime(entry1.getOpTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -251,13 +249,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
repl::OpTypeEnum::kDelete,
NamespaceString("x.y"),
BSON("x" << 30));
- entry2.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord2;
sessionRecord2.setSessionId(makeLogicalSessionIdForTest());
sessionRecord2.setTxnNum(1);
- sessionRecord2.setLastWriteOpTimeTs(entry2.getTimestamp());
+ sessionRecord2.setLastWriteOpTime(entry2.getOpTime());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord2.toBSON());
@@ -266,8 +264,10 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -278,13 +278,13 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
- entry1.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
sessionRecord1.setSessionId(makeLogicalSessionIdForTest());
sessionRecord1.setTxnNum(1);
- sessionRecord1.setLastWriteOpTimeTs(entry1.getTimestamp());
+ sessionRecord1.setLastWriteOpTime(entry1.getOpTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -292,38 +292,42 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OplogEntry entry2(
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
- entry2.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40));
- entry3.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry3);
SessionCatalogMigrationSource migrationSource(kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- migrationSource.notifyNewWriteTS(entry2.getTimestamp());
- migrationSource.notifyNewWriteTS(entry3.getTimestamp());
+ migrationSource.notifyNewWriteOpTime(entry2.getOpTime());
+ migrationSource.notifyNewWriteOpTime(entry3.getOpTime());
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextOplog->toBSON());
}
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
@@ -336,7 +340,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
SessionCatalogMigrationSource migrationSource(kNs);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
- migrationSource.notifyNewWriteTS(Timestamp(100, 3));
+ migrationSource.notifyNewWriteOpTime(repl::OpTime(Timestamp(100, 3), 1));
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException);
}
@@ -353,15 +357,17 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
repl::OpTypeEnum::kInsert,
kNs,
BSON("x" << 30));
- entry.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry);
- migrationSource.notifyNewWriteTS(entry.getTimestamp());
+ migrationSource.notifyNewWriteOpTime(entry.getOpTime());
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -370,15 +376,16 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
{
repl::OplogEntry entry(
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
- entry.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry);
- migrationSource.notifyNewWriteTS(entry.getTimestamp());
+ migrationSource.notifyNewWriteOpTime(entry.getOpTime());
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -387,15 +394,16 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
{
repl::OplogEntry entry(
repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40));
- entry.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
insertOplogEntry(entry);
- migrationSource.notifyNewWriteTS(entry.getTimestamp());
+ migrationSource.notifyNewWriteOpTime(entry.getOpTime());
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto nextDoc = migrationSource.getLastFetchedOplog();
- ASSERT_BSONOBJ_EQ(entry.toBSON(), nextDoc);
+ auto nextOplog = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplog);
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 13edd3d5535..c2ac4731b87 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -81,11 +81,11 @@ void fassertOnRepeatedExecution(OperationContext* opCtx,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
StmtId stmtId,
- Timestamp firstTs,
- Timestamp secondTs) {
+ const repl::OpTime& firstOpTime,
+ const repl::OpTime& secondOpTime) {
severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":"
- << txnNumber << " ] was committed once with timestamp " << firstTs
- << " and a second time with timestamp " << secondTs
+ << txnNumber << " ] was committed once with opTime " << firstOpTime
+ << " and a second time with opTime " << secondOpTime
<< ". This indicates possible data corruption or server bug and the process will be "
"terminated.";
fassertFailed(40526);
@@ -132,20 +132,20 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
CommittedStatementTimestampMap activeTxnCommittedStatements;
if (lastWrittenTxnRecord) {
- auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTimeTs());
+ auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTime());
while (it.hasNext()) {
const auto entry = it.next(opCtx);
invariant(entry.getStatementId());
const auto insertRes = activeTxnCommittedStatements.emplace(*entry.getStatementId(),
- entry.getTimestamp());
+ entry.getOpTime());
if (!insertRes.second) {
- const auto& existingTs = insertRes.first->second;
+ const auto& existingOpTime = insertRes.first->second;
fassertOnRepeatedExecution(opCtx,
_sessionId,
lastWrittenTxnRecord->getTxnNum(),
*entry.getStatementId(),
- existingTs,
- entry.getTimestamp());
+ existingOpTime,
+ entry.getOpTime());
}
}
}
@@ -177,28 +177,29 @@ void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) {
void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
- Timestamp lastStmtIdWriteTs) {
+ const repl::OpTime& lastStmtIdWriteOpTime) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
stdx::unique_lock<stdx::mutex> ul(_mutex);
// Sanity check that we don't double-execute statements
for (const auto stmtId : stmtIdsWritten) {
- const auto stmtTimestamp = _checkStatementExecuted(ul, txnNumber, stmtId);
- if (stmtTimestamp) {
+ const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
+ if (stmtOpTime) {
fassertOnRepeatedExecution(
- opCtx, _sessionId, txnNumber, stmtId, *stmtTimestamp, lastStmtIdWriteTs);
+ opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
}
}
- const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteTs);
+ const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime);
ul.unlock();
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
updateSessionEntry(opCtx, updateRequest);
- _registerUpdateCacheOnCommit(opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteTs);
+ _registerUpdateCacheOnCommit(
+ opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
void Session::updateSessionRecordOnSecondary(OperationContext* opCtx,
@@ -233,15 +234,15 @@ void Session::invalidate() {
_activeTxnCommittedStatements.clear();
}
-Timestamp Session::getLastWriteOpTimeTs(TxnNumber txnNumber) const {
+repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
stdx::lock_guard<stdx::mutex> lg(_mutex);
_checkValid(lg);
_checkIsActiveTransaction(lg, txnNumber);
if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
- return Timestamp();
+ return {};
- return _lastWrittenSessionRecord->getLastWriteOpTimeTs();
+ return _lastWrittenSessionRecord->getLastWriteOpTime();
}
boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx,
@@ -303,9 +304,9 @@ void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
txnNumber == _activeTxnNumber);
}
-boost::optional<Timestamp> Session::_checkStatementExecuted(WithLock wl,
- TxnNumber txnNumber,
- StmtId stmtId) const {
+boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
+ TxnNumber txnNumber,
+ StmtId stmtId) const {
_checkValid(wl);
_checkIsActiveTransaction(wl, txnNumber);
@@ -321,22 +322,21 @@ boost::optional<Timestamp> Session::_checkStatementExecuted(WithLock wl,
UpdateRequest Session::_makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
- Timestamp newLastWriteTs) const {
+ const repl::OpTime& newLastWriteOpTime) const {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
if (_lastWrittenSessionRecord) {
updateRequest.setQuery(_lastWrittenSessionRecord->toBSON());
- updateRequest.setUpdates(
- BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName
- << newTxnNumber
- << SessionTxnRecord::kLastWriteOpTimeTsFieldName
- << newLastWriteTs)));
+ updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName
+ << newTxnNumber
+ << SessionTxnRecord::kLastWriteOpTimeFieldName
+ << newLastWriteOpTime)));
} else {
const auto updateBSON = [&] {
SessionTxnRecord newTxnRecord;
newTxnRecord.setSessionId(_sessionId);
newTxnRecord.setTxnNum(newTxnNumber);
- newTxnRecord.setLastWriteOpTimeTs(newLastWriteTs);
+ newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
return newTxnRecord.toBSON();
}();
@@ -351,13 +351,13 @@ UpdateRequest Session::_makeUpdateRequest(WithLock,
void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
std::vector<StmtId> stmtIdsWritten,
- Timestamp lastStmtIdWriteTs) {
+ const repl::OpTime& lastStmtIdWriteOpTime) {
opCtx->recoveryUnit()->onCommit([
this,
opCtx,
newTxnNumber,
stmtIdsWritten = std::move(stmtIdsWritten),
- lastStmtIdWriteTs
+ lastStmtIdWriteOpTime
] {
stdx::lock_guard<stdx::mutex> lg(_mutex);
@@ -371,13 +371,13 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
_lastWrittenSessionRecord->setSessionId(_sessionId);
_lastWrittenSessionRecord->setTxnNum(newTxnNumber);
- _lastWrittenSessionRecord->setLastWriteOpTimeTs(lastStmtIdWriteTs);
+ _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
} else {
if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
_lastWrittenSessionRecord->setTxnNum(newTxnNumber);
- if (lastStmtIdWriteTs > _lastWrittenSessionRecord->getLastWriteOpTimeTs())
- _lastWrittenSessionRecord->setLastWriteOpTimeTs(lastStmtIdWriteTs);
+ if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
+ _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
}
if (newTxnNumber > _activeTxnNumber) {
@@ -392,11 +392,15 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
if (newTxnNumber == _activeTxnNumber) {
for (const auto stmtId : stmtIdsWritten) {
const auto insertRes =
- _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteTs);
+ _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
if (!insertRes.second) {
- const auto& existingTs = insertRes.first->second;
- fassertOnRepeatedExecution(
- opCtx, _sessionId, newTxnNumber, stmtId, existingTs, lastStmtIdWriteTs);
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(opCtx,
+ _sessionId,
+ newTxnNumber,
+ stmtId,
+ existingOpTime,
+ lastStmtIdWriteOpTime);
}
}
}
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 68501f6da15..21d8d618a9a 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -87,7 +87,7 @@ public:
* Called after a write under the specified transaction completes while the node is a primary
* and specifies the statement ids which were written. Must be called while the caller is still
* in the write's WUOW. Updates the on-disk state of the session to match the specified
- * transaction/timestamp and keeps the cached state in sync.
+ * transaction/opTime and keeps the cached state in sync.
*
* Must only be called with the session checked-out.
*
@@ -96,7 +96,7 @@ public:
void onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
- Timestamp lastStmtIdWriteTs);
+ const repl::OpTime& lastStmtIdWriteOpTime);
/**
* Called after a replication batch has been applied on a secondary node. Keeps the session
@@ -120,7 +120,7 @@ public:
*
* Throws if the session has been invalidated or the active transaction number doesn't match.
*/
- Timestamp getLastWriteOpTimeTs(TxnNumber txnNumber) const;
+ repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const;
/**
* Returns the oplog entry with the given statementId for the specified transaction, if it
@@ -142,18 +142,18 @@ private:
void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const;
- boost::optional<Timestamp> _checkStatementExecuted(WithLock,
- TxnNumber txnNumber,
- StmtId stmtId) const;
+ boost::optional<repl::OpTime> _checkStatementExecuted(WithLock,
+ TxnNumber txnNumber,
+ StmtId stmtId) const;
UpdateRequest _makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
- Timestamp newLastWriteTs) const;
+ const repl::OpTime& newLastWriteTs) const;
void _registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
std::vector<StmtId> stmtIdsWritten,
- Timestamp lastStmtIdWriteTs);
+ const repl::OpTime& lastStmtIdWriteTs);
const LogicalSessionId _sessionId;
@@ -176,9 +176,9 @@ private:
TxnNumber _activeTxnNumber{kUninitializedTxnNumber};
// For the active txn, tracks which statement ids have been committed and at which oplog
- // timestamp. Used for fast retryability check and retrieving the previous write's data without
+ // opTime. Used for fast retryability check and retrieving the previous write's data without
// having to scan through the oplog.
- using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, Timestamp>;
+ using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>;
CommittedStatementTimestampMap _activeTxnCommittedStatements;
};
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index df163a97d7b..168232a2c25 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -68,7 +68,7 @@ protected:
const LogicalSessionId& lsid,
TxnNumber txnNumber,
StmtId stmtId) {
- return logOp(opCtx, nss, lsid, txnNumber, stmtId, Timestamp());
+ return logOp(opCtx, nss, lsid, txnNumber, stmtId, {});
}
static repl::OpTime logOp(OperationContext* opCtx,
@@ -76,13 +76,13 @@ protected:
const LogicalSessionId& lsid,
TxnNumber txnNumber,
StmtId stmtId,
- Timestamp prevTs) {
+ repl::OpTime prevOpTime) {
OperationSessionInfo osi;
osi.setSessionId(lsid);
osi.setTxnNumber(txnNumber);
repl::OplogLink link;
- link.prevTs = prevTs;
+ link.prevOpTime = prevOpTime;
return repl::logOp(
opCtx, "n", nss, kUUID, BSON("TestValue" << 0), nullptr, false, osi, stmtId, link);
@@ -98,7 +98,7 @@ TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) {
session.beginTxn(opCtx(), txnNum);
ASSERT_EQ(sessionId, session.getSessionId());
- ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull());
+ ASSERT(session.getLastWriteOpTime(txnNum).isNull());
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -119,7 +119,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp());
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
wuow.commit();
return opTime;
@@ -136,8 +136,8 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
ASSERT(!cursor->more());
ASSERT_EQ(sessionId, txnRecord.getSessionId());
ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_EQ(opTime.getTimestamp(), txnRecord.getLastWriteOpTimeTs());
- ASSERT_EQ(opTime.getTimestamp(), session.getLastWriteOpTimeTs(txnNum));
+ ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
+ ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum));
}
TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
@@ -145,20 +145,20 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
- const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, Timestamp prevTs) {
+ const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime) {
session.beginTxn(opCtx(), txnNum);
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp());
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime);
wuow.commit();
- return opTime.getTimestamp();
+ return opTime;
};
- const auto firstTs = writeTxnRecordFn(100, 0, Timestamp());
- const auto secondTs = writeTxnRecordFn(200, 1, firstTs);
+ const auto firstOpTime = writeTxnRecordFn(100, 0, {});
+ const auto secondOpTime = writeTxnRecordFn(200, 1, firstOpTime);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -171,12 +171,12 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
ASSERT(!cursor->more());
ASSERT_EQ(sessionId, txnRecord.getSessionId());
ASSERT_EQ(200, txnRecord.getTxnNum());
- ASSERT_EQ(secondTs, txnRecord.getLastWriteOpTimeTs());
- ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200));
+ ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime());
+ ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
session.invalidate();
session.refreshFromStorageIfNeeded(opCtx());
- ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200));
+ ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
}
TEST_F(SessionTest, StartingOldTxnShouldAssert) {
@@ -189,7 +189,7 @@ TEST_F(SessionTest, StartingOldTxnShouldAssert) {
ASSERT_THROWS_CODE(
session.beginTxn(opCtx(), txnNum - 1), AssertionException, ErrorCodes::TransactionTooOld);
- ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull());
+ ASSERT(session.getLastWriteOpTime(txnNum).isNull());
}
TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
@@ -209,7 +209,7 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()),
+ ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime),
AssertionException);
}
@@ -221,22 +221,22 @@ TEST_F(SessionTest, CheckStatementExecuted) {
const TxnNumber txnNum = 100;
session.beginTxn(opCtx(), txnNum);
- const auto writeTxnRecordFn = [&](StmtId stmtId, Timestamp prevTs) {
+ const auto writeTxnRecordFn = [&](StmtId stmtId, repl::OpTime prevOpTime) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp());
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime);
wuow.commit();
- return opTime.getTimestamp();
+ return opTime;
};
ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000));
- const auto firstTs = writeTxnRecordFn(1000, Timestamp());
+ const auto firstOpTime = writeTxnRecordFn(1000, {});
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000));
- writeTxnRecordFn(2000, firstTs);
+ writeTxnRecordFn(2000, firstOpTime);
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000));
// Invalidate the session and ensure the statements still check out
@@ -282,7 +282,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp());
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
wuow.commit();
}
@@ -290,10 +290,9 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0);
- ASSERT_THROWS_CODE(
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime.getTimestamp()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
}
@@ -311,10 +310,9 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
session.invalidate();
- ASSERT_THROWS_CODE(
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
@@ -329,7 +327,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp());
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
session.invalidate();
diff --git a/src/mongo/db/session_txn_record.cpp b/src/mongo/db/session_txn_record.cpp
index b5b92a644eb..89b627e7dce 100644
--- a/src/mongo/db/session_txn_record.cpp
+++ b/src/mongo/db/session_txn_record.cpp
@@ -34,12 +34,14 @@
namespace mongo {
-SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, Timestamp ts) {
+SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ repl::OpTime opTime) {
SessionTxnRecord record;
record.setSessionId(lsid);
record.setTxnNum(txnNum);
- record.setLastWriteOpTimeTs(ts);
+ record.setLastWriteOpTime(std::move(opTime));
return record;
}
diff --git a/src/mongo/db/session_txn_record.h b/src/mongo/db/session_txn_record.h
index d6425a7076d..b738e9b1dc3 100644
--- a/src/mongo/db/session_txn_record.h
+++ b/src/mongo/db/session_txn_record.h
@@ -30,13 +30,14 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/session_txn_record_gen.h"
namespace mongo {
inline bool operator==(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) {
return (lhs.getSessionId() == rhs.getSessionId()) && (lhs.getTxnNum() == rhs.getTxnNum()) &&
- (lhs.getLastWriteOpTimeTs() == rhs.getLastWriteOpTimeTs());
+ (lhs.getLastWriteOpTime() == rhs.getLastWriteOpTime());
}
inline bool operator!=(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) {
@@ -52,10 +53,9 @@ inline bool operator>(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs)
invariant(lhs.getSessionId() == rhs.getSessionId());
return (lhs.getTxnNum() > rhs.getTxnNum()) ||
- (lhs.getTxnNum() == rhs.getTxnNum() &&
- lhs.getLastWriteOpTimeTs() > rhs.getLastWriteOpTimeTs());
+ (lhs.getTxnNum() == rhs.getTxnNum() && lhs.getLastWriteOpTime() > rhs.getLastWriteOpTime());
}
-SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, Timestamp ts);
+SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, repl::OpTime opTime);
} // namespace mongo
diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl
index 3a9dc02328d..fa81319225b 100644
--- a/src/mongo/db/session_txn_record.idl
+++ b/src/mongo/db/session_txn_record.idl
@@ -30,11 +30,21 @@ global:
cpp_namespace: "mongo"
cpp_includes:
- "mongo/db/logical_session_id.h"
+ - "mongo/db/repl/optime.h"
imports:
- "mongo/idl/basic_types.idl"
- "mongo/db/logical_session_id.idl"
+# TODO: SERVER-31278 import from repl/replication_types.idl instead
+types:
+ optime:
+ bson_serialization_type: object
+ description: A document representing an OpTime.
+ cpp_type: "repl::OpTime"
+ serializer: repl::OpTime::toBSON
+ deserializer: repl::OpTime::parse
+
structs:
sessionTxnRecord:
description: "A document used for storing session transaction states."
@@ -46,8 +56,8 @@ structs:
txnNum:
type: TxnNumber
description: "The id representing this transaction."
- lastWriteOpTimeTs:
- type: timestamp
+ lastWriteOpTime:
+ type: optime
description: "The opTime of the last write on this transaction."
diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp
index 4d66c0eadde..3220597dd61 100644
--- a/src/mongo/db/transaction_history_iterator.cpp
+++ b/src/mongo/db/transaction_history_iterator.cpp
@@ -38,11 +38,11 @@
namespace mongo {
-TransactionHistoryIterator::TransactionHistoryIterator(Timestamp startingOpTimeTs)
- : _nextOpTimeTs(std::move(startingOpTimeTs)) {}
+TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime)
+ : _nextOpTime(std::move(startingOpTime)) {}
bool TransactionHistoryIterator::hasNext() const {
- return !_nextOpTimeTs.isNull();
+ return !_nextOpTime.isNull();
}
repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) {
@@ -50,26 +50,24 @@ repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) {
DBDirectClient client(opCtx);
// TODO: SERVER-29843 oplogReplay option might be needed to activate fast ts search.
- auto oplogBSON =
- client.findOne(NamespaceString::kRsOplogNamespace.ns(),
- BSON(repl::OplogEntryBase::kTimestampFieldName << _nextOpTimeTs));
+ auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), _nextOpTime.asQuery());
uassert(ErrorCodes::IncompleteTransactionHistory,
str::stream() << "oplog no longer contains the complete write history of this "
- "transaction, log with ts "
- << _nextOpTimeTs.toBSON()
+ "transaction, log with opTime "
+ << _nextOpTime.toBSON()
<< " cannot be found",
!oplogBSON.isEmpty());
auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
- const auto& oplogPrevTsOption = oplogEntry.getPrevWriteTsInTransaction();
+ const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction();
uassert(
ErrorCodes::FailedToParse,
str::stream() << "Missing prevOpTime field on oplog entry of previous write in transaction: "
<< redact(oplogBSON),
oplogPrevTsOption);
- _nextOpTimeTs = oplogPrevTsOption.value();
+ _nextOpTime = oplogPrevTsOption.value();
return oplogEntry;
}
diff --git a/src/mongo/db/transaction_history_iterator.h b/src/mongo/db/transaction_history_iterator.h
index 3b6dee175d1..4e455ec3ced 100644
--- a/src/mongo/db/transaction_history_iterator.h
+++ b/src/mongo/db/transaction_history_iterator.h
@@ -28,7 +28,6 @@
#pragma once
-#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/oplog_entry.h"
namespace mongo {
@@ -44,7 +43,7 @@ public:
/**
* Creates a new iterator starting with an oplog entry with the given start opTime.
*/
- TransactionHistoryIterator(Timestamp startingOpTimeTs);
+ TransactionHistoryIterator(repl::OpTime startingOpTime);
/**
* Returns false if there are no more entries to iterate.
@@ -60,7 +59,7 @@ public:
repl::OplogEntry next(OperationContext* opCtx);
private:
- Timestamp _nextOpTimeTs;
+ repl::OpTime _nextOpTime;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_history_iterator_test.cpp b/src/mongo/db/transaction_history_iterator_test.cpp
index 687ced1b8cf..45ef909d8eb 100644
--- a/src/mongo/db/transaction_history_iterator_test.cpp
+++ b/src/mongo/db/transaction_history_iterator_test.cpp
@@ -58,7 +58,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) {
repl::OpTypeEnum::kInsert,
NamespaceString("a.b"),
BSON("x" << 30));
- entry1.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ entry1.setPrevWriteOpTimeInTransaction(repl::OpTime());
insertOplogEntry(entry1);
repl::OplogEntry entry2(repl::OpTime(Timestamp(67, 54801), 2),
@@ -66,7 +66,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) {
repl::OpTypeEnum::kInsert,
NamespaceString("a.b"),
BSON("y" << 50));
- entry2.setPrevWriteTsInTransaction(Timestamp(52, 345));
+ entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(52, 345), 2));
insertOplogEntry(entry2);
// Insert an unrelated entry in between
@@ -75,7 +75,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) {
repl::OpTypeEnum::kInsert,
NamespaceString("a.b"),
BSON("z" << 40));
- entry3.setPrevWriteTsInTransaction(Timestamp(22, 67));
+ entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(22, 67), 2));
insertOplogEntry(entry3);
repl::OplogEntry entry4(repl::OpTime(Timestamp(97, 2472), 2),
@@ -83,29 +83,29 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) {
repl::OpTypeEnum::kInsert,
NamespaceString("a.b"),
BSON("a" << 3));
- entry4.setPrevWriteTsInTransaction(Timestamp(67, 54801));
+ entry4.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(67, 54801), 2));
insertOplogEntry(entry4);
- TransactionHistoryIterator iter(Timestamp(97, 2472));
+ TransactionHistoryIterator iter(repl::OpTime(Timestamp(97, 2472), 2));
{
ASSERT_TRUE(iter.hasNext());
auto nextEntry = iter.next(opCtx());
- ASSERT_EQ(Timestamp(97, 2472), nextEntry.getTimestamp());
+ ASSERT_EQ(repl::OpTime(Timestamp(97, 2472), 2), nextEntry.getOpTime());
ASSERT_BSONOBJ_EQ(BSON("a" << 3), nextEntry.getObject());
}
{
ASSERT_TRUE(iter.hasNext());
auto nextEntry = iter.next(opCtx());
- ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp());
+ ASSERT_EQ(repl::OpTime(Timestamp(67, 54801), 2), nextEntry.getOpTime());
ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject());
}
{
ASSERT_TRUE(iter.hasNext());
auto nextEntry = iter.next(opCtx());
- ASSERT_EQ(Timestamp(52, 345), nextEntry.getTimestamp());
+ ASSERT_EQ(repl::OpTime(Timestamp(52, 345), 2), nextEntry.getOpTime());
ASSERT_BSONOBJ_EQ(BSON("x" << 30), nextEntry.getObject());
}
@@ -118,10 +118,10 @@ TEST_F(SessionHistoryIteratorTest, StartAtZeroTSShouldNotBeAbleToIterate) {
repl::OpTypeEnum::kInsert,
NamespaceString("a.b"),
BSON("y" << 50));
- entry.setPrevWriteTsInTransaction(Timestamp(52, 345));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(52, 345), 1));
insertOplogEntry(entry);
- TransactionHistoryIterator iter(Timestamp(0, 0));
+ TransactionHistoryIterator iter({});
ASSERT_FALSE(iter.hasNext());
}
@@ -131,14 +131,15 @@ TEST_F(SessionHistoryIteratorTest, NextShouldAssertIfHistoryIsTruncated) {
repl::OpTypeEnum::kInsert,
NamespaceString("a.b"),
BSON("y" << 50));
- entry.setPrevWriteTsInTransaction(Timestamp(52, 345));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(52, 345), 1));
insertOplogEntry(entry);
- TransactionHistoryIterator iter(Timestamp(67, 54801));
+ repl::OpTime opTime(Timestamp(67, 54801), 2);
+ TransactionHistoryIterator iter(opTime);
ASSERT_TRUE(iter.hasNext());
auto nextEntry = iter.next(opCtx());
- ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp());
+ ASSERT_EQ(opTime, nextEntry.getOpTime());
ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject());
ASSERT_TRUE(iter.hasNext());
@@ -154,7 +155,7 @@ TEST_F(SessionHistoryIteratorTest, OplogInWriteHistoryChainWithMissingPrevTSShou
BSON("y" << 50));
insertOplogEntry(entry);
- TransactionHistoryIterator iter(Timestamp(67, 54801));
+ TransactionHistoryIterator iter(repl::OpTime(Timestamp(67, 54801), 2));
ASSERT_TRUE(iter.hasNext());
ASSERT_THROWS_CODE(iter.next(opCtx()), AssertionException, ErrorCodes::FailedToParse);
}
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index 7257a56f29a..efdbb4bc3d2 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -75,6 +75,8 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes,
const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kLastWriteTimestampFieldName =
+ SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName;
/**
* Makes the query we'll use to scan the transactions table.
@@ -89,7 +91,7 @@ Query makeQuery(Date_t now) {
0);
BSONObjBuilder bob;
{
- BSONObjBuilder subbob(bob.subobjStart(SessionTxnRecord::kLastWriteOpTimeTsFieldName));
+ BSONObjBuilder subbob(bob.subobjStart(kLastWriteTimestampFieldName));
subbob.append("$lt", possiblyExpired);
}
Query query(bob.obj());