1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
"""Fixture with multiple replica sets for executing JSTests against."""
import os.path
from buildscripts.resmokelib import config
from buildscripts.resmokelib import errors
from buildscripts.resmokelib import logging
from buildscripts.resmokelib import utils
from buildscripts.resmokelib.testing.fixtures import interface
from buildscripts.resmokelib.testing.fixtures import replicaset
class TenantMigrationFixture(interface.Fixture): # pylint: disable=too-many-instance-attributes
"""Fixture which provides JSTests with a set of replica sets to run tenant migration against."""
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self, logger, job_num, common_mongod_options=None, per_mongod_options=None,
dbpath_prefix=None, preserve_dbpath=False, num_replica_sets=1,
num_nodes_per_replica_set=2, start_initial_sync_node=False,
write_concern_majority_journal_default=None, auth_options=None,
replset_config_options=None, voting_secondaries=True, all_nodes_electable=False,
use_replica_set_connection_string=None, linear_chain=False, mixed_bin_versions=None,
default_read_concern=None, default_write_concern=None):
"""Initialize TenantMigrationFixture with different options for the replica set processes."""
interface.Fixture.__init__(self, logger, job_num, dbpath_prefix=dbpath_prefix)
self.common_mongod_options = utils.default_if_none(common_mongod_options, {})
self.per_mongod_options = utils.default_if_none(per_mongod_options, {})
self.preserve_dbpath = preserve_dbpath
self.start_initial_sync_node = start_initial_sync_node
self.write_concern_majority_journal_default = write_concern_majority_journal_default
self.auth_options = auth_options
self.replset_config_options = utils.default_if_none(replset_config_options, {})
self.voting_secondaries = voting_secondaries
self.all_nodes_electable = all_nodes_electable
self.use_replica_set_connection_string = use_replica_set_connection_string
self.default_read_concern = default_read_concern
self.default_write_concern = default_write_concern
self.mixed_bin_versions = utils.default_if_none(mixed_bin_versions,
config.MIXED_BIN_VERSIONS)
self.mixed_bin_versions_config = self.mixed_bin_versions
# Use the values given from the command line if they exist for linear_chain and num_nodes.
linear_chain_option = utils.default_if_none(config.LINEAR_CHAIN, linear_chain)
self.linear_chain = linear_chain_option if linear_chain_option else linear_chain
self.num_nodes_per_replica_set = num_nodes_per_replica_set if num_nodes_per_replica_set \
else config.NUM_REPLSET_NODES
self.num_replica_sets = num_replica_sets if num_replica_sets else config.NUM_REPLSETS
if self.num_replica_sets < 2:
raise ValueError("num_replica_sets must be greater or equal to 2")
self.replica_sets = []
# The ReplicaSetFixture for the replica set that starts out owning the data (i.e. the
# replica set that driver should connect to when running commands).
self.replica_set_with_tenant = None
def pids(self):
""":return: pids owned by this fixture if any."""
out = []
for replica_set in self.replica_sets:
out.extend(replica_set.pids())
if not out:
self.logger.debug('No replica sets when gathering multi replicaset fixture pids.')
return out
def setup(self):
"""Set up the replica sets."""
if not self.replica_sets:
for i in range(self.num_replica_sets):
rs_name = f"rs{i}"
mongod_options = self.common_mongod_options.copy()
mongod_options.update(self.per_mongod_options[i])
mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, rs_name)
mongod_options["replSet"] = rs_name
self.replica_sets.append(
replicaset.ReplicaSetFixture(
self.logger, self.job_num, mongod_options=mongod_options,
preserve_dbpath=self.preserve_dbpath,
num_nodes=self.num_nodes_per_replica_set, auth_options=self.auth_options,
replset_config_options=self.replset_config_options,
mixed_bin_versions=self.mixed_bin_versions,
replicaset_logging_prefix=rs_name,
use_replica_set_connection_string=self.use_replica_set_connection_string,
all_nodes_electable=self.all_nodes_electable))
self.replica_set_with_tenant = self.replica_sets[0]
# Start up each of the replica sets
for replica_set in self.replica_sets:
replica_set.setup()
self._create_tenant_migration_donor_and_recipient_roles(replica_set)
def await_ready(self):
"""Block until the fixture can be used for testing."""
# Wait for each of the replica sets
for replica_set in self.replica_sets:
replica_set.await_ready()
def _do_teardown(self, mode=None):
"""Shut down the replica sets."""
self.logger.info("Stopping all replica sets...")
running_at_start = self.is_running()
if not running_at_start:
self.logger.warning("All replica sets were expected to be running, but weren't.")
teardown_handler = interface.FixtureTeardownHandler(self.logger)
for replica_set in self.replica_sets:
teardown_handler.teardown(replica_set, "replica_set", mode=mode)
if teardown_handler.was_successful():
self.logger.info("Successfully stopped all replica sets.")
else:
self.logger.error("Stopping the fixture failed.")
raise errors.ServerFailure(teardown_handler.get_error_message())
def is_running(self):
"""Return true if all replica sets are still operating."""
return all(replica_set.is_running() for replica_set in self.replica_sets)
def get_num_replsets(self):
"""Return the number of replica sets."""
return self.num_replica_sets
def get_replset(self, index):
"""Return the ReplicaSetFixture for the replica set at the given index."""
if not self.replica_sets:
raise ValueError("Must call setup() before calling get_replset")
return self.replica_sets[index]
def get_replsets(self):
"""Return the ReplicaSetFixtures for all the replica sets."""
if not self.replica_sets:
raise ValueError("Must call setup() before calling get_replsets")
return self.replica_sets
def get_internal_connection_string(self):
"""Return the internal connection string to the replica set that currently starts out owning the data."""
if not self.replica_sets:
raise ValueError("Must call setup() before calling get_internal_connection_string()")
return self.replica_set_with_tenant.get_internal_connection_string()
def get_driver_connection_url(self):
"""Return the driver connection URL to the replica set that currently starts out owning the data."""
if not self.replica_set_with_tenant:
raise ValueError("Must call setup() before calling get_driver_connection_url")
return self.replica_set_with_tenant.get_driver_connection_url()
def get_node_info(self):
"""Return a list of dicts of NodeInfo objects."""
output = []
for replica_set in self.replica_sets:
output += replica_set.get_node_info()
return output
def _create_tenant_migration_donor_and_recipient_roles(self, rs):
"""Create a role for tenant migration donor and recipient."""
primary = rs.get_primary()
primary_client = interface.authenticate(primary.mongo_client(), self.auth_options)
try:
primary_client.admin.command({
"createRole": "tenantMigrationDonorRole", "privileges": [{
"resource": {"cluster": True}, "actions": ["runTenantMigration"]
}, {"resource": {"db": "admin", "collection": "system.keys"}, "actions": ["find"]}],
"roles": []
})
except:
self.logger.exception(
"Error creating tenant migration donor role on primary on port %d of replica" +
" set '%s'.", primary.port, rs.replset_name)
raise
try:
primary_client.admin.command({
"createRole": "tenantMigrationRecipientRole", "privileges":
[{"resource": {"cluster": True}, "actions": ["listDatabases", "useUUID"]},
{"resource": {"db": "", "collection": ""}, "actions": ["listCollections"]},
{
"resource": {"anyResource": True},
"actions": ["dbStats", "collStats", "find", "listIndexes"]
}], "roles": []
})
except:
self.logger.exception(
"Error creating tenant migration recipient role on primary on port %d of replica" +
" set '%s'.", primary.port, rs.replset_name)
raise
|