diff options
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 326 |
1 files changed, 214 insertions, 112 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b6d7249e6dc..9baa4b211cc 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -54,6 +54,7 @@ #include "mongo/db/catalog/drop_database.h" #include "mongo/db/catalog/drop_indexes.h" #include "mongo/db/catalog/rename_collection.h" +#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -567,8 +568,72 @@ NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) { return NamespaceString(NamespaceString(ns).db().toString(), coll); } -using OpApplyFn = stdx::function<Status( - OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime)>; +NamespaceString parseUUID(OperationContext* opCtx, const BSONElement& ui) { + auto statusWithUUID = UUID::parse(ui); + uassertStatusOK(statusWithUUID); + auto uuid = statusWithUUID.getValue(); + auto& catalog = UUIDCatalog::get(opCtx); + auto nss = catalog.lookupNSSByUUID(uuid); + uassert( + ErrorCodes::NamespaceNotFound, "No namespace with UUID " + uuid.toString(), !nss.isEmpty()); + return nss; +} + +NamespaceString parseUUIDorNs(OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd) { + return ui.ok() ? parseUUID(opCtx, ui) : parseNs(ns, cmd); +} + +void createIndexForApplyOps(OperationContext* opCtx, + const BSONObj& indexSpec, + const NamespaceString& indexNss, + IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { + // Check if collection exists. + Database* db = dbHolder().get(opCtx, indexNss.ns()); + auto indexCollection = db ? db->getCollection(opCtx, indexNss) : nullptr; + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to create index due to missing collection: " << indexNss.ns(), + indexCollection); + + OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters; + opCounters->gotInsert(); + + bool relaxIndexConstraints = + ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss); + if (indexSpec["background"].trueValue()) { + Lock::TempRelease release(opCtx->lockState()); + if (opCtx->lockState()->isLocked()) { + // If TempRelease fails, background index build will deadlock. + LOG(3) << "apply op: building background index " << indexSpec + << " in the foreground because temp release failed"; + IndexBuilder builder(indexSpec, relaxIndexConstraints); + Status status = builder.buildInForeground(opCtx, db); + uassertStatusOK(status); + } else { + IndexBuilder* builder = new IndexBuilder(indexSpec, relaxIndexConstraints); + // This spawns a new thread and returns immediately. + builder->go(); + // Wait for thread to start and register itself + IndexBuilder::waitForBgIndexStarting(); + } + opCtx->recoveryUnit()->abandonSnapshot(); + } else { + IndexBuilder builder(indexSpec, relaxIndexConstraints); + Status status = builder.buildInForeground(opCtx, db); + uassertStatusOK(status); + } + if (incrementOpsAppliedStats) { + incrementOpsAppliedStats(); + } +} + +using OpApplyFn = stdx::function<Status(OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime)>; struct ApplyOpMetadata { OpApplyFn applyFunc; @@ -586,13 +651,17 @@ struct ApplyOpMetadata { std::map<std::string, ApplyOpMetadata> opsMap = { {"create", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { const NamespaceString nss(parseNs(ns, cmd)); if (auto idIndexElem = cmd["idIndex"]) { // Remove "idIndex" field from command. auto cmdWithoutIdIndex = cmd.removeField("idIndex"); - return createCollection( - opCtx, nss.db().toString(), cmdWithoutIdIndex, idIndexElem.Obj()); + return createCollectionForApplyOps( + opCtx, nss.db().toString(), ui, cmdWithoutIdIndex, idIndexElem.Obj()); } // No _id index spec was provided, so we should build a v:1 _id index. @@ -602,25 +671,59 @@ std::map<std::string, ApplyOpMetadata> opsMap = { idIndexSpecBuilder.append(IndexDescriptor::kIndexNameFieldName, "_id_"); idIndexSpecBuilder.append(IndexDescriptor::kNamespaceFieldName, nss.ns()); idIndexSpecBuilder.append(IndexDescriptor::kKeyPatternFieldName, BSON("_id" << 1)); - return createCollection(opCtx, nss.db().toString(), cmd, idIndexSpecBuilder.done()); + return createCollectionForApplyOps( + opCtx, nss.db().toString(), ui, cmd, idIndexSpecBuilder.done()); }, {ErrorCodes::NamespaceExists}}}, + {"createIndexes", + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { + const NamespaceString nss(parseUUID(opCtx, ui)); + BSONElement first = cmd.firstElement(); + invariant(first.fieldNameStringData() == "createIndexes"); + uassert(ErrorCodes::InvalidNamespace, + "createIndexes value must be a string", + first.type() == mongo::String); + BSONObj indexSpec = cmd.removeField("createIndexes"); + // The UUID determines the collection to build the index on, so create new 'ns' field. + BSONObj nsObj = BSON("ns" << nss.ns()); + indexSpec = indexSpec.addField(nsObj.firstElement()); + + createIndexForApplyOps(opCtx, indexSpec, nss, {}); + return Status::OK(); + }, + {ErrorCodes::IndexAlreadyExists}}}, {"collMod", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { BSONObjBuilder resultWeDontCareAbout; - return collMod(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return collMod(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}}, {"dropDatabase", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { return dropDatabase(opCtx, NamespaceString(ns).db().toString()); }, {ErrorCodes::NamespaceNotFound}}}, {"drop", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { BSONObjBuilder resultWeDontCareAbout; return dropCollection(opCtx, - parseNs(ns, cmd), + parseUUIDorNs(opCtx, ns, ui, cmd), resultWeDontCareAbout, opTime, DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); @@ -628,59 +731,79 @@ std::map<std::string, ApplyOpMetadata> opsMap = { {ErrorCodes::NamespaceNotFound}}}, // deleteIndex(es) is deprecated but still works as of April 10, 2015 {"deleteIndex", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"deleteIndexes", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"dropIndex", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"dropIndexes", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"renameCollection", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { - const auto sourceNsElt = cmd.firstElement(); - const auto targetNsElt = cmd["to"]; - uassert(ErrorCodes::TypeMismatch, - "'renameCollection' must be of type String", - sourceNsElt.type() == BSONType::String); - uassert(ErrorCodes::TypeMismatch, - "'to' must be of type String", - targetNsElt.type() == BSONType::String); - return renameCollection(opCtx, - NamespaceString(sourceNsElt.valueStringData()), - NamespaceString(targetNsElt.valueStringData()), - cmd["dropTarget"].trueValue(), - cmd["stayTemp"].trueValue()); + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { + return renameCollectionForApplyOps(opCtx, nsToDatabase(ns), ui, cmd); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}}, {"applyOps", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { BSONObjBuilder resultWeDontCareAbout; return applyOps(opCtx, nsToDatabase(ns), cmd, &resultWeDontCareAbout); }, {ErrorCodes::UnknownError}}}, {"convertToCapped", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { - return convertToCapped(opCtx, parseNs(ns, cmd), cmd["size"].number()); + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { + return convertToCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd["size"].number()); }}}, {"emptycapped", - {[](OperationContext* opCtx, const char* ns, BSONObj& cmd, const OpTime& opTime) -> Status { - return emptyCapped(opCtx, parseNs(ns, cmd)); + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime) -> Status { + return emptyCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd)); }}}, }; @@ -736,24 +859,54 @@ Status applyOperation_inlock(OperationContext* opCtx, OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters; - const char* names[] = {"o", "ns", "op", "b", "o2"}; - BSONElement fields[5]; - op.getFields(5, names, fields); + std::array<StringData, 6> names = {"o", "ui", "ns", "op", "b", "o2"}; + std::array<BSONElement, 6> fields; + op.getFields(names, &fields); BSONElement& fieldO = fields[0]; - BSONElement& fieldNs = fields[1]; - BSONElement& fieldOp = fields[2]; - BSONElement& fieldB = fields[3]; - BSONElement& fieldO2 = fields[4]; + BSONElement& fieldUI = fields[1]; + BSONElement& fieldNs = fields[2]; + BSONElement& fieldOp = fields[3]; + BSONElement& fieldB = fields[4]; + BSONElement& fieldO2 = fields[5]; BSONObj o; if (fieldO.isABSONObj()) o = fieldO.embeddedObject(); - uassert(ErrorCodes::InvalidNamespace, - "'ns' must be of type String", - fieldNs.type() == BSONType::String); - const StringData ns = fieldNs.valueStringData(); - NamespaceString requestNss{ns}; + // operation type -- see logOp() comments for types + const char* opType = fieldOp.valuestrsafe(); + + NamespaceString requestNss; + Collection* collection = nullptr; + if (fieldUI) { + UUIDCatalog& catalog = UUIDCatalog::get(opCtx); + auto uuid = UUID::parse(fieldUI); + uassertStatusOK(uuid); + collection = catalog.lookupCollectionByUUID(uuid.getValue()); + if (collection) { + requestNss = collection->ns(); + dassert(opCtx->lockState()->isCollectionLockedForMode( + requestNss.ns(), supportsDocLocking() ? MODE_IX : MODE_X)); + } + } else { + uassert(ErrorCodes::InvalidNamespace, + "'ns' must be of type String", + fieldNs.type() == BSONType::String); + const StringData ns = fieldNs.valueStringData(); + requestNss = NamespaceString(ns); + if (nsIsFull(ns)) { + if (supportsDocLocking()) { + // WiredTiger, and others requires MODE_IX since the applier threads driving + // this allow writes to the same collection on any thread. + dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IX)); + } else { + // mmapV1 ensures that all operations to the same collection are executed from + // the same worker thread, so it takes an exclusive lock (MODE_X) + dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_X)); + } + } + collection = db->getCollection(opCtx, requestNss); + } BSONObj o2; if (fieldO2.isABSONObj()) @@ -761,27 +914,11 @@ Status applyOperation_inlock(OperationContext* opCtx, bool valueB = fieldB.booleanSafe(); - if (nsIsFull(ns)) { - if (supportsDocLocking()) { - // WiredTiger, and others requires MODE_IX since the applier threads driving - // this allow writes to the same collection on any thread. - dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IX)); - } else { - // mmapV1 ensures that all operations to the same collection are executed from - // the same worker thread, so it takes an exclusive lock (MODE_X) - dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_X)); - } - } - Collection* collection = db->getCollection(opCtx, requestNss); IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); const bool haveWrappingWriteUnitOfWork = opCtx->lockState()->inAWriteUnitOfWork(); uassert(ErrorCodes::CommandNotSupportedOnView, - str::stream() << "applyOps not supported on view: " << ns, - collection || !db->getViewCatalog()->lookup(opCtx, ns)); - - // operation type -- see logOp() comments for types - const char* opType = fieldOp.valuestrsafe(); - invariant(*opType != 'c'); // commands are processed in applyCommand_inlock() + str::stream() << "applyOps not supported on view: " << requestNss.ns(), + collection || !db->getViewCatalog()->lookup(opCtx, requestNss.ns())); if (*opType == 'i') { if (requestNss.isSystemDotIndexes()) { @@ -789,44 +926,7 @@ Status applyOperation_inlock(OperationContext* opCtx, NamespaceString indexNss; std::tie(indexSpec, indexNss) = repl::prepForApplyOpsIndexInsert(fieldO, op, requestNss); - - // Check if collection exists. - auto indexCollection = db->getCollection(opCtx, indexNss); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Failed to create index due to missing collection: " - << op.toString(), - indexCollection); - - opCounters->gotInsert(); - - bool relaxIndexConstraints = - ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss); - if (indexSpec["background"].trueValue()) { - Lock::TempRelease release(opCtx->lockState()); - if (opCtx->lockState()->isLocked()) { - // If TempRelease fails, background index build will deadlock. - LOG(3) << "apply op: building background index " << indexSpec - << " in the foreground because temp release failed"; - IndexBuilder builder(indexSpec, relaxIndexConstraints); - Status status = builder.buildInForeground(opCtx, db); - uassertStatusOK(status); - } else { - IndexBuilder* builder = new IndexBuilder(indexSpec, relaxIndexConstraints); - // This spawns a new thread and returns immediately. - builder->go(); - // Wait for thread to start and register itself - IndexBuilder::waitForBgIndexStarting(); - } - opCtx->recoveryUnit()->abandonSnapshot(); - } else { - IndexBuilder builder(indexSpec, relaxIndexConstraints); - Status status = builder.buildInForeground(opCtx, db); - uassertStatusOK(status); - } - // Since this is an index operation we can return without falling through. - if (incrementOpsAppliedStats) { - incrementOpsAppliedStats(); - } + createIndexForApplyOps(opCtx, indexSpec, indexNss, incrementOpsAppliedStats); return Status::OK(); } uassert(ErrorCodes::NamespaceNotFound, @@ -1005,6 +1105,7 @@ Status applyOperation_inlock(OperationContext* opCtx, incrementOpsAppliedStats(); } } else { + invariant(*opType != 'c'); // commands are processed in applyCommand_inlock() throw MsgAssertionException( 14825, str::stream() << "error in applyOperation : unknown opType " << *opType); } @@ -1015,12 +1116,13 @@ Status applyOperation_inlock(OperationContext* opCtx, Status applyCommand_inlock(OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) { - const char* names[] = {"o", "ns", "op"}; - BSONElement fields[3]; - op.getFields(3, names, fields); + std::array<StringData, 4> names = {"o", "ui", "ns", "op"}; + std::array<BSONElement, 4> fields; + op.getFields(names, &fields); BSONElement& fieldO = fields[0]; - BSONElement& fieldNs = fields[1]; - BSONElement& fieldOp = fields[2]; + BSONElement& fieldUI = fields[1]; + BSONElement& fieldNs = fields[2]; + BSONElement& fieldOp = fields[3]; const char* opType = fieldOp.valuestrsafe(); invariant(*opType == 'c'); // only commands are processed here @@ -1084,7 +1186,7 @@ Status applyCommand_inlock(OperationContext* opCtx, ApplyOpMetadata curOpToApply = op->second; Status status = Status::OK(); try { - status = curOpToApply.applyFunc(opCtx, nss.ns().c_str(), o, opTime); + status = curOpToApply.applyFunc(opCtx, nss.ns().c_str(), fieldUI, o, opTime); } catch (...) { status = exceptionToStatus(); } |