From 2caa2dda6fff016ef928bd5d6493038f93b97bb8 Mon Sep 17 00:00:00 2001 From: kauboy26 Date: Tue, 15 Mar 2022 18:25:07 +0000 Subject: SERVER-64188 Change fixture to send START POST command to mongosync. --- .../testing/fixtures/cluster_to_cluster.py | 7 ++++ .../resmokelib/testing/fixtures/replicator.py | 47 +++++++++++++++------- .../hooks/cluster_to_cluster_replication.py | 32 +++++---------- 3 files changed, 49 insertions(+), 37 deletions(-) diff --git a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py index 90d28687c03..14671e069b6 100644 --- a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py +++ b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py @@ -78,6 +78,13 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable for i, cluster in enumerate(self.clusters): self.logger.info(f"Setting up cluster {i}.") cluster.setup() + + source_url = self.clusters[self.source_cluster_index].get_driver_connection_url() + dest_url = self.clusters[1 - self.source_cluster_index].get_driver_connection_url() + self.logger.info("Setting source cluster string: '%s', destination cluster string: '%s'", + source_url, dest_url) + self.replicator.set_cli_options({'sourceURI': source_url, 'destinationURI': dest_url}) + self.replicator.setup() def pids(self): diff --git a/buildscripts/resmokelib/testing/fixtures/replicator.py b/buildscripts/resmokelib/testing/fixtures/replicator.py index 5bafaa6f4e3..eb446fa704d 100644 --- a/buildscripts/resmokelib/testing/fixtures/replicator.py +++ b/buildscripts/resmokelib/testing/fixtures/replicator.py @@ -1,6 +1,8 @@ """Standalone replicator fixture to handle launching and stopping a replicator binary.""" +import json import time +from urllib import request import buildscripts.resmokelib.testing.fixtures.interface as interface @@ -24,13 +26,9 @@ class ReplicatorFixture(interface.Fixture): # The running replicator process. self.replicator = None - # This denotes whether the fixture itself is running, i.e. if it hasn't errored and hasn't - # been torn down. The replicator process itself may have been stopped. - self.fixture_is_running = False - def setup(self): """Since launching the binary starts the replication, we do nothing here.""" - self.fixture_is_running = True + self._launch_replicator_process() def pids(self): """:return: pids owned by this fixture if any.""" @@ -47,8 +45,8 @@ class ReplicatorFixture(interface.Fixture): """ pass - def start(self): - """Launch the binary and start the replication process.""" + def _launch_replicator_process(self): + """Launch the replicator binary.""" if "sourceURI" not in self.cli_options or "destinationURI" not in self.cli_options: raise ValueError("Cannot launch the replicator without source and destination URIs.") @@ -56,17 +54,35 @@ class ReplicatorFixture(interface.Fixture): test_id=None, process_kwargs=None, **self.cli_options) try: - self.logger.info("Starting replicator...\n%s", replicator.as_command()) + self.logger.info("Launch replicator webserver...\n%s", replicator.as_command()) replicator.start() - self.logger.info("Replicator started with pid %d.", replicator.pid) + self.logger.info("Replicator launched with pid %d on port %d.", replicator.pid, + self.port) except Exception as err: - msg = "Failed to start replicator: {}".format(err) + msg = "Failed to launch replicator: {}".format(err) self.logger.exception(msg) - self.fixture_is_running = False raise self.fixturelib.ServerFailure(msg) self.replicator = replicator + def start(self): + """Start the replication process by sending the replicator a command.""" + url = self.get_api_url() + '/api/v1/start' + # Right now we set reversible to false, at some point this could be an + # argument to start. + data = '{"reversible": false}'.encode('ascii') + headers = {'Content-Type': 'application/json'} + + req = request.Request(url=url, data=data, headers=headers) + self.logger.info("Sending start command to replicator: %s", req.data) + response = json.loads(request.urlopen(req).read().decode('ascii')) + self.logger.info("Replicator start command response was: %s", response) + + if not response["success"]: + msg = f"Replicator failed to start: {response}" + self.logger.exception(msg) + raise self.fixturelib.ServerFailure(msg) + def stop(self, mode=None): """Stop the replicator binary.""" self.logger.info("Sleeping for %d s to allow replicator to finish up.", self.quiesce_period) @@ -81,7 +97,6 @@ class ReplicatorFixture(interface.Fixture): msg = ("Replicator was expected to be running, but wasn't. " "Process exited with code {:d}.").format(exit_code) self.logger.warning(msg) - self.fixture_is_running = False raise self.fixturelib.ServerFailure(msg) self.replicator.stop(mode) @@ -99,8 +114,6 @@ class ReplicatorFixture(interface.Fixture): def _do_teardown(self, mode=None): """Teardown the fixture.""" - self.fixture_is_running = False - if not self._is_process_running(): self.logger.info("Replicator already stopped; teardown is a NOOP.") return @@ -114,7 +127,7 @@ class ReplicatorFixture(interface.Fixture): def is_running(self): """Return if the fixture is running or has not errorred.""" - return self.fixture_is_running + return self._is_process_running() def get_internal_connection_string(self): """Return the internal connection string.""" @@ -124,6 +137,10 @@ class ReplicatorFixture(interface.Fixture): """Return the driver connection URL.""" raise NotImplementedError("Replicator cannot have a driver connection URL.") + def get_api_url(self): + """Return the URL used to send the replicator commands.""" + return f'http://localhost:{self.port}' + def get_node_info(self): """Return a list of NodeInfo objects.""" info = interface.NodeInfo(full_name=self.logger.full_name, name=self.logger.name, diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py index d4564cc9d03..246ba3842ea 100644 --- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py +++ b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py @@ -48,9 +48,6 @@ class ClusterToClusterReplication(interface.Hook): # pylint: disable=too-many-i self._replicator_start_delay = replicator_start_delay random.seed(config.RANDOM_SEED) - self._source_cluster = None - self._destination_cluster = None - # The last test executed so far. self._last_test = None @@ -61,24 +58,6 @@ class ClusterToClusterReplication(interface.Hook): # pylint: disable=too-many-i if not self._fixture: raise ValueError("No ClusterToClusterFixture to run migrations on") - self.logger.info("Setting up cluster to cluster test data.") - - # Set up the initial replication direction. - clusters = self._fixture.get_independent_clusters() - self._source_cluster = clusters[self._fixture.source_cluster_index] - self._destination_cluster = clusters[1 - self._fixture.source_cluster_index] - - source_url = self._source_cluster.get_driver_connection_url() - dest_url = self._destination_cluster.get_driver_connection_url() - self.logger.info("Setting source cluster string: '%s', destination cluster string: '%s'", - source_url, dest_url) - - # The TestData needs to be set to allow the data consistency hooks to run correctly. - self._shell_options["global_vars"]["TestData"]["sourceConnectionString"] = source_url - self._shell_options["global_vars"]["TestData"]["destinationConnectionString"] = dest_url - - self._replicator.set_cli_options({'sourceURI': source_url, 'destinationURI': dest_url}) - if self._replicator_start_delay is None: if math.isinf(self._tests_per_cycle): self._replicator_start_delay = random.randint(0, 10) @@ -120,8 +99,17 @@ class ClusterToClusterReplication(interface.Hook): # pylint: disable=too-many-i def _run_data_consistency_check(self, test, test_report): """Run the data consistency check across both clusters.""" + # The TestData needs to be set to allow the data consistency hooks to run correctly. + clusters = self._fixture.get_independent_clusters() + source_url = clusters[self._fixture.source_cluster_index].get_driver_connection_url() + dest_url = clusters[1 - self._fixture.source_cluster_index].get_driver_connection_url() + + shell_options = copy.deepcopy(self._shell_options) + shell_options["global_vars"]["TestData"]["sourceConnectionString"] = source_url + shell_options["global_vars"]["TestData"]["destinationConnectionString"] = dest_url + data_consistency = cluster_to_cluster_data_consistency.CheckClusterToClusterDataConsistency( - self.logger, self._fixture, self._shell_options) + self.logger, self._fixture, shell_options) data_consistency.before_suite(test_report) data_consistency.before_test(test, test_report) data_consistency.after_test(test, test_report) -- cgit v1.2.1