summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/testing/fixtures/replicator.py
blob: eb446fa704dbc4c2a2ee4bf1ac51dc3df72f3c63 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Standalone replicator fixture to handle launching and stopping a replicator binary."""

import json
import time
from urllib import request

import buildscripts.resmokelib.testing.fixtures.interface as interface


class ReplicatorFixture(interface.Fixture):
    """Fixture which spins up a single replicator."""

    REGISTERED_NAME = "ReplicatorFixture"

    def __init__(  # pylint: disable=too-many-arguments
            self, logger, job_num, fixturelib, executable, quiesce_period=5, cli_options=None):
        """Initialize ReplicatorFixture with different options for the replicator process."""
        interface.Fixture.__init__(self, logger, job_num, fixturelib)

        self.executable = executable
        self.quiesce_period = quiesce_period
        self.cli_options = self.fixturelib.default_if_none(cli_options, {})
        self.port = self.fixturelib.get_next_port(job_num)
        self.set_cli_options({"port": self.port})

        # The running replicator process.
        self.replicator = None

    def setup(self):
        """Since launching the binary starts the replication, we do nothing here."""
        self._launch_replicator_process()

    def pids(self):
        """:return: pids owned by this fixture if any."""
        out = [x.pid for x in [self.replicator] if x is not None]
        if not out:
            self.logger.debug('Replicator not running when gathering replicator fixture pid.')
        return out

    def await_ready(self):
        """
        Block until the fixture can be used for testing.

        NOOP by default since on `setup` nothing is done.
        """
        pass

    def _launch_replicator_process(self):
        """Launch the replicator binary."""
        if "sourceURI" not in self.cli_options or "destinationURI" not in self.cli_options:
            raise ValueError("Cannot launch the replicator without source and destination URIs.")

        replicator = self.fixturelib.generic_program(self.logger, [self.executable], self.job_num,
                                                     test_id=None, process_kwargs=None,
                                                     **self.cli_options)
        try:
            self.logger.info("Launch replicator webserver...\n%s", replicator.as_command())
            replicator.start()
            self.logger.info("Replicator launched with pid %d on port %d.", replicator.pid,
                             self.port)
        except Exception as err:
            msg = "Failed to launch replicator: {}".format(err)
            self.logger.exception(msg)
            raise self.fixturelib.ServerFailure(msg)

        self.replicator = replicator

    def start(self):
        """Start the replication process by sending the replicator a command."""
        url = self.get_api_url() + '/api/v1/start'
        # Right now we set reversible to false, at some point this could be an
        # argument to start.
        data = '{"reversible": false}'.encode('ascii')
        headers = {'Content-Type': 'application/json'}

        req = request.Request(url=url, data=data, headers=headers)
        self.logger.info("Sending start command to replicator: %s", req.data)
        response = json.loads(request.urlopen(req).read().decode('ascii'))
        self.logger.info("Replicator start command response was: %s", response)

        if not response["success"]:
            msg = f"Replicator failed to start: {response}"
            self.logger.exception(msg)
            raise self.fixturelib.ServerFailure(msg)

    def stop(self, mode=None):
        """Stop the replicator binary."""
        self.logger.info("Sleeping for %d s to allow replicator to finish up.", self.quiesce_period)
        time.sleep(self.quiesce_period)
        self.logger.info("Done sleeping through quiesce period.")

        mode = interface.TeardownMode.TERMINATE if mode is None else mode

        self.logger.info("Stopping replicator with pid %d...", self.replicator.pid)
        if not self._is_process_running():
            exit_code = self.replicator.poll()
            msg = ("Replicator was expected to be running, but wasn't. "
                   "Process exited with code {:d}.").format(exit_code)
            self.logger.warning(msg)
            raise self.fixturelib.ServerFailure(msg)

        self.replicator.stop(mode)
        exit_code = self.replicator.wait()
        # TODO (SERVER-63544): Check to make sure the error code is correct.
        self.logger.info("Process exited with error code {:d}.".format(exit_code))

    def resume(self):
        """NOOP."""
        pass

    def pause(self):
        """NOOP."""
        pass

    def _do_teardown(self, mode=None):
        """Teardown the fixture."""
        if not self._is_process_running():
            self.logger.info("Replicator already stopped; teardown is a NOOP.")
            return

        self.logger.warning("The replicator had not been stopped at the time of teardown.")
        self.stop(mode)

    def _is_process_running(self):
        """Return true if the replicator binary is running as a process."""
        return self.replicator is not None and self.replicator.poll() is None

    def is_running(self):
        """Return if the fixture is running or has not errorred."""
        return self._is_process_running()

    def get_internal_connection_string(self):
        """Return the internal connection string."""
        raise NotImplementedError("Replicator cannot have an internal connection string.")

    def get_driver_connection_url(self):
        """Return the driver connection URL."""
        raise NotImplementedError("Replicator cannot have a driver connection URL.")

    def get_api_url(self):
        """Return the URL used to send the replicator commands."""
        return f'http://localhost:{self.port}'

    def get_node_info(self):
        """Return a list of NodeInfo objects."""
        info = interface.NodeInfo(full_name=self.logger.full_name, name=self.logger.name,
                                  port=self.port,
                                  pid=self.replicator.pid if self.replicator is not None else -1)
        return [info]

    def set_cli_options(self, cli_options):
        """Set command line options."""
        for option, value in cli_options.items():
            self.cli_options[option] = value