diff options
-rw-r--r-- | jstests/noPassthrough/multi_statement_transaction.js | 84 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/txn_cmds.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 5 |
7 files changed, 127 insertions, 16 deletions
diff --git a/jstests/noPassthrough/multi_statement_transaction.js b/jstests/noPassthrough/multi_statement_transaction.js new file mode 100644 index 00000000000..11e543c2bd4 --- /dev/null +++ b/jstests/noPassthrough/multi_statement_transaction.js @@ -0,0 +1,84 @@ +// Test basic transaction commits with two inserts. +// @tags: [requires_replication] +(function() { + "use strict"; + load('jstests/libs/uuid_util.js'); + + const dbName = "test"; + const collName = "coll"; + + const rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + const testDB = rst.getPrimary().getDB(dbName); + const coll = testDB.coll; + + if (!testDB.serverStatus().storageEngine.supportsSnapshotReadConcern) { + rst.stopSet(); + return; + } + + testDB.runCommand({create: coll.getName(), writeConcern: {w: "majority"}}); + const uuid = getUUIDFromListCollections(testDB, coll.getName()); + const oplog = testDB.getSiblingDB('local').oplog.rs; + let txnNumber = 0; + + const sessionOptions = {causalConsistency: false}; + const session = testDB.getMongo().startSession(sessionOptions); + const sessionDb = session.getDatabase(dbName); + + // Insert a doc within the transaction. + assert.commandWorked(sessionDb.runCommand({ + insert: collName, + documents: [{_id: "insert-1"}], + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber), + // Only the first write in a transaction has autocommit flag. + autocommit: false + })); + + // Cannot read with default read concern. + assert.eq(null, testDB.coll.findOne({_id: "insert-1"})); + // But read in the same transaction returns the doc. + let res = sessionDb.runCommand({ + find: collName, + filter: {_id: "insert-1"}, + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber) + }); + assert.commandWorked(res); + assert.docEq([{_id: "insert-1"}], res.cursor.firstBatch); + + // Insert a doc within a transaction. + assert.commandWorked(sessionDb.runCommand({ + insert: collName, + documents: [{_id: "insert-2"}], + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber), + })); + + // Cannot read with default read concern. + assert.eq(null, testDB.coll.findOne({_id: "insert-1"})); + // Cannot read with default read concern. + assert.eq(null, testDB.coll.findOne({_id: "insert-2"})); + + assert.commandWorked(sessionDb.runCommand({ + commitTransaction: 1, + txnNumber: NumberLong(txnNumber), + })); + + // Read with default read concern sees the committed transaction. + assert.eq({_id: "insert-1"}, testDB.coll.findOne({_id: "insert-1"})); + assert.eq({_id: "insert-2"}, testDB.coll.findOne({_id: "insert-2"})); + + // Oplog has the "applyOps" entry that includes two insert ops. + const insertOps = [ + {op: 'i', ns: coll.getFullName(), o: {_id: "insert-1"}}, + {op: 'i', ns: coll.getFullName(), o: {_id: "insert-2"}}, + ]; + let topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next(); + assert.eq(topOfOplog.txnNumber, NumberLong(txnNumber)); + assert.docEq(topOfOplog.o.applyOps, insertOps.map(x => Object.assign(x, {ui: uuid}))); + + rst.stopSet(); +}()); diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index e9516c75f29..5779d89e7e9 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -134,6 +134,7 @@ struct CommandHelpers { arg == "writeConcern" || // arg == "lsid" || // arg == "txnNumber" || // + arg == "autocommit" || // false; // These comments tell clang-format to keep this line-oriented. } diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index 5df5b0b3688..da055acd7ed 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -33,8 +33,10 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/commands.h" +#include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" +#include "mongo/db/session_catalog.h" namespace mongo { namespace { @@ -65,6 +67,21 @@ public: const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) override { + auto session = OperationContextSession::get(opCtx); + uassert( + ErrorCodes::CommandFailed, "commitTransaction must be run within a session", session); + + // TODO SERVER-33501 Change this when commitTransaction is retryable. + uassert(ErrorCodes::CommandFailed, + "Transaction isn't in progress", + opCtx->getWriteUnitOfWork() && session->inMultiDocumentTransaction()); + + auto opObserver = opCtx->getServiceContext()->getOpObserver(); + invariant(opObserver); + opObserver->onTransactionCommit(opCtx); + opCtx->getWriteUnitOfWork()->commit(); + opCtx->setWriteUnitOfWork(nullptr); + return true; } diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index cef46afc15c..45fafaf06e8 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -143,7 +143,7 @@ ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss, op.setOpType(OpTypeEnum::kInsert); op.setNamespace(nss); op.setUuid(uuid); - op.setObject(docToInsert); + op.setObject(docToInsert.getOwned()); return op; } @@ -155,8 +155,8 @@ ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss, op.setOpType(OpTypeEnum::kUpdate); op.setNamespace(nss); op.setUuid(uuid); - op.setObject(update); - op.setObject2(criteria); + op.setObject(update.getOwned()); + op.setObject2(criteria.getOwned()); return op; } @@ -167,7 +167,7 @@ ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss, op.setOpType(OpTypeEnum::kDelete); op.setNamespace(nss); op.setUuid(uuid); - op.setObject(docToDelete); + op.setObject(docToDelete.getOwned()); return op; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a073fd70354..66e612f5bac 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1165,13 +1165,6 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const { Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx, const ReadConcernArgs& readConcern) { - // We should never wait for replication if we are holding any locks, because this can - // potentially block for long time while doing network activity. - if (opCtx->lockState()->isLocked()) { - return {ErrorCodes::IllegalOperation, - "Waiting for replication not allowed while holding a lock"}; - } - if (readConcern.getArgsAfterClusterTime() && readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern && readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern && @@ -1218,6 +1211,13 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt return Status::OK(); } + // We should never wait for replication if we are holding any locks, because this can + // potentially block for long time while doing network activity. + if (opCtx->lockState()->isLocked()) { + return {ErrorCodes::IllegalOperation, + "Waiting for replication not allowed while holding a lock"}; + } + return waitUntilOpTimeForReadUntil(opCtx, readConcern, boost::none); } diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 38cf2cffda1..a2389e03e60 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -103,6 +103,7 @@ using logger::LogComponent; // which is not allowed. const StringMap<int> sessionCheckoutWhitelist = {{"aggregate", 1}, {"applyOps", 1}, + {"commitTransaction", 1}, {"count", 1}, {"delete", 1}, {"distinct", 1}, @@ -688,12 +689,19 @@ void execCommandDatabase(OperationContext* opCtx, if (retval) { if (opCtx->getWriteUnitOfWork()) { - if (!opCtx->hasStashedCursor()) { + // Snapshot readConcern is enabled and it must be used within a session. + auto session = sessionTxnState.get(opCtx); + invariant(session != nullptr, + str::stream() + << "Snapshot transaction must be run within a session. Command: " + << ServiceEntryPointCommon::getRedactedCopyForLogging(command, + request.body)); + if (opCtx->hasStashedCursor() || session->inMultiDocumentTransaction()) { + sessionTxnState.stashTransactionResources(); + } else { // If we are in an autocommit=true transaction and have no stashed cursor, // commit the transaction. opCtx->getWriteUnitOfWork()->commit(); - } else { - sessionTxnState.stashTransactionResources(); } } } else { diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 145b9bcaf3c..aeb58909af7 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -482,7 +482,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) { return; } - invariant(opCtx->hasStashedCursor()); + invariant(opCtx->hasStashedCursor() || !_autocommit); if (*opCtx->getTxnNumber() != _activeTxnNumber) { // The session is checked out, so _activeTxnNumber cannot advance due to a user operation. @@ -548,7 +548,8 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx)); } else { auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || + _txnState == MultiDocumentTransactionState::kInProgress) { opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx)); _isSnapshotTxn = true; } |