summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/testing/hooks/tenant_migration.py
blob: 0ec5d69897b164df3b15ba1706574ac683769910 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
"""Test hook that runs tenant migrations continuously."""

import copy
import random
import re
import threading
import time
import uuid

import bson
import pymongo.errors

from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
from buildscripts.resmokelib.testing.fixtures import tenant_migration
from buildscripts.resmokelib.testing.hooks import dbhash_tenant_migration
from buildscripts.resmokelib.testing.hooks import interface


class ContinuousTenantMigration(interface.Hook):
    """Starts a tenant migration thread at the beginning of each test."""

    DESCRIPTION = ("Continuous tenant migrations")

    IS_BACKGROUND = True

    def __init__(self, hook_logger, fixture, shell_options):
        """Initialize the ContinuousTenantMigration.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target TenantMigrationFixture containing two replica sets.
            shell_options: contains the global_vars which contains TestData.tenantId to be used for
                           tenant migrations.

        """
        interface.Hook.__init__(self, hook_logger, fixture, ContinuousTenantMigration.DESCRIPTION)

        if not isinstance(fixture, tenant_migration.TenantMigrationFixture):
            raise ValueError("The ContinuousTenantMigration hook requires a TenantMigrationFixture")
        self._tenant_migration_fixture = fixture
        self._shell_options = copy.deepcopy(shell_options)

        self._tenant_migration_thread = None

    def before_suite(self, test_report):
        """Before suite."""
        if not self._tenant_migration_fixture:
            raise ValueError("No TenantMigrationFixture to run migrations on")
        self.logger.info("Starting the tenant migration thread.")
        self._tenant_migration_thread = _TenantMigrationThread(
            self.logger, self._tenant_migration_fixture, self._shell_options, test_report)
        self._tenant_migration_thread.start()

    def after_suite(self, test_report, teardown_flag=None):
        """After suite."""
        self.logger.info("Stopping the tenant migration thread.")
        self._tenant_migration_thread.stop()
        self.logger.info("Stopped the tenant migration thread.")

    def before_test(self, test, test_report):
        """Before test."""
        self.logger.info("Resuming the tenant migration thread.")
        self._tenant_migration_thread.resume(test)

    def after_test(self, test, test_report):
        """After test."""
        self.logger.info("Pausing the tenant migration thread.")
        self._tenant_migration_thread.pause()
        self.logger.info("Paused the tenant migration thread.")


class TenantMigrationLifeCycle(object):
    """Class for managing the various states of the tenant migration thread.

    The job thread alternates between calling mark_test_started() and mark_test_finished(). The
    tenant migration thread is allowed to perform migrations at any point between these two calls.
    Note that the job thread synchronizes with the tenant migration thread outside the context of
    this object to know it isn't in the process of running a migration.
    """

    _TEST_STARTED_STATE = "start"
    _TEST_FINISHED_STATE = "finished"

    def __init__(self):
        """Initialize the MigrationLifecycle instance."""
        self.__lock = threading.Lock()
        self.__cond = threading.Condition(self.__lock)

        self.test_num = 0
        self.__test_state = self._TEST_FINISHED_STATE
        self.__should_stop = False

    def mark_test_started(self):
        """Signal to the tenant migration thread that a new test has started.

        This function should be called during before_test(). Calling it causes the
        wait_for_tenant_migration_permitted() function to no longer block and to instead return
        true.
        """
        with self.__lock:
            self.test_num += 1
            self.__test_state = self._TEST_STARTED_STATE
            self.__cond.notify_all()

    def mark_test_finished(self):
        """Signal to the tenant migration thread that the current test has finished.

        This function should be called during after_test(). Calling it causes the
        wait_for_tenant_migration_permitted() function to block until mark_test_started() is called
        again.
        """
        with self.__lock:
            self.__test_state = self._TEST_FINISHED_STATE
            self.__cond.notify_all()

    def is_test_finished(self):
        """Return true if the current test has finished."""
        with self.__lock:
            return self.__test_state == self._TEST_FINISHED_STATE

    def stop(self):
        """Signal to the tenant migration thread that it should exit.

        This function should be called during after_suite(). Calling it causes the
        wait_for_tenant_migration_permitted() function to no longer block and to instead return
        false.
        """
        with self.__lock:
            self.__should_stop = True
            self.__cond.notify_all()

    def wait_for_tenant_migration_permitted(self):
        """Block until migrations are permitted, or until stop() is called.

        Return true if migrations are permitted, and false if migrations are not permitted.
        """
        with self.__lock:
            while not self.__should_stop:
                if self.__test_state == self._TEST_STARTED_STATE:
                    return True

                self.__cond.wait()

        return False

    def wait_for_tenant_migration_interval(self, timeout):
        """Block for 'timeout' seconds, or until stop() is called."""
        with self.__lock:
            self.__cond.wait(timeout)

    def poll_for_idle_request(self):  # noqa: D205,D400
        """Return true if the tenant migration thread should continue running migrations, or false
        if it should temporarily stop running migrations.
        """
        with self.__lock:
            return self.__test_state == self._TEST_FINISHED_STATE


