summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-05-31 16:42:16 -0400
committerBenety Goh <benety@mongodb.com>2016-06-01 16:31:09 -0400
commit8d41cd981f778d3f9d4e3add11ee26bd81f4fb11 (patch)
tree78bb57e253efd26e6f30650135991af6cdc863e5
parent504c299109fd72b3c9155d7bd4e5a41e800cd457 (diff)
downloadmongo-8d41cd981f778d3f9d4e3add11ee26bd81f4fb11.tar.gz
SERVER-24273 added unit tests for repl::multiApply
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp6
-rw-r--r--src/mongo/db/repl/oplog_entry.h5
-rw-r--r--src/mongo/db/repl/storage_interface_mock.cpp9
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h5
-rw-r--r--src/mongo/db/repl/sync_tail.cpp16
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp200
6 files changed, 237 insertions, 4 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 477b1538d3f..55e49f329e0 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -108,6 +108,12 @@ std::string OplogEntry::toString() const {
return raw.toString();
}
+OplogEntry OplogEntry::getOwned() const {
+ OplogEntry copy(*this);
+ copy.raw = copy.raw.getOwned();
+ return copy;
+}
+
std::ostream& operator<<(std::ostream& s, const OplogEntry& o) {
return s << o.toString();
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 90f7bdbe8bc..b4861daaf1d 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -63,6 +63,11 @@ struct OplogEntry {
StringData getCollectionName() const;
std::string toString() const;
+ /**
+ * Returns a copy of this oplog entry with its own copy of "raw".
+ */
+ OplogEntry getOwned() const;
+
BSONObj raw; // Owned.
StringData ns = "";
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index 27bcd8500db..36dcb6ce847 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -68,8 +68,17 @@ void StorageInterfaceMock::setMinValid(OperationContext* txn, const BatchBoundar
StatusWith<OpTime> StorageInterfaceMock::writeOpsToOplog(
OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) {
invariant(!operations.empty());
+ stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex);
+ for (const auto& oplogEntry : operations) {
+ _operationsWrittenToOplog.push_back(oplogEntry.getOwned());
+ }
return operations.back().getOpTime();
}
+MultiApplier::Operations StorageInterfaceMock::getOperationsWrittenToOplog() const {
+ stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex);
+ return _operationsWrittenToOplog;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 0c5705c4302..99ca0020e0b 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -57,12 +57,17 @@ public:
const NamespaceString& nss,
const MultiApplier::Operations& operations) override;
+ MultiApplier::Operations getOperationsWrittenToOplog() const;
+
private:
mutable stdx::mutex _initialSyncFlagMutex;
bool _initialSyncFlag = false;
mutable stdx::mutex _minValidBoundariesMutex;
BatchBoundaries _minValidBoundaries = {OpTime(), OpTime()};
+
+ mutable stdx::mutex _operationsWrittenToOplogMutex;
+ MultiApplier::Operations _operationsWrittenToOplog;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 30d8dcd6bcd..efa68be10a4 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1168,7 +1168,21 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
OldThreadPool* workerPool,
const MultiApplier::Operations& ops,
MultiApplier::ApplyOperationFn applyOperation) {
- invariant(applyOperation);
+ if (!txn) {
+ return {ErrorCodes::BadValue, "invalid operation context"};
+ }
+
+ if (!workerPool) {
+ return {ErrorCodes::BadValue, "invalid worker pool"};
+ }
+
+ if (ops.empty()) {
+ return {ErrorCodes::EmptyArrayOperation, "no operations provided to multiApply"};
+ }
+
+ if (!applyOperation) {
+ return {ErrorCodes::BadValue, "invalid apply operation function"};
+ }
if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
// Use a ThreadPool to prefetch all the operations in a batch.
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 9fee7c0958f..5a02addfd22 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -28,8 +28,11 @@
#include "mongo/platform/basic.h"
+#include <algorithm>
#include <memory>
+#include <vector>
+#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
@@ -37,13 +40,20 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/sync_tail.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/old_thread_pool.h"
+#include "mongo/util/string_map.h"
namespace {
@@ -53,12 +63,12 @@ using namespace mongo::repl;
class SyncTailTest : public ServiceContextMongoDTest {
protected:
void _testSyncApplyInsertDocument(LockMode expectedMode);
-
ServiceContext::UniqueOperationContext _txn;
unsigned int _opsApplied;
SyncTail::ApplyOperationInLockFn _applyOp;
SyncTail::ApplyCommandInLockFn _applyCmd;
SyncTail::IncrementOpsAppliedStatsFn _incOps;
+ StorageInterfaceMock* _storageInterface = nullptr;
private:
void setUp() override;
@@ -70,7 +80,12 @@ void SyncTailTest::setUp() {
ReplSettings replSettings;
replSettings.setOplogSizeBytes(5 * 1024 * 1024);
- setGlobalReplicationCoordinator(new ReplicationCoordinatorMock(replSettings));
+ auto serviceContext = mongo::getGlobalServiceContext();
+ ReplicationCoordinator::set(serviceContext,
+ stdx::make_unique<ReplicationCoordinatorMock>(replSettings));
+ auto storageInterface = stdx::make_unique<StorageInterfaceMock>();
+ _storageInterface = storageInterface.get();
+ StorageInterface::set(serviceContext, std::move(storageInterface));
Client::initThreadIfNotAlready();
_txn = cc().makeOperationContext();
@@ -91,7 +106,67 @@ void SyncTailTest::tearDown() {
invariant(mongo::dbHolder().closeAll(_txn.get(), unused, false));
}
_txn.reset();
- setGlobalReplicationCoordinator(nullptr);
+ _storageInterface = nullptr;
+}
+
+/**
+ * Creates collection options suitable for oplog.
+ */
+CollectionOptions createOplogCollectionOptions() {
+ CollectionOptions options;
+ options.capped = true;
+ options.cappedSize = 64 * 1024 * 1024LL;
+ options.autoIndexId = CollectionOptions::NO;
+ return options;
+}
+
+/**
+ * Create test collection.
+ * Returns collection.
+ */
+void createCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dblk(txn->lockState(), nss.db(), MODE_X);
+ OldClientContext ctx(txn, nss.ns());
+ auto db = ctx.db();
+ ASSERT_TRUE(db);
+ mongo::WriteUnitOfWork wuow(txn);
+ auto coll = db->createCollection(txn, nss.ns(), options);
+ ASSERT_TRUE(coll);
+ wuow.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nss.ns());
+}
+
+/**
+ * Creates a create collection oplog entry with given optime.
+ */
+OplogEntry makeCreateCollectionOplogEntry(OpTime opTime,
+ const NamespaceString& nss = NamespaceString("test.t")) {
+ BSONObjBuilder bob;
+ bob.appendElements(opTime.toBSON());
+ bob.append("h", 1LL);
+ bob.append("op", "c");
+ bob.append("ns", nss.ns());
+ return OplogEntry(bob.obj());
+}
+
+/**
+ * Creates an insert oplog entry with given optime and namespace.
+ */
+OplogEntry makeInsertDocumentOplogEntry(OpTime opTime,
+ const NamespaceString& nss,
+ const BSONObj& documentToInsert) {
+ BSONObjBuilder bob;
+ bob.appendElements(opTime.toBSON());
+ bob.append("h", 1LL);
+ bob.append("op", "i");
+ bob.append("ns", nss.ns());
+ bob.append("o", documentToInsert);
+ return OplogEntry(bob.obj());
}
TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) {
@@ -328,4 +403,123 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) {
ASSERT_EQUALS(1U, _opsApplied);
}
+TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullOperationContext) {
+ auto writerPool = SyncTail::makeWriterPool();
+ auto applyOperationFn = [](const MultiApplier::Operations&) {};
+ auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL});
+ auto status = multiApply(nullptr, writerPool.get(), {op}, applyOperationFn).getStatus();
+ ASSERT_EQUALS(ErrorCodes::BadValue, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "invalid operation context");
+}
+
+TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullWriterPool) {
+ auto applyOperationFn = [](const MultiApplier::Operations&) {};
+ auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL});
+ auto status = multiApply(_txn.get(), nullptr, {op}, applyOperationFn).getStatus();
+ ASSERT_EQUALS(ErrorCodes::BadValue, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "invalid worker pool");
+}
+
+TEST_F(SyncTailTest, MultiApplyReturnsEmptyArrayOperationWhenNoOperationsAreGiven) {
+ auto writerPool = SyncTail::makeWriterPool();
+ auto applyOperationFn = [](const MultiApplier::Operations&) {};
+ auto status = multiApply(_txn.get(), writerPool.get(), {}, applyOperationFn).getStatus();
+ ASSERT_EQUALS(ErrorCodes::EmptyArrayOperation, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "no operations provided to multiApply");
+}
+
+TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullApplyOperation) {
+ auto writerPool = SyncTail::makeWriterPool();
+ MultiApplier::ApplyOperationFn nullApplyOperationFn;
+ auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL});
+ auto status = multiApply(_txn.get(), writerPool.get(), {op}, nullApplyOperationFn).getStatus();
+ ASSERT_EQUALS(ErrorCodes::BadValue, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "invalid apply operation function");
+}
+
+bool _testOplogEntryIsForCappedCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options) {
+ auto writerPool = SyncTail::makeWriterPool();
+ MultiApplier::Operations operationsApplied;
+ auto applyOperationFn =
+ [&operationsApplied](const MultiApplier::Operations& operationsToApply) {
+ operationsApplied = operationsToApply;
+ };
+ createCollection(txn, nss, options);
+
+ auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1));
+ ASSERT_FALSE(op.isForCappedCollection);
+
+ auto lastOpTime =
+ unittest::assertGet(multiApply(txn, writerPool.get(), {op}, applyOperationFn));
+ ASSERT_EQUALS(op.getOpTime(), lastOpTime);
+
+ ASSERT_EQUALS(1U, operationsApplied.size());
+ const auto& opApplied = operationsApplied.front();
+ ASSERT_EQUALS(op, opApplied);
+ // "isForCappedCollection" is not parsed from raw oplog entry document.
+ return opApplied.isForCappedCollection;
+}
+
+TEST_F(
+ SyncTailTest,
+ MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ ASSERT_FALSE(_testOplogEntryIsForCappedCollection(_txn.get(), nss, CollectionOptions()));
+}
+
+TEST_F(SyncTailTest,
+ MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ ASSERT_TRUE(
+ _testOplogEntryIsForCappedCollection(_txn.get(), nss, createOplogCollectionOptions()));
+}
+
+TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceHash) {
+ NamespaceString nss1("test.t0");
+ NamespaceString nss2("test.t1");
+ OldThreadPool writerPool(2);
+
+ // Ensure that namespaces are hashed to different threads in pool.
+ ASSERT_EQUALS(0U, StringMapTraits::hash(nss1.ns()) % writerPool.getNumThreads());
+ ASSERT_EQUALS(1U, StringMapTraits::hash(nss2.ns()) % writerPool.getNumThreads());
+
+ stdx::mutex mutex;
+ std::vector<MultiApplier::Operations> operationsApplied;
+ auto applyOperationFn = [&mutex, &operationsApplied](
+ const MultiApplier::Operations& operationsForWriterThreadToApply) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationsApplied.push_back(operationsForWriterThreadToApply);
+ };
+
+ auto op1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1);
+ auto op2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2);
+
+ auto lastOpTime =
+ unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn));
+ ASSERT_EQUALS(op2.getOpTime(), lastOpTime);
+
+ // Each writer thread should be given exactly one operation to apply.
+ std::vector<OpTime> seen;
+ {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ ASSERT_EQUALS(writerPool.getNumThreads(), operationsApplied.size());
+ for (auto&& operationsAppliedByThread : operationsApplied) {
+ ASSERT_EQUALS(1U, operationsAppliedByThread.size());
+ const auto& oplogEntry = operationsAppliedByThread.front();
+ ASSERT_TRUE(std::find(seen.cbegin(), seen.cend(), oplogEntry.getOpTime()) ==
+ seen.cend());
+ ASSERT_TRUE(oplogEntry == op1 || oplogEntry == op2);
+ seen.push_back(oplogEntry.getOpTime());
+ }
+ }
+
+ // Check ops in oplog.
+ auto operationsWritternToOplog = _storageInterface->getOperationsWrittenToOplog();
+ ASSERT_EQUALS(2U, operationsWritternToOplog.size());
+ ASSERT_EQUALS(op1, operationsWritternToOplog[0]);
+ ASSERT_EQUALS(op2, operationsWritternToOplog[1]);
+}
+
} // namespace