summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/sync_tail_test.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp295
1 files changed, 270 insertions, 25 deletions
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 891a389a3d9..1ce29f5d51f 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -33,6 +33,7 @@
#include <utility>
#include <vector>
+#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
@@ -43,6 +44,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
@@ -56,6 +58,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/old_thread_pool.h"
+#include "mongo/util/md5.hpp"
#include "mongo/util/string_map.h"
namespace {
@@ -78,7 +81,6 @@ protected:
return Status::OK();
}
-private:
void setUp() override;
void tearDown() override;
};
@@ -108,6 +110,7 @@ void SyncTailTest::setUp() {
ServiceContextMongoDTest::setUp();
ReplSettings replSettings;
replSettings.setOplogSizeBytes(5 * 1024 * 1024);
+ replSettings.setReplSetString("repl");
auto serviceContext = getServiceContext();
ReplicationCoordinator::set(serviceContext,
@@ -188,20 +191,30 @@ void createCollection(OperationContext* txn,
}
/**
- * Creates a create collection oplog entry with given optime.
+ * Creates a command oplog entry with given optime and namespace.
*/
-OplogEntry makeCreateCollectionOplogEntry(OpTime opTime,
- const NamespaceString& nss = NamespaceString("test.t")) {
+OplogEntry makeCommandOplogEntry(OpTime opTime,
+ const NamespaceString& nss,
+ const BSONObj& command) {
BSONObjBuilder bob;
bob.appendElements(opTime.toBSON());
bob.append("h", 1LL);
+ bob.append("v", 2);
bob.append("op", "c");
bob.append("ns", nss.getCommandNS());
- bob.append("o", BSON("create" << nss.coll()));
+ bob.append("o", command);
return OplogEntry(bob.obj());
}
/**
+ * Creates a create collection oplog entry with given optime.
+ */
+OplogEntry makeCreateCollectionOplogEntry(OpTime opTime,
+ const NamespaceString& nss = NamespaceString("test.t")) {
+ return makeCommandOplogEntry(opTime, nss, BSON("create" << nss.coll()));
+}
+
+/**
* Creates an insert oplog entry with given optime and namespace.
*/
OplogEntry makeInsertDocumentOplogEntry(OpTime opTime,
@@ -921,30 +934,262 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughShouldSyncTailRetryError)
ASSERT_EQUALS(fetchCount.load(), 1U);
}
-TEST_F(SyncTailTest, MultiInitialSyncApplyFailsOnRenameCollection) {
+class IdempotencyTest : public SyncTailTest {
+protected:
+ OplogEntry createCollection();
+ OplogEntry insert(const BSONObj& obj);
+ OplogEntry update(int _id, const BSONObj& obj);
+ OplogEntry buildIndex(const BSONObj& indexSpec, const BSONObj& options = BSONObj());
+ OplogEntry dropIndex(const std::string& indexName);
+ OpTime nextOpTime() {
+ static long long lastSecond = 1;
+ return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL);
+ }
+ Status runOp(const OplogEntry& entry);
+ Status runOps(std::initializer_list<OplogEntry> ops);
+ // Validate data and indexes. Return the MD5 hash of the documents ordered by _id.
+ std::string validate();
+
+ NamespaceString nss{"test.foo"};
+ NamespaceString nssIndex{"test.system.indexes"};
+};
+
+Status IdempotencyTest::runOp(const OplogEntry& op) {
+ return runOps({op});
+}
+
+Status IdempotencyTest::runOps(std::initializer_list<OplogEntry> ops) {
SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr);
+ MultiApplier::OperationPtrs opsPtrs;
+ for (auto& op : ops) {
+ opsPtrs.push_back(&op);
+ }
+ AtomicUInt32 fetchCount(0);
+ return multiInitialSyncApply_noAbort(_txn.get(), &opsPtrs, &syncTail, &fetchCount);
+}
+OplogEntry IdempotencyTest::createCollection() {
+ return makeCreateCollectionOplogEntry(nextOpTime(), nss);
+}
+
+OplogEntry IdempotencyTest::insert(const BSONObj& obj) {
+ return makeInsertDocumentOplogEntry(nextOpTime(), nss, obj);
+}
+
+OplogEntry IdempotencyTest::update(int id, const BSONObj& obj) {
+ return makeUpdateDocumentOplogEntry(nextOpTime(), nss, BSON("_id" << id), obj);
+}
+
+OplogEntry IdempotencyTest::buildIndex(const BSONObj& indexSpec, const BSONObj& options) {
BSONObjBuilder bob;
- bob.appendElements(OpTime(Timestamp(1, 0), 1LL).toBSON());
- bob.append("h", 1LL);
- bob.append("op", "c");
- bob.append("ns", "test.$cmd");
- bob.append("o",
- BSON("renameCollection"
- << "test.foo"
- << "to"
- << "test.bar"
- << "stayTemp"
- << false
- << "dropTarget"
- << false));
- auto op = OplogEntry(bob.obj());
+ bob.append("v", 2);
+ bob.append("key", indexSpec);
+ bob.append("name", std::string(indexSpec.firstElementFieldName()) + "_index");
+ bob.append("ns", nss.ns());
+ bob.appendElementsUnique(options);
+ return makeInsertDocumentOplogEntry(nextOpTime(), nssIndex, bob.obj());
+}
- MultiApplier::OperationPtrs ops = {&op};
- AtomicUInt32 fetchCount(0);
- ASSERT_EQUALS(ErrorCodes::OplogOperationUnsupported,
- multiInitialSyncApply_noAbort(_txn.get(), &ops, &syncTail, &fetchCount));
- ASSERT_EQUALS(fetchCount.load(), 0U);
+OplogEntry IdempotencyTest::dropIndex(const std::string& indexName) {
+ auto cmd = BSON("deleteIndexes" << nss.coll() << "index" << indexName);
+ return makeCommandOplogEntry(nextOpTime(), nss, cmd);
+}
+
+std::string IdempotencyTest::validate() {
+ auto collection = AutoGetCollectionForRead(_txn.get(), nss).getCollection();
+ ValidateResults validateResults;
+ BSONObjBuilder bob;
+
+ Lock::DBLock lk(_txn->lockState(), nss.db(), MODE_IS);
+ Lock::CollectionLock lock(_txn->lockState(), nss.ns(), MODE_IS);
+ ASSERT_OK(collection->validate(_txn.get(), kValidateFull, &validateResults, &bob));
+ ASSERT_TRUE(validateResults.valid);
+
+ IndexDescriptor* desc = collection->getIndexCatalog()->findIdIndex(_txn.get());
+ ASSERT_TRUE(desc);
+ auto exec = InternalPlanner::indexScan(_txn.get(),
+ collection,
+ desc,
+ BSONObj(),
+ BSONObj(),
+ BoundInclusion::kIncludeStartKeyOnly,
+ PlanExecutor::YIELD_MANUAL,
+ InternalPlanner::FORWARD,
+ InternalPlanner::IXSCAN_FETCH);
+ ASSERT(NULL != exec.get());
+ md5_state_t st;
+ md5_init(&st);
+
+ PlanExecutor::ExecState state;
+ BSONObj c;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(&c, NULL))) {
+ md5_append(&st, (const md5_byte_t*)c.objdata(), c.objsize());
+ }
+ ASSERT_EQUALS(PlanExecutor::IS_EOF, state);
+ md5digest d;
+ md5_finish(&st, d);
+ return digestToString(d);
+}
+
+TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnUpdate) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+ ASSERT_OK(runOp(createCollection()));
+ auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}"));
+ auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}"));
+ auto indexOp = buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3));
+
+ auto ops = {insertOp, updateOp, indexOp};
+
+ ASSERT_OK(runOps(ops));
+ auto hash = validate();
+ ASSERT_OK(runOps(ops));
+ ASSERT_EQUALS(hash, validate());
+
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY);
+ auto status = runOps(ops);
+ ASSERT_EQ(status.code(), 16755);
+}
+
+TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnIndexing) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+ ASSERT_OK(runOp(createCollection()));
+ auto indexOp = buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3));
+ auto dropIndexOp = dropIndex("loc_index");
+ auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}"));
+
+ auto ops = {indexOp, dropIndexOp, insertOp};
+
+ ASSERT_OK(runOps(ops));
+ auto hash = validate();
+ ASSERT_OK(runOps(ops));
+ ASSERT_EQUALS(hash, validate());
+
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY);
+ auto status = runOps(ops);
+ ASSERT_EQ(status.code(), 16755);
+}
+
+TEST_F(IdempotencyTest, Geo2dIndex) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+ ASSERT_OK(runOp(createCollection()));
+ auto insertOp = insert(fromjson("{_id: 1, loc: [1]}"));
+ auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}"));
+ auto indexOp = buildIndex(fromjson("{loc: '2d'}"));
+
+ auto ops = {insertOp, updateOp, indexOp};
+
+ ASSERT_OK(runOps(ops));
+ auto hash = validate();
+ ASSERT_OK(runOps(ops));
+ ASSERT_EQUALS(hash, validate());
+
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY);
+ auto status = runOps(ops);
+ ASSERT_EQ(status.code(), 13068);
+}
+
+TEST_F(IdempotencyTest, UniqueKeyIndex) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+ ASSERT_OK(runOp(createCollection()));
+ auto insertOp = insert(fromjson("{_id: 1, x: 5}"));
+ auto updateOp = update(1, fromjson("{$set: {x: 6}}"));
+ auto insertOp2 = insert(fromjson("{_id: 2, x: 5}"));
+ auto indexOp = buildIndex(fromjson("{x: 1}"), fromjson("{unique: true}"));
+
+ auto ops = {insertOp, updateOp, insertOp2, indexOp};
+
+ ASSERT_OK(runOps(ops));
+ auto hash = validate();
+ ASSERT_OK(runOps(ops));
+ ASSERT_EQUALS(hash, validate());
+
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY);
+ auto status = runOps(ops);
+ ASSERT_EQ(status.code(), ErrorCodes::DuplicateKey);
+}
+
+TEST_F(IdempotencyTest, ParallelArrayError) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+
+ ASSERT_OK(runOp(createCollection()));
+ ASSERT_OK(runOp(insert(fromjson("{_id: 1}"))));
+
+ auto updateOp1 = update(1, fromjson("{$set: {x: [1, 2]}}"));
+ auto updateOp2 = update(1, fromjson("{$set: {x: 1}}"));
+ auto updateOp3 = update(1, fromjson("{$set: {y: [3, 4]}}"));
+ auto indexOp = buildIndex(fromjson("{x: 1, y: 1}"));
+
+ auto ops = {updateOp1, updateOp2, updateOp3, indexOp};
+
+ ASSERT_OK(runOps(ops));
+ auto hash = validate();
+ ASSERT_OK(runOps(ops));
+ ASSERT_EQUALS(hash, validate());
+
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY);
+ auto status = runOps(ops);
+ ASSERT_EQ(status.code(), ErrorCodes::CannotIndexParallelArrays);
+}
+
+TEST_F(IdempotencyTest, IndexKeyTooLongError) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+
+ ASSERT_OK(runOp(createCollection()));
+ ASSERT_OK(runOp(insert(fromjson("{_id: 1}"))));
+
+ // Key size limit is 1024 for ephemeral storage engine, so two 800 byte fields cannot
+ // co-exist.
+ std::string longStr(800, 'a');
+ auto updateOp1 = update(1, BSON("$set" << BSON("x" << longStr)));
+ auto updateOp2 = update(1, fromjson("{$set: {x: 1}}"));
+ auto updateOp3 = update(1, BSON("$set" << BSON("y" << longStr)));
+ auto indexOp = buildIndex(fromjson("{x: 1, y: 1}"));
+
+ auto ops = {updateOp1, updateOp2, updateOp3, indexOp};
+
+ ASSERT_OK(runOps(ops));
+ auto hash = validate();
+ ASSERT_OK(runOps(ops));
+ ASSERT_EQUALS(hash, validate());
+
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY);
+ auto status = runOps(ops);
+ ASSERT_EQ(status.code(), ErrorCodes::KeyTooLong);
+}
+
+TEST_F(IdempotencyTest, IndexWithDifferentOptions) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+
+ ASSERT_OK(runOp(createCollection()));
+ ASSERT_OK(runOp(insert(fromjson("{_id: 1, x: 'hi'}"))));
+
+ auto indexOp1 = buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'spanish'}"));
+ auto dropIndexOp = dropIndex("x_index");
+ auto indexOp2 = buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'english'}"));
+
+ auto ops = {indexOp1, dropIndexOp, indexOp2};
+
+ ASSERT_OK(runOps(ops));
+ auto hash = validate();
+ ASSERT_OK(runOps(ops));
+ ASSERT_EQUALS(hash, validate());
+
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_PRIMARY);
+ auto status = runOps(ops);
+ ASSERT_EQ(status.code(), ErrorCodes::IndexOptionsConflict);
+}
+
+TEST_F(IdempotencyTest, ResyncOnRenameCollection) {
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
+
+ auto cmd = BSON("renameCollection" << nss.ns() << "to"
+ << "test.bar"
+ << "stayTemp"
+ << false
+ << "dropTarget"
+ << false);
+ auto op = makeCommandOplogEntry(nextOpTime(), nss, cmd);
+ ASSERT_EQUALS(runOp(op), ErrorCodes::OplogOperationUnsupported);
}
} // namespace