summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2018-02-28 23:45:50 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2018-03-05 18:08:47 -0500
commitc7524de57dc3a9d829d8bce43e219c9e011094c5 (patch)
tree312818f1457c9682f1ef683375e25c767090f4f3
parent3f94de87680764d74bfb219a07606d46c515cba6 (diff)
downloadmongo-c7524de57dc3a9d829d8bce43e219c9e011094c5.tar.gz
SERVER-33216 Implement commitTransaction command.
-rw-r--r--jstests/noPassthrough/multi_statement_transaction.js84
-rw-r--r--src/mongo/db/commands.h1
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp17
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp14
-rw-r--r--src/mongo/db/service_entry_point_common.cpp14
-rw-r--r--src/mongo/db/session.cpp5
7 files changed, 127 insertions, 16 deletions
diff --git a/jstests/noPassthrough/multi_statement_transaction.js b/jstests/noPassthrough/multi_statement_transaction.js
new file mode 100644
index 00000000000..11e543c2bd4
--- /dev/null
+++ b/jstests/noPassthrough/multi_statement_transaction.js
@@ -0,0 +1,84 @@
+// Test basic transaction commits with two inserts.
+// @tags: [requires_replication]
+(function() {
+ "use strict";
+ load('jstests/libs/uuid_util.js');
+
+ const dbName = "test";
+ const collName = "coll";
+
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+ const testDB = rst.getPrimary().getDB(dbName);
+ const coll = testDB.coll;
+
+ if (!testDB.serverStatus().storageEngine.supportsSnapshotReadConcern) {
+ rst.stopSet();
+ return;
+ }
+
+ testDB.runCommand({create: coll.getName(), writeConcern: {w: "majority"}});
+ const uuid = getUUIDFromListCollections(testDB, coll.getName());
+ const oplog = testDB.getSiblingDB('local').oplog.rs;
+ let txnNumber = 0;
+
+ const sessionOptions = {causalConsistency: false};
+ const session = testDB.getMongo().startSession(sessionOptions);
+ const sessionDb = session.getDatabase(dbName);
+
+ // Insert a doc within the transaction.
+ assert.commandWorked(sessionDb.runCommand({
+ insert: collName,
+ documents: [{_id: "insert-1"}],
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(txnNumber),
+ // Only the first write in a transaction has autocommit flag.
+ autocommit: false
+ }));
+
+ // Cannot read with default read concern.
+ assert.eq(null, testDB.coll.findOne({_id: "insert-1"}));
+ // But read in the same transaction returns the doc.
+ let res = sessionDb.runCommand({
+ find: collName,
+ filter: {_id: "insert-1"},
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(txnNumber)
+ });
+ assert.commandWorked(res);
+ assert.docEq([{_id: "insert-1"}], res.cursor.firstBatch);
+
+ // Insert a doc within a transaction.
+ assert.commandWorked(sessionDb.runCommand({
+ insert: collName,
+ documents: [{_id: "insert-2"}],
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(txnNumber),
+ }));
+
+ // Cannot read with default read concern.
+ assert.eq(null, testDB.coll.findOne({_id: "insert-1"}));
+ // Cannot read with default read concern.
+ assert.eq(null, testDB.coll.findOne({_id: "insert-2"}));
+
+ assert.commandWorked(sessionDb.runCommand({
+ commitTransaction: 1,
+ txnNumber: NumberLong(txnNumber),
+ }));
+
+ // Read with default read concern sees the committed transaction.
+ assert.eq({_id: "insert-1"}, testDB.coll.findOne({_id: "insert-1"}));
+ assert.eq({_id: "insert-2"}, testDB.coll.findOne({_id: "insert-2"}));
+
+ // Oplog has the "applyOps" entry that includes two insert ops.
+ const insertOps = [
+ {op: 'i', ns: coll.getFullName(), o: {_id: "insert-1"}},
+ {op: 'i', ns: coll.getFullName(), o: {_id: "insert-2"}},
+ ];
+ let topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next();
+ assert.eq(topOfOplog.txnNumber, NumberLong(txnNumber));
+ assert.docEq(topOfOplog.o.applyOps, insertOps.map(x => Object.assign(x, {ui: uuid})));
+
+ rst.stopSet();
+}());
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index e9516c75f29..5779d89e7e9 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -134,6 +134,7 @@ struct CommandHelpers {
arg == "writeConcern" || //
arg == "lsid" || //
arg == "txnNumber" || //
+ arg == "autocommit" || //
false; // These comments tell clang-format to keep this line-oriented.
}
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index 5df5b0b3688..da055acd7ed 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -33,8 +33,10 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/commands.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog.h"
namespace mongo {
namespace {
@@ -65,6 +67,21 @@ public:
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
+ auto session = OperationContextSession::get(opCtx);
+ uassert(
+ ErrorCodes::CommandFailed, "commitTransaction must be run within a session", session);
+
+ // TODO SERVER-33501 Change this when commitTransaction is retryable.
+ uassert(ErrorCodes::CommandFailed,
+ "Transaction isn't in progress",
+ opCtx->getWriteUnitOfWork() && session->inMultiDocumentTransaction());
+
+ auto opObserver = opCtx->getServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onTransactionCommit(opCtx);
+ opCtx->getWriteUnitOfWork()->commit();
+ opCtx->setWriteUnitOfWork(nullptr);
+
return true;
}
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index cef46afc15c..45fafaf06e8 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -143,7 +143,7 @@ ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss,
op.setOpType(OpTypeEnum::kInsert);
op.setNamespace(nss);
op.setUuid(uuid);
- op.setObject(docToInsert);
+ op.setObject(docToInsert.getOwned());
return op;
}
@@ -155,8 +155,8 @@ ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss,
op.setOpType(OpTypeEnum::kUpdate);
op.setNamespace(nss);
op.setUuid(uuid);
- op.setObject(update);
- op.setObject2(criteria);
+ op.setObject(update.getOwned());
+ op.setObject2(criteria.getOwned());
return op;
}
@@ -167,7 +167,7 @@ ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss,
op.setOpType(OpTypeEnum::kDelete);
op.setNamespace(nss);
op.setUuid(uuid);
- op.setObject(docToDelete);
+ op.setObject(docToDelete.getOwned());
return op;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a073fd70354..66e612f5bac 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1165,13 +1165,6 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
const ReadConcernArgs& readConcern) {
- // We should never wait for replication if we are holding any locks, because this can
- // potentially block for long time while doing network activity.
- if (opCtx->lockState()->isLocked()) {
- return {ErrorCodes::IllegalOperation,
- "Waiting for replication not allowed while holding a lock"};
- }
-
if (readConcern.getArgsAfterClusterTime() &&
readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern &&
readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern &&
@@ -1218,6 +1211,13 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt
return Status::OK();
}
+ // We should never wait for replication if we are holding any locks, because this can
+ // potentially block for long time while doing network activity.
+ if (opCtx->lockState()->isLocked()) {
+ return {ErrorCodes::IllegalOperation,
+ "Waiting for replication not allowed while holding a lock"};
+ }
+
return waitUntilOpTimeForReadUntil(opCtx, readConcern, boost::none);
}
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 38cf2cffda1..a2389e03e60 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -103,6 +103,7 @@ using logger::LogComponent;
// which is not allowed.
const StringMap<int> sessionCheckoutWhitelist = {{"aggregate", 1},
{"applyOps", 1},
+ {"commitTransaction", 1},
{"count", 1},
{"delete", 1},
{"distinct", 1},
@@ -688,12 +689,19 @@ void execCommandDatabase(OperationContext* opCtx,
if (retval) {
if (opCtx->getWriteUnitOfWork()) {
- if (!opCtx->hasStashedCursor()) {
+ // Snapshot readConcern is enabled and it must be used within a session.
+ auto session = sessionTxnState.get(opCtx);
+ invariant(session != nullptr,
+ str::stream()
+ << "Snapshot transaction must be run within a session. Command: "
+ << ServiceEntryPointCommon::getRedactedCopyForLogging(command,
+ request.body));
+ if (opCtx->hasStashedCursor() || session->inMultiDocumentTransaction()) {
+ sessionTxnState.stashTransactionResources();
+ } else {
// If we are in an autocommit=true transaction and have no stashed cursor,
// commit the transaction.
opCtx->getWriteUnitOfWork()->commit();
- } else {
- sessionTxnState.stashTransactionResources();
}
}
} else {
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 145b9bcaf3c..aeb58909af7 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -482,7 +482,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
return;
}
- invariant(opCtx->hasStashedCursor());
+ invariant(opCtx->hasStashedCursor() || !_autocommit);
if (*opCtx->getTxnNumber() != _activeTxnNumber) {
// The session is checked out, so _activeTxnNumber cannot advance due to a user operation.
@@ -548,7 +548,8 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx));
} else {
auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ||
+ _txnState == MultiDocumentTransactionState::kInProgress) {
opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
_isSnapshotTxn = true;
}