summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-20 11:03:05 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-20 11:15:47 -0400
commitc565d15a53cd6dd452da97d49c6b9c6cbffb6cf1 (patch)
tree93abf8b71235cceb895df4ce50dda2ef14929948 /src/mongo/db
parent8a5656c58c18e7c23d5bec6f9b2d41013ee80a47 (diff)
downloadmongo-c565d15a53cd6dd452da97d49c6b9c6cbffb6cf1.tar.gz
SERVER-31114 Perform targeted session invalidation on direct writes to `config.transactions`
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/op_observer_impl.cpp27
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp2
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp2
-rw-r--r--src/mongo/db/session.cpp2
-rw-r--r--src/mongo/db/session_catalog.cpp33
-rw-r--r--src/mongo/db/session_catalog.h12
6 files changed, 56 insertions, 22 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 160c1fac2d6..be7b8c9be8c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -79,14 +79,7 @@ void onWriteOpCompleted(OperationContext* opCtx,
if (lastStmtIdWriteTs.isNull())
return;
- if (nss == NamespaceString::kSessionTransactionsTableNamespace) {
- uassert(40528,
- str::stream() << "Direct writes against "
- << NamespaceString::kSessionTransactionsTableNamespace.ns()
- << " cannot be performed using a transaction or on a session.",
- !opCtx->getLogicalSessionId());
- SessionCatalog::get(opCtx)->resetSessions();
- } else if (session) {
+ if (session) {
session->onWriteOpCompletedOnPrimary(
opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteTs);
}
@@ -291,6 +284,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
for (auto it = begin; it != end; it++) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc);
}
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
+ for (auto it = begin; it != end; it++) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
+ }
}
std::vector<StmtId> stmtIdsWritten;
@@ -324,6 +321,9 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
DurableViewCatalog::onExternalChange(opCtx, args.nss);
} else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc);
+ } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ !opTime.isNull()) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime);
@@ -348,7 +348,6 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
}
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
-
const auto opTime =
replLogDelete(opCtx, nss, uuid, session, stmtId, deleteState, fromMigrate, deletedDoc);
@@ -366,6 +365,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey);
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime);
@@ -483,6 +484,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
if (dbName == FeatureCompatibilityVersion::kDatabase) {
FeatureCompatibilityVersion::onDropCollection(opCtx);
+ } else if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
NamespaceUUIDCache::get(opCtx).evictNamespacesInDatabase(dbName);
@@ -506,10 +509,10 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) {
DurableViewCatalog::onExternalChange(opCtx, collectionName);
- }
-
- if (collectionName.ns() == FeatureCompatibilityVersion::kCollection) {
+ } else if (collectionName.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDropCollection(opCtx);
+ } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
AuthorizationManager::get(opCtx->getServiceContext())
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 1b791b63d9c..c3a192c8d7a 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -243,7 +243,7 @@ void RollbackImpl::_resetSessions(OperationContext* opCtx) {
log() << "resetting in-memory state of active sessions";
- SessionCatalog::get(opCtx)->resetSessions();
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
void RollbackImpl::_transitionFromRollbackToSecondary(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 6887a962baf..9af9794174f 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -1346,7 +1346,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
// If necessary, clear the memory of existing sessions.
if (fixUpInfo.refetchTransactionDocs) {
- SessionCatalog::get(opCtx)->resetSessions();
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
// Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index d2e7c775619..9e3bece6063 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -323,7 +323,6 @@ UpdateRequest Session::_makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
Timestamp newLastWriteTs) const {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
- updateRequest.setUpsert(true);
if (_lastWrittenSessionRecord) {
updateRequest.setQuery(_lastWrittenSessionRecord->toBSON());
@@ -343,6 +342,7 @@ UpdateRequest Session::_makeUpdateRequest(WithLock,
updateRequest.setQuery(updateBSON);
updateRequest.setUpdates(updateBSON);
+ updateRequest.setUpsert(true);
}
return updateRequest;
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index f155d10639e..8f73410fe1c 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -104,7 +104,7 @@ boost::optional<UUID> SessionCatalog::getTransactionTableUUID(OperationContext*
}
void SessionCatalog::onStepUp(OperationContext* opCtx) {
- resetSessions();
+ invalidateSessions(opCtx, boost::none);
DBDirectClient client(opCtx);
@@ -172,10 +172,35 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx,
return ss;
}
-void SessionCatalog::resetSessions() {
+void SessionCatalog::invalidateSessions(OperationContext* opCtx,
+ boost::optional<BSONObj> singleSessionDoc) {
+ uassert(40528,
+ str::stream() << "Direct writes against "
+ << NamespaceString::kSessionTransactionsTableNamespace.ns()
+ << " cannot be performed using a transaction or on a session.",
+ !opCtx->getLogicalSessionId());
+
+ const auto invalidateSessionFn = [&](WithLock, SessionRuntimeInfoMap::iterator it) {
+ auto& sri = it->second;
+ sri->txnState.invalidate();
+ _txnTable.erase(it);
+ };
+
stdx::lock_guard<stdx::mutex> lg(_mutex);
- for (const auto& it : _txnTable) {
- it.second->txnState.invalidate();
+
+ if (singleSessionDoc) {
+ const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"),
+ singleSessionDoc->getField("_id").Obj());
+
+ auto it = _txnTable.find(lsid);
+ if (it != _txnTable.end()) {
+ invalidateSessionFn(lg, it);
+ }
+ } else {
+ auto it = _txnTable.begin();
+ while (it != _txnTable.end()) {
+ invalidateSessionFn(lg, it++);
+ }
}
}
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 3e03c97d83d..e0432c439f5 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -112,10 +112,16 @@ public:
ScopedSession getOrCreateSession(OperationContext* opCtx, const LogicalSessionId& lsid);
/**
- * Resets all created sessions and increments their generation, forcing each to be reloaded by
- * subsequent write commands. Invoked after rollback.
+ * Callback to be invoked when it is suspected that the on-disk session contents might not be in
+ * sync with what is in the sessions cache.
+ *
+ * If no specific document is available, the method will invalidate all sessions. Otherwise if
+ * one is avaiable (which is the case for insert/update/delete), it must contain _id field with
+ * a valid session entry, in which case only that particular session will be invalidated. If the
+ * _id field is missing or doesn't contain a valid serialization of logical session, the method
+ * will throw. This prevents invalid entries from making it in the collection.
*/
- void resetSessions();
+ void invalidateSessions(OperationContext* opCtx, boost::optional<BSONObj> singleSessionDoc);
private:
struct SessionRuntimeInfo {