diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-07-06 13:54:16 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-07-08 17:18:46 -0400 |
commit | 71c38ae1c608e1fa4abd9c8d8642508b87d333e5 (patch) | |
tree | 4462e263bcec66cc2fb898402b29c875f8449e05 /src/mongo | |
parent | 8aa68afc0ec5043d13c2d0a131974c0ac6da7e6d (diff) | |
download | mongo-71c38ae1c608e1fa4abd9c8d8642508b87d333e5.tar.gz |
SERVER-24939 Pull read concern extract, check and wait to separate utility
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 140 | ||||
-rw-r--r-- | src/mongo/db/read_concern.cpp | 133 | ||||
-rw-r--r-- | src/mongo/db/read_concern.h | 58 | ||||
-rw-r--r-- | src/mongo/db/write_concern.cpp | 2 |
5 files changed, 220 insertions, 114 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index e096f4f571e..e8ff463464b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -656,6 +656,7 @@ serverOnlyFiles = [ "prefetch.cpp", "range_deleter_db_env.cpp", "range_deleter_service.cpp", + "read_concern.cpp", "repair_database.cpp", "repl/initial_sync.cpp", "repl/master_slave.cpp", diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 91f9ad970d3..99462a65d2e 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -79,6 +79,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" +#include "mongo/db/read_concern.h" #include "mongo/db/repair_database.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" @@ -87,7 +88,6 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/server_parameters.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" @@ -115,15 +115,6 @@ using std::string; using std::stringstream; using std::unique_ptr; -// This is a special flag that allows for testing of snapshot behavior by skipping the replication -// related checks and isolating the storage/query side of snapshotting. -bool testingSnapshotBehaviorInIsolation = false; -ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation( - ServerParameterSet::getGlobal(), - "testingSnapshotBehaviorInIsolation", - &testingSnapshotBehaviorInIsolation); - - class CmdShutdownMongoD : public CmdShutdown { public: virtual void help(stringstream& help) const { @@ -1443,119 +1434,41 @@ bool Command::run(OperationContext* txn, bytesToReserve = 0; #endif - BSONObjBuilder inPlaceReplyBob(replyBuilder->getInPlaceReplyBuilder(bytesToReserve)); + // run expects non-const bsonobj + BSONObj cmd = request.getCommandArgs(); - repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); + // run expects const db std::string (can't bind to temporary) + const std::string db = request.getDatabase().toString(); + + BSONObjBuilder inPlaceReplyBob(replyBuilder->getInPlaceReplyBuilder(bytesToReserve)); - repl::ReadConcernArgs readConcernArgs; { - // parse and validate ReadConcernArgs - auto readConcernParseStatus = readConcernArgs.initialize(request.getCommandArgs()); - if (!readConcernParseStatus.isOK()) { - auto result = appendCommandStatus(inPlaceReplyBob, readConcernParseStatus); + auto readConcernArgsStatus = extractReadConcern(txn, cmd, supportsReadConcern()); + if (!readConcernArgsStatus.isOK()) { + auto result = appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus()); inPlaceReplyBob.doneFast(); replyBuilder->setMetadata(rpc::makeEmptyMetadata()); return result; } - if (!supportsReadConcern()) { - // Only return an error if a non-nullish readConcern was parsed, but do not process - // readConcern regardless. - if (!readConcernArgs.getOpTime().isNull() || - readConcernArgs.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) { - auto result = appendCommandStatus( - inPlaceReplyBob, - {ErrorCodes::InvalidOptions, - str::stream() << "Command " << getName() << " does not support " - << repl::ReadConcernArgs::kReadConcernFieldName}); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } - } else { - // Skip waiting for the OpTime when testing snapshot behavior. - if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) { - // Wait for readConcern to be satisfied. - auto readConcernStatus = replCoord->waitUntilOpTimeForRead(txn, readConcernArgs); - if (!readConcernStatus.isOK()) { - if (ErrorCodes::ExceededTimeLimit == readConcernStatus) { - const int debugLevel = - serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2; - LOG(debugLevel) - << "Command on database " << request.getDatabase() - << " timed out waiting for read concern to be satisfied. Command: " - << getRedactedCopyForLogging(request.getCommandArgs()); - } - - auto result = appendCommandStatus(inPlaceReplyBob, readConcernStatus); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } - } - - if ((replCoord->getReplicationMode() == - repl::ReplicationCoordinator::Mode::modeReplSet || - testingSnapshotBehaviorInIsolation) && - (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern || - readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern)) { - // ReadConcern Majority is not supported in ProtocolVersion 0. - if (!testingSnapshotBehaviorInIsolation && !replCoord->isV1ElectionProtocol()) { - auto result = appendCommandStatus( - inPlaceReplyBob, - {ErrorCodes::ReadConcernMajorityNotEnabled, - str::stream() << "Replica sets running protocol version 0 do not support " - "readConcern: majority"}); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } + Status rcStatus = waitForReadConcern(txn, readConcernArgsStatus.getValue()); + if (!rcStatus.isOK()) { + if (rcStatus == ErrorCodes::ExceededTimeLimit) { const int debugLevel = - serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2; - LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: " - << readConcernArgs; - Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); - - // Wait until a snapshot is available. - while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) { - LOG(debugLevel) << "Snapshot not available for readConcern: " - << readConcernArgs; - replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min()); - status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); - } - - LOG(debugLevel) << "Using 'committed' snapshot. " << CurOp::get(txn)->query(); - - if (!status.isOK()) { - auto result = appendCommandStatus(inPlaceReplyBob, status); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } + serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2; + LOG(debugLevel) << "Command on database " << db + << " timed out waiting for read concern to be satisfied. Command: " + << getRedactedCopyForLogging(request.getCommandArgs()); } - } - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) { - uassert(ErrorCodes::FailedToParse, - "afterOpTime not compatible with read concern level linearizable", - readConcernArgs.getOpTime().isNull()); - uassert(ErrorCodes::NotMaster, - "cannot satisfy linearizable read concern on non-primary node", - replCoord->getMemberState().primary()); + auto result = appendCommandStatus(inPlaceReplyBob, rcStatus); + inPlaceReplyBob.doneFast(); + replyBuilder->setMetadata(rpc::makeEmptyMetadata()); + return result; } } - // run expects non-const bsonobj - BSONObj cmd = request.getCommandArgs(); - // Implementation just forwards to the old method signature for now. - std::string errmsg; - - // run expects const db std::string (can't bind to temporary) - const std::string db = request.getDatabase().toString(); - - StatusWith<WriteConcernOptions> wcResult = - extractWriteConcern(txn, cmd, db, this->supportsWriteConcern(cmd)); - + auto wcResult = extractWriteConcern(txn, cmd, db, supportsWriteConcern(cmd)); if (!wcResult.isOK()) { auto result = appendCommandStatus(inPlaceReplyBob, wcResult.getStatus()); inPlaceReplyBob.doneFast(); @@ -1563,17 +1476,18 @@ bool Command::run(OperationContext* txn, return result; } + std::string errmsg; bool result; - if (!this->supportsWriteConcern(cmd)) { + if (!supportsWriteConcern(cmd)) { // TODO: remove queryOptions parameter from command's run method. - result = this->run(txn, db, cmd, 0, errmsg, inPlaceReplyBob); + result = run(txn, db, cmd, 0, errmsg, inPlaceReplyBob); } else { // Change the write concern while running the command. const auto oldWC = txn->getWriteConcern(); ON_BLOCK_EXIT([&] { txn->setWriteConcern(oldWC); }); txn->setWriteConcern(wcResult.getValue()); - result = this->run(txn, db, cmd, 0, errmsg, inPlaceReplyBob); + result = run(txn, db, cmd, 0, errmsg, inPlaceReplyBob); // Nothing in run() should change the writeConcern. dassert(txn->getWriteConcern().toBSON() == wcResult.getValue().toBSON()); @@ -1588,7 +1502,7 @@ bool Command::run(OperationContext* txn, // SERVER-22421: This code is to ensure error response backwards compatibility with the // user management commands. This can be removed in 3.6. - if (!waitForWCStatus.isOK() && isUserManagementCommand(this->getName())) { + if (!waitForWCStatus.isOK() && isUserManagementCommand(getName())) { BSONObj temp = inPlaceReplyBob.asTempObj().copy(); inPlaceReplyBob.resetToEmpty(); appendCommandStatus(inPlaceReplyBob, waitForWCStatus); diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp new file mode 100644 index 00000000000..f4baada6218 --- /dev/null +++ b/src/mongo/db/read_concern.cpp @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/read_concern.h" + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/db/commands.h" +#include "mongo/db/curop.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/server_options.h" +#include "mongo/db/server_parameters.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +// This is a special flag that allows for testing of snapshot behavior by skipping the replication +// related checks and isolating the storage/query side of snapshotting. +bool testingSnapshotBehaviorInIsolation = false; +ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation( + ServerParameterSet::getGlobal(), + "testingSnapshotBehaviorInIsolation", + &testingSnapshotBehaviorInIsolation); + +} // namespace + +StatusWith<repl::ReadConcernArgs> extractReadConcern(OperationContext* txn, + const BSONObj& cmdObj, + bool supportsReadConcern) { + repl::ReadConcernArgs readConcernArgs; + + auto readConcernParseStatus = readConcernArgs.initialize(cmdObj); + if (!readConcernParseStatus.isOK()) { + return readConcernParseStatus; + } + + if (!supportsReadConcern && !readConcernArgs.isEmpty()) { + return {ErrorCodes::InvalidOptions, + str::stream() << "Command does not support read concern"}; + } + + return readConcernArgs; +} + +Status waitForReadConcern(OperationContext* txn, const repl::ReadConcernArgs& readConcernArgs) { + repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(txn); + + // Skip waiting for the OpTime when testing snapshot behavior + if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) { + Status status = replCoord->waitUntilOpTimeForRead(txn, readConcernArgs); + if (!status.isOK()) { + return status; + } + } + + if ((replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet || + testingSnapshotBehaviorInIsolation) && + (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern || + readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern)) { + // ReadConcern Majority is not supported in ProtocolVersion 0. + if (!testingSnapshotBehaviorInIsolation && !replCoord->isV1ElectionProtocol()) { + return {ErrorCodes::ReadConcernMajorityNotEnabled, + str::stream() << "Replica sets running protocol version 0 do not support " + "readConcern: majority"}; + } + + const int debugLevel = serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2; + + LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: " + << readConcernArgs; + + Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + + // Wait until a snapshot is available. + while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) { + LOG(debugLevel) << "Snapshot not available yet."; + replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min()); + status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + } + + if (!status.isOK()) { + return status; + } + + LOG(debugLevel) << "Using 'committed' snapshot: " << CurOp::get(txn)->query(); + } + + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) { + if (!readConcernArgs.getOpTime().isNull()) + return {ErrorCodes::FailedToParse, + "afterOpTime not compatible with linearizable read concern"}; + + if (!replCoord->getMemberState().primary()) + return {ErrorCodes::NotMaster, + "cannot satisfy linearizable read concern on non-primary node"}; + } + + return Status::OK(); +} + +} // namespace mongo diff --git a/src/mongo/db/read_concern.h b/src/mongo/db/read_concern.h new file mode 100644 index 00000000000..35dd7f68b43 --- /dev/null +++ b/src/mongo/db/read_concern.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { + +class BSONObj; +class OperationContext; +class Status; +template <typename T> +class StatusWith; + +namespace repl { +class ReadConcernArgs; +} + +/** + * Given the specified command and whether it supports read concern, returns an effective read + * concern which should be used. + */ +StatusWith<repl::ReadConcernArgs> extractReadConcern(OperationContext* txn, + const BSONObj& cmdObj, + bool supportsReadConcern); + +/** + * Given the specified read concern arguments, performs checks that the read concern can actually be + * satisfied given the current state of the server and if so calls into the replication subsystem to + * perform the wait. + */ +Status waitForReadConcern(OperationContext* txn, const repl::ReadConcernArgs& readConcernArgs); + +} // namespace mongo diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index ac428b29183..a9c9adf1b99 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -26,7 +26,7 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" |