summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorXiangyu Yao <xiangyu.yao@mongodb.com>2018-03-07 15:10:10 -0500
committerXiangyu Yao <xiangyu.yao@mongodb.com>2018-03-09 18:06:06 -0500
commit4ef0fe789ef349307c4cffd6548dc8657059cca5 (patch)
treed13f1dd19a97ca4e9a1e082f566bb0d59cee3147 /src
parented1e2b4d2a4987e3744484f9482fdc7a0e119e94 (diff)
downloadmongo-4ef0fe789ef349307c4cffd6548dc8657059cca5.tar.gz
SERVER-33609 Pass readConcernLevel to WiredTigerRecoveryUnit
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/clientcursor.cpp2
-rw-r--r--src/mongo/db/clientcursor.h13
-rw-r--r--src/mongo/db/commands/find_cmd.cpp2
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp14
-rw-r--r--src/mongo/db/commands/list_collections.cpp2
-rw-r--r--src/mongo/db/commands/list_indexes.cpp2
-rw-r--r--src/mongo/db/commands/parallel_collection_scan.cpp2
-rw-r--r--src/mongo/db/commands/repair_cursor.cpp2
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp2
-rw-r--r--src/mongo/db/db_raii.cpp2
-rw-r--r--src/mongo/db/query/find.cpp16
-rw-r--r--src/mongo/db/read_concern.cpp7
-rw-r--r--src/mongo/db/repl/read_concern_args.h9
-rw-r--r--src/mongo/db/repl/read_concern_level.h43
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp2
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp2
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h2
-rw-r--r--src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp30
-rw-r--r--src/mongo/db/storage/recovery_unit.h29
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp9
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h9
-rw-r--r--src/mongo/dbtests/cursor_manager_test.cpp136
-rw-r--r--src/mongo/s/client/rs_local_client.cpp11
23 files changed, 257 insertions, 91 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index a2354f0e6d3..27d851fa043 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -85,7 +85,7 @@ ClientCursor::ClientCursor(ClientCursorParams params,
_authenticatedUsers(std::move(params.authenticatedUsers)),
_lsid(operationUsingCursor->getLogicalSessionId()),
_txnNumber(operationUsingCursor->getTxnNumber()),
- _isReadCommitted(params.isReadCommitted),
+ _readConcernLevel(params.readConcernLevel),
_cursorManager(cursorManager),
_originatingCommand(params.originatingCommandObj),
_queryOptions(params.queryOptions),
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index b5e917acbe0..b811fc47722 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -37,6 +37,7 @@
#include "mongo/db/logical_session_id.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/record_id.h"
+#include "mongo/db/repl/read_concern_level.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/net/message.h"
@@ -57,11 +58,11 @@ struct ClientCursorParams {
ClientCursorParams(std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor,
NamespaceString nss,
UserNameIterator authenticatedUsersIter,
- bool isReadCommitted,
+ repl::ReadConcernLevel readConcernLevel,
BSONObj originatingCommandObj)
: exec(std::move(planExecutor)),
nss(std::move(nss)),
- isReadCommitted(isReadCommitted),
+ readConcernLevel(readConcernLevel),
queryOptions(exec->getCanonicalQuery()
? exec->getCanonicalQuery()->getQueryRequest().getOptions()
: 0),
@@ -88,7 +89,7 @@ struct ClientCursorParams {
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
const NamespaceString nss;
std::vector<UserName> authenticatedUsers;
- bool isReadCommitted = false;
+ const repl::ReadConcernLevel readConcernLevel;
int queryOptions = 0;
BSONObj originatingCommandObj;
};
@@ -133,8 +134,8 @@ public:
return _txnNumber;
}
- bool isReadCommitted() const {
- return _isReadCommitted;
+ repl::ReadConcernLevel getReadConcernLevel() const {
+ return _readConcernLevel;
}
/**
@@ -307,7 +308,7 @@ private:
// A transaction number for this cursor, if it was provided in the originating command.
const boost::optional<TxnNumber> _txnNumber;
- const bool _isReadCommitted = false;
+ const repl::ReadConcernLevel _readConcernLevel;
CursorManager* _cursorManager;
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 9f4e12cd062..5a7bf9c0cec 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -379,7 +379,7 @@ public:
{std::move(exec),
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->getReadConcernLevel(),
cmdObj});
cursorId = pinnedCursor.getCursor()->cursorid();
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index d293b5c09f5..6674092b823 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -391,8 +391,18 @@ public:
ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue());
stashedCursorIndicator.Dismiss();
- if (cursor->isReadCommitted())
- uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
+ opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(cursor->getReadConcernLevel(),
+ replicationMode);
+
+ // TODO SERVER-33698: Remove kSnapshotReadConcern clause once we can guarantee that a
+ // readConcern level snapshot getMore will have an established point-in-time WiredTiger
+ // snapshot.
+ if (replicationMode == repl::ReplicationCoordinator::modeReplSet &&
+ (cursor->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern ||
+ cursor->getReadConcernLevel() == repl::ReadConcernLevel::kSnapshotReadConcern)) {
+ uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot());
+ }
const bool disableAwaitDataFailpointActive =
MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd);
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index 035ea4c6271..822baef6de2 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -345,7 +345,7 @@ public:
{std::move(exec),
cursorNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->getReadConcernLevel(),
jsobj});
cursorId = pinnedCursor.getCursor()->cursorid();
}
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index a64f91dc080..19993d6084b 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -198,7 +198,7 @@ public:
{std::move(exec),
cursorNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->getReadConcernLevel(),
cmdObj});
cursorId = pinnedCursor.getCursor()->cursorid();
}
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index 05b8dbff677..c040e9e0510 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -155,7 +155,7 @@ public:
{std::move(exec),
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->getReadConcernLevel(),
cmdObj});
pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp
index 4234a0d91d6..f8d3b058d2d 100644
--- a/src/mongo/db/commands/repair_cursor.cpp
+++ b/src/mongo/db/commands/repair_cursor.cpp
@@ -105,7 +105,7 @@ public:
{std::move(exec),
ns,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->getReadConcernLevel(),
cmdObj});
appendCursorResponseObject(
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 066ad027e4f..d33a1a061ab 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -521,7 +521,7 @@ Status runAggregate(OperationContext* opCtx,
std::move(exec),
origNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->getReadConcernLevel(),
cmdObj);
if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) {
cursorParams.setTailable(true);
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 6b9a695d02b..f42857afb76 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -104,7 +104,7 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
repl::ReplicationCoordinator::get(opCtx)->waitUntilSnapshotCommitted(opCtx, *minSnapshot);
- uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot());
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 85370429156..8b01d239967 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -358,8 +358,18 @@ Message getMore(OperationContext* opCtx,
*isCursorAuthorized = true;
- if (cc->isReadCommitted())
- uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
+ opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(cc->getReadConcernLevel(),
+ replicationMode);
+
+ // TODO SERVER-33698: Remove kSnapshotReadConcern clause once we can guarantee that a
+ // readConcern level snapshot getMore will have an established point-in-time WiredTiger
+ // snapshot.
+ if (replicationMode == repl::ReplicationCoordinator::modeReplSet &&
+ (cc->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern ||
+ cc->getReadConcernLevel() == repl::ReadConcernLevel::kSnapshotReadConcern)) {
+ uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot());
+ }
uassert(40548,
"OP_GET_MORE operations are not supported on tailable aggregations. Only clients "
@@ -690,7 +700,7 @@ std::string runQuery(OperationContext* opCtx,
{std::move(exec),
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->getReadConcernLevel(),
upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)});
ccId = pinnedCursor.getCursor()->cursorid();
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 7c054469bd5..4e8c04440f8 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -207,6 +207,9 @@ Status waitForReadConcern(OperationContext* opCtx,
repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx);
invariant(replCoord);
+ opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(readConcernArgs.getLevel(),
+ replCoord->getReplicationMode());
+
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) {
if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
// For master/slave and standalone nodes, Linearizable Read is not supported.
@@ -299,13 +302,13 @@ Status waitForReadConcern(OperationContext* opCtx,
LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: "
<< readConcernArgs;
- Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ Status status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot();
// Wait until a snapshot is available.
while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
LOG(debugLevel) << "Snapshot not available yet.";
replCoord->waitUntilSnapshotCommitted(opCtx, Timestamp());
- status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot();
}
if (!status.isOK()) {
diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h
index f8a6226ecf8..29aab009a89 100644
--- a/src/mongo/db/repl/read_concern_args.h
+++ b/src/mongo/db/repl/read_concern_args.h
@@ -36,6 +36,7 @@
#include "mongo/db/logical_time.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/read_concern_level.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -44,14 +45,6 @@ class BSONObj;
namespace repl {
-enum class ReadConcernLevel {
- kLocalReadConcern,
- kMajorityReadConcern,
- kLinearizableReadConcern,
- kAvailableReadConcern,
- kSnapshotReadConcern
-};
-
class ReadConcernArgs {
public:
static const std::string kReadConcernFieldName;
diff --git a/src/mongo/db/repl/read_concern_level.h b/src/mongo/db/repl/read_concern_level.h
new file mode 100644
index 00000000000..ffb91250afa
--- /dev/null
+++ b/src/mongo/db/repl/read_concern_level.h
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+namespace repl {
+
+enum class ReadConcernLevel {
+ kLocalReadConcern,
+ kMajorityReadConcern,
+ kLinearizableReadConcern,
+ kAvailableReadConcern,
+ kSnapshotReadConcern
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 3b8eac55cdd..ddacb478aa6 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -394,7 +394,7 @@ TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfRollbackIDIsNotI
TEST_F(StorageInterfaceImplTest, SnapshotSupported) {
auto opCtx = getOperationContext();
- Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ Status status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot();
ASSERT(status.isOK());
}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
index 800b6b6f1cc..97471b21ca2 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
@@ -67,7 +67,7 @@ void EphemeralForTestRecoveryUnit::abortUnitOfWork() {
}
}
-Status EphemeralForTestRecoveryUnit::setReadFromMajorityCommittedSnapshot() {
+Status EphemeralForTestRecoveryUnit::obtainMajorityCommittedSnapshot() {
return Status::OK();
}
}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
index 7de00846992..ad6f9091295 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
@@ -58,7 +58,7 @@ public:
virtual void abandonSnapshot() {}
- Status setReadFromMajorityCommittedSnapshot() final;
+ Status obtainMajorityCommittedSnapshot() final;
virtual void registerChange(Change* change) {
_changes.push_back(ChangePtr(change));
diff --git a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
index 7112ce561c8..a99e632a456 100644
--- a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
+++ b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp
@@ -33,6 +33,8 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/repl/read_concern_level.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/record_store.h"
@@ -142,13 +144,19 @@ public:
int itCountCommitted() {
auto op = makeOperation();
- ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ repl::ReplicationCoordinator::modeReplSet);
+ ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot());
return itCountOn(op);
}
boost::optional<Record> readRecordCommitted(RecordId id) {
auto op = makeOperation();
- ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ repl::ReplicationCoordinator::modeReplSet);
+ ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot());
auto cursor = rs->getCursor(op);
auto record = cursor->seekExact(id);
if (record)
@@ -204,23 +212,25 @@ TEST_F(SnapshotManagerTests, FailsWithNoCommittedSnapshot) {
auto op = makeOperation();
auto ru = op->recoveryUnit();
+ ru->setReadConcernLevelAndReplicationMode(repl::ReadConcernLevel::kMajorityReadConcern,
+ repl::ReplicationCoordinator::modeReplSet);
// Before first snapshot is created.
- ASSERT_EQ(ru->setReadFromMajorityCommittedSnapshot(),
+ ASSERT_EQ(ru->obtainMajorityCommittedSnapshot(),
ErrorCodes::ReadConcernMajorityNotAvailableYet);
// There is a snapshot but it isn't committed.
auto snap = fetchAndIncrementTimestamp();
- ASSERT_EQ(ru->setReadFromMajorityCommittedSnapshot(),
+ ASSERT_EQ(ru->obtainMajorityCommittedSnapshot(),
ErrorCodes::ReadConcernMajorityNotAvailableYet);
// Now there is a committed snapshot.
snapshotManager->setCommittedSnapshot(snap);
- ASSERT_OK(ru->setReadFromMajorityCommittedSnapshot());
+ ASSERT_OK(ru->obtainMajorityCommittedSnapshot());
// Not anymore!
snapshotManager->dropAllSnapshots();
- ASSERT_EQ(ru->setReadFromMajorityCommittedSnapshot(),
+ ASSERT_EQ(ru->obtainMajorityCommittedSnapshot(),
ErrorCodes::ReadConcernMajorityNotAvailableYet);
}
@@ -229,11 +239,13 @@ TEST_F(SnapshotManagerTests, FailsAfterDropAllSnapshotsWhileYielded) {
return; // This test is only for engines that DO support SnapshotMangers.
auto op = makeOperation();
+ op->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+ repl::ReadConcernLevel::kMajorityReadConcern, repl::ReplicationCoordinator::modeReplSet);
// Start an operation using a committed snapshot.
auto snap = fetchAndIncrementTimestamp();
snapshotManager->setCommittedSnapshot(snap);
- ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot());
ASSERT_EQ(itCountOn(op), 0); // acquires a snapshot.
// Everything still works until we abandon our snapshot.
@@ -284,7 +296,9 @@ TEST_F(SnapshotManagerTests, BasicFunctionality) {
// This op should keep its original snapshot until abandoned.
auto longOp = makeOperation();
- ASSERT_OK(longOp->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ longOp->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+ repl::ReadConcernLevel::kMajorityReadConcern, repl::ReplicationCoordinator::modeReplSet);
+ ASSERT_OK(longOp->recoveryUnit()->obtainMajorityCommittedSnapshot());
ASSERT_EQ(itCountOn(longOp), 3);
// If this fails, the snapshot contains writes that were rolled back.
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index bbc56dadb73..846e7015eb9 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -35,6 +35,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/repl/read_concern_level.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/storage/snapshot.h"
namespace mongo {
@@ -93,9 +95,8 @@ public:
virtual void preallocateSnapshot() {}
/**
- * Informs this RecoveryUnit that all future reads through it should be from a snapshot
- * marked as Majority Committed. Snapshots should still be separately acquired and newer
- * committed snapshots should be used if available whenever implementations would normally
+ * Obtains a majority committed snapshot. Snapshots should still be separately acquired and
+ * newer committed snapshots should be used if available whenever implementations would normally
* change snapshots.
*
* If no snapshot has yet been marked as Majority Committed, returns a status with error code
@@ -107,19 +108,35 @@ public:
* StorageEngines that don't support a SnapshotManager should use the default
* implementation.
*/
- virtual Status setReadFromMajorityCommittedSnapshot() {
+ virtual Status obtainMajorityCommittedSnapshot() {
return {ErrorCodes::CommandNotSupported,
"Current storage engine does not support majority readConcerns"};
}
/**
- * Returns true if setReadFromMajorityCommittedSnapshot() has been called.
+ * Returns true if we are reading from a majority committed snapshot.
*/
virtual bool isReadingFromMajorityCommittedSnapshot() const {
return false;
}
/**
+ * Set this operation's readConcern level and replication mode on the recovery unit.
+ */
+ void setReadConcernLevelAndReplicationMode(repl::ReadConcernLevel readConcernLevel,
+ repl::ReplicationCoordinator::Mode replicationMode) {
+ _readConcernLevel = readConcernLevel;
+ _replicationMode = replicationMode;
+ }
+
+ /**
+ * Returns the readConcern level of this recovery unit.
+ */
+ repl::ReadConcernLevel getReadConcernLevel() const {
+ return _readConcernLevel;
+ }
+
+ /**
* Returns the Timestamp being used by this recovery unit or boost::none if not reading from
* a majority committed snapshot.
*
@@ -296,6 +313,8 @@ public:
protected:
RecoveryUnit() {}
+ repl::ReplicationCoordinator::Mode _replicationMode = repl::ReplicationCoordinator::modeNone;
+ repl::ReadConcernLevel _readConcernLevel = repl::ReadConcernLevel::kLocalReadConcern;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 02f9f76224f..2c2174f2a6c 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -227,20 +227,19 @@ SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const {
return SnapshotId(_mySnapshotId);
}
-Status WiredTigerRecoveryUnit::setReadFromMajorityCommittedSnapshot() {
+Status WiredTigerRecoveryUnit::obtainMajorityCommittedSnapshot() {
+ invariant(isReadingFromMajorityCommittedSnapshot());
auto snapshotName = _sessionCache->snapshotManager().getMinSnapshotForNextCommittedRead();
if (!snapshotName) {
return {ErrorCodes::ReadConcernMajorityNotAvailableYet,
"Read concern majority reads are currently not possible."};
}
-
_majorityCommittedSnapshot = *snapshotName;
- _readFromMajorityCommittedSnapshot = true;
return Status::OK();
}
boost::optional<Timestamp> WiredTigerRecoveryUnit::getMajorityCommittedSnapshot() const {
- if (!_readFromMajorityCommittedSnapshot)
+ if (!isReadingFromMajorityCommittedSnapshot())
return {};
return _majorityCommittedSnapshot;
}
@@ -268,7 +267,7 @@ void WiredTigerRecoveryUnit::_txnOpen() {
<< " is older than the oldest available timestamp.");
}
uassertStatusOK(status);
- } else if (_readFromMajorityCommittedSnapshot) {
+ } else if (isReadingFromMajorityCommittedSnapshot()) {
// We reset _majorityCommittedSnapshot to the actual read timestamp used when the
// transaction was started.
_majorityCommittedSnapshot =
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index c2b65222442..99b11f060a8 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -39,6 +39,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
+#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/util/timer.h"
@@ -72,9 +73,12 @@ public:
void abandonSnapshot() override;
void preallocateSnapshot() override;
- Status setReadFromMajorityCommittedSnapshot() override;
+ Status obtainMajorityCommittedSnapshot() override;
+
bool isReadingFromMajorityCommittedSnapshot() const override {
- return _readFromMajorityCommittedSnapshot;
+ return _replicationMode == repl::ReplicationCoordinator::modeReplSet &&
+ (_readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern ||
+ _readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern);
}
boost::optional<Timestamp> getMajorityCommittedSnapshot() const override;
@@ -146,7 +150,6 @@ private:
bool _isTimestamped = false;
Timestamp _commitTimestamp;
uint64_t _mySnapshotId;
- bool _readFromMajorityCommittedSnapshot = false;
Timestamp _majorityCommittedSnapshot;
Timestamp _readAtTimestamp;
std::unique_ptr<Timer> _timer;
diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp
index 12d393c48df..ee3933dfa35 100644
--- a/src/mongo/dbtests/cursor_manager_test.cpp
+++ b/src/mongo/dbtests/cursor_manager_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/query_test_service_context.h"
+#include "mongo/db/repl/read_concern_level.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
@@ -107,7 +108,11 @@ public:
}
ClientCursorParams makeParams(OperationContext* opCtx) {
- return {makeFakePlanExecutor(opCtx), kTestNss, {}, false, BSONObj()};
+ return {makeFakePlanExecutor(opCtx),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()};
}
ClientCursorPin makeCursor(OperationContext* opCtx) {
@@ -145,7 +150,11 @@ TEST_F(CursorManagerTest, GlobalCursorManagerShouldReportOwnershipOfCursorsItCre
for (int i = 0; i < 1000; i++) {
auto cursorPin = CursorManager::getGlobalCursorManager()->registerCursor(
_opCtx.get(),
- {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()});
+ {makeFakePlanExecutor(),
+ NamespaceString{"test.collection"},
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
ASSERT_TRUE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid()));
}
}
@@ -155,8 +164,12 @@ TEST_F(CursorManagerTest,
CursorManager* cursorManager = useCursorManager();
auto opCtx = cc().makeOperationContext();
for (int i = 0; i < 1000; i++) {
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
ASSERT_FALSE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid()));
}
}
@@ -170,8 +183,12 @@ TEST_F(CursorManagerTest,
CursorManager* cursorManager = useCursorManager();
boost::optional<uint32_t> prefix;
for (int i = 0; i < 1000; i++) {
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
auto cursorId = cursorPin.getCursor()->cursorid();
if (prefix) {
ASSERT_EQ(*prefix, extractLeading32Bits(cursorId));
@@ -189,8 +206,12 @@ TEST_F(CursorManagerTest,
*/
TEST_F(CursorManagerTest, InvalidateCursor) {
CursorManager* cursorManager = useCursorManager();
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
auto cursorId = cursorPin.getCursor()->cursorid();
cursorPin.release();
@@ -215,8 +236,12 @@ TEST_F(CursorManagerTest, InvalidateCursor) {
TEST_F(CursorManagerTest, InvalidateCursorWithDrop) {
CursorManager* cursorManager = useCursorManager();
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
auto cursorId = cursorPin.getCursor()->cursorid();
cursorPin.release();
@@ -238,8 +263,12 @@ TEST_F(CursorManagerTest, InvalidateCursorWithDrop) {
TEST_F(CursorManagerTest, InvalidatePinnedCursor) {
CursorManager* cursorManager = useCursorManager();
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
// If the cursor is pinned, it sticks around, even after invalidation.
ASSERT_EQUALS(1U, cursorManager->numCursors());
@@ -266,8 +295,12 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursor) {
const bool shouldAudit = false;
OperationContext* const pinningOpCtx = _opCtx.get();
- auto cursorPin = cursorManager->registerCursor(
- pinningOpCtx, {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(pinningOpCtx,
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
auto cursorId = cursorPin.getCursor()->cursorid();
ASSERT_OK(cursorManager->killCursor(_opCtx.get(), cursorId, shouldAudit));
@@ -285,8 +318,12 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursorMultiClient) {
OperationContext* const pinningOpCtx = _opCtx.get();
// Pin the cursor from one client.
- auto cursorPin = cursorManager->registerCursor(
- pinningOpCtx, {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(pinningOpCtx,
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
auto cursorId = cursorPin.getCursor()->cursorid();
@@ -315,9 +352,12 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) {
CursorManager* cursorManager = useCursorManager();
auto clock = useClock();
- cursorManager->registerCursor(
- _opCtx.get(),
- {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()});
+ cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ NamespaceString{"test.collection"},
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t()));
@@ -325,9 +365,12 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) {
ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now()));
ASSERT_EQ(0UL, cursorManager->numCursors());
- cursorManager->registerCursor(
- _opCtx.get(),
- {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()});
+ cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ NamespaceString{"test.collection"},
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t::max()));
ASSERT_EQ(0UL, cursorManager->numCursors());
}
@@ -339,9 +382,12 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) {
CursorManager* cursorManager = useCursorManager();
auto clock = useClock();
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(),
- {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ NamespaceString{"test.collection"},
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
// The pin is still in scope, so it should not time out.
clock->advance(getDefaultCursorTimeoutMillis());
@@ -356,9 +402,12 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) {
auto clock = useClock();
// Make a cursor from the plan executor, and immediately kill it.
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(),
- {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ NamespaceString{"test.collection"},
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
cursorPin.release();
const bool collectionGoingAway = false;
cursorManager->invalidateAll(
@@ -379,9 +428,12 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsThatAreStillPinnedShouldNotTimeou
auto clock = useClock();
// Make a cursor from the plan executor, and immediately kill it.
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(),
- {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ NamespaceString{"test.collection"},
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
const bool collectionGoingAway = false;
cursorManager->invalidateAll(
_opCtx.get(), collectionGoingAway, "KilledCursorsShouldTimeoutTest");
@@ -401,15 +453,23 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) {
auto clock = useClock();
// Register a cursor which we will look at again.
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
auto usedCursorId = cursorPin.getCursor()->cursorid();
cursorPin.release();
// Register a cursor to immediately forget about, to make sure it will time out on a normal
// schedule.
cursorManager->registerCursor(_opCtx.get(),
- {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
// Advance the clock to simulate time passing.
clock->advance(Milliseconds(1));
@@ -438,8 +498,12 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing
auto clock = useClock();
// Register a cursor which we will look at again.
- auto cursorPin = cursorManager->registerCursor(
- _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
+ auto cursorPin = cursorManager->registerCursor(_opCtx.get(),
+ {makeFakePlanExecutor(),
+ kTestNss,
+ {},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ BSONObj()});
// Advance the clock to simulate time passing.
clock->advance(getDefaultCursorTimeoutMillis() + Milliseconds(1));
diff --git a/src/mongo/s/client/rs_local_client.cpp b/src/mongo/s/client/rs_local_client.cpp
index 718ff0746ce..d6c27dd719a 100644
--- a/src/mongo/s/client/rs_local_client.cpp
+++ b/src/mongo/s/client/rs_local_client.cpp
@@ -107,7 +107,14 @@ StatusWith<Shard::QueryResponse> RSLocalClient::queryOnce(
if (readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern) {
// Set up operation context with majority read snapshot so correct optime can be retrieved.
- Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ // TODO SERVER-33794: Always set the replicationMode to modeReplSet so
+ // that we can call obtainMajorityCommittedSnapshot. Note we can be
+ // modeNone(standalone) here, so setting it to modeReplSet is only a
+ // temporary solution.
+ opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(
+ readConcernLevel, repl::ReplicationCoordinator::modeReplSet);
+
+ Status status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot();
// Wait for any writes performed by this ShardLocal instance to be committed and visible.
Status readConcernStatus = replCoord->waitUntilOpTimeForRead(
@@ -118,7 +125,7 @@ StatusWith<Shard::QueryResponse> RSLocalClient::queryOnce(
// Inform the storage engine to read from the committed snapshot for the rest of this
// operation.
- status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot();
if (!status.isOK()) {
return status;
}