diff options
Diffstat (limited to 'src/mongo/db/commands')
-rw-r--r-- | src/mongo/db/commands/dbcommands.cpp | 163 | ||||
-rw-r--r-- | src/mongo/db/commands/oplog_note.cpp | 34 |
2 files changed, 19 insertions, 178 deletions
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index f3dfa5856fd..becd4501056 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -84,6 +84,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" +#include "mongo/db/read_concern.h" #include "mongo/db/repair_database.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" @@ -106,7 +107,6 @@ #include "mongo/rpc/reply_builder_interface.h" #include "mongo/rpc/request_interface.h" #include "mongo/s/chunk_version.h" -#include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" #include "mongo/scripting/engine.h" @@ -124,14 +124,6 @@ using std::unique_ptr; namespace { -// This is a special flag that allows for testing of snapshot behavior by skipping the replication -// related checks and isolating the storage/query side of snapshotting. -bool testingSnapshotBehaviorInIsolation = false; -ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation( - ServerParameterSet::getGlobal(), - "testingSnapshotBehaviorInIsolation", - &testingSnapshotBehaviorInIsolation); - void registerErrorImpl(OperationContext* opCtx, const DBException& exception) { CurOp::get(opCtx)->debug().exceptionInfo = exception.getInfo(); } @@ -193,159 +185,6 @@ LogicalTime computeOperationTime(OperationContext* opCtx, return operationTime; } -Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) { - repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx); - auto lastAppliedTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp()); - if (clusterTime > lastAppliedTime) { - auto shardingState = ShardingState::get(opCtx); - // standalone replica set, so there is no need to advance the OpLog on the primary. - if (!shardingState->enabled()) { - return Status::OK(); - } - - auto myShard = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardingState->getShardName()); - if (!myShard.isOK()) { - return myShard.getStatus(); - } - - auto swRes = myShard.getValue()->runCommand( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - "admin", - BSON("applyOpLogNote" << 1 << "clusterTime" << clusterTime.asTimestamp() << "data" - << BSON("append noop write" << 1)), - Shard::RetryPolicy::kIdempotent); - return swRes.getStatus(); - } - return Status::OK(); -} - -Status waitForReadConcern(OperationContext* opCtx, const repl::ReadConcernArgs& readConcernArgs) { - repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx); - - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) { - if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { - // For master/slave and standalone nodes, Linearizable Read is not supported. - return {ErrorCodes::NotAReplicaSet, - "node needs to be a replica set member to use read concern"}; - } - - // Replica sets running pv0 do not support linearizable read concern until further testing - // is completed (SERVER-27025). - if (!replCoord->isV1ElectionProtocol()) { - return { - ErrorCodes::IncompatibleElectionProtocol, - "Replica sets running protocol version 0 do not support readConcern: linearizable"}; - } - - if (readConcernArgs.getArgsOpTime()) { - return {ErrorCodes::FailedToParse, - "afterOpTime not compatible with linearizable read concern"}; - } - - if (!replCoord->getMemberState().primary()) { - return {ErrorCodes::NotMaster, - "cannot satisfy linearizable read concern on non-primary node"}; - } - } - - auto afterClusterTime = readConcernArgs.getArgsClusterTime(); - if (afterClusterTime) { - auto currentTime = LogicalClock::get(opCtx)->getClusterTime().getTime(); - if (currentTime < *afterClusterTime) { - return {ErrorCodes::InvalidOptions, - "readConcern afterClusterTime must not be greater than clusterTime value"}; - } - } - - // Skip waiting for the OpTime when testing snapshot behavior - if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) { - if (afterClusterTime) { - auto status = makeNoopWriteIfNeeded(opCtx, *afterClusterTime); - if (!status.isOK()) { - LOG(1) << "failed noop write due to " << status.toString(); - } - } - - auto status = replCoord->waitUntilOpTimeForRead(opCtx, readConcernArgs); - if (!status.isOK()) { - return status; - } - } - - if ((replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet || - testingSnapshotBehaviorInIsolation) && - readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) { - // ReadConcern Majority is not supported in ProtocolVersion 0. - if (!testingSnapshotBehaviorInIsolation && !replCoord->isV1ElectionProtocol()) { - return {ErrorCodes::ReadConcernMajorityNotEnabled, - str::stream() << "Replica sets running protocol version 0 do not support " - "readConcern: majority"}; - } - - const int debugLevel = serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2; - - LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: " - << readConcernArgs; - - Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); - - // Wait until a snapshot is available. - while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) { - LOG(debugLevel) << "Snapshot not available yet."; - replCoord->waitUntilSnapshotCommitted(opCtx, SnapshotName::min()); - status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); - } - - if (!status.isOK()) { - return status; - } - - LOG(debugLevel) << "Using 'committed' snapshot: " << CurOp::get(opCtx)->query(); - } - - return Status::OK(); -} - -Status waitForLinearizableReadConcern(OperationContext* opCtx) { - - repl::ReplicationCoordinator* replCoord = - repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext()); - - { - Lock::DBLock lk(opCtx, "local", MODE_IX); - Lock::CollectionLock lock(opCtx->lockState(), "local.oplog.rs", MODE_IX); - - if (!replCoord->canAcceptWritesForDatabase(opCtx, "admin")) { - return {ErrorCodes::NotMaster, - "No longer primary when waiting for linearizable read concern"}; - } - - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - - WriteUnitOfWork uow(opCtx); - opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( - opCtx, - BSON("msg" - << "linearizable read")); - uow.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - opCtx, "waitForLinearizableReadConcern", "local.rs.oplog"); - } - WriteConcernOptions wc = WriteConcernOptions( - WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); - - repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - auto awaitReplResult = replCoord->awaitReplication(opCtx, lastOpApplied, wc); - if (awaitReplResult.status == ErrorCodes::WriteConcernFailed) { - return Status(ErrorCodes::LinearizableReadConcernError, - "Failed to confirm that read was linearizable."); - } - return awaitReplResult.status; -} - } // namespace diff --git a/src/mongo/db/commands/oplog_note.cpp b/src/mongo/db/commands/oplog_note.cpp index abdd78901c9..9a5760d616a 100644 --- a/src/mongo/db/commands/oplog_note.cpp +++ b/src/mongo/db/commands/oplog_note.cpp @@ -118,15 +118,6 @@ public: BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result) { - BSONElement dataElement; - auto dataStatus = bsonExtractTypedField(cmdObj, "data", Object, &dataElement); - if (!dataStatus.isOK()) { - return appendCommandStatus(result, dataStatus); - } - - Timestamp clusterTime; - auto clusterTimeStatus = bsonExtractTimestampField(cmdObj, "clusterTime", &clusterTime); - auto replCoord = repl::ReplicationCoordinator::get(opCtx); if (!replCoord->isReplEnabled()) { return appendCommandStatus(result, @@ -134,22 +125,33 @@ public: "Must have replication set up to run \"appendOplogNote\""}); } - if (!clusterTimeStatus.isOK()) { - if (clusterTimeStatus == ErrorCodes::NoSuchKey) { // no need to use clusterTime + BSONElement dataElement; + auto dataStatus = bsonExtractTypedField(cmdObj, "data", Object, &dataElement); + if (!dataStatus.isOK()) { + return appendCommandStatus(result, dataStatus); + } + + Timestamp maxClusterTime; + auto maxClusterTimeStatus = + bsonExtractTimestampField(cmdObj, "maxClusterTime", &maxClusterTime); + + if (!maxClusterTimeStatus.isOK()) { + if (maxClusterTimeStatus == ErrorCodes::NoSuchKey) { // no need to use maxClusterTime return appendCommandStatus( result, _performNoopWrite(opCtx, dataElement.Obj(), "appendOpLogNote")); } - return appendCommandStatus(result, clusterTimeStatus); + return appendCommandStatus(result, maxClusterTimeStatus); } auto lastAppliedOpTime = replCoord->getMyLastAppliedOpTime().getTimestamp(); - if (clusterTime > lastAppliedOpTime) { + if (maxClusterTime > lastAppliedOpTime) { return appendCommandStatus( result, _performNoopWrite(opCtx, dataElement.Obj(), "appendOpLogNote")); } else { - LOG(1) << "Not scheduling a noop write. Requested clusterTime" << clusterTime - << " is less or equal to the last primary OpTime: " << lastAppliedOpTime; - return appendCommandStatus(result, Status::OK()); + std::stringstream ss; + ss << "Requested maxClusterTime" << maxClusterTime.toString() + << " is less or equal to the last primary OpTime: " << lastAppliedOpTime.toString(); + return appendCommandStatus(result, {ErrorCodes::StaleClusterTime, ss.str()}); } } }; |