summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r--src/mongo/db/repl/oplog.cpp326
1 files changed, 214 insertions, 112 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index b6d7249e6dc..9baa4b211cc 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/catalog/drop_database.h"
#include "mongo/db/catalog/drop_indexes.h"
#include "mongo/db/catalog/rename_collection.h"
+#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -567,8 +568,72 @@ NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) {
return NamespaceString(NamespaceString(ns).db().toString(), coll);
}
-using OpApplyFn = stdx::function<Status(
- OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime)>;
+NamespaceString parseUUID(OperationContext* opCtx, const BSONElement& ui) {
+ auto statusWithUUID = UUID::parse(ui);
+ uassertStatusOK(statusWithUUID);
+ auto uuid = statusWithUUID.getValue();
+ auto& catalog = UUIDCatalog::get(opCtx);
+ auto nss = catalog.lookupNSSByUUID(uuid);
+ uassert(
+ ErrorCodes::NamespaceNotFound, "No namespace with UUID " + uuid.toString(), !nss.isEmpty());
+ return nss;
+}
+
+NamespaceString parseUUIDorNs(OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd) {
+ return ui.ok() ? parseUUID(opCtx, ui) : parseNs(ns, cmd);
+}
+
+void createIndexForApplyOps(OperationContext* opCtx,
+ const BSONObj& indexSpec,
+ const NamespaceString& indexNss,
+ IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
+ // Check if collection exists.
+ Database* db = dbHolder().get(opCtx, indexNss.ns());
+ auto indexCollection = db ? db->getCollection(opCtx, indexNss) : nullptr;
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Failed to create index due to missing collection: " << indexNss.ns(),
+ indexCollection);
+
+ OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
+ opCounters->gotInsert();
+
+ bool relaxIndexConstraints =
+ ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss);
+ if (indexSpec["background"].trueValue()) {
+ Lock::TempRelease release(opCtx->lockState());
+ if (opCtx->lockState()->isLocked()) {
+ // If TempRelease fails, background index build will deadlock.
+ LOG(3) << "apply op: building background index " << indexSpec
+ << " in the foreground because temp release failed";
+ IndexBuilder builder(indexSpec, relaxIndexConstraints);
+ Status status = builder.buildInForeground(opCtx, db);
+ uassertStatusOK(status);
+ } else {
+ IndexBuilder* builder = new IndexBuilder(indexSpec, relaxIndexConstraints);
+ // This spawns a new thread and returns immediately.
+ builder->go();
+ // Wait for thread to start and register itself
+ IndexBuilder::waitForBgIndexStarting();
+ }
+ opCtx->recoveryUnit()->abandonSnapshot();
+ } else {
+ IndexBuilder builder(indexSpec, relaxIndexConstraints);
+ Status status = builder.buildInForeground(opCtx, db);
+ uassertStatusOK(status);
+ }
+ if (incrementOpsAppliedStats) {
+ incrementOpsAppliedStats();
+ }
+}
+
+using OpApplyFn = stdx::function<Status(OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime)>;
struct ApplyOpMetadata {
OpApplyFn applyFunc;
@@ -586,13 +651,17 @@ struct ApplyOpMetadata {
std::map<std::string, ApplyOpMetadata> opsMap = {
{"create",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
const NamespaceString nss(parseNs(ns, cmd));
if (auto idIndexElem = cmd["idIndex"]) {
// Remove "idIndex" field from command.
auto cmdWithoutIdIndex = cmd.removeField("idIndex");
- return createCollection(
- opCtx, nss.db().toString(), cmdWithoutIdIndex, idIndexElem.Obj());
+ return createCollectionForApplyOps(
+ opCtx, nss.db().toString(), ui, cmdWithoutIdIndex, idIndexElem.Obj());
}
// No _id index spec was provided, so we should build a v:1 _id index.
@@ -602,25 +671,59 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
idIndexSpecBuilder.append(IndexDescriptor::kIndexNameFieldName, "_id_");
idIndexSpecBuilder.append(IndexDescriptor::kNamespaceFieldName, nss.ns());
idIndexSpecBuilder.append(IndexDescriptor::kKeyPatternFieldName, BSON("_id" << 1));
- return createCollection(opCtx, nss.db().toString(), cmd, idIndexSpecBuilder.done());
+ return createCollectionForApplyOps(
+ opCtx, nss.db().toString(), ui, cmd, idIndexSpecBuilder.done());
},
{ErrorCodes::NamespaceExists}}},
+ {"createIndexes",
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
+ const NamespaceString nss(parseUUID(opCtx, ui));
+ BSONElement first = cmd.firstElement();
+ invariant(first.fieldNameStringData() == "createIndexes");
+ uassert(ErrorCodes::InvalidNamespace,
+ "createIndexes value must be a string",
+ first.type() == mongo::String);
+ BSONObj indexSpec = cmd.removeField("createIndexes");
+ // The UUID determines the collection to build the index on, so create new 'ns' field.
+ BSONObj nsObj = BSON("ns" << nss.ns());
+ indexSpec = indexSpec.addField(nsObj.firstElement());
+
+ createIndexForApplyOps(opCtx, indexSpec, nss, {});
+ return Status::OK();
+ },
+ {ErrorCodes::IndexAlreadyExists}}},
{"collMod",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return collMod(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return collMod(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}},
{"dropDatabase",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
return dropDatabase(opCtx, NamespaceString(ns).db().toString());
},
{ErrorCodes::NamespaceNotFound}}},
{"drop",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropCollection(opCtx,
- parseNs(ns, cmd),
+ parseUUIDorNs(opCtx, ns, ui, cmd),
resultWeDontCareAbout,
opTime,
DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
@@ -628,59 +731,79 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
{ErrorCodes::NamespaceNotFound}}},
// deleteIndex(es) is deprecated but still works as of April 10, 2015
{"deleteIndex",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"deleteIndexes",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndex",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndexes",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"renameCollection",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
- const auto sourceNsElt = cmd.firstElement();
- const auto targetNsElt = cmd["to"];
- uassert(ErrorCodes::TypeMismatch,
- "'renameCollection' must be of type String",
- sourceNsElt.type() == BSONType::String);
- uassert(ErrorCodes::TypeMismatch,
- "'to' must be of type String",
- targetNsElt.type() == BSONType::String);
- return renameCollection(opCtx,
- NamespaceString(sourceNsElt.valueStringData()),
- NamespaceString(targetNsElt.valueStringData()),
- cmd["dropTarget"].trueValue(),
- cmd["stayTemp"].trueValue());
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
+ return renameCollectionForApplyOps(opCtx, nsToDatabase(ns), ui, cmd);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}},
{"applyOps",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return applyOps(opCtx, nsToDatabase(ns), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::UnknownError}}},
{"convertToCapped",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
- return convertToCapped(opCtx, parseNs(ns, cmd), cmd["size"].number());
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
+ return convertToCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd["size"].number());
}}},
{"emptycapped",
- {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status {
- return emptyCapped(opCtx, parseNs(ns, cmd));
+ {[](OperationContext* opCtx,
+ const char* ns,
+ const BSONElement& ui,
+ BSONObj& cmd,
+ const OpTime& opTime) -> Status {
+ return emptyCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd));
}}},
};
@@ -736,24 +859,54 @@ Status applyOperation_inlock(OperationContext* opCtx,
OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
- const char* names[] = {"o", "ns", "op", "b", "o2"};
- BSONElement fields[5];
- op.getFields(5, names, fields);
+ std::array<StringData, 6> names = {"o", "ui", "ns", "op", "b", "o2"};
+ std::array<BSONElement, 6> fields;
+ op.getFields(names, &fields);
BSONElement& fieldO = fields[0];
- BSONElement& fieldNs = fields[1];
- BSONElement& fieldOp = fields[2];
- BSONElement& fieldB = fields[3];
- BSONElement& fieldO2 = fields[4];
+ BSONElement& fieldUI = fields[1];
+ BSONElement& fieldNs = fields[2];
+ BSONElement& fieldOp = fields[3];
+ BSONElement& fieldB = fields[4];
+ BSONElement& fieldO2 = fields[5];
BSONObj o;
if (fieldO.isABSONObj())
o = fieldO.embeddedObject();
- uassert(ErrorCodes::InvalidNamespace,
- "'ns' must be of type String",
- fieldNs.type() == BSONType::String);
- const StringData ns = fieldNs.valueStringData();
- NamespaceString requestNss{ns};
+ // operation type -- see logOp() comments for types
+ const char* opType = fieldOp.valuestrsafe();
+
+ NamespaceString requestNss;
+ Collection* collection = nullptr;
+ if (fieldUI) {
+ UUIDCatalog& catalog = UUIDCatalog::get(opCtx);
+ auto uuid = UUID::parse(fieldUI);
+ uassertStatusOK(uuid);
+ collection = catalog.lookupCollectionByUUID(uuid.getValue());
+ if (collection) {
+ requestNss = collection->ns();
+ dassert(opCtx->lockState()->isCollectionLockedForMode(
+ requestNss.ns(), supportsDocLocking() ? MODE_IX : MODE_X));
+ }
+ } else {
+ uassert(ErrorCodes::InvalidNamespace,
+ "'ns' must be of type String",
+ fieldNs.type() == BSONType::String);
+ const StringData ns = fieldNs.valueStringData();
+ requestNss = NamespaceString(ns);
+ if (nsIsFull(ns)) {
+ if (supportsDocLocking()) {
+ // WiredTiger, and others requires MODE_IX since the applier threads driving
+ // this allow writes to the same collection on any thread.
+ dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IX));
+ } else {
+ // mmapV1 ensures that all operations to the same collection are executed from
+ // the same worker thread, so it takes an exclusive lock (MODE_X)
+ dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ }
+ }
+ collection = db->getCollection(opCtx, requestNss);
+ }
BSONObj o2;
if (fieldO2.isABSONObj())
@@ -761,27 +914,11 @@ Status applyOperation_inlock(OperationContext* opCtx,
bool valueB = fieldB.booleanSafe();
- if (nsIsFull(ns)) {
- if (supportsDocLocking()) {
- // WiredTiger, and others requires MODE_IX since the applier threads driving
- // this allow writes to the same collection on any thread.
- dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IX));
- } else {
- // mmapV1 ensures that all operations to the same collection are executed from
- // the same worker thread, so it takes an exclusive lock (MODE_X)
- dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_X));
- }
- }
- Collection* collection = db->getCollection(opCtx, requestNss);
IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog();
const bool haveWrappingWriteUnitOfWork = opCtx->lockState()->inAWriteUnitOfWork();
uassert(ErrorCodes::CommandNotSupportedOnView,
- str::stream() << "applyOps not supported on view: " << ns,
- collection || !db->getViewCatalog()->lookup(opCtx, ns));
-
- // operation type -- see logOp() comments for types
- const char* opType = fieldOp.valuestrsafe();
- invariant(*opType != 'c'); // commands are processed in applyCommand_inlock()
+ str::stream() << "applyOps not supported on view: " << requestNss.ns(),
+ collection || !db->getViewCatalog()->lookup(opCtx, requestNss.ns()));
if (*opType == 'i') {
if (requestNss.isSystemDotIndexes()) {
@@ -789,44 +926,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
NamespaceString indexNss;
std::tie(indexSpec, indexNss) =
repl::prepForApplyOpsIndexInsert(fieldO, op, requestNss);
-
- // Check if collection exists.
- auto indexCollection = db->getCollection(opCtx, indexNss);
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << "Failed to create index due to missing collection: "
- << op.toString(),
- indexCollection);
-
- opCounters->gotInsert();
-
- bool relaxIndexConstraints =
- ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss);
- if (indexSpec["background"].trueValue()) {
- Lock::TempRelease release(opCtx->lockState());
- if (opCtx->lockState()->isLocked()) {
- // If TempRelease fails, background index build will deadlock.
- LOG(3) << "apply op: building background index " << indexSpec
- << " in the foreground because temp release failed";
- IndexBuilder builder(indexSpec, relaxIndexConstraints);
- Status status = builder.buildInForeground(opCtx, db);
- uassertStatusOK(status);
- } else {
- IndexBuilder* builder = new IndexBuilder(indexSpec, relaxIndexConstraints);
- // This spawns a new thread and returns immediately.
- builder->go();
- // Wait for thread to start and register itself
- IndexBuilder::waitForBgIndexStarting();
- }
- opCtx->recoveryUnit()->abandonSnapshot();
- } else {
- IndexBuilder builder(indexSpec, relaxIndexConstraints);
- Status status = builder.buildInForeground(opCtx, db);
- uassertStatusOK(status);
- }
- // Since this is an index operation we can return without falling through.
- if (incrementOpsAppliedStats) {
- incrementOpsAppliedStats();
- }
+ createIndexForApplyOps(opCtx, indexSpec, indexNss, incrementOpsAppliedStats);
return Status::OK();
}
uassert(ErrorCodes::NamespaceNotFound,
@@ -1005,6 +1105,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
incrementOpsAppliedStats();
}
} else {
+ invariant(*opType != 'c'); // commands are processed in applyCommand_inlock()
throw MsgAssertionException(
14825, str::stream() << "error in applyOperation : unknown opType " << *opType);
}
@@ -1015,12 +1116,13 @@ Status applyOperation_inlock(OperationContext* opCtx,
Status applyCommand_inlock(OperationContext* opCtx,
const BSONObj& op,
bool inSteadyStateReplication) {
- const char* names[] = {"o", "ns", "op"};
- BSONElement fields[3];
- op.getFields(3, names, fields);
+ std::array<StringData, 4> names = {"o", "ui", "ns", "op"};
+ std::array<BSONElement, 4> fields;
+ op.getFields(names, &fields);
BSONElement& fieldO = fields[0];
- BSONElement& fieldNs = fields[1];
- BSONElement& fieldOp = fields[2];
+ BSONElement& fieldUI = fields[1];
+ BSONElement& fieldNs = fields[2];
+ BSONElement& fieldOp = fields[3];
const char* opType = fieldOp.valuestrsafe();
invariant(*opType == 'c'); // only commands are processed here
@@ -1084,7 +1186,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
ApplyOpMetadata curOpToApply = op->second;
Status status = Status::OK();
try {
- status = curOpToApply.applyFunc(opCtx, nss.ns().c_str(), o, opTime);
+ status = curOpToApply.applyFunc(opCtx, nss.ns().c_str(), fieldUI, o, opTime);
} catch (...) {
status = exceptionToStatus();
}