diff options
author | Benety Goh <benety@mongodb.com> | 2016-05-31 16:42:16 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-06-01 16:31:09 -0400 |
commit | 8d41cd981f778d3f9d4e3add11ee26bd81f4fb11 (patch) | |
tree | 78bb57e253efd26e6f30650135991af6cdc863e5 | |
parent | 504c299109fd72b3c9155d7bd4e5a41e800cd457 (diff) | |
download | mongo-8d41cd981f778d3f9d4e3add11ee26bd81f4fb11.tar.gz |
SERVER-24273 added unit tests for repl::multiApply
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 200 |
6 files changed, 237 insertions, 4 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 477b1538d3f..55e49f329e0 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -108,6 +108,12 @@ std::string OplogEntry::toString() const { return raw.toString(); } +OplogEntry OplogEntry::getOwned() const { + OplogEntry copy(*this); + copy.raw = copy.raw.getOwned(); + return copy; +} + std::ostream& operator<<(std::ostream& s, const OplogEntry& o) { return s << o.toString(); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 90f7bdbe8bc..b4861daaf1d 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -63,6 +63,11 @@ struct OplogEntry { StringData getCollectionName() const; std::string toString() const; + /** + * Returns a copy of this oplog entry with its own copy of "raw". + */ + OplogEntry getOwned() const; + BSONObj raw; // Owned. StringData ns = ""; diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index 27bcd8500db..36dcb6ce847 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -68,8 +68,17 @@ void StorageInterfaceMock::setMinValid(OperationContext* txn, const BatchBoundar StatusWith<OpTime> StorageInterfaceMock::writeOpsToOplog( OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) { invariant(!operations.empty()); + stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex); + for (const auto& oplogEntry : operations) { + _operationsWrittenToOplog.push_back(oplogEntry.getOwned()); + } return operations.back().getOpTime(); } +MultiApplier::Operations StorageInterfaceMock::getOperationsWrittenToOplog() const { + stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex); + return _operationsWrittenToOplog; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 0c5705c4302..99ca0020e0b 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -57,12 +57,17 @@ public: const NamespaceString& nss, const MultiApplier::Operations& operations) override; + MultiApplier::Operations getOperationsWrittenToOplog() const; + private: mutable stdx::mutex _initialSyncFlagMutex; bool _initialSyncFlag = false; mutable stdx::mutex _minValidBoundariesMutex; BatchBoundaries _minValidBoundaries = {OpTime(), OpTime()}; + + mutable stdx::mutex _operationsWrittenToOplogMutex; + MultiApplier::Operations _operationsWrittenToOplog; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 30d8dcd6bcd..efa68be10a4 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1168,7 +1168,21 @@ StatusWith<OpTime> multiApply(OperationContext* txn, OldThreadPool* workerPool, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation) { - invariant(applyOperation); + if (!txn) { + return {ErrorCodes::BadValue, "invalid operation context"}; + } + + if (!workerPool) { + return {ErrorCodes::BadValue, "invalid worker pool"}; + } + + if (ops.empty()) { + return {ErrorCodes::EmptyArrayOperation, "no operations provided to multiApply"}; + } + + if (!applyOperation) { + return {ErrorCodes::BadValue, "invalid apply operation function"}; + } if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { // Use a ThreadPool to prefetch all the operations in a batch. diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 9fee7c0958f..5a02addfd22 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -28,8 +28,11 @@ #include "mongo/platform/basic.h" +#include <algorithm> #include <memory> +#include <vector> +#include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" @@ -37,13 +40,20 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/sync_tail.h" +#include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/stdx/mutex.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/old_thread_pool.h" +#include "mongo/util/string_map.h" namespace { @@ -53,12 +63,12 @@ using namespace mongo::repl; class SyncTailTest : public ServiceContextMongoDTest { protected: void _testSyncApplyInsertDocument(LockMode expectedMode); - ServiceContext::UniqueOperationContext _txn; unsigned int _opsApplied; SyncTail::ApplyOperationInLockFn _applyOp; SyncTail::ApplyCommandInLockFn _applyCmd; SyncTail::IncrementOpsAppliedStatsFn _incOps; + StorageInterfaceMock* _storageInterface = nullptr; private: void setUp() override; @@ -70,7 +80,12 @@ void SyncTailTest::setUp() { ReplSettings replSettings; replSettings.setOplogSizeBytes(5 * 1024 * 1024); - setGlobalReplicationCoordinator(new ReplicationCoordinatorMock(replSettings)); + auto serviceContext = mongo::getGlobalServiceContext(); + ReplicationCoordinator::set(serviceContext, + stdx::make_unique<ReplicationCoordinatorMock>(replSettings)); + auto storageInterface = stdx::make_unique<StorageInterfaceMock>(); + _storageInterface = storageInterface.get(); + StorageInterface::set(serviceContext, std::move(storageInterface)); Client::initThreadIfNotAlready(); _txn = cc().makeOperationContext(); @@ -91,7 +106,67 @@ void SyncTailTest::tearDown() { invariant(mongo::dbHolder().closeAll(_txn.get(), unused, false)); } _txn.reset(); - setGlobalReplicationCoordinator(nullptr); + _storageInterface = nullptr; +} + +/** + * Creates collection options suitable for oplog. + */ +CollectionOptions createOplogCollectionOptions() { + CollectionOptions options; + options.capped = true; + options.cappedSize = 64 * 1024 * 1024LL; + options.autoIndexId = CollectionOptions::NO; + return options; +} + +/** + * Create test collection. + * Returns collection. + */ +void createCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dblk(txn->lockState(), nss.db(), MODE_X); + OldClientContext ctx(txn, nss.ns()); + auto db = ctx.db(); + ASSERT_TRUE(db); + mongo::WriteUnitOfWork wuow(txn); + auto coll = db->createCollection(txn, nss.ns(), options); + ASSERT_TRUE(coll); + wuow.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nss.ns()); +} + +/** + * Creates a create collection oplog entry with given optime. + */ +OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, + const NamespaceString& nss = NamespaceString("test.t")) { + BSONObjBuilder bob; + bob.appendElements(opTime.toBSON()); + bob.append("h", 1LL); + bob.append("op", "c"); + bob.append("ns", nss.ns()); + return OplogEntry(bob.obj()); +} + +/** + * Creates an insert oplog entry with given optime and namespace. + */ +OplogEntry makeInsertDocumentOplogEntry(OpTime opTime, + const NamespaceString& nss, + const BSONObj& documentToInsert) { + BSONObjBuilder bob; + bob.appendElements(opTime.toBSON()); + bob.append("h", 1LL); + bob.append("op", "i"); + bob.append("ns", nss.ns()); + bob.append("o", documentToInsert); + return OplogEntry(bob.obj()); } TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) { @@ -328,4 +403,123 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { ASSERT_EQUALS(1U, _opsApplied); } +TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullOperationContext) { + auto writerPool = SyncTail::makeWriterPool(); + auto applyOperationFn = [](const MultiApplier::Operations&) {}; + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}); + auto status = multiApply(nullptr, writerPool.get(), {op}, applyOperationFn).getStatus(); + ASSERT_EQUALS(ErrorCodes::BadValue, status); + ASSERT_STRING_CONTAINS(status.reason(), "invalid operation context"); +} + +TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullWriterPool) { + auto applyOperationFn = [](const MultiApplier::Operations&) {}; + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}); + auto status = multiApply(_txn.get(), nullptr, {op}, applyOperationFn).getStatus(); + ASSERT_EQUALS(ErrorCodes::BadValue, status); + ASSERT_STRING_CONTAINS(status.reason(), "invalid worker pool"); +} + +TEST_F(SyncTailTest, MultiApplyReturnsEmptyArrayOperationWhenNoOperationsAreGiven) { + auto writerPool = SyncTail::makeWriterPool(); + auto applyOperationFn = [](const MultiApplier::Operations&) {}; + auto status = multiApply(_txn.get(), writerPool.get(), {}, applyOperationFn).getStatus(); + ASSERT_EQUALS(ErrorCodes::EmptyArrayOperation, status); + ASSERT_STRING_CONTAINS(status.reason(), "no operations provided to multiApply"); +} + +TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullApplyOperation) { + auto writerPool = SyncTail::makeWriterPool(); + MultiApplier::ApplyOperationFn nullApplyOperationFn; + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}); + auto status = multiApply(_txn.get(), writerPool.get(), {op}, nullApplyOperationFn).getStatus(); + ASSERT_EQUALS(ErrorCodes::BadValue, status); + ASSERT_STRING_CONTAINS(status.reason(), "invalid apply operation function"); +} + +bool _testOplogEntryIsForCappedCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options) { + auto writerPool = SyncTail::makeWriterPool(); + MultiApplier::Operations operationsApplied; + auto applyOperationFn = + [&operationsApplied](const MultiApplier::Operations& operationsToApply) { + operationsApplied = operationsToApply; + }; + createCollection(txn, nss, options); + + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); + ASSERT_FALSE(op.isForCappedCollection); + + auto lastOpTime = + unittest::assertGet(multiApply(txn, writerPool.get(), {op}, applyOperationFn)); + ASSERT_EQUALS(op.getOpTime(), lastOpTime); + + ASSERT_EQUALS(1U, operationsApplied.size()); + const auto& opApplied = operationsApplied.front(); + ASSERT_EQUALS(op, opApplied); + // "isForCappedCollection" is not parsed from raw oplog entry document. + return opApplied.isForCappedCollection; +} + +TEST_F( + SyncTailTest, + MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_FALSE(_testOplogEntryIsForCappedCollection(_txn.get(), nss, CollectionOptions())); +} + +TEST_F(SyncTailTest, + MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_TRUE( + _testOplogEntryIsForCappedCollection(_txn.get(), nss, createOplogCollectionOptions())); +} + +TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceHash) { + NamespaceString nss1("test.t0"); + NamespaceString nss2("test.t1"); + OldThreadPool writerPool(2); + + // Ensure that namespaces are hashed to different threads in pool. + ASSERT_EQUALS(0U, StringMapTraits::hash(nss1.ns()) % writerPool.getNumThreads()); + ASSERT_EQUALS(1U, StringMapTraits::hash(nss2.ns()) % writerPool.getNumThreads()); + + stdx::mutex mutex; + std::vector<MultiApplier::Operations> operationsApplied; + auto applyOperationFn = [&mutex, &operationsApplied]( + const MultiApplier::Operations& operationsForWriterThreadToApply) { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationsApplied.push_back(operationsForWriterThreadToApply); + }; + + auto op1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1); + auto op2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2); + + auto lastOpTime = + unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn)); + ASSERT_EQUALS(op2.getOpTime(), lastOpTime); + + // Each writer thread should be given exactly one operation to apply. + std::vector<OpTime> seen; + { + stdx::lock_guard<stdx::mutex> lock(mutex); + ASSERT_EQUALS(writerPool.getNumThreads(), operationsApplied.size()); + for (auto&& operationsAppliedByThread : operationsApplied) { + ASSERT_EQUALS(1U, operationsAppliedByThread.size()); + const auto& oplogEntry = operationsAppliedByThread.front(); + ASSERT_TRUE(std::find(seen.cbegin(), seen.cend(), oplogEntry.getOpTime()) == + seen.cend()); + ASSERT_TRUE(oplogEntry == op1 || oplogEntry == op2); + seen.push_back(oplogEntry.getOpTime()); + } + } + + // Check ops in oplog. + auto operationsWritternToOplog = _storageInterface->getOperationsWrittenToOplog(); + ASSERT_EQUALS(2U, operationsWritternToOplog.size()); + ASSERT_EQUALS(op1, operationsWritternToOplog[0]); + ASSERT_EQUALS(op2, operationsWritternToOplog[1]); +} + } // namespace |