summary refs log tree commit diff
diff options
context:
space:
mode:
author Randolph Tan <randolph@10gen.com> 2015-04-20 15:36:09 -0400
committer Randolph Tan <randolph@10gen.com> 2015-05-11 10:09:05 -0400
commit 9f0ceef0b37df2525cdebb172e6b05e2db8a2b20 (patch)
tree 11fa779defcac750089916964aeee605d3330a17
parent b73ab5765fb75ae87ee5ad0f4afbc4fdfc2bc151 (diff)
download mongo-9f0ceef0b37df2525cdebb172e6b05e2db8a2b20.tar.gz
SERVER-18195 Read after optime (repl only)
-rw-r--r-- buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml | 1
-rw-r--r-- buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml | 2
-rwxr-xr-x buildscripts/smoke.py | 15
-rw-r--r-- jstests/core/group_empty.js | 6
-rw-r--r-- jstests/core/read_after_optime.js | 22
-rw-r--r-- jstests/replsets/drop_oplog.js | 14
-rw-r--r-- jstests/replsets/read_after_optime_timeout.js | 105
-rw-r--r-- jstests/slow2/sharding_jscore_passthrough.js | 5
-rw-r--r-- src/mongo/base/error_codes.err | 3
-rw-r--r-- src/mongo/db/dbcommands.cpp | 36
-rw-r--r-- src/mongo/db/operation_context.h | 2
-rw-r--r-- src/mongo/db/operation_context_impl.cpp | 4
-rw-r--r-- src/mongo/db/operation_context_impl.h | 30
-rw-r--r-- src/mongo/db/operation_context_noop.h | 32
-rw-r--r-- src/mongo/db/query/SConscript | 1
-rw-r--r-- src/mongo/db/query/lite_parsed_query.cpp | 6
-rw-r--r-- src/mongo/db/repl/SConscript | 28
-rw-r--r-- src/mongo/db/repl/operation_context_repl_mock.cpp | 44
-rw-r--r-- src/mongo/db/repl/operation_context_repl_mock.h | 22
-rw-r--r-- src/mongo/db/repl/read_after_optime_args.cpp | 134
-rw-r--r-- src/mongo/db/repl/read_after_optime_args.h | 78
-rw-r--r-- src/mongo/db/repl/read_after_optime_args_test.cpp | 178
-rw-r--r-- src/mongo/db/repl/read_after_optime_response.cpp | 87
-rw-r--r-- src/mongo/db/repl/read_after_optime_response.h | 91
-rw-r--r-- src/mongo/db/repl/read_after_optime_response_test.cpp | 82
-rw-r--r-- src/mongo/db/repl/replication_coordinator.h | 19
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 102
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h | 111
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_test.cpp | 226
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.cpp | 8
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.h | 4
31 files changed, 1347 insertions, 151 deletions
diff --git a/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml b/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml
index 2d44c5675e2..7dda737496a 100644
--- a/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml
+++ b/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml
@@ -11,6 +11,7 @@ selector:
- jstests/core/dropdb_race.js
- jstests/core/opcounters_write_cmd.js
- jstests/core/rename.js
+ - jstests/core/read_after_optime.js
executor:
js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml
index 86ef6bdc3d3..842f4b5d5bd 100644
--- a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml
@@ -45,6 +45,8 @@ selector:
- jstests/core/top.js
- jstests/core/dbcase.js # SERVER-11735
- jstests/core/dbcase2.js # SERVER-11735
+ # TODO: SERVER-17284 remove once find cmd is implemented in mongos
+ - jstests/core/read_after_optime.js
executor:
js_test:
diff --git a/buildscripts/smoke.py b/buildscripts/smoke.py
index faa37622857..93fa2b38ad3 100755
--- a/buildscripts/smoke.py
+++ b/buildscripts/smoke.py
@@ -443,10 +443,19 @@ def skipTest(path):
parentPath = os.path.dirname(path)
parentDir = os.path.basename(parentPath)
if small_oplog or small_oplog_rs: # For tests running in parallel
- if basename in ["cursor8.js", "indexh.js", "dropdb.js", "dropdb_race.js",
- "connections_opened.js", "opcounters_write_cmd.js", "dbadmin.js",
+ if basename in ["cursor8.js",
+ "indexh.js",
+ "dropdb.js",
+ "dropdb_race.js",
+ "connections_opened.js",
+ "opcounters_write_cmd.js",
+ "dbadmin.js",
+ # Should not run in repl mode:
+ "read_after_optime.js",
## Capped tests
- "capped_max1.js", "capped_convertToCapped1.js", "rename.js"]:
+ "capped_max1.js",
+ "capped_convertToCapped1.js",
+ "rename.js"]:
return True
if auth or keyFile: # For tests running with auth
# Skip any tests that run with auth explicitly
diff --git a/jstests/core/group_empty.js b/jstests/core/group_empty.js
index 62a734ed0f8..c1772f88b22 100644
--- a/jstests/core/group_empty.js
+++ b/jstests/core/group_empty.js
@@ -5,4 +5,8 @@ t.drop();
res1 = db.runCommand({group: {$reduce: function(){}, ns: 'group_empty', cond: {}, key: {}, initial: {count: 0}}});
t.ensureIndex( { x : 1 } );
res2 = db.runCommand({group: {$reduce: function(){}, ns: 'group_empty', cond: {}, key: {}, initial: {count: 0}}});
-assert.eq( res1, res2 );
+
+assert.docEq(res1.retval, res2.retval);
+assert.eq(res1.keys, res2.keys);
+assert.eq(res1.count, res2.count);
+
diff --git a/jstests/core/read_after_optime.js b/jstests/core/read_after_optime.js
new file mode 100644
index 00000000000..d185b660887
--- /dev/null
+++ b/jstests/core/read_after_optime.js
@@ -0,0 +1,22 @@
+// Test that attempting to read after optime fails if replication is not enabled.
+
+(function() {
+"use strict";
+
+var currentTime = new Date();
+
+var futureOpTime = new Timestamp((currentTime / 1000 + 3600), 0);
+
+var res = assert.commandFailed(db.runCommand({
+ find: 'user',
+ filter: { x: 1 },
+ after: {
+ opTime: { ts: futureOpTime, term: 0 }
+ }
+}));
+
+assert.eq(123, res.code); // ErrorCodes::NotAReplicaSet
+assert.eq(null, res.waitedMS);
+
+})();
+
diff --git a/jstests/replsets/drop_oplog.js b/jstests/replsets/drop_oplog.js
index 96c7582c4d9..90c920e1b27 100644
--- a/jstests/replsets/drop_oplog.js
+++ b/jstests/replsets/drop_oplog.js
@@ -10,16 +10,10 @@
var ml = master.getDB( 'local' );
var threw = false;
- try {
- ml.oplog.rs.drop();
- }
- catch (err) {
- assert.eq(err,
- "Error: drop failed: { \"ok\" : 0, \"errmsg\" : " +
- "\"can't drop live oplog while replicating\" }");
- threw = true;
- }
- assert(threw);
+
+ var ret = assert.commandFailed(ml.runCommand({ drop: 'oplog.rs' }));
+ assert.eq('can\'t drop live oplog while replicating', ret.errmsg);
+
var dropOutput = ml.dropDatabase();
assert.eq(dropOutput.ok, 0);
assert.eq(dropOutput.errmsg, "Cannot drop 'local' database while replication is active");
diff --git a/jstests/replsets/read_after_optime_timeout.js b/jstests/replsets/read_after_optime_timeout.js
new file mode 100644
index 00000000000..0efee4fd8da
--- /dev/null
+++ b/jstests/replsets/read_after_optime_timeout.js
@@ -0,0 +1,105 @@
+// Test read after opTime functionality with maxTimeMS.
+
+(function() {
+"use strict";
+
+var replTest = new ReplSetTest({ nodes: 2 });
+replTest.startSet();
+
+var config = replTest.getReplSetConfig();
+// TODO: SERVER-18298 uncomment once implemented.
+//config.protocolVersion = 1;
+replTest.initiate(config);
+
+var runTest = function(testDB, primaryConn) {
+ primaryConn.getDB('test').user.insert({ x: 1 }, { writeConcern: { w: 2 }});
+
+ var localDB = primaryConn.getDB('local');
+
+ var oplogTS = localDB.oplog.rs.find().sort({ $natural: -1 }).limit(1).next().ts;
+ var twoSecTS = new Timestamp(oplogTS.getTime() + 2, 0);
+
+ // Test timeout with maxTimeMS < after.timeoutMS
+ var res = assert.commandFailed(testDB.runCommand({
+ find: 'user',
+ filter: { x: 1 },
+ after: {
+ opTime: { ts: twoSecTS, term: 0 },
+ timeoutMS: 10 * 1000
+ },
+ maxTimeMS: 1000
+ }));
+
+ assert.eq(50, res.code); // ErrorCodes::ExceededTimeLimit
+ assert.gt(res.waitedMS, 500);
+ assert.lt(res.waitedMS, 2500);
+
+ // Test timeout with after.timeoutMS < maxTimeMS
+ res = assert.commandFailed(testDB.runCommand({
+ find: 'user',
+ filter: { x: 1 },
+ after: {
+ opTime: { ts: twoSecTS, term: 0 },
+ timeoutMS: 1 * 1000
+ },
+ maxTimeMS: 10 * 1000
+ }));
+
+ assert.eq(122, res.code); // ErrorCodes::ReadAfterOptimeTimeout
+ assert.gt(res.waitedMS, 500);
+ assert.lt(res.waitedMS, 2500);
+
+ // Test timeout with maxTimeMS, no after.timeoutMS
+ res = assert.commandFailed(testDB.runCommand({
+ find: 'user',
+ filter: { x: 1 },
+ after: {
+ opTime: { ts: twoSecTS, term: 0 }
+ },
+ maxTimeMS: 1 * 1000
+ }));
+
+ assert.eq(50, res.code); // ErrorCodes::ExceededTimeLimit
+ assert.gt(res.waitedMS, 500);
+ assert.lt(res.waitedMS, 2500);
+
+ // Test timeout with after.timeoutMS, no maxTimeMS
+ res = assert.commandFailed(testDB.runCommand({
+ find: 'user',
+ filter: { x: 1 },
+ after: {
+ opTime: { ts: twoSecTS, term: 0 },
+ timeoutMS: 1 * 1000
+ }
+ }));
+
+ assert.eq(122, res.code); // ErrorCodes::ReadAfterOptimeTimeout
+ assert.gt(res.waitedMS, 500);
+ assert.lt(res.waitedMS, 2500);
+
+ // Test read on future opTime that will eventually occur.
+ var insertFunc = startParallelShell(
+ "sleep(2100); db.user.insert({ y: 1 }, { writeConcern: { w: 2 }});",
+ primaryConn.port);
+
+ res = assert.commandWorked(testDB.runCommand({
+ find: 'user',
+ filter: { x: 1 },
+ after: {
+ opTime: { ts: twoSecTS, term: 0 }
+ }
+ }));
+
+ assert.eq(null, res.code);
+ assert.gt(res.waitedMS, 0);
+
+ insertFunc();
+};
+
+var primary = replTest.getPrimary();
+runTest(primary.getDB('test'), primary);
+runTest(replTest.getSecondary().getDB('test'), primary);
+
+replTest.stopSet();
+
+})();
diff --git a/jstests/slow2/sharding_jscore_passthrough.js b/jstests/slow2/sharding_jscore_passthrough.js
index 44598d4ddc5..04bcd73235c 100644
--- a/jstests/slow2/sharding_jscore_passthrough.js
+++ b/jstests/slow2/sharding_jscore_passthrough.js
@@ -86,7 +86,10 @@ var db;
'auth1|' +
'auth2|' +
'dropdb_race|' +
- 'unix_socket\\d*' +
+ 'unix_socket\\d*|' +
+ // TODO: SERVER-17284 remove once find cmd is
+ // implemented in mongos
+ 'read_after_optime' +
')\.js$');
// These are bugs (some might be fixed now):
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 44cc990045a..8bb00b55e39 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -121,6 +121,9 @@ error_code("NamespaceNotSharded", 118)
error_code("InvalidSyncSource", 119)
error_code("OplogStartMissing", 120)
error_code("DocumentValidationFailure", 121) # Only for the document validator on collections.
+error_code("ReadAfterOptimeTimeout", 122)
+error_code("NotAReplicaSet", 123)
+error_code("IncompatibleElectionProtocol", 124)
# Non-sequential error codes (for compatibility only)
error_code("NotMaster", 10107) #this comes from assert_util.h
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index 8bbe2b4ea35..89159347ed0 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -82,6 +82,9 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/repair_database.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/db/repl/read_after_optime_response.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -1186,14 +1189,14 @@ namespace mongo {
bool _impersonation;
};
- namespace {
- void appendGLEHelperData(BSONObjBuilder& bob, const Timestamp& opTime, const OID& oid) {
- BSONObjBuilder subobj(bob.subobjStart(kGLEStatsFieldName));
- subobj.append(kGLEStatsLastOpTimeFieldName, opTime);
- subobj.appendOID(kGLEStatsElectionIdFieldName, const_cast<OID*>(&oid));
- subobj.done();
- }
+namespace {
+ void appendGLEHelperData(BSONObjBuilder& bob, const Timestamp& opTime, const OID& oid) {
+ BSONObjBuilder subobj(bob.subobjStart(kGLEStatsFieldName));
+ subobj.append(kGLEStatsLastOpTimeFieldName, opTime);
+ subobj.appendOID(kGLEStatsElectionIdFieldName, const_cast<OID*>(&oid));
+ subobj.done();
}
+} // unnamed namespace
/**
* this handles
@@ -1334,6 +1337,25 @@ namespace mongo {
c->_commandsExecuted.increment();
+ {
+ // Handle read after opTime.
+
+ repl::ReadAfterOpTimeArgs readAfterOptimeSettings;
+ auto readAfterParseStatus = readAfterOptimeSettings.initialize(cmdObj);
+ if (!readAfterParseStatus.isOK()) {
+ appendCommandStatus(result, readAfterParseStatus);
+ return;
+ }
+
+ auto readAfterResult = replCoord->waitUntilOpTime(txn, readAfterOptimeSettings);
+ readAfterResult.appendInfo(&result);
+
+ if (!readAfterResult.getStatus().isOK()) {
+ appendCommandStatus(result, readAfterResult.getStatus());
+ return;
+ }
+ }
+
retval = _execCommand(txn, c, dbname, cmdObj, queryOptions, errmsg, result);
if ( !retval ){
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index a5815f9d4eb..9a94d762eed 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -120,6 +120,8 @@ namespace mongo {
*/
virtual CurOp* getCurOp() const = 0;
+ virtual uint64_t getRemainingMaxTimeMicros() const = 0;
+
/**
* Returns the operation ID associated with this operation.
* WARNING: Due to SERVER-14995, this OpID is not guaranteed to stay the same for the
diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp
index bd29ce23251..6936e32df02 100644
--- a/src/mongo/db/operation_context_impl.cpp
+++ b/src/mongo/db/operation_context_impl.cpp
@@ -134,6 +134,10 @@ namespace {
return getCurOp()->opNum();
}
+ uint64_t OperationContextImpl::getRemainingMaxTimeMicros() const {
+ return getCurOp()->getRemainingMaxTimeMicros();
+ }
+
// Enabling the checkForInterruptFail fail point will start a game of random chance on the
// connection specified in the fail point data, generating an interrupt with a given fixed
// probability. Example invocation:
diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h
index 735e59db6e4..5f9f9f1903a 100644
--- a/src/mongo/db/operation_context_impl.h
+++ b/src/mongo/db/operation_context_impl.h
@@ -40,34 +40,36 @@ namespace mongo {
virtual ~OperationContextImpl();
- virtual RecoveryUnit* recoveryUnit() const;
+ virtual RecoveryUnit* recoveryUnit() const override;
- virtual RecoveryUnit* releaseRecoveryUnit();
+ virtual RecoveryUnit* releaseRecoveryUnit() override;
- virtual void setRecoveryUnit(RecoveryUnit* unit);
+ virtual void setRecoveryUnit(RecoveryUnit* unit) override;
- virtual Locker* lockState() const;
+ virtual Locker* lockState() const override;
virtual ProgressMeter* setMessage(const char* msg,
const std::string& name,
unsigned long long progressMeterTotal,
- int secondsBetween);
+ int secondsBetween) override;
- virtual std::string getNS() const;
+ virtual std::string getNS() const override;
- virtual Client* getClient() const;
+ virtual Client* getClient() const override;
- virtual CurOp* getCurOp() const;
+ virtual CurOp* getCurOp() const override;
- virtual unsigned int getOpID() const;
+ virtual unsigned int getOpID() const override;
- virtual void checkForInterrupt() const;
- virtual Status checkForInterruptNoAssert() const;
+ virtual uint64_t getRemainingMaxTimeMicros() const override;
- virtual bool isPrimaryFor( StringData ns );
+ virtual void checkForInterrupt() const override;
+ virtual Status checkForInterruptNoAssert() const override;
- virtual void setReplicatedWrites(bool writesAreReplicated = true);
- virtual bool writesAreReplicated() const;
+ virtual bool isPrimaryFor( StringData ns ) override;
+
+ virtual void setReplicatedWrites(bool writesAreReplicated = true) override;
+ virtual bool writesAreReplicated() const override;
private:
std::auto_ptr<RecoveryUnit> _recovery;
diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h
index 0b2650531ce..f27f3d1ef96 100644
--- a/src/mongo/db/operation_context_noop.h
+++ b/src/mongo/db/operation_context_noop.h
@@ -53,62 +53,66 @@ namespace mongo {
virtual ~OperationContextNoop() { }
- virtual Client* getClient() const {
+ virtual Client* getClient() const override {
invariant(false);
return NULL;
}
- virtual CurOp* getCurOp() const {
+ virtual CurOp* getCurOp() const override {
invariant(false);
return NULL;
}
- virtual RecoveryUnit* recoveryUnit() const {
+ virtual RecoveryUnit* recoveryUnit() const override {
return _recoveryUnit.get();
}
- virtual RecoveryUnit* releaseRecoveryUnit() {
+ virtual RecoveryUnit* releaseRecoveryUnit() override {
return _recoveryUnit.release();
}
- virtual void setRecoveryUnit(RecoveryUnit* unit) {
+ virtual void setRecoveryUnit(RecoveryUnit* unit) override {
_recoveryUnit.reset(unit);
}
- virtual Locker* lockState() const {
+ virtual Locker* lockState() const override {
return _locker.get();
}
virtual ProgressMeter* setMessage(const char * msg,
const std::string &name,
unsigned long long progressMeterTotal,
- int secondsBetween) {
+ int secondsBetween) override {
return &_pm;
}
- virtual void checkForInterrupt() const { }
- virtual Status checkForInterruptNoAssert() const {
+ virtual void checkForInterrupt() const override { }
+ virtual Status checkForInterruptNoAssert() const override {
return Status::OK();
}
- virtual bool isPrimaryFor( StringData ns ) {
+ virtual bool isPrimaryFor( StringData ns ) override {
return true;
}
- virtual std::string getNS() const {
+ virtual std::string getNS() const override {
return std::string();
};
- virtual unsigned int getOpID() const {
+ virtual unsigned int getOpID() const override {
return 0;
}
- void setReplicatedWrites(bool writesAreReplicated = true) {}
+ void setReplicatedWrites(bool writesAreReplicated = true) override {}
- bool writesAreReplicated() const {
+ bool writesAreReplicated() const override {
return false;
}
+ virtual uint64_t getRemainingMaxTimeMicros() const override {
+ return 0;
+ }
+
private:
std::auto_ptr<RecoveryUnit> _recoveryUnit;
boost::scoped_ptr<Locker> _locker;
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 6043b457f46..76fafe780b3 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -134,6 +134,7 @@ env.Library(
],
LIBDEPS=[
"$BUILD_DIR/mongo/bson/bson",
+ "$BUILD_DIR/mongo/db/repl/read_after_optime_args",
],
)
diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp
index 6d0bf7aabec..f186cce4a80 100644
--- a/src/mongo/db/query/lite_parsed_query.cpp
+++ b/src/mongo/db/query/lite_parsed_query.cpp
@@ -31,6 +31,7 @@
#include <cmath>
#include "mongo/db/dbmessage.h"
+#include "mongo/db/repl/read_after_optime_args.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -327,6 +328,11 @@ namespace mongo {
<< "You may need to update your shell or driver.");
}
}
+ else if (mongoutils::str::equals(fieldName,
+ repl::ReadAfterOpTimeArgs::kRootFieldName.c_str())) {
+ // read after optime parsing is handled elsewhere.
+ continue;
+ }
else {
mongoutils::str::stream ss;
ss << "Failed to parse: " << cmdObj.toString() << ". "
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index fbb37662882..8eb73d7f8ba 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -246,11 +246,21 @@ env.Library('replmocks',
'replication_executor',
])
+env.Library('read_after_optime_args',
+ [
+ 'read_after_optime_args.cpp'
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base/base',
+ '$BUILD_DIR/mongo/bson/util/bson_extract',
+ ])
+
env.Library('replica_set_messages',
[
'handshake_args.cpp',
'is_master_response.cpp',
'member_config.cpp',
+ 'read_after_optime_response.cpp',
'repl_set_declare_election_winner_args.cpp',
'repl_set_heartbeat_args.cpp',
'repl_set_heartbeat_args_v1.cpp',
@@ -264,6 +274,7 @@ env.Library('replica_set_messages',
'last_vote.cpp',
],
LIBDEPS=[
+ 'read_after_optime_args',
'$BUILD_DIR/mongo/bson/bson',
'$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/util/net/hostandport',
@@ -449,3 +460,20 @@ env.CppUnitTest(
'task_runner_test_fixture',
],
)
+
+env.CppUnitTest(
+ target='read_after_optime_args_test',
+ source=[
+ 'read_after_optime_args_test.cpp',
+ ],
+ LIBDEPS=['replica_set_messages']
+)
+
+env.CppUnitTest(
+ target='read_after_optime_response_test',
+ source=[
+ 'read_after_optime_response_test.cpp',
+ ],
+ LIBDEPS=['replica_set_messages']
+)
+
diff --git a/src/mongo/db/repl/operation_context_repl_mock.cpp b/src/mongo/db/repl/operation_context_repl_mock.cpp
index b5c44316261..e1730eacf1c 100644
--- a/src/mongo/db/repl/operation_context_repl_mock.cpp
+++ b/src/mongo/db/repl/operation_context_repl_mock.cpp
@@ -36,10 +36,50 @@
namespace mongo {
namespace repl {
- OperationContextReplMock::OperationContextReplMock()
- : _lockState(new MMAPV1LockerImpl()) { }
+ OperationContextReplMock::OperationContextReplMock():
+ _lockState(new MMAPV1LockerImpl()),
+ _opID(0),
+ _checkForInterruptStatus(Status::OK()),
+ _maxTimeMicrosRemaining(0) {
+ }
OperationContextReplMock::~OperationContextReplMock() {}
+ Locker* OperationContextReplMock::lockState() const {
+ return _lockState.get();
+ }
+
+ unsigned int OperationContextReplMock::getOpID() const {
+ return _opID;
+ }
+
+ void OperationContextReplMock::setOpID(unsigned int opID) {
+ _opID = opID;
+ }
+
+ void OperationContextReplMock::checkForInterrupt() const {
+ uassertStatusOK(checkForInterruptNoAssert());
+ }
+
+ Status OperationContextReplMock::checkForInterruptNoAssert() const {
+ if (!_checkForInterruptStatus.isOK()) {
+ return _checkForInterruptStatus;
+ }
+
+ return Status::OK();
+ }
+
+ void OperationContextReplMock::setCheckForInterruptStatus(Status status) {
+ _checkForInterruptStatus = std::move(status);
+ }
+
+ uint64_t OperationContextReplMock::getRemainingMaxTimeMicros() const {
+ return _maxTimeMicrosRemaining;
+ }
+
+ void OperationContextReplMock::setRemainingMaxTimeMicros(uint64_t micros) {
+ _maxTimeMicrosRemaining = micros;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/operation_context_repl_mock.h b/src/mongo/db/repl/operation_context_repl_mock.h
index 660fbdfc5db..fb53dff2e3c 100644
--- a/src/mongo/db/repl/operation_context_repl_mock.h
+++ b/src/mongo/db/repl/operation_context_repl_mock.h
@@ -40,16 +40,36 @@ namespace repl {
/**
* Mock implementation of OperationContext that can be used with real instances of LockManager.
+ * Note this is not thread safe and the setter methods should only be called in the context
+ * where access to this object is guaranteed to be serialized.
*/
class OperationContextReplMock : public OperationContextNoop {
public:
OperationContextReplMock();
virtual ~OperationContextReplMock();
- virtual Locker* lockState() const { return _lockState.get(); }
+ virtual Locker* lockState() const override;
+
+ virtual unsigned int getOpID() const override;
+
+ void setOpID(unsigned int opID);
+
+ virtual void checkForInterrupt() const override;
+
+ virtual Status checkForInterruptNoAssert() const override;
+
+ void setCheckForInterruptStatus(Status status);
+
+ virtual uint64_t getRemainingMaxTimeMicros() const override;
+
+ void setRemainingMaxTimeMicros(uint64_t micros);
private:
boost::scoped_ptr<Locker> _lockState;
+ unsigned int _opID;
+
+ Status _checkForInterruptStatus;
+ uint64_t _maxTimeMicrosRemaining;
};
} // namespace repl
diff --git a/src/mongo/db/repl/read_after_optime_args.cpp b/src/mongo/db/repl/read_after_optime_args.cpp
new file mode 100644
index 00000000000..b17c24f23fa
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_args.cpp
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/read_after_optime_args.h"
+
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/util/mongoutils/str.h"
+
+using std::string;
+
+namespace mongo {
+namespace repl {
+
+ const string ReadAfterOpTimeArgs::kRootFieldName("after");
+ const string ReadAfterOpTimeArgs::kOpTimeFieldName("opTime");
+ const string ReadAfterOpTimeArgs::kOpTimestampFieldName("ts");
+ const string ReadAfterOpTimeArgs::kOpTermFieldName("term");
+ const string ReadAfterOpTimeArgs::kTimeoutFieldName("timeoutMS");
+
+ ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(): ReadAfterOpTimeArgs(OpTime(), Milliseconds(0)) {
+ }
+
+ ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout):
+ _opTime(std::move(opTime)),
+ _timeout(std::move(timeout)) {
+ }
+
+ const OpTime& ReadAfterOpTimeArgs::getOpTime() const {
+ return _opTime;
+ }
+
+ const Milliseconds& ReadAfterOpTimeArgs::getTimeout() const {
+ return _timeout;
+ }
+
+    /**
+     * Populates this object from the optional "after" sub-document of 'cmdObj'.
+     *
+     * Returns Status::OK() and leaves this object untouched when 'cmdObj' has no "after" field.
+     * Returns FailedToParse when "after" fails the isABSONObj() check, forwards the status
+     * reported by the bsonExtract* helpers when "opTime", "ts", "term" or "timeoutMS" cannot
+     * be extracted, and returns BadValue when "timeoutMS" is negative. The members are
+     * assigned only after every field has been validated.
+     */
+    Status ReadAfterOpTimeArgs::initialize(const BSONObj& cmdObj) {
+        auto afterElem = cmdObj[ReadAfterOpTimeArgs::kRootFieldName];
+
+        if (afterElem.eoo()) {
+            // The "after" field is optional; there is nothing to parse.
+            return Status::OK();
+        }
+
+        if (!afterElem.isABSONObj()) {
+            return Status(ErrorCodes::FailedToParse, "'after' field should be an object");
+        }
+
+        BSONObj readAfterObj = afterElem.Obj();
+        BSONElement opTimeElem;
+        auto opTimeStatus = bsonExtractTypedField(readAfterObj,
+                                                  ReadAfterOpTimeArgs::kOpTimeFieldName,
+                                                  Object,
+                                                  &opTimeElem);
+
+        if (!opTimeStatus.isOK()) {
+            return opTimeStatus;
+        }
+
+        BSONObj opTimeObj = opTimeElem.Obj();
+
+        Timestamp timestamp;
+        auto timestampStatus = bsonExtractTimestampField(opTimeObj,
+                                                         ReadAfterOpTimeArgs::kOpTimestampFieldName,
+                                                         &timestamp);
+
+        if (!timestampStatus.isOK()) {
+            return timestampStatus;
+        }
+
+        long long termNumber;
+        auto termStatus = bsonExtractIntegerField(opTimeObj,
+                                                  ReadAfterOpTimeArgs::kOpTermFieldName,
+                                                  &termNumber);
+
+        if (!termStatus.isOK()) {
+            return termStatus;
+        }
+
+        long long timeoutMS;
+        auto timeoutStatus = bsonExtractIntegerFieldWithDefault(
+                readAfterObj,
+                ReadAfterOpTimeArgs::kTimeoutFieldName,
+                0, // Default to no timeout.
+                &timeoutMS);
+
+        if (!timeoutStatus.isOK()) {
+            return timeoutStatus;
+        }
+
+        // Zero is accepted (it is the "no timeout" default above); only negatives are rejected.
+        if (timeoutMS < 0) {
+            return Status(ErrorCodes::BadValue,
+                          str::stream() << ReadAfterOpTimeArgs::kRootFieldName
+                                        << "." << ReadAfterOpTimeArgs::kTimeoutFieldName
+                                        << " value must not be negative");
+        }
+
+        _opTime = OpTime(timestamp, termNumber);
+        _timeout = Milliseconds(timeoutMS); // Note: 'long long' -> 'long' down casting.
+
+        return Status::OK();
+    }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_args.h b/src/mongo/db/repl/read_after_optime_args.h
new file mode 100644
index 00000000000..a8703ad52f7
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_args.h
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/base/status.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+ class BSONObj;
+
+namespace repl {
+
+ class ReadAfterOpTimeArgs {
+ public:
+
+ static const std::string kRootFieldName;
+ static const std::string kOpTimeFieldName;
+ static const std::string kOpTimestampFieldName;
+ static const std::string kOpTermFieldName;
+ static const std::string kTimeoutFieldName;
+
+ ReadAfterOpTimeArgs();
+ ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout);
+
+ /**
+ * Format:
+ * {
+ * find: "coll",
+ * filter: <Query Object>,
+ * after: { // optional
+ * opTime: { ts: <timestamp>, term: <NumberLong> },
+ * timeoutMS: <NumberLong> //optional
+ * }
+ * }
+ */
+ Status initialize(const BSONObj& cmdObj);
+
+ const OpTime& getOpTime() const;
+ const Milliseconds& getTimeout() const;
+
+ private:
+
+ OpTime _opTime;
+ Milliseconds _timeout;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_args_test.cpp b/src/mongo/db/repl/read_after_optime_args_test.cpp
new file mode 100644
index 00000000000..d780500d9c2
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_args_test.cpp
@@ -0,0 +1,178 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+
+    // A fully specified root document (opTime with timestamp and term, plus a timeout)
+    // parses successfully and every value is exposed through the getters.
+    TEST(ReadAfterParse, BasicFullSpecification) {
+        const BSONObj opTimeObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30)
+                 << ReadAfterOpTimeArgs::kOpTermFieldName << 2);
+        const BSONObj rootObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj
+                 << ReadAfterOpTimeArgs::kTimeoutFieldName << 100);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_OK(parsedArgs.initialize(cmdObj));
+
+        ASSERT_EQ(Timestamp(20, 30), parsedArgs.getOpTime().getTimestamp());
+        ASSERT_EQ(2, parsedArgs.getOpTime().getTerm());
+        ASSERT_EQ(100, parsedArgs.getTimeout().total_milliseconds());
+    }
+
+    // A command without the root field parses fine and leaves the defaults in place:
+    // a null timestamp and a zero timeout.
+    TEST(ReadAfterParse, Empty) {
+        const BSONObj cmdObj = BSON("find" << "test");
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_OK(parsedArgs.initialize(cmdObj));
+
+        ASSERT(parsedArgs.getOpTime().getTimestamp().isNull());
+        ASSERT_EQ(0, parsedArgs.getTimeout().total_milliseconds());
+    }
+
+    // The root field must not be a string.
+    TEST(ReadAfterParse, BadRootType) {
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << "x");
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // The opTime field must not be a number.
+    TEST(ReadAfterParse, BadOpTimeType) {
+        const BSONObj rootObj = BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << 2);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // When the root field is present, a timeout alone is not enough: opTime is mandatory.
+    TEST(ReadAfterParse, OpTimeRequiredIfRootPresent) {
+        const BSONObj rootObj = BSON(ReadAfterOpTimeArgs::kTimeoutFieldName << 100);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // An opTime document that carries a term but no timestamp is rejected.
+    TEST(ReadAfterParse, NoOpTimeTS) {
+        const BSONObj opTimeObj = BSON(ReadAfterOpTimeArgs::kOpTermFieldName << 2);
+        const BSONObj rootObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj
+                 << ReadAfterOpTimeArgs::kTimeoutFieldName << 100);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // An opTime document that carries a timestamp but no term must be rejected.
+    // The opTime sub-document deliberately contains only the timestamp field so that
+    // the missing term is the sole reason for the parse failure.
+    TEST(ReadAfterParse, NoOpTimeTerm) {
+        ReadAfterOpTimeArgs readAfterOpTime;
+        ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+            "find" << "test"
+            << ReadAfterOpTimeArgs::kRootFieldName
+            << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+                    << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30))
+                    << ReadAfterOpTimeArgs::kTimeoutFieldName << 100))));
+    }
+
+    // The timestamp inside opTime must not be an embedded document.
+    TEST(ReadAfterParse, BadOpTimeTSType) {
+        const BSONObj opTimeObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << BSON("x" << 1)
+                 << ReadAfterOpTimeArgs::kOpTermFieldName << 2);
+        const BSONObj rootObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj
+                 << ReadAfterOpTimeArgs::kTimeoutFieldName << 100);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // The term inside opTime must not be a string.
+    TEST(ReadAfterParse, BadOpTimeTermType) {
+        const BSONObj opTimeObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+                 << ReadAfterOpTimeArgs::kOpTermFieldName << "y");
+        const BSONObj rootObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj
+                 << ReadAfterOpTimeArgs::kTimeoutFieldName << 100);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // Omitting the timeout is allowed; the parsed timeout then reads as zero.
+    TEST(ReadAfterParse, TimeoutDefault) {
+        const BSONObj opTimeObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+                 << ReadAfterOpTimeArgs::kOpTermFieldName << 2);
+        const BSONObj rootObj = BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_OK(parsedArgs.initialize(cmdObj));
+
+        ASSERT_EQ(Timestamp(1, 0), parsedArgs.getOpTime().getTimestamp());
+        ASSERT_EQ(2, parsedArgs.getOpTime().getTerm());
+        ASSERT_EQ(0, parsedArgs.getTimeout().total_milliseconds());
+    }
+
+    // The timeout must not be a string.
+    TEST(ReadAfterParse, BadTimeoutType) {
+        const BSONObj opTimeObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+                 << ReadAfterOpTimeArgs::kOpTermFieldName << 2);
+        const BSONObj rootObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj
+                 << ReadAfterOpTimeArgs::kTimeoutFieldName << "abc");
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // A negative timeout is rejected.
+    TEST(ReadAfterParse, NegativeTimeout) {
+        const BSONObj opTimeObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+                 << ReadAfterOpTimeArgs::kOpTermFieldName << 2);
+        const BSONObj rootObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj
+                 << ReadAfterOpTimeArgs::kTimeoutFieldName << -100);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_NOT_OK(parsedArgs.initialize(cmdObj));
+    }
+
+    // An explicit timeout of zero is accepted and reported back unchanged.
+    TEST(ReadAfterParse, ZeroTimeout) {
+        const BSONObj opTimeObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30)
+                 << ReadAfterOpTimeArgs::kOpTermFieldName << 2);
+        const BSONObj rootObj =
+            BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << opTimeObj
+                 << ReadAfterOpTimeArgs::kTimeoutFieldName << 0);
+        const BSONObj cmdObj =
+            BSON("find" << "test" << ReadAfterOpTimeArgs::kRootFieldName << rootObj);
+
+        ReadAfterOpTimeArgs parsedArgs;
+        ASSERT_OK(parsedArgs.initialize(cmdObj));
+
+        ASSERT_EQ(Timestamp(20, 30), parsedArgs.getOpTime().getTimestamp());
+        ASSERT_EQ(2, parsedArgs.getOpTime().getTerm());
+        ASSERT_EQ(0, parsedArgs.getTimeout().total_milliseconds());
+    }
+
+} // unnamed namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_response.cpp b/src/mongo/db/repl/read_after_optime_response.cpp
new file mode 100644
index 00000000000..7caffe09a96
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_response.cpp
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/read_after_optime_response.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+
+using std::string;
+
+namespace mongo {
+namespace repl {
+
+    // Name of the field written by appendInfo().
+    const string ReadAfterOpTimeResponse::kWaitedMSFieldName("waitedMS");
+
+    // Primary (private) constructor: every other constructor delegates here, directly
+    // or indirectly.
+    ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(
+            Status status,
+            boost::posix_time::milliseconds duration,
+            bool waited)
+        : _waited(waited),
+          _duration(duration),
+          _status(status) {
+    }
+
+    // Response for a wait that actually happened: records the outcome and how long
+    // the wait took.
+    ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(
+            Status status,
+            boost::posix_time::milliseconds duration)
+        : ReadAfterOpTimeResponse(status, duration, true) {
+    }
+
+    // Response carrying only a status; no wait took place, so the duration is zero.
+    ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status)
+        : ReadAfterOpTimeResponse(status, boost::posix_time::milliseconds(0), false) {
+    }
+
+    // Default response: OK status and no wait.
+    ReadAfterOpTimeResponse::ReadAfterOpTimeResponse()
+        : ReadAfterOpTimeResponse(Status::OK()) {
+    }
+
+    // Writes the waited duration, in milliseconds, under kWaitedMSFieldName.
+    // Nothing is appended when no wait took place; the status is never appended.
+    void ReadAfterOpTimeResponse::appendInfo(BSONObjBuilder* builder) {
+        if (_waited) {
+            const long long waitedMillis =
+                static_cast<long long>(_duration.total_milliseconds());
+            builder->append(kWaitedMSFieldName, waitedMillis);
+        }
+    }
+
+    // True when the response was built by the (status, duration) constructor.
+    bool ReadAfterOpTimeResponse::didWait() const {
+        return _waited;
+    }
+
+    // Duration recorded at construction; zero for responses built without a duration.
+    boost::posix_time::milliseconds ReadAfterOpTimeResponse::getDuration() const {
+        return _duration;
+    }
+
+    // Status recorded at construction.
+    Status ReadAfterOpTimeResponse::getStatus() const {
+        return _status;
+    }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_response.h b/src/mongo/db/repl/read_after_optime_response.h
new file mode 100644
index 00000000000..b906dc196d4
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_response.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <string>
+
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+ class BSONObjBuilder;
+
+namespace repl {
+
+ /**
+ * Result of a read-after-opTime wait: a status, whether a wait was attempted,
+ * and how long the wait lasted.
+ */
+ class ReadAfterOpTimeResponse {
+ public:
+ // Name of the field written by appendInfo().
+ static const std::string kWaitedMSFieldName;
+
+ /**
+ * Constructs a default response that has OK status, and wait is false.
+ */
+ ReadAfterOpTimeResponse();
+
+ /**
+ * Constructs a response with the given status and with wait equal to false.
+ */
+ explicit ReadAfterOpTimeResponse(Status status);
+
+ /**
+ * Constructs a response with wait set to true along with the given parameters.
+ */
+ ReadAfterOpTimeResponse(Status status, boost::posix_time::milliseconds duration);
+
+ /**
+ * Appends the waited duration in milliseconds to the builder, under
+ * kWaitedMSFieldName, if didWait() is true. Appends nothing otherwise.
+ * Note: does not include status.
+ */
+ void appendInfo(BSONObjBuilder* builder);
+
+ /**
+ * Returns true if this response was constructed with a wait duration.
+ */
+ bool didWait() const;
+
+ /**
+ * Returns the amount of time spent waiting for opTime to pass.
+ * Valid only if didWait is true.
+ */
+ boost::posix_time::milliseconds getDuration() const;
+
+ /**
+ * Returns more details about an error if it occurred.
+ */
+ Status getStatus() const;
+
+ private:
+ // Common constructor that the public constructors delegate to.
+ ReadAfterOpTimeResponse(Status status,
+ boost::posix_time::milliseconds duration,
+ bool waited);
+
+ bool _waited;
+ boost::posix_time::milliseconds _duration;
+ Status _status;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_response_test.cpp b/src/mongo/db/repl/read_after_optime_response_test.cpp
new file mode 100644
index 00000000000..a30824a57b8
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_response_test.cpp
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/read_after_optime_response.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+
+    // A default-constructed response reports no wait and appends nothing.
+    TEST(ReadAfterResponse, Default) {
+        ReadAfterOpTimeResponse defaultResponse;
+        ASSERT_FALSE(defaultResponse.didWait());
+
+        BSONObjBuilder infoBuilder;
+        defaultResponse.appendInfo(&infoBuilder);
+
+        const BSONObj info(infoBuilder.done());
+        ASSERT_TRUE(info.isEmpty());
+    }
+
+    // A status-only response keeps the status, reports no wait and appends nothing.
+    TEST(ReadAfterResponse, WithStatus) {
+        const Status errorStatus(ErrorCodes::InternalError, "test");
+        ReadAfterOpTimeResponse statusResponse(errorStatus);
+
+        ASSERT_FALSE(statusResponse.didWait());
+        ASSERT_EQ(ErrorCodes::InternalError, statusResponse.getStatus().code());
+
+        BSONObjBuilder infoBuilder;
+        statusResponse.appendInfo(&infoBuilder);
+
+        const BSONObj info(infoBuilder.done());
+        ASSERT_TRUE(info.isEmpty());
+    }
+
+    // A response built with a duration reports the wait and appends it as a number
+    // under kWaitedMSFieldName.
+    TEST(ReadAfterResponse, WaitedWithDuration) {
+        const Status errorStatus(ErrorCodes::InternalError, "test");
+        const boost::posix_time::milliseconds waited(7);
+        ReadAfterOpTimeResponse waitedResponse(errorStatus, waited);
+
+        ASSERT_TRUE(waitedResponse.didWait());
+        ASSERT_EQUALS(7, waitedResponse.getDuration().total_milliseconds());
+        ASSERT_EQ(ErrorCodes::InternalError, waitedResponse.getStatus().code());
+
+        BSONObjBuilder infoBuilder;
+        waitedResponse.appendInfo(&infoBuilder);
+
+        const BSONObj info(infoBuilder.done());
+        const BSONElement waitedElem = info[ReadAfterOpTimeResponse::kWaitedMSFieldName];
+        ASSERT_TRUE(waitedElem.isNumber());
+        ASSERT_EQ(7, waitedElem.numberLong());
+    }
+
+} // unnamed namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 660c7b47407..9201f4af326 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -55,6 +55,8 @@ namespace repl {
class IsMasterResponse;
class OplogReader;
class OpTime;
+ class ReadAfterOpTimeArgs;
+ class ReadAfterOpTimeResponse;
class ReplSetDeclareElectionWinnerArgs;
class ReplSetDeclareElectionWinnerResponse;
class ReplSetHeartbeatArgs;
@@ -288,6 +290,23 @@ namespace repl {
virtual Timestamp getMyLastOptime() const = 0;
/**
+ * Waits until the optime of the current node is at least the opTime specified in
+ * 'settings'.
+ *
+ * The returned ReadAfterOpTimeResponse object's didWait() method returns true if
+ * an attempt was made to wait for the specified opTime. Cases when this can be
+ * false could include:
+ *
+ * 1. No read after opTime was specified.
+ * 2. Attempting to do read after opTime when node is not a replica set member.
+ *
+ * Note: getDuration() on the returned ReadAfterOpTimeResponse will only be valid if
+ * its didWait() method returns true.
+ */
+ virtual ReadAfterOpTimeResponse waitUntilOpTime(const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) = 0;
+
+ /**
* Retrieves and returns the current election id, which is a unique id that is local to
* this node and changes every time we become primary.
* TODO(spencer): Use term instead.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 246c973dc5e..a83b37b9b76 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -34,6 +34,7 @@
#include <algorithm>
#include <boost/thread.hpp>
+#include <limits>
#include "mongo/base/status.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -45,6 +46,8 @@
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/db/repl/read_after_optime_response.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_declare_election_winner_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
@@ -736,10 +739,19 @@ namespace {
if (getReplicationMode() != modeReplSet) {
return;
}
+
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ if (*(opTimeWaiter->opTime) <= ts) {
+ opTimeWaiter->condVar->notify_all();
+ }
+ }
+
if (_getMemberState_inlock().primary()) {
return;
}
+
lock->unlock();
+
_externalState->forwardSlaveProgress(); // Must do this outside _mutex
}
@@ -758,6 +770,85 @@ namespace {
return _getMyLastOptime_inlock();
}
+    /**
+     * Blocks until this node's last optime is at least the timestamp carried by 'settings'.
+     *
+     * Returns without waiting (didWait() == false) when the requested timestamp is null
+     * (OK status) or when the node is not in replica set mode (NotAReplicaSet status).
+     * Otherwise the response has didWait() == true, carries the elapsed time, and its
+     * status is one of:
+     *   - OK once the requested timestamp has been reached,
+     *   - the status from txn->checkForInterruptNoAssert() if that is not OK,
+     *   - ShutdownInProgress if _inShutdown is set,
+     *   - ReadAfterOptimeTimeout once the timeout from 'settings' has elapsed.
+     * A timeout of zero means there is no timeout.
+     */
+    ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime(
+            const OperationContext* txn,
+            const ReadAfterOpTimeArgs& settings) {
+        // TODO: SERVER-18217 use OpTime directly.
+        const auto& ts = settings.getOpTime().getTimestamp();
+        const auto& timeout = settings.getTimeout();
+
+        // Nothing was requested, so there is nothing to wait for.
+        if (ts.isNull()) {
+            return ReadAfterOpTimeResponse();
+        }
+
+        if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
+            return ReadAfterOpTimeResponse(Status(ErrorCodes::NotAReplicaSet,
+                    "node needs to be a replica set member to use read after opTime"));
+        }
+
+        // TODO: SERVER-18298 enable code once V1 protocol is fully implemented.
+#if 0
+        if (!isV1ElectionProtocol()) {
+            return ReadAfterOpTimeResponse(Status(ErrorCodes::IncompatibleElectionProtocol,
+                    "node needs to be running on v1 election protocol to "
+                    "use read after opTime"));
+        }
+#endif
+
+        Timer timer;
+        boost::unique_lock<boost::mutex> lock(_mutex);
+
+        // Re-evaluate every exit condition after each wake-up; the condition variable
+        // may be signalled for reasons other than the optime having advanced far enough.
+        while (ts > _getMyLastOptime_inlock()) {
+            Status interruptedStatus = txn->checkForInterruptNoAssert();
+            if (!interruptedStatus.isOK()) {
+                return ReadAfterOpTimeResponse(interruptedStatus,
+                                               Milliseconds(timer.millis()));
+            }
+
+            if (_inShutdown) {
+                return ReadAfterOpTimeResponse(
+                        Status(ErrorCodes::ShutdownInProgress, "shutting down"),
+                        Milliseconds(timer.millis()));
+            }
+
+            const auto elapsedMS = timer.millis();
+            // Use >= rather than >: when exactly 'timeout' milliseconds have elapsed the
+            // remaining time computed below would be zero, which would turn the timed
+            // wait into a zero-length wait and make this loop spin.
+            if (timeout.total_milliseconds() > 0 &&
+                    elapsedMS >= timeout.total_milliseconds()) {
+                return ReadAfterOpTimeResponse(
+                        Status(ErrorCodes::ReadAfterOptimeTimeout,
+                               str::stream() << "timed out waiting for opTime: "
+                                             << ts.toStringPretty()),
+                        Milliseconds(timer.millis()));
+            }
+
+            // NOTE(review): WaiterInfo presumably registers itself in _opTimeWaiterList
+            // for the lifetime of this loop iteration -- confirm against its definition.
+            boost::condition_variable condVar;
+            WaiterInfo waitInfo(&_opTimeWaiterList,
+                                txn->getOpID(),
+                                &ts,
+                                nullptr, // Don't care about write concern.
+                                &condVar);
+
+            // A remaining max time of zero is treated as "no limit".
+            // NOTE(review): a remaining max time below 1000 microseconds truncates to a
+            // zero millisecond sleep here -- confirm that is acceptable.
+            uint64_t maxTimeMicrosRemaining = txn->getRemainingMaxTimeMicros();
+            auto maxTimeMSRemaining = (maxTimeMicrosRemaining == 0) ?
+                    std::numeric_limits<uint64_t>::max() : (maxTimeMicrosRemaining / 1000);
+
+            // Positive whenever a timeout is set, because of the >= check above.
+            auto timeoutMSRemaining = (timeout.total_milliseconds() == 0) ?
+                    std::numeric_limits<uint64_t>::max() :
+                    static_cast<uint64_t>(timeout.total_milliseconds() - elapsedMS);
+
+            // Sleep no longer than the tighter of the two limits.
+            auto sleepTimeMS = std::min(maxTimeMSRemaining, timeoutMSRemaining);
+
+            if (sleepTimeMS == std::numeric_limits<uint64_t>::max()) {
+                // Neither limit applies: wait until notified.
+                condVar.wait(lock);
+            }
+            else {
+                condVar.timed_wait(lock, Milliseconds(sleepTimeMS));
+            }
+        }
+
+        return ReadAfterOpTimeResponse(Status::OK(), Milliseconds(timer.millis()));
+    }
+
Timestamp ReplicationCoordinatorImpl::_getMyLastOptime_inlock() const {
return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime;
}
@@ -847,6 +938,13 @@ namespace {
}
}
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ if (opTimeWaiter->opID == opId) {
+ opTimeWaiter->condVar->notify_all();
+ return;
+ }
+ }
+
_replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback,
this,
@@ -861,6 +959,10 @@ namespace {
info->condVar->notify_all();
}
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ opTimeWaiter->condVar->notify_all();
+ }
+
_replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback,
this,
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 83c80b73bb7..16223c17e86 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -91,21 +91,21 @@ namespace repl {
// ================== Members of public ReplicationCoordinator API ===================
- virtual void startReplication(OperationContext* txn);
+ virtual void startReplication(OperationContext* txn) override;
- virtual void shutdown();
+ virtual void shutdown() override;
- virtual const ReplSettings& getSettings() const;
+ virtual const ReplSettings& getSettings() const override;
- virtual Mode getReplicationMode() const;
+ virtual Mode getReplicationMode() const override;
- virtual MemberState getMemberState() const;
+ virtual MemberState getMemberState() const override;
- virtual bool isInPrimaryOrSecondaryState() const;
+ virtual bool isInPrimaryOrSecondaryState() const override;
- virtual Seconds getSlaveDelaySecs() const;
+ virtual Seconds getSlaveDelaySecs() const override;
- virtual void clearSyncSourceBlacklist();
+ virtual void clearSyncSourceBlacklist() override;
/*
* Implementation of the KillOpListenerInterface interrupt method so that we can wake up
@@ -156,107 +156,112 @@ namespace repl {
virtual Timestamp getMyLastOptime() const;
- virtual OpTime getMyLastOptimeV1() const;
+ virtual OpTime getMyLastOptimeV1() const override;
- virtual OID getElectionId();
+ virtual ReadAfterOpTimeResponse waitUntilOpTime(
+ const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) override;
+
+ virtual OID getElectionId() override;
- virtual OID getMyRID() const;
+ virtual OID getMyRID() const override;
- virtual int getMyId() const;
+ virtual int getMyId() const override;
- virtual bool setFollowerMode(const MemberState& newState);
+ virtual bool setFollowerMode(const MemberState& newState) override;
- virtual bool isWaitingForApplierToDrain();
+ virtual bool isWaitingForApplierToDrain() override;
- virtual void signalDrainComplete(OperationContext* txn);
+ virtual void signalDrainComplete(OperationContext* txn) override;
- virtual void signalUpstreamUpdater();
+ virtual void signalUpstreamUpdater() override;
- virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder);
+ virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override;
- virtual Status processReplSetGetStatus(BSONObjBuilder* result);
+ virtual Status processReplSetGetStatus(BSONObjBuilder* result) override;
- virtual void fillIsMasterForReplSet(IsMasterResponse* result);
+ virtual void fillIsMasterForReplSet(IsMasterResponse* result) override;
- virtual void appendSlaveInfoData(BSONObjBuilder* result);
+ virtual void appendSlaveInfoData(BSONObjBuilder* result) override;
- virtual ReplicaSetConfig getConfig() const;
+ virtual ReplicaSetConfig getConfig() const override;
- virtual void processReplSetGetConfig(BSONObjBuilder* result);
+ virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
- virtual Status setMaintenanceMode(bool activate);
+ virtual Status setMaintenanceMode(bool activate) override;
- virtual bool getMaintenanceMode();
+ virtual bool getMaintenanceMode() override;
virtual Status processReplSetSyncFrom(const HostAndPort& target,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj);
+ virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override;
virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response);
+ ReplSetHeartbeatResponse* response) override;
virtual Status processReplSetReconfig(OperationContext* txn,
const ReplSetReconfigArgs& args,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
virtual Status processReplSetInitiate(OperationContext* txn,
const BSONObj& configObj,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj);
+ virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override;
- virtual void incrementRollbackID();
+ virtual void incrementRollbackID() override;
virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
virtual Status processReplSetElect(const ReplSetElectArgs& args,
- BSONObjBuilder* response);
+ BSONObjBuilder* response) override;
virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates,
- long long* configVersion);
+ long long* configVersion) override;
- virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake);
+ virtual Status processHandshake(OperationContext* txn,
+ const HandshakeArgs& handshake) override;
- virtual bool buildsIndexes();
+ virtual bool buildsIndexes() override;
- virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op);
+ virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op) override;
- virtual std::vector<HostAndPort> getOtherNodesInReplSet() const;
+ virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override;
- virtual WriteConcernOptions getGetLastErrorDefault();
+ virtual WriteConcernOptions getGetLastErrorDefault() override;
- virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
+ virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;
- virtual bool isReplEnabled() const;
+ virtual bool isReplEnabled() const override;
- virtual HostAndPort chooseNewSyncSource();
+ virtual HostAndPort chooseNewSyncSource() override;
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
- virtual void resetLastOpTimeFromOplog(OperationContext* txn);
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn) override;
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource);
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override;
- virtual Timestamp getLastCommittedOpTime() const;
+ virtual Timestamp getLastCommittedOpTime() const override;
virtual Status processReplSetRequestVotes(OperationContext* txn,
const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response);
+ ReplSetRequestVotesResponse* response) override;
virtual Status processReplSetDeclareElectionWinner(
const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm);
+ long long* responseTerm) override;
virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder);
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponseV1* response);
+ ReplSetHeartbeatResponseV1* response) override;
- virtual bool isV1ElectionProtocol();
+ virtual bool isV1ElectionProtocol() override;
- virtual void summarizeAsHtml(ReplSetHtmlSummary* s);
+ virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;
// ================== Test support API ===================
@@ -880,6 +885,10 @@ namespace repl {
// WaiterInfos.
std::vector<WaiterInfo*> _replicationWaiterList; // (M)
+ // list of information about clients waiting for a particular opTime.
+ // Does *not* own the WaiterInfos.
+ std::vector<WaiterInfo*> _opTimeWaiterList; // (M)
+
// Set to true when we are in the process of shutting down replication.
bool _inShutdown; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index f486fed8694..b33b87b1a83 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -32,6 +32,7 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
+#include <future>
#include <iostream>
#include <memory>
#include <set>
@@ -42,6 +43,9 @@
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/db/repl/read_after_optime_response.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replica_set_config.h"
@@ -57,12 +61,15 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+#include "mongo/util/timer.h"
namespace mongo {
namespace repl {
namespace {
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
+ Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted");
TEST_F(ReplCoordTest, StartupWithValidLocalConfig) {
assertStartSuccess(
@@ -324,7 +331,7 @@ namespace {
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
- getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace,
+ getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace,
"The test set this"));
ASSERT_EQUALS(
ErrorCodes::OutOfDiskSpace,
@@ -840,50 +847,9 @@ namespace {
awaiter.reset();
}
- class OperationContextNoopWithInterrupt : public OperationContextReplMock {
- public:
-
- OperationContextNoopWithInterrupt() : _opID(0), _interruptOp(false) {}
-
- virtual unsigned int getOpID() const {
- return _opID;
- }
-
- /**
- * Can only be called before any multi-threaded access to this object has begun.
- */
- void setOpID(unsigned int opID) {
- _opID = opID;
- }
-
- virtual void checkForInterrupt() const {
- if (_interruptOp) {
- uasserted(ErrorCodes::Interrupted, "operation was interrupted");
- }
- }
-
- virtual Status checkForInterruptNoAssert() const {
- if (_interruptOp) {
- return Status(ErrorCodes::Interrupted, "operation was interrupted");
- }
- return Status::OK();
- }
-
- /**
- * Can only be called before any multi-threaded access to this object has begun.
- */
- void setInterruptOp(bool interrupt) {
- _interruptOp = interrupt;
- }
-
- private:
- unsigned int _opID;
- bool _interruptOp;
- };
-
TEST_F(ReplCoordTest, AwaitReplicationInterrupt) {
// Tests that a thread blocked in awaitReplication can be killed by a killOp operation
- OperationContextNoopWithInterrupt txn;
+ OperationContextReplMock txn;
assertStartSuccess(
BSON("_id" << "mySet" <<
"version" << 2 <<
@@ -914,7 +880,7 @@ namespace {
ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
- txn.setInterruptOp(true);
+ txn.setCheckForInterruptStatus(kInterruptedStatus);
getReplCoord()->interrupt(opID);
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status);
@@ -1202,7 +1168,7 @@ namespace {
}
TEST_F(StepDownTest, InterruptStepDown) {
- OperationContextNoopWithInterrupt txn;
+ OperationContextReplMock txn;
Timestamp optime1(100, 1);
Timestamp optime2(100, 2);
// No secondary is caught up
@@ -1223,7 +1189,7 @@ namespace {
unsigned int opID = 100;
txn.setOpID(opID);
- txn.setInterruptOp(true);
+ txn.setCheckForInterruptStatus(kInterruptedStatus);
getReplCoord()->interrupt(opID);
ASSERT_EQUALS(ErrorCodes::Interrupted, runner.getResult());
@@ -1558,7 +1524,7 @@ namespace {
IsMasterResponse roundTripped;
ASSERT_OK(roundTripped.initialize(response.toBSON()));
}
-
+
TEST_F(ReplCoordTest, ShutDownBeforeStartUpFinished) {
init();
startCapturingLogMessages();
@@ -1945,6 +1911,172 @@ namespace {
ASSERT_EQUALS(newTime, getReplCoord()->getLastCommittedOpTime());
}
+ TEST_F(ReplCoordTest, CantUseReadAfterIfNotReplSet) { // Coordinator is initialized with default-constructed ReplSettings.
+ init(ReplSettings());
+ OperationContextNoop txn;
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(0)));
+ // The call is expected to be rejected with NotAReplicaSet and to report that it did not wait.
+ ASSERT_FALSE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterWhileShutdown) { // waitUntilOpTime called after shutdown() must report ShutdownInProgress.
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ // Last applied optime (10, 0) is behind the optime requested below (50, 0).
+ getReplCoord()->setMyLastOptime(Timestamp(10, 0));
+
+ shutdown();
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(0)));
+ // Expected: the response is flagged as having waited and carries ShutdownInProgress.
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterInterrupted) { // An operation context reporting Interrupted must make waitUntilOpTime return Interrupted.
+ OperationContextReplMock txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ // Last applied optime (10, 0) is behind the optime requested below (50, 0).
+ getReplCoord()->setMyLastOptime(Timestamp(10, 0));
+ // Configure the mock context's interrupt check status to Interrupted before the call.
+ txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test"));
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(0)));
+ // Expected: the response is flagged as having waited and carries Interrupted.
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterNoOpTime) { // Default-constructed ReadAfterOpTimeArgs must succeed without waiting.
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ // No optime is supplied in the arguments passed here.
+ auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs());
+ // Expected: OK status and didWait() == false.
+ ASSERT_FALSE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterGreaterOpTime) { // Node's last optime is already past the requested optime.
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ // Last applied optime (100, 0) is ahead of the requested (50, 0); the timeout passed is 100 ms.
+ getReplCoord()->setMyLastOptime(Timestamp(100, 0));
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(100)));
+ // Expected: OK status, with the response still flagged as didWait().
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterEqualOpTime) { // Node's last optime equals the requested optime.
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ // The same timestamp is used both as the node's last optime and as the optime to wait for.
+ OpTime time(Timestamp(100, 0), 1);
+ getReplCoord()->setMyLastOptime(time.getTimestamp());
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(time, Milliseconds(100)));
+ // Expected: OK status, with the response flagged as didWait().
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterDeferredGreaterOpTime) { // Last optime is advanced past the target from another thread.
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ // Start behind the requested optime (100, 0).
+ getReplCoord()->setMyLastOptime(Timestamp(0, 0));
+ // Asynchronously move the last optime to (200, 0), which is ahead of the requested optime.
+ auto pseudoLogOp = std::async(std::launch::async, [this]() {
+ // Not guaranteed to be scheduled after waitUntilOpTime blocks...
+ getReplCoord()->setMyLastOptime(Timestamp(200, 0));
+ });
+ // NOTE(review): Milliseconds(0) presumably means "no timeout" here -- confirm against waitUntilOpTime.
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(100, 0), 1),
+ Milliseconds(0)));
+ pseudoLogOp.get(); // Join the async task before the test body (and the captured `this`) goes away.
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) { // Last optime is advanced exactly to the target from another thread.
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ // Start behind the optime to wait for.
+ getReplCoord()->setMyLastOptime(Timestamp(0, 0));
+
+ repl::OpTime opTimeToWait(Timestamp(100, 0), 1);
+ // Asynchronously set the last optime to the very timestamp being waited on.
+ auto pseudoLogOp = std::async(std::launch::async, [this, &opTimeToWait]() {
+ // Not guaranteed to be scheduled after waitUntilOpTime blocks...
+ getReplCoord()->setMyLastOptime(opTimeToWait.getTimestamp());
+ });
+ // NOTE(review): Milliseconds(0) presumably means "no timeout" here -- confirm against waitUntilOpTime.
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(opTimeToWait, Milliseconds(0)));
+ pseudoLogOp.get(); // Join the async task; it holds a reference to the local opTimeToWait.
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterOpTimeTimeoutNoMaxTimeMS) { // The wait must give up with ReadAfterOptimeTimeout when the target is never reached.
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ // Last optime (100, 0) stays behind the requested (200, 0); nothing in this test advances it.
+ getReplCoord()->setMyLastOptime(Timestamp(100, 0));
+ // The timeout passed in the arguments is 10 ms.
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(200, 0), 1), Milliseconds(10)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::ReadAfterOptimeTimeout, result.getStatus());
+ }
+
// TODO(schwerin): Unit test election id updating
} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 9d7f28e62af..ca9f32605ed 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -32,6 +32,8 @@
#include "mongo/base/status.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/db/repl/read_after_optime_response.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/optime.h"
#include "mongo/util/assert_util.h"
@@ -145,6 +147,12 @@ namespace repl {
return Timestamp();
}
+ ReadAfterOpTimeResponse ReplicationCoordinatorMock::waitUntilOpTime(
+ const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) {
+ return ReadAfterOpTimeResponse(); // Mock: both arguments are unused; a default-constructed response is returned.
+ }
+
OID ReplicationCoordinatorMock::getElectionId() {
// TODO
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 79cf56507c8..d4f8bd33754 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -103,6 +103,10 @@ namespace repl {
virtual Timestamp getMyLastOptime() const;
+ virtual ReadAfterOpTimeResponse waitUntilOpTime(
+ const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) override;
+
virtual OID getElectionId();
virtual OID getMyRID() const;