diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2018-03-08 10:12:42 -0500 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2018-03-13 15:09:53 -0400 |
commit | 64e649a622b5ac4c1bfad0933132dc7d994b9458 (patch) | |
tree | ff907cca516228300489ed9351819a0bb9c8193b | |
parent | 5ce39d9dd292dda65d59dbb18bdc176ea2b528a0 (diff) | |
download | mongo-64e649a622b5ac4c1bfad0933132dc7d994b9458.tar.gz |
SERVER-31802 SERVER-31239 Remove master-slave replication and resync command.
37 files changed, 79 insertions, 2641 deletions
diff --git a/buildscripts/resmokeconfig/suites/jstestfuzz_replication_resync.yml b/buildscripts/resmokeconfig/suites/jstestfuzz_replication_resync.yml deleted file mode 100644 index 1cd73444983..00000000000 --- a/buildscripts/resmokeconfig/suites/jstestfuzz_replication_resync.yml +++ /dev/null @@ -1,38 +0,0 @@ -test_kind: js_test - -selector: - roots: - - jstestfuzz/out/*.js - -executor: - archive: - hooks: - - BackgroundInitialSync - config: - shell_options: - readMode: commands - global_vars: - TestData: - ignoreCommandsIncompatibleWithInitialSync: true - hooks: - - class: CheckPrimary - - class: BackgroundInitialSync - use_resync: true - n: 1 - shell_options: - global_vars: - TestData: - skipValidationOnInvalidViewDefinitions: true - fixture: - class: ReplicaSetFixture - mongod_options: - verbose: '' - set_parameters: - disableLogicalSessionCacheRefresh: false - enableTestCommands: 1 - logComponentVerbosity: - replication: 2 - numInitialSyncAttempts: 1 - writePeriodicNoops: 1 - num_nodes: 2 - start_initial_sync_node: True diff --git a/buildscripts/resmokeconfig/suites/replica_sets_resync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_resync_static_jscore_passthrough.yml deleted file mode 100644 index 0aa9d04ddc8..00000000000 --- a/buildscripts/resmokeconfig/suites/replica_sets_resync_static_jscore_passthrough.yml +++ /dev/null @@ -1,44 +0,0 @@ -test_kind: js_test - -selector: - roots: - - jstests/core/**/*.js - exclude_files: - # These tests are not expected to pass with replica-sets: - - jstests/core/dbadmin.js - - jstests/core/opcounters_write_cmd.js - - jstests/core/read_after_optime.js - - jstests/core/capped_update.js - # Having duplicate namespaces is not supported and will cause initial sync to fail. - - jstests/core/views/duplicate_ns.js - # Restarting the catalog on the sync source will cause the downstream node to copy only a subset - # of the required data. - - jstests/core/restart_catalog.js - -run_hook_interval: &run_hook_interval 20 -executor: - archive: - hooks: - - IntermediateInitialSync - config: - shell_options: - eval: "testingReplication = true;" - readMode: commands - hooks: - - class: CheckPrimary - - class: IntermediateInitialSync - use_resync: True - n: *run_hook_interval - - class: CleanEveryN - n: *run_hook_interval - fixture: - class: ReplicaSetFixture - mongod_options: - set_parameters: - enableTestCommands: 1 - numInitialSyncAttempts: 1 - num_nodes: 2 - start_initial_sync_node: True - replset_config_options: - settings: - chainingAllowed: False diff --git a/buildscripts/resmokelib/testing/hooks/initialsync.py b/buildscripts/resmokelib/testing/hooks/initialsync.py index f3a5311dab4..328c4ac182e 100644 --- a/buildscripts/resmokelib/testing/hooks/initialsync.py +++ b/buildscripts/resmokelib/testing/hooks/initialsync.py @@ -23,9 +23,7 @@ class BackgroundInitialSync(interface.Hook): validates it, tears it down, and restarts it. This test accepts a parameter 'n' that specifies a number of tests after which it will wait for - replication to finish before validating and restarting the initial sync node. It also accepts - a parameter 'use_resync' for whether to restart the initial sync node with resync or by - shutting it down and restarting it. + replication to finish before validating and restarting the initial sync node. This requires the ReplicaSetFixture to be started with 'start_initial_sync_node=True'. If used at the same time as CleanEveryN, the 'n' value passed to this hook should be equal to the 'n' @@ -34,7 +32,7 @@ class BackgroundInitialSync(interface.Hook): DEFAULT_N = cleanup.CleanEveryN.DEFAULT_N - def __init__(self, hook_logger, fixture, use_resync=False, n=DEFAULT_N, shell_options=None): + def __init__(self, hook_logger, fixture, n=DEFAULT_N, shell_options=None): if not isinstance(fixture, replicaset.ReplicaSetFixture): raise ValueError("`fixture` must be an instance of ReplicaSetFixture, not {}".format( fixture.__class__.__name__)) @@ -42,7 +40,6 @@ class BackgroundInitialSync(interface.Hook): description = "Background Initial Sync" interface.Hook.__init__(self, hook_logger, fixture, description) - self.use_resync = use_resync self.n = n self.tests_run = 0 self.random_restarts = 0 @@ -102,9 +99,8 @@ class BackgroundInitialSyncTestCase(jsfile.DynamicJSTestCase): # If we have not restarted initial sync since the last time we ran the data # validation, restart initial sync with a 20% probability. if self._hook.random_restarts < 1 and random.random() < 0.2: - hook_type = "resync" if self._hook.use_resync else "initial sync" - self.logger.info("randomly restarting " + hook_type + - " in the middle of " + hook_type) + self.logger.info( + "randomly restarting initial sync in the middle of initial sync") self.__restart_init_sync(sync_node, sync_node_conn) self._hook.random_restarts += 1 return @@ -123,35 +119,27 @@ class BackgroundInitialSyncTestCase(jsfile.DynamicJSTestCase): self.__restart_init_sync(sync_node, sync_node_conn) - # Restarts initial sync by shutting down the node, clearing its data, and restarting it, - # or by calling resync if use_resync is specified. + # Restarts initial sync by shutting down the node, clearing its data, and restarting it. def __restart_init_sync(self, sync_node, sync_node_conn): - if self._hook.use_resync: - self.logger.info("Calling resync on initial sync node...") - cmd = bson.SON([("resync", 1), ("wait", 0)]) - sync_node_conn.admin.command(cmd) - else: - # Tear down and restart the initial sync node to start initial sync again. - sync_node.teardown() + # Tear down and restart the initial sync node to start initial sync again. + sync_node.teardown() - self.logger.info("Starting the initial sync node back up again...") - sync_node.setup() - sync_node.await_ready() + self.logger.info("Starting the initial sync node back up again...") + sync_node.setup() + sync_node.await_ready() class IntermediateInitialSync(interface.Hook): """ This hook accepts a parameter 'n' that specifies a number of tests after which it will start up - a node to initial sync, wait for replication to finish, and then validate the data. It also - accepts a parameter 'use_resync' for whether to restart the initial sync node with resync or by - shutting it down and restarting it. + a node to initial sync, wait for replication to finish, and then validate the data. This requires the ReplicaSetFixture to be started with 'start_initial_sync_node=True'. """ DEFAULT_N = cleanup.CleanEveryN.DEFAULT_N - def __init__(self, hook_logger, fixture, use_resync=False, n=DEFAULT_N): + def __init__(self, hook_logger, fixture, n=DEFAULT_N): if not isinstance(fixture, replicaset.ReplicaSetFixture): raise ValueError("`fixture` must be an instance of ReplicaSetFixture, not {}".format( fixture.__class__.__name__)) @@ -159,7 +147,6 @@ class IntermediateInitialSync(interface.Hook): description = "Intermediate Initial Sync" interface.Hook.__init__(self, hook_logger, fixture, description) - self.use_resync = use_resync self.n = n self.tests_run = 0 @@ -195,16 +182,11 @@ class IntermediateInitialSyncTestCase(jsfile.DynamicJSTestCase): sync_node = self.fixture.get_initial_sync_node() sync_node_conn = sync_node.mongo_client() - if self._hook.use_resync: - self.logger.info("Calling resync on initial sync node...") - cmd = bson.SON([("resync", 1)]) - sync_node_conn.admin.command(cmd) - else: - sync_node.teardown() + sync_node.teardown() - self.logger.info("Starting the initial sync node back up again...") - sync_node.setup() - sync_node.await_ready() + self.logger.info("Starting the initial sync node back up again...") + sync_node.setup() + sync_node.await_ready() # Do initial sync round. self.logger.info("Waiting for initial sync node to go into SECONDARY state") diff --git a/etc/evergreen.yml b/etc/evergreen.yml index bad5cc033ea..96bf8b2adc3 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -739,7 +739,6 @@ functions: case "${task_name}" in replica_sets_initsync_jscore_passthrough \ |replica_sets_initsync_static_jscore_passthrough \ - |replica_sets_resync_static_jscore_passthrough \ |replica_sets* \ |sharding* \ ) @@ -3032,19 +3031,6 @@ tasks: <<: *jstestfuzz_config_vars resmoke_args: --suites=jstestfuzz_replication_initsync --storageEngine=wiredTiger -## jstestfuzz initial sync replica set ## -- <<: *jstestfuzz_template - name: jstestfuzz_replication_resync - commands: - - func: "do setup" - - func: "run jstestfuzz" - vars: - jstestfuzz_vars: --numGeneratedFiles 75 - - <<: *run_jstestfuzz_tests - vars: - <<: *jstestfuzz_config_vars - resmoke_args: --suites=jstestfuzz_replication_resync --storageEngine=wiredTiger - ## jstestfuzz replica set with logical session ## - <<: *jstestfuzz_template name: jstestfuzz_replication_session @@ -3544,17 +3530,6 @@ tasks: run_multiple_jobs: true - <<: *task_template - name: replica_sets_resync_static_jscore_passthrough - depends_on: - - name: jsCore - commands: - - func: "do setup" - - func: "run tests" - vars: - resmoke_args: --suites=replica_sets_resync_static_jscore_passthrough --storageEngine=wiredTiger - run_multiple_jobs: true - -- <<: *task_template name: replica_sets_kill_secondaries_jscore_passthrough depends_on: - name: jsCore @@ -4995,7 +4970,6 @@ buildvariants: - name: replica_sets_initsync_static_jscore_passthrough - name: replica_sets_jscore_passthrough - name: replica_sets_kill_secondaries_jscore_passthrough - - name: replica_sets_resync_static_jscore_passthrough - name: replica_sets_pv0 - name: retryable_writes_jscore_passthrough - name: retryable_writes_jscore_stepdown_passthrough @@ -5994,9 +5968,6 @@ buildvariants: - name: replica_sets_initsync_static_jscore_passthrough distros: - windows-64-vs2015-large - - name: replica_sets_resync_static_jscore_passthrough - distros: - - windows-64-vs2015-large - name: replica_sets_kill_secondaries_jscore_passthrough distros: - windows-64-vs2015-large @@ -6772,9 +6743,6 @@ buildvariants: - name: replica_sets_initsync_static_jscore_passthrough distros: - rhel62-large - - name: replica_sets_resync_static_jscore_passthrough - distros: - - rhel62-large - name: replica_sets_kill_secondaries_jscore_passthrough distros: - rhel62-large @@ -6920,7 +6888,6 @@ buildvariants: - name: replica_sets_jscore_passthrough - name: replica_sets_initsync_jscore_passthrough - name: replica_sets_initsync_static_jscore_passthrough - - name: replica_sets_resync_static_jscore_passthrough - name: replica_sets_kill_secondaries_jscore_passthrough - name: retryable_writes_jscore_passthrough - name: sasl @@ -8998,7 +8965,6 @@ buildvariants: - name: replica_sets_jscore_passthrough - name: replica_sets_initsync_jscore_passthrough - name: replica_sets_initsync_static_jscore_passthrough - - name: replica_sets_resync_static_jscore_passthrough - name: replica_sets_kill_secondaries_jscore_passthrough - name: retryable_writes_jscore_passthrough - name: retryable_writes_jscore_stepdown_passthrough @@ -9153,7 +9119,6 @@ buildvariants: - name: replica_sets_jscore_passthrough - name: replica_sets_initsync_jscore_passthrough - name: replica_sets_initsync_static_jscore_passthrough - - name: replica_sets_resync_static_jscore_passthrough - name: replica_sets_kill_secondaries_jscore_passthrough - name: retryable_writes_jscore_passthrough - name: retryable_writes_jscore_stepdown_passthrough @@ -9283,7 +9248,6 @@ buildvariants: - name: replica_sets_jscore_passthrough - name: replica_sets_initsync_jscore_passthrough - name: replica_sets_initsync_static_jscore_passthrough - - name: replica_sets_resync_static_jscore_passthrough - name: replica_sets_kill_secondaries_jscore_passthrough - name: sasl - name: sharded_collections_jscore_passthrough @@ -9403,7 +9367,6 @@ buildvariants: - name: jstestfuzz_concurrent_sharded_session - name: jstestfuzz_replication - name: jstestfuzz_replication_initsync - - name: jstestfuzz_replication_resync - name: jstestfuzz_replication_session - name: jstestfuzz_sharded - name: jstestfuzz_sharded_causal_consistency @@ -9433,9 +9396,6 @@ buildvariants: - name: replica_sets_jscore_passthrough distros: - rhel62-large - - name: replica_sets_resync_static_jscore_passthrough - distros: - - rhel62-large - name: rlp - name: rollback_fuzzer - name: serial_run @@ -9529,7 +9489,6 @@ buildvariants: - name: jstestfuzz_concurrent_sharded_session - name: jstestfuzz_replication - name: jstestfuzz_replication_initsync - - name: jstestfuzz_replication_resync - name: jstestfuzz_replication_session - name: jstestfuzz_sharded - name: jstestfuzz_sharded_causal_consistency @@ -9557,9 +9516,6 @@ buildvariants: - name: replica_sets_jscore_passthrough distros: - windows-64-vs2015-large - - name: replica_sets_resync_static_jscore_passthrough - distros: - - windows-64-vs2015-large - name: rollback_fuzzer - name: serial_run - name: sharded_collections_jscore_passthrough @@ -9632,7 +9588,6 @@ buildvariants: - name: jstestfuzz_concurrent_sharded_session - name: jstestfuzz_replication - name: jstestfuzz_replication_initsync - - name: jstestfuzz_replication_resync - name: jstestfuzz_replication_session - name: jstestfuzz_sharded - name: jstestfuzz_sharded_causal_consistency @@ -9650,7 +9605,6 @@ buildvariants: - name: replica_sets_initsync_jscore_passthrough - name: replica_sets_initsync_static_jscore_passthrough - name: replica_sets_jscore_passthrough - - name: replica_sets_resync_static_jscore_passthrough - name: rollback_fuzzer - name: serial_run - name: sharded_collections_jscore_passthrough diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js index ea375a214f6..f40213bbaef 100644 --- a/jstests/auth/lib/commands_lib.js +++ b/jstests/auth/lib/commands_lib.js @@ -5199,21 +5199,6 @@ var authCommandsLib = { ] }, { - testname: "resync", - command: {resync: 1}, - skipSharded: true, - testcases: [ - { - runOnDb: adminDbName, - roles: {hostManager: 1, clusterManager: 1, clusterAdmin: 1, root: 1, __system: 1}, - privileges: [{resource: {cluster: true}, actions: ["resync"]}], - expectFail: true - }, - {runOnDb: firstDbName, roles: {}}, - {runOnDb: secondDbName, roles: {}} - ] - }, - { testname: "serverStatus", command: {serverStatus: 1}, testcases: [ diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index e5f11d2c1b5..3dc22a53fac 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -443,7 +443,6 @@ replSetUpdatePosition: {skip: isUnrelated}, replSetResizeOplog: {skip: isUnrelated}, resetError: {skip: isUnrelated}, - resync: {skip: isUnrelated}, revokePrivilegesFromRole: { command: { revokePrivilegesFromRole: "testrole", diff --git a/jstests/replsets/initial_sync_document_validation.js b/jstests/replsets/initial_sync_document_validation.js index 1dd69e0e1d3..06d9388b51d 100644 --- a/jstests/replsets/initial_sync_document_validation.js +++ b/jstests/replsets/initial_sync_document_validation.js @@ -1,6 +1,5 @@ /** - * Tests that initial sync via resync command does not fail if it inserts documents - * which don't validate. + * Tests that initial sync does not fail if it inserts documents which don't validate. */ (function() { @@ -19,7 +18,7 @@ assert.writeOK(coll.insert({_id: 0, x: 1})); assert.commandWorked(coll.runCommand("collMod", {"validator": {a: {$exists: true}}})); - assert.commandWorked(secondary.getDB("admin").runCommand({resync: 1})); + secondary = replSet.restart(secondary, {startClean: true}); replSet.awaitReplication(); replSet.awaitSecondaryNodes(); diff --git a/jstests/replsets/initial_sync_fail_insert_once.js b/jstests/replsets/initial_sync_fail_insert_once.js index a565c64216d..d85f97f5a63 100644 --- a/jstests/replsets/initial_sync_fail_insert_once.js +++ b/jstests/replsets/initial_sync_fail_insert_once.js @@ -24,9 +24,8 @@ data: {collectionNS: coll.getFullName()} })); - jsTest.log("Issuing RESYNC command to " + tojson(secondary)); - assert.commandWorked(secondary.getDB("admin").runCommand({resync: 1})); - + jsTest.log("Re-syncing " + tojson(secondary)); + secondary = replSet.restart(secondary, {startClean: true}); replSet.awaitReplication(); replSet.awaitSecondaryNodes(); diff --git a/jstests/replsets/read_committed_no_snapshots.js b/jstests/replsets/read_committed_no_snapshots.js index 9adfdc37502..83ba451e836 100644 --- a/jstests/replsets/read_committed_no_snapshots.js +++ b/jstests/replsets/read_committed_no_snapshots.js @@ -27,7 +27,7 @@ load("jstests/replsets/rslib.js"); // For reconfig and startSetIfSupportsReadMa "members": [ {"_id": 0, "host": nodes[0]}, {"_id": 1, "host": nodes[1], priority: 0}, - {"_id": 2, "host": nodes[2], arbiterOnly: true} + {"_id": 2, "host": nodes[2], priority: 0} ], "protocolVersion": 1 }; @@ -36,12 +36,15 @@ load("jstests/replsets/rslib.js"); // For reconfig and startSetIfSupportsReadMa // Get connections and collection. var primary = replTest.getPrimary(); - var secondary = replTest.liveNodes.slaves[0]; - var secondaryId = replTest.getNodeId(secondary); - var db = primary.getDB(name); + var healthySecondary = replTest.liveNodes.slaves[0]; + healthySecondary.setSlaveOk(); + var noSnapshotSecondary = replTest.liveNodes.slaves[1]; + noSnapshotSecondary.setSlaveOk(); + assert.commandWorked(noSnapshotSecondary.adminCommand( + {configureFailPoint: 'disableSnapshotting', mode: 'alwaysOn'})); // Do a write, wait for it to replicate, and ensure it is visible. - var res = db.runCommandWithMetadata( // + var res = primary.getDB(name).runCommandWithMetadata( // { insert: "foo", documents: [{_id: 1, state: 0}], @@ -50,36 +53,32 @@ load("jstests/replsets/rslib.js"); // For reconfig and startSetIfSupportsReadMa {"$replData": 1}); assert.commandWorked(res.commandReply); - // We need to propagate the lastOpVisible from the primary as afterOpTime in the secondary to - // ensure - // we wait for the write to be in the majority committed view. + // We need to propagate the lastOpVisible from the primary as afterOpTime in the secondaries to + // ensure we wait for the write to be in the majority committed view. var lastOp = res.metadata["$replData"].lastOpVisible; - secondary.setSlaveOk(); // Timeout is based on heartbeat timeout. - assert.commandWorked(secondary.getDB(name).foo.runCommand( + assert.commandWorked(healthySecondary.getDB(name).foo.runCommand( 'find', {"readConcern": {"level": "majority", "afterOpTime": lastOp}, "maxTimeMS": 10 * 1000})); - // Disable snapshotting via failpoint - secondary.adminCommand({configureFailPoint: 'disableSnapshotting', mode: 'alwaysOn'}); - - // Resync to drop any existing snapshots - secondary.adminCommand({resync: 1}); - // Ensure maxTimeMS times out while waiting for this snapshot - assert.commandFailed(secondary.getDB(name).foo.runCommand( - 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000})); + assert.commandFailedWithCode( + noSnapshotSecondary.getDB(name).foo.runCommand( + 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000}), + ErrorCodes.ExceededTimeLimit); - // Reconfig to make the secondary the primary + // Reconfig to make the no-snapshot secondary the primary var config = primary.getDB("local").system.replset.findOne(); config.members[0].priority = 0; - config.members[1].priority = 3; + config.members[2].priority = 1; config.version++; primary = reconfig(replTest, config, true); // Ensure maxTimeMS times out while waiting for this snapshot - assert.commandFailed(primary.getSiblingDB(name).foo.runCommand( - 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000})); + assert.commandFailedWithCode( + primary.getSiblingDB(name).foo.runCommand( + 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000}), + ErrorCodes.ExceededTimeLimit); replTest.stopSet(); })(); diff --git a/jstests/replsets/resync.js b/jstests/replsets/resync.js deleted file mode 100644 index 8a6a35ee699..00000000000 --- a/jstests/replsets/resync.js +++ /dev/null @@ -1,119 +0,0 @@ -// test that the resync command works with replica sets and that one does not need to manually -// force a replica set resync by deleting all datafiles -// Also tests that you can do this from a node that is "too stale" -// -// This test requires persistence in order for a restarted node with a stale oplog to stay in the -// RECOVERING state. A restarted node with an ephemeral storage engine will not have an oplog upon -// restart, so will immediately resync. -// @tags: [requires_persistence] -(function() { - "use strict"; - - var replTest = new ReplSetTest({ - name: 'resync', - nodes: 3, - oplogSize: 1, - // At the end of this test we call resync on a node that may have blacklisted the only other - // data bearing node. We need to ensure that the resync attempt will keep looking for a - // sync source for at least 60 seconds, until the blacklist period ends. Since we sleep 1 - // second in between each attempt to find a sync source, setting the number of attempts to - // find a sync source to larger than 60 should ensure that the resync attempt is able to - // succeed. - nodeOptions: {setParameter: "numInitialSyncConnectAttempts=90"} - }); - var nodes = replTest.nodeList(); - - var conns = replTest.startSet(); - var r = replTest.initiate({ - "_id": "resync", - "members": [ - {"_id": 0, "host": nodes[0], priority: 1}, - {"_id": 1, "host": nodes[1], priority: 0}, - {"_id": 2, "host": nodes[2], arbiterOnly: true} - ] - }); - - var a_conn = conns[0]; - // Make sure we have a master, and it is conns[0] - replTest.waitForState(a_conn, ReplSetTest.State.PRIMARY); - var b_conn = conns[1]; - a_conn.setSlaveOk(); - b_conn.setSlaveOk(); - var A = a_conn.getDB("test"); - var B = b_conn.getDB("test"); - var AID = replTest.getNodeId(a_conn); - var BID = replTest.getNodeId(b_conn); - - // create an oplog entry with an insert - assert.writeOK( - A.foo.insert({x: 1}, {writeConcern: {w: 2, wtimeout: ReplSetTest.kDefaultTimeoutMS}})); - assert.eq(B.foo.findOne().x, 1); - - // run resync and wait for it to happen - assert.commandWorked(b_conn.getDB("admin").runCommand({resync: 1})); - replTest.awaitReplication(); - replTest.awaitSecondaryNodes(); - - assert.eq(B.foo.findOne().x, 1); - replTest.stop(BID); - - function hasCycled() { - var oplog = a_conn.getDB("local").oplog.rs; - try { - // Collection scan to determine if the oplog entry from the first insert has been - // deleted yet. - return oplog.find({"o.x": 1}).sort({$natural: 1}).limit(10).itcount() == 0; - } catch (except) { - // An error is expected in the case that capped deletions blow away the position of the - // collection scan during a yield. In this case, we just try again. - var errorRegex = /CappedPositionLost/; - assert(errorRegex.test(except.message)); - return hasCycled(); - } - } - - jsTestLog("Rolling over oplog"); - - // Make sure the oplog has rolled over on the primary and secondary that is up, - // so when we bring up the other replica it is "too stale" - for (var cycleNumber = 0; cycleNumber < 10; cycleNumber++) { - // insert enough to cycle oplog - var bulk = A.foo.initializeUnorderedBulkOp(); - for (var i = 2; i < 10000; i++) { - bulk.insert({x: i}); - } - - // wait for secondary to also have its oplog cycle - assert.writeOK(bulk.execute({w: 1, wtimeout: ReplSetTest.kDefaultTimeoutMS})); - - if (hasCycled()) - break; - } - - assert(hasCycled()); - - jsTestLog("Restarting node B"); - // bring node B and it will enter recovery mode because its newest oplog entry is too old - replTest.restart(BID); - - jsTestLog("Waiting for node B to to into RECOVERING"); - // check that it is in recovery mode - assert.soon(function() { - try { - var result = b_conn.getDB("admin").runCommand({replSetGetStatus: 1}); - return (result.members[1].stateStr === "RECOVERING"); - } catch (e) { - print(e); - } - }, "node didn't enter RECOVERING state"); - - jsTestLog("Resync node B"); - // run resync and wait for it to happen - assert.commandWorked(b_conn.getDB("admin").runCommand({resync: 1})); - replTest.awaitReplication(); - replTest.awaitSecondaryNodes(); - assert.eq(B.foo.findOne().x, 1); - - replTest.stopSet(15); - jsTest.log("success"); -})(); diff --git a/jstests/replsets/resync_with_write_load.js b/jstests/replsets/resync_with_write_load.js deleted file mode 100644 index 10df8501b91..00000000000 --- a/jstests/replsets/resync_with_write_load.js +++ /dev/null @@ -1,93 +0,0 @@ -/** - * This test creates a replica set and writes during the resync call in order to verify - * that all phases of the resync/initial-sync process work correctly. - * - */ - -// Test disabled because resync still has a race where it doesn't reset its last fetched optime -// before trying to run initial sync, which results in an upstream query with a $gte that might -// return an empty batch. -if (false) { - var testName = "resync_with_write_load"; - var replTest = new ReplSetTest({name: testName, nodes: 3, oplogSize: 100}); - var nodes = replTest.nodeList(); - - var conns = replTest.startSet(); - var config = { - "_id": testName, - "members": [ - {"_id": 0, "host": nodes[0]}, - {"_id": 1, "host": nodes[1], priority: 0}, - {"_id": 2, "host": nodes[2], priority: 0} - ], - settings: {chainingAllowed: false} - }; - var r = replTest.initiate(config); - replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY); - // Make sure we have a master - var master = replTest.getPrimary(); - var a_conn = conns[0]; - var b_conn = conns[1]; - a_conn.setSlaveOk(); - b_conn.setSlaveOk(); - var A = a_conn.getDB("test"); - var B = b_conn.getDB("test"); - var AID = replTest.getNodeId(a_conn); - var BID = replTest.getNodeId(b_conn); - - assert(master == conns[0], "conns[0] assumed to be master"); - assert(a_conn.host == master.host); - - // create an oplog entry with an insert - assert.writeOK( - A.foo.insert({x: 1}, {writeConcern: {w: 3, wtimeout: ReplSetTest.kDefaultTimeoutMS}})); - - print("******************** starting load for 30 secs *********************"); - var work = function() { - print("starting loadgen"); - var start = new Date().getTime(); - - assert.writeOK(db.timeToStartTrigger.insert({_id: 1})); - - while (true) { - for (x = 0; x < 100; x++) { - db["a" + x].insert({a: x}); - } - - var runTime = (new Date().getTime() - start); - if (runTime > 30000) - break; - else if (runTime < 5000) // back-off more during first 2 seconds - sleep(50); - else - sleep(1); - } - print("finshing loadgen"); - }; - // insert enough that resync node has to go through oplog replay in each step - var loadGen = startParallelShell(work, replTest.ports[0]); - - // wait for document to appear to continue - assert.soon(function() { - try { - return 1 == master.getDB("test")["timeToStartTrigger"].find().itcount(); - } catch (e) { - print(e); - return false; - } - }, "waited too long for start trigger", 90 * 1000 /* 90 secs */); - - print("*************** issuing resync command ***************"); - assert.commandWorked(B.adminCommand("resync")); - - print("waiting for load generation to finish"); - loadGen(); - - // Make sure oplogs & dbHashes match - replTest.checkOplogs(testName); - replTest.checkReplicatedDataHashes(testName); - - replTest.stopSet(); - - print("*****test done******"); -} diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index a91561f6da6..85a6e1f02c8 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -41,7 +41,6 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/view_response_formatter.h" -#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/views/resolved_view.h" #include "mongo/util/log.h" @@ -68,10 +67,6 @@ public: } AllowedOnSecondary secondaryAllowed(ServiceContext* serviceContext) const override { - if (repl::ReplicationCoordinator::get(serviceContext)->getSettings().isSlave()) { - // ok on --slave setups - return Command::AllowedOnSecondary::kAlways; - } return Command::AllowedOnSecondary::kOptIn; } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index d52ef2343f2..85eb916a6fe 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -294,10 +294,6 @@ ExitCode _initAndListen(int listenPort) { LogstreamBuilder l = log(LogComponent::kControl); l << "MongoDB starting : pid=" << pid << " port=" << serverGlobalParams.port << " dbpath=" << storageGlobalParams.dbpath; - if (replSettings.isMaster()) - l << " master=" << replSettings.isMaster(); - if (replSettings.isSlave()) - l << " slave=" << (int)replSettings.isSlave(); const bool is32bit = sizeof(int*) == 4; l << (is32bit ? " 32" : " 64") << "-bit host=" << getHostNameCached() << endl; @@ -398,8 +394,8 @@ ExitCode _initAndListen(int listenPort) { auto startupOpCtx = serviceContext->makeOperationContext(&cc()); - bool canCallFCVSetIfCleanStartup = !storageGlobalParams.readOnly && - !(replSettings.isSlave() || storageGlobalParams.engine == "devnull"); + bool canCallFCVSetIfCleanStartup = + !storageGlobalParams.readOnly && (storageGlobalParams.engine != "devnull"); if (canCallFCVSetIfCleanStartup && !replSettings.usingReplSets()) { Lock::GlobalWrite lk(startupOpCtx.get()); FeatureCompatibilityVersion::setIfCleanStartup(startupOpCtx.get(), @@ -560,8 +556,7 @@ ExitCode _initAndListen(int listenPort) { startTTLBackgroundJob(); } - if (replSettings.usingReplSets() || (!replSettings.isMaster() && replSettings.isSlave()) || - !internalValidateFeaturesAsMaster) { + if (replSettings.usingReplSets() || !internalValidateFeaturesAsMaster) { serverGlobalParams.validateFeaturesAsMaster.store(false); } } diff --git a/src/mongo/db/logical_clock_test_fixture.cpp b/src/mongo/db/logical_clock_test_fixture.cpp index 2d384c83274..66ae1b889f1 100644 --- a/src/mongo/db/logical_clock_test_fixture.cpp +++ b/src/mongo/db/logical_clock_test_fixture.cpp @@ -61,9 +61,6 @@ void LogicalClockTestFixture::setUp() { _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext()); - // Set master to false (set to true in ShardingMongodTestFixture::setUp()) so follower mode can - // be toggled meaningfully. Default follower mode to primary, so writes can be accepted. - replicationCoordinator()->setMaster(false); ASSERT_OK(replicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY)); } diff --git a/src/mongo/db/mongod_options.cpp b/src/mongo/db/mongod_options.cpp index cde2a2a1a3b..a466fa4313f 100644 --- a/src/mongo/db/mongod_options.cpp +++ b/src/mongo/db/mongod_options.cpp @@ -82,7 +82,6 @@ Status addMongodOptions(moe::OptionSection* options) { } #endif - moe::OptionSection ms_options("Master/slave options (old; use replica sets instead)"); moe::OptionSection rs_options("Replica set options"); moe::OptionSection replication_options("Replication options"); moe::OptionSection sharding_options("Sharding options"); @@ -370,49 +369,6 @@ Status addMongodOptions(moe::OptionSection* options) { #endif - // Master Slave Options - - ms_options.addOptionChaining("master", "master", moe::Switch, "master mode") - .incompatibleWith("replication.replSet") - .incompatibleWith("replication.replSetName") - .setSources(moe::SourceAllLegacy); - - ms_options.addOptionChaining("slave", "slave", moe::Switch, "slave mode") - .incompatibleWith("replication.replSet") - .incompatibleWith("replication.replSetName") - .setSources(moe::SourceAllLegacy); - - ms_options - .addOptionChaining( - "source", "source", moe::String, "when slave: specify master as <server:port>") - .incompatibleWith("replication.replSet") - .incompatibleWith("replication.replSetName") - .setSources(moe::SourceAllLegacy); - - ms_options - .addOptionChaining( - "only", "only", moe::String, "when slave: specify a single database to replicate") - .incompatibleWith("replication.replSet") - .incompatibleWith("replication.replSetName") - .setSources(moe::SourceAllLegacy); - - ms_options - .addOptionChaining( - "slavedelay", - "slavedelay", - moe::Int, - "specify delay (in seconds) to be used when applying master ops to slave") - .incompatibleWith("replication.replSet") - .incompatibleWith("replication.replSetName") - .setSources(moe::SourceAllLegacy); - - ms_options - .addOptionChaining( - "autoresync", "autoresync", moe::Switch, "automatically resync if slave data is stale") - .incompatibleWith("replication.replSet") - .incompatibleWith("replication.replSetName") - .setSources(moe::SourceAllLegacy); - // Replication Options replication_options.addOptionChaining( @@ -468,9 +424,7 @@ Status addMongodOptions(moe::OptionSection* options) { moe::Switch, "declare this is a shard db of a cluster; default port 27018") .setSources(moe::SourceAllLegacy) - .incompatibleWith("configsvr") - .incompatibleWith("master") - .incompatibleWith("slave"); + .incompatibleWith("configsvr"); sharding_options .addOptionChaining( @@ -530,7 +484,6 @@ Status addMongodOptions(moe::OptionSection* options) { options->addSection(windows_scm_options).transitional_ignore(); #endif options->addSection(replication_options).transitional_ignore(); - options->addSection(ms_options).transitional_ignore(); options->addSection(rs_options).transitional_ignore(); options->addSection(sharding_options).transitional_ignore(); #ifdef MONGO_CONFIG_SSL @@ -540,23 +493,6 @@ Status addMongodOptions(moe::OptionSection* options) { // The following are legacy options that are disallowed in the JSON config file - options - ->addOptionChaining( - "fastsync", - "fastsync", - moe::Switch, - "indicate that this instance is starting from a dbpath snapshot of the repl peer") - .hidden() - .setSources(moe::SourceAllLegacy); - - options - ->addOptionChaining("pretouch", - "pretouch", - moe::Int, - "n pretouch threads for applying master/slave operations") - .hidden() - .setSources(moe::SourceAllLegacy); - // This is a deprecated option that we are supporting for backwards compatibility // The first value for this option can be either 'dbpath' or 'run'. // If it is 'dbpath', mongod prints the dbpath and exits. Any extra values are ignored. @@ -651,18 +587,8 @@ Status validateMongodOptions(const moe::Environment& params) { if (params.count("storage.queryableBackupMode")) { // Command line options that are disallowed when --queryableBackupMode is specified. - for (const auto& disallowedOption : {"replication.replSet", - "configsvr", - "upgrade", - "repair", - "profile", - "master", - "slave", - "source", - "only", - "slavedelay", - "autoresync", - "fastsync"}) { + for (const auto& disallowedOption : + {"replication.replSet", "configsvr", "upgrade", "repair", "profile"}) { if (params.count(disallowedOption)) { return Status(ErrorCodes::BadValue, str::stream() << "Cannot specify both queryable backup mode and " @@ -1097,32 +1023,6 @@ Status storeMongodOptions(const moe::Environment& params) { } repl::ReplSettings replSettings; - if (params.count("master")) { - replSettings.setMaster(params["master"].as<bool>()); - } - if (params.count("slave") && params["slave"].as<bool>() == true) { - replSettings.setSlave(true); - } - if (params.count("slavedelay")) { - replSettings.setSlaveDelaySecs(params["slavedelay"].as<int>()); - } - if (params.count("fastsync")) { - if (!replSettings.isSlave()) { - return Status(ErrorCodes::BadValue, - str::stream() << "--fastsync must only be used with --slave"); - } - replSettings.setFastSyncEnabled(params["fastsync"].as<bool>()); - } - if (params.count("autoresync")) { - replSettings.setAutoResyncEnabled(params["autoresync"].as<bool>()); - } - if (params.count("source")) { - /* specifies what the source in local.sources should be */ - replSettings.setSource(params["source"].as<std::string>().c_str()); - } - if (params.count("pretouch")) { - replSettings.setPretouch(params["pretouch"].as<int>()); - } if (params.count("replication.replSetName")) { replSettings.setReplSetString(params["replication.replSetName"].as<std::string>().c_str()); } @@ -1147,9 +1047,6 @@ Status storeMongodOptions(const moe::Environment& params) { serverGlobalParams.indexBuildRetry = params["storage.indexBuildRetry"].as<bool>(); } - if (params.count("only")) { - replSettings.setOnly(params["only"].as<std::string>().c_str()); - } if (params.count("storage.mmapv1.nsSize")) { int x = params["storage.mmapv1.nsSize"].as<int>(); if (x <= 0 || x > (0x7fffffff / 1024 / 1024)) { @@ -1272,9 +1169,6 @@ Status storeMongodOptions(const moe::Environment& params) { storageGlobalParams.repairpath = storageGlobalParams.dbpath; } - if (replSettings.getPretouch()) - log() << "--pretouch " << replSettings.getPretouch(); - // Check if we are 32 bit and have not explicitly specified any journaling options if (sizeof(void*) == 4 && !params.count("storage.journal.enabled")) { // trying to make this stand out more like startup warnings diff --git a/src/mongo/db/repair_database_and_check_version.cpp b/src/mongo/db/repair_database_and_check_version.cpp index eceed6b3198..d86c0da16af 100644 --- a/src/mongo/db/repair_database_and_check_version.cpp +++ b/src/mongo/db/repair_database_and_check_version.cpp @@ -366,8 +366,7 @@ StatusWith<bool> repairDatabasesAndCheckVersion(OperationContext* opCtx) { // to. The local DB is special because it is not replicated. See SERVER-10927 for more // details. const bool shouldClearNonLocalTmpCollections = - !(checkIfReplMissingFromCommandLine(opCtx) || replSettings.usingReplSets() || - replSettings.isSlave()); + !(checkIfReplMissingFromCommandLine(opCtx) || replSettings.usingReplSets()); // To check whether a featureCompatibilityVersion document exists. bool fcvDocumentExists = false; diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5cebfed131d..6200fb4b39a 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1535,10 +1535,8 @@ env.Library( env.Library( target="serveronly_repl", source=[ - "master_slave.cpp", 'noop_writer.cpp', "replication_coordinator_external_state_impl.cpp", - "resync.cpp", "rs_sync.cpp", "sync_source_feedback.cpp", ], diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp deleted file mode 100644 index 4f8a109702b..00000000000 --- a/src/mongo/db/repl/master_slave.cpp +++ /dev/null @@ -1,1462 +0,0 @@ -/** -* Copyright (C) 2008-2014 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -/* Collections we use: - - local.sources - indicates what sources we pull from as a "slave", and the last update of - each - local.oplog.$main - our op log as "master" - local.dbinfo.<dbname> - no longer used??? - local.pair.startup - [deprecated] can contain a special value indicating for a pair that we - have the master copy. - used when replacing other half of the pair which has permanently failed. - local.pair.sync - [deprecated] { initialsynccomplete: 1 } -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/master_slave.h" - -#include <pcrecpp.h> - -#include "mongo/db/auth/authorization_manager.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/catalog/database_catalog_entry.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/client.h" -#include "mongo/db/cloner.h" -#include "mongo/db/commands.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/op_observer.h" -#include "mongo/db/ops/update.h" -#include "mongo/db/query/internal_plans.h" -#include "mongo/db/repl/handshake_args.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/sync_tail.h" -#include "mongo/db/server_parameters.h" -#include "mongo/db/service_context.h" -#include "mongo/db/storage/storage_options.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" -#include "mongo/util/net/sock.h" -#include "mongo/util/quick_exit.h" - -using std::cout; -using std::endl; -using std::max; -using std::min; -using std::set; -using std::stringstream; -using std::unique_ptr; -using std::vector; - -namespace mongo { -namespace repl { - -namespace { -time_t lastForcedResync = 0; - -const int forceReconnect = -1; -const int restartSync = 0; -const int restartSyncAfterSleep = 1; -} // namespace - -void pretouchOperation(OperationContext* opCtx, const BSONObj& op); -void pretouchN(const vector<BSONObj>&, unsigned a, unsigned b); - -/* if 1 sync() is running */ -AtomicInt32 syncing(0); -AtomicInt32 relinquishSyncingSome(0); - -/* output by the web console */ -const char* replInfo = ""; -struct ReplInfo { - ReplInfo(const char* msg) { - replInfo = msg; - } - ~ReplInfo() { - replInfo = "?"; - } -}; - - -ReplSource::ReplSource(OperationContext* opCtx) { - nClonedThisPass = 0; - ensureMe(opCtx); -} - -ReplSource::ReplSource(OperationContext* opCtx, BSONObj o) : nClonedThisPass(0) { - only = o.getStringField("only"); - hostName = o.getStringField("host"); - _sourceName = o.getStringField("source"); - uassert(10118, "'host' field not set in sources collection object", !hostName.empty()); - uassert(10119, "only source='main' allowed for now with replication", sourceName() == "main"); - BSONElement e = o.getField("syncedTo"); - if (!e.eoo()) { - uassert(10120, - "bad sources 'syncedTo' field value", - e.type() == Date || e.type() == bsonTimestamp); - Timestamp tmp(e.date()); - syncedTo = tmp; - } - - BSONObj dbsObj = o.getObjectField("dbsNextPass"); - if (!dbsObj.isEmpty()) { - BSONObjIterator i(dbsObj); - while (1) { - BSONElement e = i.next(); - if (e.eoo()) - break; - addDbNextPass.insert(e.fieldName()); - } - } - - dbsObj = o.getObjectField("incompleteCloneDbs"); - if (!dbsObj.isEmpty()) { - BSONObjIterator i(dbsObj); - while (1) { - BSONElement e = i.next(); - if (e.eoo()) - break; - incompleteCloneDbs.insert(e.fieldName()); - } - } - ensureMe(opCtx); -} - -/* Turn our C++ Source object into a BSONObj */ -BSONObj ReplSource::jsobj() { - BSONObjBuilder b; - b.append("host", hostName); - b.append("source", sourceName()); - if (!only.empty()) - b.append("only", only); - if (!syncedTo.isNull()) - b.append("syncedTo", syncedTo); - - BSONObjBuilder dbsNextPassBuilder; - int n = 0; - for (set<std::string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++) { - n++; - dbsNextPassBuilder.appendBool(*i, 1); - } - if (n) - b.append("dbsNextPass", dbsNextPassBuilder.done()); - - BSONObjBuilder incompleteCloneDbsBuilder; - n = 0; - for (set<std::string>::iterator i = incompleteCloneDbs.begin(); i != incompleteCloneDbs.end(); - i++) { - n++; - incompleteCloneDbsBuilder.appendBool(*i, 1); - } - if (n) - b.append("incompleteCloneDbs", incompleteCloneDbsBuilder.done()); - - return b.obj(); -} - -void ReplSource::ensureMe(OperationContext* opCtx) { - std::string myname = getHostName(); - - // local.me is an identifier for a server for getLastError w:2+ - bool exists = Helpers::getSingleton(opCtx, "local.me", _me); - - if (!exists || !_me.hasField("host") || _me["host"].String() != myname) { - Lock::DBLock dblk(opCtx, "local", MODE_X); - WriteUnitOfWork wunit(opCtx); - // clean out local.me - Helpers::emptyCollection(opCtx, NamespaceString("local.me")); - - // repopulate - BSONObjBuilder b; - b.appendOID("_id", 0, true); - b.append("host", myname); - _me = b.obj(); - Helpers::putSingleton(opCtx, "local.me", _me); - wunit.commit(); - } - _me = _me.getOwned(); -} - -void ReplSource::save(OperationContext* opCtx) { - BSONObjBuilder b; - verify(!hostName.empty()); - b.append("host", hostName); - // todo: finish allowing multiple source configs. - // this line doesn't work right when source is null, if that is allowed as it is now: - // b.append("source", _sourceName); - BSONObj pattern = b.done(); - - BSONObj o = jsobj(); - LOG(1) << "Saving repl source: " << o << endl; - - { - OldClientContext ctx(opCtx, "local.sources", false); - - const NamespaceString requestNs("local.sources"); - UpdateRequest request(requestNs); - - request.setQuery(pattern); - request.setUpdates(o); - request.setUpsert(); - - UpdateResult res = update(opCtx, ctx.db(), request); - - verify(!res.modifiers); - verify(res.numMatched == 1 || !res.upserted.isEmpty()); - } -} - -static void addSourceToList(OperationContext* opCtx, - ReplSource::SourceVector& v, - ReplSource& s, - ReplSource::SourceVector& old) { - if (!s.syncedTo.isNull()) { // Don't reuse old ReplSource if there was a forced resync. - for (ReplSource::SourceVector::iterator i = old.begin(); i != old.end();) { - if (s == **i) { - v.push_back(*i); - old.erase(i); - return; - } - i++; - } - } - - v.push_back(std::shared_ptr<ReplSource>(new ReplSource(s))); -} - -/* we reuse our existing objects so that we can keep our existing connection - and cursor in effect. -*/ -void ReplSource::loadAll(OperationContext* opCtx, SourceVector& v) { - const char* localSources = "local.sources"; - OldClientContext ctx(opCtx, localSources, false); - SourceVector old = v; - v.clear(); - - const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings(); - if (!replSettings.getSource().empty()) { - // --source <host> specified. - // check that no items are in sources other than that - // add if missing - int n = 0; - auto exec = InternalPlanner::collectionScan(opCtx, - localSources, - ctx.db()->getCollection(opCtx, localSources), - PlanExecutor::NO_YIELD); - BSONObj obj; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { - n++; - ReplSource tmp(opCtx, obj); - if (tmp.hostName != replSettings.getSource()) { - log() << "--source " << replSettings.getSource() << " != " << tmp.hostName - << " from local.sources collection" << endl; - log() << "for instructions on changing this slave's source, see:" << endl; - log() << "http://dochub.mongodb.org/core/masterslave" << endl; - log() << "terminating mongod after 30 seconds" << endl; - sleepsecs(30); - quickExit(EXIT_REPLICATION_ERROR); - } - if (tmp.only != replSettings.getOnly()) { - log() << "--only " << replSettings.getOnly() << " != " << tmp.only - << " from local.sources collection" << endl; - log() << "terminating after 30 seconds" << endl; - sleepsecs(30); - quickExit(EXIT_REPLICATION_ERROR); - } - } - uassert(17065, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state); - uassert(10002, "local.sources collection corrupt?", n < 2); - if (n == 0) { - // source missing. add. - ReplSource s(opCtx); - s.hostName = replSettings.getSource(); - s.only = replSettings.getOnly(); - s.save(opCtx); - } - } else { - try { - massert(10384, "--only requires use of --source", replSettings.getOnly().empty()); - } catch (...) { - quickExit(EXIT_BADOPTIONS); - } - } - - auto exec = InternalPlanner::collectionScan( - opCtx, localSources, ctx.db()->getCollection(opCtx, localSources), PlanExecutor::NO_YIELD); - BSONObj obj; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { - ReplSource tmp(opCtx, obj); - if (tmp.syncedTo.isNull()) { - DBDirectClient c(opCtx); - BSONObj op = c.findOne("local.oplog.$main", - QUERY("op" << NE << "n").sort(BSON("$natural" << -1))); - if (!op.isEmpty()) { - tmp.syncedTo = op["ts"].timestamp(); - } - } - addSourceToList(opCtx, v, tmp, old); - } - uassert(17066, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state); -} - -bool ReplSource::throttledForceResyncDead(OperationContext* opCtx, const char* requester) { - if (time(0) - lastForcedResync > 600) { - forceResyncDead(opCtx, requester); - lastForcedResync = time(0); - return true; - } - return false; -} - -void ReplSource::forceResyncDead(OperationContext* opCtx, const char* requester) { - if (!replAllDead) - return; - SourceVector sources; - ReplSource::loadAll(opCtx, sources); - for (SourceVector::iterator i = sources.begin(); i != sources.end(); ++i) { - log() << requester << " forcing resync from " << (*i)->hostName << endl; - (*i)->forceResync(opCtx, requester); - } - replAllDead = 0; -} - -class HandshakeCmd : public BasicCommand { -public: - std::string help() const override { - return "internal"; - } - HandshakeCmd() : BasicCommand("handshake") {} - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kAlways; - } - virtual bool adminOnly() const { - return false; - } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - - virtual bool run(OperationContext* opCtx, - const std::string& ns, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - HandshakeArgs handshake; - Status status = handshake.initialize(cmdObj); - if (!status.isOK()) { - return CommandHelpers::appendCommandStatus(result, status); - } - - ReplClientInfo::forClient(opCtx->getClient()).setRemoteID(handshake.getRid()); - - status = ReplicationCoordinator::get(opCtx)->processHandshake(opCtx, handshake); - return CommandHelpers::appendCommandStatus(result, status); - } - -} handshakeCmd; - -bool replHandshake(DBClientConnection* conn, const OID& myRID) { - std::string myname = getHostName(); - - BSONObjBuilder cmd; - cmd.append("handshake", myRID); - - BSONObj res; - bool ok = conn->runCommand("admin", cmd.obj(), res); - // ignoring for now on purpose for older versions - LOG(ok ? 1 : 0) << "replHandshake result: " << res << endl; - return true; -} - -bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) { - if (reader->conn()) { - return true; - } - - if (!reader->connect(host)) { - return false; - } - - if (_doHandshake && !replHandshake(reader->conn(), myRID)) { - return false; - } - - return true; -} - - -void ReplSource::forceResync(OperationContext* opCtx, const char* requester) { - BSONObj info; - { - // This is always a GlobalWrite lock (so no ns/db used from the context) - invariant(opCtx->lockState()->isW()); - Lock::TempRelease tempRelease(opCtx->lockState()); - - if (!_connect(&oplogReader, - HostAndPort(hostName), - ReplicationCoordinator::get(opCtx)->getMyRID())) { - msgasserted(14051, "unable to connect to resync"); - } - bool ok = oplogReader.conn()->runCommand( - "admin", BSON("listDatabases" << 1), info, QueryOption_SlaveOk); - massert(10385, "Unable to get database list", ok); - } - - BSONObjIterator i(info.getField("databases").embeddedObject()); - while (i.moreWithEOO()) { - BSONElement e = i.next(); - if (e.eoo()) - break; - std::string name = e.embeddedObject().getField("name").valuestr(); - if (!e.embeddedObject().getBoolField("empty")) { - if (name != "local") { - if (only.empty() || only == name) { - resyncDrop(opCtx, name); - } - } - } - } - syncedTo = Timestamp(); - addDbNextPass.clear(); - save(opCtx); -} - -Status ReplSource::_updateIfDoneWithInitialSync(OperationContext* opCtx) { - const auto usedToDoHandshake = _doHandshake; - if (!usedToDoHandshake && addDbNextPass.empty() && incompleteCloneDbs.empty()) { - _doHandshake = true; - oplogReader.resetConnection(); - const auto myRID = ReplicationCoordinator::get(opCtx)->getMyRID(); - if (!_connect(&oplogReader, HostAndPort{hostName}, myRID)) { - return {ErrorCodes::MasterSlaveConnectionFailure, - str::stream() << "could not connect to " << hostName << " with rid: " - << myRID.toString()}; - } else { - return {ErrorCodes::Interrupted, "Initial Sync is done."}; - } - } - return Status::OK(); -} - -void ReplSource::resyncDrop(OperationContext* opCtx, const std::string& dbName) { - log() << "resync: dropping database " << dbName; - invariant(opCtx->lockState()->isW()); - - Database* const db = dbHolder().get(opCtx, dbName); - if (!db) { - log() << "resync: dropping database " << dbName - << " - database does not exist. nothing to do."; - return; - } - Database::dropDatabase(opCtx, db); -} - -/* grab initial copy of a database from the master */ -void ReplSource::resync(OperationContext* opCtx, const std::string& dbName) { - const std::string db(dbName); // need local copy of the name, we're dropping the original - resyncDrop(opCtx, db); - - { - log() << "resync: cloning database " << db << " to get an initial copy" << endl; - ReplInfo r("resync: cloning a database"); - - CloneOptions cloneOptions; - cloneOptions.fromDB = db; - cloneOptions.slaveOk = true; - cloneOptions.useReplAuth = true; - cloneOptions.snapshot = true; - - Cloner cloner; - Status status = cloner.copyDb(opCtx, db, hostName.c_str(), cloneOptions, NULL); - - if (!status.isOK()) { - if (status.code() == ErrorCodes::DatabaseDifferCase) { - resyncDrop(opCtx, db); - log() << "resync: database " << db - << " not valid on the master due to a name conflict, dropping."; - return; - } else { - log() << "resync of " << db << " from " << hostName - << " failed due to: " << redact(status); - throw SyncException(); - } - } - } - - log() << "resync: done with initial clone for db: " << db << endl; -} - -static DatabaseIgnorer ___databaseIgnorer; - -void DatabaseIgnorer::doIgnoreUntilAfter(const std::string& db, const Timestamp& futureOplogTime) { - if (futureOplogTime > _ignores[db]) { - _ignores[db] = futureOplogTime; - } -} - -bool DatabaseIgnorer::ignoreAt(const std::string& db, const Timestamp& currentOplogTime) { - if (_ignores[db].isNull()) { - return false; - } - if (_ignores[db] >= currentOplogTime) { - return true; - } else { - // The ignore state has expired, so clear it. - _ignores.erase(db); - return false; - } -} - -bool ReplSource::handleDuplicateDbName(OperationContext* opCtx, - const BSONObj& op, - const char* ns, - const char* db) { - // We are already locked at this point - if (dbHolder().get(opCtx, ns) != NULL) { - // Database is already present. - return true; - } - BSONElement ts = op.getField("ts"); - if ((ts.type() == Date || ts.type() == bsonTimestamp) && - ___databaseIgnorer.ignoreAt(db, ts.timestamp())) { - // Database is ignored due to a previous indication that it is - // missing from master after optime "ts". - return false; - } - if (dbHolder().getNamesWithConflictingCasing(db).empty()) { - // No duplicate database names are present. - return true; - } - - Timestamp lastTime; - bool dbOk = false; - { - // This is always a GlobalWrite lock (so no ns/db used from the context) - invariant(opCtx->lockState()->isW()); - Lock::TempRelease tempRelease(opCtx->lockState()); - - // We always log an operation after executing it (never before), so - // a database list will always be valid as of an oplog entry generated - // before it was retrieved. - - BSONObj last = - oplogReader.findOne(this->ns().c_str(), Query().sort(BSON("$natural" << -1))); - if (!last.isEmpty()) { - BSONElement ts = last.getField("ts"); - massert(14032, - "Invalid 'ts' in remote log", - ts.type() == Date || ts.type() == bsonTimestamp); - lastTime = Timestamp(ts.date()); - } - - BSONObj info; - bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info); - massert(14033, "Unable to get database list", ok); - BSONObjIterator i(info.getField("databases").embeddedObject()); - while (i.more()) { - BSONElement e = i.next(); - - const char* name = e.embeddedObject().getField("name").valuestr(); - if (strcasecmp(name, db) != 0) - continue; - - if (strcmp(name, db) == 0) { - // The db exists on master, still need to check that no conflicts exist there. - dbOk = true; - continue; - } - - // The master has a db name that conflicts with the requested name. - dbOk = false; - break; - } - } - - if (!dbOk) { - ___databaseIgnorer.doIgnoreUntilAfter(db, lastTime); - incompleteCloneDbs.erase(db); - addDbNextPass.erase(db); - return false; - } - - // Check for duplicates again, since we released the lock above. - auto duplicates = dbHolder().getNamesWithConflictingCasing(db); - - // The database is present on the master and no conflicting databases - // are present on the master. Drop any local conflicts. - for (set<std::string>::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i) { - ___databaseIgnorer.doIgnoreUntilAfter(*i, lastTime); - incompleteCloneDbs.erase(*i); - addDbNextPass.erase(*i); - - AutoGetDb autoDb(opCtx, *i, MODE_X); - Database::dropDatabase(opCtx, autoDb.getDb()); - } - - massert(14034, - "Duplicate database names present after attempting to delete duplicates", - dbHolder().getNamesWithConflictingCasing(db).empty()); - return true; -} - -void ReplSource::applyCommand(OperationContext* opCtx, const BSONObj& op) { - try { - Status status = applyCommand_inlock(opCtx, op, OplogApplication::Mode::kMasterSlave); - uassert(28639, "Failure applying initial sync command", status.isOK()); - } catch (AssertionException& e) { - log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op) - << endl; - ; - } catch (DBException& e) { - log() << "sync: caught db exception " << redact(e) << " while applying op: " << redact(op) - << endl; - ; - } -} - -void ReplSource::applyOperation(OperationContext* opCtx, Database* db, const BSONObj& op) { - try { - Status status = - applyOperation_inlock(opCtx, db, op, false, OplogApplication::Mode::kMasterSlave); - if (!status.isOK()) { - uassert(15914, - "Failure applying initial sync operation", - status == ErrorCodes::UpdateOperationFailed); - - // In initial sync, update operations can cause documents to be missed during - // collection cloning. As a result, it is possible that a document that we need to - // update is not present locally. In that case we fetch the document from the - // sync source. - SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); - sync.setHostname(hostName); - OplogEntry oplogEntry(op); - sync.fetchAndInsertMissingDocument(opCtx, oplogEntry); - } - } catch (AssertionException& e) { - log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op) - << endl; - ; - } catch (DBException& e) { - log() << "sync: caught db exception " << redact(e) << " while applying op: " << redact(op) - << endl; - ; - } -} - -/* local.$oplog.main is of the form: - { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> } - ... - see logOp() comments. - - @param alreadyLocked caller already put us in write lock if true -*/ -void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* opCtx, - BSONObj& op, - bool alreadyLocked) { - LOG(6) << "processing op: " << redact(op) << endl; - - if (op.getStringField("op")[0] == 'n') - return; - - const char* ns = op.getStringField("ns"); - const auto dbName = nsToDatabaseSubstring(ns).toString(); - - if (*ns == '.') { - log() << "skipping bad op in oplog: " << redact(op) << endl; - return; - } else if (*ns == 0) { - /*if( op.getStringField("op")[0] != 'n' )*/ { - log() << "halting replication, bad op in oplog:\n " << redact(op) << endl; - replAllDead = "bad object in oplog"; - throw SyncException(); - } - // ns = "local.system.x"; - // nsToDatabase(ns, clientName); - } - - if (!only.empty() && only != dbName) - return; - - // Push the CurOp stack for "opCtx" so each individual oplog entry application is separately - // reported. - CurOp individualOp(opCtx); - UnreplicatedWritesBlock uwb(opCtx); - const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings(); - if (replSettings.getPretouch() && - !alreadyLocked /*doesn't make sense if in write lock already*/) { - if (replSettings.getPretouch() > 1) { - /* note: this is bad - should be put in ReplSource. but this is first test... */ - static int countdown; - verify(countdown >= 0); - if (countdown > 0) { - countdown--; // was pretouched on a prev pass - } else { - const int m = 4; - if (tp.get() == 0) { - int nthr = min(8, replSettings.getPretouch()); - nthr = max(nthr, 1); - ThreadPool::Options options; - options.maxThreads = options.minThreads = std::size_t(nthr); - options.poolName = "master_slave_pretouch"; - tp.reset(new ThreadPool(options)); - } - vector<BSONObj> v; - oplogReader.peek(v, replSettings.getPretouch()); - unsigned a = 0; - while (1) { - if (a >= v.size()) - break; - unsigned b = a + m - 1; // v[a..b] - if (b >= v.size()) - b = v.size() - 1; - invariantOK(tp->schedule([&v, a, b] { pretouchN(v, a, b); })); - DEV cout << "pretouch task: " << a << ".." << b << endl; - a += m; - } - // we do one too... - pretouchOperation(opCtx, op); - tp->waitForIdle(); - countdown = v.size(); - } - } else { - pretouchOperation(opCtx, op); - } - } - - unique_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(opCtx)); - - if (replAllDead) { - // hmmm why is this check here and not at top of this function? does it get set between top - // and here? - log() << "replAllDead, throwing SyncException: " << replAllDead << endl; - throw SyncException(); - } - - if (!handleDuplicateDbName(opCtx, op, ns, dbName.c_str())) { - return; - } - - // special case apply for commands to avoid implicit database creation - if (*op.getStringField("op") == 'c') { - applyCommand(opCtx, op); - return; - } - - // This code executes on the slaves only, so it doesn't need to be sharding-aware since - // mongos will not send requests there. That's why the last argument is false (do not do - // version checking). - OldClientContext ctx(opCtx, ns, false); - - bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData(); - bool incompleteClone = incompleteCloneDbs.count(dbName) != 0; - - LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty - << ", incompleteClone: " << incompleteClone << endl; - - if (ctx.justCreated() || empty || incompleteClone) { - // we must add to incomplete list now that setClient has been called - incompleteCloneDbs.insert(dbName); - if (nClonedThisPass) { - /* we only clone one database per pass, even if a lot need done. This helps us - avoid overflowing the master's transaction log by doing too much work before going - back to read more transactions. (Imagine a scenario of slave startup where we try to - clone 100 databases in one pass.) - */ - addDbNextPass.insert(dbName); - } else { - if (incompleteClone) { - log() << "An earlier initial clone of '" << dbName - << "' did not complete, now resyncing." << endl; - } - save(opCtx); - OldClientContext ctx(opCtx, ns, false); - nClonedThisPass++; - resync(opCtx, ctx.db()->name()); - addDbNextPass.erase(dbName); - incompleteCloneDbs.erase(dbName); - } - save(opCtx); - } else { - applyOperation(opCtx, ctx.db(), op); - addDbNextPass.erase(dbName); - } -} - -void ReplSource::syncToTailOfRemoteLog() { - std::string _ns = ns(); - BSONObjBuilder b; - if (!only.empty()) { - b.appendRegex("ns", std::string("^") + pcrecpp::RE::QuoteMeta(only)); - } - BSONObj last = oplogReader.findOne(_ns.c_str(), Query(b.done()).sort(BSON("$natural" << -1))); - if (!last.isEmpty()) { - BSONElement ts = last.getField("ts"); - massert(10386, - "non Date ts found: " + last.toString(), - ts.type() == Date || ts.type() == bsonTimestamp); - syncedTo = Timestamp(ts.date()); - } -} - -AtomicInt32 replApplyBatchSize(1); - -class ReplApplyBatchSize - : public ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime> { -public: - ReplApplyBatchSize() - : ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime>( - ServerParameterSet::getGlobal(), "replApplyBatchSize", &replApplyBatchSize) {} - - virtual Status validate(const int& potentialNewValue) { - if (potentialNewValue < 1 || potentialNewValue > 1024) { - return Status(ErrorCodes::BadValue, "replApplyBatchSize has to be >= 1 and <= 1024"); - } - - const ReplSettings& replSettings = - ReplicationCoordinator::get(getGlobalServiceContext())->getSettings(); - if (replSettings.getSlaveDelaySecs() != Seconds(0) && potentialNewValue > 1) { - return Status(ErrorCodes::BadValue, "can't use a batch size > 1 with slavedelay"); - } - - if (!replSettings.isSlave()) { - return Status(ErrorCodes::BadValue, - "can't set replApplyBatchSize on a non-slave machine"); - } - - return Status::OK(); - } -} replApplyBatchSizeServerParameter; - -/* slave: pull some data from the master's oplog - note: not yet in db mutex at this point. - @return -1 error - 0 ok, don't sleep - 1 ok, sleep -*/ -int ReplSource::_sync_pullOpLog(OperationContext* opCtx, int& nApplied) { - int okResultCode = restartSyncAfterSleep; - std::string ns = std::string("local.oplog.$") + sourceName(); - LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo << '\n'; - - bool tailing = true; - oplogReader.tailCheck(); - - // Due to the lack of exception handlers, don't allow lock interrupts. - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - - bool initial = syncedTo.isNull(); - - if (!oplogReader.haveCursor() || initial) { - if (initial) { - // Important to grab last oplog timestamp before listing databases. - syncToTailOfRemoteLog(); - BSONObj info; - bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info); - massert(10389, "Unable to get database list", ok); - BSONObjIterator i(info.getField("databases").embeddedObject()); - while (i.moreWithEOO()) { - BSONElement e = i.next(); - if (e.eoo()) - break; - std::string name = e.embeddedObject().getField("name").valuestr(); - if (!e.embeddedObject().getBoolField("empty")) { - if (name != "local") { - if (only.empty() || only == name) { - LOG(2) << "adding to 'addDbNextPass': " << name << endl; - addDbNextPass.insert(name); - } - } - } - } - // obviously global isn't ideal, but non-repl set is old so - // keeping it simple - Lock::GlobalWrite lk(opCtx); - save(opCtx); - } - - BSONObjBuilder gte; - gte.append("$gte", syncedTo); - BSONObjBuilder query; - query.append("ts", gte.done()); - if (!only.empty()) { - // note we may here skip a LOT of data table scanning, a lot of work for the master. - // maybe append "\\." here? - query.appendRegex("ns", std::string("^") + pcrecpp::RE::QuoteMeta(only)); - } - BSONObj queryObj = query.done(); - // e.g. queryObj = { ts: { $gte: syncedTo } } - - oplogReader.tailingQuery(ns.c_str(), queryObj); - tailing = false; - } else { - LOG(2) << "tailing=true\n"; - } - - if (!oplogReader.haveCursor()) { - log() << "dbclient::query returns null (conn closed?)" << endl; - oplogReader.resetConnection(); - return forceReconnect; - } - - // show any deferred database creates from a previous pass - { - set<std::string>::iterator i = addDbNextPass.begin(); - if (i != addDbNextPass.end()) { - BSONObjBuilder b; - b.append("ns", *i + '.'); - b.append("op", "db"); - BSONObj op = b.done(); - _sync_pullOpLog_applyOperation(opCtx, op, false); - } - } - - auto status = _updateIfDoneWithInitialSync(opCtx); - if (!status.isOK()) { - switch (status.code()) { - case ErrorCodes::Interrupted: { - return restartSync; // don't sleep; - } - default: { - error() << redact(status); - return forceReconnect; // causes reconnect. - } - } - } - - if (!oplogReader.more()) { - if (tailing) { - LOG(2) << "tailing & no new activity\n"; - okResultCode = restartSync; // don't sleep - - } else { - log() << ns << " oplog is empty" << endl; - } - { - Lock::GlobalWrite lk(opCtx); - save(opCtx); - } - return okResultCode; - } - - Timestamp nextOpTime; - { - BSONObj op = oplogReader.nextSafe(); - BSONElement ts = op.getField("ts"); - if (ts.type() != Date && ts.type() != bsonTimestamp) { - std::string err = op.getStringField("$err"); - if (!err.empty()) { - // 13051 is "tailable cursor requested on non capped collection" - if (op.getIntField("code") == 13051) { - log() << "trying to slave off of a non-master" << '\n'; - massert(13344, "trying to slave off of a non-master", false); - } else { - error() << "$err reading remote oplog: " + redact(err) << '\n'; - massert(10390, "got $err reading remote oplog", false); - } - } else { - error() << "bad object read from remote oplog: " << redact(op) << '\n'; - massert(10391, "bad object read from remote oplog", false); - } - } - - nextOpTime = Timestamp(ts.date()); - LOG(2) << "first op time received: " << nextOpTime.toString() << '\n'; - if (initial) { - LOG(1) << "initial run\n"; - } - if (tailing) { - if (!(syncedTo < nextOpTime)) { - warning() << "ASSERTION failed : syncedTo < nextOpTime" << endl; - log() << "syncTo: " << syncedTo << endl; - log() << "nextOpTime: " << nextOpTime << endl; - verify(false); - } - oplogReader.putBack(op); // op will be processed in the loop below - nextOpTime = Timestamp(); // will reread the op below - } else if (nextOpTime != syncedTo) { // didn't get what we queried for - error - log() << "nextOpTime " << nextOpTime << ' ' << ((nextOpTime < syncedTo) ? "<??" : ">") - << " syncedTo " << syncedTo << '\n' - << "time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs()) << "sec\n" - << "tailing: " << tailing << '\n' - << "data too stale, halting replication" << endl; - replInfo = replAllDead = "data too stale halted replication"; - verify(syncedTo < nextOpTime); - throw SyncException(); - } else { - /* t == syncedTo, so the first op was applied previously or it is the first op of - * initial query and need not be applied. */ - } - } - - // apply operations - { - int n = 0; - time_t saveLast = time(0); - while (1) { - // we need "&& n" to assure we actually process at least one op to get a sync - // point recorded in the first place. - const bool moreInitialSyncsPending = !addDbNextPass.empty() && n; - - if (moreInitialSyncsPending || !oplogReader.more()) { - Lock::GlobalWrite lk(opCtx); - - if (tailing) { - okResultCode = restartSync; // don't sleep - } - - syncedTo = nextOpTime; - save(opCtx); // note how far we are synced up to now - nApplied = n; - break; - } - - OCCASIONALLY if (n > 0 && (n > 100000 || time(0) - saveLast > 60)) { - // periodically note our progress, in case we are doing a lot of work and crash - Lock::GlobalWrite lk(opCtx); - syncedTo = nextOpTime; - // can't update local log ts since there are pending operations from our peer - save(opCtx); - log() << "checkpoint applied " << n << " operations" << endl; - log() << "syncedTo: " << syncedTo << endl; - saveLast = time(0); - n = 0; - } - - BSONObj op = oplogReader.nextSafe(); - - int b = replApplyBatchSize.load(); - bool justOne = b == 1; - unique_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(opCtx)); - while (1) { - BSONElement ts = op.getField("ts"); - if (!(ts.type() == Date || ts.type() == bsonTimestamp)) { - log() << "sync error: problem querying remote oplog record" << endl; - log() << "op: " << redact(op) << endl; - log() << "halting replication" << endl; - replInfo = replAllDead = "sync error: no ts found querying remote oplog record"; - throw SyncException(); - } - Timestamp last = nextOpTime; - nextOpTime = Timestamp(ts.date()); - if (!(last < nextOpTime)) { - log() << "sync error: last applied optime at slave >= nextOpTime from master" - << endl; - log() << " last: " << last << endl; - log() << " nextOpTime: " << nextOpTime << endl; - log() << " halting replication" << endl; - replInfo = replAllDead = "sync error last >= nextOpTime"; - uassert( - 10123, - "replication error last applied optime at slave >= nextOpTime from master", - false); - } - const ReplSettings& replSettings = - ReplicationCoordinator::get(opCtx)->getSettings(); - if (replSettings.getSlaveDelaySecs() != Seconds(0) && - (Seconds(time(0)) < - Seconds(nextOpTime.getSecs()) + replSettings.getSlaveDelaySecs())) { - verify(justOne); - oplogReader.putBack(op); - _sleepAdviceTime = nextOpTime.getSecs() + - durationCount<Seconds>(replSettings.getSlaveDelaySecs()) + 1; - Lock::GlobalWrite lk(opCtx); - if (n > 0) { - syncedTo = last; - save(opCtx); - } - log() << "applied " << n << " operations" << endl; - log() << "syncedTo: " << syncedTo << endl; - log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl; - return okResultCode; - } - - _sync_pullOpLog_applyOperation(opCtx, op, !justOne); - n++; - - if (--b == 0) - break; - // if to here, we are doing mulpile applications in a singel write lock acquisition - if (!oplogReader.moreInCurrentBatch()) { - // break if no more in batch so we release lock while reading from the master - break; - } - op = oplogReader.nextSafe(); - } - } - } - - return okResultCode; -} - - -/* note: not yet in mutex at this point. - returns >= 0 if ok. return -1 if you want to reconnect. - return value of zero indicates no sleep necessary before next call -*/ -int ReplSource::sync(OperationContext* opCtx, int& nApplied) { - _sleepAdviceTime = 0; - ReplInfo r("sync"); - if (!serverGlobalParams.quiet.load()) { - LogstreamBuilder l = log(); - l << "syncing from "; - if (sourceName() != "main") { - l << "source:" << sourceName() << ' '; - } - l << "host:" << hostName << endl; - } - nClonedThisPass = 0; - - // FIXME Handle cases where this db isn't on default port, or default port is spec'd in - // hostName. - if ((std::string("localhost") == hostName || std::string("127.0.0.1") == hostName) && - serverGlobalParams.port == ServerGlobalParams::DefaultDBPort) { - log() << "can't sync from self (localhost). sources configuration may be wrong." << endl; - sleepsecs(5); - return -1; - } - - if (!_connect( - &oplogReader, HostAndPort(hostName), ReplicationCoordinator::get(opCtx)->getMyRID())) { - LOG(4) << "can't connect to sync source" << endl; - return -1; - } - - return _sync_pullOpLog(opCtx, nApplied); -} - -/* --------------------------------------------------------------*/ - -static bool _replMainStarted = false; - -/* -TODO: -_ source has autoptr to the cursor -_ reuse that cursor when we can -*/ - -/* returns: # of seconds to sleep before next pass - 0 = no sleep recommended - 1 = special sentinel indicating adaptive sleep recommended -*/ -int _replMain(OperationContext* opCtx, ReplSource::SourceVector& sources, int& nApplied) { - { - ReplInfo r("replMain load sources"); - Lock::GlobalWrite lk(opCtx); - ReplSource::loadAll(opCtx, sources); - - // only need this param for initial reset - _replMainStarted = true; - } - - if (sources.empty()) { - /* replication is not configured yet (for --slave) in local.sources. Poll for config it - every 20 seconds. - */ - log() << "no source given, add a master to local.sources to start replication" << endl; - return 20; - } - - int sleepAdvice = 1; - for (ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++) { - ReplSource* s = i->get(); - int res = forceReconnect; - try { - res = s->sync(opCtx, nApplied); - bool moreToSync = s->haveMoreDbsToSync(); - if (res < 0) { - sleepAdvice = 3; - } else if (moreToSync) { - sleepAdvice = 0; - } else if (s->sleepAdvice()) { - sleepAdvice = s->sleepAdvice(); - } else - sleepAdvice = res; - } catch (const SyncException&) { - log() << "caught SyncException" << endl; - return 10; - } catch (const DBException& e) { - log() << "DBException " << redact(e) << endl; - replInfo = "replMain caught DBException"; - } catch (const std::exception& e) { - log() << "std::exception " << redact(e.what()) << endl; - replInfo = "replMain caught std::exception"; - } catch (...) { - log() << "unexpected exception during replication. replication will halt" << endl; - replAllDead = "caught unexpected exception during replication"; - } - if (res < 0) - s->oplogReader.resetConnection(); - } - return sleepAdvice; -} - -static void replMain(OperationContext* opCtx) { - ReplSource::SourceVector sources; - while (1) { - auto s = restartSync; - { - Lock::GlobalWrite lk(opCtx); - if (replAllDead) { - // throttledForceResyncDead can throw - if (!ReplicationCoordinator::get(opCtx)->getSettings().isAutoResyncEnabled() || - !ReplSource::throttledForceResyncDead(opCtx, "auto")) { - log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" - << endl; - break; - } - } - - // i.e., there is only one sync thread running. we will want to change/fix this. - invariant(syncing.swap(1) == 0); - } - - try { - int nApplied = 0; - s = _replMain(opCtx, sources, nApplied); - if (s == restartSyncAfterSleep) { - if (nApplied == 0) - s = 2; - else if (nApplied > 100) { - // sleep very little - just enough that we aren't truly hammering master - sleepmillis(75); - s = restartSync; - } - } - } catch (...) { - log() << "caught exception in _replMain" << endl; - s = 4; - } - - { - Lock::GlobalWrite lk(opCtx); - invariant(syncing.swap(0) == 1); - } - - if (relinquishSyncingSome.load()) { - relinquishSyncingSome.store(0); - s = restartSyncAfterSleep; // sleep before going back in to syncing=1 - } - - if (s) { - stringstream ss; - ss << "sleep " << s << " sec before next pass"; - std::string msg = ss.str(); - if (!serverGlobalParams.quiet.load()) - log() << msg << endl; - ReplInfo r(msg.c_str()); - sleepsecs(s); - } - } -} - -static void replMasterThread() { - sleepsecs(4); - Client::initThread("replmaster"); - int toSleep = 10; - while (1) { - sleepsecs(toSleep); - - // Write a keep-alive like entry to the log. This will make things like - // printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date even - // when things are idle. - const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); - OperationContext& opCtx = *opCtxPtr; - AuthorizationSession::get(opCtx.getClient())->grantInternalAuthorization(); - - UninterruptibleLockGuard noInterrupt(opCtx.lockState()); - Lock::GlobalWrite globalWrite(&opCtx, Date_t::now() + Milliseconds(1)); - if (globalWrite.isLocked()) { - toSleep = 10; - - try { - WriteUnitOfWork wuow(&opCtx); - getGlobalServiceContext()->getOpObserver()->onOpMessage(&opCtx, BSONObj()); - wuow.commit(); - } catch (...) { - log() << "caught exception in replMasterThread()" << endl; - } - } else { - LOG(5) << "couldn't logKeepalive" << endl; - toSleep = 1; - } - } -} - -static void replSlaveThread() { - sleepsecs(1); - Client::initThread("replslave"); - - const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); - OperationContext& opCtx = *opCtxPtr; - AuthorizationSession::get(opCtx.getClient())->grantInternalAuthorization(); - DisableDocumentValidation validationDisabler(&opCtx); - - while (1) { - try { - replMain(&opCtx); - sleepsecs(5); - } catch (AssertionException&) { - ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry"); - log() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl; - sleepsecs(300); - } catch (DBException& e) { - log() << "exception in replSlaveThread(): " << e.what() - << ", sleeping 5 minutes before retry" << endl; - sleepsecs(300); - } catch (...) { - log() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl; - sleepsecs(300); - } - } -} - -void startMasterSlave(OperationContext* opCtx) { - const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings(); - if (!replSettings.isSlave() && !replSettings.isMaster()) - return; - - AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(); - - { - ReplSource temp(opCtx); // Ensures local.me is populated - } - - if (replSettings.isSlave()) { - LOG(1) << "slave=true" << endl; - stdx::thread repl_thread(replSlaveThread); - repl_thread.detach(); - } - - if (replSettings.isMaster()) { - LOG(1) << "master=true" << endl; - createOplog(opCtx); - stdx::thread t(replMasterThread); - t.detach(); - } - - if (replSettings.isFastSyncEnabled()) { - while (!_replMainStarted) // don't allow writes until we've set up from log - sleepmillis(50); - } -} -int _dummy_z; - -void pretouchN(const std::vector<BSONObj>& v, unsigned a, unsigned b) { - Client::initThreadIfNotAlready("pretouchN"); - - const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); - OperationContext& opCtx = *opCtxPtr; // XXX - Lock::GlobalRead lk(&opCtx); - - for (unsigned i = a; i <= b; i++) { - const BSONObj& op = v[i]; - const char* which = "o"; - const char* opType = op.getStringField("op"); - if (*opType == 'i') - ; - else if (*opType == 'u') - which = "o2"; - else - continue; - /* todo : other operations */ - - try { - BSONObj o = op.getObjectField(which); - BSONElement _id; - if (o.getObjectID(_id)) { - const char* ns = op.getStringField("ns"); - BSONObjBuilder b; - b.append(_id); - BSONObj result; - OldClientContext ctx(&opCtx, ns, false); - if (Helpers::findById(&opCtx, ctx.db(), ns, b.done(), result)) - _dummy_z += result.objsize(); // touch - } - } catch (DBException& e) { - log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' ' - << redact(e) << endl; - } - } -} - -void pretouchOperation(OperationContext* opCtx, const BSONObj& op) { - if (opCtx->lockState()->isWriteLocked()) { - // no point pretouching if write locked. not sure if this will ever fire, but just in case. - return; - } - - const char* which = "o"; - const char* opType = op.getStringField("op"); - if (*opType == 'i') - ; - else if (*opType == 'u') - which = "o2"; - else - return; - /* todo : other operations */ - - try { - BSONObj o = op.getObjectField(which); - BSONElement _id; - if (o.getObjectID(_id)) { - const char* ns = op.getStringField("ns"); - BSONObjBuilder b; - b.append(_id); - BSONObj result; - AutoGetCollectionForReadCommand ctx(opCtx, NamespaceString(ns)); - if (Helpers::findById(opCtx, ctx.getDb(), ns, b.done(), result)) { - _dummy_z += result.objsize(); // touch - } - } - } catch (DBException&) { - log() << "ignoring assertion in pretouchOperation()" << endl; - } -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h deleted file mode 100644 index 3c96386080f..00000000000 --- a/src/mongo/db/repl/master_slave.h +++ /dev/null @@ -1,219 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/db/repl/oplogreader.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/util/concurrency/thread_pool.h" - -/* replication data overview - - at the slave: - local.sources { host: ..., source: ..., only: ..., syncedTo: ..., localLogTs: ..., - dbsNextPass: { ... }, incompleteCloneDbs: { ... } } - - at the master: - local.oplog.$<source> -*/ - -namespace mongo { - -class Database; -class OperationContext; - -namespace repl { - -// Main entry point for master/slave at startup time. -void startMasterSlave(OperationContext* opCtx); - -// externed for use with resync.cpp -extern AtomicInt32 relinquishSyncingSome; -extern AtomicInt32 syncing; - -extern const char* replInfo; - -/* A replication exception */ -class SyncException final : public DBException { -public: - SyncException() : DBException(Status(ErrorCodes::Error(10001), "sync exception")) {} - -private: - void defineOnlyInFinalSubclassToPreventSlicing() final {} -}; - -/* A Source is a source from which we can pull (replicate) data. - stored in collection local.sources. - - Can be a group of things to replicate for several databases. - - { host: ..., source: ..., only: ..., syncedTo: ..., dbsNextPass: { ... }, - incompleteCloneDbs: { ... } } - - 'source' defaults to 'main'; support for multiple source names is - not done (always use main for now). -*/ -class ReplSource { - std::shared_ptr<ThreadPool> tp; - - void resync(OperationContext* opCtx, const std::string& dbName); - - /** @param alreadyLocked caller already put us in write lock if true */ - void _sync_pullOpLog_applyOperation(OperationContext* opCtx, BSONObj& op, bool alreadyLocked); - - /* pull some operations from the master's oplog, and apply them. - calls sync_pullOpLog_applyOperation - */ - int _sync_pullOpLog(OperationContext* opCtx, int& nApplied); - - /* we only clone one database per pass, even if a lot need done. This helps us - avoid overflowing the master's transaction log by doing too much work before going - back to read more transactions. (Imagine a scenario of slave startup where we try to - clone 100 databases in one pass.) - */ - std::set<std::string> addDbNextPass; - - std::set<std::string> incompleteCloneDbs; - - /// TODO(spencer): Remove this once the LegacyReplicationCoordinator is gone. - BSONObj _me; - - /** - * Flag to control if a handshake is done on a connection or not. Default to false - * until all databases are cloned. - */ - bool _doHandshake = false; - - void resyncDrop(OperationContext* opCtx, const std::string& dbName); - // call without the db mutex - void syncToTailOfRemoteLog(); - std::string ns() const { - return std::string("local.oplog.$") + sourceName(); - } - unsigned _sleepAdviceTime; - - /** - * If 'db' is a new database and its name would conflict with that of - * an existing database, synchronize these database names with the - * master. - * @return true iff an op with the specified ns may be applied. - */ - bool handleDuplicateDbName(OperationContext* opCtx, - const BSONObj& op, - const char* ns, - const char* db); - - // populates _me so that it can be passed to oplogreader for handshakes - /// TODO(spencer): Remove this function once the LegacyReplicationCoordinator is gone. - void ensureMe(OperationContext* opCtx); - - void forceResync(OperationContext* opCtx, const char* requester); - - bool _connect(OplogReader* reader, const HostAndPort& host, const OID& myRID); - - Status _updateIfDoneWithInitialSync(OperationContext* opCtx); - -public: - OplogReader oplogReader; - - void applyCommand(OperationContext* opCtx, const BSONObj& op); - void applyOperation(OperationContext* opCtx, Database* db, const BSONObj& op); - std::string hostName; // ip addr or hostname plus optionally, ":<port>" - std::string _sourceName; // a logical source name. - std::string sourceName() const { - return _sourceName.empty() ? "main" : _sourceName; - } - - // only a certain db. note that in the sources collection, this may not be changed once you - // start replicating. - std::string only; - - /* the last time point we have already synced up to (in the remote/master's oplog). */ - Timestamp syncedTo; - - int nClonedThisPass; - - typedef std::vector<std::shared_ptr<ReplSource>> SourceVector; - static void loadAll(OperationContext* opCtx, SourceVector&); - - explicit ReplSource(OperationContext* opCtx, BSONObj); - // This is not the constructor you are looking for. Always prefer the version that takes - // a BSONObj. This is public only as a hack so that the ReplicationCoordinator can find - // out the process's RID in master/slave setups. - ReplSource(OperationContext* opCtx); - - /* -1 = error */ - int sync(OperationContext* opCtx, int& nApplied); - - void save(OperationContext* opCtx); // write ourself to local.sources - - // make a jsobj from our member fields of the form - // { host: ..., source: ..., syncedTo: ... } - BSONObj jsobj(); - - bool operator==(const ReplSource& r) const { - return hostName == r.hostName && sourceName() == r.sourceName(); - } - std::string toString() const { - return sourceName() + "@" + hostName; - } - - bool haveMoreDbsToSync() const { - return !addDbNextPass.empty(); - } - int sleepAdvice() const { - if (!_sleepAdviceTime) - return 0; - int wait = _sleepAdviceTime - unsigned(time(0)); - return wait > 0 ? wait : 0; - } - - static bool throttledForceResyncDead(OperationContext* opCtx, const char* requester); - static void forceResyncDead(OperationContext* opCtx, const char* requester); -}; - -/** - * Helper class used to set and query an ignore state for a named database. - * The ignore state will expire after a specified Timestamp. - */ -class DatabaseIgnorer { -public: - /** Indicate that operations for 'db' should be ignored until after 'futureOplogTime' */ - void doIgnoreUntilAfter(const std::string& db, const Timestamp& futureOplogTime); - /** - * Query ignore state of 'db'; if 'currentOplogTime' is after the ignore - * limit, the ignore state will be cleared. - */ - bool ignoreAt(const std::string& db, const Timestamp& currentOplogTime); - -private: - std::map<std::string, Timestamp> _ignores; -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp index 95dd56ea92b..e173b709602 100644 --- a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp +++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp @@ -71,10 +71,11 @@ void MockReplCoordServerFixture::setUp() { repl::ReplSettings replSettings; replSettings.setReplSetString( ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")}).toString()); - replSettings.setMaster(true); repl::ReplicationCoordinator::set( service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings)); + ASSERT_OK( + repl::ReplicationCoordinator::get(service)->setFollowerMode(repl::MemberState::RS_PRIMARY)); // Note: internal code does not allow implicit creation of non-capped oplog collection. DBDirectClient client(opCtx()); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index bd7e730c7a0..eba01f1076c 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -917,7 +917,6 @@ std::map<std::string, ApplyOpMetadata> opsMap = { } // namespace constexpr StringData OplogApplication::kInitialSyncOplogApplicationMode; -constexpr StringData OplogApplication::kMasterSlaveOplogApplicationMode; constexpr StringData OplogApplication::kRecoveringOplogApplicationMode; constexpr StringData OplogApplication::kSecondaryOplogApplicationMode; constexpr StringData OplogApplication::kApplyOpsCmdOplogApplicationMode; @@ -926,8 +925,6 @@ StringData OplogApplication::modeToString(OplogApplication::Mode mode) { switch (mode) { case OplogApplication::Mode::kInitialSync: return OplogApplication::kInitialSyncOplogApplicationMode; - case OplogApplication::Mode::kMasterSlave: - return OplogApplication::kMasterSlaveOplogApplicationMode; case OplogApplication::Mode::kRecovering: return OplogApplication::kRecoveringOplogApplicationMode; case OplogApplication::Mode::kSecondary: @@ -941,8 +938,6 @@ StringData OplogApplication::modeToString(OplogApplication::Mode mode) { StatusWith<OplogApplication::Mode> OplogApplication::parseMode(const std::string& mode) { if (mode == OplogApplication::kInitialSyncOplogApplicationMode) { return OplogApplication::Mode::kInitialSync; - } else if (mode == OplogApplication::kMasterSlaveOplogApplicationMode) { - return OplogApplication::Mode::kMasterSlave; } else if (mode == OplogApplication::kRecoveringOplogApplicationMode) { return OplogApplication::Mode::kRecovering; } else if (mode == OplogApplication::kSecondaryOplogApplicationMode) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index f6ea00d0067..1120b96c152 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -183,7 +183,6 @@ std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement class OplogApplication { public: static constexpr StringData kInitialSyncOplogApplicationMode = "InitialSync"_sd; - static constexpr StringData kMasterSlaveOplogApplicationMode = "MasterSlave"_sd; static constexpr StringData kRecoveringOplogApplicationMode = "Recovering"_sd; static constexpr StringData kSecondaryOplogApplicationMode = "Secondary"_sd; static constexpr StringData kApplyOpsCmdOplogApplicationMode = "ApplyOps"_sd; @@ -192,9 +191,6 @@ public: // Used during the oplog application phase of the initial sync process. kInitialSync, - // Used when a slave is applying operations from a master node in master-slave. - kMasterSlave, - // Used when we are applying oplog operations to recover the database state following an // unclean shutdown, or when we are recovering from the oplog after we rollback to a // checkpoint. diff --git a/src/mongo/db/repl/repl_settings.cpp b/src/mongo/db/repl/repl_settings.cpp index 5f73147ee47..3fb69df5d3d 100644 --- a/src/mongo/db/repl/repl_settings.cpp +++ b/src/mongo/db/repl/repl_settings.cpp @@ -28,11 +28,10 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication -#include "mongo/db/repl/repl_settings.h" +#include "mongo/platform/basic.h" -#include <string> +#include "mongo/db/repl/repl_settings.h" -#include "mongo/db/repl/bgsync.h" #include "mongo/util/log.h" namespace mongo { @@ -52,42 +51,11 @@ bool ReplSettings::usingReplSets() const { /** * Getters */ -bool ReplSettings::isSlave() const { - return _slave; -} - -bool ReplSettings::isMaster() const { - return _master; -} - -bool ReplSettings::isFastSyncEnabled() const { - return _fastSyncEnabled; -} - -bool ReplSettings::isAutoResyncEnabled() const { - return _autoResyncEnabled; -} - -Seconds ReplSettings::getSlaveDelaySecs() const { - return _slaveDelaySecs; -} - -int ReplSettings::getPretouch() const { - return _pretouch; -} long long ReplSettings::getOplogSizeBytes() const { return _oplogSizeBytes; } -std::string ReplSettings::getSource() const { - return _source; -} - -std::string ReplSettings::getOnly() const { - return _only; -} - std::string ReplSettings::getReplSetString() const { return _replSetString; } @@ -103,42 +71,11 @@ bool ReplSettings::isPrefetchIndexModeSet() const { /** * Setters */ -void ReplSettings::setSlave(bool slave) { - _slave = slave; -} - -void ReplSettings::setMaster(bool master) { - _master = master; -} - -void ReplSettings::setFastSyncEnabled(bool fastSyncEnabled) { - _fastSyncEnabled = fastSyncEnabled; -} - -void ReplSettings::setAutoResyncEnabled(bool autoResyncEnabled) { - _autoResyncEnabled = autoResyncEnabled; -} - -void ReplSettings::setSlaveDelaySecs(int slaveDelay) { - _slaveDelaySecs = Seconds(slaveDelay); -} - -void ReplSettings::setPretouch(int pretouch) { - _pretouch = pretouch; -} void ReplSettings::setOplogSizeBytes(long long oplogSizeBytes) { _oplogSizeBytes = oplogSizeBytes; } -void ReplSettings::setSource(std::string source) { - _source = source; -} - -void ReplSettings::setOnly(std::string only) { - _only = only; -} - void ReplSettings::setReplSetString(std::string replSetString) { _replSetString = replSetString; } diff --git a/src/mongo/db/repl/repl_settings.h b/src/mongo/db/repl/repl_settings.h index ce804fb3502..1a74414dade 100644 --- a/src/mongo/db/repl/repl_settings.h +++ b/src/mongo/db/repl/repl_settings.h @@ -28,11 +28,9 @@ #pragma once -#include <set> #include <string> #include "mongo/db/jsobj.h" -#include "mongo/util/concurrency/mutex.h" namespace mongo { namespace repl { @@ -56,15 +54,7 @@ public: /** * Getters */ - bool isSlave() const; - bool isMaster() const; - bool isFastSyncEnabled() const; - bool isAutoResyncEnabled() const; - Seconds getSlaveDelaySecs() const; - int getPretouch() const; long long getOplogSizeBytes() const; - std::string getSource() const; - std::string getOnly() const; std::string getReplSetString() const; /** @@ -82,38 +72,13 @@ public: /** * Setters */ - void setSlave(bool slave); - void setMaster(bool master); - void setFastSyncEnabled(bool fastSyncEnabled); - void setAutoResyncEnabled(bool autoResyncEnabled); - void setSlaveDelaySecs(int slaveDelay); - void setPretouch(int pretouch); void setOplogSizeBytes(long long oplogSizeBytes); - void setSource(std::string source); - void setOnly(std::string only); void setReplSetString(std::string replSetString); void setPrefetchIndexMode(std::string prefetchIndexModeString); private: - /** - * true means we are master and doing replication. If we are not writing to oplog, this won't - * be true. - */ - bool _master = false; - - // replication slave? (possibly with slave) - bool _slave = false; - - bool _fastSyncEnabled = false; - bool _autoResyncEnabled = false; - Seconds _slaveDelaySecs = Seconds(0); long long _oplogSizeBytes = 0; // --oplogSize - // for master/slave replication - std::string _source; // --source - std::string _only; // --only - int _pretouch = 0; // --pretouch for replication application (experimental) - std::string _replSetString; // --replSet[/<seedlist>] // --indexPrefetch diff --git a/src/mongo/db/repl/replication_coordinator.cpp b/src/mongo/db/repl/replication_coordinator.cpp index 9c603c4db71..288cabf18a6 100644 --- a/src/mongo/db/repl/replication_coordinator.cpp +++ b/src/mongo/db/repl/replication_coordinator.cpp @@ -45,9 +45,6 @@ const auto getReplicationCoordinator = ReplicationCoordinator::ReplicationCoordinator() {} ReplicationCoordinator::~ReplicationCoordinator() {} -// TODO(dannenberg) remove when master slave is removed -const char* replAllDead = 0; - ReplicationCoordinator* ReplicationCoordinator::get(ServiceContext* service) { return getReplicationCoordinator(service).get(); } diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index bce17fa9ea6..96be0d7be59 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -80,16 +80,6 @@ class ReplSetRequestVotesResponse; class UpdatePositionArgs; /** - * Global variable that contains a std::string telling why master/slave halted - * - * "dead" means something really bad happened like replication falling completely out of sync. - * when non-null, we are dead and the string is informational - * - * TODO(dannenberg) remove when master slave goes - */ -extern const char* replAllDead; - -/** * The ReplicationCoordinator is responsible for coordinating the interaction of replication * with the rest of the system. The public methods on ReplicationCoordinator are the public * API that the replication subsystem presents to the rest of the codebase. diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 17fe69b8a0d..38697e94c1c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -97,11 +97,6 @@ public: virtual void stopDataReplication(OperationContext* opCtx) = 0; /** - * Starts the Master/Slave threads and sets up logOp - */ - virtual void startMasterSlave(OperationContext* opCtx) = 0; - - /** * Performs any necessary external state specific shutdown tasks, such as cleaning up * the threads it started. */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 474b15506af..00f2d968489 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -57,7 +57,6 @@ #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/last_vote.h" -#include "mongo/db/repl/master_slave.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/noop_writer.h" #include "mongo/db/repl/oplog.h" @@ -325,10 +324,6 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s _startedThreads = true; } -void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* opCtx) { - repl::startMasterSlave(opCtx); -} - void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) { UniqueLock lk(_threadMutex); if (!_startedThreads) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 93e8149e3b3..3d124a24ef2 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -74,7 +74,6 @@ public: virtual bool isInitialSyncFlagSet(OperationContext* opCtx) override; - virtual void startMasterSlave(OperationContext* opCtx); virtual void shutdown(OperationContext* opCtx); virtual executor::TaskExecutor* getTaskExecutor() const override; virtual ThreadPool* getDbWorkThreadPool() const override; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 29648d65bad..5f88d85e0ec 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -72,8 +72,6 @@ void ReplicationCoordinatorExternalStateMock::startSteadyStateReplication(Operat void ReplicationCoordinatorExternalStateMock::stopDataReplication(OperationContext*) {} -void ReplicationCoordinatorExternalStateMock::startMasterSlave(OperationContext*) {} - Status ReplicationCoordinatorExternalStateMock::runRepairOnLocalDB(OperationContext* opCtx) { return Status::OK(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 693a95ce4e2..2d7f5563bba 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -62,7 +62,6 @@ public: virtual void stopDataReplication(OperationContext* opCtx) override; virtual bool isInitialSyncFlagSet(OperationContext* opCtx) override; - virtual void startMasterSlave(OperationContext*); virtual void shutdown(OperationContext* opCtx); virtual executor::TaskExecutor* getTaskExecutor() const override; virtual ThreadPool* getDbWorkThreadPool() const override; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f3393400981..bbf1aa7d020 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -303,9 +303,6 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& if (settings.usingReplSets()) { return ReplicationCoordinator::modeReplSet; } - if (settings.isMaster() || settings.isSlave()) { - return ReplicationCoordinator::modeMasterSlave; - } return ReplicationCoordinator::modeNone; } @@ -347,7 +344,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _rsConfigState(kConfigPreStart), _selfIndex(-1), _sleptLastElection(false), - _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())), + _canAcceptNonLocalWrites(!settings.usingReplSets()), _canServeNonLocalReads(0U), _replicationProcess(replicationProcess), _storage(storage), @@ -736,6 +733,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { _setConfigState_inlock(kConfigReplicationDisabled); return; } + invariant(_settings.usingReplSets()); { OID rid = _externalState->ensureMe(opCtx); @@ -747,13 +745,6 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { _topCoord->setMyRid(rid); } - if (!_settings.usingReplSets()) { - // Must be Master/Slave - invariant(_settings.isMaster() || _settings.isSlave()); - _externalState->startMasterSlave(opCtx); - return; - } - _replExecutor->startup(); { @@ -1752,29 +1743,13 @@ void ReplicationCoordinatorImpl::_handleTimePassing( } bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() { - if (_settings.usingReplSets()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (getReplicationMode() == modeReplSet && _getMemberState_inlock().primary()) { - return true; - } - return false; - } - - if (!_settings.isSlave()) - return true; - - - // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed - if (replAllDead) { - return false; - } - - if (_settings.isMaster()) { - // if running with --master --slave, allow. + if (!_settings.usingReplSets()) { return true; } - return false; + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(getReplicationMode() == modeReplSet); + return _getMemberState_inlock().primary(); } bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* opCtx, @@ -1786,20 +1761,18 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* op bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx, StringData dbName) { - // _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes - // started with --slave, and adjusted based on primary+drain state in replica sets. + // _canAcceptNonLocalWrites is always true for standalone nodes, and adjusted based on + // primary+drain state in replica sets. // - // That is, stand-alone nodes, non-slave nodes and drained replica set primaries can always - // accept writes. Similarly, writes are always permitted to the "local" database. Finally, - // in the event that a node is started with --slave and --master, we allow writes unless the - // master/slave system has set the replAllDead flag. + // Stand-alone nodes and drained replica set primaries can always accept writes. Writes are + // always permitted to the "local" database. if (_canAcceptNonLocalWrites || alwaysAllowNonLocalWrites(*opCtx)) { return true; } if (dbName == kLocalDB) { return true; } - return !replAllDead && _settings.isMaster(); + return false; } bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx, diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 25141375b23..bbc70047e36 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -78,16 +78,13 @@ const ReplSettings& ReplicationCoordinatorMock::getSettings() const { } bool ReplicationCoordinatorMock::isReplEnabled() const { - return _settings.usingReplSets() || _settings.isMaster() || _settings.isSlave(); + return _settings.usingReplSets(); } ReplicationCoordinator::Mode ReplicationCoordinatorMock::getReplicationMode() const { if (_settings.usingReplSets()) { return modeReplSet; } - if (_settings.isMaster() || _settings.isSlave()) { - return modeMasterSlave; - } return modeNone; } @@ -141,7 +138,7 @@ bool ReplicationCoordinatorMock::canAcceptWritesForDatabase(OperationContext* op if (_alwaysAllowWrites) { return true; } - return dbName == "local" || _memberState.primary() || _settings.isMaster(); + return dbName == "local" || _memberState.primary(); } bool ReplicationCoordinatorMock::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx, @@ -494,10 +491,6 @@ void ReplicationCoordinatorMock::alwaysAllowWrites(bool allowWrites) { _alwaysAllowWrites = allowWrites; } -void ReplicationCoordinatorMock::setMaster(bool isMaster) { - _settings.setMaster(isMaster); -} - Status ReplicationCoordinatorMock::abortCatchupIfNeeded() { return Status::OK(); } diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index fc33d735f13..7ee9664690e 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -46,7 +46,6 @@ #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/master_slave.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/replication_coordinator.h" @@ -88,15 +87,8 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int return; } - // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed - if (replAllDead) { - result.append("ismaster", 0); - string s = string("dead: ") + replAllDead; - result.append("info", s); - } else { - result.appendBool("ismaster", - ReplicationCoordinator::get(opCtx)->isMasterForReportingPurposes()); - } + result.appendBool("ismaster", + ReplicationCoordinator::get(opCtx)->isMasterForReportingPurposes()); if (level) { BSONObjBuilder sources(result.subarrayStart("sources")); diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp deleted file mode 100644 index 4141485cb93..00000000000 --- a/src/mongo/db/repl/resync.cpp +++ /dev/null @@ -1,160 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#include "mongo/db/commands.h" -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/master_slave.h" // replSettings -#include "mongo/db/repl/replication_coordinator.h" - -namespace mongo { - -using std::string; -using std::stringstream; - -namespace repl { - -namespace { - -constexpr StringData kResyncFieldName = "resync"_sd; -constexpr StringData kWaitFieldName = "wait"_sd; - -} // namespace - -// operator requested resynchronization of replication (on a slave or secondary). {resync: 1} -class CmdResync : public ErrmsgCommandDeprecated { -public: - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kAlways; - } - virtual bool adminOnly() const { - return true; - } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { - ActionSet actions; - actions.addAction(ActionType::resync); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - - std::string help() const override { - return "resync (from scratch) a stale slave or replica set secondary node.\n"; - } - - CmdResync() : ErrmsgCommandDeprecated(kResyncFieldName) {} - virtual bool errmsgRun(OperationContext* opCtx, - const string& dbname, - const BSONObj& cmdObj, - string& errmsg, - BSONObjBuilder& result) { - bool waitForResync = !cmdObj.hasField(kWaitFieldName) || cmdObj[kWaitFieldName].trueValue(); - - // Replica set resync. - ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx); - if (replCoord->getSettings().usingReplSets()) { - // Resync is disabled in production on replica sets until it stabilizes (SERVER-27081). - if (!Command::testCommandsEnabled) { - return CommandHelpers::appendCommandStatus( - result, - Status(ErrorCodes::OperationFailed, - "Replica sets do not support the resync command")); - } - - { - // Need global write lock to transition out of SECONDARY - Lock::GlobalWrite globalWriteLock(opCtx); - - const MemberState memberState = replCoord->getMemberState(); - if (memberState.startup()) { - return CommandHelpers::appendCommandStatus( - result, Status(ErrorCodes::NotYetInitialized, "no replication yet active")); - } - if (memberState.primary()) { - return CommandHelpers::appendCommandStatus( - result, Status(ErrorCodes::NotSecondary, "primaries cannot resync")); - } - auto status = replCoord->setFollowerMode(MemberState::RS_STARTUP2); - if (!status.isOK()) { - return CommandHelpers::appendCommandStatus( - result, - status.withContext( - "Failed to transition to STARTUP2 state to perform resync")); - } - } - uassertStatusOKWithLocation(replCoord->resyncData(opCtx, waitForResync), "resync", 0); - return true; - } - - // Master/Slave resync. - Lock::GlobalWrite globalWriteLock(opCtx); - // below this comment pertains only to master/slave replication - if (cmdObj.getBoolField("force")) { - if (!waitForSyncToFinish(opCtx, errmsg)) - return false; - replAllDead = "resync forced"; - } - // TODO(dannenberg) replAllDead is bad and should be removed when masterslave is removed - if (!replAllDead) { - errmsg = "not dead, no need to resync"; - return false; - } - if (!waitForSyncToFinish(opCtx, errmsg)) - return false; - - ReplSource::forceResyncDead(opCtx, "client"); - result.append("info", "triggered resync for all sources"); - - return true; - } - - bool waitForSyncToFinish(OperationContext* opCtx, string& errmsg) const { - // Wait for slave thread to finish syncing, so sources will be be - // reloaded with new saved state on next pass. - Timer t; - while (1) { - if (syncing.load() == 0 || t.millis() > 30000) - break; - { - Lock::TempRelease t(opCtx->lockState()); - relinquishSyncingSome.store(1); - sleepmillis(1); - } - } - if (syncing.load()) { - errmsg = "timeout waiting for sync() to finish"; - return false; - } - return true; - } -} cmdResync; -} // namespace repl -} // namespace mongo diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 8ccb88af950..4eff300faf9 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -45,7 +45,6 @@ #include "mongo/db/json.h" #include "mongo/db/op_observer_impl.h" #include "mongo/db/ops/update.h" -#include "mongo/db/repl/master_slave.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" @@ -121,12 +120,13 @@ public: ASSERT_OK(sc->getTransportLayer()->start()); ReplSettings replSettings; - replSettings.setOplogSizeBytes(10 * 1024 * 1024); - replSettings.setMaster(true); + replSettings.setReplSetString("rs0/host1"); ReplicationCoordinator::set( getGlobalServiceContext(), std::unique_ptr<repl::ReplicationCoordinator>( new repl::ReplicationCoordinatorMock(_opCtx.getServiceContext(), replSettings))); + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_PRIMARY)); // Since the Client object persists across tests, even though the global // ReplicationCoordinator does not, we need to clear the last op associated with the client @@ -177,7 +177,7 @@ protected: return "unittests.repltests"; } static const char* cllNS() { - return "local.oplog.$main"; + return "local.oplog.rs"; } BSONObj one(const BSONObj& query = BSONObj()) const { return _client.findOne(ns(), query); @@ -235,16 +235,13 @@ protected: } { OldClientContext ctx(&_opCtx, ns()); - BSONObjBuilder b; - b.append("host", "localhost"); - b.appendTimestamp("syncedTo", 0); - ReplSource a(&_opCtx, b.obj()); for (vector<BSONObj>::iterator i = ops.begin(); i != ops.end(); ++i) { if (0) { mongo::unittest::log() << "op: " << *i << endl; } repl::UnreplicatedWritesBlock uwb(&_opCtx); - a.applyOperation(&_opCtx, ctx.db(), *i); + uassertStatusOK(applyOperation_inlock( + &_opCtx, ctx.db(), *i, false, OplogApplication::Mode::kSecondary)); } } } @@ -329,9 +326,9 @@ public: if (mongo::storageGlobalParams.engine == "mobile") { return; } - ASSERT_EQUALS(2, opCount()); + ASSERT_EQUALS(1, opCount()); _client.insert(ns(), fromjson("{\"a\":\"b\"}")); - ASSERT_EQUALS(3, opCount()); + ASSERT_EQUALS(2, opCount()); } }; @@ -1346,50 +1343,6 @@ public: } }; -class DatabaseIgnorerBasic { -public: - void run() { - // Replication is not supported by mobile SE. - if (mongo::storageGlobalParams.engine == "mobile") { - return; - } - DatabaseIgnorer d; - ASSERT(!d.ignoreAt("a", Timestamp(4, 0))); - d.doIgnoreUntilAfter("a", Timestamp(5, 0)); - ASSERT(d.ignoreAt("a", Timestamp(4, 0))); - ASSERT(!d.ignoreAt("b", Timestamp(4, 0))); - ASSERT(d.ignoreAt("a", Timestamp(4, 10))); - ASSERT(d.ignoreAt("a", Timestamp(5, 0))); - ASSERT(!d.ignoreAt("a", Timestamp(5, 1))); - // Ignore state is expired. - ASSERT(!d.ignoreAt("a", Timestamp(4, 0))); - } -}; - -class DatabaseIgnorerUpdate { -public: - void run() { - // Replication is not supported by mobile SE. - if (mongo::storageGlobalParams.engine == "mobile") { - return; - } - DatabaseIgnorer d; - d.doIgnoreUntilAfter("a", Timestamp(5, 0)); - d.doIgnoreUntilAfter("a", Timestamp(6, 0)); - ASSERT(d.ignoreAt("a", Timestamp(5, 5))); - ASSERT(d.ignoreAt("a", Timestamp(6, 0))); - ASSERT(!d.ignoreAt("a", Timestamp(6, 1))); - - d.doIgnoreUntilAfter("a", Timestamp(5, 0)); - d.doIgnoreUntilAfter("a", Timestamp(6, 0)); - d.doIgnoreUntilAfter("a", Timestamp(6, 0)); - d.doIgnoreUntilAfter("a", Timestamp(5, 0)); - ASSERT(d.ignoreAt("a", Timestamp(5, 5))); - ASSERT(d.ignoreAt("a", Timestamp(6, 0))); - ASSERT(!d.ignoreAt("a", Timestamp(6, 1))); - } -}; - class SyncTest : public SyncTail { public: bool returnEmpty; @@ -1511,8 +1464,6 @@ public: add<Idempotence::ReplaySetPreexistingNoOpPull>(); add<Idempotence::ReplayArrayFieldNotAppended>(); add<DeleteOpIsIdBased>(); - add<DatabaseIgnorerBasic>(); - add<DatabaseIgnorerUpdate>(); add<FetchAndInsertMissingDocument>(); } }; diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp index cbf5099a2df..f9a54847fd2 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/s/sharding_mongod_test_fixture.cpp @@ -105,7 +105,6 @@ void ShardingMongodTestFixture::setUp() { repl::ReplSettings replSettings; replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString()); - replSettings.setMaster(true); auto replCoordPtr = makeReplicationCoordinator(replSettings); _replCoord = replCoordPtr.get(); @@ -156,7 +155,10 @@ void ShardingMongodTestFixture::setUp() { std::unique_ptr<ReplicationCoordinatorMock> ShardingMongodTestFixture::makeReplicationCoordinator( ReplSettings replSettings) { - return stdx::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext(), replSettings); + auto coordinator = + stdx::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext(), replSettings); + ASSERT_OK(coordinator->setFollowerMode(repl::MemberState::RS_PRIMARY)); + return coordinator; } std::unique_ptr<executor::TaskExecutorPool> ShardingMongodTestFixture::makeTaskExecutorPool() { |