diff options
author | Xiangyu Yao <xiangyu.yao@mongodb.com> | 2018-03-07 15:10:10 -0500 |
---|---|---|
committer | Xiangyu Yao <xiangyu.yao@mongodb.com> | 2018-03-09 18:06:06 -0500 |
commit | 4ef0fe789ef349307c4cffd6548dc8657059cca5 (patch) | |
tree | d13f1dd19a97ca4e9a1e082f566bb0d59cee3147 /src | |
parent | ed1e2b4d2a4987e3744484f9482fdc7a0e119e94 (diff) | |
download | mongo-4ef0fe789ef349307c4cffd6548dc8657059cca5.tar.gz |
SERVER-33609 Pass readConcernLevel to WiredTigerRecoveryUnit
Diffstat (limited to 'src')
23 files changed, 257 insertions, 91 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index a2354f0e6d3..27d851fa043 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -85,7 +85,7 @@ ClientCursor::ClientCursor(ClientCursorParams params, _authenticatedUsers(std::move(params.authenticatedUsers)), _lsid(operationUsingCursor->getLogicalSessionId()), _txnNumber(operationUsingCursor->getTxnNumber()), - _isReadCommitted(params.isReadCommitted), + _readConcernLevel(params.readConcernLevel), _cursorManager(cursorManager), _originatingCommand(params.originatingCommandObj), _queryOptions(params.queryOptions), diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index b5e917acbe0..b811fc47722 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -37,6 +37,7 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/record_id.h" +#include "mongo/db/repl/read_concern_level.h" #include "mongo/stdx/functional.h" #include "mongo/util/net/message.h" @@ -57,11 +58,11 @@ struct ClientCursorParams { ClientCursorParams(std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor, NamespaceString nss, UserNameIterator authenticatedUsersIter, - bool isReadCommitted, + repl::ReadConcernLevel readConcernLevel, BSONObj originatingCommandObj) : exec(std::move(planExecutor)), nss(std::move(nss)), - isReadCommitted(isReadCommitted), + readConcernLevel(readConcernLevel), queryOptions(exec->getCanonicalQuery() ? exec->getCanonicalQuery()->getQueryRequest().getOptions() : 0), @@ -88,7 +89,7 @@ struct ClientCursorParams { std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; const NamespaceString nss; std::vector<UserName> authenticatedUsers; - bool isReadCommitted = false; + const repl::ReadConcernLevel readConcernLevel; int queryOptions = 0; BSONObj originatingCommandObj; }; @@ -133,8 +134,8 @@ public: return _txnNumber; } - bool isReadCommitted() const { - return _isReadCommitted; + repl::ReadConcernLevel getReadConcernLevel() const { + return _readConcernLevel; } /** @@ -307,7 +308,7 @@ private: // A transaction number for this cursor, if it was provided in the originating command. const boost::optional<TxnNumber> _txnNumber; - const bool _isReadCommitted = false; + const repl::ReadConcernLevel _readConcernLevel; CursorManager* _cursorManager; diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 9f4e12cd062..5a7bf9c0cec 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -379,7 +379,7 @@ public: {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj}); cursorId = pinnedCursor.getCursor()->cursorid(); diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index d293b5c09f5..6674092b823 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -391,8 +391,18 @@ public: ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue()); stashedCursorIndicator.Dismiss(); - if (cursor->isReadCommitted()) - uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); + opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(cursor->getReadConcernLevel(), + replicationMode); + + // TODO SERVER-33698: Remove kSnapshotReadConcern clause once we can guarantee that a + // readConcern level snapshot getMore will have an established point-in-time WiredTiger + // snapshot. + if (replicationMode == repl::ReplicationCoordinator::modeReplSet && + (cursor->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern || + cursor->getReadConcernLevel() == repl::ReadConcernLevel::kSnapshotReadConcern)) { + uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); + } const bool disableAwaitDataFailpointActive = MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index 035ea4c6271..822baef6de2 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -345,7 +345,7 @@ public: {std::move(exec), cursorNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->getReadConcernLevel(), jsobj}); cursorId = pinnedCursor.getCursor()->cursorid(); } diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index a64f91dc080..19993d6084b 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -198,7 +198,7 @@ public: {std::move(exec), cursorNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj}); cursorId = pinnedCursor.getCursor()->cursorid(); } diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index 05b8dbff677..c040e9e0510 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -155,7 +155,7 @@ public: {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj}); pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index 4234a0d91d6..f8d3b058d2d 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -105,7 +105,7 @@ public: {std::move(exec), ns, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj}); appendCursorResponseObject( diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 066ad027e4f..d33a1a061ab 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -521,7 +521,7 @@ Status runAggregate(OperationContext* opCtx, std::move(exec), origNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj); if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) { cursorParams.setTailable(true); diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 6b9a695d02b..f42857afb76 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -104,7 +104,7 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx, repl::ReplicationCoordinator::get(opCtx)->waitUntilSnapshotCommitted(opCtx, *minSnapshot); - uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); { stdx::lock_guard<Client> lk(*opCtx->getClient()); diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 85370429156..8b01d239967 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -358,8 +358,18 @@ Message getMore(OperationContext* opCtx, *isCursorAuthorized = true; - if (cc->isReadCommitted()) - uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); + opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(cc->getReadConcernLevel(), + replicationMode); + + // TODO SERVER-33698: Remove kSnapshotReadConcern clause once we can guarantee that a + // readConcern level snapshot getMore will have an established point-in-time WiredTiger + // snapshot. + if (replicationMode == repl::ReplicationCoordinator::modeReplSet && + (cc->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern || + cc->getReadConcernLevel() == repl::ReadConcernLevel::kSnapshotReadConcern)) { + uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); + } uassert(40548, "OP_GET_MORE operations are not supported on tailable aggregations. Only clients " @@ -690,7 +700,7 @@ std::string runQuery(OperationContext* opCtx, {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->getReadConcernLevel(), upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)}); ccId = pinnedCursor.getCursor()->cursorid(); diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp index 7c054469bd5..4e8c04440f8 100644 --- a/src/mongo/db/read_concern.cpp +++ b/src/mongo/db/read_concern.cpp @@ -207,6 +207,9 @@ Status waitForReadConcern(OperationContext* opCtx, repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx); invariant(replCoord); + opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(readConcernArgs.getLevel(), + replCoord->getReplicationMode()); + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) { if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { // For master/slave and standalone nodes, Linearizable Read is not supported. @@ -299,13 +302,13 @@ Status waitForReadConcern(OperationContext* opCtx, LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: " << readConcernArgs; - Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + Status status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot(); // Wait until a snapshot is available. while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) { LOG(debugLevel) << "Snapshot not available yet."; replCoord->waitUntilSnapshotCommitted(opCtx, Timestamp()); - status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot(); } if (!status.isOK()) { diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h index f8a6226ecf8..29aab009a89 100644 --- a/src/mongo/db/repl/read_concern_args.h +++ b/src/mongo/db/repl/read_concern_args.h @@ -36,6 +36,7 @@ #include "mongo/db/logical_time.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_concern_level.h" #include "mongo/util/time_support.h" namespace mongo { @@ -44,14 +45,6 @@ class BSONObj; namespace repl { -enum class ReadConcernLevel { - kLocalReadConcern, - kMajorityReadConcern, - kLinearizableReadConcern, - kAvailableReadConcern, - kSnapshotReadConcern -}; - class ReadConcernArgs { public: static const std::string kReadConcernFieldName; diff --git a/src/mongo/db/repl/read_concern_level.h b/src/mongo/db/repl/read_concern_level.h new file mode 100644 index 00000000000..ffb91250afa --- /dev/null +++ b/src/mongo/db/repl/read_concern_level.h @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { +namespace repl { + +enum class ReadConcernLevel { + kLocalReadConcern, + kMajorityReadConcern, + kLinearizableReadConcern, + kAvailableReadConcern, + kSnapshotReadConcern +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 3b8eac55cdd..ddacb478aa6 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -394,7 +394,7 @@ TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfRollbackIDIsNotI TEST_F(StorageInterfaceImplTest, SnapshotSupported) { auto opCtx = getOperationContext(); - Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + Status status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot(); ASSERT(status.isOK()); } diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp index 800b6b6f1cc..97471b21ca2 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp @@ -67,7 +67,7 @@ void EphemeralForTestRecoveryUnit::abortUnitOfWork() { } } -Status EphemeralForTestRecoveryUnit::setReadFromMajorityCommittedSnapshot() { +Status EphemeralForTestRecoveryUnit::obtainMajorityCommittedSnapshot() { return Status::OK(); } } diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h index 7de00846992..ad6f9091295 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h @@ -58,7 +58,7 @@ public: virtual void abandonSnapshot() {} - Status setReadFromMajorityCommittedSnapshot() final; + Status obtainMajorityCommittedSnapshot() final; virtual void registerChange(Change* change) { _changes.push_back(ChangePtr(change)); diff --git a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp index 7112ce561c8..a99e632a456 100644 --- a/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp +++ b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp @@ -33,6 +33,8 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/read_concern_level.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context_noop.h" #include "mongo/db/storage/kv/kv_engine.h" #include "mongo/db/storage/record_store.h" @@ -142,13 +144,19 @@ public: int itCountCommitted() { auto op = makeOperation(); - ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + op->recoveryUnit()->setReadConcernLevelAndReplicationMode( + repl::ReadConcernLevel::kMajorityReadConcern, + repl::ReplicationCoordinator::modeReplSet); + ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot()); return itCountOn(op); } boost::optional<Record> readRecordCommitted(RecordId id) { auto op = makeOperation(); - ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + op->recoveryUnit()->setReadConcernLevelAndReplicationMode( + repl::ReadConcernLevel::kMajorityReadConcern, + repl::ReplicationCoordinator::modeReplSet); + ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot()); auto cursor = rs->getCursor(op); auto record = cursor->seekExact(id); if (record) @@ -204,23 +212,25 @@ TEST_F(SnapshotManagerTests, FailsWithNoCommittedSnapshot) { auto op = makeOperation(); auto ru = op->recoveryUnit(); + ru->setReadConcernLevelAndReplicationMode(repl::ReadConcernLevel::kMajorityReadConcern, + repl::ReplicationCoordinator::modeReplSet); // Before first snapshot is created. - ASSERT_EQ(ru->setReadFromMajorityCommittedSnapshot(), + ASSERT_EQ(ru->obtainMajorityCommittedSnapshot(), ErrorCodes::ReadConcernMajorityNotAvailableYet); // There is a snapshot but it isn't committed. auto snap = fetchAndIncrementTimestamp(); - ASSERT_EQ(ru->setReadFromMajorityCommittedSnapshot(), + ASSERT_EQ(ru->obtainMajorityCommittedSnapshot(), ErrorCodes::ReadConcernMajorityNotAvailableYet); // Now there is a committed snapshot. snapshotManager->setCommittedSnapshot(snap); - ASSERT_OK(ru->setReadFromMajorityCommittedSnapshot()); + ASSERT_OK(ru->obtainMajorityCommittedSnapshot()); // Not anymore! snapshotManager->dropAllSnapshots(); - ASSERT_EQ(ru->setReadFromMajorityCommittedSnapshot(), + ASSERT_EQ(ru->obtainMajorityCommittedSnapshot(), ErrorCodes::ReadConcernMajorityNotAvailableYet); } @@ -229,11 +239,13 @@ TEST_F(SnapshotManagerTests, FailsAfterDropAllSnapshotsWhileYielded) { return; // This test is only for engines that DO support SnapshotMangers. auto op = makeOperation(); + op->recoveryUnit()->setReadConcernLevelAndReplicationMode( + repl::ReadConcernLevel::kMajorityReadConcern, repl::ReplicationCoordinator::modeReplSet); // Start an operation using a committed snapshot. auto snap = fetchAndIncrementTimestamp(); snapshotManager->setCommittedSnapshot(snap); - ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + ASSERT_OK(op->recoveryUnit()->obtainMajorityCommittedSnapshot()); ASSERT_EQ(itCountOn(op), 0); // acquires a snapshot. // Everything still works until we abandon our snapshot. @@ -284,7 +296,9 @@ TEST_F(SnapshotManagerTests, BasicFunctionality) { // This op should keep its original snapshot until abandoned. auto longOp = makeOperation(); - ASSERT_OK(longOp->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + longOp->recoveryUnit()->setReadConcernLevelAndReplicationMode( + repl::ReadConcernLevel::kMajorityReadConcern, repl::ReplicationCoordinator::modeReplSet); + ASSERT_OK(longOp->recoveryUnit()->obtainMajorityCommittedSnapshot()); ASSERT_EQ(itCountOn(longOp), 3); // If this fails, the snapshot contains writes that were rolled back. diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index bbc56dadb73..846e7015eb9 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -35,6 +35,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/timestamp.h" +#include "mongo/db/repl/read_concern_level.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/storage/snapshot.h" namespace mongo { @@ -93,9 +95,8 @@ public: virtual void preallocateSnapshot() {} /** - * Informs this RecoveryUnit that all future reads through it should be from a snapshot - * marked as Majority Committed. Snapshots should still be separately acquired and newer - * committed snapshots should be used if available whenever implementations would normally + * Obtains a majority committed snapshot. Snapshots should still be separately acquired and + * newer committed snapshots should be used if available whenever implementations would normally * change snapshots. * * If no snapshot has yet been marked as Majority Committed, returns a status with error code @@ -107,19 +108,35 @@ public: * StorageEngines that don't support a SnapshotManager should use the default * implementation. */ - virtual Status setReadFromMajorityCommittedSnapshot() { + virtual Status obtainMajorityCommittedSnapshot() { return {ErrorCodes::CommandNotSupported, "Current storage engine does not support majority readConcerns"}; } /** - * Returns true if setReadFromMajorityCommittedSnapshot() has been called. + * Returns true if we are reading from a majority committed snapshot. */ virtual bool isReadingFromMajorityCommittedSnapshot() const { return false; } /** + * Set this operation's readConcern level and replication mode on the recovery unit. + */ + void setReadConcernLevelAndReplicationMode(repl::ReadConcernLevel readConcernLevel, + repl::ReplicationCoordinator::Mode replicationMode) { + _readConcernLevel = readConcernLevel; + _replicationMode = replicationMode; + } + + /** + * Returns the readConcern level of this recovery unit. + */ + repl::ReadConcernLevel getReadConcernLevel() const { + return _readConcernLevel; + } + + /** * Returns the Timestamp being used by this recovery unit or boost::none if not reading from * a majority committed snapshot. * @@ -296,6 +313,8 @@ public: protected: RecoveryUnit() {} + repl::ReplicationCoordinator::Mode _replicationMode = repl::ReplicationCoordinator::modeNone; + repl::ReadConcernLevel _readConcernLevel = repl::ReadConcernLevel::kLocalReadConcern; }; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 02f9f76224f..2c2174f2a6c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -227,20 +227,19 @@ SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const { return SnapshotId(_mySnapshotId); } -Status WiredTigerRecoveryUnit::setReadFromMajorityCommittedSnapshot() { +Status WiredTigerRecoveryUnit::obtainMajorityCommittedSnapshot() { + invariant(isReadingFromMajorityCommittedSnapshot()); auto snapshotName = _sessionCache->snapshotManager().getMinSnapshotForNextCommittedRead(); if (!snapshotName) { return {ErrorCodes::ReadConcernMajorityNotAvailableYet, "Read concern majority reads are currently not possible."}; } - _majorityCommittedSnapshot = *snapshotName; - _readFromMajorityCommittedSnapshot = true; return Status::OK(); } boost::optional<Timestamp> WiredTigerRecoveryUnit::getMajorityCommittedSnapshot() const { - if (!_readFromMajorityCommittedSnapshot) + if (!isReadingFromMajorityCommittedSnapshot()) return {}; return _majorityCommittedSnapshot; } @@ -268,7 +267,7 @@ void WiredTigerRecoveryUnit::_txnOpen() { << " is older than the oldest available timestamp."); } uassertStatusOK(status); - } else if (_readFromMajorityCommittedSnapshot) { + } else if (isReadingFromMajorityCommittedSnapshot()) { // We reset _majorityCommittedSnapshot to the actual read timestamp used when the // transaction was started. _majorityCommittedSnapshot = diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index c2b65222442..99b11f060a8 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -39,6 +39,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/operation_context.h" #include "mongo/db/record_id.h" +#include "mongo/db/repl/read_concern_level.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/util/timer.h" @@ -72,9 +73,12 @@ public: void abandonSnapshot() override; void preallocateSnapshot() override; - Status setReadFromMajorityCommittedSnapshot() override; + Status obtainMajorityCommittedSnapshot() override; + bool isReadingFromMajorityCommittedSnapshot() const override { - return _readFromMajorityCommittedSnapshot; + return _replicationMode == repl::ReplicationCoordinator::modeReplSet && + (_readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern || + _readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern); } boost::optional<Timestamp> getMajorityCommittedSnapshot() const override; @@ -146,7 +150,6 @@ private: bool _isTimestamped = false; Timestamp _commitTimestamp; uint64_t _mySnapshotId; - bool _readFromMajorityCommittedSnapshot = false; Timestamp _majorityCommittedSnapshot; Timestamp _readAtTimestamp; std::unique_ptr<Timer> _timer; diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp index 12d393c48df..ee3933dfa35 100644 --- a/src/mongo/dbtests/cursor_manager_test.cpp +++ b/src/mongo/dbtests/cursor_manager_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/operation_context_noop.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_test_service_context.h" +#include "mongo/db/repl/read_concern_level.h" #include "mongo/dbtests/dbtests.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" @@ -107,7 +108,11 @@ public: } ClientCursorParams makeParams(OperationContext* opCtx) { - return {makeFakePlanExecutor(opCtx), kTestNss, {}, false, BSONObj()}; + return {makeFakePlanExecutor(opCtx), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}; } ClientCursorPin makeCursor(OperationContext* opCtx) { @@ -145,7 +150,11 @@ TEST_F(CursorManagerTest, GlobalCursorManagerShouldReportOwnershipOfCursorsItCre for (int i = 0; i < 1000; i++) { auto cursorPin = CursorManager::getGlobalCursorManager()->registerCursor( _opCtx.get(), - {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()}); + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); ASSERT_TRUE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid())); } } @@ -155,8 +164,12 @@ TEST_F(CursorManagerTest, CursorManager* cursorManager = useCursorManager(); auto opCtx = cc().makeOperationContext(); for (int i = 0; i < 1000; i++) { - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); ASSERT_FALSE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid())); } } @@ -170,8 +183,12 @@ TEST_F(CursorManagerTest, CursorManager* cursorManager = useCursorManager(); boost::optional<uint32_t> prefix; for (int i = 0; i < 1000; i++) { - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); auto cursorId = cursorPin.getCursor()->cursorid(); if (prefix) { ASSERT_EQ(*prefix, extractLeading32Bits(cursorId)); @@ -189,8 +206,12 @@ TEST_F(CursorManagerTest, */ TEST_F(CursorManagerTest, InvalidateCursor) { CursorManager* cursorManager = useCursorManager(); - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); auto cursorId = cursorPin.getCursor()->cursorid(); cursorPin.release(); @@ -215,8 +236,12 @@ TEST_F(CursorManagerTest, InvalidateCursor) { TEST_F(CursorManagerTest, InvalidateCursorWithDrop) { CursorManager* cursorManager = useCursorManager(); - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); auto cursorId = cursorPin.getCursor()->cursorid(); cursorPin.release(); @@ -238,8 +263,12 @@ TEST_F(CursorManagerTest, InvalidateCursorWithDrop) { TEST_F(CursorManagerTest, InvalidatePinnedCursor) { CursorManager* cursorManager = useCursorManager(); - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); // If the cursor is pinned, it sticks around, even after invalidation. ASSERT_EQUALS(1U, cursorManager->numCursors()); @@ -266,8 +295,12 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursor) { const bool shouldAudit = false; OperationContext* const pinningOpCtx = _opCtx.get(); - auto cursorPin = cursorManager->registerCursor( - pinningOpCtx, {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(pinningOpCtx, + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); auto cursorId = cursorPin.getCursor()->cursorid(); ASSERT_OK(cursorManager->killCursor(_opCtx.get(), cursorId, shouldAudit)); @@ -285,8 +318,12 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursorMultiClient) { OperationContext* const pinningOpCtx = _opCtx.get(); // Pin the cursor from one client. - auto cursorPin = cursorManager->registerCursor( - pinningOpCtx, {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(pinningOpCtx, + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); auto cursorId = cursorPin.getCursor()->cursorid(); @@ -315,9 +352,12 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { CursorManager* cursorManager = useCursorManager(); auto clock = useClock(); - cursorManager->registerCursor( - _opCtx.get(), - {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()}); + cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t())); @@ -325,9 +365,12 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); ASSERT_EQ(0UL, cursorManager->numCursors()); - cursorManager->registerCursor( - _opCtx.get(), - {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()}); + cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t::max())); ASSERT_EQ(0UL, cursorManager->numCursors()); } @@ -339,9 +382,12 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) { CursorManager* cursorManager = useCursorManager(); auto clock = useClock(); - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), - {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); // The pin is still in scope, so it should not time out. clock->advance(getDefaultCursorTimeoutMillis()); @@ -356,9 +402,12 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) { auto clock = useClock(); // Make a cursor from the plan executor, and immediately kill it. - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), - {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); cursorPin.release(); const bool collectionGoingAway = false; cursorManager->invalidateAll( @@ -379,9 +428,12 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsThatAreStillPinnedShouldNotTimeou auto clock = useClock(); // Make a cursor from the plan executor, and immediately kill it. - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), - {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); const bool collectionGoingAway = false; cursorManager->invalidateAll( _opCtx.get(), collectionGoingAway, "KilledCursorsShouldTimeoutTest"); @@ -401,15 +453,23 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) { auto clock = useClock(); // Register a cursor which we will look at again. - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); auto usedCursorId = cursorPin.getCursor()->cursorid(); cursorPin.release(); // Register a cursor to immediately forget about, to make sure it will time out on a normal // schedule. cursorManager->registerCursor(_opCtx.get(), - {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); // Advance the clock to simulate time passing. clock->advance(Milliseconds(1)); @@ -438,8 +498,12 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing auto clock = useClock(); // Register a cursor which we will look at again. - auto cursorPin = cursorManager->registerCursor( - _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); + auto cursorPin = cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); // Advance the clock to simulate time passing. clock->advance(getDefaultCursorTimeoutMillis() + Milliseconds(1)); diff --git a/src/mongo/s/client/rs_local_client.cpp b/src/mongo/s/client/rs_local_client.cpp index 718ff0746ce..d6c27dd719a 100644 --- a/src/mongo/s/client/rs_local_client.cpp +++ b/src/mongo/s/client/rs_local_client.cpp @@ -107,7 +107,14 @@ StatusWith<Shard::QueryResponse> RSLocalClient::queryOnce( if (readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern) { // Set up operation context with majority read snapshot so correct optime can be retrieved. - Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + // TODO SERVER-33794: Always set the replicationMode to modeReplSet so + // that we can call obtainMajorityCommittedSnapshot. Note we can be + // modeNone(standalone) here, so setting it to modeReplSet is only a + // temporary solution. + opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode( + readConcernLevel, repl::ReplicationCoordinator::modeReplSet); + + Status status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot(); // Wait for any writes performed by this ShardLocal instance to be committed and visible. Status readConcernStatus = replCoord->waitUntilOpTimeForRead( @@ -118,7 +125,7 @@ StatusWith<Shard::QueryResponse> RSLocalClient::queryOnce( // Inform the storage engine to read from the committed snapshot for the rest of this // operation. - status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + status = opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot(); if (!status.isOK()) { return status; } |