/**
 * Copyright (C) 2016 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" #include "mongo/db/read_concern.h" #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/logical_clock.h" #include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" namespace mongo { namespace { /** * Synchronize writeRequests */ class WriteRequestSynchronizer; const auto getWriteRequestsSynchronizer = ServiceContext::declareDecoration(); class WriteRequestSynchronizer { public: WriteRequestSynchronizer() = default; /** * Returns a tuple if it can find the one that happened after or * at clusterTime. * Returns a tuple otherwise. 
*/ std::tuple>> getOrCreateWriteRequest( LogicalTime clusterTime) { stdx::unique_lock lock(_mutex); auto lastEl = _writeRequests.rbegin(); if (lastEl != _writeRequests.rend() && lastEl->first >= clusterTime.asTimestamp()) { return std::make_tuple(false, lastEl->second); } else { auto newWriteRequest = std::make_shared>(); _writeRequests[clusterTime.asTimestamp()] = newWriteRequest; return std::make_tuple(true, newWriteRequest); } } /** * Erases writeRequest that happened at clusterTime */ void deleteWriteRequest(LogicalTime clusterTime) { stdx::unique_lock lock(_mutex); auto el = _writeRequests.find(clusterTime.asTimestamp()); invariant(el != _writeRequests.end()); invariant(el->second); el->second.reset(); _writeRequests.erase(el); } private: stdx::mutex _mutex; std::map>> _writeRequests; }; /** * Schedule a write via appendOplogNote command to the primary of this replica set. */ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) { repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx); invariant(replCoord->isReplEnabled()); auto& writeRequests = getWriteRequestsSynchronizer(opCtx->getClient()->getServiceContext()); auto lastAppliedOpTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp()); auto status = Status::OK(); int remainingAttempts = 3; // this loop addresses the case when two or more threads need to advance the opLog time but the // one that waits for the notification gets the later clusterTime, so when the request finishes // it needs to be repeated with the later time. while (clusterTime > lastAppliedOpTime) { auto shardingState = ShardingState::get(opCtx); // standalone replica set, so there is no need to advance the OpLog on the primary. 
if (!shardingState->enabled()) { return Status::OK(); } auto myShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardingState->getShardName()); if (!myShard.isOK()) { return myShard.getStatus(); } if (!remainingAttempts--) { std::stringstream ss; ss << "Requested clusterTime" << clusterTime.toString() << " is greater than the last primary OpTime: " << lastAppliedOpTime.toString() << " no retries left"; return Status(ErrorCodes::InternalError, ss.str()); } auto myWriteRequest = writeRequests.getOrCreateWriteRequest(clusterTime); if (std::get<0>(myWriteRequest)) { // Its a new request try { LOG(2) << "New appendOplogNote request on clusterTime: " << clusterTime.toString() << " remaining attempts: " << remainingAttempts; auto swRes = myShard.getValue()->runCommand( opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), "admin", BSON("appendOplogNote" << 1 << "maxClusterTime" << clusterTime.asTimestamp() << "data" << BSON("noop write for afterClusterTime read concern" << 1)), Shard::RetryPolicy::kIdempotent); status = swRes.getStatus(); std::get<1>(myWriteRequest)->set(status); writeRequests.deleteWriteRequest(clusterTime); } catch (const DBException& ex) { status = ex.toStatus(); // signal the writeRequest to unblock waiters std::get<1>(myWriteRequest)->set(status); writeRequests.deleteWriteRequest(clusterTime); } } else { LOG(2) << "Join appendOplogNote request on clusterTime: " << clusterTime.toString() << " remaining attempts: " << remainingAttempts; try { status = std::get<1>(myWriteRequest)->get(opCtx); } catch (const DBException& ex) { return ex.toStatus(); } } // If the write status is ok need to wait for the oplog to replicate. 
if (status.isOK()) { return status; } lastAppliedOpTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp()); } invariant(status.isOK()); return status; } } // namespace Status waitForReadConcern(OperationContext* opCtx, const repl::ReadConcernArgs& readConcernArgs, bool allowAfterClusterTime) { repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx); invariant(replCoord); if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) { if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { // For master/slave and standalone nodes, Linearizable Read is not supported. return {ErrorCodes::NotAReplicaSet, "node needs to be a replica set member to use read concern"}; } // Replica sets running pv0 do not support linearizable read concern until further testing // is completed (SERVER-27025). if (!replCoord->isV1ElectionProtocol()) { return { ErrorCodes::IncompatibleElectionProtocol, "Replica sets running protocol version 0 do not support readConcern: linearizable"}; } if (readConcernArgs.getArgsOpTime()) { return {ErrorCodes::FailedToParse, "afterOpTime not compatible with linearizable read concern"}; } if (!replCoord->getMemberState().primary()) { return {ErrorCodes::NotMaster, "cannot satisfy linearizable read concern on non-primary node"}; } } auto afterClusterTime = readConcernArgs.getArgsClusterTime(); if (afterClusterTime) { if (!allowAfterClusterTime) { return {ErrorCodes::InvalidOptions, "afterClusterTime is not allowed for this command"}; } if ((serverGlobalParams.featureCompatibility.getVersion() != ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo36) && ShardingState::get(opCtx)->enabled()) { return {ErrorCodes::InvalidOptions, "readConcern afterClusterTime is not available in featureCompatibilityVersion " "3.4 in a sharded cluster"}; } auto currentTime = LogicalClock::get(opCtx)->getClusterTime(); if (currentTime < *afterClusterTime) { return 
{ErrorCodes::InvalidOptions, "readConcern afterClusterTime must not be greater than clusterTime value"}; } } auto pointInTime = readConcernArgs.getArgsPointInTime(); if (pointInTime) { fassertStatusOK( 39345, opCtx->recoveryUnit()->selectSnapshot(SnapshotName(pointInTime->asTimestamp()))); } if (!readConcernArgs.isEmpty()) { if (replCoord->isReplEnabled() && afterClusterTime) { auto status = makeNoopWriteIfNeeded(opCtx, *afterClusterTime); if (!status.isOK()) { LOG(0) << "Failed noop write at clusterTime: " << afterClusterTime->toString() << " due to " << status.toString(); } } if (replCoord->isReplEnabled() || !afterClusterTime) { auto status = replCoord->waitUntilOpTimeForRead(opCtx, readConcernArgs); if (!status.isOK()) { return status; } } } if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern && replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet) { // ReadConcern Majority is not supported in ProtocolVersion 0. if (!replCoord->isV1ElectionProtocol()) { return {ErrorCodes::ReadConcernMajorityNotEnabled, str::stream() << "Replica sets running protocol version 0 do not support " "readConcern: majority"}; } const int debugLevel = serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2; LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: " << readConcernArgs; Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); // Wait until a snapshot is available. 
while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) { LOG(debugLevel) << "Snapshot not available yet."; replCoord->waitUntilSnapshotCommitted(opCtx, SnapshotName::min()); status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); } if (!status.isOK()) { return status; } LOG(debugLevel) << "Using 'committed' snapshot: " << CurOp::get(opCtx)->opDescription(); } return Status::OK(); } Status waitForLinearizableReadConcern(OperationContext* opCtx) { repl::ReplicationCoordinator* replCoord = repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext()); { Lock::DBLock lk(opCtx, "local", MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), "local.oplog.rs", MODE_IX); if (!replCoord->canAcceptWritesForDatabase(opCtx, "admin")) { return {ErrorCodes::NotMaster, "No longer primary when waiting for linearizable read concern"}; } writeConflictRetry(opCtx, "waitForLinearizableReadConcern", "local.rs.oplog", [&opCtx] { WriteUnitOfWork uow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( opCtx, BSON("msg" << "linearizable read")); uow.commit(); }); } WriteConcernOptions wc = WriteConcernOptions( WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); auto awaitReplResult = replCoord->awaitReplication(opCtx, lastOpApplied, wc); if (awaitReplResult.status == ErrorCodes::WriteConcernFailed) { return Status(ErrorCodes::LinearizableReadConcernError, "Failed to confirm that read was linearizable."); } return awaitReplResult.status; } } // namespace mongo