diff options
author | Randolph Tan <randolph@10gen.com> | 2015-04-20 15:36:09 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2015-05-11 10:09:05 -0400 |
commit | 9f0ceef0b37df2525cdebb172e6b05e2db8a2b20 (patch) | |
tree | 11fa779defcac750089916964aeee605d3330a17 | |
parent | b73ab5765fb75ae87ee5ad0f4afbc4fdfc2bc151 (diff) | |
download | mongo-9f0ceef0b37df2525cdebb172e6b05e2db8a2b20.tar.gz |
SERVER-18195 Read after optime (repl only)
31 files changed, 1347 insertions, 151 deletions
diff --git a/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml b/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml index 2d44c5675e2..7dda737496a 100644 --- a/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml +++ b/buildscripts/resmokeconfig/suites/core_small_oplog_rs.yml @@ -11,6 +11,7 @@ selector: - jstests/core/dropdb_race.js - jstests/core/opcounters_write_cmd.js - jstests/core/rename.js + - jstests/core/read_after_optime.js executor: js_test: diff --git a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml index 86ef6bdc3d3..842f4b5d5bd 100644 --- a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml @@ -45,6 +45,8 @@ selector: - jstests/core/top.js - jstests/core/dbcase.js # SERVER-11735 - jstests/core/dbcase2.js # SERVER-11735 + # TODO: SERVER-17284 remove once find cmd is implemented in mongos + - jstests/core/read_after_optime.js executor: js_test: diff --git a/buildscripts/smoke.py b/buildscripts/smoke.py index faa37622857..93fa2b38ad3 100755 --- a/buildscripts/smoke.py +++ b/buildscripts/smoke.py @@ -443,10 +443,19 @@ def skipTest(path): parentPath = os.path.dirname(path) parentDir = os.path.basename(parentPath) if small_oplog or small_oplog_rs: # For tests running in parallel - if basename in ["cursor8.js", "indexh.js", "dropdb.js", "dropdb_race.js", - "connections_opened.js", "opcounters_write_cmd.js", "dbadmin.js", + if basename in ["cursor8.js", + "indexh.js", + "dropdb.js", + "dropdb_race.js", + "connections_opened.js", + "opcounters_write_cmd.js", + "dbadmin.js", + # Should not run in repl mode: + "read_after_optime.js", ## Capped tests - "capped_max1.js", "capped_convertToCapped1.js", "rename.js"]: + "capped_max1.js", + "capped_convertToCapped1.js", + "rename.js"]: return True if auth or keyFile: # For tests running with auth # Skip any tests that run with auth explicitly diff --git a/jstests/core/group_empty.js b/jstests/core/group_empty.js index 62a734ed0f8..c1772f88b22 100644 --- a/jstests/core/group_empty.js +++ b/jstests/core/group_empty.js @@ -5,4 +5,8 @@ t.drop(); res1 = db.runCommand({group: {$reduce: function(){}, ns: 'group_empty', cond: {}, key: {}, initial: {count: 0}}}); t.ensureIndex( { x : 1 } ); res2 = db.runCommand({group: {$reduce: function(){}, ns: 'group_empty', cond: {}, key: {}, initial: {count: 0}}}); -assert.eq( res1, res2 ); + +assert.docEq(res1.retval, res2.retval); +assert.eq(res1.keys, res2.keys); +assert.eq(res1.count, res2.count); + diff --git a/jstests/core/read_after_optime.js b/jstests/core/read_after_optime.js new file mode 100644 index 00000000000..d185b660887 --- /dev/null +++ b/jstests/core/read_after_optime.js @@ -0,0 +1,22 @@ +// Test that attempting to read after optime fails if replication is not enabled. + +(function() { +"use strict"; + +var currentTime = new Date(); + +var futureOpTime = new Timestamp((currentTime / 1000 + 3600), 0); + +var res = assert.commandFailed(db.runCommand({ + find: 'user', + filter: { x: 1 }, + after: { + opTime: { ts: futureOpTime, term: 0 } + } +})); + +assert.eq(123, res.code); // ErrorCodes::NotAReplicaSet +assert.eq(null, res.waitedMS); + +})(); + diff --git a/jstests/replsets/drop_oplog.js b/jstests/replsets/drop_oplog.js index 96c7582c4d9..90c920e1b27 100644 --- a/jstests/replsets/drop_oplog.js +++ b/jstests/replsets/drop_oplog.js @@ -10,16 +10,10 @@ var ml = master.getDB( 'local' ); var threw = false; - try { - ml.oplog.rs.drop(); - } - catch (err) { - assert.eq(err, - "Error: drop failed: { \"ok\" : 0, \"errmsg\" : " + - "\"can't drop live oplog while replicating\" }"); - threw = true; - } - assert(threw); + + var ret = assert.commandFailed(ml.runCommand({ drop: 'oplog.rs' })); + assert.eq('can\'t drop live oplog while replicating', ret.errmsg); + var dropOutput = ml.dropDatabase(); assert.eq(dropOutput.ok, 0); assert.eq(dropOutput.errmsg, "Cannot drop 'local' database while replication is active"); diff --git a/jstests/replsets/read_after_optime_timeout.js b/jstests/replsets/read_after_optime_timeout.js new file mode 100644 index 00000000000..0efee4fd8da --- /dev/null +++ b/jstests/replsets/read_after_optime_timeout.js @@ -0,0 +1,105 @@ +// Test read after opTime functionality with maxTimeMS. + +(function() { +"use strict"; + +var replTest = new ReplSetTest({ nodes: 2 }); +replTest.startSet(); + +var config = replTest.getReplSetConfig(); +// TODO: SERVER-18298 uncomment once implemented. +//config.protocolVersion = 1; +replTest.initiate(config); + +var runTest = function(testDB, primaryConn) { + primaryConn.getDB('test').user.insert({ x: 1 }, { writeConcern: { w: 2 }}); + + var localDB = primaryConn.getDB('local'); + + var oplogTS = localDB.oplog.rs.find().sort({ $natural: -1 }).limit(1).next().ts; + var twoSecTS = new Timestamp(oplogTS.getTime() + 2, 0); + + // Test timeout with maxTimeMS < after.opTime + var res = assert.commandFailed(testDB.runCommand({ + find: 'user', + filter: { x: 1 }, + after: { + opTime: { ts: twoSecTS, term: 0 }, + timeoutMS: 10 * 1000 + }, + maxTimeMS: 1000 + })); + + assert.eq(50, res.code); // ErrorCodes::ExceededTimeLimit + assert.gt(res.waitedMS, 500); + assert.lt(res.waitedMS, 2500); + + // Test timeout with after.opTime < maxTimeMS + res = assert.commandFailed(testDB.runCommand({ + find: 'user', + filter: { x: 1 }, + after: { + opTime: { ts: twoSecTS, term: 0 }, + timeoutMS: 1 * 1000 + }, + maxTimeMS: 10 * 1000 + })); + + assert.eq(122, res.code); // ErrorCodes::ReadAfterOptimeTimeout + assert.gt(res.waitedMS, 500); + assert.lt(res.waitedMS, 2500); + + // Test timeout with maxTimeMS, no after.timeout + res = assert.commandFailed(testDB.runCommand({ + find: 'user', + filter: { x: 1 }, + after: { + opTime: { ts: twoSecTS, term: 0 } + }, + maxTimeMS: 1 * 1000 + })); + + assert.eq(50, res.code); // ErrorCodes::ExceededTimeLimit + assert.gt(res.waitedMS, 500); + assert.lt(res.waitedMS, 2500); + + // Test timeout with after.opTime, no maxTimeMS + res = assert.commandFailed(testDB.runCommand({ + find: 'user', + filter: { x: 1 }, + after: { + opTime: { ts: twoSecTS, term: 0 }, + timeoutMS: 1 * 1000 + } + })); + + assert.eq(122, res.code); // ErrorCodes::ReadAfterOptimeTimeout + assert.gt(res.waitedMS, 500); + assert.lt(res.waitedMS, 2500); + + // Test read on future opTime that will eventually occur. + var insertFunc = startParallelShell( + "sleep(2100); db.user.insert({ y: 1 }, { writeConcern: { w: 2 }});", + primaryConn.port); + + res = assert.commandWorked(testDB.runCommand({ + find: 'user', + filter: { x: 1 }, + after: { + opTime: { ts: twoSecTS, term: 0 } + } + })); + + assert.eq(null, res.code); + assert.gt(res.waitedMS, 0); + + insertFunc(); +}; + +var primary = replTest.getPrimary(); +runTest(primary.getDB('test'), primary); +runTest(replTest.getSecondary().getDB('test'), primary); + +replTest.stopSet(); + +})(); diff --git a/jstests/slow2/sharding_jscore_passthrough.js b/jstests/slow2/sharding_jscore_passthrough.js index 44598d4ddc5..04bcd73235c 100644 --- a/jstests/slow2/sharding_jscore_passthrough.js +++ b/jstests/slow2/sharding_jscore_passthrough.js @@ -86,7 +86,10 @@ var db; 'auth1|' + 'auth2|' + 'dropdb_race|' + - 'unix_socket\\d*' + + 'unix_socket\\d*|' + + // TODO: SERVER-17284 remove once find cmd is + // implemented in mongos + 'read_after_optime' + ')\.js$'); // These are bugs (some might be fixed now): diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 44cc990045a..8bb00b55e39 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -121,6 +121,9 @@ error_code("NamespaceNotSharded", 118) error_code("InvalidSyncSource", 119) error_code("OplogStartMissing", 120) error_code("DocumentValidationFailure", 121) # Only for the document validator on collections. +error_code("ReadAfterOptimeTimeout", 122) +error_code("NotAReplicaSet", 123) +error_code("IncompatibleElectionProtocol", 124) # Non-sequential error codes (for compatibility only) error_code("NotMaster", 10107) #this comes from assert_util.h diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 8bbe2b4ea35..89159347ed0 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -82,6 +82,9 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repair_database.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/db/repl/read_after_optime_response.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -1186,14 +1189,14 @@ namespace mongo { bool _impersonation; }; - namespace { - void appendGLEHelperData(BSONObjBuilder& bob, const Timestamp& opTime, const OID& oid) { - BSONObjBuilder subobj(bob.subobjStart(kGLEStatsFieldName)); - subobj.append(kGLEStatsLastOpTimeFieldName, opTime); - subobj.appendOID(kGLEStatsElectionIdFieldName, const_cast<OID*>(&oid)); - subobj.done(); - } +namespace { + void appendGLEHelperData(BSONObjBuilder& bob, const Timestamp& opTime, const OID& oid) { + BSONObjBuilder subobj(bob.subobjStart(kGLEStatsFieldName)); + subobj.append(kGLEStatsLastOpTimeFieldName, opTime); + subobj.appendOID(kGLEStatsElectionIdFieldName, const_cast<OID*>(&oid)); + subobj.done(); } +} // unnamed namespace /** * this handles @@ -1334,6 +1337,25 @@ namespace mongo { c->_commandsExecuted.increment(); + { + // Handle read after opTime. + + repl::ReadAfterOpTimeArgs readAfterOptimeSettings; + auto readAfterParseStatus = readAfterOptimeSettings.initialize(cmdObj); + if (!readAfterParseStatus.isOK()) { + appendCommandStatus(result, readAfterParseStatus); + return; + } + + auto readAfterResult = replCoord->waitUntilOpTime(txn, readAfterOptimeSettings); + readAfterResult.appendInfo(&result); + + if (!readAfterResult.getStatus().isOK()) { + appendCommandStatus(result, readAfterResult.getStatus()); + return; + } + } + retval = _execCommand(txn, c, dbname, cmdObj, queryOptions, errmsg, result); if ( !retval ){ diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index a5815f9d4eb..9a94d762eed 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -120,6 +120,8 @@ namespace mongo { */ virtual CurOp* getCurOp() const = 0; + virtual uint64_t getRemainingMaxTimeMicros() const = 0; + /** * Returns the operation ID associated with this operation. * WARNING: Due to SERVER-14995, this OpID is not guaranteed to stay the same for the diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index bd29ce23251..6936e32df02 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -134,6 +134,10 @@ namespace { return getCurOp()->opNum(); } + uint64_t OperationContextImpl::getRemainingMaxTimeMicros() const { + return getCurOp()->getRemainingMaxTimeMicros(); + } + // Enabling the checkForInterruptFail fail point will start a game of random chance on the // connection specified in the fail point data, generating an interrupt with a given fixed // probability. Example invocation: diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h index 735e59db6e4..5f9f9f1903a 100644 --- a/src/mongo/db/operation_context_impl.h +++ b/src/mongo/db/operation_context_impl.h @@ -40,34 +40,36 @@ namespace mongo { virtual ~OperationContextImpl(); - virtual RecoveryUnit* recoveryUnit() const; + virtual RecoveryUnit* recoveryUnit() const override; - virtual RecoveryUnit* releaseRecoveryUnit(); + virtual RecoveryUnit* releaseRecoveryUnit() override; - virtual void setRecoveryUnit(RecoveryUnit* unit); + virtual void setRecoveryUnit(RecoveryUnit* unit) override; - virtual Locker* lockState() const; + virtual Locker* lockState() const override; virtual ProgressMeter* setMessage(const char* msg, const std::string& name, unsigned long long progressMeterTotal, - int secondsBetween); + int secondsBetween) override; - virtual std::string getNS() const; + virtual std::string getNS() const override; - virtual Client* getClient() const; + virtual Client* getClient() const override; - virtual CurOp* getCurOp() const; + virtual CurOp* getCurOp() const override; - virtual unsigned int getOpID() const; + virtual unsigned int getOpID() const override; - virtual void checkForInterrupt() const; - virtual Status checkForInterruptNoAssert() const; + virtual uint64_t getRemainingMaxTimeMicros() const override; - virtual bool isPrimaryFor( StringData ns ); + virtual void checkForInterrupt() const override; + virtual Status checkForInterruptNoAssert() const override; - virtual void setReplicatedWrites(bool writesAreReplicated = true); - virtual bool writesAreReplicated() const; + virtual bool isPrimaryFor( StringData ns ) override; + + virtual void setReplicatedWrites(bool writesAreReplicated = true) override; + virtual bool writesAreReplicated() const override; private: std::auto_ptr<RecoveryUnit> _recovery; diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h index 0b2650531ce..f27f3d1ef96 100644 --- a/src/mongo/db/operation_context_noop.h +++ b/src/mongo/db/operation_context_noop.h @@ -53,62 +53,66 @@ namespace mongo { virtual ~OperationContextNoop() { } - virtual Client* getClient() const { + virtual Client* getClient() const override { invariant(false); return NULL; } - virtual CurOp* getCurOp() const { + virtual CurOp* getCurOp() const override { invariant(false); return NULL; } - virtual RecoveryUnit* recoveryUnit() const { + virtual RecoveryUnit* recoveryUnit() const override { return _recoveryUnit.get(); } - virtual RecoveryUnit* releaseRecoveryUnit() { + virtual RecoveryUnit* releaseRecoveryUnit() override { return _recoveryUnit.release(); } - virtual void setRecoveryUnit(RecoveryUnit* unit) { + virtual void setRecoveryUnit(RecoveryUnit* unit) override { _recoveryUnit.reset(unit); } - virtual Locker* lockState() const { + virtual Locker* lockState() const override { return _locker.get(); } virtual ProgressMeter* setMessage(const char * msg, const std::string &name, unsigned long long progressMeterTotal, - int secondsBetween) { + int secondsBetween) override { return &_pm; } - virtual void checkForInterrupt() const { } - virtual Status checkForInterruptNoAssert() const { + virtual void checkForInterrupt() const override { } + virtual Status checkForInterruptNoAssert() const override { return Status::OK(); } - virtual bool isPrimaryFor( StringData ns ) { + virtual bool isPrimaryFor( StringData ns ) override { return true; } - virtual std::string getNS() const { + virtual std::string getNS() const override { return std::string(); }; - virtual unsigned int getOpID() const { + virtual unsigned int getOpID() const override { return 0; } - void setReplicatedWrites(bool writesAreReplicated = true) {} + void setReplicatedWrites(bool writesAreReplicated = true) override {} - bool writesAreReplicated() const { + bool writesAreReplicated() const override { return false; } + virtual uint64_t getRemainingMaxTimeMicros() const override { + return 0; + } + private: std::auto_ptr<RecoveryUnit> _recoveryUnit; boost::scoped_ptr<Locker> _locker; diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 6043b457f46..76fafe780b3 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -134,6 +134,7 @@ env.Library( ], LIBDEPS=[ "$BUILD_DIR/mongo/bson/bson", + "$BUILD_DIR/mongo/db/repl/read_after_optime_args", ], ) diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp index 6d0bf7aabec..f186cce4a80 100644 --- a/src/mongo/db/query/lite_parsed_query.cpp +++ b/src/mongo/db/query/lite_parsed_query.cpp @@ -31,6 +31,7 @@ #include <cmath> #include "mongo/db/dbmessage.h" +#include "mongo/db/repl/read_after_optime_args.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -327,6 +328,11 @@ namespace mongo { << "You may need to update your shell or driver."); } } + else if (mongoutils::str::equals(fieldName, + repl::ReadAfterOpTimeArgs::kRootFieldName.c_str())) { + // read after optime parsing is handled elsewhere. + continue; + } else { mongoutils::str::stream ss; ss << "Failed to parse: " << cmdObj.toString() << ". " diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index fbb37662882..8eb73d7f8ba 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -246,11 +246,21 @@ env.Library('replmocks', 'replication_executor', ]) +env.Library('read_after_optime_args', + [ + 'read_after_optime_args.cpp' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base/base', + '$BUILD_DIR/mongo/bson/util/bson_extract', + ]) + env.Library('replica_set_messages', [ 'handshake_args.cpp', 'is_master_response.cpp', 'member_config.cpp', + 'read_after_optime_response.cpp', 'repl_set_declare_election_winner_args.cpp', 'repl_set_heartbeat_args.cpp', 'repl_set_heartbeat_args_v1.cpp', @@ -264,6 +274,7 @@ env.Library('replica_set_messages', 'last_vote.cpp', ], LIBDEPS=[ + 'read_after_optime_args', '$BUILD_DIR/mongo/bson/bson', '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/util/net/hostandport', @@ -449,3 +460,20 @@ env.CppUnitTest( 'task_runner_test_fixture', ], ) + +env.CppUnitTest( + target='read_after_optime_args_test', + source=[ + 'read_after_optime_args_test.cpp', + ], + LIBDEPS=['replica_set_messages'] +) + +env.CppUnitTest( + target='read_after_optime_response_test', + source=[ + 'read_after_optime_response_test.cpp', + ], + LIBDEPS=['replica_set_messages'] +) + diff --git a/src/mongo/db/repl/operation_context_repl_mock.cpp b/src/mongo/db/repl/operation_context_repl_mock.cpp index b5c44316261..e1730eacf1c 100644 --- a/src/mongo/db/repl/operation_context_repl_mock.cpp +++ b/src/mongo/db/repl/operation_context_repl_mock.cpp @@ -36,10 +36,50 @@ namespace mongo { namespace repl { - OperationContextReplMock::OperationContextReplMock() - : _lockState(new MMAPV1LockerImpl()) { } + OperationContextReplMock::OperationContextReplMock(): + _lockState(new MMAPV1LockerImpl()), + _opID(0), + _checkForInterruptStatus(Status::OK()), + _maxTimeMicrosRemaining(0) { + } OperationContextReplMock::~OperationContextReplMock() {} + Locker* OperationContextReplMock::lockState() const { + return _lockState.get(); + } + + unsigned int OperationContextReplMock::getOpID() const { + return _opID; + } + + void OperationContextReplMock::setOpID(unsigned int opID) { + _opID = opID; + } + + void OperationContextReplMock::checkForInterrupt() const { + uassertStatusOK(checkForInterruptNoAssert()); + } + + Status OperationContextReplMock::checkForInterruptNoAssert() const { + if (!_checkForInterruptStatus.isOK()) { + return _checkForInterruptStatus; + } + + return Status::OK(); + } + + void OperationContextReplMock::setCheckForInterruptStatus(Status status) { + _checkForInterruptStatus = std::move(status); + } + + uint64_t OperationContextReplMock::getRemainingMaxTimeMicros() const { + return _maxTimeMicrosRemaining; + } + + void OperationContextReplMock::setRemainingMaxTimeMicros(uint64_t micros) { + _maxTimeMicrosRemaining = micros; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/operation_context_repl_mock.h b/src/mongo/db/repl/operation_context_repl_mock.h index 660fbdfc5db..fb53dff2e3c 100644 --- a/src/mongo/db/repl/operation_context_repl_mock.h +++ b/src/mongo/db/repl/operation_context_repl_mock.h @@ -40,16 +40,36 @@ namespace repl { /** * Mock implementation of OperationContext that can be used with real instances of LockManager. + * Note this is not thread safe and the setter methods should only be called in the context + * where access to this object is guaranteed to be serialized. */ class OperationContextReplMock : public OperationContextNoop { public: OperationContextReplMock(); virtual ~OperationContextReplMock(); - virtual Locker* lockState() const { return _lockState.get(); } + virtual Locker* lockState() const override; + + virtual unsigned int getOpID() const override; + + void setOpID(unsigned int opID); + + virtual void checkForInterrupt() const override; + + virtual Status checkForInterruptNoAssert() const override; + + void setCheckForInterruptStatus(Status status); + + virtual uint64_t getRemainingMaxTimeMicros() const override; + + void setRemainingMaxTimeMicros(uint64_t micros); private: boost::scoped_ptr<Locker> _lockState; + unsigned int _opID; + + Status _checkForInterruptStatus; + uint64_t _maxTimeMicrosRemaining; }; } // namespace repl diff --git a/src/mongo/db/repl/read_after_optime_args.cpp b/src/mongo/db/repl/read_after_optime_args.cpp new file mode 100644 index 00000000000..b17c24f23fa --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_args.cpp @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/read_after_optime_args.h" + +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/jsobj.h" +#include "mongo/util/mongoutils/str.h" + +using std::string; + +namespace mongo { +namespace repl { + + const string ReadAfterOpTimeArgs::kRootFieldName("after"); + const string ReadAfterOpTimeArgs::kOpTimeFieldName("opTime"); + const string ReadAfterOpTimeArgs::kOpTimestampFieldName("ts"); + const string ReadAfterOpTimeArgs::kOpTermFieldName("term"); + const string ReadAfterOpTimeArgs::kTimeoutFieldName("timeoutMS"); + + ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(): ReadAfterOpTimeArgs(OpTime(), Milliseconds(0)) { + } + + ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout): + _opTime(std::move(opTime)), + _timeout(std::move(timeout)) { + } + + const OpTime& ReadAfterOpTimeArgs::getOpTime() const { + return _opTime; + } + + const Milliseconds& ReadAfterOpTimeArgs::getTimeout() const { + return _timeout; + } + + Status ReadAfterOpTimeArgs::initialize(const BSONObj& cmdObj) { + auto afterElem = cmdObj[ReadAfterOpTimeArgs::kRootFieldName]; + + if (afterElem.eoo()) { + return Status::OK(); + } + + if (!afterElem.isABSONObj()) { + return Status(ErrorCodes::FailedToParse, "'after' field should be an object"); + } + + BSONObj readAfterObj = afterElem.Obj(); + BSONElement opTimeElem; + auto opTimeStatus = bsonExtractTypedField(readAfterObj, + ReadAfterOpTimeArgs::kOpTimeFieldName, + Object, + &opTimeElem); + + if (!opTimeStatus.isOK()) { + return opTimeStatus; + } + + BSONObj opTimeObj = opTimeElem.Obj(); + BSONElement timestampElem; + + Timestamp timestamp; + auto timestampStatus = bsonExtractTimestampField(opTimeObj, + ReadAfterOpTimeArgs::kOpTimestampFieldName, + ×tamp); + + if (!timestampStatus.isOK()) { + return timestampStatus; + } + + long long termNumber; + auto termStatus = bsonExtractIntegerField(opTimeObj, + ReadAfterOpTimeArgs::kOpTermFieldName, + &termNumber); + + if (!termStatus.isOK()) { + return termStatus; + } + + long long timeoutMS; + auto timeoutStatus = bsonExtractIntegerFieldWithDefault( + readAfterObj, + ReadAfterOpTimeArgs::kTimeoutFieldName, + 0, // Default to no timeout. + &timeoutMS); + + if (!timeoutStatus.isOK()) { + return timeoutStatus; + } + + if (timeoutMS < 0) { + return Status(ErrorCodes::BadValue, + str::stream() << ReadAfterOpTimeArgs::kRootFieldName + << "." << ReadAfterOpTimeArgs::kTimeoutFieldName + << " value must be positive"); + } + + _opTime = OpTime(timestamp, termNumber); + _timeout = Milliseconds(timeoutMS); // Note: 'long long' -> 'long' down casting. + + return Status::OK(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_args.h b/src/mongo/db/repl/read_after_optime_args.h new file mode 100644 index 00000000000..a8703ad52f7 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_args.h @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/status.h" +#include "mongo/db/repl/optime.h" +#include "mongo/util/time_support.h" + +namespace mongo { + + class BSONObj; + +namespace repl { + + class ReadAfterOpTimeArgs { + public: + + static const std::string kRootFieldName; + static const std::string kOpTimeFieldName; + static const std::string kOpTimestampFieldName; + static const std::string kOpTermFieldName; + static const std::string kTimeoutFieldName; + + ReadAfterOpTimeArgs(); + ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout); + + /** + * Format: + * { + * find: “coll”, + * filter: <Query Object>, + * after: { // optional + * opTime: { ts: <timestamp>, term: <NumberLong> }, + * timeoutMS: <NumberLong> //optional + * } + * } + */ + Status initialize(const BSONObj& cmdObj); + + const OpTime& getOpTime() const; + const Milliseconds& getTimeout() const; + + private: + + OpTime _opTime; + Milliseconds _timeout; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_args_test.cpp b/src/mongo/db/repl/read_after_optime_args_test.cpp new file mode 100644 index 00000000000..d780500d9c2 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_args_test.cpp @@ -0,0 +1,178 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace repl { +namespace { + + TEST(ReadAfterParse, BasicFullSpecification) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + + ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); + ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_EQ(100, readAfterOpTime.getTimeout().total_milliseconds()); + } + + TEST(ReadAfterParse, Empty) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON("find" << "test"))); + + ASSERT(readAfterOpTime.getOpTime().getTimestamp().isNull()); + ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds()); + } + + TEST(ReadAfterParse, BadRootType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName << "x"))); + } + + TEST(ReadAfterParse, BadOpTimeType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << 2)))); + } + + TEST(ReadAfterParse, OpTimeRequiredIfRootPresent) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, NoOpTimeTS) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, NoOpTimeTerm) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, BadOpTimeTSType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << BSON("x" << 1) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, BadOpTimeTermType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << "y") + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, TimeoutDefault) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2))))); + + ASSERT_EQ(Timestamp(1, 0), readAfterOpTime.getOpTime().getTimestamp()); + ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds()); + } + + TEST(ReadAfterParse, BadTimeoutType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << "abc")))); + } + + TEST(ReadAfterParse, NegativeTimeout) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << -100)))); + } + + TEST(ReadAfterParse, ZeroTimeout) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 0)))); + + ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); + ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds()); + } + +} // unnamed namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_response.cpp b/src/mongo/db/repl/read_after_optime_response.cpp new file mode 100644 index 00000000000..7caffe09a96 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_response.cpp @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/read_after_optime_response.h" + +#include "mongo/bson/bsonobjbuilder.h" + +using std::string; + +namespace mongo { +namespace repl { + + const string ReadAfterOpTimeResponse::kWaitedMSFieldName("waitedMS"); + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status): + ReadAfterOpTimeResponse(status, boost::posix_time::milliseconds(0), false) { + } + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(): + ReadAfterOpTimeResponse(Status::OK()) { + } + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status, + boost::posix_time::milliseconds duration): + ReadAfterOpTimeResponse(status, duration, true) { + } + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status, + boost::posix_time::milliseconds duration, + bool waited): + _waited(waited), + _duration(duration), + _status(status) { + } + + void ReadAfterOpTimeResponse::appendInfo(BSONObjBuilder* builder) { + if (!_waited) { + return; + } + + builder->append(kWaitedMSFieldName, + static_cast<long long>(_duration.total_milliseconds())); + } + + bool ReadAfterOpTimeResponse::didWait() const { + return _waited; + } + + boost::posix_time::milliseconds ReadAfterOpTimeResponse::getDuration() const { + return _duration; + } + + Status ReadAfterOpTimeResponse::getStatus() const { + return _status; + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_response.h b/src/mongo/db/repl/read_after_optime_response.h new file mode 100644 index 00000000000..b906dc196d4 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_response.h @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/date_time/posix_time/posix_time_types.hpp> +#include <string> + +#include "mongo/base/status.h" + +namespace mongo { + + class BSONObjBuilder; + +namespace repl { + + class ReadAfterOpTimeResponse { + public: + static const std::string kWaitedMSFieldName; + + /** + * Constructs a default response that has OK status, and wait is false. + */ + ReadAfterOpTimeResponse(); + + /** + * Constructs a response with the given status with wait equals to false. + */ + explicit ReadAfterOpTimeResponse(Status status); + + /** + * Constructs a response with wait set to true along with the given parameters. + */ + ReadAfterOpTimeResponse(Status status, boost::posix_time::milliseconds duration); + + /** + * Appends to the builder the timeout and duration info if didWait() is true. + * Note: does not include status. + */ + void appendInfo(BSONObjBuilder* builder); + + bool didWait() const; + + /** + * Returns the amount of duration waiting for opTime to pass. + * Valid only if didWait is true. + */ + boost::posix_time::milliseconds getDuration() const; + + /** + * Returns more details about an error if it occurred. + */ + Status getStatus() const; + + private: + ReadAfterOpTimeResponse(Status status, + boost::posix_time::milliseconds duration, + bool waited); + + bool _waited; + boost::posix_time::milliseconds _duration; + Status _status; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_response_test.cpp b/src/mongo/db/repl/read_after_optime_response_test.cpp new file mode 100644 index 00000000000..a30824a57b8 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_response_test.cpp @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/read_after_optime_response.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace repl { +namespace { + + TEST(ReadAfterResponse, Default) { + ReadAfterOpTimeResponse response; + + ASSERT_FALSE(response.didWait()); + + BSONObjBuilder builder; + response.appendInfo(&builder); + + BSONObj obj(builder.done()); + ASSERT_TRUE(obj.isEmpty()); + } + + TEST(ReadAfterResponse, WithStatus) { + ReadAfterOpTimeResponse response(Status(ErrorCodes::InternalError, "test")); + + ASSERT_FALSE(response.didWait()); + + ASSERT_EQ(ErrorCodes::InternalError, response.getStatus().code()); + + BSONObjBuilder builder; + response.appendInfo(&builder); + + BSONObj obj(builder.done()); + ASSERT_TRUE(obj.isEmpty()); + } + + TEST(ReadAfterResponse, WaitedWithDuration) { + ReadAfterOpTimeResponse response(Status(ErrorCodes::InternalError, "test"), + boost::posix_time::milliseconds(7)); + + ASSERT_TRUE(response.didWait()); + ASSERT_EQUALS(7, response.getDuration().total_milliseconds()); + ASSERT_EQ(ErrorCodes::InternalError, response.getStatus().code()); + + BSONObjBuilder builder; + response.appendInfo(&builder); + + BSONObj obj(builder.done()); + auto waitedMSElem = obj[ReadAfterOpTimeResponse::kWaitedMSFieldName]; + ASSERT_TRUE(waitedMSElem.isNumber()); + ASSERT_EQ(7, waitedMSElem.numberLong()); + } + +} // unnamed namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 660c7b47407..9201f4af326 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -55,6 +55,8 @@ namespace repl { class IsMasterResponse; class OplogReader; class OpTime; + class ReadAfterOpTimeArgs; + class ReadAfterOpTimeResponse; class ReplSetDeclareElectionWinnerArgs; class ReplSetDeclareElectionWinnerResponse; class ReplSetHeartbeatArgs; @@ -288,6 +290,23 @@ namespace repl { virtual Timestamp getMyLastOptime() const = 0; /** + * Waits until the optime of the current node is at least the opTime specified in + * 'settings'. + * + * The returned ReadAfterOpTimeResponse object's didWait() method returns true if + * an attempt was made to wait for the specified opTime. Cases when this can be + * false could include: + * + * 1. No read after opTime was specified. + * 2. Attempting to do read after opTime when node is not a replica set member. + * + * Note: getDuration() on the returned ReadAfterOpTimeResponse will only be valid if + * its didWait() method returns true. + */ + virtual ReadAfterOpTimeResponse waitUntilOpTime(const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) = 0; + + /** * Retrieves and returns the current election id, which is a unique id that is local to * this node and changes every time we become primary. * TODO(spencer): Use term instead. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 246c973dc5e..a83b37b9b76 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -34,6 +34,7 @@ #include <algorithm> #include <boost/thread.hpp> +#include <limits> #include "mongo/base/status.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -45,6 +46,8 @@ #include "mongo/db/repl/freshness_checker.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/is_master_response.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/db/repl/read_after_optime_response.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_declare_election_winner_args.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" @@ -736,10 +739,19 @@ namespace { if (getReplicationMode() != modeReplSet) { return; } + + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (*(opTimeWaiter->opTime) <= ts) { + opTimeWaiter->condVar->notify_all(); + } + } + if (_getMemberState_inlock().primary()) { return; } + lock->unlock(); + _externalState->forwardSlaveProgress(); // Must do this outside _mutex } @@ -758,6 +770,85 @@ namespace { return _getMyLastOptime_inlock(); } + ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) { + // TODO: SERVER-18217 use OpTime directly. + const auto& ts = settings.getOpTime().getTimestamp(); + const auto& timeout = settings.getTimeout(); + + if (ts.isNull()) { + return ReadAfterOpTimeResponse(); + } + + if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { + return ReadAfterOpTimeResponse(Status(ErrorCodes::NotAReplicaSet, + "node needs to be a replica set member to use read after opTime")); + } + + // TODO: SERVER-18298 enable code once V1 protocol is fully implemented. +#if 0 + if (!isV1ElectionProtocol()) { + return ReadAfterOpTimeResponse(Status(ErrorCodes::IncompatibleElectionProtocol, + "node needs to be running on v1 election protocol to " + "use read after opTime")); + } +#endif + + Timer timer; + boost::unique_lock<boost::mutex> lock(_mutex); + + while (ts > _getMyLastOptime_inlock()) { + Status interruptedStatus = txn->checkForInterruptNoAssert(); + if (!interruptedStatus.isOK()) { + return ReadAfterOpTimeResponse(interruptedStatus, + Milliseconds(timer.millis())); + } + + if (_inShutdown) { + return ReadAfterOpTimeResponse( + Status(ErrorCodes::ShutdownInProgress, "shutting down"), + Milliseconds(timer.millis())); + } + + const auto elapsedMS = timer.millis(); + if (timeout.total_milliseconds() > 0 && + elapsedMS > timeout.total_milliseconds()) { + return ReadAfterOpTimeResponse( + Status(ErrorCodes::ReadAfterOptimeTimeout, + str::stream() << "timed out waiting for opTime: " + << ts.toStringPretty()), + Milliseconds(timer.millis())); + } + + boost::condition_variable condVar; + WaiterInfo waitInfo(&_opTimeWaiterList, + txn->getOpID(), + &ts, + nullptr, // Don't care about write concern. + &condVar); + + uint64_t maxTimeMicrosRemaining = txn->getRemainingMaxTimeMicros(); + auto maxTimeMSRemaining = (maxTimeMicrosRemaining == 0) ? + std::numeric_limits<uint64_t>::max() : (maxTimeMicrosRemaining / 1000); + + auto timeoutMSRemaining = (timeout.total_milliseconds() == 0) ? + std::numeric_limits<uint64_t>::max() : + static_cast<uint64_t>(timeout.total_milliseconds() - elapsedMS); + + auto sleepTimeMS = std::min(maxTimeMSRemaining, timeoutMSRemaining); + + if (sleepTimeMS == std::numeric_limits<uint64_t>::max()) { + condVar.wait(lock); + } + else { + condVar.timed_wait(lock, Milliseconds(sleepTimeMS)); + } + } + + return ReadAfterOpTimeResponse(Status::OK(), Milliseconds(timer.millis())); + } + Timestamp ReplicationCoordinatorImpl::_getMyLastOptime_inlock() const { return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime; } @@ -847,6 +938,13 @@ namespace { } } + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (opTimeWaiter->opID == opId) { + opTimeWaiter->condVar->notify_all(); + return; + } + } + _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, this, @@ -861,6 +959,10 @@ namespace { info->condVar->notify_all(); } + for (auto& opTimeWaiter : _opTimeWaiterList) { + opTimeWaiter->condVar->notify_all(); + } + _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, this, diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 83c80b73bb7..16223c17e86 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -91,21 +91,21 @@ namespace repl { // ================== Members of public ReplicationCoordinator API =================== - virtual void startReplication(OperationContext* txn); + virtual void startReplication(OperationContext* txn) override; - virtual void shutdown(); + virtual void shutdown() override; - virtual const ReplSettings& getSettings() const; + virtual const ReplSettings& getSettings() const override; - virtual Mode getReplicationMode() const; + virtual Mode getReplicationMode() const override; - virtual MemberState getMemberState() const; + virtual MemberState getMemberState() const override; - virtual bool isInPrimaryOrSecondaryState() const; + virtual bool isInPrimaryOrSecondaryState() const override; - virtual Seconds getSlaveDelaySecs() const; + virtual Seconds getSlaveDelaySecs() const override; - virtual void clearSyncSourceBlacklist(); + virtual void clearSyncSourceBlacklist() override; /* * Implementation of the KillOpListenerInterface interrupt method so that we can wake up @@ -156,107 +156,112 @@ namespace repl { virtual Timestamp getMyLastOptime() const; - virtual OpTime getMyLastOptimeV1() const; + virtual OpTime getMyLastOptimeV1() const override; - virtual OID getElectionId(); + virtual ReadAfterOpTimeResponse waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) override; + + virtual OID getElectionId() override; - virtual OID getMyRID() const; + virtual OID getMyRID() const override; - virtual int getMyId() const; + virtual int getMyId() const override; - virtual bool setFollowerMode(const MemberState& newState); + virtual bool setFollowerMode(const MemberState& newState) override; - virtual bool isWaitingForApplierToDrain(); + virtual bool isWaitingForApplierToDrain() override; - virtual void signalDrainComplete(OperationContext* txn); + virtual void signalDrainComplete(OperationContext* txn) override; - virtual void signalUpstreamUpdater(); + virtual void signalUpstreamUpdater() override; - virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override; - virtual Status processReplSetGetStatus(BSONObjBuilder* result); + virtual Status processReplSetGetStatus(BSONObjBuilder* result) override; - virtual void fillIsMasterForReplSet(IsMasterResponse* result); + virtual void fillIsMasterForReplSet(IsMasterResponse* result) override; - virtual void appendSlaveInfoData(BSONObjBuilder* result); + virtual void appendSlaveInfoData(BSONObjBuilder* result) override; - virtual ReplicaSetConfig getConfig() const; + virtual ReplicaSetConfig getConfig() const override; - virtual void processReplSetGetConfig(BSONObjBuilder* result); + virtual void processReplSetGetConfig(BSONObjBuilder* result) override; - virtual Status setMaintenanceMode(bool activate); + virtual Status setMaintenanceMode(bool activate) override; - virtual bool getMaintenanceMode(); + virtual bool getMaintenanceMode() override; virtual Status processReplSetSyncFrom(const HostAndPort& target, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; - virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj); + virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override; virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response); + ReplSetHeartbeatResponse* response) override; virtual Status processReplSetReconfig(OperationContext* txn, const ReplSetReconfigArgs& args, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; virtual Status processReplSetInitiate(OperationContext* txn, const BSONObj& configObj, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; - virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj); + virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override; - virtual void incrementRollbackID(); + virtual void incrementRollbackID() override; virtual Status processReplSetFresh(const ReplSetFreshArgs& args, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; virtual Status processReplSetElect(const ReplSetElectArgs& args, - BSONObjBuilder* response); + BSONObjBuilder* response) override; virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates, - long long* configVersion); + long long* configVersion) override; - virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake); + virtual Status processHandshake(OperationContext* txn, + const HandshakeArgs& handshake) override; - virtual bool buildsIndexes(); + virtual bool buildsIndexes() override; - virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op); + virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op) override; - virtual std::vector<HostAndPort> getOtherNodesInReplSet() const; + virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override; - virtual WriteConcernOptions getGetLastErrorDefault(); + virtual WriteConcernOptions getGetLastErrorDefault() override; - virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); + virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override; - virtual bool isReplEnabled() const; + virtual bool isReplEnabled() const override; - virtual HostAndPort chooseNewSyncSource(); + virtual HostAndPort chooseNewSyncSource() override; - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override; - virtual void resetLastOpTimeFromOplog(OperationContext* txn); + virtual void resetLastOpTimeFromOplog(OperationContext* txn) override; - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override; - virtual Timestamp getLastCommittedOpTime() const; + virtual Timestamp getLastCommittedOpTime() const override; virtual Status processReplSetRequestVotes(OperationContext* txn, const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response); + ReplSetRequestVotesResponse* response) override; virtual Status processReplSetDeclareElectionWinner( const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm); + long long* responseTerm) override; virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder); virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponseV1* response); + ReplSetHeartbeatResponseV1* response) override; - virtual bool isV1ElectionProtocol(); + virtual bool isV1ElectionProtocol() override; - virtual void summarizeAsHtml(ReplSetHtmlSummary* s); + virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override; // ================== Test support API =================== @@ -880,6 +885,10 @@ namespace repl { // WaiterInfos. std::vector<WaiterInfo*> _replicationWaiterList; // (M) + // list of information about clients waiting for a particular opTime. + // Does *not* own the WaiterInfos. + std::vector<WaiterInfo*> _opTimeWaiterList; // (M) + // Set to true when we are in the process of shutting down replication. bool _inShutdown; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index f486fed8694..b33b87b1a83 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -32,6 +32,7 @@ #include <boost/scoped_ptr.hpp> #include <boost/thread.hpp> +#include <future> #include <iostream> #include <memory> #include <set> @@ -42,6 +43,9 @@ #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/db/repl/read_after_optime_response.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replica_set_config.h" @@ -57,12 +61,15 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" +#include "mongo/util/time_support.h" +#include "mongo/util/timer.h" namespace mongo { namespace repl { namespace { typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; + Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted"); TEST_F(ReplCoordTest, StartupWithValidLocalConfig) { assertStartSuccess( @@ -324,7 +331,7 @@ namespace { ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; - getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace, + getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace, "The test set this")); ASSERT_EQUALS( ErrorCodes::OutOfDiskSpace, @@ -840,50 +847,9 @@ namespace { awaiter.reset(); } - class OperationContextNoopWithInterrupt : public OperationContextReplMock { - public: - - OperationContextNoopWithInterrupt() : _opID(0), _interruptOp(false) {} - - virtual unsigned int getOpID() const { - return _opID; - } - - /** - * Can only be called before any multi-threaded access to this object has begun. - */ - void setOpID(unsigned int opID) { - _opID = opID; - } - - virtual void checkForInterrupt() const { - if (_interruptOp) { - uasserted(ErrorCodes::Interrupted, "operation was interrupted"); - } - } - - virtual Status checkForInterruptNoAssert() const { - if (_interruptOp) { - return Status(ErrorCodes::Interrupted, "operation was interrupted"); - } - return Status::OK(); - } - - /** - * Can only be called before any multi-threaded access to this object has begun. - */ - void setInterruptOp(bool interrupt) { - _interruptOp = interrupt; - } - - private: - unsigned int _opID; - bool _interruptOp; - }; - TEST_F(ReplCoordTest, AwaitReplicationInterrupt) { // Tests that a thread blocked in awaitReplication can be killed by a killOp operation - OperationContextNoopWithInterrupt txn; + OperationContextReplMock txn; assertStartSuccess( BSON("_id" << "mySet" << "version" << 2 << @@ -914,7 +880,7 @@ namespace { ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1)); - txn.setInterruptOp(true); + txn.setCheckForInterruptStatus(kInterruptedStatus); getReplCoord()->interrupt(opID); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status); @@ -1202,7 +1168,7 @@ namespace { } TEST_F(StepDownTest, InterruptStepDown) { - OperationContextNoopWithInterrupt txn; + OperationContextReplMock txn; Timestamp optime1(100, 1); Timestamp optime2(100, 2); // No secondary is caught up @@ -1223,7 +1189,7 @@ namespace { unsigned int opID = 100; txn.setOpID(opID); - txn.setInterruptOp(true); + txn.setCheckForInterruptStatus(kInterruptedStatus); getReplCoord()->interrupt(opID); ASSERT_EQUALS(ErrorCodes::Interrupted, runner.getResult()); @@ -1558,7 +1524,7 @@ namespace { IsMasterResponse roundTripped; ASSERT_OK(roundTripped.initialize(response.toBSON())); } - + TEST_F(ReplCoordTest, ShutDownBeforeStartUpFinished) { init(); startCapturingLogMessages(); @@ -1945,6 +1911,172 @@ namespace { ASSERT_EQUALS(newTime, getReplCoord()->getLastCommittedOpTime()); } + TEST_F(ReplCoordTest, CantUseReadAfterIfNotReplSet) { + init(ReplSettings()); + OperationContextNoop txn; + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(0))); + + ASSERT_FALSE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterWhileShutdown) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(10, 0)); + + shutdown(); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(0))); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterInterrupted) { + OperationContextReplMock txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(10, 0)); + + txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test")); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(0))); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterNoOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs()); + + ASSERT_FALSE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterGreaterOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(100, 0)); + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(100))); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterEqualOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + + OpTime time(Timestamp(100, 0), 1); + getReplCoord()->setMyLastOptime(time.getTimestamp()); + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(time, Milliseconds(100))); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterDeferredGreaterOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(0, 0)); + + auto pseudoLogOp = std::async(std::launch::async, [this]() { + // Not guaranteed to be scheduled after waitUnitl blocks... + getReplCoord()->setMyLastOptime(Timestamp(200, 0)); + }); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(100, 0), 1), + Milliseconds(0))); + pseudoLogOp.get(); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(0, 0)); + + repl::OpTime opTimeToWait(Timestamp(100, 0), 1); + + auto pseudoLogOp = std::async(std::launch::async, [this, &opTimeToWait]() { + // Not guaranteed to be scheduled after waitUnitl blocks... + getReplCoord()->setMyLastOptime(opTimeToWait.getTimestamp()); + }); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(opTimeToWait, Milliseconds(0))); + pseudoLogOp.get(); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterOpTimeTimeoutNoMaxTimeMS) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(100, 0)); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(200, 0), 1), Milliseconds(10))); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::ReadAfterOptimeTimeout, result.getStatus()); + } + // TODO(schwerin): Unit test election id updating } // namespace diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 9d7f28e62af..ca9f32605ed 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -32,6 +32,8 @@ #include "mongo/base/status.h" #include "mongo/db/write_concern_options.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/db/repl/read_after_optime_response.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/optime.h" #include "mongo/util/assert_util.h" @@ -145,6 +147,12 @@ namespace repl { return Timestamp(); } + ReadAfterOpTimeResponse ReplicationCoordinatorMock::waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) { + return ReadAfterOpTimeResponse(); + } + OID ReplicationCoordinatorMock::getElectionId() { // TODO diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 79cf56507c8..d4f8bd33754 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -103,6 +103,10 @@ namespace repl { virtual Timestamp getMyLastOptime() const; + virtual ReadAfterOpTimeResponse waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) override; + virtual OID getElectionId(); virtual OID getMyRID() const; |