commit     10bcc73a75ac857c290c3af6a3f89a45c4867043 (patch)
author     Louis Williams <louis.williams@mongodb.com>    2018-03-20 13:18:07 -0400
committer  Louis Williams <louis.williams@mongodb.com>    2018-04-13 10:48:56 -0400
tree       ee4612c4807e4232f3242589f2d70bd9e6cbf4ab
parent     235858ceee5d209d5e10579b79cfdc3bad7ff877 (diff)
download   mongo-10bcc73a75ac857c290c3af6a3f89a45c4867043.tar.gz

SERVER-34192 Allow secondary reads during batch applications

26 files changed, 470 insertions, 107 deletions
diff --git a/jstests/replsets/libs/secondary_reads_test.js b/jstests/replsets/libs/secondary_reads_test.js
new file mode 100644
index 00000000000..7b1f97e73a6
--- /dev/null
+++ b/jstests/replsets/libs/secondary_reads_test.js
@@ -0,0 +1,72 @@
+/**
+ * This is a library for testing secondary reads against a replica set
+ */
+
+"use strict";
+
+load("jstests/replsets/rslib.js");
+
+function SecondaryReadsTest(name = "secondary_reads_test", replSet) {
+    let rst = (replSet) ? replSet : performStandardSetup();
+    let dbName = name;
+
+    let primary = rst.getPrimary();
+    let primaryDB = primary.getDB(dbName);
+    let secondary = rst.getSecondary();
+    let readers = [];
+
+    /**
+     * Return an instance of ReplSetTest initialized with a standard
+     * two-node replica set running with the latest version.
+     */
+    function performStandardSetup() {
+        let replSet = new ReplSetTest({name, nodes: 2});
+        replSet.startSet();
+
+        const nodes = replSet.nodeList();
+        replSet.initiate({
+            _id: name,
+            members: [{_id: 0, host: nodes[0]}, {_id: 1, host: nodes[1], priority: 0}]
+        });
+        return replSet;
+    }
+
+    this.startSecondaryReaders = function(nReaders, cmd) {
+        let readCmd = `db.getMongo().setSlaveOk();
+                       db.getMongo().setReadPref("secondaryPreferred");
+                       db = db.getSiblingDB("${dbName}");
+                       ${cmd}`;
+
+        for (let i = 0; i < nReaders; i++) {
+            readers.push(
+                startMongoProgramNoConnect("mongo", "--port", secondary.port, "--eval", readCmd));
+            print("reader " + readers.length + " started");
+        }
+    };
+
+    this.doOnPrimary = function(writeFn) {
+        let db = primary.getDB(dbName);
+        writeFn(db);
+    };
+
+    this.stopReaders = function() {
+        for (let i = 0; i < readers.length; i++) {
+            const pid = readers[i];
+            const ec = stopMongoProgramByPid(pid);
+            const expect = _isWindows() ? 1 : -15;
+            assert.eq(
+                ec, expect, "Expected mongo shell to exit with code " + expect + ". PID: " + pid);
+            print("reader " + i + " done");
+        }
+        readers = [];
+    };
+
+    this.getReplset = function() {
+        return rst;
+    };
+
+    this.stop = function() {
+        this.stopReaders();
+        rst.stopSet();
+    };
+}
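As a usage sketch (the collection name, seed document, and read loop below are illustrative, not part of this commit; the library functions are as defined above), a test built on this library might look like:

load("jstests/replsets/libs/secondary_reads_test.js");

// Spin up the standard two-node replica set managed by the library.
let test = new SecondaryReadsTest("example_secondary_reads");

// Seed data through the primary.
test.doOnPrimary(function(db) {
    assert.commandWorked(db.runCommand({insert: "coll", documents: [{_id: 0, x: 0}]}));
});

// Launch two background shells that loop reads against the secondary; the
// library prepends setSlaveOk() and a secondaryPreferred read preference.
test.startSecondaryReaders(2, `while (true) { db.coll.find({x: 0}).itcount(); }`);

// ... interleave writes that race with the readers, then clean up. stop()
// stops the reader shells and tears down the replica set.
test.stop();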
diff --git a/jstests/replsets/secondary_reads_unique_indexes.js b/jstests/replsets/secondary_reads_unique_indexes.js
index f131fb7ed84..6935ee8d421
--- a/jstests/replsets/secondary_reads_unique_indexes.js
+++ b/jstests/replsets/secondary_reads_unique_indexes.js
@@ -25,31 +25,28 @@
  * uniqueness violations on indexes, and require readers on secondaries to wait for the parallel
  * batch insert to complete, at which point the state of the indexes will be consistent.
  */
+
 (function() {
     "use strict";
 
+    load("jstests/replsets/libs/secondary_reads_test.js");
+
     const name = "secondaryReadsUniqueIndexes";
-    let rst = new ReplSetTest({name: name, nodes: 2});
-    let nodes = rst.startSet();
-    rst.initiate({
-        _id: name,
-        members: [
-            {_id: 0, host: nodes[0].host, priority: 1},
-            {_id: 1, host: nodes[1].host, priority: 0}
-        ]
-    });
+    const collName = "testColl";
+    let secondaryReadsTest = new SecondaryReadsTest(name);
+
+    // Setup collection.
+    secondaryReadsTest.doOnPrimary(function(db) {
+        db.runCommand({drop: collName});
+        assert.commandWorked(db.runCommand({create: collName}));
 
-    let primary = rst.getPrimary();
-    let secondary = rst.getSecondary();
-    let testDB = primary.getDB("test");
-    let testCollName = "testColl";
-    let testNs = "test." + testCollName;
+        // Create a unique index on the collection in the foreground.
+        assert.commandWorked(db.runCommand(
+            {createIndexes: collName, indexes: [{key: {x: 1}, name: "x_1", unique: true}]}));
+    });
 
-    testDB.runCommand({drop: testCollName});
-    assert.commandWorked(testDB.runCommand({create: testCollName}));
-    // Create a unique index on the collection in the foreground.
-    assert.commandWorked(testDB.runCommand(
-        {createIndexes: testCollName, indexes: [{key: {x: 1}, name: "x_1", unique: true}]}));
+    let rst = secondaryReadsTest.getReplset();
+    rst.awaitReplication();
 
     // We want to do updates with at least as many different documents as there are parallel batch
     // writer threads (16). Each iteration increments and decrements a uniquely indexed value, 'x',
@@ -61,30 +58,28 @@
     // Do a bunch of reads using the 'x' index on the secondary.
     // No errors should be encountered on the secondary.
-    let readers = [];
-    let readCmd = `
-        db.getMongo().setSlaveOk();
-        while (true) {
-            for (let x = 0; x < ${nOps}; x++) {
-                assert.commandWorked(db.getSiblingDB('test').runCommand(
-                    {find: "${testCollName}", filter: {x: x}, projection: {x: 1}}));
-            }
-        }`;
-
-    for (let i = 0; i < nReaders; i++) {
-        readers[i] =
-            startMongoProgramNoConnect("mongo", "--port", secondary.port, "--eval", readCmd);
-        print("reader " + i + " started");
-    }
+    let readCmd = `while (true) {
+        for (let x = 0; x < ${nOps}; x++) {
+            assert.commandWorked(db.runCommand({
+                find: "${collName}",
+                filter: {x: x},
+                projection: {x: 1},
+                readConcern: {level: "local"},
+            }));
+        }
+    }`;
+    secondaryReadsTest.startSecondaryReaders(nReaders, readCmd);
 
     // Write the initial documents. Ensure they have been replicated.
-    for (let i = 0; i < nOps; i++) {
-        assert.commandWorked(testDB.runCommand({
-            insert: testCollName,
-            documents: [{_id: i, x: i, iter: 0}],
-            writeConcern: {w: "majority"}
-        }));
-    }
+    secondaryReadsTest.doOnPrimary(function(db) {
+        for (let i = 0; i < nOps; i++) {
+            assert.commandWorked(db.runCommand({
+                insert: collName,
+                documents: [{_id: i, x: i, iter: 0}],
+                writeConcern: {w: "majority"}
+            }));
+        }
+    });
 
     // Cycle the value of x in the document {_id: i, x: i} between i and i+1 each iteration.
     for (let iteration = 0; iteration < nIterations; iteration++) {
@@ -93,7 +88,10 @@
         for (let i = 0; i < nOps; i++) {
             updates[i] = {q: {_id: i}, u: {x: i, iter: iteration}};
         }
-        assert.commandWorked(testDB.runCommand({update: testCollName, updates: updates}));
+
+        secondaryReadsTest.doOnPrimary(function(db) {
+            assert.commandWorked(db.runCommand({update: collName, updates: updates}));
+        });
         updates = [];
 
         // Generate updates that increment x on each document backwards by _id to avoid conflicts
@@ -106,18 +104,12 @@
             let nextX = end + 1;
             updates[i] = {q: {_id: end}, u: {x: nextX, iter: iteration}};
         }
-        print('iteration ' + iteration);
-        assert.commandWorked(testDB.runCommand({update: testCollName, updates: updates}));
+        print("iteration " + iteration);
+        secondaryReadsTest.doOnPrimary(function(db) {
+            assert.commandWorked(db.runCommand({update: collName, updates: updates}));
+        });
     }
 
     rst.awaitReplication();
-
-    for (let i = 0; i < nReaders; i++) {
-        const pid = readers[i];
-        const ec = stopMongoProgramByPid(pid);
-        const expect = _isWindows() ? 1 : -15;
-        assert.eq(ec, expect, "Expected mongo shell to exit with code " + expect + ". PID: " + pid);
-        print("reader " + i + " done");
-    }
-    rst.stopSet();
+    secondaryReadsTest.stop();
 })();
PID: " + pid); - print("reader " + i + " done"); - } - rst.stopSet(); + secondaryReadsTest.stop(); })(); diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index df4b851aa99..03c3ee52ec5 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -26,15 +26,20 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + #include "mongo/platform/basic.h" #include "mongo/db/db_raii.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/concurrency/locker.h" #include "mongo/db/curop.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/server_parameters.h" #include "mongo/db/session_catalog.h" +#include "mongo/util/log.h" namespace mongo { namespace { @@ -43,6 +48,10 @@ const boost::optional<int> kDoNotChangeProfilingLevel = boost::none; } // namespace +// If true, do not take the PBWM lock in AutoGetCollectionForRead on secondaries during batch +// application. +MONGO_EXPORT_SERVER_PARAMETER(allowSecondaryReadsDuringBatchApplication, bool, true); + AutoStatsTracker::AutoStatsTracker(OperationContext* opCtx, const NamespaceString& nss, Top::LockType lockType, @@ -78,45 +87,83 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx, const NamespaceStringOrUUID& nsOrUUID, AutoGetCollection::ViewMode viewMode, Date_t deadline) { + // Don't take the ParallelBatchWriterMode lock when the server parameter is set and our + // storage engine supports snapshot reads. + if (allowSecondaryReadsDuringBatchApplication.load() && + opCtx->getServiceContext()->getGlobalStorageEngine()->supportsReadConcernSnapshot()) { + _shouldNotConflictWithSecondaryBatchApplicationBlock.emplace(opCtx->lockState()); + } const auto collectionLockMode = getLockModeForQuery(opCtx); _autoColl.emplace(opCtx, nsOrUUID, collectionLockMode, viewMode, deadline); - while (true) { - auto coll = _autoColl->getCollection(); - if (!coll) { - return; - } - + repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx); + const auto readConcernLevel = opCtx->recoveryUnit()->getReadConcernLevel(); + + // If the collection doesn't exist or disappears after releasing locks and waiting, there is no + // need to check for pending catalog changes. + while (auto coll = _autoColl->getCollection()) { + + // During batch application on secondaries, there is a potential to read inconsistent states + // that would normally be protected by the PBWM lock. In order to serve secondary reads + // during this period, we default to not acquiring the lock (by setting + // _shouldNotConflictWithSecondaryBatchApplicationBlock). On primaries, we always read at a + // consistent time, so not taking the PBWM lock is not a problem. On secondaries, we have to + // guarantee we read at a consistent state, so we must read at the last applied timestamp, + // which is set after each complete batch. + // + // If an attempt to read at the last applied timestamp is unsuccessful because there are + // pending catalog changes that occur after the last applied timestamp, we release our locks + // and try again with the PBWM lock (by unsetting + // _shouldNotConflictWithSecondaryBatchApplicationBlock). + + const NamespaceString& nss = coll->ns(); + + // Read at the last applied timestamp if we don't have the PBWM lock and correct conditions + // are met. 
+ bool readAtLastAppliedTimestamp = _shouldNotConflictWithSecondaryBatchApplicationBlock && + _shouldDoSecondaryLocalSnapshotRead(opCtx, nss, readConcernLevel); + + opCtx->recoveryUnit()->setShouldReadAtLastAppliedTimestamp(readAtLastAppliedTimestamp); + + // This timestamp could be earlier than the timestamp seen when the transaction is opened + // because it is set asynchonously. This is not problematic because holding the collection + // lock guarantees no metadata changes will occur in that time. + auto lastAppliedTimestamp = boost::make_optional( + readAtLastAppliedTimestamp, replCoord->getMyLastAppliedOpTime().getTimestamp()); + + // Return if there are no conflicting catalog changes on the collection. auto minSnapshot = coll->getMinimumVisibleSnapshot(); - if (!minSnapshot) { - return; - } - auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(); - if (!mySnapshot) { - return; - } - if (mySnapshot >= minSnapshot) { + if (!_conflictingCatalogChanges( + opCtx, readConcernLevel, minSnapshot, lastAppliedTimestamp)) { return; } - auto readConcernLevel = opCtx->recoveryUnit()->getReadConcernLevel(); - if (readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern) { - uasserted(ErrorCodes::SnapshotUnavailable, - str::stream() - << "Unable to read from a snapshot due to pending collection catalog " - "changes; please retry the operation. Snapshot timestamp is " - << mySnapshot->toString() - << ". Collection minimum is " - << minSnapshot->toString()); - } - invariant(readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern); + invariant(lastAppliedTimestamp || + readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern); - // Yield locks in order to do the blocking call below + // Yield locks in order to do the blocking call below. + // This should only be called if we are doing a snapshot read at the last applied time or + // majority commit point. _autoColl = boost::none; - repl::ReplicationCoordinator::get(opCtx)->waitUntilSnapshotCommitted(opCtx, *minSnapshot); + // If there are pending catalog changes, we should conflict with any in-progress batches (by + // taking the PBWM lock) and choose not to read from the last applied timestamp by unsetting + // _shouldNotConflictWithSecondaryBatchApplicationBlock. Index builds on secondaries can + // complete at timestamps later than the lastAppliedTimestamp during initial sync + // (SERVER-34343). After initial sync finishes, if we waited instead of retrying, readers + // would block indefinitely waiting for the lastAppliedTimestamp to move forward. Instead we + // force the reader take the PBWM lock and retry. + if (lastAppliedTimestamp) { + LOG(2) << "Tried reading from local snapshot time: " << *lastAppliedTimestamp + << " on nss: " << nss.ns() << ", but future catalog changes are pending at time " + << *minSnapshot << ". 
Trying again without reading from the local snapshot"; + _shouldNotConflictWithSecondaryBatchApplicationBlock = boost::none; + } - uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); + if (readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern) { + replCoord->waitUntilSnapshotCommitted(opCtx, *minSnapshot); + uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); + } { stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -127,6 +174,79 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx, } } +bool AutoGetCollectionForRead::_shouldDoSecondaryLocalSnapshotRead( + OperationContext* opCtx, + const NamespaceString& nss, + repl::ReadConcernLevel readConcernLevel) const { + // Majority and snapshot readConcern levels should not read from the local snapshot. For both we + // already wait until the desired timestamp, which is updated at the end of each batch. + if (readConcernLevel != repl::ReadConcernLevel::kLocalReadConcern && + readConcernLevel != repl::ReadConcernLevel::kAvailableReadConcern) { + return false; + } + + // If we are in a replication state (like secondary or primary catch-up) where we are not + // accepting writes, we should always read from the last applied snapshot. If this node can + // accept writes, then no conflicting replication batches are being applied and we can read from + // the default snapshot. + if (repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, "admin")) { + return false; + } + + // Local and available read concerns should not be restricted to batch boundaries for + // non-replicated collections. Not being able to read from the last applied timestamp with + // non-network clients is tracked by SERVER-34440. + if (!nss.isReplicated() || !opCtx->getClient()->isFromUserConnection()) { + return false; + } + return true; +} + +bool AutoGetCollectionForRead::_conflictingCatalogChanges( + OperationContext* opCtx, + repl::ReadConcernLevel readConcernLevel, + boost::optional<Timestamp> minSnapshot, + boost::optional<Timestamp> lastAppliedTimestamp) const { + // This is the timestamp of the most recent catalog changes to this collection. If this is + // greater than any point in time read timestamps, we should either wait or return an error. + if (!minSnapshot) { + return false; + } + + // If we are reading from the lastAppliedTimestamp and it is up-to-date with any catalog + // changes, we can return. + if (lastAppliedTimestamp && + (lastAppliedTimestamp->isNull() || *lastAppliedTimestamp >= *minSnapshot)) { + return false; + } + + // This can be set when readConcern is "snapshot" or "majority". + auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(); + + // If we do not have a point in time to conflict with minSnapshot, return. + if (!mySnapshot && !lastAppliedTimestamp) { + return false; + } + + // Return if there are no conflicting catalog changes with mySnapshot. + if (mySnapshot && *mySnapshot >= *minSnapshot) { + return false; + } + + // Snapshot readConcern can't yield its locks when there are catalog changes. + if (readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern) { + uasserted( + ErrorCodes::SnapshotUnavailable, + str::stream() << "Unable to read from a snapshot due to pending collection catalog " + "changes; please retry the operation. Snapshot timestamp is " + << mySnapshot->toString() + << ". 
Collection minimum is " + << minSnapshot->toString()); + } + return true; +} + + AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand( OperationContext* opCtx, const NamespaceStringOrUUID& nsOrUUID, diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h index c20968be550..63d8524d637 100644 --- a/src/mongo/db/db_raii.h +++ b/src/mongo/db/db_raii.h @@ -105,9 +105,28 @@ public: } private: + // If this field is set, the reader will not take the ParallelBatchWriterMode lock and conflict + // with secondary batch application. This stays in scope with the _autoColl so that locks are + // taken and released in the right order. + boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock> + _shouldNotConflictWithSecondaryBatchApplicationBlock; + // This field is optional, because the code to wait for majority committed snapshot needs to // release locks in order to block waiting boost::optional<AutoGetCollection> _autoColl; + + // Returns true if we should read from the local snapshot (last applied timestamp) on a + // secondary. + bool _shouldDoSecondaryLocalSnapshotRead(OperationContext* opCtx, + const NamespaceString& nss, + repl::ReadConcernLevel readConcernLevel) const; + + // Returns true if the minSnapshot causes conflicting catalog changes for the provided read + // concern level or lastAppliedTimestamp. + bool _conflictingCatalogChanges(OperationContext* opCtx, + repl::ReadConcernLevel readConcernLevel, + boost::optional<Timestamp> minSnapshot, + boost::optional<Timestamp> lastAppliedTimestamp) const; }; /** diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index a0cff1a7e81..ad73d673bbc 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -103,7 +103,7 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { if (needToMakeCursor) { const bool forward = _params.direction == CollectionScanParams::FORWARD; - if (forward && !_params.tailable && _params.collection->ns().isOplog()) { + if (forward && _params.shouldWaitForOplogVisibility) { // Forward, non-tailable scans from the oplog need to wait until all oplog entries // before the read begins to be visible. This isn't needed for reverse scans because // we only hide oplog entries from forward scans, and it isn't necessary for tailing @@ -112,8 +112,10 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { // might not include writes that finished before the read started. This also must be // done before we create the cursor as that is when we establish the endpoint for // the cursor. Also call abandonSnapshot to make sure that we are using a fresh - // storage engine snapshot while waiting. Otherwise, we will end up reading from - // the snapshot where the oplog entries are not yet visible even after the wait. + // storage engine snapshot while waiting. Otherwise, we will end up reading from the + // snapshot where the oplog entries are not yet visible even after the wait. 
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index a0cff1a7e81..ad73d673bbc
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -103,7 +103,7 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
     if (needToMakeCursor) {
         const bool forward = _params.direction == CollectionScanParams::FORWARD;
 
-        if (forward && !_params.tailable && _params.collection->ns().isOplog()) {
+        if (forward && _params.shouldWaitForOplogVisibility) {
            // Forward, non-tailable scans from the oplog need to wait until all oplog entries
            // before the read begins to be visible. This isn't needed for reverse scans because
            // we only hide oplog entries from forward scans, and it isn't necessary for tailing
@@ -112,8 +112,10 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
            // might not include writes that finished before the read started. This also must be
            // done before we create the cursor as that is when we establish the endpoint for
            // the cursor. Also call abandonSnapshot to make sure that we are using a fresh
-           // storage engine snapshot while waiting. Otherwise, we will end up reading from
-           // the snapshot where the oplog entries are not yet visible even after the wait.
+           // storage engine snapshot while waiting. Otherwise, we will end up reading from the
+           // snapshot where the oplog entries are not yet visible even after the wait.
+            invariant(!_params.tailable && _params.collection->ns().isOplog());
+
             getOpCtx()->recoveryUnit()->abandonSnapshot();
             _params.collection->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(
                 getOpCtx());
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index 5f960f0a904..b6c141691aa
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -69,6 +69,9 @@ struct CollectionScanParams {
 
     // If non-zero, how many documents will we look at?
     size_t maxScan = 0;
+
+    // Whether or not to wait for oplog visibility on oplog collection scans.
+    bool shouldWaitForOplogVisibility = false;
 };
 
 }  // namespace mongo
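For context, the scans that take this wait path are forward, non-tailable scans of the oplog on a primary. A sketch of the two query shapes from the shell (the timestamp value is illustrative):

var localDb = db.getSiblingDB("local");

// Forward, non-tailable oplog scan: waits for all earlier oplog writes to become
// visible before the cursor is established, so it cannot skip over "holes".
localDb.oplog.rs.find({ts: {$gte: Timestamp(1, 0)}}).itcount();

// Tailable oplog cursor: exempt from the wait; visibility is handled as the
// cursor is re-awaited.
var cur = localDb.oplog.rs.find().addOption(DBQuery.Option.tailable);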
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index b1dcebfff3d..eed63cec24a
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -207,6 +207,39 @@ void fillOutPlannerParams(OperationContext* opCtx,
     } else {
         plannerParams->options |= QueryPlannerParams::KEEP_MUTATIONS;
     }
+
+    if (shouldWaitForOplogVisibility(
+            opCtx, collection, canonicalQuery->getQueryRequest().isTailable())) {
+        plannerParams->options |= QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
+    }
+}
+
+bool shouldWaitForOplogVisibility(OperationContext* opCtx,
+                                  const Collection* collection,
+                                  bool tailable) {
+
+    // Only non-tailable cursors on the oplog are affected. Only forward cursors, not reverse
+    // cursors, are affected, but this is checked when the cursor is opened.
+    if (!collection->ns().isOplog() || tailable) {
+        return false;
+    }
+
+    // Only primaries should require readers to wait for oplog visibility. In any other replication
+    // state, readers read at the most visible oplog timestamp. Readers on primaries need to wait
+    // because multiple optimes can be allocated for operations before their entries are written to
+    // the storage engine. "Holes" will appear when an operation with a later optime commits before
+    // an operation with an earlier optime, and readers should wait so that all data is consistent.
+    //
+    // Secondaries can't wait for oplog visibility without the PBWM lock because it can introduce a
+    // hang while a batch application is in progress. The wait is done while holding a global lock,
+    // and the oplog visibility timestamp is updated at the end of every batch on a secondary,
+    // signalling the wait to complete. If a replication worker had a global lock and temporarily
+    // released it, a reader could acquire the lock to read the oplog. If the secondary reader were
+    // to wait for the oplog visibility timestamp to be updated, it would wait for a replication
+    // batch that would never complete because it couldn't reacquire its own lock, the global lock
+    // held by the waiting reader.
+    return repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, "admin");
 }
 
 namespace {
@@ -617,6 +650,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
     params.tailable = cq->getQueryRequest().isTailable();
     params.shouldTrackLatestOplogTimestamp =
         plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+    params.shouldWaitForOplogVisibility =
+        shouldWaitForOplogVisibility(opCtx, collection, params.tailable);
 
     // If the query is just a lower bound on "ts", we know that every document in the collection
     // after the first matching one must also match. To avoid wasting time running the match
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 237f577de7d..118e0afbee2
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -64,6 +64,14 @@ void fillOutPlannerParams(OperationContext* opCtx,
                           QueryPlannerParams* plannerParams);
 
 /**
+ * Determines whether or not to wait for oplog visibility for a query. This is only used for
+ * collection scans on the oplog.
+ */
+bool shouldWaitForOplogVisibility(OperationContext* opCtx,
+                                  const Collection* collection,
+                                  bool tailable);
+
+/**
  * Get a plan executor for a query.
  *
  * If the query is valid and an executor could be created, returns a StatusWith with the
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index 6b1ae0857b7..b47a4ce627d
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -39,6 +39,7 @@
 #include "mongo/db/exec/idhack.h"
 #include "mongo/db/exec/index_scan.h"
 #include "mongo/db/exec/update.h"
+#include "mongo/db/query/get_executor.h"
 #include "mongo/stdx/memory.h"
 
 namespace mongo {
@@ -179,6 +180,7 @@ std::unique_ptr<PlanStage> InternalPlanner::_collectionScan(OperationContext* op
     CollectionScanParams params;
     params.collection = collection;
     params.start = startLoc;
+    params.shouldWaitForOplogVisibility = shouldWaitForOplogVisibility(opCtx, collection, false);
 
     if (FORWARD == direction) {
         params.direction = CollectionScanParams::FORWARD;
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index 82d40993aea..92a35968227
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -153,6 +153,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
     csn->maxScan = query.getQueryRequest().getMaxScan();
     csn->shouldTrackLatestOplogTimestamp =
         params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+    csn->shouldWaitForOplogVisibility =
+        params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
 
     // If the hint is {$natural: +-1} this changes the direction of the collection scan.
     if (!query.getQueryRequest().getHint().isEmpty()) {
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index b0e046fe98b..1e49b615c97
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -132,6 +132,10 @@ string optionString(size_t options) {
             break;
         case QueryPlannerParams::TRACK_LATEST_OPLOG_TS:
             ss << "TRACK_LATEST_OPLOG_TS ";
+            break;
+        case QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE:
+            ss << "OPLOG_SCAN_WAIT_FOR_VISIBLE ";
+            break;
         case QueryPlannerParams::DEFAULT:
             MONGO_UNREACHABLE;
             break;
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index f54bab70458..85da68100bd
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -96,6 +96,9 @@ struct QueryPlannerParams {
 
         // Set this to track the most recent timestamp seen by this cursor while scanning the oplog.
         TRACK_LATEST_OPLOG_TS = 1 << 12,
+
+        // Set this so that collection scans on the oplog wait for visibility before reading.
+        OPLOG_SCAN_WAIT_FOR_VISIBLE = 1 << 13,
     };
 
     // See Options enum above.
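One way to see the new planner flag in action might be to raise the query log component's verbosity and look for the OPLOG_SCAN_WAIT_FOR_VISIBLE token emitted by optionString() above (where the option string surfaces in the logs is an assumption; exact lines vary by build):

// Hypothetical debugging session on a primary.
db.setLogLevel(5, "query");
db.getSiblingDB("local").oplog.rs.find({ts: {$gte: Timestamp(1, 0)}}).itcount();
db.setLogLevel(0, "query");  // Restore the default verbosity.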
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 62c884634d2..5f6cfd7e9a1
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -248,6 +248,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
     copy->direction = this->direction;
     copy->maxScan = this->maxScan;
     copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
+    copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;
 
     return copy;
 }
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index e6b2e148d7c..0e63b8cedfc
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -305,6 +305,9 @@ struct CollectionScanNode : public QuerySolutionNode {
 
     // maxScan option to .find() limits how many docs we look at.
     int maxScan;
+
+    // Whether or not to wait for oplog visibility on oplog collection scans.
+    bool shouldWaitForOplogVisibility = false;
 };
 
 struct AndHashNode : public QuerySolutionNode {
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 2834498dc03..587264cfb9c
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -82,6 +82,7 @@ PlanStage* buildStages(OperationContext* opCtx,
             params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
                                                      : CollectionScanParams::BACKWARD;
             params.maxScan = csn->maxScan;
+            params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;
             return new CollectionScan(opCtx, params, ws, csn->filter.get());
         }
         case STAGE_IXSCAN: {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 5ab2ab1c20f..b9cd7dee465
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -249,6 +249,13 @@ public:
     virtual void updateCommittedSnapshot(const OpTime& newCommitPoint) = 0;
 
     /**
+     * Updates the local snapshot to a consistent point for secondary reads.
+     *
+     * It is illegal to call with an optime that does not name an existing snapshot.
+     */
+    virtual void updateLocalSnapshot(const OpTime& optime) = 0;
+
+    /**
      * Returns whether or not the SnapshotThread is active.
     */
     virtual bool snapshotsEnabled() const = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index d516784e32d..5ec7ca490b7
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -800,6 +800,13 @@ void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(
     notifyOplogMetadataWaiters(newCommitPoint);
 }
 
+void ReplicationCoordinatorExternalStateImpl::updateLocalSnapshot(const OpTime& optime) {
+    auto manager = _service->getGlobalStorageEngine()->getSnapshotManager();
+    if (manager) {
+        manager->setLocalSnapshot(optime.getTimestamp());
+    }
+}
+
 bool ReplicationCoordinatorExternalStateImpl::snapshotsEnabled() const {
     return _service->getGlobalStorageEngine()->getSnapshotManager() != nullptr;
 }
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 633404e5dd8..242071f9cb7
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -98,6 +98,7 @@ public:
     virtual void startProducerIfStopped();
     void dropAllSnapshots() final;
     void updateCommittedSnapshot(const OpTime& newCommitPoint) final;
+    void updateLocalSnapshot(const OpTime& optime) final;
     virtual bool snapshotsEnabled() const;
     virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime);
     virtual double getElectionTimeoutOffsetLimitFraction() const;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 758ee63404c..e0bbea95a66
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -214,6 +214,8 @@ void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {}
 void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(
     const OpTime& newCommitPoint) {}
 
+void ReplicationCoordinatorExternalStateMock::updateLocalSnapshot(const OpTime& optime) {}
+
 bool ReplicationCoordinatorExternalStateMock::snapshotsEnabled() const {
     return _areSnapshotsEnabled;
 }
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 7fa11481018..4beafc38d5b
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -87,6 +87,7 @@ public:
     virtual void startProducerIfStopped();
     virtual void dropAllSnapshots();
     virtual void updateCommittedSnapshot(const OpTime& newCommitPoint);
+    virtual void updateLocalSnapshot(const OpTime& optime);
     virtual bool snapshotsEnabled() const;
     virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime);
     virtual double getElectionTimeoutOffsetLimitFraction() const;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 1757f872725..5160f297ee7
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1099,6 +1099,11 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
         return;
     }
 
+    // Update the local snapshot before updating the stable timestamp on the storage engine. New
+    // transactions reading from the local snapshot should start before the oldest timestamp is
+    // advanced to avoid races.
+    _externalState->updateLocalSnapshot(opTime);
+
     // Add the new applied optime to the list of stable optime candidates and then set the last
     // stable optime. Stable optimes are used to determine the last optime that it is safe to revert
     // the database to, in the event of a rollback via the 'recover to timestamp' method. If we are
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 8fecccb2baf..e9cfe4c6b7b
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -161,6 +161,17 @@ public:
     }
 
     /**
+     * Tells the recovery unit to read at the last applied timestamp, tracked by the SnapshotManager.
+     * This should only be set to true for local and available read concerns. This should be used to
+     * read from a consistent state on a secondary while replicated batches are being applied.
+     */
+    void setShouldReadAtLastAppliedTimestamp(bool value) {
+        invariant(!value || _readConcernLevel == repl::ReadConcernLevel::kLocalReadConcern ||
+                  _readConcernLevel == repl::ReadConcernLevel::kAvailableReadConcern);
+        _shouldReadAtLastAppliedTimestamp = value;
+    }
+
+    /**
      * Returns the Timestamp being used by this recovery unit or boost::none if not reading from
     * a point in time. Any point in time returned will reflect either:
     *  - A timestamp set via call to setPointInTimeReadTimestamp()
@@ -353,6 +364,7 @@ protected:
     RecoveryUnit() {}
 
     repl::ReplicationCoordinator::Mode _replicationMode = repl::ReplicationCoordinator::modeNone;
     repl::ReadConcernLevel _readConcernLevel = repl::ReadConcernLevel::kLocalReadConcern;
+    bool _shouldReadAtLastAppliedTimestamp = false;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/storage/snapshot_manager.h b/src/mongo/db/storage/snapshot_manager.h
index 4c1def141af..c4c97c32281
--- a/src/mongo/db/storage/snapshot_manager.h
+++ b/src/mongo/db/storage/snapshot_manager.h
@@ -58,6 +58,16 @@ public:
     virtual void setCommittedSnapshot(const Timestamp& timestamp) = 0;
 
     /**
+     * Sets the snapshot for the last stable timestamp for reading on secondaries.
+     */
+    virtual void setLocalSnapshot(const Timestamp& timestamp) = 0;
+
+    /**
+     * Returns the local snapshot timestamp.
+     */
+    virtual boost::optional<Timestamp> getLocalSnapshot() = 0;
+
+    /**
      * Drops all snapshots and clears the "committed" snapshot.
     */
     virtual void dropAllSnapshots() = 0;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index bff42b0b78b..f146143a255
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -319,6 +319,7 @@ void WiredTigerRecoveryUnit::_txnOpen() {
     }
 
     WT_SESSION* session = _session->getSession();
+    auto ignorePrepare = _readConcernLevel == repl::ReadConcernLevel::kAvailableReadConcern;
     // '_readAtTimestamp' is available outside of a check for readConcern level 'snapshot' to
     // accommodate unit testing. Note that the order of this if/else chain below is important for
     // correctness. Also, note that we use the '_readAtTimestamp' to work around an oplog visibility
@@ -343,14 +344,19 @@ void WiredTigerRecoveryUnit::_txnOpen() {
         _majorityCommittedSnapshot =
             _sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(session);
     } else if (_isOplogReader) {
+        invariant(!_shouldReadAtLastAppliedTimestamp);
         _sessionCache->snapshotManager().beginTransactionOnOplog(
             _sessionCache->getKVEngine()->getOplogManager(), session);
+
+    } else if (_shouldReadAtLastAppliedTimestamp &&
+               _sessionCache->snapshotManager().getLocalSnapshot()) {
+        // Read from the last applied timestamp (tracked globally by the SnapshotManager), which is
+        // the timestamp of the most recent completed replication batch operation. This should only
+        // be true for local or available readConcern on secondaries.
+        _sessionCache->snapshotManager().beginTransactionOnLocalSnapshot(session, ignorePrepare);
     } else {
-        invariantWTOK(session->begin_transaction(
-            session,
-            _readConcernLevel == repl::ReadConcernLevel::kAvailableReadConcern
-                ? "ignore_prepare=true"
-                : nullptr));
+        invariantWTOK(
+            session->begin_transaction(session, ignorePrepare ? "ignore_prepare=true" : nullptr));
     }
 
     LOG(3) << "WT begin_transaction for snapshot id " << _mySnapshotId;
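The ignorePrepare flag above is derived purely from the readConcern level. From a secondary connection, the two levels eligible for the new local-snapshot branch would be exercised like this (the database and collection names are illustrative):

db.getMongo().setSlaveOk();

// "local" may read at the last applied timestamp.
assert.commandWorked(db.getSiblingDB("test").runCommand(
    {find: "testColl", readConcern: {level: "local"}}));

// "available" takes the same path but opens the WiredTiger transaction with
// ignore_prepare=true.
assert.commandWorked(db.getSiblingDB("test").runCommand(
    {find: "testColl", readConcern: {level: "available"}}));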
"true" : "false"); if (size < 0) { int e = errno; error() << "error snprintf " << errnoWithDescription(e); @@ -86,7 +100,7 @@ Timestamp WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot( auto rollbacker = MakeGuard([&] { invariant(session->rollback_transaction(session, nullptr) == 0); }); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lock(_committedSnapshotMutex); uassert(ErrorCodes::ReadConcernMajorityNotAvailableYet, "Committed view disappeared while running operation", _committedSnapshot); @@ -97,25 +111,36 @@ Timestamp WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot( return *_committedSnapshot; } +void WiredTigerSnapshotManager::beginTransactionOnLocalSnapshot(WT_SESSION* session, + bool ignorePrepare) const { + invariantWTOK( + session->begin_transaction(session, (ignorePrepare) ? "ignore_prepare=true" : nullptr)); + auto rollbacker = + MakeGuard([&] { invariant(session->rollback_transaction(session, nullptr) == 0); }); + + stdx::lock_guard<stdx::mutex> lock(_localSnapshotMutex); + invariant(_localSnapshot); + + LOG(3) << "begin_transaction on local snapshot " << _localSnapshot.get().toString(); + auto status = setTransactionReadTimestamp(_localSnapshot.get(), session); + fassert(50775, status); + rollbacker.Dismiss(); +} + void WiredTigerSnapshotManager::beginTransactionOnOplog(WiredTigerOplogManager* oplogManager, WT_SESSION* session) const { invariantWTOK(session->begin_transaction(session, nullptr)); auto rollbacker = MakeGuard([&] { invariant(session->rollback_transaction(session, nullptr) == 0); }); - stdx::lock_guard<stdx::mutex> lock(_mutex); auto allCommittedTimestamp = oplogManager->getOplogReadTimestamp(); invariant(Timestamp(static_cast<unsigned long long>(allCommittedTimestamp)).asULL() == allCommittedTimestamp); auto status = setTransactionReadTimestamp( - Timestamp(static_cast<unsigned long long>(allCommittedTimestamp)), session); + Timestamp(static_cast<unsigned long long>(allCommittedTimestamp)), + session, + true /* roundToOldest */); - // If we failed to set the read timestamp, we assume it is due to the oldest_timestamp racing - // ahead. Rather than synchronizing for this rare case, if requested, throw a - // WriteConflictException which will be retried. - if (!status.isOK() && status.code() == ErrorCodes::BadValue) { - throw WriteConflictException(); - } fassert(50771, status); rollbacker.Dismiss(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h index 4eda95caa89..ff671e8a468 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h @@ -48,6 +48,8 @@ public: WiredTigerSnapshotManager() = default; void setCommittedSnapshot(const Timestamp& timestamp) final; + void setLocalSnapshot(const Timestamp& timestamp) final; + boost::optional<Timestamp> getLocalSnapshot() final; void dropAllSnapshots() final; // @@ -58,8 +60,13 @@ public: * Sets the read timestamp on a transaction. * * Reads will be reflect the state of data as of the specified timestamp. + * + * If roundToOldest is true, rounds the timestamp up to the oldest_timestamp if it is larger. + * The default is false. 
*/ - Status setTransactionReadTimestamp(Timestamp pointInTime, WT_SESSION* session) const; + Status setTransactionReadTimestamp(Timestamp pointInTime, + WT_SESSION* session, + bool roundToOldest = false) const; /** * Starts a transaction and returns the SnapshotName used. @@ -69,6 +76,14 @@ public: Timestamp beginTransactionOnCommittedSnapshot(WT_SESSION* session) const; /** + * Starts a transaction on the last stable local timestamp, set by setLocalSnapshot. + * + * Throws if no local snapshot has been set. + */ + void beginTransactionOnLocalSnapshot(WT_SESSION* session, bool ignorePrepare) const; + + + /** * Starts a transaction on the oplog using an appropriate timestamp for oplog visiblity. */ void beginTransactionOnOplog(WiredTigerOplogManager* oplogManager, WT_SESSION* session) const; @@ -84,7 +99,12 @@ public: boost::optional<Timestamp> getMinSnapshotForNextCommittedRead() const; private: - mutable stdx::mutex _mutex; // Guards _committedSnapshot. + // Snapshot to use for reads at a commit timestamp. + mutable stdx::mutex _committedSnapshotMutex; // Guards _committedSnapshot. boost::optional<Timestamp> _committedSnapshot; + + // Snapshot to use for reads at a local stable timestamp. + mutable stdx::mutex _localSnapshotMutex; // Guards _localSnapshot. + boost::optional<Timestamp> _localSnapshot; }; } |