diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail_test.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 295 |
1 files changed, 270 insertions, 25 deletions
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 891a389a3d9..1ce29f5d51f 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -33,6 +33,7 @@ #include <utility> #include <vector> +#include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" @@ -43,6 +44,7 @@ #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" +#include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" @@ -56,6 +58,7 @@ #include "mongo/stdx/mutex.h" #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/old_thread_pool.h" +#include "mongo/util/md5.hpp" #include "mongo/util/string_map.h" namespace { @@ -78,7 +81,6 @@ protected: return Status::OK(); } -private: void setUp() override; void tearDown() override; }; @@ -108,6 +110,7 @@ void SyncTailTest::setUp() { ServiceContextMongoDTest::setUp(); ReplSettings replSettings; replSettings.setOplogSizeBytes(5 * 1024 * 1024); + replSettings.setReplSetString("repl"); auto serviceContext = getServiceContext(); ReplicationCoordinator::set(serviceContext, @@ -188,20 +191,30 @@ void createCollection(OperationContext* txn, } /** - * Creates a create collection oplog entry with given optime. + * Creates a command oplog entry with given optime and namespace. */ -OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, - const NamespaceString& nss = NamespaceString("test.t")) { +OplogEntry makeCommandOplogEntry(OpTime opTime, + const NamespaceString& nss, + const BSONObj& command) { BSONObjBuilder bob; bob.appendElements(opTime.toBSON()); bob.append("h", 1LL); + bob.append("v", 2); bob.append("op", "c"); bob.append("ns", nss.getCommandNS()); - bob.append("o", BSON("create" << nss.coll())); + bob.append("o", command); return OplogEntry(bob.obj()); } /** + * Creates a create collection oplog entry with given optime. + */ +OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, + const NamespaceString& nss = NamespaceString("test.t")) { + return makeCommandOplogEntry(opTime, nss, BSON("create" << nss.coll())); +} + +/** * Creates an insert oplog entry with given optime and namespace. */ OplogEntry makeInsertDocumentOplogEntry(OpTime opTime, @@ -921,30 +934,262 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughShouldSyncTailRetryError) ASSERT_EQUALS(fetchCount.load(), 1U); } -TEST_F(SyncTailTest, MultiInitialSyncApplyFailsOnRenameCollection) { +class IdempotencyTest : public SyncTailTest { +protected: + OplogEntry createCollection(); + OplogEntry insert(const BSONObj& obj); + OplogEntry update(int _id, const BSONObj& obj); + OplogEntry buildIndex(const BSONObj& indexSpec, const BSONObj& options = BSONObj()); + OplogEntry dropIndex(const std::string& indexName); + OpTime nextOpTime() { + static long long lastSecond = 1; + return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL); + } + Status runOp(const OplogEntry& entry); + Status runOps(std::initializer_list<OplogEntry> ops); + // Validate data and indexes. Return the MD5 hash of the documents ordered by _id. + std::string validate(); + + NamespaceString nss{"test.foo"}; + NamespaceString nssIndex{"test.system.indexes"}; +}; + +Status IdempotencyTest::runOp(const OplogEntry& op) { + return runOps({op}); +} + +Status IdempotencyTest::runOps(std::initializer_list<OplogEntry> ops) { SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); + MultiApplier::OperationPtrs opsPtrs; + for (auto& op : ops) { + opsPtrs.push_back(&op); + } + AtomicUInt32 fetchCount(0); + return multiInitialSyncApply_noAbort(_txn.get(), &opsPtrs, &syncTail, &fetchCount); +} +OplogEntry IdempotencyTest::createCollection() { + return makeCreateCollectionOplogEntry(nextOpTime(), nss); +} + +OplogEntry IdempotencyTest::insert(const BSONObj& obj) { + return makeInsertDocumentOplogEntry(nextOpTime(), nss, obj); +} + +OplogEntry IdempotencyTest::update(int id, const BSONObj& obj) { + return makeUpdateDocumentOplogEntry(nextOpTime(), nss, BSON("_id" << id), obj); +} + +OplogEntry IdempotencyTest::buildIndex(const BSONObj& indexSpec, const BSONObj& options) { BSONObjBuilder bob; - bob.appendElements(OpTime(Timestamp(1, 0), 1LL).toBSON()); - bob.append("h", 1LL); - bob.append("op", "c"); - bob.append("ns", "test.$cmd"); - bob.append("o", - BSON("renameCollection" - << "test.foo" - << "to" - << "test.bar" - << "stayTemp" - << false - << "dropTarget" - << false)); - auto op = OplogEntry(bob.obj()); + bob.append("v", 2); + bob.append("key", indexSpec); + bob.append("name", std::string(indexSpec.firstElementFieldName()) + "_index"); + bob.append("ns", nss.ns()); + bob.appendElementsUnique(options); + return makeInsertDocumentOplogEntry(nextOpTime(), nssIndex, bob.obj()); +} - MultiApplier::OperationPtrs ops = {&op}; - AtomicUInt32 fetchCount(0); - ASSERT_EQUALS(ErrorCodes::OplogOperationUnsupported, - multiInitialSyncApply_noAbort(_txn.get(), &ops, &syncTail, &fetchCount)); - ASSERT_EQUALS(fetchCount.load(), 0U); +OplogEntry IdempotencyTest::dropIndex(const std::string& indexName) { + auto cmd = BSON("deleteIndexes" << nss.coll() << "index" << indexName); + return makeCommandOplogEntry(nextOpTime(), nss, cmd); +} + +std::string IdempotencyTest::validate() { + auto collection = AutoGetCollectionForRead(_txn.get(), nss).getCollection(); + ValidateResults validateResults; + BSONObjBuilder bob; + + Lock::DBLock lk(_txn->lockState(), nss.db(), MODE_IS); + Lock::CollectionLock lock(_txn->lockState(), nss.ns(), MODE_IS); + ASSERT_OK(collection->validate(_txn.get(), kValidateFull, &validateResults, &bob)); + ASSERT_TRUE(validateResults.valid); + + IndexDescriptor* desc = collection->getIndexCatalog()->findIdIndex(_txn.get()); + ASSERT_TRUE(desc); + auto exec = InternalPlanner::indexScan(_txn.get(), + collection, + desc, + BSONObj(), + BSONObj(), + BoundInclusion::kIncludeStartKeyOnly, + PlanExecutor::YIELD_MANUAL, + InternalPlanner::FORWARD, + InternalPlanner::IXSCAN_FETCH); + ASSERT(NULL != exec.get()); + md5_state_t st; + md5_init(&st); + + PlanExecutor::ExecState state; + BSONObj c; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&c, NULL))) { + md5_append(&st, (const md5_byte_t*)c.objdata(), c.objsize()); + } + ASSERT_EQUALS(PlanExecutor::IS_EOF, state); + md5digest d; + md5_finish(&st, d); + return digestToString(d); +} + +TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnUpdate) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + ASSERT_OK(runOp(createCollection())); + auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}")); + auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}")); + auto indexOp = buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3)); + + auto ops = {insertOp, updateOp, indexOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); + + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY); + auto status = runOps(ops); + ASSERT_EQ(status.code(), 16755); +} + +TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnIndexing) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + ASSERT_OK(runOp(createCollection())); + auto indexOp = buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3)); + auto dropIndexOp = dropIndex("loc_index"); + auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}")); + + auto ops = {indexOp, dropIndexOp, insertOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); + + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY); + auto status = runOps(ops); + ASSERT_EQ(status.code(), 16755); +} + +TEST_F(IdempotencyTest, Geo2dIndex) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + ASSERT_OK(runOp(createCollection())); + auto insertOp = insert(fromjson("{_id: 1, loc: [1]}")); + auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}")); + auto indexOp = buildIndex(fromjson("{loc: '2d'}")); + + auto ops = {insertOp, updateOp, indexOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); + + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY); + auto status = runOps(ops); + ASSERT_EQ(status.code(), 13068); +} + +TEST_F(IdempotencyTest, UniqueKeyIndex) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + ASSERT_OK(runOp(createCollection())); + auto insertOp = insert(fromjson("{_id: 1, x: 5}")); + auto updateOp = update(1, fromjson("{$set: {x: 6}}")); + auto insertOp2 = insert(fromjson("{_id: 2, x: 5}")); + auto indexOp = buildIndex(fromjson("{x: 1}"), fromjson("{unique: true}")); + + auto ops = {insertOp, updateOp, insertOp2, indexOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); + + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY); + auto status = runOps(ops); + ASSERT_EQ(status.code(), ErrorCodes::DuplicateKey); +} + +TEST_F(IdempotencyTest, ParallelArrayError) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + + ASSERT_OK(runOp(createCollection())); + ASSERT_OK(runOp(insert(fromjson("{_id: 1}")))); + + auto updateOp1 = update(1, fromjson("{$set: {x: [1, 2]}}")); + auto updateOp2 = update(1, fromjson("{$set: {x: 1}}")); + auto updateOp3 = update(1, fromjson("{$set: {y: [3, 4]}}")); + auto indexOp = buildIndex(fromjson("{x: 1, y: 1}")); + + auto ops = {updateOp1, updateOp2, updateOp3, indexOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); + + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY); + auto status = runOps(ops); + ASSERT_EQ(status.code(), ErrorCodes::CannotIndexParallelArrays); +} + +TEST_F(IdempotencyTest, IndexKeyTooLongError) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + + ASSERT_OK(runOp(createCollection())); + ASSERT_OK(runOp(insert(fromjson("{_id: 1}")))); + + // Key size limit is 1024 for ephemeral storage engine, so two 800 byte fields cannot + // co-exist. + std::string longStr(800, 'a'); + auto updateOp1 = update(1, BSON("$set" << BSON("x" << longStr))); + auto updateOp2 = update(1, fromjson("{$set: {x: 1}}")); + auto updateOp3 = update(1, BSON("$set" << BSON("y" << longStr))); + auto indexOp = buildIndex(fromjson("{x: 1, y: 1}")); + + auto ops = {updateOp1, updateOp2, updateOp3, indexOp}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); + + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY); + auto status = runOps(ops); + ASSERT_EQ(status.code(), ErrorCodes::KeyTooLong); +} + +TEST_F(IdempotencyTest, IndexWithDifferentOptions) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + + ASSERT_OK(runOp(createCollection())); + ASSERT_OK(runOp(insert(fromjson("{_id: 1, x: 'hi'}")))); + + auto indexOp1 = buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'spanish'}")); + auto dropIndexOp = dropIndex("x_index"); + auto indexOp2 = buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'english'}")); + + auto ops = {indexOp1, dropIndexOp, indexOp2}; + + ASSERT_OK(runOps(ops)); + auto hash = validate(); + ASSERT_OK(runOps(ops)); + ASSERT_EQUALS(hash, validate()); + + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY); + auto status = runOps(ops); + ASSERT_EQ(status.code(), ErrorCodes::IndexOptionsConflict); +} + +TEST_F(IdempotencyTest, ResyncOnRenameCollection) { + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); + + auto cmd = BSON("renameCollection" << nss.ns() << "to" + << "test.bar" + << "stayTemp" + << false + << "dropTarget" + << false); + auto op = makeCommandOplogEntry(nextOpTime(), nss, cmd); + ASSERT_EQUALS(runOp(op), ErrorCodes::OplogOperationUnsupported); } } // namespace |