diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-02-28 23:45:50 -0500 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-03-05 18:08:47 -0500 |
commit | c7524de57dc3a9d829d8bce43e219c9e011094c5 (patch) | |
tree | 312818f1457c9682f1ef683375e25c767090f4f3 /src/mongo | |
parent | 3f94de87680764d74bfb219a07606d46c515cba6 (diff) | |
download | mongo-c7524de57dc3a9d829d8bce43e219c9e011094c5.tar.gz |
SERVER-33216 Implement commitTransaction command.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands.h | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/txn_cmds.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 5 |
6 files changed, 43 insertions, 16 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index e9516c75f29..5779d89e7e9 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -134,6 +134,7 @@ struct CommandHelpers { arg == "writeConcern" || // arg == "lsid" || // arg == "txnNumber" || // + arg == "autocommit" || // false; // These comments tell clang-format to keep this line-oriented. } diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index 5df5b0b3688..da055acd7ed 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -33,8 +33,10 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/commands.h" +#include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" +#include "mongo/db/session_catalog.h" namespace mongo { namespace { @@ -65,6 +67,21 @@ public: const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) override { + auto session = OperationContextSession::get(opCtx); + uassert( + ErrorCodes::CommandFailed, "commitTransaction must be run within a session", session); + + // TODO SERVER-33501 Change this when commitTransaction is retryable. + uassert(ErrorCodes::CommandFailed, + "Transaction isn't in progress", + opCtx->getWriteUnitOfWork() && session->inMultiDocumentTransaction()); + + auto opObserver = opCtx->getServiceContext()->getOpObserver(); + invariant(opObserver); + opObserver->onTransactionCommit(opCtx); + opCtx->getWriteUnitOfWork()->commit(); + opCtx->setWriteUnitOfWork(nullptr); + return true; } diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index cef46afc15c..45fafaf06e8 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -143,7 +143,7 @@ ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss, op.setOpType(OpTypeEnum::kInsert); op.setNamespace(nss); op.setUuid(uuid); - op.setObject(docToInsert); + op.setObject(docToInsert.getOwned()); return op; } @@ -155,8 +155,8 @@ ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss, op.setOpType(OpTypeEnum::kUpdate); op.setNamespace(nss); op.setUuid(uuid); - op.setObject(update); - op.setObject2(criteria); + op.setObject(update.getOwned()); + op.setObject2(criteria.getOwned()); return op; } @@ -167,7 +167,7 @@ ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss, op.setOpType(OpTypeEnum::kDelete); op.setNamespace(nss); op.setUuid(uuid); - op.setObject(docToDelete); + op.setObject(docToDelete.getOwned()); return op; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a073fd70354..66e612f5bac 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1165,13 +1165,6 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const { Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx, const ReadConcernArgs& readConcern) { - // We should never wait for replication if we are holding any locks, because this can - // potentially block for long time while doing network activity. - if (opCtx->lockState()->isLocked()) { - return {ErrorCodes::IllegalOperation, - "Waiting for replication not allowed while holding a lock"}; - } - if (readConcern.getArgsAfterClusterTime() && readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern && readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern && @@ -1218,6 +1211,13 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt return Status::OK(); } + // We should never wait for replication if we are holding any locks, because this can + // potentially block for long time while doing network activity. + if (opCtx->lockState()->isLocked()) { + return {ErrorCodes::IllegalOperation, + "Waiting for replication not allowed while holding a lock"}; + } + return waitUntilOpTimeForReadUntil(opCtx, readConcern, boost::none); } diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 38cf2cffda1..a2389e03e60 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -103,6 +103,7 @@ using logger::LogComponent; // which is not allowed. const StringMap<int> sessionCheckoutWhitelist = {{"aggregate", 1}, {"applyOps", 1}, + {"commitTransaction", 1}, {"count", 1}, {"delete", 1}, {"distinct", 1}, @@ -688,12 +689,19 @@ void execCommandDatabase(OperationContext* opCtx, if (retval) { if (opCtx->getWriteUnitOfWork()) { - if (!opCtx->hasStashedCursor()) { + // Snapshot readConcern is enabled and it must be used within a session. + auto session = sessionTxnState.get(opCtx); + invariant(session != nullptr, + str::stream() + << "Snapshot transaction must be run within a session. Command: " + << ServiceEntryPointCommon::getRedactedCopyForLogging(command, + request.body)); + if (opCtx->hasStashedCursor() || session->inMultiDocumentTransaction()) { + sessionTxnState.stashTransactionResources(); + } else { // If we are in an autocommit=true transaction and have no stashed cursor, // commit the transaction. opCtx->getWriteUnitOfWork()->commit(); - } else { - sessionTxnState.stashTransactionResources(); } } } else { diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 145b9bcaf3c..aeb58909af7 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -482,7 +482,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) { return; } - invariant(opCtx->hasStashedCursor()); + invariant(opCtx->hasStashedCursor() || !_autocommit); if (*opCtx->getTxnNumber() != _activeTxnNumber) { // The session is checked out, so _activeTxnNumber cannot advance due to a user operation. @@ -548,7 +548,8 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx)); } else { auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || + _txnState == MultiDocumentTransactionState::kInProgress) { opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx)); _isSnapshotTxn = true; } |