summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2018-03-08 10:12:42 -0500
committerAndy Schwerin <schwerin@mongodb.com>2018-03-13 15:09:53 -0400
commit64e649a622b5ac4c1bfad0933132dc7d994b9458 (patch)
treeff907cca516228300489ed9351819a0bb9c8193b
parent5ce39d9dd292dda65d59dbb18bdc176ea2b528a0 (diff)
downloadmongo-64e649a622b5ac4c1bfad0933132dc7d994b9458.tar.gz
SERVER-31802 SERVER-31239 Remove master-slave replication and resync command.
-rw-r--r--buildscripts/resmokeconfig/suites/jstestfuzz_replication_resync.yml38
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_resync_static_jscore_passthrough.yml44
-rw-r--r--buildscripts/resmokelib/testing/hooks/initialsync.py50
-rw-r--r--etc/evergreen.yml46
-rw-r--r--jstests/auth/lib/commands_lib.js15
-rw-r--r--jstests/core/views/views_all_commands.js1
-rw-r--r--jstests/replsets/initial_sync_document_validation.js5
-rw-r--r--jstests/replsets/initial_sync_fail_insert_once.js5
-rw-r--r--jstests/replsets/read_committed_no_snapshots.js43
-rw-r--r--jstests/replsets/resync.js119
-rw-r--r--jstests/replsets/resync_with_write_load.js93
-rw-r--r--src/mongo/db/commands/count_cmd.cpp5
-rw-r--r--src/mongo/db/db.cpp11
-rw-r--r--src/mongo/db/logical_clock_test_fixture.cpp3
-rw-r--r--src/mongo/db/mongod_options.cpp112
-rw-r--r--src/mongo/db/repair_database_and_check_version.cpp3
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/master_slave.cpp1462
-rw-r--r--src/mongo/db/repl/master_slave.h219
-rw-r--r--src/mongo/db/repl/mock_repl_coord_server_fixture.cpp3
-rw-r--r--src/mongo/db/repl/oplog.cpp5
-rw-r--r--src/mongo/db/repl/oplog.h4
-rw-r--r--src/mongo/db/repl/repl_settings.cpp67
-rw-r--r--src/mongo/db/repl/repl_settings.h35
-rw-r--r--src/mongo/db/repl/replication_coordinator.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator.h10
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp49
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp11
-rw-r--r--src/mongo/db/repl/replication_info.cpp12
-rw-r--r--src/mongo/db/repl/resync.cpp160
-rw-r--r--src/mongo/dbtests/repltests.cpp65
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.cpp6
37 files changed, 79 insertions, 2641 deletions
diff --git a/buildscripts/resmokeconfig/suites/jstestfuzz_replication_resync.yml b/buildscripts/resmokeconfig/suites/jstestfuzz_replication_resync.yml
deleted file mode 100644
index 1cd73444983..00000000000
--- a/buildscripts/resmokeconfig/suites/jstestfuzz_replication_resync.yml
+++ /dev/null
@@ -1,38 +0,0 @@
-test_kind: js_test
-
-selector:
- roots:
- - jstestfuzz/out/*.js
-
-executor:
- archive:
- hooks:
- - BackgroundInitialSync
- config:
- shell_options:
- readMode: commands
- global_vars:
- TestData:
- ignoreCommandsIncompatibleWithInitialSync: true
- hooks:
- - class: CheckPrimary
- - class: BackgroundInitialSync
- use_resync: true
- n: 1
- shell_options:
- global_vars:
- TestData:
- skipValidationOnInvalidViewDefinitions: true
- fixture:
- class: ReplicaSetFixture
- mongod_options:
- verbose: ''
- set_parameters:
- disableLogicalSessionCacheRefresh: false
- enableTestCommands: 1
- logComponentVerbosity:
- replication: 2
- numInitialSyncAttempts: 1
- writePeriodicNoops: 1
- num_nodes: 2
- start_initial_sync_node: True
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_resync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_resync_static_jscore_passthrough.yml
deleted file mode 100644
index 0aa9d04ddc8..00000000000
--- a/buildscripts/resmokeconfig/suites/replica_sets_resync_static_jscore_passthrough.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-test_kind: js_test
-
-selector:
- roots:
- - jstests/core/**/*.js
- exclude_files:
- # These tests are not expected to pass with replica-sets:
- - jstests/core/dbadmin.js
- - jstests/core/opcounters_write_cmd.js
- - jstests/core/read_after_optime.js
- - jstests/core/capped_update.js
- # Having duplicate namespaces is not supported and will cause initial sync to fail.
- - jstests/core/views/duplicate_ns.js
- # Restarting the catalog on the sync source will cause the downstream node to copy only a subset
- # of the required data.
- - jstests/core/restart_catalog.js
-
-run_hook_interval: &run_hook_interval 20
-executor:
- archive:
- hooks:
- - IntermediateInitialSync
- config:
- shell_options:
- eval: "testingReplication = true;"
- readMode: commands
- hooks:
- - class: CheckPrimary
- - class: IntermediateInitialSync
- use_resync: True
- n: *run_hook_interval
- - class: CleanEveryN
- n: *run_hook_interval
- fixture:
- class: ReplicaSetFixture
- mongod_options:
- set_parameters:
- enableTestCommands: 1
- numInitialSyncAttempts: 1
- num_nodes: 2
- start_initial_sync_node: True
- replset_config_options:
- settings:
- chainingAllowed: False
diff --git a/buildscripts/resmokelib/testing/hooks/initialsync.py b/buildscripts/resmokelib/testing/hooks/initialsync.py
index f3a5311dab4..328c4ac182e 100644
--- a/buildscripts/resmokelib/testing/hooks/initialsync.py
+++ b/buildscripts/resmokelib/testing/hooks/initialsync.py
@@ -23,9 +23,7 @@ class BackgroundInitialSync(interface.Hook):
validates it, tears it down, and restarts it.
This test accepts a parameter 'n' that specifies a number of tests after which it will wait for
- replication to finish before validating and restarting the initial sync node. It also accepts
- a parameter 'use_resync' for whether to restart the initial sync node with resync or by
- shutting it down and restarting it.
+ replication to finish before validating and restarting the initial sync node.
This requires the ReplicaSetFixture to be started with 'start_initial_sync_node=True'. If used
at the same time as CleanEveryN, the 'n' value passed to this hook should be equal to the 'n'
@@ -34,7 +32,7 @@ class BackgroundInitialSync(interface.Hook):
DEFAULT_N = cleanup.CleanEveryN.DEFAULT_N
- def __init__(self, hook_logger, fixture, use_resync=False, n=DEFAULT_N, shell_options=None):
+ def __init__(self, hook_logger, fixture, n=DEFAULT_N, shell_options=None):
if not isinstance(fixture, replicaset.ReplicaSetFixture):
raise ValueError("`fixture` must be an instance of ReplicaSetFixture, not {}".format(
fixture.__class__.__name__))
@@ -42,7 +40,6 @@ class BackgroundInitialSync(interface.Hook):
description = "Background Initial Sync"
interface.Hook.__init__(self, hook_logger, fixture, description)
- self.use_resync = use_resync
self.n = n
self.tests_run = 0
self.random_restarts = 0
@@ -102,9 +99,8 @@ class BackgroundInitialSyncTestCase(jsfile.DynamicJSTestCase):
# If we have not restarted initial sync since the last time we ran the data
# validation, restart initial sync with a 20% probability.
if self._hook.random_restarts < 1 and random.random() < 0.2:
- hook_type = "resync" if self._hook.use_resync else "initial sync"
- self.logger.info("randomly restarting " + hook_type +
- " in the middle of " + hook_type)
+ self.logger.info(
+ "randomly restarting initial sync in the middle of initial sync")
self.__restart_init_sync(sync_node, sync_node_conn)
self._hook.random_restarts += 1
return
@@ -123,35 +119,27 @@ class BackgroundInitialSyncTestCase(jsfile.DynamicJSTestCase):
self.__restart_init_sync(sync_node, sync_node_conn)
- # Restarts initial sync by shutting down the node, clearing its data, and restarting it,
- # or by calling resync if use_resync is specified.
+ # Restarts initial sync by shutting down the node, clearing its data, and restarting it.
def __restart_init_sync(self, sync_node, sync_node_conn):
- if self._hook.use_resync:
- self.logger.info("Calling resync on initial sync node...")
- cmd = bson.SON([("resync", 1), ("wait", 0)])
- sync_node_conn.admin.command(cmd)
- else:
- # Tear down and restart the initial sync node to start initial sync again.
- sync_node.teardown()
+ # Tear down and restart the initial sync node to start initial sync again.
+ sync_node.teardown()
- self.logger.info("Starting the initial sync node back up again...")
- sync_node.setup()
- sync_node.await_ready()
+ self.logger.info("Starting the initial sync node back up again...")
+ sync_node.setup()
+ sync_node.await_ready()
class IntermediateInitialSync(interface.Hook):
"""
This hook accepts a parameter 'n' that specifies a number of tests after which it will start up
- a node to initial sync, wait for replication to finish, and then validate the data. It also
- accepts a parameter 'use_resync' for whether to restart the initial sync node with resync or by
- shutting it down and restarting it.
+ a node to initial sync, wait for replication to finish, and then validate the data.
This requires the ReplicaSetFixture to be started with 'start_initial_sync_node=True'.
"""
DEFAULT_N = cleanup.CleanEveryN.DEFAULT_N
- def __init__(self, hook_logger, fixture, use_resync=False, n=DEFAULT_N):
+ def __init__(self, hook_logger, fixture, n=DEFAULT_N):
if not isinstance(fixture, replicaset.ReplicaSetFixture):
raise ValueError("`fixture` must be an instance of ReplicaSetFixture, not {}".format(
fixture.__class__.__name__))
@@ -159,7 +147,6 @@ class IntermediateInitialSync(interface.Hook):
description = "Intermediate Initial Sync"
interface.Hook.__init__(self, hook_logger, fixture, description)
- self.use_resync = use_resync
self.n = n
self.tests_run = 0
@@ -195,16 +182,11 @@ class IntermediateInitialSyncTestCase(jsfile.DynamicJSTestCase):
sync_node = self.fixture.get_initial_sync_node()
sync_node_conn = sync_node.mongo_client()
- if self._hook.use_resync:
- self.logger.info("Calling resync on initial sync node...")
- cmd = bson.SON([("resync", 1)])
- sync_node_conn.admin.command(cmd)
- else:
- sync_node.teardown()
+ sync_node.teardown()
- self.logger.info("Starting the initial sync node back up again...")
- sync_node.setup()
- sync_node.await_ready()
+ self.logger.info("Starting the initial sync node back up again...")
+ sync_node.setup()
+ sync_node.await_ready()
# Do initial sync round.
self.logger.info("Waiting for initial sync node to go into SECONDARY state")
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index bad5cc033ea..96bf8b2adc3 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -739,7 +739,6 @@ functions:
case "${task_name}" in
replica_sets_initsync_jscore_passthrough \
|replica_sets_initsync_static_jscore_passthrough \
- |replica_sets_resync_static_jscore_passthrough \
|replica_sets* \
|sharding* \
)
@@ -3032,19 +3031,6 @@ tasks:
<<: *jstestfuzz_config_vars
resmoke_args: --suites=jstestfuzz_replication_initsync --storageEngine=wiredTiger
-## jstestfuzz initial sync replica set ##
-- <<: *jstestfuzz_template
- name: jstestfuzz_replication_resync
- commands:
- - func: "do setup"
- - func: "run jstestfuzz"
- vars:
- jstestfuzz_vars: --numGeneratedFiles 75
- - <<: *run_jstestfuzz_tests
- vars:
- <<: *jstestfuzz_config_vars
- resmoke_args: --suites=jstestfuzz_replication_resync --storageEngine=wiredTiger
-
## jstestfuzz replica set with logical session ##
- <<: *jstestfuzz_template
name: jstestfuzz_replication_session
@@ -3544,17 +3530,6 @@ tasks:
run_multiple_jobs: true
- <<: *task_template
- name: replica_sets_resync_static_jscore_passthrough
- depends_on:
- - name: jsCore
- commands:
- - func: "do setup"
- - func: "run tests"
- vars:
- resmoke_args: --suites=replica_sets_resync_static_jscore_passthrough --storageEngine=wiredTiger
- run_multiple_jobs: true
-
-- <<: *task_template
name: replica_sets_kill_secondaries_jscore_passthrough
depends_on:
- name: jsCore
@@ -4995,7 +4970,6 @@ buildvariants:
- name: replica_sets_initsync_static_jscore_passthrough
- name: replica_sets_jscore_passthrough
- name: replica_sets_kill_secondaries_jscore_passthrough
- - name: replica_sets_resync_static_jscore_passthrough
- name: replica_sets_pv0
- name: retryable_writes_jscore_passthrough
- name: retryable_writes_jscore_stepdown_passthrough
@@ -5994,9 +5968,6 @@ buildvariants:
- name: replica_sets_initsync_static_jscore_passthrough
distros:
- windows-64-vs2015-large
- - name: replica_sets_resync_static_jscore_passthrough
- distros:
- - windows-64-vs2015-large
- name: replica_sets_kill_secondaries_jscore_passthrough
distros:
- windows-64-vs2015-large
@@ -6772,9 +6743,6 @@ buildvariants:
- name: replica_sets_initsync_static_jscore_passthrough
distros:
- rhel62-large
- - name: replica_sets_resync_static_jscore_passthrough
- distros:
- - rhel62-large
- name: replica_sets_kill_secondaries_jscore_passthrough
distros:
- rhel62-large
@@ -6920,7 +6888,6 @@ buildvariants:
- name: replica_sets_jscore_passthrough
- name: replica_sets_initsync_jscore_passthrough
- name: replica_sets_initsync_static_jscore_passthrough
- - name: replica_sets_resync_static_jscore_passthrough
- name: replica_sets_kill_secondaries_jscore_passthrough
- name: retryable_writes_jscore_passthrough
- name: sasl
@@ -8998,7 +8965,6 @@ buildvariants:
- name: replica_sets_jscore_passthrough
- name: replica_sets_initsync_jscore_passthrough
- name: replica_sets_initsync_static_jscore_passthrough
- - name: replica_sets_resync_static_jscore_passthrough
- name: replica_sets_kill_secondaries_jscore_passthrough
- name: retryable_writes_jscore_passthrough
- name: retryable_writes_jscore_stepdown_passthrough
@@ -9153,7 +9119,6 @@ buildvariants:
- name: replica_sets_jscore_passthrough
- name: replica_sets_initsync_jscore_passthrough
- name: replica_sets_initsync_static_jscore_passthrough
- - name: replica_sets_resync_static_jscore_passthrough
- name: replica_sets_kill_secondaries_jscore_passthrough
- name: retryable_writes_jscore_passthrough
- name: retryable_writes_jscore_stepdown_passthrough
@@ -9283,7 +9248,6 @@ buildvariants:
- name: replica_sets_jscore_passthrough
- name: replica_sets_initsync_jscore_passthrough
- name: replica_sets_initsync_static_jscore_passthrough
- - name: replica_sets_resync_static_jscore_passthrough
- name: replica_sets_kill_secondaries_jscore_passthrough
- name: sasl
- name: sharded_collections_jscore_passthrough
@@ -9403,7 +9367,6 @@ buildvariants:
- name: jstestfuzz_concurrent_sharded_session
- name: jstestfuzz_replication
- name: jstestfuzz_replication_initsync
- - name: jstestfuzz_replication_resync
- name: jstestfuzz_replication_session
- name: jstestfuzz_sharded
- name: jstestfuzz_sharded_causal_consistency
@@ -9433,9 +9396,6 @@ buildvariants:
- name: replica_sets_jscore_passthrough
distros:
- rhel62-large
- - name: replica_sets_resync_static_jscore_passthrough
- distros:
- - rhel62-large
- name: rlp
- name: rollback_fuzzer
- name: serial_run
@@ -9529,7 +9489,6 @@ buildvariants:
- name: jstestfuzz_concurrent_sharded_session
- name: jstestfuzz_replication
- name: jstestfuzz_replication_initsync
- - name: jstestfuzz_replication_resync
- name: jstestfuzz_replication_session
- name: jstestfuzz_sharded
- name: jstestfuzz_sharded_causal_consistency
@@ -9557,9 +9516,6 @@ buildvariants:
- name: replica_sets_jscore_passthrough
distros:
- windows-64-vs2015-large
- - name: replica_sets_resync_static_jscore_passthrough
- distros:
- - windows-64-vs2015-large
- name: rollback_fuzzer
- name: serial_run
- name: sharded_collections_jscore_passthrough
@@ -9632,7 +9588,6 @@ buildvariants:
- name: jstestfuzz_concurrent_sharded_session
- name: jstestfuzz_replication
- name: jstestfuzz_replication_initsync
- - name: jstestfuzz_replication_resync
- name: jstestfuzz_replication_session
- name: jstestfuzz_sharded
- name: jstestfuzz_sharded_causal_consistency
@@ -9650,7 +9605,6 @@ buildvariants:
- name: replica_sets_initsync_jscore_passthrough
- name: replica_sets_initsync_static_jscore_passthrough
- name: replica_sets_jscore_passthrough
- - name: replica_sets_resync_static_jscore_passthrough
- name: rollback_fuzzer
- name: serial_run
- name: sharded_collections_jscore_passthrough
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index ea375a214f6..f40213bbaef 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -5199,21 +5199,6 @@ var authCommandsLib = {
]
},
{
- testname: "resync",
- command: {resync: 1},
- skipSharded: true,
- testcases: [
- {
- runOnDb: adminDbName,
- roles: {hostManager: 1, clusterManager: 1, clusterAdmin: 1, root: 1, __system: 1},
- privileges: [{resource: {cluster: true}, actions: ["resync"]}],
- expectFail: true
- },
- {runOnDb: firstDbName, roles: {}},
- {runOnDb: secondDbName, roles: {}}
- ]
- },
- {
testname: "serverStatus",
command: {serverStatus: 1},
testcases: [
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index e5f11d2c1b5..3dc22a53fac 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -443,7 +443,6 @@
replSetUpdatePosition: {skip: isUnrelated},
replSetResizeOplog: {skip: isUnrelated},
resetError: {skip: isUnrelated},
- resync: {skip: isUnrelated},
revokePrivilegesFromRole: {
command: {
revokePrivilegesFromRole: "testrole",
diff --git a/jstests/replsets/initial_sync_document_validation.js b/jstests/replsets/initial_sync_document_validation.js
index 1dd69e0e1d3..06d9388b51d 100644
--- a/jstests/replsets/initial_sync_document_validation.js
+++ b/jstests/replsets/initial_sync_document_validation.js
@@ -1,6 +1,5 @@
/**
- * Tests that initial sync via resync command does not fail if it inserts documents
- * which don't validate.
+ * Tests that initial sync does not fail if it inserts documents which don't validate.
*/
(function() {
@@ -19,7 +18,7 @@
assert.writeOK(coll.insert({_id: 0, x: 1}));
assert.commandWorked(coll.runCommand("collMod", {"validator": {a: {$exists: true}}}));
- assert.commandWorked(secondary.getDB("admin").runCommand({resync: 1}));
+ secondary = replSet.restart(secondary, {startClean: true});
replSet.awaitReplication();
replSet.awaitSecondaryNodes();
diff --git a/jstests/replsets/initial_sync_fail_insert_once.js b/jstests/replsets/initial_sync_fail_insert_once.js
index a565c64216d..d85f97f5a63 100644
--- a/jstests/replsets/initial_sync_fail_insert_once.js
+++ b/jstests/replsets/initial_sync_fail_insert_once.js
@@ -24,9 +24,8 @@
data: {collectionNS: coll.getFullName()}
}));
- jsTest.log("Issuing RESYNC command to " + tojson(secondary));
- assert.commandWorked(secondary.getDB("admin").runCommand({resync: 1}));
-
+ jsTest.log("Re-syncing " + tojson(secondary));
+ secondary = replSet.restart(secondary, {startClean: true});
replSet.awaitReplication();
replSet.awaitSecondaryNodes();
diff --git a/jstests/replsets/read_committed_no_snapshots.js b/jstests/replsets/read_committed_no_snapshots.js
index 9adfdc37502..83ba451e836 100644
--- a/jstests/replsets/read_committed_no_snapshots.js
+++ b/jstests/replsets/read_committed_no_snapshots.js
@@ -27,7 +27,7 @@ load("jstests/replsets/rslib.js"); // For reconfig and startSetIfSupportsReadMa
"members": [
{"_id": 0, "host": nodes[0]},
{"_id": 1, "host": nodes[1], priority: 0},
- {"_id": 2, "host": nodes[2], arbiterOnly: true}
+ {"_id": 2, "host": nodes[2], priority: 0}
],
"protocolVersion": 1
};
@@ -36,12 +36,15 @@ load("jstests/replsets/rslib.js"); // For reconfig and startSetIfSupportsReadMa
// Get connections and collection.
var primary = replTest.getPrimary();
- var secondary = replTest.liveNodes.slaves[0];
- var secondaryId = replTest.getNodeId(secondary);
- var db = primary.getDB(name);
+ var healthySecondary = replTest.liveNodes.slaves[0];
+ healthySecondary.setSlaveOk();
+ var noSnapshotSecondary = replTest.liveNodes.slaves[1];
+ noSnapshotSecondary.setSlaveOk();
+ assert.commandWorked(noSnapshotSecondary.adminCommand(
+ {configureFailPoint: 'disableSnapshotting', mode: 'alwaysOn'}));
// Do a write, wait for it to replicate, and ensure it is visible.
- var res = db.runCommandWithMetadata( //
+ var res = primary.getDB(name).runCommandWithMetadata( //
{
insert: "foo",
documents: [{_id: 1, state: 0}],
@@ -50,36 +53,32 @@ load("jstests/replsets/rslib.js"); // For reconfig and startSetIfSupportsReadMa
{"$replData": 1});
assert.commandWorked(res.commandReply);
- // We need to propagate the lastOpVisible from the primary as afterOpTime in the secondary to
- // ensure
- // we wait for the write to be in the majority committed view.
+ // We need to propagate the lastOpVisible from the primary as afterOpTime in the secondaries to
+ // ensure we wait for the write to be in the majority committed view.
var lastOp = res.metadata["$replData"].lastOpVisible;
- secondary.setSlaveOk();
// Timeout is based on heartbeat timeout.
- assert.commandWorked(secondary.getDB(name).foo.runCommand(
+ assert.commandWorked(healthySecondary.getDB(name).foo.runCommand(
'find',
{"readConcern": {"level": "majority", "afterOpTime": lastOp}, "maxTimeMS": 10 * 1000}));
- // Disable snapshotting via failpoint
- secondary.adminCommand({configureFailPoint: 'disableSnapshotting', mode: 'alwaysOn'});
-
- // Resync to drop any existing snapshots
- secondary.adminCommand({resync: 1});
-
// Ensure maxTimeMS times out while waiting for this snapshot
- assert.commandFailed(secondary.getDB(name).foo.runCommand(
- 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000}));
+ assert.commandFailedWithCode(
+ noSnapshotSecondary.getDB(name).foo.runCommand(
+ 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000}),
+ ErrorCodes.ExceededTimeLimit);
- // Reconfig to make the secondary the primary
+ // Reconfig to make the no-snapshot secondary the primary
var config = primary.getDB("local").system.replset.findOne();
config.members[0].priority = 0;
- config.members[1].priority = 3;
+ config.members[2].priority = 1;
config.version++;
primary = reconfig(replTest, config, true);
// Ensure maxTimeMS times out while waiting for this snapshot
- assert.commandFailed(primary.getSiblingDB(name).foo.runCommand(
- 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000}));
+ assert.commandFailedWithCode(
+ primary.getSiblingDB(name).foo.runCommand(
+ 'find', {"readConcern": {"level": "majority"}, "maxTimeMS": 1000}),
+ ErrorCodes.ExceededTimeLimit);
replTest.stopSet();
})();
diff --git a/jstests/replsets/resync.js b/jstests/replsets/resync.js
deleted file mode 100644
index 8a6a35ee699..00000000000
--- a/jstests/replsets/resync.js
+++ /dev/null
@@ -1,119 +0,0 @@
-// test that the resync command works with replica sets and that one does not need to manually
-// force a replica set resync by deleting all datafiles
-// Also tests that you can do this from a node that is "too stale"
-//
-// This test requires persistence in order for a restarted node with a stale oplog to stay in the
-// RECOVERING state. A restarted node with an ephemeral storage engine will not have an oplog upon
-// restart, so will immediately resync.
-// @tags: [requires_persistence]
-(function() {
- "use strict";
-
- var replTest = new ReplSetTest({
- name: 'resync',
- nodes: 3,
- oplogSize: 1,
- // At the end of this test we call resync on a node that may have blacklisted the only other
- // data bearing node. We need to ensure that the resync attempt will keep looking for a
- // sync source for at least 60 seconds, until the blacklist period ends. Since we sleep 1
- // second in between each attempt to find a sync source, setting the number of attempts to
- // find a sync source to larger than 60 should ensure that the resync attempt is able to
- // succeed.
- nodeOptions: {setParameter: "numInitialSyncConnectAttempts=90"}
- });
- var nodes = replTest.nodeList();
-
- var conns = replTest.startSet();
- var r = replTest.initiate({
- "_id": "resync",
- "members": [
- {"_id": 0, "host": nodes[0], priority: 1},
- {"_id": 1, "host": nodes[1], priority: 0},
- {"_id": 2, "host": nodes[2], arbiterOnly: true}
- ]
- });
-
- var a_conn = conns[0];
- // Make sure we have a master, and it is conns[0]
- replTest.waitForState(a_conn, ReplSetTest.State.PRIMARY);
- var b_conn = conns[1];
- a_conn.setSlaveOk();
- b_conn.setSlaveOk();
- var A = a_conn.getDB("test");
- var B = b_conn.getDB("test");
- var AID = replTest.getNodeId(a_conn);
- var BID = replTest.getNodeId(b_conn);
-
- // create an oplog entry with an insert
- assert.writeOK(
- A.foo.insert({x: 1}, {writeConcern: {w: 2, wtimeout: ReplSetTest.kDefaultTimeoutMS}}));
- assert.eq(B.foo.findOne().x, 1);
-
- // run resync and wait for it to happen
- assert.commandWorked(b_conn.getDB("admin").runCommand({resync: 1}));
- replTest.awaitReplication();
- replTest.awaitSecondaryNodes();
-
- assert.eq(B.foo.findOne().x, 1);
- replTest.stop(BID);
-
- function hasCycled() {
- var oplog = a_conn.getDB("local").oplog.rs;
- try {
- // Collection scan to determine if the oplog entry from the first insert has been
- // deleted yet.
- return oplog.find({"o.x": 1}).sort({$natural: 1}).limit(10).itcount() == 0;
- } catch (except) {
- // An error is expected in the case that capped deletions blow away the position of the
- // collection scan during a yield. In this case, we just try again.
- var errorRegex = /CappedPositionLost/;
- assert(errorRegex.test(except.message));
- return hasCycled();
- }
- }
-
- jsTestLog("Rolling over oplog");
-
- // Make sure the oplog has rolled over on the primary and secondary that is up,
- // so when we bring up the other replica it is "too stale"
- for (var cycleNumber = 0; cycleNumber < 10; cycleNumber++) {
- // insert enough to cycle oplog
- var bulk = A.foo.initializeUnorderedBulkOp();
- for (var i = 2; i < 10000; i++) {
- bulk.insert({x: i});
- }
-
- // wait for secondary to also have its oplog cycle
- assert.writeOK(bulk.execute({w: 1, wtimeout: ReplSetTest.kDefaultTimeoutMS}));
-
- if (hasCycled())
- break;
- }
-
- assert(hasCycled());
-
- jsTestLog("Restarting node B");
- // bring node B and it will enter recovery mode because its newest oplog entry is too old
- replTest.restart(BID);
-
- jsTestLog("Waiting for node B to to into RECOVERING");
- // check that it is in recovery mode
- assert.soon(function() {
- try {
- var result = b_conn.getDB("admin").runCommand({replSetGetStatus: 1});
- return (result.members[1].stateStr === "RECOVERING");
- } catch (e) {
- print(e);
- }
- }, "node didn't enter RECOVERING state");
-
- jsTestLog("Resync node B");
- // run resync and wait for it to happen
- assert.commandWorked(b_conn.getDB("admin").runCommand({resync: 1}));
- replTest.awaitReplication();
- replTest.awaitSecondaryNodes();
- assert.eq(B.foo.findOne().x, 1);
-
- replTest.stopSet(15);
- jsTest.log("success");
-})();
diff --git a/jstests/replsets/resync_with_write_load.js b/jstests/replsets/resync_with_write_load.js
deleted file mode 100644
index 10df8501b91..00000000000
--- a/jstests/replsets/resync_with_write_load.js
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * This test creates a replica set and writes during the resync call in order to verify
- * that all phases of the resync/initial-sync process work correctly.
- *
- */
-
-// Test disabled because resync still has a race where it doesn't reset its last fetched optime
-// before trying to run initial sync, which results in an upstream query with a $gte that might
-// return an empty batch.
-if (false) {
- var testName = "resync_with_write_load";
- var replTest = new ReplSetTest({name: testName, nodes: 3, oplogSize: 100});
- var nodes = replTest.nodeList();
-
- var conns = replTest.startSet();
- var config = {
- "_id": testName,
- "members": [
- {"_id": 0, "host": nodes[0]},
- {"_id": 1, "host": nodes[1], priority: 0},
- {"_id": 2, "host": nodes[2], priority: 0}
- ],
- settings: {chainingAllowed: false}
- };
- var r = replTest.initiate(config);
- replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY);
- // Make sure we have a master
- var master = replTest.getPrimary();
- var a_conn = conns[0];
- var b_conn = conns[1];
- a_conn.setSlaveOk();
- b_conn.setSlaveOk();
- var A = a_conn.getDB("test");
- var B = b_conn.getDB("test");
- var AID = replTest.getNodeId(a_conn);
- var BID = replTest.getNodeId(b_conn);
-
- assert(master == conns[0], "conns[0] assumed to be master");
- assert(a_conn.host == master.host);
-
- // create an oplog entry with an insert
- assert.writeOK(
- A.foo.insert({x: 1}, {writeConcern: {w: 3, wtimeout: ReplSetTest.kDefaultTimeoutMS}}));
-
- print("******************** starting load for 30 secs *********************");
- var work = function() {
- print("starting loadgen");
- var start = new Date().getTime();
-
- assert.writeOK(db.timeToStartTrigger.insert({_id: 1}));
-
- while (true) {
- for (x = 0; x < 100; x++) {
- db["a" + x].insert({a: x});
- }
-
- var runTime = (new Date().getTime() - start);
- if (runTime > 30000)
- break;
- else if (runTime < 5000) // back-off more during first 2 seconds
- sleep(50);
- else
- sleep(1);
- }
- print("finshing loadgen");
- };
- // insert enough that resync node has to go through oplog replay in each step
- var loadGen = startParallelShell(work, replTest.ports[0]);
-
- // wait for document to appear to continue
- assert.soon(function() {
- try {
- return 1 == master.getDB("test")["timeToStartTrigger"].find().itcount();
- } catch (e) {
- print(e);
- return false;
- }
- }, "waited too long for start trigger", 90 * 1000 /* 90 secs */);
-
- print("*************** issuing resync command ***************");
- assert.commandWorked(B.adminCommand("resync"));
-
- print("waiting for load generation to finish");
- loadGen();
-
- // Make sure oplogs & dbHashes match
- replTest.checkOplogs(testName);
- replTest.checkReplicatedDataHashes(testName);
-
- replTest.stopSet();
-
- print("*****test done******");
-}
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index a91561f6da6..85a6e1f02c8 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/view_response_formatter.h"
-#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/util/log.h"
@@ -68,10 +67,6 @@ public:
}
AllowedOnSecondary secondaryAllowed(ServiceContext* serviceContext) const override {
- if (repl::ReplicationCoordinator::get(serviceContext)->getSettings().isSlave()) {
- // ok on --slave setups
- return Command::AllowedOnSecondary::kAlways;
- }
return Command::AllowedOnSecondary::kOptIn;
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index d52ef2343f2..85eb916a6fe 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -294,10 +294,6 @@ ExitCode _initAndListen(int listenPort) {
LogstreamBuilder l = log(LogComponent::kControl);
l << "MongoDB starting : pid=" << pid << " port=" << serverGlobalParams.port
<< " dbpath=" << storageGlobalParams.dbpath;
- if (replSettings.isMaster())
- l << " master=" << replSettings.isMaster();
- if (replSettings.isSlave())
- l << " slave=" << (int)replSettings.isSlave();
const bool is32bit = sizeof(int*) == 4;
l << (is32bit ? " 32" : " 64") << "-bit host=" << getHostNameCached() << endl;
@@ -398,8 +394,8 @@ ExitCode _initAndListen(int listenPort) {
auto startupOpCtx = serviceContext->makeOperationContext(&cc());
- bool canCallFCVSetIfCleanStartup = !storageGlobalParams.readOnly &&
- !(replSettings.isSlave() || storageGlobalParams.engine == "devnull");
+ bool canCallFCVSetIfCleanStartup =
+ !storageGlobalParams.readOnly && (storageGlobalParams.engine != "devnull");
if (canCallFCVSetIfCleanStartup && !replSettings.usingReplSets()) {
Lock::GlobalWrite lk(startupOpCtx.get());
FeatureCompatibilityVersion::setIfCleanStartup(startupOpCtx.get(),
@@ -560,8 +556,7 @@ ExitCode _initAndListen(int listenPort) {
startTTLBackgroundJob();
}
- if (replSettings.usingReplSets() || (!replSettings.isMaster() && replSettings.isSlave()) ||
- !internalValidateFeaturesAsMaster) {
+ if (replSettings.usingReplSets() || !internalValidateFeaturesAsMaster) {
serverGlobalParams.validateFeaturesAsMaster.store(false);
}
}
diff --git a/src/mongo/db/logical_clock_test_fixture.cpp b/src/mongo/db/logical_clock_test_fixture.cpp
index 2d384c83274..66ae1b889f1 100644
--- a/src/mongo/db/logical_clock_test_fixture.cpp
+++ b/src/mongo/db/logical_clock_test_fixture.cpp
@@ -61,9 +61,6 @@ void LogicalClockTestFixture::setUp() {
_dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext());
- // Set master to false (set to true in ShardingMongodTestFixture::setUp()) so follower mode can
- // be toggled meaningfully. Default follower mode to primary, so writes can be accepted.
- replicationCoordinator()->setMaster(false);
ASSERT_OK(replicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY));
}
diff --git a/src/mongo/db/mongod_options.cpp b/src/mongo/db/mongod_options.cpp
index cde2a2a1a3b..a466fa4313f 100644
--- a/src/mongo/db/mongod_options.cpp
+++ b/src/mongo/db/mongod_options.cpp
@@ -82,7 +82,6 @@ Status addMongodOptions(moe::OptionSection* options) {
}
#endif
- moe::OptionSection ms_options("Master/slave options (old; use replica sets instead)");
moe::OptionSection rs_options("Replica set options");
moe::OptionSection replication_options("Replication options");
moe::OptionSection sharding_options("Sharding options");
@@ -370,49 +369,6 @@ Status addMongodOptions(moe::OptionSection* options) {
#endif
- // Master Slave Options
-
- ms_options.addOptionChaining("master", "master", moe::Switch, "master mode")
- .incompatibleWith("replication.replSet")
- .incompatibleWith("replication.replSetName")
- .setSources(moe::SourceAllLegacy);
-
- ms_options.addOptionChaining("slave", "slave", moe::Switch, "slave mode")
- .incompatibleWith("replication.replSet")
- .incompatibleWith("replication.replSetName")
- .setSources(moe::SourceAllLegacy);
-
- ms_options
- .addOptionChaining(
- "source", "source", moe::String, "when slave: specify master as <server:port>")
- .incompatibleWith("replication.replSet")
- .incompatibleWith("replication.replSetName")
- .setSources(moe::SourceAllLegacy);
-
- ms_options
- .addOptionChaining(
- "only", "only", moe::String, "when slave: specify a single database to replicate")
- .incompatibleWith("replication.replSet")
- .incompatibleWith("replication.replSetName")
- .setSources(moe::SourceAllLegacy);
-
- ms_options
- .addOptionChaining(
- "slavedelay",
- "slavedelay",
- moe::Int,
- "specify delay (in seconds) to be used when applying master ops to slave")
- .incompatibleWith("replication.replSet")
- .incompatibleWith("replication.replSetName")
- .setSources(moe::SourceAllLegacy);
-
- ms_options
- .addOptionChaining(
- "autoresync", "autoresync", moe::Switch, "automatically resync if slave data is stale")
- .incompatibleWith("replication.replSet")
- .incompatibleWith("replication.replSetName")
- .setSources(moe::SourceAllLegacy);
-
// Replication Options
replication_options.addOptionChaining(
@@ -468,9 +424,7 @@ Status addMongodOptions(moe::OptionSection* options) {
moe::Switch,
"declare this is a shard db of a cluster; default port 27018")
.setSources(moe::SourceAllLegacy)
- .incompatibleWith("configsvr")
- .incompatibleWith("master")
- .incompatibleWith("slave");
+ .incompatibleWith("configsvr");
sharding_options
.addOptionChaining(
@@ -530,7 +484,6 @@ Status addMongodOptions(moe::OptionSection* options) {
options->addSection(windows_scm_options).transitional_ignore();
#endif
options->addSection(replication_options).transitional_ignore();
- options->addSection(ms_options).transitional_ignore();
options->addSection(rs_options).transitional_ignore();
options->addSection(sharding_options).transitional_ignore();
#ifdef MONGO_CONFIG_SSL
@@ -540,23 +493,6 @@ Status addMongodOptions(moe::OptionSection* options) {
// The following are legacy options that are disallowed in the JSON config file
- options
- ->addOptionChaining(
- "fastsync",
- "fastsync",
- moe::Switch,
- "indicate that this instance is starting from a dbpath snapshot of the repl peer")
- .hidden()
- .setSources(moe::SourceAllLegacy);
-
- options
- ->addOptionChaining("pretouch",
- "pretouch",
- moe::Int,
- "n pretouch threads for applying master/slave operations")
- .hidden()
- .setSources(moe::SourceAllLegacy);
-
// This is a deprecated option that we are supporting for backwards compatibility
// The first value for this option can be either 'dbpath' or 'run'.
// If it is 'dbpath', mongod prints the dbpath and exits. Any extra values are ignored.
@@ -651,18 +587,8 @@ Status validateMongodOptions(const moe::Environment& params) {
if (params.count("storage.queryableBackupMode")) {
// Command line options that are disallowed when --queryableBackupMode is specified.
- for (const auto& disallowedOption : {"replication.replSet",
- "configsvr",
- "upgrade",
- "repair",
- "profile",
- "master",
- "slave",
- "source",
- "only",
- "slavedelay",
- "autoresync",
- "fastsync"}) {
+ for (const auto& disallowedOption :
+ {"replication.replSet", "configsvr", "upgrade", "repair", "profile"}) {
if (params.count(disallowedOption)) {
return Status(ErrorCodes::BadValue,
str::stream() << "Cannot specify both queryable backup mode and "
@@ -1097,32 +1023,6 @@ Status storeMongodOptions(const moe::Environment& params) {
}
repl::ReplSettings replSettings;
- if (params.count("master")) {
- replSettings.setMaster(params["master"].as<bool>());
- }
- if (params.count("slave") && params["slave"].as<bool>() == true) {
- replSettings.setSlave(true);
- }
- if (params.count("slavedelay")) {
- replSettings.setSlaveDelaySecs(params["slavedelay"].as<int>());
- }
- if (params.count("fastsync")) {
- if (!replSettings.isSlave()) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "--fastsync must only be used with --slave");
- }
- replSettings.setFastSyncEnabled(params["fastsync"].as<bool>());
- }
- if (params.count("autoresync")) {
- replSettings.setAutoResyncEnabled(params["autoresync"].as<bool>());
- }
- if (params.count("source")) {
- /* specifies what the source in local.sources should be */
- replSettings.setSource(params["source"].as<std::string>().c_str());
- }
- if (params.count("pretouch")) {
- replSettings.setPretouch(params["pretouch"].as<int>());
- }
if (params.count("replication.replSetName")) {
replSettings.setReplSetString(params["replication.replSetName"].as<std::string>().c_str());
}
@@ -1147,9 +1047,6 @@ Status storeMongodOptions(const moe::Environment& params) {
serverGlobalParams.indexBuildRetry = params["storage.indexBuildRetry"].as<bool>();
}
- if (params.count("only")) {
- replSettings.setOnly(params["only"].as<std::string>().c_str());
- }
if (params.count("storage.mmapv1.nsSize")) {
int x = params["storage.mmapv1.nsSize"].as<int>();
if (x <= 0 || x > (0x7fffffff / 1024 / 1024)) {
@@ -1272,9 +1169,6 @@ Status storeMongodOptions(const moe::Environment& params) {
storageGlobalParams.repairpath = storageGlobalParams.dbpath;
}
- if (replSettings.getPretouch())
- log() << "--pretouch " << replSettings.getPretouch();
-
// Check if we are 32 bit and have not explicitly specified any journaling options
if (sizeof(void*) == 4 && !params.count("storage.journal.enabled")) {
// trying to make this stand out more like startup warnings
diff --git a/src/mongo/db/repair_database_and_check_version.cpp b/src/mongo/db/repair_database_and_check_version.cpp
index eceed6b3198..d86c0da16af 100644
--- a/src/mongo/db/repair_database_and_check_version.cpp
+++ b/src/mongo/db/repair_database_and_check_version.cpp
@@ -366,8 +366,7 @@ StatusWith<bool> repairDatabasesAndCheckVersion(OperationContext* opCtx) {
// to. The local DB is special because it is not replicated. See SERVER-10927 for more
// details.
const bool shouldClearNonLocalTmpCollections =
- !(checkIfReplMissingFromCommandLine(opCtx) || replSettings.usingReplSets() ||
- replSettings.isSlave());
+ !(checkIfReplMissingFromCommandLine(opCtx) || replSettings.usingReplSets());
// To check whether a featureCompatibilityVersion document exists.
bool fcvDocumentExists = false;
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5cebfed131d..6200fb4b39a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1535,10 +1535,8 @@ env.Library(
env.Library(
target="serveronly_repl",
source=[
- "master_slave.cpp",
'noop_writer.cpp',
"replication_coordinator_external_state_impl.cpp",
- "resync.cpp",
"rs_sync.cpp",
"sync_source_feedback.cpp",
],
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
deleted file mode 100644
index 4f8a109702b..00000000000
--- a/src/mongo/db/repl/master_slave.cpp
+++ /dev/null
@@ -1,1462 +0,0 @@
-/**
-* Copyright (C) 2008-2014 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
-
-/* Collections we use:
-
- local.sources - indicates what sources we pull from as a "slave", and the last update of
- each
- local.oplog.$main - our op log as "master"
- local.dbinfo.<dbname> - no longer used???
- local.pair.startup - [deprecated] can contain a special value indicating for a pair that we
- have the master copy.
- used when replacing other half of the pair which has permanently failed.
- local.pair.sync - [deprecated] { initialsynccomplete: 1 }
-*/
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/master_slave.h"
-
-#include <pcrecpp.h>
-
-#include "mongo/db/auth/authorization_manager.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/catalog/database_catalog_entry.h"
-#include "mongo/db/catalog/database_holder.h"
-#include "mongo/db/catalog/document_validation.h"
-#include "mongo/db/client.h"
-#include "mongo/db/cloner.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/op_observer.h"
-#include "mongo/db/ops/update.h"
-#include "mongo/db/query/internal_plans.h"
-#include "mongo/db/repl/handshake_args.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/sync_tail.h"
-#include "mongo/db/server_parameters.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/storage/storage_options.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/log.h"
-#include "mongo/util/net/sock.h"
-#include "mongo/util/quick_exit.h"
-
-using std::cout;
-using std::endl;
-using std::max;
-using std::min;
-using std::set;
-using std::stringstream;
-using std::unique_ptr;
-using std::vector;
-
-namespace mongo {
-namespace repl {
-
-namespace {
-time_t lastForcedResync = 0;
-
-const int forceReconnect = -1;
-const int restartSync = 0;
-const int restartSyncAfterSleep = 1;
-} // namespace
-
-void pretouchOperation(OperationContext* opCtx, const BSONObj& op);
-void pretouchN(const vector<BSONObj>&, unsigned a, unsigned b);
-
-/* if 1 sync() is running */
-AtomicInt32 syncing(0);
-AtomicInt32 relinquishSyncingSome(0);
-
-/* output by the web console */
-const char* replInfo = "";
-struct ReplInfo {
- ReplInfo(const char* msg) {
- replInfo = msg;
- }
- ~ReplInfo() {
- replInfo = "?";
- }
-};
-
-
-ReplSource::ReplSource(OperationContext* opCtx) {
- nClonedThisPass = 0;
- ensureMe(opCtx);
-}
-
-ReplSource::ReplSource(OperationContext* opCtx, BSONObj o) : nClonedThisPass(0) {
- only = o.getStringField("only");
- hostName = o.getStringField("host");
- _sourceName = o.getStringField("source");
- uassert(10118, "'host' field not set in sources collection object", !hostName.empty());
- uassert(10119, "only source='main' allowed for now with replication", sourceName() == "main");
- BSONElement e = o.getField("syncedTo");
- if (!e.eoo()) {
- uassert(10120,
- "bad sources 'syncedTo' field value",
- e.type() == Date || e.type() == bsonTimestamp);
- Timestamp tmp(e.date());
- syncedTo = tmp;
- }
-
- BSONObj dbsObj = o.getObjectField("dbsNextPass");
- if (!dbsObj.isEmpty()) {
- BSONObjIterator i(dbsObj);
- while (1) {
- BSONElement e = i.next();
- if (e.eoo())
- break;
- addDbNextPass.insert(e.fieldName());
- }
- }
-
- dbsObj = o.getObjectField("incompleteCloneDbs");
- if (!dbsObj.isEmpty()) {
- BSONObjIterator i(dbsObj);
- while (1) {
- BSONElement e = i.next();
- if (e.eoo())
- break;
- incompleteCloneDbs.insert(e.fieldName());
- }
- }
- ensureMe(opCtx);
-}
-
-/* Turn our C++ Source object into a BSONObj */
-BSONObj ReplSource::jsobj() {
- BSONObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName());
- if (!only.empty())
- b.append("only", only);
- if (!syncedTo.isNull())
- b.append("syncedTo", syncedTo);
-
- BSONObjBuilder dbsNextPassBuilder;
- int n = 0;
- for (set<std::string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++) {
- n++;
- dbsNextPassBuilder.appendBool(*i, 1);
- }
- if (n)
- b.append("dbsNextPass", dbsNextPassBuilder.done());
-
- BSONObjBuilder incompleteCloneDbsBuilder;
- n = 0;
- for (set<std::string>::iterator i = incompleteCloneDbs.begin(); i != incompleteCloneDbs.end();
- i++) {
- n++;
- incompleteCloneDbsBuilder.appendBool(*i, 1);
- }
- if (n)
- b.append("incompleteCloneDbs", incompleteCloneDbsBuilder.done());
-
- return b.obj();
-}
-
-void ReplSource::ensureMe(OperationContext* opCtx) {
- std::string myname = getHostName();
-
- // local.me is an identifier for a server for getLastError w:2+
- bool exists = Helpers::getSingleton(opCtx, "local.me", _me);
-
- if (!exists || !_me.hasField("host") || _me["host"].String() != myname) {
- Lock::DBLock dblk(opCtx, "local", MODE_X);
- WriteUnitOfWork wunit(opCtx);
- // clean out local.me
- Helpers::emptyCollection(opCtx, NamespaceString("local.me"));
-
- // repopulate
- BSONObjBuilder b;
- b.appendOID("_id", 0, true);
- b.append("host", myname);
- _me = b.obj();
- Helpers::putSingleton(opCtx, "local.me", _me);
- wunit.commit();
- }
- _me = _me.getOwned();
-}
-
-void ReplSource::save(OperationContext* opCtx) {
- BSONObjBuilder b;
- verify(!hostName.empty());
- b.append("host", hostName);
- // todo: finish allowing multiple source configs.
- // this line doesn't work right when source is null, if that is allowed as it is now:
- // b.append("source", _sourceName);
- BSONObj pattern = b.done();
-
- BSONObj o = jsobj();
- LOG(1) << "Saving repl source: " << o << endl;
-
- {
- OldClientContext ctx(opCtx, "local.sources", false);
-
- const NamespaceString requestNs("local.sources");
- UpdateRequest request(requestNs);
-
- request.setQuery(pattern);
- request.setUpdates(o);
- request.setUpsert();
-
- UpdateResult res = update(opCtx, ctx.db(), request);
-
- verify(!res.modifiers);
- verify(res.numMatched == 1 || !res.upserted.isEmpty());
- }
-}
-
-static void addSourceToList(OperationContext* opCtx,
- ReplSource::SourceVector& v,
- ReplSource& s,
- ReplSource::SourceVector& old) {
- if (!s.syncedTo.isNull()) { // Don't reuse old ReplSource if there was a forced resync.
- for (ReplSource::SourceVector::iterator i = old.begin(); i != old.end();) {
- if (s == **i) {
- v.push_back(*i);
- old.erase(i);
- return;
- }
- i++;
- }
- }
-
- v.push_back(std::shared_ptr<ReplSource>(new ReplSource(s)));
-}
-
-/* we reuse our existing objects so that we can keep our existing connection
- and cursor in effect.
-*/
-void ReplSource::loadAll(OperationContext* opCtx, SourceVector& v) {
- const char* localSources = "local.sources";
- OldClientContext ctx(opCtx, localSources, false);
- SourceVector old = v;
- v.clear();
-
- const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings();
- if (!replSettings.getSource().empty()) {
- // --source <host> specified.
- // check that no items are in sources other than that
- // add if missing
- int n = 0;
- auto exec = InternalPlanner::collectionScan(opCtx,
- localSources,
- ctx.db()->getCollection(opCtx, localSources),
- PlanExecutor::NO_YIELD);
- BSONObj obj;
- PlanExecutor::ExecState state;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
- n++;
- ReplSource tmp(opCtx, obj);
- if (tmp.hostName != replSettings.getSource()) {
- log() << "--source " << replSettings.getSource() << " != " << tmp.hostName
- << " from local.sources collection" << endl;
- log() << "for instructions on changing this slave's source, see:" << endl;
- log() << "http://dochub.mongodb.org/core/masterslave" << endl;
- log() << "terminating mongod after 30 seconds" << endl;
- sleepsecs(30);
- quickExit(EXIT_REPLICATION_ERROR);
- }
- if (tmp.only != replSettings.getOnly()) {
- log() << "--only " << replSettings.getOnly() << " != " << tmp.only
- << " from local.sources collection" << endl;
- log() << "terminating after 30 seconds" << endl;
- sleepsecs(30);
- quickExit(EXIT_REPLICATION_ERROR);
- }
- }
- uassert(17065, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state);
- uassert(10002, "local.sources collection corrupt?", n < 2);
- if (n == 0) {
- // source missing. add.
- ReplSource s(opCtx);
- s.hostName = replSettings.getSource();
- s.only = replSettings.getOnly();
- s.save(opCtx);
- }
- } else {
- try {
- massert(10384, "--only requires use of --source", replSettings.getOnly().empty());
- } catch (...) {
- quickExit(EXIT_BADOPTIONS);
- }
- }
-
- auto exec = InternalPlanner::collectionScan(
- opCtx, localSources, ctx.db()->getCollection(opCtx, localSources), PlanExecutor::NO_YIELD);
- BSONObj obj;
- PlanExecutor::ExecState state;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
- ReplSource tmp(opCtx, obj);
- if (tmp.syncedTo.isNull()) {
- DBDirectClient c(opCtx);
- BSONObj op = c.findOne("local.oplog.$main",
- QUERY("op" << NE << "n").sort(BSON("$natural" << -1)));
- if (!op.isEmpty()) {
- tmp.syncedTo = op["ts"].timestamp();
- }
- }
- addSourceToList(opCtx, v, tmp, old);
- }
- uassert(17066, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state);
-}
-
-bool ReplSource::throttledForceResyncDead(OperationContext* opCtx, const char* requester) {
- if (time(0) - lastForcedResync > 600) {
- forceResyncDead(opCtx, requester);
- lastForcedResync = time(0);
- return true;
- }
- return false;
-}
-
-void ReplSource::forceResyncDead(OperationContext* opCtx, const char* requester) {
- if (!replAllDead)
- return;
- SourceVector sources;
- ReplSource::loadAll(opCtx, sources);
- for (SourceVector::iterator i = sources.begin(); i != sources.end(); ++i) {
- log() << requester << " forcing resync from " << (*i)->hostName << endl;
- (*i)->forceResync(opCtx, requester);
- }
- replAllDead = 0;
-}
-
-class HandshakeCmd : public BasicCommand {
-public:
- std::string help() const override {
- return "internal";
- }
- HandshakeCmd() : BasicCommand("handshake") {}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
- }
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kAlways;
- }
- virtual bool adminOnly() const {
- return false;
- }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
-
- virtual bool run(OperationContext* opCtx,
- const std::string& ns,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- HandshakeArgs handshake;
- Status status = handshake.initialize(cmdObj);
- if (!status.isOK()) {
- return CommandHelpers::appendCommandStatus(result, status);
- }
-
- ReplClientInfo::forClient(opCtx->getClient()).setRemoteID(handshake.getRid());
-
- status = ReplicationCoordinator::get(opCtx)->processHandshake(opCtx, handshake);
- return CommandHelpers::appendCommandStatus(result, status);
- }
-
-} handshakeCmd;
-
-bool replHandshake(DBClientConnection* conn, const OID& myRID) {
- std::string myname = getHostName();
-
- BSONObjBuilder cmd;
- cmd.append("handshake", myRID);
-
- BSONObj res;
- bool ok = conn->runCommand("admin", cmd.obj(), res);
- // ignoring for now on purpose for older versions
- LOG(ok ? 1 : 0) << "replHandshake result: " << res << endl;
- return true;
-}
-
-bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) {
- if (reader->conn()) {
- return true;
- }
-
- if (!reader->connect(host)) {
- return false;
- }
-
- if (_doHandshake && !replHandshake(reader->conn(), myRID)) {
- return false;
- }
-
- return true;
-}
-
-
-void ReplSource::forceResync(OperationContext* opCtx, const char* requester) {
- BSONObj info;
- {
- // This is always a GlobalWrite lock (so no ns/db used from the context)
- invariant(opCtx->lockState()->isW());
- Lock::TempRelease tempRelease(opCtx->lockState());
-
- if (!_connect(&oplogReader,
- HostAndPort(hostName),
- ReplicationCoordinator::get(opCtx)->getMyRID())) {
- msgasserted(14051, "unable to connect to resync");
- }
- bool ok = oplogReader.conn()->runCommand(
- "admin", BSON("listDatabases" << 1), info, QueryOption_SlaveOk);
- massert(10385, "Unable to get database list", ok);
- }
-
- BSONObjIterator i(info.getField("databases").embeddedObject());
- while (i.moreWithEOO()) {
- BSONElement e = i.next();
- if (e.eoo())
- break;
- std::string name = e.embeddedObject().getField("name").valuestr();
- if (!e.embeddedObject().getBoolField("empty")) {
- if (name != "local") {
- if (only.empty() || only == name) {
- resyncDrop(opCtx, name);
- }
- }
- }
- }
- syncedTo = Timestamp();
- addDbNextPass.clear();
- save(opCtx);
-}
-
-Status ReplSource::_updateIfDoneWithInitialSync(OperationContext* opCtx) {
- const auto usedToDoHandshake = _doHandshake;
- if (!usedToDoHandshake && addDbNextPass.empty() && incompleteCloneDbs.empty()) {
- _doHandshake = true;
- oplogReader.resetConnection();
- const auto myRID = ReplicationCoordinator::get(opCtx)->getMyRID();
- if (!_connect(&oplogReader, HostAndPort{hostName}, myRID)) {
- return {ErrorCodes::MasterSlaveConnectionFailure,
- str::stream() << "could not connect to " << hostName << " with rid: "
- << myRID.toString()};
- } else {
- return {ErrorCodes::Interrupted, "Initial Sync is done."};
- }
- }
- return Status::OK();
-}
-
-void ReplSource::resyncDrop(OperationContext* opCtx, const std::string& dbName) {
- log() << "resync: dropping database " << dbName;
- invariant(opCtx->lockState()->isW());
-
- Database* const db = dbHolder().get(opCtx, dbName);
- if (!db) {
- log() << "resync: dropping database " << dbName
- << " - database does not exist. nothing to do.";
- return;
- }
- Database::dropDatabase(opCtx, db);
-}
-
-/* grab initial copy of a database from the master */
-void ReplSource::resync(OperationContext* opCtx, const std::string& dbName) {
- const std::string db(dbName); // need local copy of the name, we're dropping the original
- resyncDrop(opCtx, db);
-
- {
- log() << "resync: cloning database " << db << " to get an initial copy" << endl;
- ReplInfo r("resync: cloning a database");
-
- CloneOptions cloneOptions;
- cloneOptions.fromDB = db;
- cloneOptions.slaveOk = true;
- cloneOptions.useReplAuth = true;
- cloneOptions.snapshot = true;
-
- Cloner cloner;
- Status status = cloner.copyDb(opCtx, db, hostName.c_str(), cloneOptions, NULL);
-
- if (!status.isOK()) {
- if (status.code() == ErrorCodes::DatabaseDifferCase) {
- resyncDrop(opCtx, db);
- log() << "resync: database " << db
- << " not valid on the master due to a name conflict, dropping.";
- return;
- } else {
- log() << "resync of " << db << " from " << hostName
- << " failed due to: " << redact(status);
- throw SyncException();
- }
- }
- }
-
- log() << "resync: done with initial clone for db: " << db << endl;
-}
-
-static DatabaseIgnorer ___databaseIgnorer;
-
-void DatabaseIgnorer::doIgnoreUntilAfter(const std::string& db, const Timestamp& futureOplogTime) {
- if (futureOplogTime > _ignores[db]) {
- _ignores[db] = futureOplogTime;
- }
-}
-
-bool DatabaseIgnorer::ignoreAt(const std::string& db, const Timestamp& currentOplogTime) {
- if (_ignores[db].isNull()) {
- return false;
- }
- if (_ignores[db] >= currentOplogTime) {
- return true;
- } else {
- // The ignore state has expired, so clear it.
- _ignores.erase(db);
- return false;
- }
-}
-
-bool ReplSource::handleDuplicateDbName(OperationContext* opCtx,
- const BSONObj& op,
- const char* ns,
- const char* db) {
- // We are already locked at this point
- if (dbHolder().get(opCtx, ns) != NULL) {
- // Database is already present.
- return true;
- }
- BSONElement ts = op.getField("ts");
- if ((ts.type() == Date || ts.type() == bsonTimestamp) &&
- ___databaseIgnorer.ignoreAt(db, ts.timestamp())) {
- // Database is ignored due to a previous indication that it is
- // missing from master after optime "ts".
- return false;
- }
- if (dbHolder().getNamesWithConflictingCasing(db).empty()) {
- // No duplicate database names are present.
- return true;
- }
-
- Timestamp lastTime;
- bool dbOk = false;
- {
- // This is always a GlobalWrite lock (so no ns/db used from the context)
- invariant(opCtx->lockState()->isW());
- Lock::TempRelease tempRelease(opCtx->lockState());
-
- // We always log an operation after executing it (never before), so
- // a database list will always be valid as of an oplog entry generated
- // before it was retrieved.
-
- BSONObj last =
- oplogReader.findOne(this->ns().c_str(), Query().sort(BSON("$natural" << -1)));
- if (!last.isEmpty()) {
- BSONElement ts = last.getField("ts");
- massert(14032,
- "Invalid 'ts' in remote log",
- ts.type() == Date || ts.type() == bsonTimestamp);
- lastTime = Timestamp(ts.date());
- }
-
- BSONObj info;
- bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info);
- massert(14033, "Unable to get database list", ok);
- BSONObjIterator i(info.getField("databases").embeddedObject());
- while (i.more()) {
- BSONElement e = i.next();
-
- const char* name = e.embeddedObject().getField("name").valuestr();
- if (strcasecmp(name, db) != 0)
- continue;
-
- if (strcmp(name, db) == 0) {
- // The db exists on master, still need to check that no conflicts exist there.
- dbOk = true;
- continue;
- }
-
- // The master has a db name that conflicts with the requested name.
- dbOk = false;
- break;
- }
- }
-
- if (!dbOk) {
- ___databaseIgnorer.doIgnoreUntilAfter(db, lastTime);
- incompleteCloneDbs.erase(db);
- addDbNextPass.erase(db);
- return false;
- }
-
- // Check for duplicates again, since we released the lock above.
- auto duplicates = dbHolder().getNamesWithConflictingCasing(db);
-
- // The database is present on the master and no conflicting databases
- // are present on the master. Drop any local conflicts.
- for (set<std::string>::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i) {
- ___databaseIgnorer.doIgnoreUntilAfter(*i, lastTime);
- incompleteCloneDbs.erase(*i);
- addDbNextPass.erase(*i);
-
- AutoGetDb autoDb(opCtx, *i, MODE_X);
- Database::dropDatabase(opCtx, autoDb.getDb());
- }
-
- massert(14034,
- "Duplicate database names present after attempting to delete duplicates",
- dbHolder().getNamesWithConflictingCasing(db).empty());
- return true;
-}
-
-void ReplSource::applyCommand(OperationContext* opCtx, const BSONObj& op) {
- try {
- Status status = applyCommand_inlock(opCtx, op, OplogApplication::Mode::kMasterSlave);
- uassert(28639, "Failure applying initial sync command", status.isOK());
- } catch (AssertionException& e) {
- log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op)
- << endl;
- ;
- } catch (DBException& e) {
- log() << "sync: caught db exception " << redact(e) << " while applying op: " << redact(op)
- << endl;
- ;
- }
-}
-
-void ReplSource::applyOperation(OperationContext* opCtx, Database* db, const BSONObj& op) {
- try {
- Status status =
- applyOperation_inlock(opCtx, db, op, false, OplogApplication::Mode::kMasterSlave);
- if (!status.isOK()) {
- uassert(15914,
- "Failure applying initial sync operation",
- status == ErrorCodes::UpdateOperationFailed);
-
- // In initial sync, update operations can cause documents to be missed during
- // collection cloning. As a result, it is possible that a document that we need to
- // update is not present locally. In that case we fetch the document from the
- // sync source.
- SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr);
- sync.setHostname(hostName);
- OplogEntry oplogEntry(op);
- sync.fetchAndInsertMissingDocument(opCtx, oplogEntry);
- }
- } catch (AssertionException& e) {
- log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op)
- << endl;
- ;
- } catch (DBException& e) {
- log() << "sync: caught db exception " << redact(e) << " while applying op: " << redact(op)
- << endl;
- ;
- }
-}
-
-/* local.$oplog.main is of the form:
- { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
- ...
- see logOp() comments.
-
- @param alreadyLocked caller already put us in write lock if true
-*/
-void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* opCtx,
- BSONObj& op,
- bool alreadyLocked) {
- LOG(6) << "processing op: " << redact(op) << endl;
-
- if (op.getStringField("op")[0] == 'n')
- return;
-
- const char* ns = op.getStringField("ns");
- const auto dbName = nsToDatabaseSubstring(ns).toString();
-
- if (*ns == '.') {
- log() << "skipping bad op in oplog: " << redact(op) << endl;
- return;
- } else if (*ns == 0) {
- /*if( op.getStringField("op")[0] != 'n' )*/ {
- log() << "halting replication, bad op in oplog:\n " << redact(op) << endl;
- replAllDead = "bad object in oplog";
- throw SyncException();
- }
- // ns = "local.system.x";
- // nsToDatabase(ns, clientName);
- }
-
- if (!only.empty() && only != dbName)
- return;
-
- // Push the CurOp stack for "opCtx" so each individual oplog entry application is separately
- // reported.
- CurOp individualOp(opCtx);
- UnreplicatedWritesBlock uwb(opCtx);
- const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings();
- if (replSettings.getPretouch() &&
- !alreadyLocked /*doesn't make sense if in write lock already*/) {
- if (replSettings.getPretouch() > 1) {
- /* note: this is bad - should be put in ReplSource. but this is first test... */
- static int countdown;
- verify(countdown >= 0);
- if (countdown > 0) {
- countdown--; // was pretouched on a prev pass
- } else {
- const int m = 4;
- if (tp.get() == 0) {
- int nthr = min(8, replSettings.getPretouch());
- nthr = max(nthr, 1);
- ThreadPool::Options options;
- options.maxThreads = options.minThreads = std::size_t(nthr);
- options.poolName = "master_slave_pretouch";
- tp.reset(new ThreadPool(options));
- }
- vector<BSONObj> v;
- oplogReader.peek(v, replSettings.getPretouch());
- unsigned a = 0;
- while (1) {
- if (a >= v.size())
- break;
- unsigned b = a + m - 1; // v[a..b]
- if (b >= v.size())
- b = v.size() - 1;
- invariantOK(tp->schedule([&v, a, b] { pretouchN(v, a, b); }));
- DEV cout << "pretouch task: " << a << ".." << b << endl;
- a += m;
- }
- // we do one too...
- pretouchOperation(opCtx, op);
- tp->waitForIdle();
- countdown = v.size();
- }
- } else {
- pretouchOperation(opCtx, op);
- }
- }
-
- unique_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(opCtx));
-
- if (replAllDead) {
- // hmmm why is this check here and not at top of this function? does it get set between top
- // and here?
- log() << "replAllDead, throwing SyncException: " << replAllDead << endl;
- throw SyncException();
- }
-
- if (!handleDuplicateDbName(opCtx, op, ns, dbName.c_str())) {
- return;
- }
-
- // special case apply for commands to avoid implicit database creation
- if (*op.getStringField("op") == 'c') {
- applyCommand(opCtx, op);
- return;
- }
-
- // This code executes on the slaves only, so it doesn't need to be sharding-aware since
- // mongos will not send requests there. That's why the last argument is false (do not do
- // version checking).
- OldClientContext ctx(opCtx, ns, false);
-
- bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData();
- bool incompleteClone = incompleteCloneDbs.count(dbName) != 0;
-
- LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty
- << ", incompleteClone: " << incompleteClone << endl;
-
- if (ctx.justCreated() || empty || incompleteClone) {
- // we must add to incomplete list now that setClient has been called
- incompleteCloneDbs.insert(dbName);
- if (nClonedThisPass) {
- /* we only clone one database per pass, even if a lot need done. This helps us
- avoid overflowing the master's transaction log by doing too much work before going
- back to read more transactions. (Imagine a scenario of slave startup where we try to
- clone 100 databases in one pass.)
- */
- addDbNextPass.insert(dbName);
- } else {
- if (incompleteClone) {
- log() << "An earlier initial clone of '" << dbName
- << "' did not complete, now resyncing." << endl;
- }
- save(opCtx);
- OldClientContext ctx(opCtx, ns, false);
- nClonedThisPass++;
- resync(opCtx, ctx.db()->name());
- addDbNextPass.erase(dbName);
- incompleteCloneDbs.erase(dbName);
- }
- save(opCtx);
- } else {
- applyOperation(opCtx, ctx.db(), op);
- addDbNextPass.erase(dbName);
- }
-}
-
-void ReplSource::syncToTailOfRemoteLog() {
- std::string _ns = ns();
- BSONObjBuilder b;
- if (!only.empty()) {
- b.appendRegex("ns", std::string("^") + pcrecpp::RE::QuoteMeta(only));
- }
- BSONObj last = oplogReader.findOne(_ns.c_str(), Query(b.done()).sort(BSON("$natural" << -1)));
- if (!last.isEmpty()) {
- BSONElement ts = last.getField("ts");
- massert(10386,
- "non Date ts found: " + last.toString(),
- ts.type() == Date || ts.type() == bsonTimestamp);
- syncedTo = Timestamp(ts.date());
- }
-}
-
-AtomicInt32 replApplyBatchSize(1);
-
-class ReplApplyBatchSize
- : public ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime> {
-public:
- ReplApplyBatchSize()
- : ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime>(
- ServerParameterSet::getGlobal(), "replApplyBatchSize", &replApplyBatchSize) {}
-
- virtual Status validate(const int& potentialNewValue) {
- if (potentialNewValue < 1 || potentialNewValue > 1024) {
- return Status(ErrorCodes::BadValue, "replApplyBatchSize has to be >= 1 and <= 1024");
- }
-
- const ReplSettings& replSettings =
- ReplicationCoordinator::get(getGlobalServiceContext())->getSettings();
- if (replSettings.getSlaveDelaySecs() != Seconds(0) && potentialNewValue > 1) {
- return Status(ErrorCodes::BadValue, "can't use a batch size > 1 with slavedelay");
- }
-
- if (!replSettings.isSlave()) {
- return Status(ErrorCodes::BadValue,
- "can't set replApplyBatchSize on a non-slave machine");
- }
-
- return Status::OK();
- }
-} replApplyBatchSizeServerParameter;
-
-/* slave: pull some data from the master's oplog
- note: not yet in db mutex at this point.
- @return -1 error
- 0 ok, don't sleep
- 1 ok, sleep
-*/
-int ReplSource::_sync_pullOpLog(OperationContext* opCtx, int& nApplied) {
- int okResultCode = restartSyncAfterSleep;
- std::string ns = std::string("local.oplog.$") + sourceName();
- LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo << '\n';
-
- bool tailing = true;
- oplogReader.tailCheck();
-
- // Due to the lack of exception handlers, don't allow lock interrupts.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- bool initial = syncedTo.isNull();
-
- if (!oplogReader.haveCursor() || initial) {
- if (initial) {
- // Important to grab last oplog timestamp before listing databases.
- syncToTailOfRemoteLog();
- BSONObj info;
- bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info);
- massert(10389, "Unable to get database list", ok);
- BSONObjIterator i(info.getField("databases").embeddedObject());
- while (i.moreWithEOO()) {
- BSONElement e = i.next();
- if (e.eoo())
- break;
- std::string name = e.embeddedObject().getField("name").valuestr();
- if (!e.embeddedObject().getBoolField("empty")) {
- if (name != "local") {
- if (only.empty() || only == name) {
- LOG(2) << "adding to 'addDbNextPass': " << name << endl;
- addDbNextPass.insert(name);
- }
- }
- }
- }
- // obviously global isn't ideal, but non-repl set is old so
- // keeping it simple
- Lock::GlobalWrite lk(opCtx);
- save(opCtx);
- }
-
- BSONObjBuilder gte;
- gte.append("$gte", syncedTo);
- BSONObjBuilder query;
- query.append("ts", gte.done());
- if (!only.empty()) {
- // note we may here skip a LOT of data table scanning, a lot of work for the master.
- // maybe append "\\." here?
- query.appendRegex("ns", std::string("^") + pcrecpp::RE::QuoteMeta(only));
- }
- BSONObj queryObj = query.done();
- // e.g. queryObj = { ts: { $gte: syncedTo } }
-
- oplogReader.tailingQuery(ns.c_str(), queryObj);
- tailing = false;
- } else {
- LOG(2) << "tailing=true\n";
- }
-
- if (!oplogReader.haveCursor()) {
- log() << "dbclient::query returns null (conn closed?)" << endl;
- oplogReader.resetConnection();
- return forceReconnect;
- }
-
- // show any deferred database creates from a previous pass
- {
- set<std::string>::iterator i = addDbNextPass.begin();
- if (i != addDbNextPass.end()) {
- BSONObjBuilder b;
- b.append("ns", *i + '.');
- b.append("op", "db");
- BSONObj op = b.done();
- _sync_pullOpLog_applyOperation(opCtx, op, false);
- }
- }
-
- auto status = _updateIfDoneWithInitialSync(opCtx);
- if (!status.isOK()) {
- switch (status.code()) {
- case ErrorCodes::Interrupted: {
- return restartSync; // don't sleep;
- }
- default: {
- error() << redact(status);
- return forceReconnect; // causes reconnect.
- }
- }
- }
-
- if (!oplogReader.more()) {
- if (tailing) {
- LOG(2) << "tailing & no new activity\n";
- okResultCode = restartSync; // don't sleep
-
- } else {
- log() << ns << " oplog is empty" << endl;
- }
- {
- Lock::GlobalWrite lk(opCtx);
- save(opCtx);
- }
- return okResultCode;
- }
-
- Timestamp nextOpTime;
- {
- BSONObj op = oplogReader.nextSafe();
- BSONElement ts = op.getField("ts");
- if (ts.type() != Date && ts.type() != bsonTimestamp) {
- std::string err = op.getStringField("$err");
- if (!err.empty()) {
- // 13051 is "tailable cursor requested on non capped collection"
- if (op.getIntField("code") == 13051) {
- log() << "trying to slave off of a non-master" << '\n';
- massert(13344, "trying to slave off of a non-master", false);
- } else {
- error() << "$err reading remote oplog: " + redact(err) << '\n';
- massert(10390, "got $err reading remote oplog", false);
- }
- } else {
- error() << "bad object read from remote oplog: " << redact(op) << '\n';
- massert(10391, "bad object read from remote oplog", false);
- }
- }
-
- nextOpTime = Timestamp(ts.date());
- LOG(2) << "first op time received: " << nextOpTime.toString() << '\n';
- if (initial) {
- LOG(1) << "initial run\n";
- }
- if (tailing) {
- if (!(syncedTo < nextOpTime)) {
- warning() << "ASSERTION failed : syncedTo < nextOpTime" << endl;
- log() << "syncTo: " << syncedTo << endl;
- log() << "nextOpTime: " << nextOpTime << endl;
- verify(false);
- }
- oplogReader.putBack(op); // op will be processed in the loop below
- nextOpTime = Timestamp(); // will reread the op below
- } else if (nextOpTime != syncedTo) { // didn't get what we queried for - error
- log() << "nextOpTime " << nextOpTime << ' ' << ((nextOpTime < syncedTo) ? "<??" : ">")
- << " syncedTo " << syncedTo << '\n'
- << "time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs()) << "sec\n"
- << "tailing: " << tailing << '\n'
- << "data too stale, halting replication" << endl;
- replInfo = replAllDead = "data too stale halted replication";
- verify(syncedTo < nextOpTime);
- throw SyncException();
- } else {
- /* t == syncedTo, so the first op was applied previously or it is the first op of
- * initial query and need not be applied. */
- }
- }
-
- // apply operations
- {
- int n = 0;
- time_t saveLast = time(0);
- while (1) {
- // we need "&& n" to assure we actually process at least one op to get a sync
- // point recorded in the first place.
- const bool moreInitialSyncsPending = !addDbNextPass.empty() && n;
-
- if (moreInitialSyncsPending || !oplogReader.more()) {
- Lock::GlobalWrite lk(opCtx);
-
- if (tailing) {
- okResultCode = restartSync; // don't sleep
- }
-
- syncedTo = nextOpTime;
- save(opCtx); // note how far we are synced up to now
- nApplied = n;
- break;
- }
-
- OCCASIONALLY if (n > 0 && (n > 100000 || time(0) - saveLast > 60)) {
- // periodically note our progress, in case we are doing a lot of work and crash
- Lock::GlobalWrite lk(opCtx);
- syncedTo = nextOpTime;
- // can't update local log ts since there are pending operations from our peer
- save(opCtx);
- log() << "checkpoint applied " << n << " operations" << endl;
- log() << "syncedTo: " << syncedTo << endl;
- saveLast = time(0);
- n = 0;
- }
-
- BSONObj op = oplogReader.nextSafe();
-
- int b = replApplyBatchSize.load();
- bool justOne = b == 1;
- unique_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(opCtx));
- while (1) {
- BSONElement ts = op.getField("ts");
- if (!(ts.type() == Date || ts.type() == bsonTimestamp)) {
- log() << "sync error: problem querying remote oplog record" << endl;
- log() << "op: " << redact(op) << endl;
- log() << "halting replication" << endl;
- replInfo = replAllDead = "sync error: no ts found querying remote oplog record";
- throw SyncException();
- }
- Timestamp last = nextOpTime;
- nextOpTime = Timestamp(ts.date());
- if (!(last < nextOpTime)) {
- log() << "sync error: last applied optime at slave >= nextOpTime from master"
- << endl;
- log() << " last: " << last << endl;
- log() << " nextOpTime: " << nextOpTime << endl;
- log() << " halting replication" << endl;
- replInfo = replAllDead = "sync error last >= nextOpTime";
- uassert(
- 10123,
- "replication error last applied optime at slave >= nextOpTime from master",
- false);
- }
- const ReplSettings& replSettings =
- ReplicationCoordinator::get(opCtx)->getSettings();
- if (replSettings.getSlaveDelaySecs() != Seconds(0) &&
- (Seconds(time(0)) <
- Seconds(nextOpTime.getSecs()) + replSettings.getSlaveDelaySecs())) {
- verify(justOne);
- oplogReader.putBack(op);
- _sleepAdviceTime = nextOpTime.getSecs() +
- durationCount<Seconds>(replSettings.getSlaveDelaySecs()) + 1;
- Lock::GlobalWrite lk(opCtx);
- if (n > 0) {
- syncedTo = last;
- save(opCtx);
- }
- log() << "applied " << n << " operations" << endl;
- log() << "syncedTo: " << syncedTo << endl;
- log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl;
- return okResultCode;
- }
-
- _sync_pullOpLog_applyOperation(opCtx, op, !justOne);
- n++;
-
- if (--b == 0)
- break;
- // if to here, we are doing mulpile applications in a singel write lock acquisition
- if (!oplogReader.moreInCurrentBatch()) {
- // break if no more in batch so we release lock while reading from the master
- break;
- }
- op = oplogReader.nextSafe();
- }
- }
- }
-
- return okResultCode;
-}
-
-
-/* note: not yet in mutex at this point.
- returns >= 0 if ok. return -1 if you want to reconnect.
- return value of zero indicates no sleep necessary before next call
-*/
-int ReplSource::sync(OperationContext* opCtx, int& nApplied) {
- _sleepAdviceTime = 0;
- ReplInfo r("sync");
- if (!serverGlobalParams.quiet.load()) {
- LogstreamBuilder l = log();
- l << "syncing from ";
- if (sourceName() != "main") {
- l << "source:" << sourceName() << ' ';
- }
- l << "host:" << hostName << endl;
- }
- nClonedThisPass = 0;
-
- // FIXME Handle cases where this db isn't on default port, or default port is spec'd in
- // hostName.
- if ((std::string("localhost") == hostName || std::string("127.0.0.1") == hostName) &&
- serverGlobalParams.port == ServerGlobalParams::DefaultDBPort) {
- log() << "can't sync from self (localhost). sources configuration may be wrong." << endl;
- sleepsecs(5);
- return -1;
- }
-
- if (!_connect(
- &oplogReader, HostAndPort(hostName), ReplicationCoordinator::get(opCtx)->getMyRID())) {
- LOG(4) << "can't connect to sync source" << endl;
- return -1;
- }
-
- return _sync_pullOpLog(opCtx, nApplied);
-}
-
-/* --------------------------------------------------------------*/
-
-static bool _replMainStarted = false;
-
-/*
-TODO:
-_ source has autoptr to the cursor
-_ reuse that cursor when we can
-*/
-
-/* returns: # of seconds to sleep before next pass
- 0 = no sleep recommended
- 1 = special sentinel indicating adaptive sleep recommended
-*/
-int _replMain(OperationContext* opCtx, ReplSource::SourceVector& sources, int& nApplied) {
- {
- ReplInfo r("replMain load sources");
- Lock::GlobalWrite lk(opCtx);
- ReplSource::loadAll(opCtx, sources);
-
- // only need this param for initial reset
- _replMainStarted = true;
- }
-
- if (sources.empty()) {
- /* replication is not configured yet (for --slave) in local.sources. Poll for config it
- every 20 seconds.
- */
- log() << "no source given, add a master to local.sources to start replication" << endl;
- return 20;
- }
-
- int sleepAdvice = 1;
- for (ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++) {
- ReplSource* s = i->get();
- int res = forceReconnect;
- try {
- res = s->sync(opCtx, nApplied);
- bool moreToSync = s->haveMoreDbsToSync();
- if (res < 0) {
- sleepAdvice = 3;
- } else if (moreToSync) {
- sleepAdvice = 0;
- } else if (s->sleepAdvice()) {
- sleepAdvice = s->sleepAdvice();
- } else
- sleepAdvice = res;
- } catch (const SyncException&) {
- log() << "caught SyncException" << endl;
- return 10;
- } catch (const DBException& e) {
- log() << "DBException " << redact(e) << endl;
- replInfo = "replMain caught DBException";
- } catch (const std::exception& e) {
- log() << "std::exception " << redact(e.what()) << endl;
- replInfo = "replMain caught std::exception";
- } catch (...) {
- log() << "unexpected exception during replication. replication will halt" << endl;
- replAllDead = "caught unexpected exception during replication";
- }
- if (res < 0)
- s->oplogReader.resetConnection();
- }
- return sleepAdvice;
-}
-
-static void replMain(OperationContext* opCtx) {
- ReplSource::SourceVector sources;
- while (1) {
- auto s = restartSync;
- {
- Lock::GlobalWrite lk(opCtx);
- if (replAllDead) {
- // throttledForceResyncDead can throw
- if (!ReplicationCoordinator::get(opCtx)->getSettings().isAutoResyncEnabled() ||
- !ReplSource::throttledForceResyncDead(opCtx, "auto")) {
- log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds"
- << endl;
- break;
- }
- }
-
- // i.e., there is only one sync thread running. we will want to change/fix this.
- invariant(syncing.swap(1) == 0);
- }
-
- try {
- int nApplied = 0;
- s = _replMain(opCtx, sources, nApplied);
- if (s == restartSyncAfterSleep) {
- if (nApplied == 0)
- s = 2;
- else if (nApplied > 100) {
- // sleep very little - just enough that we aren't truly hammering master
- sleepmillis(75);
- s = restartSync;
- }
- }
- } catch (...) {
- log() << "caught exception in _replMain" << endl;
- s = 4;
- }
-
- {
- Lock::GlobalWrite lk(opCtx);
- invariant(syncing.swap(0) == 1);
- }
-
- if (relinquishSyncingSome.load()) {
- relinquishSyncingSome.store(0);
- s = restartSyncAfterSleep; // sleep before going back in to syncing=1
- }
-
- if (s) {
- stringstream ss;
- ss << "sleep " << s << " sec before next pass";
- std::string msg = ss.str();
- if (!serverGlobalParams.quiet.load())
- log() << msg << endl;
- ReplInfo r(msg.c_str());
- sleepsecs(s);
- }
- }
-}
-
-static void replMasterThread() {
- sleepsecs(4);
- Client::initThread("replmaster");
- int toSleep = 10;
- while (1) {
- sleepsecs(toSleep);
-
- // Write a keep-alive like entry to the log. This will make things like
- // printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date even
- // when things are idle.
- const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
- OperationContext& opCtx = *opCtxPtr;
- AuthorizationSession::get(opCtx.getClient())->grantInternalAuthorization();
-
- UninterruptibleLockGuard noInterrupt(opCtx.lockState());
- Lock::GlobalWrite globalWrite(&opCtx, Date_t::now() + Milliseconds(1));
- if (globalWrite.isLocked()) {
- toSleep = 10;
-
- try {
- WriteUnitOfWork wuow(&opCtx);
- getGlobalServiceContext()->getOpObserver()->onOpMessage(&opCtx, BSONObj());
- wuow.commit();
- } catch (...) {
- log() << "caught exception in replMasterThread()" << endl;
- }
- } else {
- LOG(5) << "couldn't logKeepalive" << endl;
- toSleep = 1;
- }
- }
-}
-
-static void replSlaveThread() {
- sleepsecs(1);
- Client::initThread("replslave");
-
- const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
- OperationContext& opCtx = *opCtxPtr;
- AuthorizationSession::get(opCtx.getClient())->grantInternalAuthorization();
- DisableDocumentValidation validationDisabler(&opCtx);
-
- while (1) {
- try {
- replMain(&opCtx);
- sleepsecs(5);
- } catch (AssertionException&) {
- ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry");
- log() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- } catch (DBException& e) {
- log() << "exception in replSlaveThread(): " << e.what()
- << ", sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- } catch (...) {
- log() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- }
- }
-}
-
-void startMasterSlave(OperationContext* opCtx) {
- const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings();
- if (!replSettings.isSlave() && !replSettings.isMaster())
- return;
-
- AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization();
-
- {
- ReplSource temp(opCtx); // Ensures local.me is populated
- }
-
- if (replSettings.isSlave()) {
- LOG(1) << "slave=true" << endl;
- stdx::thread repl_thread(replSlaveThread);
- repl_thread.detach();
- }
-
- if (replSettings.isMaster()) {
- LOG(1) << "master=true" << endl;
- createOplog(opCtx);
- stdx::thread t(replMasterThread);
- t.detach();
- }
-
- if (replSettings.isFastSyncEnabled()) {
- while (!_replMainStarted) // don't allow writes until we've set up from log
- sleepmillis(50);
- }
-}
-int _dummy_z;
-
-void pretouchN(const std::vector<BSONObj>& v, unsigned a, unsigned b) {
- Client::initThreadIfNotAlready("pretouchN");
-
- const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
- OperationContext& opCtx = *opCtxPtr; // XXX
- Lock::GlobalRead lk(&opCtx);
-
- for (unsigned i = a; i <= b; i++) {
- const BSONObj& op = v[i];
- const char* which = "o";
- const char* opType = op.getStringField("op");
- if (*opType == 'i')
- ;
- else if (*opType == 'u')
- which = "o2";
- else
- continue;
- /* todo : other operations */
-
- try {
- BSONObj o = op.getObjectField(which);
- BSONElement _id;
- if (o.getObjectID(_id)) {
- const char* ns = op.getStringField("ns");
- BSONObjBuilder b;
- b.append(_id);
- BSONObj result;
- OldClientContext ctx(&opCtx, ns, false);
- if (Helpers::findById(&opCtx, ctx.db(), ns, b.done(), result))
- _dummy_z += result.objsize(); // touch
- }
- } catch (DBException& e) {
- log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' '
- << redact(e) << endl;
- }
- }
-}
-
-void pretouchOperation(OperationContext* opCtx, const BSONObj& op) {
- if (opCtx->lockState()->isWriteLocked()) {
- // no point pretouching if write locked. not sure if this will ever fire, but just in case.
- return;
- }
-
- const char* which = "o";
- const char* opType = op.getStringField("op");
- if (*opType == 'i')
- ;
- else if (*opType == 'u')
- which = "o2";
- else
- return;
- /* todo : other operations */
-
- try {
- BSONObj o = op.getObjectField(which);
- BSONElement _id;
- if (o.getObjectID(_id)) {
- const char* ns = op.getStringField("ns");
- BSONObjBuilder b;
- b.append(_id);
- BSONObj result;
- AutoGetCollectionForReadCommand ctx(opCtx, NamespaceString(ns));
- if (Helpers::findById(opCtx, ctx.getDb(), ns, b.done(), result)) {
- _dummy_z += result.objsize(); // touch
- }
- }
- } catch (DBException&) {
- log() << "ignoring assertion in pretouchOperation()" << endl;
- }
-}
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h
deleted file mode 100644
index 3c96386080f..00000000000
--- a/src/mongo/db/repl/master_slave.h
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
-* Copyright (C) 2008 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
-
-#pragma once
-
-#include "mongo/db/repl/oplogreader.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/util/concurrency/thread_pool.h"
-
-/* replication data overview
-
- at the slave:
- local.sources { host: ..., source: ..., only: ..., syncedTo: ..., localLogTs: ...,
- dbsNextPass: { ... }, incompleteCloneDbs: { ... } }
-
- at the master:
- local.oplog.$<source>
-*/
-
-namespace mongo {
-
-class Database;
-class OperationContext;
-
-namespace repl {
-
-// Main entry point for master/slave at startup time.
-void startMasterSlave(OperationContext* opCtx);
-
-// externed for use with resync.cpp
-extern AtomicInt32 relinquishSyncingSome;
-extern AtomicInt32 syncing;
-
-extern const char* replInfo;
-
-/* A replication exception */
-class SyncException final : public DBException {
-public:
- SyncException() : DBException(Status(ErrorCodes::Error(10001), "sync exception")) {}
-
-private:
- void defineOnlyInFinalSubclassToPreventSlicing() final {}
-};
-
-/* A Source is a source from which we can pull (replicate) data.
- stored in collection local.sources.
-
- Can be a group of things to replicate for several databases.
-
- { host: ..., source: ..., only: ..., syncedTo: ..., dbsNextPass: { ... },
- incompleteCloneDbs: { ... } }
-
- 'source' defaults to 'main'; support for multiple source names is
- not done (always use main for now).
-*/
-class ReplSource {
- std::shared_ptr<ThreadPool> tp;
-
- void resync(OperationContext* opCtx, const std::string& dbName);
-
- /** @param alreadyLocked caller already put us in write lock if true */
- void _sync_pullOpLog_applyOperation(OperationContext* opCtx, BSONObj& op, bool alreadyLocked);
-
- /* pull some operations from the master's oplog, and apply them.
- calls sync_pullOpLog_applyOperation
- */
- int _sync_pullOpLog(OperationContext* opCtx, int& nApplied);
-
- /* we only clone one database per pass, even if a lot need done. This helps us
- avoid overflowing the master's transaction log by doing too much work before going
- back to read more transactions. (Imagine a scenario of slave startup where we try to
- clone 100 databases in one pass.)
- */
- std::set<std::string> addDbNextPass;
-
- std::set<std::string> incompleteCloneDbs;
-
- /// TODO(spencer): Remove this once the LegacyReplicationCoordinator is gone.
- BSONObj _me;
-
- /**
- * Flag to control if a handshake is done on a connection or not. Default to false
- * until all databases are cloned.
- */
- bool _doHandshake = false;
-
- void resyncDrop(OperationContext* opCtx, const std::string& dbName);
- // call without the db mutex
- void syncToTailOfRemoteLog();
- std::string ns() const {
- return std::string("local.oplog.$") + sourceName();
- }
- unsigned _sleepAdviceTime;
-
- /**
- * If 'db' is a new database and its name would conflict with that of
- * an existing database, synchronize these database names with the
- * master.
- * @return true iff an op with the specified ns may be applied.
- */
- bool handleDuplicateDbName(OperationContext* opCtx,
- const BSONObj& op,
- const char* ns,
- const char* db);
-
- // populates _me so that it can be passed to oplogreader for handshakes
- /// TODO(spencer): Remove this function once the LegacyReplicationCoordinator is gone.
- void ensureMe(OperationContext* opCtx);
-
- void forceResync(OperationContext* opCtx, const char* requester);
-
- bool _connect(OplogReader* reader, const HostAndPort& host, const OID& myRID);
-
- Status _updateIfDoneWithInitialSync(OperationContext* opCtx);
-
-public:
- OplogReader oplogReader;
-
- void applyCommand(OperationContext* opCtx, const BSONObj& op);
- void applyOperation(OperationContext* opCtx, Database* db, const BSONObj& op);
- std::string hostName; // ip addr or hostname plus optionally, ":<port>"
- std::string _sourceName; // a logical source name.
- std::string sourceName() const {
- return _sourceName.empty() ? "main" : _sourceName;
- }
-
- // only a certain db. note that in the sources collection, this may not be changed once you
- // start replicating.
- std::string only;
-
- /* the last time point we have already synced up to (in the remote/master's oplog). */
- Timestamp syncedTo;
-
- int nClonedThisPass;
-
- typedef std::vector<std::shared_ptr<ReplSource>> SourceVector;
- static void loadAll(OperationContext* opCtx, SourceVector&);
-
- explicit ReplSource(OperationContext* opCtx, BSONObj);
- // This is not the constructor you are looking for. Always prefer the version that takes
- // a BSONObj. This is public only as a hack so that the ReplicationCoordinator can find
- // out the process's RID in master/slave setups.
- ReplSource(OperationContext* opCtx);
-
- /* -1 = error */
- int sync(OperationContext* opCtx, int& nApplied);
-
- void save(OperationContext* opCtx); // write ourself to local.sources
-
- // make a jsobj from our member fields of the form
- // { host: ..., source: ..., syncedTo: ... }
- BSONObj jsobj();
-
- bool operator==(const ReplSource& r) const {
- return hostName == r.hostName && sourceName() == r.sourceName();
- }
- std::string toString() const {
- return sourceName() + "@" + hostName;
- }
-
- bool haveMoreDbsToSync() const {
- return !addDbNextPass.empty();
- }
- int sleepAdvice() const {
- if (!_sleepAdviceTime)
- return 0;
- int wait = _sleepAdviceTime - unsigned(time(0));
- return wait > 0 ? wait : 0;
- }
-
- static bool throttledForceResyncDead(OperationContext* opCtx, const char* requester);
- static void forceResyncDead(OperationContext* opCtx, const char* requester);
-};
-
-/**
- * Helper class used to set and query an ignore state for a named database.
- * The ignore state will expire after a specified Timestamp.
- */
-class DatabaseIgnorer {
-public:
- /** Indicate that operations for 'db' should be ignored until after 'futureOplogTime' */
- void doIgnoreUntilAfter(const std::string& db, const Timestamp& futureOplogTime);
- /**
- * Query ignore state of 'db'; if 'currentOplogTime' is after the ignore
- * limit, the ignore state will be cleared.
- */
- bool ignoreAt(const std::string& db, const Timestamp& currentOplogTime);
-
-private:
- std::map<std::string, Timestamp> _ignores;
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp
index 95dd56ea92b..e173b709602 100644
--- a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp
+++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp
@@ -71,10 +71,11 @@ void MockReplCoordServerFixture::setUp() {
repl::ReplSettings replSettings;
replSettings.setReplSetString(
ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")}).toString());
- replSettings.setMaster(true);
repl::ReplicationCoordinator::set(
service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings));
+ ASSERT_OK(
+ repl::ReplicationCoordinator::get(service)->setFollowerMode(repl::MemberState::RS_PRIMARY));
// Note: internal code does not allow implicit creation of non-capped oplog collection.
DBDirectClient client(opCtx());
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index bd7e730c7a0..eba01f1076c 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -917,7 +917,6 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
} // namespace
constexpr StringData OplogApplication::kInitialSyncOplogApplicationMode;
-constexpr StringData OplogApplication::kMasterSlaveOplogApplicationMode;
constexpr StringData OplogApplication::kRecoveringOplogApplicationMode;
constexpr StringData OplogApplication::kSecondaryOplogApplicationMode;
constexpr StringData OplogApplication::kApplyOpsCmdOplogApplicationMode;
@@ -926,8 +925,6 @@ StringData OplogApplication::modeToString(OplogApplication::Mode mode) {
switch (mode) {
case OplogApplication::Mode::kInitialSync:
return OplogApplication::kInitialSyncOplogApplicationMode;
- case OplogApplication::Mode::kMasterSlave:
- return OplogApplication::kMasterSlaveOplogApplicationMode;
case OplogApplication::Mode::kRecovering:
return OplogApplication::kRecoveringOplogApplicationMode;
case OplogApplication::Mode::kSecondary:
@@ -941,8 +938,6 @@ StringData OplogApplication::modeToString(OplogApplication::Mode mode) {
StatusWith<OplogApplication::Mode> OplogApplication::parseMode(const std::string& mode) {
if (mode == OplogApplication::kInitialSyncOplogApplicationMode) {
return OplogApplication::Mode::kInitialSync;
- } else if (mode == OplogApplication::kMasterSlaveOplogApplicationMode) {
- return OplogApplication::Mode::kMasterSlave;
} else if (mode == OplogApplication::kRecoveringOplogApplicationMode) {
return OplogApplication::Mode::kRecovering;
} else if (mode == OplogApplication::kSecondaryOplogApplicationMode) {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index f6ea00d0067..1120b96c152 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -183,7 +183,6 @@ std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement
class OplogApplication {
public:
static constexpr StringData kInitialSyncOplogApplicationMode = "InitialSync"_sd;
- static constexpr StringData kMasterSlaveOplogApplicationMode = "MasterSlave"_sd;
static constexpr StringData kRecoveringOplogApplicationMode = "Recovering"_sd;
static constexpr StringData kSecondaryOplogApplicationMode = "Secondary"_sd;
static constexpr StringData kApplyOpsCmdOplogApplicationMode = "ApplyOps"_sd;
@@ -192,9 +191,6 @@ public:
// Used during the oplog application phase of the initial sync process.
kInitialSync,
- // Used when a slave is applying operations from a master node in master-slave.
- kMasterSlave,
-
// Used when we are applying oplog operations to recover the database state following an
// unclean shutdown, or when we are recovering from the oplog after we rollback to a
// checkpoint.
diff --git a/src/mongo/db/repl/repl_settings.cpp b/src/mongo/db/repl/repl_settings.cpp
index 5f73147ee47..3fb69df5d3d 100644
--- a/src/mongo/db/repl/repl_settings.cpp
+++ b/src/mongo/db/repl/repl_settings.cpp
@@ -28,11 +28,10 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-#include "mongo/db/repl/repl_settings.h"
+#include "mongo/platform/basic.h"
-#include <string>
+#include "mongo/db/repl/repl_settings.h"
-#include "mongo/db/repl/bgsync.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -52,42 +51,11 @@ bool ReplSettings::usingReplSets() const {
/**
* Getters
*/
-bool ReplSettings::isSlave() const {
- return _slave;
-}
-
-bool ReplSettings::isMaster() const {
- return _master;
-}
-
-bool ReplSettings::isFastSyncEnabled() const {
- return _fastSyncEnabled;
-}
-
-bool ReplSettings::isAutoResyncEnabled() const {
- return _autoResyncEnabled;
-}
-
-Seconds ReplSettings::getSlaveDelaySecs() const {
- return _slaveDelaySecs;
-}
-
-int ReplSettings::getPretouch() const {
- return _pretouch;
-}
long long ReplSettings::getOplogSizeBytes() const {
return _oplogSizeBytes;
}
-std::string ReplSettings::getSource() const {
- return _source;
-}
-
-std::string ReplSettings::getOnly() const {
- return _only;
-}
-
std::string ReplSettings::getReplSetString() const {
return _replSetString;
}
@@ -103,42 +71,11 @@ bool ReplSettings::isPrefetchIndexModeSet() const {
/**
* Setters
*/
-void ReplSettings::setSlave(bool slave) {
- _slave = slave;
-}
-
-void ReplSettings::setMaster(bool master) {
- _master = master;
-}
-
-void ReplSettings::setFastSyncEnabled(bool fastSyncEnabled) {
- _fastSyncEnabled = fastSyncEnabled;
-}
-
-void ReplSettings::setAutoResyncEnabled(bool autoResyncEnabled) {
- _autoResyncEnabled = autoResyncEnabled;
-}
-
-void ReplSettings::setSlaveDelaySecs(int slaveDelay) {
- _slaveDelaySecs = Seconds(slaveDelay);
-}
-
-void ReplSettings::setPretouch(int pretouch) {
- _pretouch = pretouch;
-}
void ReplSettings::setOplogSizeBytes(long long oplogSizeBytes) {
_oplogSizeBytes = oplogSizeBytes;
}
-void ReplSettings::setSource(std::string source) {
- _source = source;
-}
-
-void ReplSettings::setOnly(std::string only) {
- _only = only;
-}
-
void ReplSettings::setReplSetString(std::string replSetString) {
_replSetString = replSetString;
}
diff --git a/src/mongo/db/repl/repl_settings.h b/src/mongo/db/repl/repl_settings.h
index ce804fb3502..1a74414dade 100644
--- a/src/mongo/db/repl/repl_settings.h
+++ b/src/mongo/db/repl/repl_settings.h
@@ -28,11 +28,9 @@
#pragma once
-#include <set>
#include <string>
#include "mongo/db/jsobj.h"
-#include "mongo/util/concurrency/mutex.h"
namespace mongo {
namespace repl {
@@ -56,15 +54,7 @@ public:
/**
* Getters
*/
- bool isSlave() const;
- bool isMaster() const;
- bool isFastSyncEnabled() const;
- bool isAutoResyncEnabled() const;
- Seconds getSlaveDelaySecs() const;
- int getPretouch() const;
long long getOplogSizeBytes() const;
- std::string getSource() const;
- std::string getOnly() const;
std::string getReplSetString() const;
/**
@@ -82,38 +72,13 @@ public:
/**
* Setters
*/
- void setSlave(bool slave);
- void setMaster(bool master);
- void setFastSyncEnabled(bool fastSyncEnabled);
- void setAutoResyncEnabled(bool autoResyncEnabled);
- void setSlaveDelaySecs(int slaveDelay);
- void setPretouch(int pretouch);
void setOplogSizeBytes(long long oplogSizeBytes);
- void setSource(std::string source);
- void setOnly(std::string only);
void setReplSetString(std::string replSetString);
void setPrefetchIndexMode(std::string prefetchIndexModeString);
private:
- /**
- * true means we are master and doing replication. If we are not writing to oplog, this won't
- * be true.
- */
- bool _master = false;
-
- // replication slave? (possibly with slave)
- bool _slave = false;
-
- bool _fastSyncEnabled = false;
- bool _autoResyncEnabled = false;
- Seconds _slaveDelaySecs = Seconds(0);
long long _oplogSizeBytes = 0; // --oplogSize
- // for master/slave replication
- std::string _source; // --source
- std::string _only; // --only
- int _pretouch = 0; // --pretouch for replication application (experimental)
-
std::string _replSetString; // --replSet[/<seedlist>]
// --indexPrefetch
diff --git a/src/mongo/db/repl/replication_coordinator.cpp b/src/mongo/db/repl/replication_coordinator.cpp
index 9c603c4db71..288cabf18a6 100644
--- a/src/mongo/db/repl/replication_coordinator.cpp
+++ b/src/mongo/db/repl/replication_coordinator.cpp
@@ -45,9 +45,6 @@ const auto getReplicationCoordinator =
ReplicationCoordinator::ReplicationCoordinator() {}
ReplicationCoordinator::~ReplicationCoordinator() {}
-// TODO(dannenberg) remove when master slave is removed
-const char* replAllDead = 0;
-
ReplicationCoordinator* ReplicationCoordinator::get(ServiceContext* service) {
return getReplicationCoordinator(service).get();
}
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index bce17fa9ea6..96be0d7be59 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -80,16 +80,6 @@ class ReplSetRequestVotesResponse;
class UpdatePositionArgs;
/**
- * Global variable that contains a std::string telling why master/slave halted
- *
- * "dead" means something really bad happened like replication falling completely out of sync.
- * when non-null, we are dead and the string is informational
- *
- * TODO(dannenberg) remove when master slave goes
- */
-extern const char* replAllDead;
-
-/**
* The ReplicationCoordinator is responsible for coordinating the interaction of replication
* with the rest of the system. The public methods on ReplicationCoordinator are the public
* API that the replication subsystem presents to the rest of the codebase.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 17fe69b8a0d..38697e94c1c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -97,11 +97,6 @@ public:
virtual void stopDataReplication(OperationContext* opCtx) = 0;
/**
- * Starts the Master/Slave threads and sets up logOp
- */
- virtual void startMasterSlave(OperationContext* opCtx) = 0;
-
- /**
* Performs any necessary external state specific shutdown tasks, such as cleaning up
* the threads it started.
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 474b15506af..00f2d968489 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -57,7 +57,6 @@
#include "mongo/db/repl/drop_pending_collection_reaper.h"
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/last_vote.h"
-#include "mongo/db/repl/master_slave.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/noop_writer.h"
#include "mongo/db/repl/oplog.h"
@@ -325,10 +324,6 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
_startedThreads = true;
}
-void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* opCtx) {
- repl::startMasterSlave(opCtx);
-}
-
void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) {
UniqueLock lk(_threadMutex);
if (!_startedThreads) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 93e8149e3b3..3d124a24ef2 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -74,7 +74,6 @@ public:
virtual bool isInitialSyncFlagSet(OperationContext* opCtx) override;
- virtual void startMasterSlave(OperationContext* opCtx);
virtual void shutdown(OperationContext* opCtx);
virtual executor::TaskExecutor* getTaskExecutor() const override;
virtual ThreadPool* getDbWorkThreadPool() const override;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 29648d65bad..5f88d85e0ec 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -72,8 +72,6 @@ void ReplicationCoordinatorExternalStateMock::startSteadyStateReplication(Operat
void ReplicationCoordinatorExternalStateMock::stopDataReplication(OperationContext*) {}
-void ReplicationCoordinatorExternalStateMock::startMasterSlave(OperationContext*) {}
-
Status ReplicationCoordinatorExternalStateMock::runRepairOnLocalDB(OperationContext* opCtx) {
return Status::OK();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 693a95ce4e2..2d7f5563bba 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -62,7 +62,6 @@ public:
virtual void stopDataReplication(OperationContext* opCtx) override;
virtual bool isInitialSyncFlagSet(OperationContext* opCtx) override;
- virtual void startMasterSlave(OperationContext*);
virtual void shutdown(OperationContext* opCtx);
virtual executor::TaskExecutor* getTaskExecutor() const override;
virtual ThreadPool* getDbWorkThreadPool() const override;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f3393400981..bbf1aa7d020 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -303,9 +303,6 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
if (settings.usingReplSets()) {
return ReplicationCoordinator::modeReplSet;
}
- if (settings.isMaster() || settings.isSlave()) {
- return ReplicationCoordinator::modeMasterSlave;
- }
return ReplicationCoordinator::modeNone;
}
@@ -347,7 +344,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_rsConfigState(kConfigPreStart),
_selfIndex(-1),
_sleptLastElection(false),
- _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())),
+ _canAcceptNonLocalWrites(!settings.usingReplSets()),
_canServeNonLocalReads(0U),
_replicationProcess(replicationProcess),
_storage(storage),
@@ -736,6 +733,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
_setConfigState_inlock(kConfigReplicationDisabled);
return;
}
+ invariant(_settings.usingReplSets());
{
OID rid = _externalState->ensureMe(opCtx);
@@ -747,13 +745,6 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
_topCoord->setMyRid(rid);
}
- if (!_settings.usingReplSets()) {
- // Must be Master/Slave
- invariant(_settings.isMaster() || _settings.isSlave());
- _externalState->startMasterSlave(opCtx);
- return;
- }
-
_replExecutor->startup();
{
@@ -1752,29 +1743,13 @@ void ReplicationCoordinatorImpl::_handleTimePassing(
}
bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() {
- if (_settings.usingReplSets()) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (getReplicationMode() == modeReplSet && _getMemberState_inlock().primary()) {
- return true;
- }
- return false;
- }
-
- if (!_settings.isSlave())
- return true;
-
-
- // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed
- if (replAllDead) {
- return false;
- }
-
- if (_settings.isMaster()) {
- // if running with --master --slave, allow.
+ if (!_settings.usingReplSets()) {
return true;
}
- return false;
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(getReplicationMode() == modeReplSet);
+ return _getMemberState_inlock().primary();
}
bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* opCtx,
@@ -1786,20 +1761,18 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* op
bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx,
StringData dbName) {
- // _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes
- // started with --slave, and adjusted based on primary+drain state in replica sets.
+ // _canAcceptNonLocalWrites is always true for standalone nodes, and adjusted based on
+ // primary+drain state in replica sets.
//
- // That is, stand-alone nodes, non-slave nodes and drained replica set primaries can always
- // accept writes. Similarly, writes are always permitted to the "local" database. Finally,
- // in the event that a node is started with --slave and --master, we allow writes unless the
- // master/slave system has set the replAllDead flag.
+ // Stand-alone nodes and drained replica set primaries can always accept writes. Writes are
+ // always permitted to the "local" database.
if (_canAcceptNonLocalWrites || alwaysAllowNonLocalWrites(*opCtx)) {
return true;
}
if (dbName == kLocalDB) {
return true;
}
- return !replAllDead && _settings.isMaster();
+ return false;
}
bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 25141375b23..bbc70047e36 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -78,16 +78,13 @@ const ReplSettings& ReplicationCoordinatorMock::getSettings() const {
}
bool ReplicationCoordinatorMock::isReplEnabled() const {
- return _settings.usingReplSets() || _settings.isMaster() || _settings.isSlave();
+ return _settings.usingReplSets();
}
ReplicationCoordinator::Mode ReplicationCoordinatorMock::getReplicationMode() const {
if (_settings.usingReplSets()) {
return modeReplSet;
}
- if (_settings.isMaster() || _settings.isSlave()) {
- return modeMasterSlave;
- }
return modeNone;
}
@@ -141,7 +138,7 @@ bool ReplicationCoordinatorMock::canAcceptWritesForDatabase(OperationContext* op
if (_alwaysAllowWrites) {
return true;
}
- return dbName == "local" || _memberState.primary() || _settings.isMaster();
+ return dbName == "local" || _memberState.primary();
}
bool ReplicationCoordinatorMock::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx,
@@ -494,10 +491,6 @@ void ReplicationCoordinatorMock::alwaysAllowWrites(bool allowWrites) {
_alwaysAllowWrites = allowWrites;
}
-void ReplicationCoordinatorMock::setMaster(bool isMaster) {
- _settings.setMaster(isMaster);
-}
-
Status ReplicationCoordinatorMock::abortCatchupIfNeeded() {
return Status::OK();
}
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index fc33d735f13..7ee9664690e 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -46,7 +46,6 @@
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/is_master_response.h"
-#include "mongo/db/repl/master_slave.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -88,15 +87,8 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int
return;
}
- // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed
- if (replAllDead) {
- result.append("ismaster", 0);
- string s = string("dead: ") + replAllDead;
- result.append("info", s);
- } else {
- result.appendBool("ismaster",
- ReplicationCoordinator::get(opCtx)->isMasterForReportingPurposes());
- }
+ result.appendBool("ismaster",
+ ReplicationCoordinator::get(opCtx)->isMasterForReportingPurposes());
if (level) {
BSONObjBuilder sources(result.subarrayStart("sources"));
diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp
deleted file mode 100644
index 4141485cb93..00000000000
--- a/src/mongo/db/repl/resync.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
-* Copyright (C) 2008 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
-
-#include "mongo/db/commands.h"
-#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/master_slave.h" // replSettings
-#include "mongo/db/repl/replication_coordinator.h"
-
-namespace mongo {
-
-using std::string;
-using std::stringstream;
-
-namespace repl {
-
-namespace {
-
-constexpr StringData kResyncFieldName = "resync"_sd;
-constexpr StringData kWaitFieldName = "wait"_sd;
-
-} // namespace
-
-// operator requested resynchronization of replication (on a slave or secondary). {resync: 1}
-class CmdResync : public ErrmsgCommandDeprecated {
-public:
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kAlways;
- }
- virtual bool adminOnly() const {
- return true;
- }
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
- }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
- ActionSet actions;
- actions.addAction(ActionType::resync);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
-
- std::string help() const override {
- return "resync (from scratch) a stale slave or replica set secondary node.\n";
- }
-
- CmdResync() : ErrmsgCommandDeprecated(kResyncFieldName) {}
- virtual bool errmsgRun(OperationContext* opCtx,
- const string& dbname,
- const BSONObj& cmdObj,
- string& errmsg,
- BSONObjBuilder& result) {
- bool waitForResync = !cmdObj.hasField(kWaitFieldName) || cmdObj[kWaitFieldName].trueValue();
-
- // Replica set resync.
- ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx);
- if (replCoord->getSettings().usingReplSets()) {
- // Resync is disabled in production on replica sets until it stabilizes (SERVER-27081).
- if (!Command::testCommandsEnabled) {
- return CommandHelpers::appendCommandStatus(
- result,
- Status(ErrorCodes::OperationFailed,
- "Replica sets do not support the resync command"));
- }
-
- {
- // Need global write lock to transition out of SECONDARY
- Lock::GlobalWrite globalWriteLock(opCtx);
-
- const MemberState memberState = replCoord->getMemberState();
- if (memberState.startup()) {
- return CommandHelpers::appendCommandStatus(
- result, Status(ErrorCodes::NotYetInitialized, "no replication yet active"));
- }
- if (memberState.primary()) {
- return CommandHelpers::appendCommandStatus(
- result, Status(ErrorCodes::NotSecondary, "primaries cannot resync"));
- }
- auto status = replCoord->setFollowerMode(MemberState::RS_STARTUP2);
- if (!status.isOK()) {
- return CommandHelpers::appendCommandStatus(
- result,
- status.withContext(
- "Failed to transition to STARTUP2 state to perform resync"));
- }
- }
- uassertStatusOKWithLocation(replCoord->resyncData(opCtx, waitForResync), "resync", 0);
- return true;
- }
-
- // Master/Slave resync.
- Lock::GlobalWrite globalWriteLock(opCtx);
- // below this comment pertains only to master/slave replication
- if (cmdObj.getBoolField("force")) {
- if (!waitForSyncToFinish(opCtx, errmsg))
- return false;
- replAllDead = "resync forced";
- }
- // TODO(dannenberg) replAllDead is bad and should be removed when masterslave is removed
- if (!replAllDead) {
- errmsg = "not dead, no need to resync";
- return false;
- }
- if (!waitForSyncToFinish(opCtx, errmsg))
- return false;
-
- ReplSource::forceResyncDead(opCtx, "client");
- result.append("info", "triggered resync for all sources");
-
- return true;
- }
-
- bool waitForSyncToFinish(OperationContext* opCtx, string& errmsg) const {
- // Wait for slave thread to finish syncing, so sources will be be
- // reloaded with new saved state on next pass.
- Timer t;
- while (1) {
- if (syncing.load() == 0 || t.millis() > 30000)
- break;
- {
- Lock::TempRelease t(opCtx->lockState());
- relinquishSyncingSome.store(1);
- sleepmillis(1);
- }
- }
- if (syncing.load()) {
- errmsg = "timeout waiting for sync() to finish";
- return false;
- }
- return true;
- }
-} cmdResync;
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index 8ccb88af950..4eff300faf9 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/json.h"
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/ops/update.h"
-#include "mongo/db/repl/master_slave.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -121,12 +120,13 @@ public:
ASSERT_OK(sc->getTransportLayer()->start());
ReplSettings replSettings;
- replSettings.setOplogSizeBytes(10 * 1024 * 1024);
- replSettings.setMaster(true);
+ replSettings.setReplSetString("rs0/host1");
ReplicationCoordinator::set(
getGlobalServiceContext(),
std::unique_ptr<repl::ReplicationCoordinator>(
new repl::ReplicationCoordinatorMock(_opCtx.getServiceContext(), replSettings)));
+ ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
+ ->setFollowerMode(MemberState::RS_PRIMARY));
// Since the Client object persists across tests, even though the global
// ReplicationCoordinator does not, we need to clear the last op associated with the client
@@ -177,7 +177,7 @@ protected:
return "unittests.repltests";
}
static const char* cllNS() {
- return "local.oplog.$main";
+ return "local.oplog.rs";
}
BSONObj one(const BSONObj& query = BSONObj()) const {
return _client.findOne(ns(), query);
@@ -235,16 +235,13 @@ protected:
}
{
OldClientContext ctx(&_opCtx, ns());
- BSONObjBuilder b;
- b.append("host", "localhost");
- b.appendTimestamp("syncedTo", 0);
- ReplSource a(&_opCtx, b.obj());
for (vector<BSONObj>::iterator i = ops.begin(); i != ops.end(); ++i) {
if (0) {
mongo::unittest::log() << "op: " << *i << endl;
}
repl::UnreplicatedWritesBlock uwb(&_opCtx);
- a.applyOperation(&_opCtx, ctx.db(), *i);
+ uassertStatusOK(applyOperation_inlock(
+ &_opCtx, ctx.db(), *i, false, OplogApplication::Mode::kSecondary));
}
}
}
@@ -329,9 +326,9 @@ public:
if (mongo::storageGlobalParams.engine == "mobile") {
return;
}
- ASSERT_EQUALS(2, opCount());
+ ASSERT_EQUALS(1, opCount());
_client.insert(ns(), fromjson("{\"a\":\"b\"}"));
- ASSERT_EQUALS(3, opCount());
+ ASSERT_EQUALS(2, opCount());
}
};
@@ -1346,50 +1343,6 @@ public:
}
};
-class DatabaseIgnorerBasic {
-public:
- void run() {
- // Replication is not supported by mobile SE.
- if (mongo::storageGlobalParams.engine == "mobile") {
- return;
- }
- DatabaseIgnorer d;
- ASSERT(!d.ignoreAt("a", Timestamp(4, 0)));
- d.doIgnoreUntilAfter("a", Timestamp(5, 0));
- ASSERT(d.ignoreAt("a", Timestamp(4, 0)));
- ASSERT(!d.ignoreAt("b", Timestamp(4, 0)));
- ASSERT(d.ignoreAt("a", Timestamp(4, 10)));
- ASSERT(d.ignoreAt("a", Timestamp(5, 0)));
- ASSERT(!d.ignoreAt("a", Timestamp(5, 1)));
- // Ignore state is expired.
- ASSERT(!d.ignoreAt("a", Timestamp(4, 0)));
- }
-};
-
-class DatabaseIgnorerUpdate {
-public:
- void run() {
- // Replication is not supported by mobile SE.
- if (mongo::storageGlobalParams.engine == "mobile") {
- return;
- }
- DatabaseIgnorer d;
- d.doIgnoreUntilAfter("a", Timestamp(5, 0));
- d.doIgnoreUntilAfter("a", Timestamp(6, 0));
- ASSERT(d.ignoreAt("a", Timestamp(5, 5)));
- ASSERT(d.ignoreAt("a", Timestamp(6, 0)));
- ASSERT(!d.ignoreAt("a", Timestamp(6, 1)));
-
- d.doIgnoreUntilAfter("a", Timestamp(5, 0));
- d.doIgnoreUntilAfter("a", Timestamp(6, 0));
- d.doIgnoreUntilAfter("a", Timestamp(6, 0));
- d.doIgnoreUntilAfter("a", Timestamp(5, 0));
- ASSERT(d.ignoreAt("a", Timestamp(5, 5)));
- ASSERT(d.ignoreAt("a", Timestamp(6, 0)));
- ASSERT(!d.ignoreAt("a", Timestamp(6, 1)));
- }
-};
-
class SyncTest : public SyncTail {
public:
bool returnEmpty;
@@ -1511,8 +1464,6 @@ public:
add<Idempotence::ReplaySetPreexistingNoOpPull>();
add<Idempotence::ReplayArrayFieldNotAppended>();
add<DeleteOpIsIdBased>();
- add<DatabaseIgnorerBasic>();
- add<DatabaseIgnorerUpdate>();
add<FetchAndInsertMissingDocument>();
}
};
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index cbf5099a2df..f9a54847fd2 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -105,7 +105,6 @@ void ShardingMongodTestFixture::setUp() {
repl::ReplSettings replSettings;
replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString());
- replSettings.setMaster(true);
auto replCoordPtr = makeReplicationCoordinator(replSettings);
_replCoord = replCoordPtr.get();
@@ -156,7 +155,10 @@ void ShardingMongodTestFixture::setUp() {
std::unique_ptr<ReplicationCoordinatorMock> ShardingMongodTestFixture::makeReplicationCoordinator(
ReplSettings replSettings) {
- return stdx::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext(), replSettings);
+ auto coordinator =
+ stdx::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext(), replSettings);
+ ASSERT_OK(coordinator->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ return coordinator;
}
std::unique_ptr<executor::TaskExecutorPool> ShardingMongodTestFixture::makeTaskExecutorPool() {