author     Louis Williams <louis.williams@mongodb.com>  2018-03-20 13:18:07 -0400
committer  Louis Williams <louis.williams@mongodb.com>  2018-04-13 10:48:56 -0400
commit     10bcc73a75ac857c290c3af6a3f89a45c4867043 (patch)
tree       ee4612c4807e4232f3242589f2d70bd9e6cbf4ab /src
parent     235858ceee5d209d5e10579b79cfdc3bad7ff877 (diff)
SERVER-34192 Allow secondary reads during batch applications
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/db_raii.cpp                                            | 176
-rw-r--r--  src/mongo/db/db_raii.h                                              |  19
-rw-r--r--  src/mongo/db/exec/collection_scan.cpp                               |   8
-rw-r--r--  src/mongo/db/exec/collection_scan_common.h                          |   3
-rw-r--r--  src/mongo/db/query/get_executor.cpp                                 |  35
-rw-r--r--  src/mongo/db/query/get_executor.h                                   |   8
-rw-r--r--  src/mongo/db/query/internal_plans.cpp                               |   2
-rw-r--r--  src/mongo/db/query/planner_access.cpp                               |   2
-rw-r--r--  src/mongo/db/query/query_planner.cpp                                |   4
-rw-r--r--  src/mongo/db/query/query_planner_params.h                           |   3
-rw-r--r--  src/mongo/db/query/query_solution.cpp                               |   1
-rw-r--r--  src/mongo/db/query/query_solution.h                                 |   3
-rw-r--r--  src/mongo/db/query/stage_builder.cpp                                |   1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.h          |   7
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp   |   7
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.h     |   1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.cpp   |   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.h     |   1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                  |   5
-rw-r--r--  src/mongo/db/storage/recovery_unit.h                                |  12
-rw-r--r--  src/mongo/db/storage/snapshot_manager.h                             |  10
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp        |  16
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp     |  57
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h       |  24
24 files changed, 353 insertions(+), 54 deletions(-)
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index df4b851aa99..03c3ee52ec5 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -26,15 +26,20 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
#include "mongo/platform/basic.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/concurrency/locker.h"
#include "mongo/db/curop.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace {
@@ -43,6 +48,10 @@ const boost::optional<int> kDoNotChangeProfilingLevel = boost::none;
} // namespace
+// If true, do not take the PBWM lock in AutoGetCollectionForRead on secondaries during batch
+// application.
+MONGO_EXPORT_SERVER_PARAMETER(allowSecondaryReadsDuringBatchApplication, bool, true);
+
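
(For reference: a parameter exported with MONGO_EXPORT_SERVER_PARAMETER can typically be set at startup, e.g. "mongod --setParameter allowSecondaryReadsDuringBatchApplication=false", or changed at runtime through the setParameter admin command. The exact invocations are assumptions about the macro's usual behavior, not something this patch defines.)
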
AutoStatsTracker::AutoStatsTracker(OperationContext* opCtx,
const NamespaceString& nss,
Top::LockType lockType,
@@ -78,45 +87,83 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
const NamespaceStringOrUUID& nsOrUUID,
AutoGetCollection::ViewMode viewMode,
Date_t deadline) {
+ // Don't take the ParallelBatchWriterMode lock when the server parameter is set and our
+ // storage engine supports snapshot reads.
+ if (allowSecondaryReadsDuringBatchApplication.load() &&
+ opCtx->getServiceContext()->getGlobalStorageEngine()->supportsReadConcernSnapshot()) {
+ _shouldNotConflictWithSecondaryBatchApplicationBlock.emplace(opCtx->lockState());
+ }
const auto collectionLockMode = getLockModeForQuery(opCtx);
_autoColl.emplace(opCtx, nsOrUUID, collectionLockMode, viewMode, deadline);
- while (true) {
- auto coll = _autoColl->getCollection();
- if (!coll) {
- return;
- }
-
+ repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ const auto readConcernLevel = opCtx->recoveryUnit()->getReadConcernLevel();
+
+ // If the collection doesn't exist or disappears after releasing locks and waiting, there is no
+ // need to check for pending catalog changes.
+ while (auto coll = _autoColl->getCollection()) {
+
+ // During batch application on secondaries, there is a potential to read inconsistent states
+ // that would normally be protected by the PBWM lock. In order to serve secondary reads
+ // during this period, we default to not acquiring the lock (by setting
+ // _shouldNotConflictWithSecondaryBatchApplicationBlock). On primaries, we always read at a
+ // consistent time, so not taking the PBWM lock is not a problem. On secondaries, we have to
+ // guarantee we read at a consistent state, so we must read at the last applied timestamp,
+ // which is set after each complete batch.
+ //
+ // If an attempt to read at the last applied timestamp is unsuccessful because there are
+ // pending catalog changes that occur after the last applied timestamp, we release our locks
+ // and try again with the PBWM lock (by unsetting
+ // _shouldNotConflictWithSecondaryBatchApplicationBlock).
+
+ const NamespaceString& nss = coll->ns();
+
+ // Read at the last applied timestamp if we don't have the PBWM lock and correct conditions
+ // are met.
+ bool readAtLastAppliedTimestamp = _shouldNotConflictWithSecondaryBatchApplicationBlock &&
+ _shouldDoSecondaryLocalSnapshotRead(opCtx, nss, readConcernLevel);
+
+ opCtx->recoveryUnit()->setShouldReadAtLastAppliedTimestamp(readAtLastAppliedTimestamp);
+
+ // This timestamp could be earlier than the timestamp seen when the transaction is opened
+ // because it is set asynchronously. This is not problematic because holding the collection
+ // lock guarantees no metadata changes will occur in that time.
+ auto lastAppliedTimestamp = boost::make_optional(
+ readAtLastAppliedTimestamp, replCoord->getMyLastAppliedOpTime().getTimestamp());
+
+ // Return if there are no conflicting catalog changes on the collection.
auto minSnapshot = coll->getMinimumVisibleSnapshot();
- if (!minSnapshot) {
- return;
- }
- auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
- if (!mySnapshot) {
- return;
- }
- if (mySnapshot >= minSnapshot) {
+ if (!_conflictingCatalogChanges(
+ opCtx, readConcernLevel, minSnapshot, lastAppliedTimestamp)) {
return;
}
- auto readConcernLevel = opCtx->recoveryUnit()->getReadConcernLevel();
- if (readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern) {
- uasserted(ErrorCodes::SnapshotUnavailable,
- str::stream()
- << "Unable to read from a snapshot due to pending collection catalog "
- "changes; please retry the operation. Snapshot timestamp is "
- << mySnapshot->toString()
- << ". Collection minimum is "
- << minSnapshot->toString());
- }
- invariant(readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern);
+ invariant(lastAppliedTimestamp ||
+ readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern);
- // Yield locks in order to do the blocking call below
+ // Yield locks in order to do the blocking call below.
+ // This should only be called if we are doing a snapshot read at the last applied time or
+ // majority commit point.
_autoColl = boost::none;
- repl::ReplicationCoordinator::get(opCtx)->waitUntilSnapshotCommitted(opCtx, *minSnapshot);
+ // If there are pending catalog changes, we should conflict with any in-progress batches (by
+ // taking the PBWM lock) and choose not to read from the last applied timestamp by unsetting
+ // _shouldNotConflictWithSecondaryBatchApplicationBlock. Index builds on secondaries can
+ // complete at timestamps later than the lastAppliedTimestamp during initial sync
+ // (SERVER-34343). After initial sync finishes, if we waited instead of retrying, readers
+ // would block indefinitely waiting for the lastAppliedTimestamp to move forward. Instead we
+ // force the reader to take the PBWM lock and retry.
+ if (lastAppliedTimestamp) {
+ LOG(2) << "Tried reading from local snapshot time: " << *lastAppliedTimestamp
+ << " on nss: " << nss.ns() << ", but future catalog changes are pending at time "
+ << *minSnapshot << ". Trying again without reading from the local snapshot";
+ _shouldNotConflictWithSecondaryBatchApplicationBlock = boost::none;
+ }
- uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot());
+ if (readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern) {
+ replCoord->waitUntilSnapshotCommitted(opCtx, *minSnapshot);
+ uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot());
+ }
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -127,6 +174,79 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
}
}
+bool AutoGetCollectionForRead::_shouldDoSecondaryLocalSnapshotRead(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ repl::ReadConcernLevel readConcernLevel) const {
+ // Majority and snapshot readConcern levels should not read from the local snapshot. For both,
+ // we already wait until the desired timestamp, which is updated at the end of each batch.
+ if (readConcernLevel != repl::ReadConcernLevel::kLocalReadConcern &&
+ readConcernLevel != repl::ReadConcernLevel::kAvailableReadConcern) {
+ return false;
+ }
+
+ // If we are in a replication state (like secondary or primary catch-up) where we are not
+ // accepting writes, we should always read from the last applied snapshot. If this node can
+ // accept writes, then no conflicting replication batches are being applied and we can read from
+ // the default snapshot.
+ if (repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, "admin")) {
+ return false;
+ }
+
+ // Local and available read concerns should not be restricted to batch boundaries for
+ // non-replicated collections. Not being able to read from the last applied timestamp with
+ // non-network clients is tracked by SERVER-34440.
+ if (!nss.isReplicated() || !opCtx->getClient()->isFromUserConnection()) {
+ return false;
+ }
+ return true;
+}
+
+bool AutoGetCollectionForRead::_conflictingCatalogChanges(
+ OperationContext* opCtx,
+ repl::ReadConcernLevel readConcernLevel,
+ boost::optional<Timestamp> minSnapshot,
+ boost::optional<Timestamp> lastAppliedTimestamp) const {
+ // This is the timestamp of the most recent catalog changes to this collection. If this is
+ // greater than any point-in-time read timestamp, we should either wait or return an error.
+ if (!minSnapshot) {
+ return false;
+ }
+
+ // If we are reading from the lastAppliedTimestamp and it is up-to-date with any catalog
+ // changes, we can return.
+ if (lastAppliedTimestamp &&
+ (lastAppliedTimestamp->isNull() || *lastAppliedTimestamp >= *minSnapshot)) {
+ return false;
+ }
+
+ // This can be set when readConcern is "snapshot" or "majority".
+ auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
+
+ // If we do not have a point in time to conflict with minSnapshot, return.
+ if (!mySnapshot && !lastAppliedTimestamp) {
+ return false;
+ }
+
+ // Return if there are no conflicting catalog changes with mySnapshot.
+ if (mySnapshot && *mySnapshot >= *minSnapshot) {
+ return false;
+ }
+
+ // Snapshot readConcern can't yield its locks when there are catalog changes.
+ if (readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern) {
+ uasserted(
+ ErrorCodes::SnapshotUnavailable,
+ str::stream() << "Unable to read from a snapshot due to pending collection catalog "
+ "changes; please retry the operation. Snapshot timestamp is "
+ << mySnapshot->toString()
+ << ". Collection minimum is "
+ << minSnapshot->toString());
+ }
+ return true;
+}
+
+
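
The constructor above reduces to an acquire/check/release/retry shape: first try without the PBWM lock, reading at lastApplied; if pending catalog changes make that unsafe, drop all locks and retry while conflicting with batch application. A minimal standalone C++ sketch of that shape (all types and names here are hypothetical stand-ins, not the real MongoDB classes):

    #include <iostream>
    #include <optional>

    struct Collection {
        bool pendingCatalogChanges = false;
    };

    // Stand-in for lock acquisition; on a real secondary, conflictWithBatches
    // would mean also taking the ParallelBatchWriterMode lock.
    std::optional<Collection> acquireLocked(bool conflictWithBatches) {
        // Pretend catalog changes are pending until we conflict with batches.
        return Collection{!conflictWithBatches};
    }

    int main() {
        bool conflictWithBatches = false;  // first pass: avoid the PBWM lock
        while (auto coll = acquireLocked(conflictWithBatches)) {
            if (!coll->pendingCatalogChanges) {
                std::cout << "reading " << (conflictWithBatches ? "with" : "without")
                          << " the PBWM lock\n";
                break;  // safe to read at the chosen timestamp
            }
            // Catalog changes are pending after lastApplied: release locks and
            // retry, this time conflicting with in-progress batch application.
            conflictWithBatches = true;
        }
    }
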
AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand(
OperationContext* opCtx,
const NamespaceStringOrUUID& nsOrUUID,
diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h
index c20968be550..63d8524d637 100644
--- a/src/mongo/db/db_raii.h
+++ b/src/mongo/db/db_raii.h
@@ -105,9 +105,28 @@ public:
}
private:
+ // If this field is set, the reader will not take the ParallelBatchWriterMode lock, and so will
+ // not conflict with secondary batch application. It stays in scope with the _autoColl so that
+ // locks are taken and released in the right order.
+ boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock>
+ _shouldNotConflictWithSecondaryBatchApplicationBlock;
+
// This field is optional, because the code to wait for majority committed snapshot needs to
// release locks in order to block waiting
boost::optional<AutoGetCollection> _autoColl;
+
+ // Returns true if we should read from the local snapshot (last applied timestamp) on a
+ // secondary.
+ bool _shouldDoSecondaryLocalSnapshotRead(OperationContext* opCtx,
+ const NamespaceString& nss,
+ repl::ReadConcernLevel readConcernLevel) const;
+
+ // Returns true if the minSnapshot causes conflicting catalog changes for the provided read
+ // concern level or lastAppliedTimestamp.
+ bool _conflictingCatalogChanges(OperationContext* opCtx,
+ repl::ReadConcernLevel readConcernLevel,
+ boost::optional<Timestamp> minSnapshot,
+ boost::optional<Timestamp> lastAppliedTimestamp) const;
};
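
One detail worth spelling out about the member ordering above: C++ destroys data members in reverse declaration order, so the block declared before _autoColl is destroyed after it, meaning the collection lock is always released before the PBWM-conflict behavior is restored. A toy illustration (hypothetical names):

    #include <iostream>

    struct Tracer {
        const char* name;
        explicit Tracer(const char* n) : name(n) { std::cout << "acquire " << name << '\n'; }
        ~Tracer() { std::cout << "release " << name << '\n'; }
    };

    struct Reader {
        Tracer noConflictBlock{"no-PBWM-conflict block"};  // declared first, destroyed last
        Tracer collectionLock{"collection lock"};          // declared second, destroyed first
    };

    int main() {
        Reader r;
        // Prints: acquire no-PBWM-conflict block, acquire collection lock,
        // release collection lock, release no-PBWM-conflict block.
    }
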
/**
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index a0cff1a7e81..ad73d673bbc 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -103,7 +103,7 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
if (needToMakeCursor) {
const bool forward = _params.direction == CollectionScanParams::FORWARD;
- if (forward && !_params.tailable && _params.collection->ns().isOplog()) {
+ if (forward && _params.shouldWaitForOplogVisibility) {
// Forward, non-tailable scans from the oplog need to wait until all oplog entries
// before the read begins to be visible. This isn't needed for reverse scans because
// we only hide oplog entries from forward scans, and it isn't necessary for tailing
@@ -112,8 +112,10 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
// might not include writes that finished before the read started. This also must be
// done before we create the cursor as that is when we establish the endpoint for
// the cursor. Also call abandonSnapshot to make sure that we are using a fresh
- // storage engine snapshot while waiting. Otherwise, we will end up reading from
- // the snapshot where the oplog entries are not yet visible even after the wait.
+ // storage engine snapshot while waiting. Otherwise, we will end up reading from the
+ // snapshot where the oplog entries are not yet visible even after the wait.
+ invariant(!_params.tailable && _params.collection->ns().isOplog());
+
getOpCtx()->recoveryUnit()->abandonSnapshot();
_params.collection->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(
getOpCtx());
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index 5f960f0a904..b6c141691aa 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -69,6 +69,9 @@ struct CollectionScanParams {
// If non-zero, how many documents will we look at?
size_t maxScan = 0;
+
+ // Whether or not to wait for oplog visibility on oplog collection scans.
+ bool shouldWaitForOplogVisibility = false;
};
} // namespace mongo
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index b1dcebfff3d..eed63cec24a 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -207,6 +207,39 @@ void fillOutPlannerParams(OperationContext* opCtx,
} else {
plannerParams->options |= QueryPlannerParams::KEEP_MUTATIONS;
}
+
+ if (shouldWaitForOplogVisibility(
+ opCtx, collection, canonicalQuery->getQueryRequest().isTailable())) {
+ plannerParams->options |= QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
+ }
+}
+
+bool shouldWaitForOplogVisibility(OperationContext* opCtx,
+ const Collection* collection,
+ bool tailable) {
+
+ // Only non-tailable cursors on the oplog are affected. Only forward cursors, not reverse
+ // cursors, are affected, but this is checked when the cursor is opened.
+ if (!collection->ns().isOplog() || tailable) {
+ return false;
+ }
+
+ // Only primaries should require readers to wait for oplog visibility. In any other replication
+ // state, readers read at the most visible oplog timestamp. The reason readers on primaries
+ // need to wait is that multiple optimes can be allocated for operations before their entries
+ // are written to the storage engine. "Holes" will appear when an operation with a later optime
+ // commits before an operation with an earlier optime, and readers should wait so that all data
+ // is consistent.
+ //
+ // Secondaries can't wait for oplog visibility without the PBWM lock because it can introduce a
+ // hang while a batch application is in progress. The wait is done while holding a global lock,
+ // and the oplog visibility timestamp is updated at the end of every batch on a secondary,
+ // signalling the wait to complete. If a replication worker had a global lock and temporarily
+ // released it, a reader could acquire the lock to read the oplog. If the secondary reader were
+ // to wait for the oplog visibility timestamp to be updated, it would wait for a replication
+ // batch that would never complete because it couldn't reacquire its own lock, the global lock
+ // held by the waiting reader.
+ return repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, "admin");
}
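
The "holes" described above can be modeled simply: the visible point must stop just before the earliest optime that has been allocated but not yet committed. A self-contained sketch of that invariant (a simplified model, not the real WiredTigerOplogManager):

    #include <cstdint>
    #include <iostream>
    #include <set>

    std::set<uint64_t> uncommitted{5, 7};  // optimes allocated but not yet durable

    // Everything up to the earliest uncommitted optime (exclusive) is safe to read.
    uint64_t allCommittedUpTo(uint64_t lastAllocated) {
        return uncommitted.empty() ? lastAllocated : *uncommitted.begin() - 1;
    }

    int main() {
        std::cout << allCommittedUpTo(8) << '\n';  // 4: ops 5 and 7 still pending
        uncommitted.erase(7);                      // 7 commits first, out of order
        std::cout << allCommittedUpTo(8) << '\n';  // still 4: the hole at 5 remains
        uncommitted.erase(5);                      // 5 commits, hole is filled
        std::cout << allCommittedUpTo(8) << '\n';  // 8: everything is visible
    }
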
namespace {
@@ -617,6 +650,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
params.tailable = cq->getQueryRequest().isTailable();
params.shouldTrackLatestOplogTimestamp =
plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ params.shouldWaitForOplogVisibility =
+ shouldWaitForOplogVisibility(opCtx, collection, params.tailable);
// If the query is just a lower bound on "ts", we know that every document in the collection
// after the first matching one must also match. To avoid wasting time running the match
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 237f577de7d..118e0afbee2 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -64,6 +64,14 @@ void fillOutPlannerParams(OperationContext* opCtx,
QueryPlannerParams* plannerParams);
/**
+ * Determines whether or not to wait for oplog visibility for a query. This is only used for
+ * collection scans on the oplog.
+ */
+bool shouldWaitForOplogVisibility(OperationContext* opCtx,
+ const Collection* collection,
+ bool tailable);
+
+/**
* Get a plan executor for a query.
*
* If the query is valid and an executor could be created, returns a StatusWith with the
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index 6b1ae0857b7..b47a4ce627d 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/exec/idhack.h"
#include "mongo/db/exec/index_scan.h"
#include "mongo/db/exec/update.h"
+#include "mongo/db/query/get_executor.h"
#include "mongo/stdx/memory.h"
namespace mongo {
@@ -179,6 +180,7 @@ std::unique_ptr<PlanStage> InternalPlanner::_collectionScan(OperationContext* op
CollectionScanParams params;
params.collection = collection;
params.start = startLoc;
+ params.shouldWaitForOplogVisibility = shouldWaitForOplogVisibility(opCtx, collection, false);
if (FORWARD == direction) {
params.direction = CollectionScanParams::FORWARD;
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index 82d40993aea..92a35968227 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -153,6 +153,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
csn->maxScan = query.getQueryRequest().getMaxScan();
csn->shouldTrackLatestOplogTimestamp =
params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ csn->shouldWaitForOplogVisibility =
+ params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
// If the hint is {$natural: +-1} this changes the direction of the collection scan.
if (!query.getQueryRequest().getHint().isEmpty()) {
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index b0e046fe98b..1e49b615c97 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -132,6 +132,10 @@ string optionString(size_t options) {
break;
case QueryPlannerParams::TRACK_LATEST_OPLOG_TS:
ss << "TRACK_LATEST_OPLOG_TS ";
+ break;
+ case QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE:
+ ss << "OPLOG_SCAN_WAIT_FOR_VISIBLE ";
+ break;
case QueryPlannerParams::DEFAULT:
MONGO_UNREACHABLE;
break;
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index f54bab70458..85da68100bd 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -96,6 +96,9 @@ struct QueryPlannerParams {
// Set this to track the most recent timestamp seen by this cursor while scanning the oplog.
TRACK_LATEST_OPLOG_TS = 1 << 12,
+
+ // Set this so that collection scans on the oplog wait for visibility before reading.
+ OPLOG_SCAN_WAIT_FOR_VISIBLE = 1 << 13,
};
// See Options enum above.
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 62c884634d2..5f6cfd7e9a1 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -248,6 +248,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
copy->direction = this->direction;
copy->maxScan = this->maxScan;
copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
+ copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;
return copy;
}
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index e6b2e148d7c..0e63b8cedfc 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -305,6 +305,9 @@ struct CollectionScanNode : public QuerySolutionNode {
// maxScan option to .find() limits how many docs we look at.
int maxScan;
+
+ // Whether or not to wait for oplog visibility on oplog collection scans.
+ bool shouldWaitForOplogVisibility = false;
};
struct AndHashNode : public QuerySolutionNode {
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 2834498dc03..587264cfb9c 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -82,6 +82,7 @@ PlanStage* buildStages(OperationContext* opCtx,
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.maxScan = csn->maxScan;
+ params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;
return new CollectionScan(opCtx, params, ws, csn->filter.get());
}
case STAGE_IXSCAN: {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 5ab2ab1c20f..b9cd7dee465 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -249,6 +249,13 @@ public:
virtual void updateCommittedSnapshot(const OpTime& newCommitPoint) = 0;
/**
+ * Updates the local snapshot to a consistent point for secondary reads.
+ *
+ * It is illegal to call with an optime that does not name an existing snapshot.
+ */
+ virtual void updateLocalSnapshot(const OpTime& optime) = 0;
+
+ /**
* Returns whether or not the SnapshotThread is active.
*/
virtual bool snapshotsEnabled() const = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index d516784e32d..5ec7ca490b7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -800,6 +800,13 @@ void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(
notifyOplogMetadataWaiters(newCommitPoint);
}
+void ReplicationCoordinatorExternalStateImpl::updateLocalSnapshot(const OpTime& optime) {
+ auto manager = _service->getGlobalStorageEngine()->getSnapshotManager();
+ if (manager) {
+ manager->setLocalSnapshot(optime.getTimestamp());
+ }
+}
+
bool ReplicationCoordinatorExternalStateImpl::snapshotsEnabled() const {
return _service->getGlobalStorageEngine()->getSnapshotManager() != nullptr;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 633404e5dd8..242071f9cb7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -98,6 +98,7 @@ public:
virtual void startProducerIfStopped();
void dropAllSnapshots() final;
void updateCommittedSnapshot(const OpTime& newCommitPoint) final;
+ void updateLocalSnapshot(const OpTime& optime) final;
virtual bool snapshotsEnabled() const;
virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime);
virtual double getElectionTimeoutOffsetLimitFraction() const;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 758ee63404c..e0bbea95a66 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -214,6 +214,8 @@ void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {}
void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(
const OpTime& newCommitPoint) {}
+void ReplicationCoordinatorExternalStateMock::updateLocalSnapshot(const OpTime& optime) {}
+
bool ReplicationCoordinatorExternalStateMock::snapshotsEnabled() const {
return _areSnapshotsEnabled;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 7fa11481018..4beafc38d5b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -87,6 +87,7 @@ public:
virtual void startProducerIfStopped();
virtual void dropAllSnapshots();
virtual void updateCommittedSnapshot(const OpTime& newCommitPoint);
+ virtual void updateLocalSnapshot(const OpTime& optime);
virtual bool snapshotsEnabled() const;
virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime);
virtual double getElectionTimeoutOffsetLimitFraction() const;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 1757f872725..5160f297ee7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1099,6 +1099,11 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
return;
}
+ // Update the local snapshot before updating the stable timestamp on the storage engine. New
+ // transactions reading from the local snapshot should start before the oldest timestamp is
+ // advanced to avoid races.
+ _externalState->updateLocalSnapshot(opTime);
+
// Add the new applied optime to the list of stable optime candidates and then set the last
// stable optime. Stable optimes are used to determine the last optime that it is safe to revert
// the database to, in the event of a rollback via the 'recover to timestamp' method. If we are
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 8fecccb2baf..e9cfe4c6b7b 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -161,6 +161,17 @@ public:
}
/**
+ * Tells the recovery unit to read at the last applied timestamp, tracked by the SnapshotManager.
+ * This should only be set to true for local and available read concerns, and is used to read
+ * from a consistent state on a secondary while replicated batches are being applied.
+ */
+ void setShouldReadAtLastAppliedTimestamp(bool value) {
+ invariant(!value || _readConcernLevel == repl::ReadConcernLevel::kLocalReadConcern ||
+ _readConcernLevel == repl::ReadConcernLevel::kAvailableReadConcern);
+ _shouldReadAtLastAppliedTimestamp = value;
+ }
+
+ /**
* Returns the Timestamp being used by this recovery unit or boost::none if not reading from
* a point in time. Any point in time returned will reflect either:
* - A timestamp set via call to setPointInTimeReadTimestamp()
@@ -353,6 +364,7 @@ protected:
RecoveryUnit() {}
repl::ReplicationCoordinator::Mode _replicationMode = repl::ReplicationCoordinator::modeNone;
repl::ReadConcernLevel _readConcernLevel = repl::ReadConcernLevel::kLocalReadConcern;
+ bool _shouldReadAtLastAppliedTimestamp = false;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/snapshot_manager.h b/src/mongo/db/storage/snapshot_manager.h
index 4c1def141af..c4c97c32281 100644
--- a/src/mongo/db/storage/snapshot_manager.h
+++ b/src/mongo/db/storage/snapshot_manager.h
@@ -58,6 +58,16 @@ public:
virtual void setCommittedSnapshot(const Timestamp& timestamp) = 0;
/**
+ * Sets the local snapshot to the last stable timestamp, for reading on secondaries.
+ */
+ virtual void setLocalSnapshot(const Timestamp& timestamp) = 0;
+
+ /**
+ * Returns the local snapshot timestamp.
+ */
+ virtual boost::optional<Timestamp> getLocalSnapshot() = 0;
+
+ /**
* Drops all snapshots and clears the "committed" snapshot.
*/
virtual void dropAllSnapshots() = 0;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index bff42b0b78b..f146143a255 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -319,6 +319,7 @@ void WiredTigerRecoveryUnit::_txnOpen() {
}
WT_SESSION* session = _session->getSession();
+ auto ignorePrepare = _readConcernLevel == repl::ReadConcernLevel::kAvailableReadConcern;
// '_readAtTimestamp' is available outside of a check for readConcern level 'snapshot' to
// accommodate unit testing. Note that the order of this if/else chain below is important for
// correctness. Also, note that we use the '_readAtTimestamp' to work around an oplog visibility
@@ -343,14 +344,19 @@ void WiredTigerRecoveryUnit::_txnOpen() {
_majorityCommittedSnapshot =
_sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(session);
} else if (_isOplogReader) {
+ invariant(!_shouldReadAtLastAppliedTimestamp);
_sessionCache->snapshotManager().beginTransactionOnOplog(
_sessionCache->getKVEngine()->getOplogManager(), session);
+
+ } else if (_shouldReadAtLastAppliedTimestamp &&
+ _sessionCache->snapshotManager().getLocalSnapshot()) {
+ // Read from the last applied timestamp (tracked globally by the SnapshotManager), which is
+ // the timestamp of the most recent completed replication batch operation. This should only
+ // be true for local or available readConcern on secondaries.
+ _sessionCache->snapshotManager().beginTransactionOnLocalSnapshot(session, ignorePrepare);
} else {
- invariantWTOK(session->begin_transaction(
- session,
- _readConcernLevel == repl::ReadConcernLevel::kAvailableReadConcern
- ? "ignore_prepare=true"
- : nullptr));
+ invariantWTOK(
+ session->begin_transaction(session, ignorePrepare ? "ignore_prepare=true" : nullptr));
}
LOG(3) << "WT begin_transaction for snapshot id " << _mySnapshotId;
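
Condensing the if/else chain in _txnOpen, the read source for a new transaction is chosen by priority. A sketch of that selection logic (simplified, with hypothetical names; the real chain also handles an explicit read timestamp for readConcern "snapshot" and unit testing):

    #include <cstdint>
    #include <iostream>
    #include <optional>

    enum class ReadSource { kProvided, kMajorityCommitted, kOplogVisible, kLastApplied, kNone };

    ReadSource chooseReadSource(std::optional<uint64_t> readAtTimestamp,
                                bool wantsMajorityCommitted,
                                bool isOplogReader,
                                bool shouldReadAtLastApplied,
                                std::optional<uint64_t> localSnapshot) {
        if (readAtTimestamp) return ReadSource::kProvided;
        if (wantsMajorityCommitted) return ReadSource::kMajorityCommitted;
        if (isOplogReader) return ReadSource::kOplogVisible;
        // New in this patch: local/available reads on a secondary use the
        // last applied timestamp when the SnapshotManager has one.
        if (shouldReadAtLastApplied && localSnapshot) return ReadSource::kLastApplied;
        return ReadSource::kNone;  // plain begin_transaction, no read timestamp
    }

    int main() {
        auto src = chooseReadSource(std::nullopt, false, false, true, 42u);
        std::cout << (src == ReadSource::kLastApplied) << '\n';  // prints 1
    }
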
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp
index baad47cf5c6..5e464dec4aa 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp
@@ -48,28 +48,42 @@
namespace mongo {
void WiredTigerSnapshotManager::setCommittedSnapshot(const Timestamp& timestamp) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lock(_committedSnapshotMutex);
invariant(!_committedSnapshot || *_committedSnapshot <= timestamp);
_committedSnapshot = timestamp;
}
+void WiredTigerSnapshotManager::setLocalSnapshot(const Timestamp& timestamp) {
+ stdx::lock_guard<stdx::mutex> lock(_localSnapshotMutex);
+ _localSnapshot = timestamp;
+}
+
+boost::optional<Timestamp> WiredTigerSnapshotManager::getLocalSnapshot() {
+ stdx::lock_guard<stdx::mutex> lock(_localSnapshotMutex);
+ return _localSnapshot;
+}
+
void WiredTigerSnapshotManager::dropAllSnapshots() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lock(_committedSnapshotMutex);
_committedSnapshot = boost::none;
}
boost::optional<Timestamp> WiredTigerSnapshotManager::getMinSnapshotForNextCommittedRead() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lock(_committedSnapshotMutex);
return _committedSnapshot;
}
Status WiredTigerSnapshotManager::setTransactionReadTimestamp(Timestamp pointInTime,
- WT_SESSION* session) const {
+ WT_SESSION* session,
+ bool roundToOldest) const {
char readTSConfigString[15 /* read_timestamp= */ + 16 /* 16 hexadecimal digits */ +
- 1 /* trailing null */];
- auto size = std::snprintf(
- readTSConfigString, sizeof(readTSConfigString), "read_timestamp=%llx", pointInTime.asULL());
+ 17 /* ,round_to_oldest= */ + 5 /* false */ + 1 /* trailing null */];
+ auto size = std::snprintf(readTSConfigString,
+ sizeof(readTSConfigString),
+ "read_timestamp=%llx,round_to_oldest=%s",
+ pointInTime.asULL(),
+ (roundToOldest) ? "true" : "false");
if (size < 0) {
int e = errno;
error() << "error snprintf " << errnoWithDescription(e);
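
The buffer arithmetic above is easy to verify standalone; this snippet (with a hypothetical timestamp value) prints the exact configuration string handed to WiredTiger:

    #include <cstdio>

    int main() {
        // 15 ("read_timestamp=") + 16 hex digits + 17 (",round_to_oldest=")
        // + 5 ("false") + 1 (trailing NUL) = 54 bytes.
        char config[15 + 16 + 17 + 5 + 1];
        unsigned long long pointInTime = 0x5ad4c50e00000001ULL;  // hypothetical Timestamp::asULL()
        bool roundToOldest = true;
        std::snprintf(config, sizeof(config), "read_timestamp=%llx,round_to_oldest=%s",
                      pointInTime, roundToOldest ? "true" : "false");
        std::puts(config);  // read_timestamp=5ad4c50e00000001,round_to_oldest=true
    }
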
@@ -86,7 +100,7 @@ Timestamp WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot(
auto rollbacker =
MakeGuard([&] { invariant(session->rollback_transaction(session, nullptr) == 0); });
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lock(_committedSnapshotMutex);
uassert(ErrorCodes::ReadConcernMajorityNotAvailableYet,
"Committed view disappeared while running operation",
_committedSnapshot);
@@ -97,25 +111,36 @@ Timestamp WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot(
return *_committedSnapshot;
}
+void WiredTigerSnapshotManager::beginTransactionOnLocalSnapshot(WT_SESSION* session,
+ bool ignorePrepare) const {
+ invariantWTOK(
+ session->begin_transaction(session, (ignorePrepare) ? "ignore_prepare=true" : nullptr));
+ auto rollbacker =
+ MakeGuard([&] { invariant(session->rollback_transaction(session, nullptr) == 0); });
+
+ stdx::lock_guard<stdx::mutex> lock(_localSnapshotMutex);
+ invariant(_localSnapshot);
+
+ LOG(3) << "begin_transaction on local snapshot " << _localSnapshot.get().toString();
+ auto status = setTransactionReadTimestamp(_localSnapshot.get(), session);
+ fassert(50775, status);
+ rollbacker.Dismiss();
+}
+
void WiredTigerSnapshotManager::beginTransactionOnOplog(WiredTigerOplogManager* oplogManager,
WT_SESSION* session) const {
invariantWTOK(session->begin_transaction(session, nullptr));
auto rollbacker =
MakeGuard([&] { invariant(session->rollback_transaction(session, nullptr) == 0); });
- stdx::lock_guard<stdx::mutex> lock(_mutex);
auto allCommittedTimestamp = oplogManager->getOplogReadTimestamp();
invariant(Timestamp(static_cast<unsigned long long>(allCommittedTimestamp)).asULL() ==
allCommittedTimestamp);
auto status = setTransactionReadTimestamp(
- Timestamp(static_cast<unsigned long long>(allCommittedTimestamp)), session);
+ Timestamp(static_cast<unsigned long long>(allCommittedTimestamp)),
+ session,
+ true /* roundToOldest */);
- // If we failed to set the read timestamp, we assume it is due to the oldest_timestamp racing
- // ahead. Rather than synchronizing for this rare case, if requested, throw a
- // WriteConflictException which will be retried.
- if (!status.isOK() && status.code() == ErrorCodes::BadValue) {
- throw WriteConflictException();
- }
fassert(50771, status);
rollbacker.Dismiss();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h
index 4eda95caa89..ff671e8a468 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h
@@ -48,6 +48,8 @@ public:
WiredTigerSnapshotManager() = default;
void setCommittedSnapshot(const Timestamp& timestamp) final;
+ void setLocalSnapshot(const Timestamp& timestamp) final;
+ boost::optional<Timestamp> getLocalSnapshot() final;
void dropAllSnapshots() final;
//
@@ -58,8 +60,13 @@ public:
* Sets the read timestamp on a transaction.
*
* Reads will reflect the state of data as of the specified timestamp.
+ *
+ * If roundToOldest is true and the given timestamp is older than the storage engine's
+ * oldest_timestamp, the read timestamp is rounded up to the oldest_timestamp. The default is
+ * false.
*/
- Status setTransactionReadTimestamp(Timestamp pointInTime, WT_SESSION* session) const;
+ Status setTransactionReadTimestamp(Timestamp pointInTime,
+ WT_SESSION* session,
+ bool roundToOldest = false) const;
/**
* Starts a transaction and returns the SnapshotName used.
@@ -69,6 +76,14 @@ public:
Timestamp beginTransactionOnCommittedSnapshot(WT_SESSION* session) const;
/**
+ * Starts a transaction on the last stable local timestamp, set by setLocalSnapshot.
+ *
+ * Throws if no local snapshot has been set.
+ */
+ void beginTransactionOnLocalSnapshot(WT_SESSION* session, bool ignorePrepare) const;
+
+
+ /**
* Starts a transaction on the oplog using an appropriate timestamp for oplog visibility.
*/
void beginTransactionOnOplog(WiredTigerOplogManager* oplogManager, WT_SESSION* session) const;
@@ -84,7 +99,12 @@ public:
boost::optional<Timestamp> getMinSnapshotForNextCommittedRead() const;
private:
- mutable stdx::mutex _mutex; // Guards _committedSnapshot.
+ // Snapshot to use for reads at a commit timestamp.
+ mutable stdx::mutex _committedSnapshotMutex; // Guards _committedSnapshot.
boost::optional<Timestamp> _committedSnapshot;
+
+ // Snapshot to use for reads at a local stable timestamp.
+ mutable stdx::mutex _localSnapshotMutex; // Guards _localSnapshot.
+ boost::optional<Timestamp> _localSnapshot;
};
}