summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/do_txn.cpp
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2018-01-08 17:06:56 -0500
committerMatthew Russotto <matthew.russotto@10gen.com>2018-01-08 17:07:32 -0500
commit8b766fdc6c896ba0a4db30d7b9d2f050b464db22 (patch)
tree107cf6c037074c53bdddb61423c4c6cb352b3003 /src/mongo/db/repl/do_txn.cpp
parent27ccd4a3bf1150a1bf4356de8a91e4dff64e0762 (diff)
downloadmongo-8b766fdc6c896ba0a4db30d7b9d2f050b464db22.tar.gz
SERVER-32162 Remove non-atomic (push-based replication) support from doTxn.
Diffstat (limited to 'src/mongo/db/repl/do_txn.cpp')
-rw-r--r--src/mongo/db/repl/do_txn.cpp250
1 files changed, 60 insertions, 190 deletions
diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp
index 1bd9d13d8a1..44de9e8590c 100644
--- a/src/mongo/db/repl/do_txn.cpp
+++ b/src/mongo/db/repl/do_txn.cpp
@@ -56,7 +56,6 @@
namespace mongo {
constexpr StringData DoTxn::kPreconditionFieldName;
-constexpr StringData DoTxn::kOplogApplicationModeFieldName;
namespace {
@@ -84,7 +83,6 @@ bool _areOpsCrudOnly(const BSONObj& doTxnCmd) {
// Only consider CRUD operations.
switch (*opType) {
case 'd':
- case 'n':
case 'u':
break;
case 'i':
@@ -102,7 +100,6 @@ bool _areOpsCrudOnly(const BSONObj& doTxnCmd) {
Status _doTxn(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& doTxnCmd,
- repl::OplogApplication::Mode oplogApplicationMode,
BSONObjBuilder* result,
int* numApplied,
BSONArrayBuilder* opsBuilder) {
@@ -113,168 +110,80 @@ Status _doTxn(OperationContext* opCtx,
BSONObjIterator i(ops);
BSONArrayBuilder ab;
- const bool alwaysUpsert =
- doTxnCmd.hasField("alwaysUpsert") ? doTxnCmd["alwaysUpsert"].trueValue() : true;
- const bool haveWrappingWUOW = opCtx->lockState()->inAWriteUnitOfWork();
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
// Apply each op in the given 'doTxn' command object.
while (i.more()) {
BSONElement e = i.next();
const BSONObj& opObj = e.Obj();
- // Ignore 'n' operations.
- const char* opType = opObj["op"].valuestrsafe();
- if (*opType == 'n')
- continue;
-
const NamespaceString nss(opObj["ns"].String());
// Need to check this here, or OldClientContext may fail an invariant.
- if (*opType != 'c' && !nss.isValid())
+ if (!nss.isValid())
return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()};
Status status(ErrorCodes::InternalError, "");
- if (haveWrappingWUOW) {
- invariant(opCtx->lockState()->isW());
- invariant(*opType != 'c');
-
- auto db = dbHolder().get(opCtx, nss.ns());
- if (!db) {
- // Retry in non-atomic mode, since MMAP cannot implicitly create a new database
- // within an active WriteUnitOfWork.
- uasserted(ErrorCodes::AtomicityFailure,
- "cannot create a database in atomic doTxn mode; will retry without "
- "atomicity");
- }
-
- // When processing an update on a non-existent collection, applyOperation_inlock()
- // returns UpdateOperationFailed on updates and allows the collection to be
- // implicitly created on upserts. We detect both cases here and fail early with
- // NamespaceNotFound.
- // Additionally for inserts, we fail early on non-existent collections.
- auto collection = db->getCollection(opCtx, nss);
- if (!collection && !nss.isSystemDotIndexes() && (*opType == 'i' || *opType == 'u')) {
- uasserted(
- ErrorCodes::AtomicityFailure,
- str::stream()
- << "cannot apply insert or update operation on a non-existent namespace "
- << nss.ns()
- << " in atomic doTxn mode: "
- << redact(opObj));
- }
+ invariant(opCtx->lockState()->isW());
- // Cannot specify timestamp values in an atomic doTxn.
- if (opObj.hasField("ts")) {
- uasserted(ErrorCodes::AtomicityFailure,
- "cannot apply an op with a timestamp in atomic doTxn mode; "
- "will retry without atomicity");
- }
+ auto db = dbHolder().get(opCtx, nss.ns());
+ if (!db) {
+ uasserted(ErrorCodes::NamespaceNotFound,
+ str::stream() << "cannot apply insert, delete, or update operation on a "
+ "non-existent namespace "
+ << nss.ns()
+ << ": "
+ << mongo::redact(opObj));
+ }
- OldClientContext ctx(opCtx, nss.ns());
-
- status = repl::applyOperation_inlock(
- opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
- if (!status.isOK())
- return status;
-
- // Append completed op, including UUID if available, to 'opsBuilder'.
- if (opsBuilder) {
- if (opObj.hasField("ui") || nss.isSystemDotIndexes() ||
- !(collection && collection->uuid())) {
- // No changes needed to operation document.
- opsBuilder->append(opObj);
- } else {
- // Operation document has no "ui" field and collection has a UUID.
- auto uuid = collection->uuid();
- BSONObjBuilder opBuilder;
- opBuilder.appendElements(opObj);
- uuid->appendToBuilder(&opBuilder, "ui");
- opsBuilder->append(opBuilder.obj());
- }
- }
- } else {
- try {
- status = writeConflictRetry(
- opCtx,
- "doTxn",
- nss.ns(),
- [opCtx, nss, opObj, opType, alwaysUpsert, oplogApplicationMode] {
- if (*opType == 'c') {
- invariant(opCtx->lockState()->isW());
- return repl::applyCommand_inlock(opCtx, opObj, oplogApplicationMode);
- }
+ // When processing an update on a non-existent collection, applyOperation_inlock()
+ // returns UpdateOperationFailed on updates and allows the collection to be
+ // implicitly created on upserts. We detect both cases here and fail early with
+ // NamespaceNotFound.
+ // Additionally for inserts, we fail early on non-existent collections.
+ auto collection = db->getCollection(opCtx, nss);
+ if (!collection && db->getViewCatalog()->lookup(opCtx, nss.ns())) {
+ uasserted(ErrorCodes::CommandNotSupportedOnView,
+ str::stream() << "doTxn not supported on a view: " << redact(opObj));
+ }
+ if (!collection) {
+ uasserted(ErrorCodes::NamespaceNotFound,
+ str::stream() << "cannot apply operation on a non-existent namespace "
+ << nss.ns()
+ << " with doTxn: "
+ << redact(opObj));
+ }
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
- if (!autoColl.getCollection() && !nss.isSystemDotIndexes()) {
- // For idempotency reasons, return success on delete operations.
- if (*opType == 'd') {
- return Status::OK();
- }
- uasserted(ErrorCodes::NamespaceNotFound,
- str::stream()
- << "cannot apply insert or update operation on a "
- "non-existent namespace "
- << nss.ns()
- << ": "
- << mongo::redact(opObj));
- }
+ // Cannot specify timestamp values in an atomic doTxn.
+ if (opObj.hasField("ts")) {
+ uasserted(ErrorCodes::AtomicityFailure,
+ "cannot apply an op with a timestamp with doTxn; ");
+ }
- OldClientContext ctx(opCtx, nss.ns());
+ OldClientContext ctx(opCtx, nss.ns());
- if (!nss.isSystemDotIndexes()) {
- return repl::applyOperation_inlock(
- opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
- }
+ // Setting alwaysUpsert to true makes sense only during oplog replay, and doTxn commands
+ // should not be executed during oplog replay.
+ const bool alwaysUpsert = false;
+ status = repl::applyOperation_inlock(
+ opCtx, ctx.db(), opObj, alwaysUpsert, repl::OplogApplication::Mode::kApplyOpsCmd);
+ if (!status.isOK())
+ return status;
- auto fieldO = opObj["o"];
- BSONObj indexSpec;
- NamespaceString indexNss;
- std::tie(indexSpec, indexNss) =
- repl::prepForApplyOpsIndexInsert(fieldO, opObj, nss);
- if (!indexSpec["collation"]) {
- // If the index spec does not include a collation, explicitly specify
- // the simple collation, so the index does not inherit the collection
- // default collation.
- auto indexVersion = indexSpec["v"];
- // The index version is populated by prepForApplyOpsIndexInsert().
- invariant(indexVersion);
- if (indexVersion.isNumber() &&
- (indexVersion.numberInt() >=
- static_cast<int>(IndexDescriptor::IndexVersion::kV2))) {
- BSONObjBuilder bob;
- bob.append("collation", CollationSpec::kSimpleSpec);
- bob.appendElements(indexSpec);
- indexSpec = bob.obj();
- }
- }
- BSONObjBuilder command;
- command.append("createIndexes", indexNss.coll());
- {
- BSONArrayBuilder indexes(command.subarrayStart("indexes"));
- indexes.append(indexSpec);
- indexes.doneFast();
- }
- const BSONObj commandObj = command.done();
-
- DBDirectClient client(opCtx);
- BSONObj infoObj;
- client.runCommand(nss.db().toString(), commandObj, infoObj);
-
- // Uassert to stop doTxn only when building indexes, but not for CRUD
- // ops.
- uassertStatusOK(getStatusFromCommandResult(infoObj));
-
- return Status::OK();
- });
- } catch (const DBException& ex) {
- ab.append(false);
- result->append("applied", ++(*numApplied));
- result->append("code", ex.code());
- result->append("codeName", ErrorCodes::errorString(ex.code()));
- result->append("errmsg", ex.what());
- result->append("results", ab.arr());
- return Status(ErrorCodes::UnknownError, ex.what());
+ // Append completed op, including UUID if available, to 'opsBuilder'.
+ if (opsBuilder) {
+ if (opObj.hasField("ui") || nss.isSystemDotIndexes() ||
+ !(collection && collection->uuid())) {
+ // No changes needed to operation document.
+ opsBuilder->append(opObj);
+ } else {
+ // Operation document has no "ui" field and collection has a UUID.
+ auto uuid = collection->uuid();
+ BSONObjBuilder opBuilder;
+ opBuilder.appendElements(opObj);
+ uuid->appendToBuilder(&opBuilder, "ui");
+ opsBuilder->append(opBuilder.obj());
}
}
@@ -364,34 +273,12 @@ Status _checkPrecondition(OperationContext* opCtx,
Status doTxn(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& doTxnCmd,
- repl::OplogApplication::Mode oplogApplicationMode,
BSONObjBuilder* result) {
- bool allowAtomic = false;
- uassertStatusOK(
- bsonExtractBooleanFieldWithDefault(doTxnCmd, "allowAtomic", true, &allowAtomic));
- auto areOpsCrudOnly = _areOpsCrudOnly(doTxnCmd);
- auto isAtomic = allowAtomic && areOpsCrudOnly;
+ uassert(
+ ErrorCodes::InvalidOptions, "doTxn supports only CRUD opts.", _areOpsCrudOnly(doTxnCmd));
auto hasPrecondition = _hasPrecondition(doTxnCmd);
- if (hasPrecondition) {
- uassert(ErrorCodes::InvalidOptions,
- "Cannot use preCondition with {allowAtomic: false}.",
- allowAtomic);
- uassert(ErrorCodes::InvalidOptions,
- "Cannot use preCondition when operations include commands.",
- areOpsCrudOnly);
- }
-
- boost::optional<Lock::GlobalWrite> globalWriteLock;
- boost::optional<Lock::DBLock> dbWriteLock;
-
- // There's only one case where we are allowed to take the database lock instead of the global
- // lock - no preconditions; only CRUD ops; and non-atomic mode.
- if (!hasPrecondition && areOpsCrudOnly && !allowAtomic) {
- dbWriteLock.emplace(opCtx, dbName, MODE_IX);
- } else {
- globalWriteLock.emplace(opCtx);
- }
+ Lock::GlobalWrite globalWriteLock(opCtx);
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
bool userInitiatedWritesAndNotPrimary =
@@ -402,7 +289,6 @@ Status doTxn(OperationContext* opCtx,
str::stream() << "Not primary while applying ops to database " << dbName);
if (hasPrecondition) {
- invariant(isAtomic);
auto status = _checkPrecondition(opCtx, doTxnCmd, result);
if (!status.isOK()) {
return status;
@@ -410,12 +296,6 @@ Status doTxn(OperationContext* opCtx,
}
int numApplied = 0;
- if (!isAtomic) {
- return _doTxn(opCtx, dbName, doTxnCmd, oplogApplicationMode, result, &numApplied, nullptr);
- }
-
- // Perform write ops atomically
- invariant(globalWriteLock);
try {
writeConflictRetry(opCtx, "doTxn", dbName, [&] {
@@ -430,13 +310,8 @@ Status doTxn(OperationContext* opCtx,
{
// Suppress replication for atomic operations until end of doTxn.
repl::UnreplicatedWritesBlock uwb(opCtx);
- uassertStatusOK(_doTxn(opCtx,
- dbName,
- doTxnCmd,
- oplogApplicationMode,
- &intermediateResult,
- &numApplied,
- opsBuilder.get()));
+ uassertStatusOK(_doTxn(
+ opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied, opsBuilder.get()));
}
// Generate oplog entry for all atomic ops collectively.
if (opCtx->writesAreReplicated()) {
@@ -475,11 +350,6 @@ Status doTxn(OperationContext* opCtx,
result->appendElements(intermediateResult.obj());
});
} catch (const DBException& ex) {
- if (ex.code() == ErrorCodes::AtomicityFailure) {
- // Retry in non-atomic mode.
- return _doTxn(
- opCtx, dbName, doTxnCmd, oplogApplicationMode, result, &numApplied, nullptr);
- }
BSONArrayBuilder ab;
++numApplied;
for (int j = 0; j < numApplied; j++)