summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-17 16:28:09 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-25 09:10:33 -0400
commit0935d7067068b3cb62a802a8696dd39c8d7e1944 (patch)
tree099dbd1d303769de9f5eaa942e4e96434c83d8eb /src
parent080514c9ee36564d8342cd832f3c5f3a199a9845 (diff)
downloadmongo-0935d7067068b3cb62a802a8696dd39c8d7e1944.tar.gz
SERVER-31281 Use separate wallclock time to track the last use of a transaction
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/op_observer_impl.cpp31
-rw-r--r--src/mongo/db/repl/sync_tail.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp88
-rw-r--r--src/mongo/db/session.cpp38
-rw-r--r--src/mongo/db/session.h6
-rw-r--r--src/mongo/db/session_test.cpp26
-rw-r--r--src/mongo/db/session_txn_record.idl5
-rw-r--r--src/mongo/db/transaction_reaper.cpp10
8 files changed, 114 insertions, 92 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 5621f9da8a2..6da9b7f9599 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -77,13 +77,17 @@ void onWriteOpCompleted(OperationContext* opCtx,
const NamespaceString& nss,
Session* session,
std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime) {
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate) {
if (lastStmtIdWriteOpTime.isNull())
return;
if (session) {
- session->onWriteOpCompletedOnPrimary(
- opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
+ session->onWriteOpCompletedOnPrimary(opCtx,
+ *opCtx->getTxnNumber(),
+ std::move(stmtIdsWritten),
+ lastStmtIdWriteOpTime,
+ lastStmtIdWriteDate);
}
}
@@ -304,8 +308,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- const auto opTimeList = repl::logInsertOps(
- opCtx, nss, uuid, session, begin, end, fromMigrate, getWallClockTimeForOpLog(opCtx));
+ const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);
+
+ const auto opTimeList =
+ repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate);
auto css = CollectionShardingState::get(opCtx, nss.ns());
@@ -319,7 +325,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
}
}
- auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
+ const auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
if (nss.coll() == "system.js") {
Scope::storedFuncMod(opCtx);
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
@@ -339,7 +345,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
return stmt.stmtId;
});
- onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime);
+ onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
}
void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
@@ -389,8 +395,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
- onWriteOpCompleted(
- opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
+ onWriteOpCompleted(opCtx,
+ args.nss,
+ session,
+ std::vector<StmtId>{args.stmtId},
+ opTime.writeOpTime,
+ opTime.wallClockTime);
}
auto OpObserverImpl::aboutToDelete(OperationContext* opCtx,
@@ -434,7 +444,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
- onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime);
+ onWriteOpCompleted(
+ opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime, opTime.wallClockTime);
}
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 25295eb133d..4cde3fec5c5 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -666,6 +666,8 @@ void fillWriterVectorsAndLastestSessionRecords(
record.setSessionId(lsid);
record.setTxnNum(*sessionInfo.getTxnNumber());
record.setLastWriteOpTime(op.getOpTime());
+ invariant(op.getWallClockTime());
+ record.setLastWriteDate(*op.getWallClockTime());
auto it = latestSessionRecords->find(lsid);
if (it == latestSessionRecords->end()) {
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 461d53e40b2..1df0fd145eb 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -267,51 +267,49 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum);
- writeConflictRetry(opCtx,
- "SessionOplogMigration",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- // Need to take global lock here so repl::logOp will not unlock it and
- // trigger the invariant that disallows unlocking global lock while
- // inside a WUOW. Grab a DBLock here instead of plain GlobalLock to make
- // sure the MMAPV1 flush lock will be lock/unlocked correctly. Take the
- // transaction table db lock to ensure the same lock ordering with normal
- // replicated updates to the table.
- Lock::DBLock lk(opCtx,
- NamespaceString::kSessionTransactionsTableNamespace.db(),
- MODE_IX);
- WriteUnitOfWork wunit(opCtx);
-
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNamespace(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- *oplogEntry.getWallClockTime(),
- sessionInfo,
- stmtId,
- oplogLink);
-
- auto oplogOpTime = result.oplogTime;
- uassert(40633,
- str::stream()
- << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString()
- << ": "
- << redact(oplogBSON),
- !oplogOpTime.isNull());
-
- // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post
- // image, because the next oplog will contain the real operation.
- if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(
- opCtx, result.txnNum, {stmtId}, oplogOpTime);
- }
-
- wunit.commit();
- });
+ writeConflictRetry(
+ opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ // Need to take global lock here so repl::logOp will not unlock it and trigger the
+ // invariant that disallows unlocking global lock while inside a WUOW. Grab a DBLock
+ // here instead of plain GlobalLock to make sure the MMAPV1 flush lock will be
+ // lock/unlocked correctly. Take the transaction table db lock to ensure the same lock
+ // ordering with normal replicated updates to the table.
+ Lock::DBLock lk(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ *oplogEntry.getWallClockTime(),
+ sessionInfo,
+ stmtId,
+ oplogLink);
+
+ auto oplogOpTime = result.oplogTime;
+ uassert(40633,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogOpTime.isNull());
+
+ // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post
+ // image, because the next oplog will contain the real operation.
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(
+ opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime());
+ }
+
+ wunit.commit();
+ });
return result;
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 8acfb4cb2d0..bc586137081 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -191,7 +191,8 @@ void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) {
void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime) {
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
stdx::unique_lock<stdx::mutex> ul(_mutex);
@@ -205,7 +206,8 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
}
}
- const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime);
+ const auto updateRequest =
+ _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate);
ul.unlock();
@@ -354,26 +356,32 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
UpdateRequest Session::_makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
- const repl::OpTime& newLastWriteOpTime) const {
+ const repl::OpTime& newLastWriteOpTime,
+ Date_t newLastWriteDate) const {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
if (_lastWrittenSessionRecord) {
- updateRequest.setQuery(_lastWrittenSessionRecord->toBSON());
+ updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
+ << _sessionId.toBSON()
+ << SessionTxnRecord::kTxnNumFieldName
+ << _lastWrittenSessionRecord->getTxnNum()
+ << SessionTxnRecord::kLastWriteOpTimeFieldName
+ << _lastWrittenSessionRecord->getLastWriteOpTime()));
updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName
<< newTxnNumber
<< SessionTxnRecord::kLastWriteOpTimeFieldName
- << newLastWriteOpTime)));
+ << newLastWriteOpTime
+ << SessionTxnRecord::kLastWriteDateFieldName
+ << newLastWriteDate)));
} else {
- const auto updateBSON = [&] {
- SessionTxnRecord newTxnRecord;
- newTxnRecord.setSessionId(_sessionId);
- newTxnRecord.setTxnNum(newTxnNumber);
- newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
- return newTxnRecord.toBSON();
- }();
-
- updateRequest.setQuery(updateBSON);
- updateRequest.setUpdates(updateBSON);
+ updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
+ << _sessionId.toBSON()
+ << SessionTxnRecord::kTxnNumFieldName
+ << newTxnNumber
+ << SessionTxnRecord::kLastWriteOpTimeFieldName
+ << newLastWriteOpTime));
+ updateRequest.setUpdates(
+ BSON("$set" << BSON(SessionTxnRecord::kLastWriteDateFieldName << newLastWriteDate)));
updateRequest.setUpsert(true);
}
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index f627ca8468d..c1b6c95b711 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -98,7 +98,8 @@ public:
void onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime);
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate);
/**
* Called after a replication batch has been applied on a secondary node. Keeps the session
@@ -152,7 +153,8 @@ private:
UpdateRequest _makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
- const repl::OpTime& newLastWriteTs) const;
+ const repl::OpTime& newLastWriteTs,
+ Date_t newLastWriteDate) const;
void _registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index a7b1690b369..aacc609394d 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -128,7 +128,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
wuow.commit();
return opTime;
@@ -160,7 +160,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
wuow.commit();
return opTime;
@@ -218,7 +218,7 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime),
+ ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
AssertionException);
}
@@ -234,7 +234,7 @@ TEST_F(SessionTest, CheckStatementExecuted) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
wuow.commit();
return opTime;
@@ -291,7 +291,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
wuow.commit();
}
@@ -299,9 +299,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0);
- ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
}
@@ -319,9 +320,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
session.invalidate();
- ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
@@ -336,7 +338,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
session.invalidate();
diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl
index fa81319225b..c9924a32a32 100644
--- a/src/mongo/db/session_txn_record.idl
+++ b/src/mongo/db/session_txn_record.idl
@@ -60,4 +60,7 @@ structs:
type: optime
description: "The optime timestamp component of the last write on this
transaction."
-
+ lastWriteDate:
+ type: date
+ description: "Wall clock time of the last write which happened on on this
+ transaction."
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index d8203bc8e3f..f9187cb9e82 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -75,8 +75,7 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes,
const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kLastWriteTimestampFieldName =
- SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName;
+const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
/**
* Makes the query we'll use to scan the transactions table.
@@ -85,11 +84,8 @@ const auto kLastWriteTimestampFieldName =
* to pull records likely to be on the same chunks (because they sort near each other).
*/
Query makeQuery(Date_t now) {
- const Timestamp possiblyExpired(
- duration_cast<Seconds>(
- (now - Minutes(TransactionRecordMinimumLifetimeMinutes)).toDurationSinceEpoch()),
- 0);
- Query query(BSON(kLastWriteTimestampFieldName << LT << possiblyExpired));
+ const Date_t possiblyExpired(now - Minutes(TransactionRecordMinimumLifetimeMinutes));
+ Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired));
query.sort(kSortById);
return query;
}