def get_certificate_and_private_key(pem_file_path):  # noqa: D205,D400
    """Return a dictionary containing the certificate and private key extracted from the given pem
    file.
    """
    lines = open(pem_file_path, 'rt').read()
    certificate = re.findall(
        re.compile("(-*BEGIN CERTIFICATE-*\n(.*\n)*-*END CERTIFICATE-*\n)", re.MULTILINE),
        lines)[0][0]
    private_key = re.findall(
        re.compile("(-*BEGIN PRIVATE KEY-*\n(.*\n)*-*END PRIVATE KEY-*\n)", re.MULTILINE),
        lines)[0][0]
    return {"certificate": certificate, "privateKey": private_key}


def get_primary(rs, logger, max_tries=5):  # noqa: D205,D400
    """Return the primary from a replica set. Retries up to 'max_tries' times of it fails to get
    the primary within the time limit.
    """
    num_tries = 0
    while num_tries < max_tries:
        num_tries += 1
        try:
            return rs.get_primary()
        except errors.ServerFailure:
            logger.info(
                "Timed out while waiting for a primary for replica set '%s' on try %d." +
                " Retrying." if num_tries < max_tries else "", rs.replset_name, num_tries)


class _TenantMigrationOptions:
    def __init__(self, donor_rs, recipient_rs, tenant_id, read_preference, logger):
        self.donor_rs = donor_rs
        self.recipient_rs = recipient_rs
        self.migration_id = uuid.uuid4()
        self.tenant_id = tenant_id
        self.read_preference = read_preference
        self.logger = logger

    def get_donor_name(self):
        """Return the replica set name for the donor."""
        return self.donor_rs.replset_name

    def get_recipient_name(self):
        """Return the replica set name for the recipient."""
        return self.recipient_rs.replset_name

    def get_donor_primary(self):
        """Return a connection to the donor primary."""
        return get_primary(self.donor_rs, self.logger)

    def get_recipient_primary(self):
        """Return a connection to the recipient primary."""
        return get_primary(self.recipient_rs, self.logger)

    def get_donor_nodes(self):
        """Return a list of connections to donor replica set nodes."""
        return self.donor_rs.nodes

    def get_recipient_nodes(self):
        """Return a list of connections to recipient replica set nodes."""
        return self.recipient_rs.nodes

    def __str__(self):
        opts = {
            "donor": self.get_donor_name(), "recipient": self.get_recipient_name(),
            "migration_id": self.migration_id, "tenant_id": self.tenant_id,
            "read_preference": self.read_preference
        }
        return str(opts)


