-rw-r--r--   jstests/replsets/libs/secondary_reads_test.js             |  83
-rw-r--r--   jstests/replsets/secondary_reads_timestamp_visibility.js  |  83
-rw-r--r--   jstests/replsets/secondary_reads_unique_indexes.js        |  67
-rw-r--r--   src/mongo/db/repl/sync_tail.cpp                           |  16
-rw-r--r--   src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp     | 105
-rw-r--r--   src/mongo/dbtests/storage_timestamp_tests.cpp             | 103

6 files changed, 400 insertions, 57 deletions
diff --git a/jstests/replsets/libs/secondary_reads_test.js b/jstests/replsets/libs/secondary_reads_test.js
index 7b1f97e73a6..60816f3aa49 100644
--- a/jstests/replsets/libs/secondary_reads_test.js
+++ b/jstests/replsets/libs/secondary_reads_test.js
@@ -13,8 +13,12 @@ function SecondaryReadsTest(name = "secondary_reads_test", replSet) {
     let primary = rst.getPrimary();
     let primaryDB = primary.getDB(dbName);
     let secondary = rst.getSecondary();
-    let readers = [];
+    let secondaryDB = secondary.getDB(dbName);
+    secondaryDB.getMongo().setSlaveOk();
 
+    let readers = [];
+    let signalColl = "signalColl";
+    const testDoneId = "testDone";
     /**
      * Return an instance of ReplSetTest initialized with a standard
      * two-node replica set running with the latest version.
@@ -31,31 +35,74 @@ function SecondaryReadsTest(name = "secondary_reads_test", replSet) {
         return replSet;
     }
 
-    this.startSecondaryReaders = function(nReaders, cmd) {
-        let readCmd = `db.getMongo().setSlaveOk();
-                       db.getMongo().setReadPref("secondaryPreferred");
-                       db = db.getSiblingDB("${dbName}");
-                       ${cmd}`;
+    this.startSecondaryReaders = function(nReaders, readFn) {
+
+        let read = function() {
+            db.getMongo().setSlaveOk();
+            db = db.getSiblingDB(TestData.dbName);
+            while (true) {
+                eval(TestData.readFn);
+                let signalDoc = db.getCollection(TestData.signalColl)
+                                    .find({_id: TestData.testDoneId})
+                                    .itcount();
+                if (signalDoc != 0) {
+                    print("signal doc found. quitting...");
+                    quit();
+                }
+            }
+        };
+
+        TestData.dbName = dbName;
+        TestData.readFn = "(" + readFn.toString() + ")();";
+        TestData.signalColl = signalColl;
+        TestData.testDoneId = testDoneId;
 
         for (let i = 0; i < nReaders; i++) {
-            readers.push(
-                startMongoProgramNoConnect("mongo", "--port", secondary.port, "--eval", readCmd));
+            readers.push(startParallelShell(read, secondary.port));
             print("reader " + readers.length + " started");
         }
     };
 
-    this.doOnPrimary = function(writeFn) {
-        let db = primary.getDB(dbName);
-        writeFn(db);
+    let failPoint = "pauseBatchApplicationBeforeCompletion";
+
+    // This returns a function that should be called once after performing a replicated write on a
+    // primary. The write will start a batch on a secondary and immediately pause before completion.
+    // The returned function will return once the batch has reached the point where it has applied
+    // but not updated the last applied optime.
+    this.pauseSecondaryBatchApplication = function() {
+
+        clearRawMongoProgramOutput();
+
+        assert.commandWorked(
+            secondaryDB.adminCommand({configureFailPoint: failPoint, mode: "alwaysOn"}));
+
+        return function() {
+            assert.soon(function() {
+                return rawMongoProgramOutput().match(failPoint + " fail point enabled");
+            });
+        };
+    };
+
+    this.resumeSecondaryBatchApplication = function() {
+        assert.commandWorked(
+            secondaryDB.adminCommand({configureFailPoint: failPoint, mode: "off"}));
+    };
+
+    this.getPrimaryDB = function() {
+        return primaryDB;
+    };
+
+    this.getSecondaryDB = function() {
+        return secondaryDB;
     };
 
     this.stopReaders = function() {
+        print("signaling readers to stop...");
+        assert.gt(readers.length, 0, "no readers to stop");
+        assert.writeOK(primaryDB.getCollection(signalColl).insert({_id: testDoneId}));
         for (let i = 0; i < readers.length; i++) {
-            const pid = readers[i];
-            const ec = stopMongoProgramByPid(pid);
-            const expect = _isWindows() ? 1 : -15;
-            assert.eq(
-                ec, expect, "Expected mongo shell to exit with code " + expect + ". PID: " + pid);
+            const await = readers[i];
+            await();
             print("reader " + i + " done");
         }
         readers = [];
@@ -66,7 +113,9 @@ function SecondaryReadsTest(name = "secondary_reads_test", replSet) {
     };
 
     this.stop = function() {
-        this.stopReaders();
+        if (readers.length > 0) {
+            this.stopReaders();
+        }
         rst.stopSet();
     };
 }
diff --git a/jstests/replsets/secondary_reads_timestamp_visibility.js b/jstests/replsets/secondary_reads_timestamp_visibility.js
new file mode 100644
index 00000000000..47b7163d7f5
--- /dev/null
+++ b/jstests/replsets/secondary_reads_timestamp_visibility.js
@@ -0,0 +1,83 @@
+/**
+ * Tests that reads on a secondary during batch application only see changes that occur at the last
+ * applied timestamp, which is advanced at the completion of each batch.
+ *
+ * This test uses a failpoint to block right before batch application finishes, while holding the
+ * PBWM lock, and before advancing the last applied timestamp for readers.
+ *
+ */
+(function() {
+    "use strict";
+
+    load('jstests/replsets/libs/secondary_reads_test.js');
+
+    const name = "secondaryReadsTimestampVisibility";
+    const collName = "testColl";
+    let secondaryReadsTest = new SecondaryReadsTest(name);
+    let replSet = secondaryReadsTest.getReplset();
+
+    let primaryDB = secondaryReadsTest.getPrimaryDB();
+    let secondaryDB = secondaryReadsTest.getSecondaryDB();
+
+    let primaryColl = primaryDB.getCollection(collName);
+
+    // Create a collection and an index. Insert some data.
+    primaryDB.runCommand({drop: collName});
+    assert.commandWorked(primaryDB.runCommand({create: collName}));
+    assert.commandWorked(primaryDB.runCommand(
+        {createIndexes: collName, indexes: [{key: {y: 1}, name: "y_1", unique: true}]}));
+    for (let i = 0; i < 100; i++) {
+        assert.commandWorked(primaryColl.insert({_id: i, x: 0, y: i + 1}));
+    }
+
+    replSet.awaitReplication();
+
+    // Sanity check.
+    assert.eq(secondaryDB.getCollection(collName).find({x: 0}).itcount(), 100);
+    assert.eq(secondaryDB.getCollection(collName).find({y: {$gte: 1, $lt: 101}}).itcount(), 100);
+
+    // Prevent a batch from completing on the secondary.
+    let pauseAwait = secondaryReadsTest.pauseSecondaryBatchApplication();
+
+    // Update x to 1 in each document with default writeConcern and make sure we see the correct
+    // data on the primary.
+    let updates = [];
+    for (let i = 0; i < 100; i++) {
+        updates[i] = {q: {_id: i}, u: {x: 1, y: i}};
+    }
+    assert.commandWorked(primaryDB.runCommand({update: collName, updates: updates}));
+    assert.eq(primaryColl.find({x: 1}).itcount(), 100);
+    assert.eq(primaryColl.find({y: {$gte: 0, $lt: 100}}).itcount(), 100);
+
+    // Wait for the batch application to pause.
+    pauseAwait();
+
+    let levels = ["local", "available", "majority"];
+
+    // We should see the previous, not-yet-applied state on the secondary with every read concern.
+    for (let i in levels) {
+        assert.eq(secondaryDB.getCollection(collName).find({x: 0}).readConcern(levels[i]).itcount(),
+                  100);
+        assert.eq(secondaryDB.getCollection(collName).find({x: 1}).readConcern(levels[i]).itcount(),
+                  0);
+        assert.eq(secondaryDB.getCollection(collName)
+                      .find({y: {$gte: 1, $lt: 101}})
+                      .readConcern(levels[i])
+                      .itcount(),
+                  100);
+    }
+
+    // Disable the failpoint and let the batch complete.
+    secondaryReadsTest.resumeSecondaryBatchApplication();
+
+    replSet.awaitReplication();
+
+    for (let i in levels) {
+        // We should now see the updated state on the secondary with every read concern.
+        assert.eq(secondaryDB.getCollection(collName).find({x: 0}).readConcern(levels[i]).itcount(),
+                  0);
+        assert.eq(secondaryDB.getCollection(collName).find({x: 1}).readConcern(levels[i]).itcount(),
+                  100);
+    }
+    secondaryReadsTest.stop();
+})();
diff --git a/jstests/replsets/secondary_reads_unique_indexes.js b/jstests/replsets/secondary_reads_unique_indexes.js
index 6935ee8d421..78fceabd436 100644
--- a/jstests/replsets/secondary_reads_unique_indexes.js
+++ b/jstests/replsets/secondary_reads_unique_indexes.js
@@ -35,18 +35,19 @@
     const collName = "testColl";
     let secondaryReadsTest = new SecondaryReadsTest(name);
 
+    let primaryDB = secondaryReadsTest.getPrimaryDB();
+    let secondaryDB = secondaryReadsTest.getSecondaryDB();
+
     // Setup collection.
-    secondaryReadsTest.doOnPrimary(function(db) {
-        db.runCommand({drop: collName});
-        assert.commandWorked(db.runCommand({create: collName}));
+    primaryDB.runCommand({drop: collName});
+    assert.commandWorked(primaryDB.runCommand({create: collName}));
 
-        // Create a unique index on the collection in the foreground.
-        assert.commandWorked(db.runCommand(
-            {createIndexes: collName, indexes: [{key: {x: 1}, name: "x_1", unique: true}]}));
-    });
+    // Create a unique index on the collection in the foreground.
+    assert.commandWorked(primaryDB.runCommand(
+        {createIndexes: collName, indexes: [{key: {x: 1}, name: "x_1", unique: true}]}));
 
-    let rst = secondaryReadsTest.getReplset();
-    rst.awaitReplication();
+    let replSet = secondaryReadsTest.getReplset();
+    replSet.awaitReplication();
 
     // We want to do updates with at least as many different documents as there are parallel batch
     // writer threads (16). Each iteration increments and decrements a uniquely indexed value, 'x'.
@@ -58,28 +59,28 @@
     // Do a bunch of reads using the 'x' index on the secondary.
     // No errors should be encountered on the secondary.
-    let readCmd = `while (true) {
-        for (let x = 0; x < ${nOps}; x++) {
-            assert.commandWorked(db.runCommand({
-                find: "${collName}",
-                filter: {x: x},
-                projection: {x: 1},
-                readConcern: {level: "local"},
-            }));
-        }
-    }`;
-    secondaryReadsTest.startSecondaryReaders(nReaders, readCmd);
-
-    // Write the initial documents. Ensure they have been replicated.
-    secondaryReadsTest.doOnPrimary(function(db) {
-        for (let i = 0; i < nOps; i++) {
+    let readFn = function() {
+        for (let x = 0; x < TestData.nOps; x++) {
             assert.commandWorked(db.runCommand({
-                insert: collName,
-                documents: [{_id: i, x: i, iter: 0}],
-                writeConcern: {w: "majority"}
+                find: TestData.collName,
+                filter: {x: x},
+                projection: {x: 1},
+                readConcern: {level: "local"},
             }));
         }
-    });
+    };
+    TestData.nOps = nOps;
+    TestData.collName = collName;
+    secondaryReadsTest.startSecondaryReaders(nReaders, readFn);
+
+    // Write the initial documents. Ensure they have been replicated.
+    for (let i = 0; i < nOps; i++) {
+        assert.commandWorked(primaryDB.runCommand({
+            insert: collName,
+            documents: [{_id: i, x: i, iter: 0}],
+            writeConcern: {w: "majority"}
+        }));
+    }
 
     // Cycle the value of x in the document {_id: i, x: i} between i and i+1 each iteration.
     for (let iteration = 0; iteration < nIterations; iteration++) {
@@ -89,9 +90,7 @@
             updates[i] = {q: {_id: i}, u: {x: i, iter: iteration}};
         }
 
-        secondaryReadsTest.doOnPrimary(function(db) {
-            assert.commandWorked(db.runCommand({update: collName, updates: updates}));
-        });
+        assert.commandWorked(primaryDB.runCommand({update: collName, updates: updates}));
 
         updates = [];
         // Generate updates that increment x on each document backwards by _id to avoid conflicts
@@ -105,11 +104,9 @@
             updates[i] = {q: {_id: end}, u: {x: nextX, iter: iteration}};
         }
         print("iteration " + iteration);
-        secondaryReadsTest.doOnPrimary(function(db) {
-            assert.commandWorked(db.runCommand({update: collName, updates: updates}));
-        });
+        assert.commandWorked(primaryDB.runCommand({update: collName, updates: updates}));
     }
-    rst.awaitReplication();
+    replSet.awaitReplication();
 
     secondaryReadsTest.stop();
 })();
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 4a455f00e77..f26011121aa 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -92,6 +92,8 @@ AtomicInt32 SyncTail::replBatchLimitOperations{50 * 1000};
 
 namespace {
 
+MONGO_FP_DECLARE(pauseBatchApplicationBeforeCompletion);
+
 /**
  * This variable determines the number of writer threads SyncTail will have. It can be overridden
  * using the "replWriterThreadCount" server parameter.
@@ -1386,6 +1388,20 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
         storageEngine->replicationBatchIsComplete();
     }
 
+    // Use this fail point to hold the PBWM lock and prevent the batch from completing.
+    if (MONGO_FAIL_POINT(pauseBatchApplicationBeforeCompletion)) {
+        log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail "
+                 "point is disabled.";
+        while (MONGO_FAIL_POINT(pauseBatchApplicationBeforeCompletion)) {
+            if (inShutdown()) {
+                severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting "
+                            "clean shutdown";
+                fassertFailedNoTrace(50798);
+            }
+            sleepmillis(100);
+        }
+    }
+
     Timestamp firstTimeInBatch = ops.front().getTimestamp();
     // Set any indexes to multikey that this batch ignored. This must be done while holding the
     // parallel batch writer mutex.
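
Note: the failpoint added to multiApply() above is what the SecondaryReadsTest helpers drive. As a point of reference only (not part of the patch), a minimal shell-level sketch of the same coordination, assuming a running ReplSetTest handle `rst` and illustrative database/collection names:

    // Sketch only: "test", "coll" and `rst` are placeholders, not names from this patch.
    const primaryDB = rst.getPrimary().getDB("test");
    const secondaryDB = rst.getSecondary().getDB("test");
    secondaryDB.getMongo().setSlaveOk();

    // Pause batch application on the secondary right before it completes.
    clearRawMongoProgramOutput();
    assert.commandWorked(secondaryDB.adminCommand(
        {configureFailPoint: "pauseBatchApplicationBeforeCompletion", mode: "alwaysOn"}));

    // A write on the primary starts a batch on the secondary, which then blocks at the failpoint.
    assert.writeOK(primaryDB.coll.insert({_id: 1}));
    assert.soon(() => rawMongoProgramOutput().match(
        "pauseBatchApplicationBeforeCompletion fail point enabled"));

    // The secondary still serves reads, but the last applied timestamp has not advanced,
    // so the in-flight batch is not visible yet.
    assert.eq(secondaryDB.coll.find({_id: 1}).readConcern("local").itcount(), 0);

    // Let the batch finish and the new data become visible.
    assert.commandWorked(secondaryDB.adminCommand(
        {configureFailPoint: "pauseBatchApplicationBeforeCompletion", mode: "off"}));
    rst.awaitReplication();
    assert.eq(secondaryDB.coll.find({_id: 1}).readConcern("local").itcount(), 1);
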
diff --git a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
index 3f2329ae520..7fb1905c2b8 100644
--- a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
+++ b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
@@ -151,18 +151,29 @@ public:
         return itCountOn(op);
     }
 
-    boost::optional<Record> readRecordCommitted(RecordId id) {
+    int itCountLocal() {
         auto op = makeOperation();
         op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
-            repl::ReadConcernLevel::kMajorityReadConcern,
-            repl::ReplicationCoordinator::modeReplSet);
-        ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot());
+            repl::ReadConcernLevel::kLocalReadConcern, repl::ReplicationCoordinator::modeReplSet);
+        op->recoveryUnit()->setShouldReadAtLastAppliedTimestamp(true);
+        return itCountOn(op);
+    }
+
+    boost::optional<Record> readRecordOn(OperationContext* op, RecordId id) {
         auto cursor = rs->getCursor(op);
         auto record = cursor->seekExact(id);
         if (record)
            record->data.makeOwned();
         return record;
     }
 
+    boost::optional<Record> readRecordCommitted(RecordId id) {
+        auto op = makeOperation();
+        op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+            repl::ReadConcernLevel::kMajorityReadConcern,
+            repl::ReplicationCoordinator::modeReplSet);
+        ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot());
+        return readRecordOn(op, id);
+    }
+
     std::string readStringCommitted(RecordId id) {
         auto record = readRecordCommitted(id);
@@ -170,6 +181,20 @@ public:
         return std::string(record->data.data());
     }
 
+    boost::optional<Record> readRecordLocal(RecordId id) {
+        auto op = makeOperation();
+        op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+            repl::ReadConcernLevel::kLocalReadConcern, repl::ReplicationCoordinator::modeReplSet);
+        op->recoveryUnit()->setShouldReadAtLastAppliedTimestamp(true);
+        return readRecordOn(op, id);
+    }
+
+    std::string readStringLocal(RecordId id) {
+        auto record = readRecordLocal(id);
+        ASSERT(record);
+        return std::string(record->data.data());
+    }
+
     void setUp() override {
         helper = KVHarnessHelper::create();
         engine = helper->getEngine();
@@ -346,4 +371,76 @@ TEST_F(SnapshotManagerTests, UpdateAndDelete) {
     ASSERT(!readRecordCommitted(id));
 }
 
+TEST_F(SnapshotManagerTests, InsertAndReadOnLocalSnapshot) {
+    if (!snapshotManager)
+        return;  // This test is only for engines that DO support SnapshotManagers.
+
+    auto beforeInsert = fetchAndIncrementTimestamp();
+
+    auto id = insertRecordAndCommit();
+    auto afterInsert = fetchAndIncrementTimestamp();
+
+    // Not reading on the last local timestamp returns the most recent data.
+    auto op = makeOperation();
+    auto ru = op->recoveryUnit();
+    ru->setShouldReadAtLastAppliedTimestamp(false);
+    ASSERT_EQ(itCountOn(op), 1);
+    ASSERT(readRecordOn(op, id));
+
+    deleteRecordAndCommit(id);
+    auto afterDelete = fetchAndIncrementTimestamp();
+
+    // Reading at the local snapshot timestamps returns data in order.
+    snapshotManager->setLocalSnapshot(beforeInsert);
+    ASSERT_EQ(itCountLocal(), 0);
+    ASSERT(!readRecordLocal(id));
+
+    snapshotManager->setLocalSnapshot(afterInsert);
+    ASSERT_EQ(itCountLocal(), 1);
+    ASSERT(readRecordLocal(id));
+
+    snapshotManager->setLocalSnapshot(afterDelete);
+    ASSERT_EQ(itCountLocal(), 0);
+    ASSERT(!readRecordLocal(id));
+}
+
+TEST_F(SnapshotManagerTests, UpdateAndDeleteOnLocalSnapshot) {
+    if (!snapshotManager)
+        return;  // This test is only for engines that DO support SnapshotManagers.
+
+    auto beforeInsert = fetchAndIncrementTimestamp();
+
+    auto id = insertRecordAndCommit("Aardvark");
+    auto afterInsert = fetchAndIncrementTimestamp();
+
+    updateRecordAndCommit(id, "Blue spotted stingray");
+    auto afterUpdate = fetchAndIncrementTimestamp();
+
+    // Not reading on the last local timestamp returns the most recent data.
+    auto op = makeOperation();
+    auto ru = op->recoveryUnit();
+    ru->setShouldReadAtLastAppliedTimestamp(false);
+    ASSERT_EQ(itCountOn(op), 1);
+    auto record = readRecordOn(op, id);
+    ASSERT_EQ(std::string(record->data.data()), "Blue spotted stingray");
+
+    deleteRecordAndCommit(id);
+    auto afterDelete = fetchAndIncrementTimestamp();
+
+    snapshotManager->setLocalSnapshot(beforeInsert);
+    ASSERT_EQ(itCountLocal(), 0);
+    ASSERT(!readRecordLocal(id));
+
+    snapshotManager->setLocalSnapshot(afterInsert);
+    ASSERT_EQ(itCountLocal(), 1);
+    ASSERT_EQ(readStringLocal(id), "Aardvark");
+
+    snapshotManager->setLocalSnapshot(afterUpdate);
+    ASSERT_EQ(itCountLocal(), 1);
+    ASSERT_EQ(readStringLocal(id), "Blue spotted stingray");
+
+    snapshotManager->setLocalSnapshot(afterDelete);
+    ASSERT_EQ(itCountLocal(), 0);
+    ASSERT(!readRecordLocal(id));
+}
 }  // namespace mongo
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 4426fe47a1b..d2c31fb6e1a 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -50,9 +50,9 @@
 #include "mongo/db/op_observer_registry.h"
 #include "mongo/db/repl/apply_ops.h"
 #include "mongo/db/repl/drop_pending_collection_reaper.h"
+#include "mongo/db/repl/multiapplier.h"
 #include "mongo/db/repl/oplog.h"
 #include "mongo/db/repl/oplog_entry.h"
-#include "mongo/db/repl/oplog_entry.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/repl/replication_consistency_markers_impl.h"
@@ -67,6 +67,7 @@
 #include "mongo/db/service_context.h"
 #include "mongo/db/storage/kv/kv_storage_engine.h"
 #include "mongo/dbtests/dbtests.h"
+#include "mongo/stdx/future.h"
 #include "mongo/unittest/unittest.h"
 #include "mongo/util/stacktrace.h"
 
@@ -1806,6 +1807,105 @@ public:
     }
 };
 
+class SecondaryReadsDuringBatchApplicationAreAllowed : public StorageTimestampTest {
+public:
+    void run() {
+        // Only run on 'wiredTiger'. No other storage engines to-date support timestamp writes.
+        if (mongo::storageGlobalParams.engine != "wiredTiger") {
+            return;
+        }
+        ASSERT(
+            _opCtx->getServiceContext()->getGlobalStorageEngine()->supportsReadConcernSnapshot());
+
+        NamespaceString ns("unittest.secondaryReadsDuringBatchApplicationAreAllowed");
+        reset(ns);
+        UUID uuid = UUID::gen();
+        {
+            AutoGetCollectionForRead autoColl(_opCtx, ns);
+            uuid = autoColl.getCollection()->uuid().get();
+            ASSERT_EQ(itCount(autoColl.getCollection()), 0);
+        }
+
+        // Returns true when the batch has started, meaning the applier is holding the PBWM lock.
+        // Will return false if the lock was not held.
+        Promise<bool> batchInProgressPromise;
+        // Attempt to read when in the middle of a batch.
+        stdx::packaged_task<bool()> task([&] {
+            Client::initThread(getThreadName());
+            auto readOp = cc().makeOperationContext();
+
+            // Wait for the batch to start or fail.
+            if (!batchInProgressPromise.getFuture().get()) {
+                return false;
+            }
+            AutoGetCollectionForRead autoColl(readOp.get(), ns);
+            return !readOp->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode,
+                                                           MODE_IS);
+        });
+        auto taskFuture = task.get_future();
+        stdx::thread taskThread{std::move(task)};
+
+        auto joinGuard = MakeGuard([&] {
+            batchInProgressPromise.emplaceValue(false);
+            taskThread.join();
+        });
+
+        // This apply operation function will block until the reader has tried acquiring a
+        // collection lock. This returns BadValue statuses instead of asserting so that the worker
+        // threads can cleanly exit and this test case fails without crashing the entire suite.
+        auto applyOperationFn = [&](OperationContext* opCtx,
+                                    std::vector<const repl::OplogEntry*>* operationsToApply,
+                                    repl::SyncTail* st,
+                                    std::vector<MultikeyPathInfo>* pathInfo) -> Status {
+            if (!_opCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode,
+                                                        MODE_X)) {
+                return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"};
+            }
+
+            // Insert the document. A reader without a PBWM lock should not see it yet.
+            auto status = repl::multiSyncApply(opCtx, operationsToApply, st, pathInfo);
+            if (!status.isOK()) {
+                return status;
+            }
+
+            // Signals the reader to acquire a collection read lock.
+            batchInProgressPromise.emplaceValue(true);
+
+            // Block while holding the PBWM lock until the reader is done.
+            if (!taskFuture.get()) {
+                return {ErrorCodes::BadValue, "Client was holding PBWM lock in MODE_IS"};
+            }
+            return Status::OK();
+        };
+
+        // Make a simple insert operation.
+        BSONObj doc0 = BSON("_id" << 0 << "a" << 0);
+        auto insertOp = repl::OplogEntry(
+            BSON("ts" << futureTs << "t" << 1LL << "h" << 0xBEEFBEEFLL << "v" << 2 << "op"
+                      << "i"
+                      << "ns"
+                      << ns.ns()
+                      << "ui"
+                      << uuid
+                      << "o"
+                      << doc0));
+
+        // Apply the operation.
+        auto writerPool = repl::SyncTail::makeWriterPool(1);
+        repl::SyncTail syncTail(nullptr, applyOperationFn, writerPool.get());
+        auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}));
+        ASSERT_EQ(insertOp.getOpTime(), lastOpTime);
+
+        joinGuard.Dismiss();
+        taskThread.join();
+
+        // Read on the local snapshot to verify the document was inserted.
+        AutoGetCollectionForRead autoColl(_opCtx, ns);
+        assertDocumentAtTimestamp(autoColl.getCollection(), futureTs, doc0);
+    }
+};
+
 class AllStorageTimestampTests : public unittest::Suite {
 public:
     AllStorageTimestampTests() : unittest::Suite("StorageTimestampTests") {}
@@ -1833,6 +1933,7 @@ public:
         // TimestampIndexBuilds<SimulatePrimary>
         add<TimestampIndexBuilds<false>>();
         add<TimestampIndexBuilds<true>>();
+        add<SecondaryReadsDuringBatchApplicationAreAllowed>();
     }
 };
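
One design point in the test library rework above: readers are no longer killed with stopMongoProgramByPid(), which required platform-specific expected exit codes (-15, or 1 on Windows). Instead each reader is a startParallelShell() that polls a sentinel document and exits on its own. A stripped-down sketch of that pattern, with illustrative names that are not part of the patch:

    // readFn is serialized through TestData so the parallel shell can eval it repeatedly.
    TestData.readFn = "(" + function() {
        assert.commandWorked(db.runCommand({find: "testColl", filter: {}}));
    }.toString() + ")();";

    let reader = startParallelShell(function() {
        db.getMongo().setSlaveOk();
        while (true) {
            eval(TestData.readFn);
            // The test inserts {_id: "testDone"} on the primary when the workload is finished.
            if (db.getCollection("signalColl").find({_id: "testDone"}).itcount() > 0) {
                quit();
            }
        }
    }, secondaryPort);  // secondaryPort: placeholder for the secondary's port

    // ... run the replicated write workload on the primary ...
    assert.writeOK(primaryDB.getCollection("signalColl").insert({_id: "testDone"}));
    reader();  // startParallelShell returns a join function that waits for the shell to exit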