summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/replsets/retryable_writes_direct_write_to_config_transactions.js83
-rw-r--r--jstests/sharding/retryable_writes.js2
-rw-r--r--src/mongo/db/op_observer_impl.cpp27
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp2
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp2
-rw-r--r--src/mongo/db/session.cpp2
-rw-r--r--src/mongo/db/session_catalog.cpp33
-rw-r--r--src/mongo/db/session_catalog.h12
8 files changed, 140 insertions, 23 deletions
diff --git a/jstests/replsets/retryable_writes_direct_write_to_config_transactions.js b/jstests/replsets/retryable_writes_direct_write_to_config_transactions.js
new file mode 100644
index 00000000000..62d0b840711
--- /dev/null
+++ b/jstests/replsets/retryable_writes_direct_write_to_config_transactions.js
@@ -0,0 +1,83 @@
+// Validates the expected behaviour of direct writes against the `config.transactions` collection
+(function() {
+ 'use strict';
+
+ var replTest = new ReplSetTest({nodes: 2});
+ replTest.startSet();
+ replTest.initiate();
+
+ var priConn = replTest.getPrimary();
+ var db = priConn.getDB('TestDB');
+ var config = priConn.getDB('config');
+
+ assert.writeOK(db.user.insert({_id: 0}));
+ assert.writeOK(db.user.insert({_id: 1}));
+
+ const lsid1 = UUID();
+ const lsid2 = UUID();
+
+ const cmdObj1 = {
+ update: 'user',
+ updates: [{q: {_id: 0}, u: {$inc: {x: 1}}}],
+ lsid: {id: lsid1},
+ txnNumber: NumberLong(1)
+ };
+ assert.commandWorked(db.runCommand(cmdObj1));
+ assert.eq(1, db.user.find({_id: 0}).toArray()[0].x);
+
+ const cmdObj2 = {
+ update: 'user',
+ updates: [{q: {_id: 1}, u: {$inc: {x: 1}}}],
+ lsid: {id: lsid2},
+ txnNumber: NumberLong(1)
+ };
+ assert.commandWorked(db.runCommand(cmdObj2));
+ assert.eq(1, db.user.find({_id: 1}).toArray()[0].x);
+
+ assert.eq(1, config.transactions.find({'_id.id': lsid1}).itcount());
+ assert.eq(1, config.transactions.find({'_id.id': lsid2}).itcount());
+
+ // Invalidating lsid1 doesn't impact lsid2, but allows same statement to be executed again
+ assert.writeOK(config.transactions.remove({'_id.id': lsid1}));
+ assert.commandWorked(db.runCommand(cmdObj1));
+ assert.eq(2, db.user.find({_id: 0}).toArray()[0].x);
+ assert.commandWorked(db.runCommand(cmdObj2));
+ assert.eq(1, db.user.find({_id: 1}).toArray()[0].x);
+
+ // Ensure lsid1 is properly tracked after the recreate
+ assert.commandWorked(db.runCommand(cmdObj1));
+ assert.eq(2, db.user.find({_id: 0}).toArray()[0].x);
+
+ // Ensure garbage data cannot be written to the `config.transactions` collection
+ assert.writeError(config.transactions.insert({_id: 'String'}));
+ assert.writeError(config.transactions.insert({_id: {UnknownField: 'Garbage'}}));
+
+ // Ensure inserting an invalid session record manually without all the required fields causes
+ // the session to not work anymore for retryable writes for that session, but not for any other
+ const lsidManual = config.transactions.find({'_id.id': lsid1}).toArray()[0]._id;
+ assert.writeOK(config.transactions.remove({'_id.id': lsid1}));
+ assert.writeOK(config.transactions.insert({_id: lsidManual}));
+
+ const lsid3 = UUID();
+ assert.commandWorked(db.runCommand({
+ update: 'user',
+ updates: [{q: {_id: 2}, u: {$inc: {x: 1}}, upsert: true}],
+ lsid: {id: lsid3},
+ txnNumber: NumberLong(1)
+ }));
+ assert.eq(1, db.user.find({_id: 2}).toArray()[0].x);
+
+ // Ensure dropping the `config.transactions` collection breaks the retryable writes feature, but
+ // doesn't crash the server
+ assert(config.transactions.drop());
+ var res = assert.commandWorked(db.runCommand(cmdObj2));
+ assert.eq(0, res.nModified);
+ assert.eq(1, db.user.find({_id: 1}).toArray()[0].x);
+
+ assert(config.dropDatabase());
+ res = assert.commandWorked(db.runCommand(cmdObj2));
+ assert.eq(0, res.nModified);
+ assert.eq(1, db.user.find({_id: 1}).toArray()[0].x);
+
+ replTest.stopSet();
+})();
diff --git a/jstests/sharding/retryable_writes.js b/jstests/sharding/retryable_writes.js
index adaccecc104..cd1914e83bb 100644
--- a/jstests/sharding/retryable_writes.js
+++ b/jstests/sharding/retryable_writes.js
@@ -344,7 +344,7 @@
}
// Tests for replica set
- var replTest = new ReplSetTest({nodes: 1});
+ var replTest = new ReplSetTest({nodes: 2});
replTest.startSet();
replTest.initiate();
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 160c1fac2d6..be7b8c9be8c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -79,14 +79,7 @@ void onWriteOpCompleted(OperationContext* opCtx,
if (lastStmtIdWriteTs.isNull())
return;
- if (nss == NamespaceString::kSessionTransactionsTableNamespace) {
- uassert(40528,
- str::stream() << "Direct writes against "
- << NamespaceString::kSessionTransactionsTableNamespace.ns()
- << " cannot be performed using a transaction or on a session.",
- !opCtx->getLogicalSessionId());
- SessionCatalog::get(opCtx)->resetSessions();
- } else if (session) {
+ if (session) {
session->onWriteOpCompletedOnPrimary(
opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteTs);
}
@@ -291,6 +284,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
for (auto it = begin; it != end; it++) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc);
}
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
+ for (auto it = begin; it != end; it++) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
+ }
}
std::vector<StmtId> stmtIdsWritten;
@@ -324,6 +321,9 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
DurableViewCatalog::onExternalChange(opCtx, args.nss);
} else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc);
+ } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ !opTime.isNull()) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime);
@@ -348,7 +348,6 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
}
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
-
const auto opTime =
replLogDelete(opCtx, nss, uuid, session, stmtId, deleteState, fromMigrate, deletedDoc);
@@ -366,6 +365,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey);
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime);
@@ -483,6 +484,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
if (dbName == FeatureCompatibilityVersion::kDatabase) {
FeatureCompatibilityVersion::onDropCollection(opCtx);
+ } else if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
NamespaceUUIDCache::get(opCtx).evictNamespacesInDatabase(dbName);
@@ -506,10 +509,10 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) {
DurableViewCatalog::onExternalChange(opCtx, collectionName);
- }
-
- if (collectionName.ns() == FeatureCompatibilityVersion::kCollection) {
+ } else if (collectionName.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDropCollection(opCtx);
+ } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) {
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
AuthorizationManager::get(opCtx->getServiceContext())
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 1b791b63d9c..c3a192c8d7a 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -243,7 +243,7 @@ void RollbackImpl::_resetSessions(OperationContext* opCtx) {
log() << "resetting in-memory state of active sessions";
- SessionCatalog::get(opCtx)->resetSessions();
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
void RollbackImpl::_transitionFromRollbackToSecondary(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 6887a962baf..9af9794174f 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -1346,7 +1346,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
// If necessary, clear the memory of existing sessions.
if (fixUpInfo.refetchTransactionDocs) {
- SessionCatalog::get(opCtx)->resetSessions();
+ SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
}
// Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index d2e7c775619..9e3bece6063 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -323,7 +323,6 @@ UpdateRequest Session::_makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
Timestamp newLastWriteTs) const {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
- updateRequest.setUpsert(true);
if (_lastWrittenSessionRecord) {
updateRequest.setQuery(_lastWrittenSessionRecord->toBSON());
@@ -343,6 +342,7 @@ UpdateRequest Session::_makeUpdateRequest(WithLock,
updateRequest.setQuery(updateBSON);
updateRequest.setUpdates(updateBSON);
+ updateRequest.setUpsert(true);
}
return updateRequest;
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index f155d10639e..8f73410fe1c 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -104,7 +104,7 @@ boost::optional<UUID> SessionCatalog::getTransactionTableUUID(OperationContext*
}
void SessionCatalog::onStepUp(OperationContext* opCtx) {
- resetSessions();
+ invalidateSessions(opCtx, boost::none);
DBDirectClient client(opCtx);
@@ -172,10 +172,35 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx,
return ss;
}
-void SessionCatalog::resetSessions() {
+void SessionCatalog::invalidateSessions(OperationContext* opCtx,
+ boost::optional<BSONObj> singleSessionDoc) {
+ uassert(40528,
+ str::stream() << "Direct writes against "
+ << NamespaceString::kSessionTransactionsTableNamespace.ns()
+ << " cannot be performed using a transaction or on a session.",
+ !opCtx->getLogicalSessionId());
+
+ const auto invalidateSessionFn = [&](WithLock, SessionRuntimeInfoMap::iterator it) {
+ auto& sri = it->second;
+ sri->txnState.invalidate();
+ _txnTable.erase(it);
+ };
+
stdx::lock_guard<stdx::mutex> lg(_mutex);
- for (const auto& it : _txnTable) {
- it.second->txnState.invalidate();
+
+ if (singleSessionDoc) {
+ const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"),
+ singleSessionDoc->getField("_id").Obj());
+
+ auto it = _txnTable.find(lsid);
+ if (it != _txnTable.end()) {
+ invalidateSessionFn(lg, it);
+ }
+ } else {
+ auto it = _txnTable.begin();
+ while (it != _txnTable.end()) {
+ invalidateSessionFn(lg, it++);
+ }
}
}
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 3e03c97d83d..e0432c439f5 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -112,10 +112,16 @@ public:
ScopedSession getOrCreateSession(OperationContext* opCtx, const LogicalSessionId& lsid);
/**
- * Resets all created sessions and increments their generation, forcing each to be reloaded by
- * subsequent write commands. Invoked after rollback.
+ * Callback to be invoked when it is suspected that the on-disk session contents might not be in
+ * sync with what is in the sessions cache.
+ *
+ * If no specific document is available, the method will invalidate all sessions. Otherwise if
+ * one is avaiable (which is the case for insert/update/delete), it must contain _id field with
+ * a valid session entry, in which case only that particular session will be invalidated. If the
+ * _id field is missing or doesn't contain a valid serialization of logical session, the method
+ * will throw. This prevents invalid entries from making it in the collection.
*/
- void resetSessions();
+ void invalidateSessions(OperationContext* opCtx, boost::optional<BSONObj> singleSessionDoc);
private:
struct SessionRuntimeInfo {