summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-07-06 13:54:16 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-07-08 17:18:46 -0400
commit71c38ae1c608e1fa4abd9c8d8642508b87d333e5 (patch)
tree4462e263bcec66cc2fb898402b29c875f8449e05 /src/mongo
parent8aa68afc0ec5043d13c2d0a131974c0ac6da7e6d (diff)
downloadmongo-71c38ae1c608e1fa4abd9c8d8642508b87d333e5.tar.gz
SERVER-24939 Pull read concern extract, check and wait to separate utility
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/dbcommands.cpp140
-rw-r--r--src/mongo/db/read_concern.cpp133
-rw-r--r--src/mongo/db/read_concern.h58
-rw-r--r--src/mongo/db/write_concern.cpp2
5 files changed, 220 insertions, 114 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index e096f4f571e..e8ff463464b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -656,6 +656,7 @@ serverOnlyFiles = [
"prefetch.cpp",
"range_deleter_db_env.cpp",
"range_deleter_service.cpp",
+ "read_concern.cpp",
"repair_database.cpp",
"repl/initial_sync.cpp",
"repl/master_slave.cpp",
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index 91f9ad970d3..99462a65d2e 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -79,6 +79,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_planner.h"
+#include "mongo/db/read_concern.h"
#include "mongo/db/repair_database.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
@@ -87,7 +88,6 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/server_parameters.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
@@ -115,15 +115,6 @@ using std::string;
using std::stringstream;
using std::unique_ptr;
-// This is a special flag that allows for testing of snapshot behavior by skipping the replication
-// related checks and isolating the storage/query side of snapshotting.
-bool testingSnapshotBehaviorInIsolation = false;
-ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation(
- ServerParameterSet::getGlobal(),
- "testingSnapshotBehaviorInIsolation",
- &testingSnapshotBehaviorInIsolation);
-
-
class CmdShutdownMongoD : public CmdShutdown {
public:
virtual void help(stringstream& help) const {
@@ -1443,119 +1434,41 @@ bool Command::run(OperationContext* txn,
bytesToReserve = 0;
#endif
- BSONObjBuilder inPlaceReplyBob(replyBuilder->getInPlaceReplyBuilder(bytesToReserve));
+ // run expects non-const bsonobj
+ BSONObj cmd = request.getCommandArgs();
- repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
+ // run expects const db std::string (can't bind to temporary)
+ const std::string db = request.getDatabase().toString();
+
+ BSONObjBuilder inPlaceReplyBob(replyBuilder->getInPlaceReplyBuilder(bytesToReserve));
- repl::ReadConcernArgs readConcernArgs;
{
- // parse and validate ReadConcernArgs
- auto readConcernParseStatus = readConcernArgs.initialize(request.getCommandArgs());
- if (!readConcernParseStatus.isOK()) {
- auto result = appendCommandStatus(inPlaceReplyBob, readConcernParseStatus);
+ auto readConcernArgsStatus = extractReadConcern(txn, cmd, supportsReadConcern());
+ if (!readConcernArgsStatus.isOK()) {
+ auto result = appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus());
inPlaceReplyBob.doneFast();
replyBuilder->setMetadata(rpc::makeEmptyMetadata());
return result;
}
- if (!supportsReadConcern()) {
- // Only return an error if a non-nullish readConcern was parsed, but do not process
- // readConcern regardless.
- if (!readConcernArgs.getOpTime().isNull() ||
- readConcernArgs.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) {
- auto result = appendCommandStatus(
- inPlaceReplyBob,
- {ErrorCodes::InvalidOptions,
- str::stream() << "Command " << getName() << " does not support "
- << repl::ReadConcernArgs::kReadConcernFieldName});
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
- } else {
- // Skip waiting for the OpTime when testing snapshot behavior.
- if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) {
- // Wait for readConcern to be satisfied.
- auto readConcernStatus = replCoord->waitUntilOpTimeForRead(txn, readConcernArgs);
- if (!readConcernStatus.isOK()) {
- if (ErrorCodes::ExceededTimeLimit == readConcernStatus) {
- const int debugLevel =
- serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2;
- LOG(debugLevel)
- << "Command on database " << request.getDatabase()
- << " timed out waiting for read concern to be satisfied. Command: "
- << getRedactedCopyForLogging(request.getCommandArgs());
- }
-
- auto result = appendCommandStatus(inPlaceReplyBob, readConcernStatus);
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
- }
-
- if ((replCoord->getReplicationMode() ==
- repl::ReplicationCoordinator::Mode::modeReplSet ||
- testingSnapshotBehaviorInIsolation) &&
- (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern ||
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern)) {
- // ReadConcern Majority is not supported in ProtocolVersion 0.
- if (!testingSnapshotBehaviorInIsolation && !replCoord->isV1ElectionProtocol()) {
- auto result = appendCommandStatus(
- inPlaceReplyBob,
- {ErrorCodes::ReadConcernMajorityNotEnabled,
- str::stream() << "Replica sets running protocol version 0 do not support "
- "readConcern: majority"});
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
+ Status rcStatus = waitForReadConcern(txn, readConcernArgsStatus.getValue());
+ if (!rcStatus.isOK()) {
+ if (rcStatus == ErrorCodes::ExceededTimeLimit) {
const int debugLevel =
- serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2;
- LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: "
- << readConcernArgs;
- Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
-
- // Wait until a snapshot is available.
- while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
- LOG(debugLevel) << "Snapshot not available for readConcern: "
- << readConcernArgs;
- replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min());
- status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
- }
-
- LOG(debugLevel) << "Using 'committed' snapshot. " << CurOp::get(txn)->query();
-
- if (!status.isOK()) {
- auto result = appendCommandStatus(inPlaceReplyBob, status);
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2;
+ LOG(debugLevel) << "Command on database " << db
+ << " timed out waiting for read concern to be satisfied. Command: "
+ << getRedactedCopyForLogging(request.getCommandArgs());
}
- }
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) {
- uassert(ErrorCodes::FailedToParse,
- "afterOpTime not compatible with read concern level linearizable",
- readConcernArgs.getOpTime().isNull());
- uassert(ErrorCodes::NotMaster,
- "cannot satisfy linearizable read concern on non-primary node",
- replCoord->getMemberState().primary());
+ auto result = appendCommandStatus(inPlaceReplyBob, rcStatus);
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
}
}
- // run expects non-const bsonobj
- BSONObj cmd = request.getCommandArgs();
- // Implementation just forwards to the old method signature for now.
- std::string errmsg;
-
- // run expects const db std::string (can't bind to temporary)
- const std::string db = request.getDatabase().toString();
-
- StatusWith<WriteConcernOptions> wcResult =
- extractWriteConcern(txn, cmd, db, this->supportsWriteConcern(cmd));
-
+ auto wcResult = extractWriteConcern(txn, cmd, db, supportsWriteConcern(cmd));
if (!wcResult.isOK()) {
auto result = appendCommandStatus(inPlaceReplyBob, wcResult.getStatus());
inPlaceReplyBob.doneFast();
@@ -1563,17 +1476,18 @@ bool Command::run(OperationContext* txn,
return result;
}
+ std::string errmsg;
bool result;
- if (!this->supportsWriteConcern(cmd)) {
+ if (!supportsWriteConcern(cmd)) {
// TODO: remove queryOptions parameter from command's run method.
- result = this->run(txn, db, cmd, 0, errmsg, inPlaceReplyBob);
+ result = run(txn, db, cmd, 0, errmsg, inPlaceReplyBob);
} else {
// Change the write concern while running the command.
const auto oldWC = txn->getWriteConcern();
ON_BLOCK_EXIT([&] { txn->setWriteConcern(oldWC); });
txn->setWriteConcern(wcResult.getValue());
- result = this->run(txn, db, cmd, 0, errmsg, inPlaceReplyBob);
+ result = run(txn, db, cmd, 0, errmsg, inPlaceReplyBob);
// Nothing in run() should change the writeConcern.
dassert(txn->getWriteConcern().toBSON() == wcResult.getValue().toBSON());
@@ -1588,7 +1502,7 @@ bool Command::run(OperationContext* txn,
// SERVER-22421: This code is to ensure error response backwards compatibility with the
// user management commands. This can be removed in 3.6.
- if (!waitForWCStatus.isOK() && isUserManagementCommand(this->getName())) {
+ if (!waitForWCStatus.isOK() && isUserManagementCommand(getName())) {
BSONObj temp = inPlaceReplyBob.asTempObj().copy();
inPlaceReplyBob.resetToEmpty();
appendCommandStatus(inPlaceReplyBob, waitForWCStatus);
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
new file mode 100644
index 00000000000..f4baada6218
--- /dev/null
+++ b/src/mongo/db/read_concern.cpp
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/read_concern.h"
+
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/server_parameters.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+// This is a special flag that allows for testing of snapshot behavior by skipping the replication
+// related checks and isolating the storage/query side of snapshotting.
+bool testingSnapshotBehaviorInIsolation = false;
+ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation(
+ ServerParameterSet::getGlobal(),
+ "testingSnapshotBehaviorInIsolation",
+ &testingSnapshotBehaviorInIsolation);
+
+} // namespace
+
+StatusWith<repl::ReadConcernArgs> extractReadConcern(OperationContext* txn,
+ const BSONObj& cmdObj,
+ bool supportsReadConcern) {
+ repl::ReadConcernArgs readConcernArgs;
+
+ auto readConcernParseStatus = readConcernArgs.initialize(cmdObj);
+ if (!readConcernParseStatus.isOK()) {
+ return readConcernParseStatus;
+ }
+
+ if (!supportsReadConcern && !readConcernArgs.isEmpty()) {
+ return {ErrorCodes::InvalidOptions,
+ str::stream() << "Command does not support read concern"};
+ }
+
+ return readConcernArgs;
+}
+
+Status waitForReadConcern(OperationContext* txn, const repl::ReadConcernArgs& readConcernArgs) {
+ repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(txn);
+
+ // Skip waiting for the OpTime when testing snapshot behavior
+ if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) {
+ Status status = replCoord->waitUntilOpTimeForRead(txn, readConcernArgs);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ if ((replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet ||
+ testingSnapshotBehaviorInIsolation) &&
+ (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern ||
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern)) {
+ // ReadConcern Majority is not supported in ProtocolVersion 0.
+ if (!testingSnapshotBehaviorInIsolation && !replCoord->isV1ElectionProtocol()) {
+ return {ErrorCodes::ReadConcernMajorityNotEnabled,
+ str::stream() << "Replica sets running protocol version 0 do not support "
+ "readConcern: majority"};
+ }
+
+ const int debugLevel = serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2;
+
+ LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: "
+ << readConcernArgs;
+
+ Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+
+ // Wait until a snapshot is available.
+ while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
+ LOG(debugLevel) << "Snapshot not available yet.";
+ replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min());
+ status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ }
+
+ if (!status.isOK()) {
+ return status;
+ }
+
+ LOG(debugLevel) << "Using 'committed' snapshot: " << CurOp::get(txn)->query();
+ }
+
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) {
+ if (!readConcernArgs.getOpTime().isNull())
+ return {ErrorCodes::FailedToParse,
+ "afterOpTime not compatible with linearizable read concern"};
+
+ if (!replCoord->getMemberState().primary())
+ return {ErrorCodes::NotMaster,
+ "cannot satisfy linearizable read concern on non-primary node"};
+ }
+
+ return Status::OK();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/read_concern.h b/src/mongo/db/read_concern.h
new file mode 100644
index 00000000000..35dd7f68b43
--- /dev/null
+++ b/src/mongo/db/read_concern.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+class Status;
+template <typename T>
+class StatusWith;
+
+namespace repl {
+class ReadConcernArgs;
+}
+
+/**
+ * Given the specified command and whether it supports read concern, returns an effective read
+ * concern which should be used.
+ */
+StatusWith<repl::ReadConcernArgs> extractReadConcern(OperationContext* txn,
+ const BSONObj& cmdObj,
+ bool supportsReadConcern);
+
+/**
+ * Given the specified read concern arguments, performs checks that the read concern can actually be
+ * satisfied given the current state of the server and if so calls into the replication subsystem to
+ * perform the wait.
+ */
+Status waitForReadConcern(OperationContext* txn, const repl::ReadConcernArgs& readConcernArgs);
+
+} // namespace mongo
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index ac428b29183..a9c9adf1b99 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -26,7 +26,7 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"