author     Louis Williams <louis.williams@mongodb.com>    2018-04-10 15:13:59 -0400
committer  Louis Williams <louis.williams@mongodb.com>    2018-04-24 15:53:46 -0400
commit     e75fd5732ff4ecc269ab8a9cba601b79af460daa (patch)
tree       8c13c041157e22fa5e76a40db3628725e53b48b4
parent     b57eee5a295ede1fd67299dc9990c272c1f66ea3 (diff)
download   mongo-e75fd5732ff4ecc269ab8a9cba601b79af460daa.tar.gz
SERVER-34385 Unit tests for secondary reads during oplog application
-rw-r--r--    jstests/replsets/libs/secondary_reads_test.js             83
-rw-r--r--    jstests/replsets/secondary_reads_timestamp_visibility.js  83
-rw-r--r--    jstests/replsets/secondary_reads_unique_indexes.js        67
-rw-r--r--    src/mongo/db/repl/sync_tail.cpp                           16
-rw-r--r--    src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp    105
-rw-r--r--    src/mongo/dbtests/storage_timestamp_tests.cpp            103
6 files changed, 400 insertions, 57 deletions
diff --git a/jstests/replsets/libs/secondary_reads_test.js b/jstests/replsets/libs/secondary_reads_test.js
index 7b1f97e73a6..60816f3aa49 100644
--- a/jstests/replsets/libs/secondary_reads_test.js
+++ b/jstests/replsets/libs/secondary_reads_test.js
@@ -13,8 +13,12 @@ function SecondaryReadsTest(name = "secondary_reads_test", replSet) {
let primary = rst.getPrimary();
let primaryDB = primary.getDB(dbName);
let secondary = rst.getSecondary();
- let readers = [];
+ let secondaryDB = secondary.getDB(dbName);
+ secondaryDB.getMongo().setSlaveOk();
+ let readers = [];
+ let signalColl = "signalColl";
+ const testDoneId = "testDone";
/**
* Return an instance of ReplSetTest initialized with a standard
* two-node replica set running with the latest version.
@@ -31,31 +35,74 @@ function SecondaryReadsTest(name = "secondary_reads_test", replSet) {
return replSet;
}
- this.startSecondaryReaders = function(nReaders, cmd) {
- let readCmd = `db.getMongo().setSlaveOk();
- db.getMongo().setReadPref("secondaryPreferred");
- db = db.getSiblingDB("${dbName}");
- ${cmd}`;
+ this.startSecondaryReaders = function(nReaders, readFn) {
+
+ let read = function() {
+ db.getMongo().setSlaveOk();
+ db = db.getSiblingDB(TestData.dbName);
+ while (true) {
+ eval(TestData.readFn);
+ let signalDoc = db.getCollection(TestData.signalColl)
+ .find({_id: TestData.testDoneId})
+ .itcount();
+ if (signalDoc != 0) {
+ print("signal doc found. quitting...");
+ quit();
+ }
+ }
+ };
+
+ TestData.dbName = dbName;
+ TestData.readFn = "(" + readFn.toString() + ")();";
+ TestData.signalColl = signalColl;
+ TestData.testDoneId = testDoneId;
for (let i = 0; i < nReaders; i++) {
- readers.push(
- startMongoProgramNoConnect("mongo", "--port", secondary.port, "--eval", readCmd));
+ readers.push(startParallelShell(read, secondary.port));
print("reader " + readers.length + " started");
}
};
- this.doOnPrimary = function(writeFn) {
- let db = primary.getDB(dbName);
- writeFn(db);
+ let failPoint = "pauseBatchApplicationBeforeCompletion";
+
+ // This enables the fail point and returns a function that should be called once after
+ // performing a replicated write on the primary. The write starts a batch on the secondary,
+ // which pauses just before completing. The returned function blocks until the batch has
+ // been applied but has not yet advanced the last applied optime.
+ this.pauseSecondaryBatchApplication = function() {
+
+ clearRawMongoProgramOutput();
+
+ assert.commandWorked(
+ secondaryDB.adminCommand({configureFailPoint: failPoint, mode: "alwaysOn"}));
+
+ return function() {
+ assert.soon(function() {
+ return rawMongoProgramOutput().match(failPoint + " fail point enabled");
+ });
+ };
+ };
+
+ this.resumeSecondaryBatchApplication = function() {
+ assert.commandWorked(
+ secondaryDB.adminCommand({configureFailPoint: failPoint, mode: "off"}));
+ };
+
+ this.getPrimaryDB = function() {
+ return primaryDB;
+ };
+
+ this.getSecondaryDB = function() {
+ return secondaryDB;
};
this.stopReaders = function() {
+ print("signaling readers to stop...");
+ assert.gt(readers.length, 0, "no readers to stop");
+ assert.writeOK(primaryDB.getCollection(signalColl).insert({_id: testDoneId}));
for (let i = 0; i < readers.length; i++) {
- const pid = readers[i];
- const ec = stopMongoProgramByPid(pid);
- const expect = _isWindows() ? 1 : -15;
- assert.eq(
- ec, expect, "Expected mongo shell to exit with code " + expect + ". PID: " + pid);
+ const awaitReader = readers[i];
+ awaitReader();
print("reader " + i + " done");
}
readers = [];
@@ -66,7 +113,9 @@ function SecondaryReadsTest(name = "secondary_reads_test", replSet) {
};
this.stop = function() {
- this.stopReaders();
+ if (readers.length > 0) {
+ this.stopReaders();
+ }
rst.stopSet();
};
}
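
Taken together, the harness now exposes a small API: startSecondaryReaders() spawns parallel shells that run a read function in a loop until a signal document appears, and pauseSecondaryBatchApplication() parks the secondary mid-batch behind the new fail point. A minimal sketch of how a test might drive it; the test name, collection name, and read function below are illustrative, not part of the patch:

    load('jstests/replsets/libs/secondary_reads_test.js');

    let test = new SecondaryReadsTest("usageSketch");
    TestData.collName = "sketchColl";  // hypothetical; set before spawning readers

    // Readers run in parallel shells against the secondary until stopReaders()
    // inserts the signal document.
    test.startSecondaryReaders(2, function() {
        assert.commandWorked(db.runCommand({find: TestData.collName, filter: {}}));
    });

    // Enable the fail point, perform a replicated write, then wait for the
    // secondary batch to pause just before completion.
    let pauseAwait = test.pauseSecondaryBatchApplication();
    assert.writeOK(test.getPrimaryDB().getCollection(TestData.collName).insert({_id: 1}));
    pauseAwait();

    // ... visibility assertions would go here ...

    test.resumeSecondaryBatchApplication();
    test.stopReaders();
    test.stop();
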
diff --git a/jstests/replsets/secondary_reads_timestamp_visibility.js b/jstests/replsets/secondary_reads_timestamp_visibility.js
new file mode 100644
index 00000000000..47b7163d7f5
--- /dev/null
+++ b/jstests/replsets/secondary_reads_timestamp_visibility.js
@@ -0,0 +1,83 @@
+/**
+ * Tests that reads on a secondary during batch application only see changes that occur at the last
+ * applied timestamp, which is advanced at the completion of each batch.
+ *
+ * This test uses a failpoint to block right before batch application finishes, while holding the
+ * PBWM lock, and before advancing the last applied timestamp for readers.
+ *
+ */
+(function() {
+ "use strict";
+
+ load('jstests/replsets/libs/secondary_reads_test.js');
+
+ const name = "secondaryReadsTimestampVisibility";
+ const collName = "testColl";
+ let secondaryReadsTest = new SecondaryReadsTest(name);
+ let replSet = secondaryReadsTest.getReplset();
+
+ let primaryDB = secondaryReadsTest.getPrimaryDB();
+ let secondaryDB = secondaryReadsTest.getSecondaryDB();
+
+ let primaryColl = primaryDB.getCollection(collName);
+
+ // Create a collection and an index. Insert some data.
+ primaryDB.runCommand({drop: collName});
+ assert.commandWorked(primaryDB.runCommand({create: collName}));
+ assert.commandWorked(primaryDB.runCommand(
+ {createIndexes: collName, indexes: [{key: {y: 1}, name: "y_1", unique: true}]}));
+ for (let i = 0; i < 100; i++) {
+ assert.commandWorked(primaryColl.insert({_id: i, x: 0, y: i + 1}));
+ }
+
+ replSet.awaitReplication();
+
+ // Sanity check.
+ assert.eq(secondaryDB.getCollection(collName).find({x: 0}).itcount(), 100);
+ assert.eq(secondaryDB.getCollection(collName).find({y: {$gte: 1, $lt: 101}}).itcount(), 100);
+
+ // Prevent a batch from completing on the secondary.
+ let pauseAwait = secondaryReadsTest.pauseSecondaryBatchApplication();
+
+ // Update x to 1 in each document with default writeConcern and make sure we see the correct
+ // data on the primary.
+ let updates = [];
+ for (let i = 0; i < 100; i++) {
+ updates[i] = {q: {_id: i}, u: {x: 1, y: i}};
+ }
+ assert.commandWorked(primaryDB.runCommand({update: collName, updates: updates}));
+ assert.eq(primaryColl.find({x: 1}).itcount(), 100);
+ assert.eq(primaryColl.find({y: {$gte: 0, $lt: 100}}).itcount(), 100);
+
+ // Wait for the batch application to pause.
+ pauseAwait();
+
+ let levels = ["local", "available", "majority"];
+
+ // We should see the previous, un-replicated state on the secondary with every read concern.
+ for (let i in levels) {
+ assert.eq(secondaryDB.getCollection(collName).find({x: 0}).readConcern(levels[i]).itcount(),
+ 100);
+ assert.eq(secondaryDB.getCollection(collName).find({x: 1}).readConcern(levels[i]).itcount(),
+ 0);
+ assert.eq(secondaryDB.getCollection(collName)
+ .find({y: {$gte: 1, $lt: 101}})
+ .readConcern(levels[i])
+ .itcount(),
+ 100);
+ }
+
+ // Disable the failpoint and let the batch complete.
+ secondaryReadsTest.resumeSecondaryBatchApplication();
+
+ replSet.awaitReplication();
+
+ for (let i in levels) {
+ // We should now see the replicated changes on the secondary with every read concern.
+ assert.eq(secondaryDB.getCollection(collName).find({x: 0}).readConcern(levels[i]).itcount(),
+ 0);
+ assert.eq(secondaryDB.getCollection(collName).find({x: 1}).readConcern(levels[i]).itcount(),
+ 100);
+ }
+ secondaryReadsTest.stop();
+})();
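
The "last applied" timestamp that gates these reads is the secondary's appliedOpTime, which only advances when a batch completes. A quick way to observe it from the shell, assuming a secondaryDB connection like the one the harness returns:

    // While the fail point holds the batch open, the secondary's last applied
    // optime stays at its pre-batch value.
    let status = assert.commandWorked(secondaryDB.adminCommand({replSetGetStatus: 1}));
    printjson(status.optimes.appliedOpTime);
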
diff --git a/jstests/replsets/secondary_reads_unique_indexes.js b/jstests/replsets/secondary_reads_unique_indexes.js
index 6935ee8d421..78fceabd436 100644
--- a/jstests/replsets/secondary_reads_unique_indexes.js
+++ b/jstests/replsets/secondary_reads_unique_indexes.js
@@ -35,18 +35,19 @@
const collName = "testColl";
let secondaryReadsTest = new SecondaryReadsTest(name);
+ let primaryDB = secondaryReadsTest.getPrimaryDB();
+ let secondaryDB = secondaryReadsTest.getSecondaryDB();
+
// Setup collection.
- secondaryReadsTest.doOnPrimary(function(db) {
- db.runCommand({drop: collName});
- assert.commandWorked(db.runCommand({create: collName}));
+ primaryDB.runCommand({drop: collName});
+ assert.commandWorked(primaryDB.runCommand({create: collName}));
- // Create a unique index on the collection in the foreground.
- assert.commandWorked(db.runCommand(
- {createIndexes: collName, indexes: [{key: {x: 1}, name: "x_1", unique: true}]}));
- });
+ // Create a unique index on the collection in the foreground.
+ assert.commandWorked(primaryDB.runCommand(
+ {createIndexes: collName, indexes: [{key: {x: 1}, name: "x_1", unique: true}]}));
- let rst = secondaryReadsTest.getReplset();
- rst.awaitReplication();
+ let replSet = secondaryReadsTest.getReplset();
+ replSet.awaitReplication();
// We want to do updates with at least as many different documents as there are parallel batch
// writer threads (16). Each iteration increments and decrements a uniquely indexed value, 'x'.
@@ -58,28 +59,28 @@
// Do a bunch of reads using the 'x' index on the secondary.
// No errors should be encountered on the secondary.
- let readCmd = `while (true) {
- for (let x = 0; x < ${nOps}; x++) {
- assert.commandWorked(db.runCommand({
- find: "${collName}",
- filter: {x: x},
- projection: {x: 1},
- readConcern: {level: "local"},
- }));
- }
- }`;
- secondaryReadsTest.startSecondaryReaders(nReaders, readCmd);
-
- // Write the initial documents. Ensure they have been replicated.
- secondaryReadsTest.doOnPrimary(function(db) {
- for (let i = 0; i < nOps; i++) {
+ let readFn = function() {
+ for (let x = 0; x < TestData.nOps; x++) {
assert.commandWorked(db.runCommand({
- insert: collName,
- documents: [{_id: i, x: i, iter: 0}],
- writeConcern: {w: "majority"}
+ find: TestData.collName,
+ filter: {x: x},
+ projection: {x: 1},
+ readConcern: {level: "local"},
}));
}
- });
+ };
+ TestData.nOps = nOps;
+ TestData.collName = collName;
+ secondaryReadsTest.startSecondaryReaders(nReaders, readFn);
+
+ // Write the initial documents. Ensure they have been replicated.
+ for (let i = 0; i < nOps; i++) {
+ assert.commandWorked(primaryDB.runCommand({
+ insert: collName,
+ documents: [{_id: i, x: i, iter: 0}],
+ writeConcern: {w: "majority"}
+ }));
+ }
// Cycle the value of x in the document {_id: i, x: i} between i and i+1 each iteration.
for (let iteration = 0; iteration < nIterations; iteration++) {
@@ -89,9 +90,7 @@
updates[i] = {q: {_id: i}, u: {x: i, iter: iteration}};
}
- secondaryReadsTest.doOnPrimary(function(db) {
- assert.commandWorked(db.runCommand({update: collName, updates: updates}));
- });
+ assert.commandWorked(primaryDB.runCommand({update: collName, updates: updates}));
updates = [];
// Generate updates that increment x on each document backwards by _id to avoid conflicts
@@ -105,11 +104,9 @@
updates[i] = {q: {_id: end}, u: {x: nextX, iter: iteration}};
}
print("iteration " + iteration);
- secondaryReadsTest.doOnPrimary(function(db) {
- assert.commandWorked(db.runCommand({update: collName, updates: updates}));
- });
+ assert.commandWorked(primaryDB.runCommand({update: collName, updates: updates}));
}
- rst.awaitReplication();
+ replSet.awaitReplication();
secondaryReadsTest.stop();
})();
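
The substantive change in this test mirrors the library: the read workload moves from a string-built --eval command to a real function shipped to a parallel shell via TestData. The pattern in isolation, with a hypothetical collection name and an assumed existing connection 'secondary':

    TestData.collName = "patternColl";  // any state the shell needs goes in TestData
    let awaitShell = startParallelShell(function() {
        // TestData is serialized into the parallel shell automatically.
        assert.commandWorked(db.runCommand({find: TestData.collName, filter: {}}));
    }, secondary.port);  // 'secondary' is assumed to be an existing connection
    awaitShell();  // joins the shell and asserts a zero exit code
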
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 4a455f00e77..f26011121aa 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -92,6 +92,8 @@ AtomicInt32 SyncTail::replBatchLimitOperations{50 * 1000};
namespace {
+MONGO_FP_DECLARE(pauseBatchApplicationBeforeCompletion);
+
/**
* This variable determines the number of writer threads SyncTail will have. It can be overridden
* using the "replWriterThreadCount" server parameter.
@@ -1386,6 +1388,20 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
storageEngine->replicationBatchIsComplete();
}
+ // Use this fail point to hold the PBWM lock and prevent the batch from completing.
+ if (MONGO_FAIL_POINT(pauseBatchApplicationBeforeCompletion)) {
+ log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail "
+ "point is disabled.";
+ while (MONGO_FAIL_POINT(pauseBatchApplicationBeforeCompletion)) {
+ if (inShutdown()) {
+ severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting "
+ "clean shutdown";
+ fassertFailedNoTrace(50798);
+ }
+ sleepmillis(100);
+ }
+ }
+
Timestamp firstTimeInBatch = ops.front().getTimestamp();
// Set any indexes to multikey that this batch ignored. This must be done while holding the
// parallel batch writer mutex.
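
The server-side loop above is driven entirely through configureFailPoint; the shell-side handshake, as the harness in this patch implements it (secondaryDB being a connection to the secondary), boils down to:

    // Enable the fail point on the secondary.
    assert.commandWorked(secondaryDB.adminCommand(
        {configureFailPoint: "pauseBatchApplicationBeforeCompletion", mode: "alwaysOn"}));

    // ... perform a replicated write on the primary ...

    // Wait for the server to log that the batch is parked.
    assert.soon(function() {
        return rawMongoProgramOutput().match(
            "pauseBatchApplicationBeforeCompletion fail point enabled");
    });

    // Let the batch complete and the last applied optime advance.
    assert.commandWorked(secondaryDB.adminCommand(
        {configureFailPoint: "pauseBatchApplicationBeforeCompletion", mode: "off"}));
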
diff --git a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
index 3f2329ae520..7fb1905c2b8 100644
--- a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
+++ b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
@@ -151,18 +151,29 @@ public:
return itCountOn(op);
}
- boost::optional<Record> readRecordCommitted(RecordId id) {
+ int itCountLocal() {
auto op = makeOperation();
op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
- repl::ReadConcernLevel::kMajorityReadConcern,
- repl::ReplicationCoordinator::modeReplSet);
- ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot());
+ repl::ReadConcernLevel::kLocalReadConcern, repl::ReplicationCoordinator::modeReplSet);
+ op->recoveryUnit()->setShouldReadAtLastAppliedTimestamp(true);
+ return itCountOn(op);
+ }
+
+ boost::optional<Record> readRecordOn(OperationContext* op, RecordId id) {
auto cursor = rs->getCursor(op);
auto record = cursor->seekExact(id);
if (record)
record->data.makeOwned();
return record;
}
+ boost::optional<Record> readRecordCommitted(RecordId id) {
+ auto op = makeOperation();
+ op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ repl::ReplicationCoordinator::modeReplSet);
+ ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot());
+ return readRecordOn(op, id);
+ }
std::string readStringCommitted(RecordId id) {
auto record = readRecordCommitted(id);
@@ -170,6 +181,20 @@ public:
return std::string(record->data.data());
}
+ boost::optional<Record> readRecordLocal(RecordId id) {
+ auto op = makeOperation();
+ op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+ repl::ReadConcernLevel::kLocalReadConcern, repl::ReplicationCoordinator::modeReplSet);
+ op->recoveryUnit()->setShouldReadAtLastAppliedTimestamp(true);
+ return readRecordOn(op, id);
+ }
+
+ std::string readStringLocal(RecordId id) {
+ auto record = readRecordLocal(id);
+ ASSERT(record);
+ return std::string(record->data.data());
+ }
+
void setUp() override {
helper = KVHarnessHelper::create();
engine = helper->getEngine();
@@ -346,4 +371,76 @@ TEST_F(SnapshotManagerTests, UpdateAndDelete) {
ASSERT(!readRecordCommitted(id));
}
+TEST_F(SnapshotManagerTests, InsertAndReadOnLocalSnapshot) {
+ if (!snapshotManager)
+ return; // This test is only for engines that DO support SnapshotManagers.
+
+ auto beforeInsert = fetchAndIncrementTimestamp();
+
+ auto id = insertRecordAndCommit();
+ auto afterInsert = fetchAndIncrementTimestamp();
+
+ // Without reading at the last applied timestamp, the most recent data is returned.
+ auto op = makeOperation();
+ auto ru = op->recoveryUnit();
+ ru->setShouldReadAtLastAppliedTimestamp(false);
+ ASSERT_EQ(itCountOn(op), 1);
+ ASSERT(readRecordOn(op, id));
+
+ deleteRecordAndCommit(id);
+ auto afterDelete = fetchAndIncrementTimestamp();
+
+ // Reading at the local snapshot timestamps returns data in order.
+ snapshotManager->setLocalSnapshot(beforeInsert);
+ ASSERT_EQ(itCountLocal(), 0);
+ ASSERT(!readRecordLocal(id));
+
+ snapshotManager->setLocalSnapshot(afterInsert);
+ ASSERT_EQ(itCountLocal(), 1);
+ ASSERT(readRecordLocal(id));
+
+ snapshotManager->setLocalSnapshot(afterDelete);
+ ASSERT_EQ(itCountLocal(), 0);
+ ASSERT(!readRecordLocal(id));
+}
+
+TEST_F(SnapshotManagerTests, UpdateAndDeleteOnLocalSnapshot) {
+ if (!snapshotManager)
+ return; // This test is only for engines that DO support SnapshotManagers.
+
+ auto beforeInsert = fetchAndIncrementTimestamp();
+
+ auto id = insertRecordAndCommit("Aardvark");
+ auto afterInsert = fetchAndIncrementTimestamp();
+
+ updateRecordAndCommit(id, "Blue spotted stingray");
+ auto afterUpdate = fetchAndIncrementTimestamp();
+
+ // Without reading at the last applied timestamp, the most recent data is returned.
+ auto op = makeOperation();
+ auto ru = op->recoveryUnit();
+ ru->setShouldReadAtLastAppliedTimestamp(false);
+ ASSERT_EQ(itCountOn(op), 1);
+ auto record = readRecordOn(op, id);
+ ASSERT_EQ(std::string(record->data.data()), "Blue spotted stingray");
+
+ deleteRecordAndCommit(id);
+ auto afterDelete = fetchAndIncrementTimestamp();
+
+ snapshotManager->setLocalSnapshot(beforeInsert);
+ ASSERT_EQ(itCountLocal(), 0);
+ ASSERT(!readRecordLocal(id));
+
+ snapshotManager->setLocalSnapshot(afterInsert);
+ ASSERT_EQ(itCountLocal(), 1);
+ ASSERT_EQ(readStringLocal(id), "Aardvark");
+
+ snapshotManager->setLocalSnapshot(afterUpdate);
+ ASSERT_EQ(itCountLocal(), 1);
+ ASSERT_EQ(readStringLocal(id), "Blue spotted stingray");
+
+ snapshotManager->setLocalSnapshot(afterDelete);
+ ASSERT_EQ(itCountLocal(), 0);
+ ASSERT(!readRecordLocal(id));
+}
} // namespace mongo
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 4426fe47a1b..d2c31fb6e1a 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -50,9 +50,9 @@
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
+#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
-#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_consistency_markers_impl.h"
@@ -67,6 +67,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/storage/kv/kv_storage_engine.h"
#include "mongo/dbtests/dbtests.h"
+#include "mongo/stdx/future.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/stacktrace.h"
@@ -1806,6 +1807,105 @@ public:
}
};
+class SecondaryReadsDuringBatchApplicationAreAllowed : public StorageTimestampTest {
+public:
+ void run() {
+ // Only run on 'wiredTiger'. No other storage engine to date supports timestamp writes.
+ if (mongo::storageGlobalParams.engine != "wiredTiger") {
+ return;
+ }
+ ASSERT(
+ _opCtx->getServiceContext()->getGlobalStorageEngine()->supportsReadConcernSnapshot());
+
+ NamespaceString ns("unittest.secondaryReadsDuringBatchApplicationAreAllowed");
+ reset(ns);
+ UUID uuid = UUID::gen();
+ {
+ AutoGetCollectionForRead autoColl(_opCtx, ns);
+ uuid = autoColl.getCollection()->uuid().get();
+ ASSERT_EQ(itCount(autoColl.getCollection()), 0);
+ }
+
+ // Fulfilled with true once the batch has started, meaning the applier is holding the PBWM
+ // lock. Fulfilled with false if the batch fails to start.
+ Promise<bool> batchInProgressPromise;
+ // Attempt to read when in the middle of a batch.
+ stdx::packaged_task<bool()> task([&] {
+ Client::initThread(getThreadName());
+ auto readOp = cc().makeOperationContext();
+
+ // Wait for the batch to start or fail.
+ if (!batchInProgressPromise.getFuture().get()) {
+ return false;
+ }
+ AutoGetCollectionForRead autoColl(readOp.get(), ns);
+ return !readOp->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode,
+ MODE_IS);
+ });
+ auto taskFuture = task.get_future();
+ stdx::thread taskThread{std::move(task)};
+
+ auto joinGuard = MakeGuard([&] {
+ batchInProgressPromise.emplaceValue(false);
+ taskThread.join();
+ });
+
+ // This apply operation function will block until the reader has tried acquiring a
+ // collection lock. This returns BadValue statuses instead of asserting so that the worker
+ // threads can cleanly exit and this test case fails without crashing the entire suite.
+ auto applyOperationFn = [&](OperationContext* opCtx,
+ std::vector<const repl::OplogEntry*>* operationsToApply,
+ repl::SyncTail* st,
+ std::vector<MultikeyPathInfo>* pathInfo) -> Status {
+ if (!_opCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode,
+ MODE_X)) {
+ return {ErrorCodes::BadValue, "Batch applier was not holding PBWM lock in MODE_X"};
+ }
+
+ // Insert the document. A reader without a PBWM lock should not see it yet.
+ auto status = repl::multiSyncApply(opCtx, operationsToApply, st, pathInfo);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Signals the reader to acquire a collection read lock.
+ batchInProgressPromise.emplaceValue(true);
+
+ // Block while holding the PBWM lock until the reader is done.
+ if (!taskFuture.get()) {
+ return {ErrorCodes::BadValue, "Client was holding PBWM lock in MODE_IS"};
+ }
+ return Status::OK();
+ };
+
+ // Make a simple insert operation.
+ BSONObj doc0 = BSON("_id" << 0 << "a" << 0);
+ auto insertOp = repl::OplogEntry(
+ BSON("ts" << futureTs << "t" << 1LL << "h" << 0xBEEFBEEFLL << "v" << 2 << "op"
+ << "i"
+ << "ns"
+ << ns.ns()
+ << "ui"
+ << uuid
+ << "o"
+ << doc0));
+
+ // Apply the operation.
+ auto writerPool = repl::SyncTail::makeWriterPool(1);
+ repl::SyncTail syncTail(nullptr, applyOperationFn, writerPool.get());
+ auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}));
+ ASSERT_EQ(insertOp.getOpTime(), lastOpTime);
+
+ joinGuard.Dismiss();
+ taskThread.join();
+
+ // Read on the local snapshot to verify the document was inserted.
+ AutoGetCollectionForRead autoColl(_opCtx, ns);
+ assertDocumentAtTimestamp(autoColl.getCollection(), futureTs, doc0);
+ }
+};
+
+
class AllStorageTimestampTests : public unittest::Suite {
public:
AllStorageTimestampTests() : unittest::Suite("StorageTimestampTests") {}
@@ -1833,6 +1933,7 @@ public:
// TimestampIndexBuilds<SimulatePrimary>
add<TimestampIndexBuilds<false>>();
add<TimestampIndexBuilds<true>>();
+ add<SecondaryReadsDuringBatchApplicationAreAllowed>();
}
};
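
The property this dbtest pins down, that a reader is never blocked on the PBWM lock while a batch is mid-application, is the same one the jstests exercise end to end. A compressed shell-level sketch using the harness from this patch; the test and collection names are illustrative:

    load('jstests/replsets/libs/secondary_reads_test.js');

    let test = new SecondaryReadsTest("pbwmSketch");
    let pauseAwait = test.pauseSecondaryBatchApplication();
    assert.writeOK(test.getPrimaryDB().getCollection("coll").insert({_id: 0}));
    pauseAwait();

    // This read should complete even though the batch is still holding the
    // PBWM lock; it reads at the last applied timestamp instead of blocking.
    assert.commandWorked(test.getSecondaryDB().runCommand({find: "coll", filter: {}}));

    test.resumeSecondaryBatchApplication();
    test.stop();
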