diff options
-rw-r--r-- | src/mongo/db/catalog/coll_mod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 217 |
3 files changed, 202 insertions, 20 deletions
diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp index df3e2e203e1..700d50beb45 100644 --- a/src/mongo/db/catalog/coll_mod.cpp +++ b/src/mongo/db/catalog/coll_mod.cpp @@ -84,7 +84,7 @@ StatusWith<CollModRequest> parseCollModRequest(OperationContext* txn, const IndexDescriptor* idx = coll->getIndexCatalog()->findIndexByKeyPattern(txn, keyPattern); if (idx == NULL) { - return Status(ErrorCodes::InvalidOptions, + return Status(ErrorCodes::IndexNotFound, str::stream() << "cannot find index " << keyPattern << " for ns " << nss.ns()); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 2c23f12e97b..72db5bfa9fc 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -644,7 +644,8 @@ std::map<std::string, ApplyOpMetadata> opsMap = { {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; return collMod(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); - }}}, + }, + {ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}}, {"dropDatabase", {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { return dropDatabase(txn, NamespaceString(ns).db().toString()); }, diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index eb0895c0d82..ad4f9b95c35 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -30,14 +30,19 @@ #include <memory> +#include "mongo/db/catalog/collection_options.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/client.h" #include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" +#include "mongo/db/json.h" +#include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -45,8 +50,11 @@ #include "mongo/db/repl/sync_tail.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/unittest/unittest.h" #include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/md5.hpp" +#include "mongo/util/scopeguard.h" + namespace { @@ -88,7 +96,7 @@ void SyncTailTest::setUp() { // go away after the global storage engine is initialized. unittest::TempDir tempDir("sync_tail_test"); mongo::storageGlobalParams.dbpath = tempDir.path(); - mongo::storageGlobalParams.engine = "devnull"; + mongo::storageGlobalParams.engine = "ephemeralForTest"; mongo::storageGlobalParams.engineSetByUser = true; serviceContext->initializeGlobalStorageEngine(); } @@ -109,8 +117,21 @@ void SyncTailTest::setUp() { } void SyncTailTest::tearDown() { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + + dropAllDatabasesExceptLocal(_txn.get()); { + ScopedTransaction transaction(_txn.get(), MODE_X); Lock::GlobalWrite globalLock(_txn->lockState()); + AutoGetDb autoDBLocal(_txn.get(), "local", MODE_X); + auto localDB = autoDBLocal.getDb(); + if (localDB) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + // Do not wrap in a WriteUnitOfWork until SERVER-17103 is addressed. + dropDatabase(_txn.get(), localDB); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn.get(), "_dropAllDBs", "local"); + } BSONObjBuilder unused; invariant(mongo::dbHolder().closeAll(_txn.get(), unused, false)); } @@ -249,11 +270,13 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) { TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionExists) { { Lock::GlobalWrite globalLock(_txn->lockState()); + WriteUnitOfWork wunit(_txn.get()); bool justCreated = false; Database* db = dbHolder().openDb(_txn.get(), "test", &justCreated); ASSERT_TRUE(db); ASSERT_TRUE(justCreated); Collection* collection = db->createCollection(_txn.get(), "test.t"); + wunit.commit(); ASSERT_TRUE(collection); } _testSyncApplyInsertDocument(MODE_IX); @@ -342,25 +365,183 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { ASSERT_EQUALS(1U, _opsApplied); } -TEST_F(SyncTailTest, MultiInitialSyncApplyFailsOnRenameCollection) { - SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc()); - +/** + * Creates a command oplog entry with given optime and namespace. + */ +BSONObj makeCommandOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& command) { BSONObjBuilder bob; - bob.appendElements(OpTime(Timestamp(1, 0), 1LL).toBSON()); + bob.appendElements(opTime.toBSON()); bob.append("h", 1LL); + bob.append("v", 2); bob.append("op", "c"); - bob.append("ns", "test.$cmd"); - bob.append("o", - BSON("renameCollection" - << "test.foo" - << "to" - << "test.bar" - << "stayTemp" << false << "dropTarget" << false)); - auto op = bob.obj(); - - ASSERT_EQUALS(ErrorCodes::OplogOperationUnsupported, - SyncTail::syncApply(_txn.get(), op, false)); + bob.append("ns", nss.getCommandNS()); + bob.append("o", command); + return bob.obj(); +} + +/** + * Creates a create collection oplog entry with given optime. + */ +BSONObj makeCreateCollectionOplogEntry(OpTime opTime, + const NamespaceString& nss = NamespaceString("test.foo"), + const BSONObj& options = BSONObj()) { + BSONObjBuilder bob; + bob.append("create", nss.coll()); + bob.appendElements(options); + return makeCommandOplogEntry(opTime, nss, bob.obj()); } +/** + * Creates an insert oplog entry with given optime and namespace. + */ +BSONObj makeInsertDocumentOplogEntry(OpTime opTime, + const NamespaceString& nss, + const BSONObj& documentToInsert) { + BSONObjBuilder bob; + bob.appendElements(opTime.toBSON()); + bob.append("h", 1LL); + bob.append("op", "i"); + bob.append("ns", nss.ns()); + bob.append("o", documentToInsert); + return bob.obj(); +} + +class IdempotencyTest : public SyncTailTest { +protected: + BSONObj createCollection(); + BSONObj buildIndex(const BSONObj& indexSpec, const BSONObj& options = BSONObj()); + BSONObj dropIndex(const std::string& indexName); + OpTime nextOpTime() { + static long long lastSecond = 1; + return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL); + } + Status runOp(const BSONObj& entry); + Status runOps(std::initializer_list<BSONObj> ops); + // Validate data and indexes. Return the MD5 hash of the documents ordered by _id. + std::string validate(); + + NamespaceString nss{"test.foo"}; + NamespaceString nssIndex{"test.system.indexes"}; +}; + +Status IdempotencyTest::runOp(const BSONObj& op) { + return runOps({op}); +} + +Status IdempotencyTest::runOps(std::initializer_list<BSONObj> ops) { + for (auto& op : ops) { + Status status = SyncTail::syncApply(_txn.get(), op, false); + if (!status.isOK()) { + return status; + } + } + return Status::OK(); +} + +BSONObj IdempotencyTest::createCollection() { + return makeCreateCollectionOplogEntry(nextOpTime(), nss); +} + +BSONObj IdempotencyTest::buildIndex(const BSONObj& indexSpec, const BSONObj& options) { + BSONObjBuilder bob; + bob.append("v", 1); + bob.append("key", indexSpec); + bob.append("name", std::string(indexSpec.firstElementFieldName()) + "_index"); + bob.append("ns", nss.ns()); + bob.appendElementsUnique(options); + return makeInsertDocumentOplogEntry(nextOpTime(), nssIndex, bob.obj()); +} + +BSONObj IdempotencyTest::dropIndex(const std::string& indexName) { + auto cmd = BSON("deleteIndex" << nss.coll() << "index" << indexName); + return makeCommandOplogEntry(nextOpTime(), nss, cmd); +} + +std::string IdempotencyTest::validate() { + auto collection = AutoGetCollectionForRead(_txn.get(), nss).getCollection(); + if (!collection) { + return "CollectionNotFound"; + } + ValidateResults validateResults; + BSONObjBuilder bob; + + Lock::DBLock lk(_txn->lockState(), nss.db(), MODE_IS); + Lock::CollectionLock lock(_txn->lockState(), nss.ns(), MODE_IS); + ASSERT_OK(collection->validate(_txn.get(), true, true, &validateResults, &bob)); + ASSERT_TRUE(validateResults.valid); + + IndexDescriptor* desc = collection->getIndexCatalog()->findIdIndex(_txn.get()); + ASSERT_TRUE(desc); + auto exec = InternalPlanner::indexScan(_txn.get(), + collection, + desc, + BSONObj(), + BSONObj(), + false, + PlanExecutor::YIELD_MANUAL, + InternalPlanner::FORWARD, + InternalPlanner::IXSCAN_FETCH); + ASSERT(NULL != exec.get()); + md5_state_t st; + md5_init(&st); + + PlanExecutor::ExecState state; + BSONObj c; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&c, NULL))) { + md5_append(&st, (const md5_byte_t*)c.objdata(), c.objsize()); + } + ASSERT_EQUALS(PlanExecutor::IS_EOF, state); + md5digest d; + md5_finish(&st, d); + return digestToString(d); +} + +TEST_F(IdempotencyTest, CollModNamespaceNotFound) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + + ASSERT_OK(runOp(createCollection())); + ASSERT_OK(runOp(buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600)))); + + auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}"); + auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange); + auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd); + auto dropCollOp = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); + + auto ops = {collModOp, dropCollOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); +} + +TEST_F(IdempotencyTest, CollModIndexNotFound) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + + ASSERT_OK(runOp(createCollection())); + ASSERT_OK(runOp(buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600)))); + + auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}"); + auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange); + auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd); + auto dropIndexOp = dropIndex("createdAt_index"); + + auto ops = {collModOp, dropIndexOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); +} + +TEST_F(IdempotencyTest, ResyncOnRenameCollection) { + ReplicationCoordinator::get(_txn.get())->setFollowerMode(MemberState::RS_RECOVERING); + + auto cmd = BSON("renameCollection" << nss.ns() << "to" + << "test.bar" + << "stayTemp" << false << "dropTarget" << false); + auto op = makeCommandOplogEntry(nextOpTime(), nss, cmd); + ASSERT_EQUALS(runOp(op), ErrorCodes::OplogOperationUnsupported); +} } // namespace |