diff options
-rw-r--r-- | jstests/replsets/retryable_writes_direct_write_to_config_transactions.js | 83 | ||||
-rw-r--r-- | jstests/sharding/retryable_writes.js | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.h | 12 |
8 files changed, 140 insertions, 23 deletions
diff --git a/jstests/replsets/retryable_writes_direct_write_to_config_transactions.js b/jstests/replsets/retryable_writes_direct_write_to_config_transactions.js new file mode 100644 index 00000000000..62d0b840711 --- /dev/null +++ b/jstests/replsets/retryable_writes_direct_write_to_config_transactions.js @@ -0,0 +1,83 @@ +// Validates the expected behaviour of direct writes against the `config.transactions` collection +(function() { + 'use strict'; + + var replTest = new ReplSetTest({nodes: 2}); + replTest.startSet(); + replTest.initiate(); + + var priConn = replTest.getPrimary(); + var db = priConn.getDB('TestDB'); + var config = priConn.getDB('config'); + + assert.writeOK(db.user.insert({_id: 0})); + assert.writeOK(db.user.insert({_id: 1})); + + const lsid1 = UUID(); + const lsid2 = UUID(); + + const cmdObj1 = { + update: 'user', + updates: [{q: {_id: 0}, u: {$inc: {x: 1}}}], + lsid: {id: lsid1}, + txnNumber: NumberLong(1) + }; + assert.commandWorked(db.runCommand(cmdObj1)); + assert.eq(1, db.user.find({_id: 0}).toArray()[0].x); + + const cmdObj2 = { + update: 'user', + updates: [{q: {_id: 1}, u: {$inc: {x: 1}}}], + lsid: {id: lsid2}, + txnNumber: NumberLong(1) + }; + assert.commandWorked(db.runCommand(cmdObj2)); + assert.eq(1, db.user.find({_id: 1}).toArray()[0].x); + + assert.eq(1, config.transactions.find({'_id.id': lsid1}).itcount()); + assert.eq(1, config.transactions.find({'_id.id': lsid2}).itcount()); + + // Invalidating lsid1 doesn't impact lsid2, but allows same statement to be executed again + assert.writeOK(config.transactions.remove({'_id.id': lsid1})); + assert.commandWorked(db.runCommand(cmdObj1)); + assert.eq(2, db.user.find({_id: 0}).toArray()[0].x); + assert.commandWorked(db.runCommand(cmdObj2)); + assert.eq(1, db.user.find({_id: 1}).toArray()[0].x); + + // Ensure lsid1 is properly tracked after the recreate + assert.commandWorked(db.runCommand(cmdObj1)); + assert.eq(2, db.user.find({_id: 0}).toArray()[0].x); + + // Ensure garbage data cannot be written to the `config.transactions` collection + assert.writeError(config.transactions.insert({_id: 'String'})); + assert.writeError(config.transactions.insert({_id: {UnknownField: 'Garbage'}})); + + // Ensure inserting an invalid session record manually without all the required fields causes + // the session to not work anymore for retryable writes for that session, but not for any other + const lsidManual = config.transactions.find({'_id.id': lsid1}).toArray()[0]._id; + assert.writeOK(config.transactions.remove({'_id.id': lsid1})); + assert.writeOK(config.transactions.insert({_id: lsidManual})); + + const lsid3 = UUID(); + assert.commandWorked(db.runCommand({ + update: 'user', + updates: [{q: {_id: 2}, u: {$inc: {x: 1}}, upsert: true}], + lsid: {id: lsid3}, + txnNumber: NumberLong(1) + })); + assert.eq(1, db.user.find({_id: 2}).toArray()[0].x); + + // Ensure dropping the `config.transactions` collection breaks the retryable writes feature, but + // doesn't crash the server + assert(config.transactions.drop()); + var res = assert.commandWorked(db.runCommand(cmdObj2)); + assert.eq(0, res.nModified); + assert.eq(1, db.user.find({_id: 1}).toArray()[0].x); + + assert(config.dropDatabase()); + res = assert.commandWorked(db.runCommand(cmdObj2)); + assert.eq(0, res.nModified); + assert.eq(1, db.user.find({_id: 1}).toArray()[0].x); + + replTest.stopSet(); +})(); diff --git a/jstests/sharding/retryable_writes.js b/jstests/sharding/retryable_writes.js index adaccecc104..cd1914e83bb 100644 --- a/jstests/sharding/retryable_writes.js +++ b/jstests/sharding/retryable_writes.js @@ -344,7 +344,7 @@ } // Tests for replica set - var replTest = new ReplSetTest({nodes: 1}); + var replTest = new ReplSetTest({nodes: 2}); replTest.startSet(); replTest.initiate(); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 160c1fac2d6..be7b8c9be8c 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -79,14 +79,7 @@ void onWriteOpCompleted(OperationContext* opCtx, if (lastStmtIdWriteTs.isNull()) return; - if (nss == NamespaceString::kSessionTransactionsTableNamespace) { - uassert(40528, - str::stream() << "Direct writes against " - << NamespaceString::kSessionTransactionsTableNamespace.ns() - << " cannot be performed using a transaction or on a session.", - !opCtx->getLogicalSessionId()); - SessionCatalog::get(opCtx)->resetSessions(); - } else if (session) { + if (session) { session->onWriteOpCompletedOnPrimary( opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteTs); } @@ -291,6 +284,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, for (auto it = begin; it != end; it++) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc); } + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) { + for (auto it = begin; it != end; it++) { + SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc); + } } std::vector<StmtId> stmtIdsWritten; @@ -324,6 +321,9 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg DurableViewCatalog::onExternalChange(opCtx, args.nss); } else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc); + } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && + !opTime.isNull()) { + SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime); @@ -348,7 +348,6 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, } Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - const auto opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, deleteState, fromMigrate, deletedDoc); @@ -366,6 +365,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, DurableViewCatalog::onExternalChange(opCtx, nss); } else if (nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) { + SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime); @@ -483,6 +484,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& if (dbName == FeatureCompatibilityVersion::kDatabase) { FeatureCompatibilityVersion::onDropCollection(opCtx); + } else if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) { + SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); } NamespaceUUIDCache::get(opCtx).evictNamespacesInDatabase(dbName); @@ -506,10 +509,10 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange(opCtx, collectionName); - } - - if (collectionName.ns() == FeatureCompatibilityVersion::kCollection) { + } else if (collectionName.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onDropCollection(opCtx); + } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) { + SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); } AuthorizationManager::get(opCtx->getServiceContext()) diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index 1b791b63d9c..c3a192c8d7a 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -243,7 +243,7 @@ void RollbackImpl::_resetSessions(OperationContext* opCtx) { log() << "resetting in-memory state of active sessions"; - SessionCatalog::get(opCtx)->resetSessions(); + SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); } void RollbackImpl::_transitionFromRollbackToSecondary(OperationContext* opCtx) { diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 6887a962baf..9af9794174f 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -1346,7 +1346,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, // If necessary, clear the memory of existing sessions. if (fixUpInfo.refetchTransactionDocs) { - SessionCatalog::get(opCtx)->resetSessions(); + SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); } // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index d2e7c775619..9e3bece6063 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -323,7 +323,6 @@ UpdateRequest Session::_makeUpdateRequest(WithLock, TxnNumber newTxnNumber, Timestamp newLastWriteTs) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); - updateRequest.setUpsert(true); if (_lastWrittenSessionRecord) { updateRequest.setQuery(_lastWrittenSessionRecord->toBSON()); @@ -343,6 +342,7 @@ UpdateRequest Session::_makeUpdateRequest(WithLock, updateRequest.setQuery(updateBSON); updateRequest.setUpdates(updateBSON); + updateRequest.setUpsert(true); } return updateRequest; diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index f155d10639e..8f73410fe1c 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -104,7 +104,7 @@ boost::optional<UUID> SessionCatalog::getTransactionTableUUID(OperationContext* } void SessionCatalog::onStepUp(OperationContext* opCtx) { - resetSessions(); + invalidateSessions(opCtx, boost::none); DBDirectClient client(opCtx); @@ -172,10 +172,35 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, return ss; } -void SessionCatalog::resetSessions() { +void SessionCatalog::invalidateSessions(OperationContext* opCtx, + boost::optional<BSONObj> singleSessionDoc) { + uassert(40528, + str::stream() << "Direct writes against " + << NamespaceString::kSessionTransactionsTableNamespace.ns() + << " cannot be performed using a transaction or on a session.", + !opCtx->getLogicalSessionId()); + + const auto invalidateSessionFn = [&](WithLock, SessionRuntimeInfoMap::iterator it) { + auto& sri = it->second; + sri->txnState.invalidate(); + _txnTable.erase(it); + }; + stdx::lock_guard<stdx::mutex> lg(_mutex); - for (const auto& it : _txnTable) { - it.second->txnState.invalidate(); + + if (singleSessionDoc) { + const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"), + singleSessionDoc->getField("_id").Obj()); + + auto it = _txnTable.find(lsid); + if (it != _txnTable.end()) { + invalidateSessionFn(lg, it); + } + } else { + auto it = _txnTable.begin(); + while (it != _txnTable.end()) { + invalidateSessionFn(lg, it++); + } } } diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 3e03c97d83d..e0432c439f5 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -112,10 +112,16 @@ public: ScopedSession getOrCreateSession(OperationContext* opCtx, const LogicalSessionId& lsid); /** - * Resets all created sessions and increments their generation, forcing each to be reloaded by - * subsequent write commands. Invoked after rollback. + * Callback to be invoked when it is suspected that the on-disk session contents might not be in + * sync with what is in the sessions cache. + * + * If no specific document is available, the method will invalidate all sessions. Otherwise if + * one is avaiable (which is the case for insert/update/delete), it must contain _id field with + * a valid session entry, in which case only that particular session will be invalidated. If the + * _id field is missing or doesn't contain a valid serialization of logical session, the method + * will throw. This prevents invalid entries from making it in the collection. */ - void resetSessions(); + void invalidateSessions(OperationContext* opCtx, boost::optional<BSONObj> singleSessionDoc); private: struct SessionRuntimeInfo { |