summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhari devaraj <hari.devaraj@10gen.com>2016-06-17 15:25:54 -0400
committerhari devaraj <hari.devaraj@10gen.com>2016-08-10 18:39:27 -0400
commita0b7e4fc8cf224505267b2fe589975ba36f49078 (patch)
tree875a3c4d912da243f05b722e863875a439162ca8
parentd9bb24e5f5af34862182424b890762cb2c3e9bc8 (diff)
downloadmongo-a0b7e4fc8cf224505267b2fe589975ba36f49078.tar.gz
SERVER-24497 Linearizable read concern implemented
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_legacy.yml3
-rw-r--r--jstests/replsets/linearizable_read_concern.js136
-rw-r--r--jstests/replsets/linearizable_read_concern_parsing.js57
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/commands/dbcommands.cpp61
-rw-r--r--src/mongo/db/read_concern.cpp54
-rw-r--r--src/mongo/db/read_concern.h8
7 files changed, 228 insertions, 92 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
index 3008a80f977..e1c96106cc9 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
@@ -8,7 +8,8 @@ selector:
- jstests/replsets/disallow_adding_initialized_node1.js
- jstests/replsets/disallow_adding_initialized_node2.js
- jstests/replsets/last_op_visible.js
- - jstests/replsets/linearizable_read_concern_parsing.js
+ # Linearizable reads not supported on PV0
+ - jstests/replsets/linearizable_read_concern.js
- jstests/replsets/oplog_truncated_on_recovery.js
- jstests/replsets/priority_takeover_cascading_priorities.js
- jstests/replsets/priority_takeover_one_node_higher_priority.js
diff --git a/jstests/replsets/linearizable_read_concern.js b/jstests/replsets/linearizable_read_concern.js
new file mode 100644
index 00000000000..a8e707fb556
--- /dev/null
+++ b/jstests/replsets/linearizable_read_concern.js
@@ -0,0 +1,136 @@
+/*
+ * This test creates a 3 node replica set and then performs a write
+ * with write concern majority to create a committed snapshot. The test first sends
+ * a regular linearizable read command which should succeed. Then the test
+ * examines linearizable read parsing abilities by sending a linearizable
+ * read command to a secondary and then to the primary with an 'afterOpTime'
+ * field, both of which should fail. The test then starts to test the actual
+ * functionality of linearizable reads by creating a network partition between the primary
+ * and the other two nodes and then sending in a linearizable read command.
+ * Finally we test whether the linearizable read command will block forever
+ * by issuing a linearizable read command in another thread on the still
+ * partitioned primary and then making the primary step down in the main
+ * thread after finding the issued noop. The secondary thread should throw
+ * an exception and exit.
+ */
+load('jstests/replsets/rslib.js');
+load('jstests/libs/parallelTester.js');
+load('jstests/libs/write_concern_util.js');
+(function() {
+ 'use strict';
+ var send_linearizable_read = function() {
+ // The primary will step down and throw an exception, which is expected.
+ var coll = db.getSiblingDB("test").foo;
+ jsTestLog('Sending in linearizable read in secondary thread');
+ // 'isMaster' ensures that the following command fails (and returns a response rather than
+ // an exception) before its connection is cut because of the primary step down. Refer to
+ // SERVER-24574.
+ assert.commandWorked(coll.runCommand({isMaster: 1, hangUpOnStepDown: false}));
+ assert.commandFailedWithCode(
+ coll.runCommand(
+ {'find': 'foo', readConcern: {level: "linearizable"}, maxTimeMS: 60000}),
+ ErrorCodes.InterruptedDueToReplStateChange);
+ };
+
+ var num_nodes = 3;
+ var name = 'linearizable_read_concern';
+ var replTest = new ReplSetTest({
+ name: name,
+ nodes: num_nodes,
+ useBridge: true,
+ nodeOptions: {enableMajorityReadConcern: ''}
+ });
+ if (!startSetIfSupportsReadMajority(replTest)) {
+ jsTest.log("skipping test since storage engine doesn't support committed reads");
+ return;
+ }
+ var config = replTest.getReplSetConfig();
+
+ // Increased election timeout to avoid having the primary step down while we are
+ // testing linearizable functionality on an isolated primary.
+ config.settings = {electionTimeoutMillis: 60000};
+ updateConfigIfNotDurable(config);
+ replTest.initiate(config);
+
+ replTest.awaitReplication();
+ var primary = replTest.getPrimary();
+ var secondaries = replTest.getSecondaries();
+
+ // We should have at least one successful write with write concern majority
+ // to get a committed snapshot.
+ assert.writeOK(primary.getDB("test").foo.insert(
+ {"number": 7}, {"writeConcern": {"w": "majority", "wtimeout": 60000}}));
+
+ jsTestLog("Testing linearizable readConcern parsing");
+ // This command is sent to the primary, and the primary is fully connected so it should work.
+ var goodRead = assert.writeOK(primary.getDB("test").runCommand(
+ {'find': 'foo', readConcern: {level: "linearizable"}, "maxTimeMS": 60000}));
+ assert.eq(goodRead.cursor.firstBatch[0].number, 7);
+
+ // This fails because you cannot have a linearizable read command sent to a secondary.
+ var badCmd = assert.commandFailed(secondaries[0].getDB("test").runCommand(
+ {"find": "foo", readConcern: {level: "linearizable"}, "maxTimeMS": 60000}));
+
+ assert.eq(badCmd.errmsg, "cannot satisfy linearizable read concern on non-primary node");
+ assert.eq(badCmd.code, ErrorCodes.NotMaster);
+
+ // This fails because you cannot specify 'afterOpTime' for linearizable read.
+ var opTimeCmd = assert.commandFailed(primary.getDB("test").runCommand({
+ "find": "foo",
+ readConcern: {level: "linearizable", "afterOpTime": {ts: Timestamp(1, 2), t: 1}},
+ "maxTimeMS": 60000
+ }));
+ assert.eq(opTimeCmd.errmsg, "afterOpTime not compatible with linearizable read concern");
+ assert.eq(opTimeCmd.code, ErrorCodes.FailedToParse);
+
+ primary = replTest.getPrimary();
+
+ jsTestLog("Starting linearizability testing");
+ jsTestLog(
+ "Setting up partitions such that the primary is isolated: [Secondary-Secondary] [Primary]");
+ secondaries[0].disconnect(primary);
+ secondaries[1].disconnect(primary);
+
+ jsTestLog("Read with readConcern majority should still work when sent to the old primary");
+ var res = assert.writeOK(primary.getDB("test").runCommand(
+ {"find": "foo", readConcern: {level: "majority"}, "maxTimeMS": 60000}));
+ assert.eq(res.cursor.firstBatch[0].number, 7);
+
+ var result = primary.getDB("test").runCommand(
+ {"find": "foo", "readConcern": {level: "linearizable"}, "maxTimeMS": 3000});
+ assert.commandFailedWithCode(result, ErrorCodes.ExceededTimeLimit);
+
+ jsTestLog("Testing to make sure linearizable read command does not block forever.");
+
+ // Get the last noop OpTime before sending the linearizable read command
+ // to ensure that we are waiting for the most recent noop write.
+ var lastOpTimestamp = getLatestOp(primary).ts;
+
+ var parallelShell = startParallelShell(send_linearizable_read, primary.port);
+ // Sending a linearizable read implicitly replicates a noop to the secondaries. We need to find
+ // the most recently issued noop to ensure that we call stepdown during the recently
+ // issued linearizable read and not before the read (in the separate thread) has been called.
+ jsTestLog("Checking end of oplog for noop");
+ assert.soon(function() {
+ var isEarlierTimestamp = function(ts1, ts2) {
+ if (ts1.getTime() == ts2.getTime()) {
+ return ts1.getInc() < ts2.getInc();
+ }
+ return ts1.getTime() < ts2.getTime();
+ };
+ var latestOp = getLatestOp(primary);
+ if (latestOp.op == "n" && isEarlierTimestamp(lastOpTimestamp, latestOp.ts)) {
+ return true;
+ }
+
+ return false;
+ });
+ assert.eq(primary, replTest.getPrimary(), "Primary unexpectedly changed mid test.");
+ jsTestLog("Making Primary step down");
+ var stepDownException = assert.throws(function() {
+ var result = primary.adminCommand(
+ {"replSetStepDown": 100, secondaryCatchUpPeriodSecs: 0, "force": true});
+ print('replSetStepDown did not throw exception but returned: ' + tojson(result));
+ });
+ parallelShell();
+}()); \ No newline at end of file
diff --git a/jstests/replsets/linearizable_read_concern_parsing.js b/jstests/replsets/linearizable_read_concern_parsing.js
deleted file mode 100644
index 299b598e259..00000000000
--- a/jstests/replsets/linearizable_read_concern_parsing.js
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * This tests that commands passed with 'readConcern: linearizable' are parsed correctly. It
- * first expects a success on the primary node. Then it expects a failure when a
- * linearizable read concern is sent to the secondary node. Finally, it expects a
- * failure when the afterOpTime field is also provided.
- *
- */
-load("jstests/replsets/rslib.js");
-(function() {
-
- var replTest = new ReplSetTest({
- name: 'linearizable_read_concern_parsing',
- nodes: 3,
- nodeOptions: {enableMajorityReadConcern: ''}
- });
-
- if (!startSetIfSupportsReadMajority(replTest)) {
- jsTest.log("skipping test since storage engine doesn't support committed reads");
- return true;
- }
-
- var nodes = replTest.nodeList();
- var config = {
- "_id": 'linearizable_read_concern_parsing',
- "members": [
- {"_id": 0, "host": nodes[0]},
- {"_id": 1, "host": nodes[1], priority: 0},
- {"_id": 2, "host": nodes[2], arbiterOnly: true}
- ]
- };
- // to make sure the test works with in Memory storage
- updateConfigIfNotDurable(config);
- replTest.initiate(config);
-
- var primary = replTest.getPrimary();
- primary.getDB("test").foo.insert({number: 2});
-
- assert.commandWorked(
- primary.getDB("test").runCommand({'find': 'foo', readConcern: {level: "linearizable"}}));
-
- var secondary = replTest.getSecondary();
- var badCmd = assert.commandFailed(secondary.getDB("test").runCommand({
- 'find': 'foo',
- readConcern: {level: "linearizable"},
- }));
-
- assert.eq(badCmd.errmsg, "cannot satisfy linearizable read concern on non-primary node");
- assert.eq(badCmd.code, ErrorCodes.NotMaster);
-
- var opTimeCmd = assert.commandFailed(primary.getDB("test").runCommand({
- 'find': 'foo',
- readConcern: {level: "linearizable", 'afterOpTime': {ts: Timestamp(1, 2), t: 1}}
- }));
- assert.eq(opTimeCmd.errmsg, "afterOpTime not compatible with linearizable read concern");
- assert.eq(opTimeCmd.code, ErrorCodes.FailedToParse);
-
-}()); \ No newline at end of file
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 104ed4c8f84..2f12e0d9154 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -185,6 +185,7 @@ error_code("ClientMetadataMissingField", 183)
error_code("ClientMetadataAppNameTooLarge", 184)
error_code("ClientMetadataDocumentTooLarge", 185)
error_code("ClientMetadataCannotBeMutated", 186)
+error_code("LinearizableReadConcernError", 187)
# Non-sequential error codes (for compatibility only)
error_code("SocketException", 9001)
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index 8fd2617062a..cc089ffea92 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -1445,33 +1445,30 @@ bool Command::run(OperationContext* txn,
const std::string db = request.getDatabase().toString();
BSONObjBuilder inPlaceReplyBob(replyBuilder->getInPlaceReplyBuilder(bytesToReserve));
+ auto readConcernArgsStatus = extractReadConcern(txn, cmd, supportsReadConcern());
- {
- auto readConcernArgsStatus = extractReadConcern(txn, cmd, supportsReadConcern());
- if (!readConcernArgsStatus.isOK()) {
- auto result = appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus());
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
-
- Status rcStatus = waitForReadConcern(txn, readConcernArgsStatus.getValue());
- if (!rcStatus.isOK()) {
- if (rcStatus == ErrorCodes::ExceededTimeLimit) {
- const int debugLevel =
- serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2;
- LOG(debugLevel) << "Command on database " << db
- << " timed out waiting for read concern to be satisfied. Command: "
- << getRedactedCopyForLogging(request.getCommandArgs());
- }
+ if (!readConcernArgsStatus.isOK()) {
+ auto result = appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus());
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
- auto result = appendCommandStatus(inPlaceReplyBob, rcStatus);
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
+ Status rcStatus = waitForReadConcern(txn, readConcernArgsStatus.getValue());
+ if (!rcStatus.isOK()) {
+ if (rcStatus == ErrorCodes::ExceededTimeLimit) {
+ const int debugLevel =
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2;
+ LOG(debugLevel) << "Command on database " << db
+ << " timed out waiting for read concern to be satisfied. Command: "
+ << getRedactedCopyForLogging(request.getCommandArgs());
}
- }
+ auto result = appendCommandStatus(inPlaceReplyBob, rcStatus);
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
auto wcResult = extractWriteConcern(txn, cmd, db, supportsWriteConcern(cmd));
if (!wcResult.isOK()) {
auto result = appendCommandStatus(inPlaceReplyBob, wcResult.getStatus());
@@ -1479,7 +1476,6 @@ bool Command::run(OperationContext* txn,
replyBuilder->setMetadata(rpc::makeEmptyMetadata());
return result;
}
-
std::string errmsg;
bool result;
if (!supportsWriteConcern(cmd)) {
@@ -1514,6 +1510,23 @@ bool Command::run(OperationContext* txn,
}
}
+ // When a linearizable read command is passed in, check to make sure we're reading
+ // from the primary.
+ if (supportsReadConcern() && (readConcernArgsStatus.getValue().getLevel() ==
+ repl::ReadConcernLevel::kLinearizableReadConcern) &&
+ (request.getCommandName() != "getMore")) {
+
+ auto linearizableReadStatus = waitForLinearizableReadConcern(txn);
+
+ if (!linearizableReadStatus.isOK()) {
+ inPlaceReplyBob.resetToEmpty();
+ auto result = appendCommandStatus(inPlaceReplyBob, linearizableReadStatus);
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
+ }
+
appendCommandStatus(inPlaceReplyBob, result, errmsg);
inPlaceReplyBob.doneFast();
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index f4baada6218..d5354c62945 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -35,9 +35,12 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
@@ -77,6 +80,16 @@ StatusWith<repl::ReadConcernArgs> extractReadConcern(OperationContext* txn,
Status waitForReadConcern(OperationContext* txn, const repl::ReadConcernArgs& readConcernArgs) {
repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(txn);
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) {
+ if (!readConcernArgs.getOpTime().isNull())
+ return {ErrorCodes::FailedToParse,
+ "afterOpTime not compatible with linearizable read concern"};
+
+ if (!replCoord->getMemberState().primary())
+ return {ErrorCodes::NotMaster,
+ "cannot satisfy linearizable read concern on non-primary node"};
+ }
+
// Skip waiting for the OpTime when testing snapshot behavior
if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) {
Status status = replCoord->waitUntilOpTimeForRead(txn, readConcernArgs);
@@ -117,17 +130,40 @@ Status waitForReadConcern(OperationContext* txn, const repl::ReadConcernArgs& re
LOG(debugLevel) << "Using 'committed' snapshot: " << CurOp::get(txn)->query();
}
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) {
- if (!readConcernArgs.getOpTime().isNull())
- return {ErrorCodes::FailedToParse,
- "afterOpTime not compatible with linearizable read concern"};
+ return Status::OK();
+}
- if (!replCoord->getMemberState().primary())
- return {ErrorCodes::NotMaster,
- "cannot satisfy linearizable read concern on non-primary node"};
- }
+Status waitForLinearizableReadConcern(OperationContext* txn) {
- return Status::OK();
+ repl::ReplicationCoordinator* replCoord =
+ repl::ReplicationCoordinator::get(txn->getClient()->getServiceContext());
+
+ {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
+ Lock::CollectionLock lock(txn->lockState(), "local.oplog.rs", MODE_IX);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+
+ WriteUnitOfWork uow(txn);
+ txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(
+ txn,
+ BSON("msg"
+ << "linearizable read"));
+ uow.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn, "waitForLinearizableReadConcern", "local.oplog.rs");
+ }
+ WriteConcernOptions wc = WriteConcernOptions(
+ WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
+
+ repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ auto awaitReplResult = replCoord->awaitReplication(txn, lastOpApplied, wc);
+ if (awaitReplResult.status == ErrorCodes::WriteConcernFailed) {
+ return Status(ErrorCodes::LinearizableReadConcernError,
+ "Failed to confirm that read was linearizable.");
+ }
+ return awaitReplResult.status;
}
} // namespace mongo
diff --git a/src/mongo/db/read_concern.h b/src/mongo/db/read_concern.h
index 35dd7f68b43..695be4d1b8c 100644
--- a/src/mongo/db/read_concern.h
+++ b/src/mongo/db/read_concern.h
@@ -35,11 +35,11 @@ class OperationContext;
class Status;
template <typename T>
class StatusWith;
-
namespace repl {
class ReadConcernArgs;
}
+
/**
* Given the specified command and whether it supports read concern, returns an effective read
* concern which should be used.
@@ -55,4 +55,10 @@ StatusWith<repl::ReadConcernArgs> extractReadConcern(OperationContext* txn,
*/
Status waitForReadConcern(OperationContext* txn, const repl::ReadConcernArgs& readConcernArgs);
+/*
+ * Given a linearizable read command, confirm that the
+ * current primary is still the true primary of the replica set.
+ */
+Status waitForLinearizableReadConcern(OperationContext* txn);
+
} // namespace mongo