summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2018-02-28 23:45:50 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2018-03-05 18:08:47 -0500
commitc7524de57dc3a9d829d8bce43e219c9e011094c5 (patch)
tree312818f1457c9682f1ef683375e25c767090f4f3 /src/mongo
parent3f94de87680764d74bfb219a07606d46c515cba6 (diff)
downloadmongo-c7524de57dc3a9d829d8bce43e219c9e011094c5.tar.gz
SERVER-33216 Implement commitTransaction command.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands.h1
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp17
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp14
-rw-r--r--src/mongo/db/service_entry_point_common.cpp14
-rw-r--r--src/mongo/db/session.cpp5
6 files changed, 43 insertions, 16 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index e9516c75f29..5779d89e7e9 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -134,6 +134,7 @@ struct CommandHelpers {
arg == "writeConcern" || //
arg == "lsid" || //
arg == "txnNumber" || //
+ arg == "autocommit" || //
false; // These comments tell clang-format to keep this line-oriented.
}
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index 5df5b0b3688..da055acd7ed 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -33,8 +33,10 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/commands.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog.h"
namespace mongo {
namespace {
@@ -65,6 +67,21 @@ public:
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
+ auto session = OperationContextSession::get(opCtx);
+ uassert(
+ ErrorCodes::CommandFailed, "commitTransaction must be run within a session", session);
+
+ // TODO SERVER-33501 Change this when commitTransaction is retryable.
+ uassert(ErrorCodes::CommandFailed,
+ "Transaction isn't in progress",
+ opCtx->getWriteUnitOfWork() && session->inMultiDocumentTransaction());
+
+ auto opObserver = opCtx->getServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onTransactionCommit(opCtx);
+ opCtx->getWriteUnitOfWork()->commit();
+ opCtx->setWriteUnitOfWork(nullptr);
+
return true;
}
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index cef46afc15c..45fafaf06e8 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -143,7 +143,7 @@ ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss,
op.setOpType(OpTypeEnum::kInsert);
op.setNamespace(nss);
op.setUuid(uuid);
- op.setObject(docToInsert);
+ op.setObject(docToInsert.getOwned());
return op;
}
@@ -155,8 +155,8 @@ ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss,
op.setOpType(OpTypeEnum::kUpdate);
op.setNamespace(nss);
op.setUuid(uuid);
- op.setObject(update);
- op.setObject2(criteria);
+ op.setObject(update.getOwned());
+ op.setObject2(criteria.getOwned());
return op;
}
@@ -167,7 +167,7 @@ ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss,
op.setOpType(OpTypeEnum::kDelete);
op.setNamespace(nss);
op.setUuid(uuid);
- op.setObject(docToDelete);
+ op.setObject(docToDelete.getOwned());
return op;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a073fd70354..66e612f5bac 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1165,13 +1165,6 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
const ReadConcernArgs& readConcern) {
- // We should never wait for replication if we are holding any locks, because this can
- // potentially block for long time while doing network activity.
- if (opCtx->lockState()->isLocked()) {
- return {ErrorCodes::IllegalOperation,
- "Waiting for replication not allowed while holding a lock"};
- }
-
if (readConcern.getArgsAfterClusterTime() &&
readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern &&
readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern &&
@@ -1218,6 +1211,13 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt
return Status::OK();
}
+ // We should never wait for replication if we are holding any locks, because this can
+ // potentially block for long time while doing network activity.
+ if (opCtx->lockState()->isLocked()) {
+ return {ErrorCodes::IllegalOperation,
+ "Waiting for replication not allowed while holding a lock"};
+ }
+
return waitUntilOpTimeForReadUntil(opCtx, readConcern, boost::none);
}
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 38cf2cffda1..a2389e03e60 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -103,6 +103,7 @@ using logger::LogComponent;
// which is not allowed.
const StringMap<int> sessionCheckoutWhitelist = {{"aggregate", 1},
{"applyOps", 1},
+ {"commitTransaction", 1},
{"count", 1},
{"delete", 1},
{"distinct", 1},
@@ -688,12 +689,19 @@ void execCommandDatabase(OperationContext* opCtx,
if (retval) {
if (opCtx->getWriteUnitOfWork()) {
- if (!opCtx->hasStashedCursor()) {
+ // Snapshot readConcern is enabled and it must be used within a session.
+ auto session = sessionTxnState.get(opCtx);
+ invariant(session != nullptr,
+ str::stream()
+ << "Snapshot transaction must be run within a session. Command: "
+ << ServiceEntryPointCommon::getRedactedCopyForLogging(command,
+ request.body));
+ if (opCtx->hasStashedCursor() || session->inMultiDocumentTransaction()) {
+ sessionTxnState.stashTransactionResources();
+ } else {
// If we are in an autocommit=true transaction and have no stashed cursor,
// commit the transaction.
opCtx->getWriteUnitOfWork()->commit();
- } else {
- sessionTxnState.stashTransactionResources();
}
}
} else {
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 145b9bcaf3c..aeb58909af7 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -482,7 +482,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
return;
}
- invariant(opCtx->hasStashedCursor());
+ invariant(opCtx->hasStashedCursor() || !_autocommit);
if (*opCtx->getTxnNumber() != _activeTxnNumber) {
// The session is checked out, so _activeTxnNumber cannot advance due to a user operation.
@@ -548,7 +548,8 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx));
} else {
auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ||
+ _txnState == MultiDocumentTransactionState::kInProgress) {
opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
_isSnapshotTxn = true;
}