class _TenantMigrationThread(threading.Thread):
    THREAD_NAME = "TenantMigrationThread"

    WAIT_SECS_RANGES = [[0.05, 0.1], [0.1, 0.5], [1, 5], [5, 15]]
    POLL_INTERVAL_SECS = 0.1

    NO_SUCH_MIGRATION_ERR_CODE = 327
    INTERNAL_ERR_CODE = 1
    INVALID_SYNC_SOURCE_ERR_CODE = 119
    FAIL_TO_PARSE_ERR_CODE = 9
    NO_SUCH_KEY_ERR_CODE = 4

    def __init__(self, logger, tenant_migration_fixture, shell_options, test_report):
        """Initialize _TenantMigrationThread."""
        threading.Thread.__init__(self, name=self.THREAD_NAME)
        self.daemon = True
        self.logger = logger
        self._tenant_migration_fixture = tenant_migration_fixture
        self._tenant_id = shell_options["global_vars"]["TestData"]["tenantId"]
        self._auth_options = shell_options["global_vars"]["TestData"]["authOptions"]
        self._test = None
        self._test_report = test_report
        self._shell_options = shell_options

        self.__lifecycle = TenantMigrationLifeCycle()
        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not performing migrations.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

    def run(self):
        """Execute the thread."""
        if not self._tenant_migration_fixture:
            self.logger.warning("No TenantMigrationFixture to run migrations on.")
            return

        test_num = 0
        migration_num = 0
        donor_rs_index = 0

        try:
            while True:
                self._is_idle_evt.set()

                permitted = self.__lifecycle.wait_for_tenant_migration_permitted()
                if not permitted:
                    break

                self._is_idle_evt.clear()

                if self.__lifecycle.test_num > test_num:
                    # Reset donor_rs_index to 0 since the shell always routes all requests to rs0
                    # at the start of a test.
                    test_num = self.__lifecycle.test_num
                    donor_rs_index = 0

                recipient_rs_index = (
                    donor_rs_index + 1) % self._tenant_migration_fixture.get_num_replsets()
                migration_opts = self._create_migration_opts(donor_rs_index, recipient_rs_index)

                # Briefly wait to let the test run before starting the tenant migration, so that
                # the first migration is more likely to have data to migrate.
                wait_secs = random.uniform(
                    *self.WAIT_SECS_RANGES[migration_num % len(self.WAIT_SECS_RANGES)])
                self.logger.info("Waiting for %.3f seconds before starting migration.", wait_secs)
                self.__lifecycle.wait_for_tenant_migration_interval(wait_secs)

                self.logger.info("Starting tenant migration: %s.", str(migration_opts))
                start_time = time.time()
                is_committed = self._run_migration(migration_opts)
                end_time = time.time()
                self.logger.info("Completed tenant migration in %0d ms: %s.",
                                 (end_time - start_time) * 1000, str(migration_opts))

                found_idle_request = self.__lifecycle.poll_for_idle_request()
                if found_idle_request:
                    continue

                if is_committed:
                    donor_rs_index = recipient_rs_index
                migration_num += 1
        except Exception:  # pylint: disable=W0703
            # Proactively log the exception when it happens so it will be flushed immediately.
            self.logger.exception("Tenant migration thread threw exception")
            # The event should be signaled whenever the thread is not performing migrations.
            self._is_idle_evt.set()

    def stop(self):
        """Stop the thread when the suite finishes."""
        self.__lifecycle.stop()
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume(self._test)
        self.join()

    def pause(self):
        """Pause the thread after test."""
        self.__lifecycle.mark_test_finished()

        # Wait until we are no longer executing migrations.
        self._is_idle_evt.wait()
        # Check if the thread is alive in case it has thrown an exception while running.
        self._check_thread()

        # Check that the fixture is still running.
        if not self._tenant_migration_fixture.is_running():
            raise errors.ServerFailure(
                "TenantMigrationFixture with pids {} expected to be running in"
                " ContinuousTenantMigration, but wasn't".format(
                    self._tenant_migration_fixture.pids()))

    def resume(self, test):
        """Resume the thread before test."""
        self._test = test
        self.__lifecycle.mark_test_started()

    def _wait(self, timeout):
        """Wait until stop or timeout."""
        self._is_stopped_evt.wait(timeout)

    def short_name(self):
        """Return the name of the thread."""
        return self.THREAD_NAME

    def _check_thread(self):
        """Throw an error if the thread is not running."""
        if not self.is_alive():
            msg = "Tenant migration thread is not running."
            self.logger.error(msg)
            raise errors.ServerFailure(msg)

    def _is_fail_point_abort_reason(self, abort_reason):
        return abort_reason["code"] == self.INTERNAL_ERR_CODE and abort_reason[
            "errmsg"] == "simulate a tenant migration error"

    def _create_migration_opts(self, donor_rs_index, recipient_rs_index):
        donor_rs = self._tenant_migration_fixture.get_replset(donor_rs_index)
        recipient_rs = self._tenant_migration_fixture.get_replset(recipient_rs_index)
        read_preference = {"mode": "primary"} if random.randint(0, 1) else {"mode": "secondary"}
        return _TenantMigrationOptions(donor_rs, recipient_rs, self._tenant_id, read_preference,
                                       self.logger)

    def _create_client(self, node):
        return fixture_interface.authenticate(node.mongo_client(), self._auth_options)

    def _check_tenant_migration_dbhash(self, migration_opts):
        # Set the donor connection string, recipient connection string, and migration uuid string
        # for the tenant migration dbhash check script.
        self._shell_options[
            "global_vars"]["TestData"]["donorConnectionString"] = migration_opts.get_donor_primary(
            ).get_internal_connection_string()
        self._shell_options["global_vars"]["TestData"][
            "recipientConnectionString"] = migration_opts.get_recipient_primary(
            ).get_internal_connection_string()
        self._shell_options["global_vars"]["TestData"][
            "migrationIdString"] = migration_opts.migration_id.__str__()

        dbhash_test_case = dbhash_tenant_migration.CheckTenantMigrationDBHash(
            self.logger, self._tenant_migration_fixture, self._shell_options)
        dbhash_test_case.before_suite(self._test_report)
        dbhash_test_case.before_test(self._test, self._test_report)
        dbhash_test_case.after_test(self._test, self._test_report)
        dbhash_test_case.after_suite(self._test_report)

    def _run_migration(self, migration_opts):  # noqa: D205,D400
        """Run a tenant migration based on 'migration_opts', wait for the migration decision and
        garbage collection. Return true if the migration commits and false otherwise.
        """
        try:
            # Clean up any orphaned tenant databases on the recipient allow next migration to start.
            self._drop_tenant_databases(migration_opts.recipient_rs)
            res = self._start_and_wait_for_migration(migration_opts)
            is_committed = res["state"] == "committed"

            # Garbage collect the migration prior to throwing error to avoid migration conflict
            # in the next test.
            if is_committed:
                # Once we have committed a migration, run a dbhash check before rerouting commands.
                self._check_tenant_migration_dbhash(migration_opts)

                # If the migration committed, to avoid routing commands incorrectly, wait for the
                # donor/proxy to reroute at least one command before doing garbage collection. Stop
                # waiting when the test finishes.
                self._wait_for_reroute_or_test_completion(migration_opts)
            self._forget_migration(migration_opts)
            self._wait_for_migration_garbage_collection(migration_opts)

            if not res["ok"]:
                raise errors.ServerFailure("Tenant migration '" + str(migration_opts.migration_id) +
                                           "' with donor replica set '" +
                                           migration_opts.get_donor_name() + "' failed: " +
                                           str(res))

            if is_committed:
                return True

            abort_reason = res["abortReason"]
            if self._is_fail_point_abort_reason(abort_reason):
                self.logger.info(
                    "Tenant migration '%s' with donor replica set '%s' aborted due to failpoint: " +
                    "%s.", migration_opts.migration_id, migration_opts.get_donor_name(), str(res))
                return False
            raise errors.ServerFailure("Tenant migration '" + str(migration_opts.migration_id) +
                                       "' with donor replica set '" +
                                       migration_opts.get_donor_name() +
                                       "' aborted due to an error: " + str(res))
        except pymongo.errors.PyMongoError:
            self.logger.exception(
                "Error running tenant migration '%s' with donor primary on replica set '%s'.",
                migration_opts.migration_id, migration_opts.get_donor_name())
            raise

    def _start_and_wait_for_migration(self, migration_opts):  # noqa: D205,D400
        """Run donorStartMigration to start a tenant migration based on 'migration_opts', wait for
        the migration decision and return the last response for donorStartMigration.
        """
        cmd_obj = {
            "donorStartMigration":
                1,
            "migrationId":
                bson.Binary(migration_opts.migration_id.bytes, 4),
            "recipientConnectionString":
                migration_opts.recipient_rs.get_driver_connection_url(),
            "tenantId":
                migration_opts.tenant_id,
            "readPreference":
                migration_opts.read_preference,
            "donorCertificateForRecipient":
                get_certificate_and_private_key("jstests/libs/tenant_migration_donor.pem"),
            "recipientCertificateForDonor":
                get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"),
        }
        donor_primary = migration_opts.get_donor_primary()

        self.logger.info(
            "Starting tenant migration '%s' on donor primary on port %d of replica set '%s'.",
            migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())

        while True:
            try:
                # Keep polling the migration state until the migration completes.
                donor_primary_client = self._create_client(donor_primary)
                res = donor_primary_client.admin.command(
                    cmd_obj,
                    bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
            except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
                donor_primary = migration_opts.get_donor_primary()
                self.logger.info(
                    "Retrying tenant migration '%s' against donor primary on port %d of replica " +
                    "set '%s'.", migration_opts.migration_id, donor_primary.port,
                    migration_opts.get_donor_name())
                continue
            if res["state"] == "committed":
                self.logger.info(
                    "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
                    "committed.", migration_opts.migration_id, donor_primary.port,
                    migration_opts.get_donor_name())
                return res
            if res["state"] == "aborted":
                self.logger.info(
                    "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
                    "aborted: %s.", migration_opts.migration_id, donor_primary.port,
                    migration_opts.get_donor_name(), str(res))
                return res
            if not res["ok"]:
                self.logger.info(
                    "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
                    "failed: %s.", migration_opts.migration_id, donor_primary.port,
                    migration_opts.get_donor_name(), str(res))
                return res
            time.sleep(self.POLL_INTERVAL_SECS)

    def _forget_migration(self, migration_opts):
        """Run donorForgetMigration to garbage collection the tenant migration denoted by migration_opts'."""
        self.logger.info("Forgetting tenant migration: %s.", str(migration_opts))

        cmd_obj = {
            "donorForgetMigration": 1, "migrationId": bson.Binary(migration_opts.migration_id.bytes,
                                                                  4)
        }
        donor_primary = migration_opts.get_donor_primary()

        self.logger.info(
            "Forgetting tenant migration '%s' on donor primary on port %d of replica set '%s'.",
            migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())

        while True:
            try:
                donor_primary_client = self._create_client(donor_primary)
                donor_primary_client.admin.command(
                    cmd_obj,
                    bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
                return
            except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
                donor_primary = migration_opts.get_donor_primary()
                self.logger.info(
                    "Retrying forgetting tenant migration '%s' against donor primary on port %d of "
                    + "replica set '%s'.", migration_opts.migration_id, donor_primary.port,
                    migration_opts.get_donor_name())
                continue
            except pymongo.errors.OperationFailure as err:
                if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
                    raise
                # The fixture was restarted.
                self.logger.info(
                    "Could not find tenant migration '%s' on donor primary on" +
                    " port %d of replica set '%s': %s.", migration_opts.migration_id,
                    donor_primary.port, migration_opts.get_donor_name(), str(err))
                return
            except pymongo.errors.PyMongoError:
                self.logger.exception(
                    "Error forgetting tenant migration '%s' on donor primary on" +
                    " port %d of replica set '%s'.", migration_opts.migration_id,
                    donor_primary.port, migration_opts.get_donor_name())
                raise

    def _wait_for_migration_garbage_collection(self, migration_opts):  # noqa: D205,D400
        """Wait until the persisted state for tenant migration denoted by 'migration_opts' has been
        garbage collected on both the donor and recipient.
        """
        try:
            donor_nodes = migration_opts.get_donor_nodes()
            for donor_node in donor_nodes:
                self.logger.info(
                    "Waiting for tenant migration '%s' to be garbage collected on donor node on " +
                    "port %d of replica set '%s'.", migration_opts.migration_id, donor_node.port,
                    migration_opts.get_donor_name())

                while True:
                    try:
                        donor_node_client = self._create_client(donor_node)
                        res = donor_node_client.config.command({
                            "count": "tenantMigrationDonors",
                            "query": {"tenantId": migration_opts.tenant_id}
                        })
                        if res["n"] == 0:
                            break
                    except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
                        # Ignore NotMasterErrors because it's possible to fail with
                        # InterruptedDueToReplStateChange if the donor primary steps down or shuts
                        # down during the garbage collection check.
                        self.logger.info(
                            "Retrying waiting for tenant migration '%s' to be garbage collected on"
                            + " donor node on port %d of replica set '%s'.",
                            migration_opts.migration_id, donor_node.port,
                            migration_opts.get_donor_name())
                        continue
                    time.sleep(self.POLL_INTERVAL_SECS)

            recipient_nodes = migration_opts.get_recipient_nodes()
            for recipient_node in recipient_nodes:
                self.logger.info(
                    "Waiting for tenant migration '%s' to be garbage collected on recipient node on"
                    + " port %d of replica set '%s'.", migration_opts.migration_id,
                    recipient_node.port, migration_opts.get_recipient_name())

                while True:
                    try:
                        recipient_node_client = self._create_client(recipient_node)
                        res = recipient_node_client.config.command({
                            "count": "tenantMigrationRecipients",
                            "query": {"tenantId": migration_opts.tenant_id}
                        })
                        if res["n"] == 0:
                            break
                    except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
                        # Ignore NotMasterErrors because it's possible to fail with
                        # InterruptedDueToReplStateChange if the recipient primary steps down or
                        # shuts down during the garbage collection check.
                        self.logger.info(
                            "Retrying waiting for tenant migration '%s' to be garbage collected on"
                            + " recipient node on port %d of replica set '%s'.",
                            migration_opts.migration_id, recipient_node.port,
                            migration_opts.get_recipient_name())
                        continue
                    time.sleep(self.POLL_INTERVAL_SECS)
        except pymongo.errors.PyMongoError:
            self.logger.exception(
                "Error waiting for tenant migration '%s' from donor replica set '%s" +
                " to recipient replica set '%s' to be garbage collected.",
                migration_opts.migration_id, migration_opts.get_donor_name(),
                migration_opts.get_recipient_name())
            raise

    def _wait_for_reroute_or_test_completion(self, migration_opts):
        start_time = time.time()
        donor_primary = migration_opts.get_donor_primary()

        self.logger.info(
            "Waiting for donor primary on port %d of replica set '%s' for tenant migration '%s' " +
            "to reroute at least one conflicting command. Stop waiting when the test finishes.",
            donor_primary.port, migration_opts.get_donor_name(), migration_opts.migration_id)

        while not self.__lifecycle.is_test_finished():
            try:
                donor_primary_client = self._create_client(donor_primary)
                doc = donor_primary_client["testTenantMigration"]["rerouted"].find_one(
                    {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)})
                if doc is not None:
                    return
            except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
                donor_primary = migration_opts.get_donor_primary()
                self.logger.info(
                    "Retrying waiting for donor primary on port '%d' of replica set '%s' for " +
                    "tenant migration '%s' to reroute at least one conflicting command.",
                    donor_primary.port, migration_opts.get_donor_name(),
                    migration_opts.migration_id)
                continue
            except pymongo.errors.PyMongoError:
                end_time = time.time()
                self.logger.exception(
                    "Error running find command on donor primary on port %d of replica set '%s' " +
                    "after waiting for reroute for %0d ms", donor_primary.port,
                    migration_opts.get_donor_name(), (end_time - start_time) * 1000)
                raise

            time.sleep(self.POLL_INTERVAL_SECS)

    def _drop_tenant_databases(self, rs):
        self.logger.info("Dropping tenant databases from replica set '%s'.", rs.replset_name)

        primary = get_primary(rs, self.logger)
        self.logger.info("Running dropDatabase commands against primary on port %d.", primary.port)

        while True:
            try:
                primary_client = self._create_client(primary)
                res = primary_client.admin.command({"listDatabases": 1})
                for database in res["databases"]:
                    db_name = database["name"]
                    if db_name.startswith(self._tenant_id + "_"):
                        primary_client.drop_database(db_name)
                return
            # We retry on all write concern errors because we assume the only reason waiting for
            # write concern should fail is because of a failover.
            except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError,
                    pymongo.errors.WriteConcernError) as err:
                primary = get_primary(rs, self.logger)
                self.logger.info(
                    "Retrying dropDatabase commands against primary on port %d after error %s.",
                    primary.port, str(err))
                continue
            except pymongo.errors.PyMongoError:
                self.logger.exception(
                    "Error dropping databases for tenant id '%s' on primary on" +
                    " port %d of replica set '%s' to be garbage collection.", self._tenant_id,
                    primary.port, rs.replset_name)
                raise