summary refs log tree commit diff
path: root/src/mongo/db/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands')
-rw-r--r-- src/mongo/db/commands/dbcommands.cpp 163
-rw-r--r-- src/mongo/db/commands/oplog_note.cpp 34
2 files changed, 19 insertions, 178 deletions
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index f3dfa5856fd..becd4501056 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -84,6 +84,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_planner.h"
+#include "mongo/db/read_concern.h"
#include "mongo/db/repair_database.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
@@ -106,7 +107,6 @@
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/rpc/request_interface.h"
#include "mongo/s/chunk_version.h"
-#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/scripting/engine.h"
@@ -124,14 +124,6 @@ using std::unique_ptr;
namespace {
-// This is a special flag that allows for testing of snapshot behavior by skipping the replication
-// related checks and isolating the storage/query side of snapshotting.
-bool testingSnapshotBehaviorInIsolation = false;
-ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation(
- ServerParameterSet::getGlobal(),
- "testingSnapshotBehaviorInIsolation",
- &testingSnapshotBehaviorInIsolation);
-
void registerErrorImpl(OperationContext* opCtx, const DBException& exception) {
CurOp::get(opCtx)->debug().exceptionInfo = exception.getInfo();
}
@@ -193,159 +185,6 @@ LogicalTime computeOperationTime(OperationContext* opCtx,
return operationTime;
}
-Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) {
- repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx);
- auto lastAppliedTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
- if (clusterTime > lastAppliedTime) {
- auto shardingState = ShardingState::get(opCtx);
- // standalone replica set, so there is no need to advance the OpLog on the primary.
- if (!shardingState->enabled()) {
- return Status::OK();
- }
-
- auto myShard =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardingState->getShardName());
- if (!myShard.isOK()) {
- return myShard.getStatus();
- }
-
- auto swRes = myShard.getValue()->runCommand(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- BSON("applyOpLogNote" << 1 << "clusterTime" << clusterTime.asTimestamp() << "data"
- << BSON("append noop write" << 1)),
- Shard::RetryPolicy::kIdempotent);
- return swRes.getStatus();
- }
- return Status::OK();
-}
-
-Status waitForReadConcern(OperationContext* opCtx, const repl::ReadConcernArgs& readConcernArgs) {
- repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx);
-
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) {
- if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
- // For master/slave and standalone nodes, Linearizable Read is not supported.
- return {ErrorCodes::NotAReplicaSet,
- "node needs to be a replica set member to use read concern"};
- }
-
- // Replica sets running pv0 do not support linearizable read concern until further testing
- // is completed (SERVER-27025).
- if (!replCoord->isV1ElectionProtocol()) {
- return {
- ErrorCodes::IncompatibleElectionProtocol,
- "Replica sets running protocol version 0 do not support readConcern: linearizable"};
- }
-
- if (readConcernArgs.getArgsOpTime()) {
- return {ErrorCodes::FailedToParse,
- "afterOpTime not compatible with linearizable read concern"};
- }
-
- if (!replCoord->getMemberState().primary()) {
- return {ErrorCodes::NotMaster,
- "cannot satisfy linearizable read concern on non-primary node"};
- }
- }
-
- auto afterClusterTime = readConcernArgs.getArgsClusterTime();
- if (afterClusterTime) {
- auto currentTime = LogicalClock::get(opCtx)->getClusterTime().getTime();
- if (currentTime < *afterClusterTime) {
- return {ErrorCodes::InvalidOptions,
- "readConcern afterClusterTime must not be greater than clusterTime value"};
- }
- }
-
- // Skip waiting for the OpTime when testing snapshot behavior
- if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) {
- if (afterClusterTime) {
- auto status = makeNoopWriteIfNeeded(opCtx, *afterClusterTime);
- if (!status.isOK()) {
- LOG(1) << "failed noop write due to " << status.toString();
- }
- }
-
- auto status = replCoord->waitUntilOpTimeForRead(opCtx, readConcernArgs);
- if (!status.isOK()) {
- return status;
- }
- }
-
- if ((replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet ||
- testingSnapshotBehaviorInIsolation) &&
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) {
- // ReadConcern Majority is not supported in ProtocolVersion 0.
- if (!testingSnapshotBehaviorInIsolation && !replCoord->isV1ElectionProtocol()) {
- return {ErrorCodes::ReadConcernMajorityNotEnabled,
- str::stream() << "Replica sets running protocol version 0 do not support "
- "readConcern: majority"};
- }
-
- const int debugLevel = serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2;
-
- LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: "
- << readConcernArgs;
-
- Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
-
- // Wait until a snapshot is available.
- while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
- LOG(debugLevel) << "Snapshot not available yet.";
- replCoord->waitUntilSnapshotCommitted(opCtx, SnapshotName::min());
- status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
- }
-
- if (!status.isOK()) {
- return status;
- }
-
- LOG(debugLevel) << "Using 'committed' snapshot: " << CurOp::get(opCtx)->query();
- }
-
- return Status::OK();
-}
-
-Status waitForLinearizableReadConcern(OperationContext* opCtx) {
-
- repl::ReplicationCoordinator* replCoord =
- repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
-
- {
- Lock::DBLock lk(opCtx, "local", MODE_IX);
- Lock::CollectionLock lock(opCtx->lockState(), "local.oplog.rs", MODE_IX);
-
- if (!replCoord->canAcceptWritesForDatabase(opCtx, "admin")) {
- return {ErrorCodes::NotMaster,
- "No longer primary when waiting for linearizable read concern"};
- }
-
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
-
- WriteUnitOfWork uow(opCtx);
- opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(
- opCtx,
- BSON("msg"
- << "linearizable read"));
- uow.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- opCtx, "waitForLinearizableReadConcern", "local.rs.oplog");
- }
- WriteConcernOptions wc = WriteConcernOptions(
- WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
-
- repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- auto awaitReplResult = replCoord->awaitReplication(opCtx, lastOpApplied, wc);
- if (awaitReplResult.status == ErrorCodes::WriteConcernFailed) {
- return Status(ErrorCodes::LinearizableReadConcernError,
- "Failed to confirm that read was linearizable.");
- }
- return awaitReplResult.status;
-}
-
} // namespace
diff --git a/src/mongo/db/commands/oplog_note.cpp b/src/mongo/db/commands/oplog_note.cpp
index abdd78901c9..9a5760d616a 100644
--- a/src/mongo/db/commands/oplog_note.cpp
+++ b/src/mongo/db/commands/oplog_note.cpp
@@ -118,15 +118,6 @@ public:
BSONObj& cmdObj,
string& errmsg,
BSONObjBuilder& result) {
- BSONElement dataElement;
- auto dataStatus = bsonExtractTypedField(cmdObj, "data", Object, &dataElement);
- if (!dataStatus.isOK()) {
- return appendCommandStatus(result, dataStatus);
- }
-
- Timestamp clusterTime;
- auto clusterTimeStatus = bsonExtractTimestampField(cmdObj, "clusterTime", &clusterTime);
-
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
if (!replCoord->isReplEnabled()) {
return appendCommandStatus(result,
@@ -134,22 +125,33 @@ public:
"Must have replication set up to run \"appendOplogNote\""});
}
- if (!clusterTimeStatus.isOK()) {
- if (clusterTimeStatus == ErrorCodes::NoSuchKey) { // no need to use clusterTime
+ BSONElement dataElement;
+ auto dataStatus = bsonExtractTypedField(cmdObj, "data", Object, &dataElement);
+ if (!dataStatus.isOK()) {
+ return appendCommandStatus(result, dataStatus);
+ }
+
+ Timestamp maxClusterTime;
+ auto maxClusterTimeStatus =
+ bsonExtractTimestampField(cmdObj, "maxClusterTime", &maxClusterTime);
+
+ if (!maxClusterTimeStatus.isOK()) {
+ if (maxClusterTimeStatus == ErrorCodes::NoSuchKey) { // no need to use maxClusterTime
return appendCommandStatus(
result, _performNoopWrite(opCtx, dataElement.Obj(), "appendOpLogNote"));
}
- return appendCommandStatus(result, clusterTimeStatus);
+ return appendCommandStatus(result, maxClusterTimeStatus);
}
auto lastAppliedOpTime = replCoord->getMyLastAppliedOpTime().getTimestamp();
- if (clusterTime > lastAppliedOpTime) {
+ if (maxClusterTime > lastAppliedOpTime) {
return appendCommandStatus(
result, _performNoopWrite(opCtx, dataElement.Obj(), "appendOpLogNote"));
} else {
- LOG(1) << "Not scheduling a noop write. Requested clusterTime" << clusterTime
- << " is less or equal to the last primary OpTime: " << lastAppliedOpTime;
- return appendCommandStatus(result, Status::OK());
+ std::stringstream ss;
+ ss << "Requested maxClusterTime" << maxClusterTime.toString()
+ << " is less or equal to the last primary OpTime: " << lastAppliedOpTime.toString();
+ return appendCommandStatus(result, {ErrorCodes::StaleClusterTime, ss.str()});
}
}
};