/** * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/unittest/unittest.h" #include "mongo/util/periodic_runner_impl.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/clock_source_mock.h" namespace mongo { class Client; namespace { class PeriodicRunnerImplTestNoSetup : public ServiceContextTest { public: void setUp() override { _clockSource = std::make_unique(); _runner = stdx::make_unique(getServiceContext(), _clockSource.get()); } void tearDown() override { _runner->shutdown(); } ClockSourceMock& clockSource() { return *_clockSource; } PeriodicRunner& runner() { return *_runner; } private: std::unique_ptr _clockSource; std::unique_ptr _runner; }; class PeriodicRunnerImplTest : public PeriodicRunnerImplTestNoSetup { public: void setUp() override { PeriodicRunnerImplTestNoSetup::setUp(); runner().startup(); } }; TEST_F(PeriodicRunnerImplTest, OneJobTest) { int count = 0; Milliseconds interval{5}; stdx::mutex mutex; stdx::condition_variable cv; // Add a job, ensure that it runs once PeriodicRunner::PeriodicJob job("job", [&count, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); count++; } cv.notify_all(); }, interval); runner().scheduleJob(std::move(job)); // Fast forward ten times, we should run all ten times. for (int i = 0; i < 10; i++) { clockSource().advance(interval); { stdx::unique_lock lk(mutex); cv.wait(lk, [&count, &i] { return count > i; }); } } tearDown(); } TEST_F(PeriodicRunnerImplTest, OnePausableJobDoesNotRunWithoutStart) { int count = 0; Milliseconds interval{5}; stdx::mutex mutex; stdx::condition_variable cv; // Add a job, ensure that it runs once PeriodicRunner::PeriodicJob job("job", [&count, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); count++; } cv.notify_all(); }, interval); auto handle = runner().makeJob(std::move(job)); clockSource().advance(interval); ASSERT_EQ(count, 0); tearDown(); } TEST_F(PeriodicRunnerImplTest, OnePausableJobRunsCorrectlyWithStart) { int count = 0; Milliseconds interval{5}; stdx::mutex mutex; stdx::condition_variable cv; // Add a job, ensure that it runs once PeriodicRunner::PeriodicJob job("job", [&count, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); count++; } cv.notify_all(); }, interval); auto handle = runner().makeJob(std::move(job)); handle->start(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < 10; i++) { clockSource().advance(interval); { stdx::unique_lock lk(mutex); cv.wait(lk, [&count, &i] { return count > i; }); } } tearDown(); } TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) { int count = 0; Milliseconds interval{5}; stdx::mutex mutex; stdx::condition_variable cv; // Add a job, ensure that it runs once PeriodicRunner::PeriodicJob job("job", [&count, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); count++; } cv.notify_all(); }, interval); auto handle = runner().makeJob(std::move(job)); handle->start(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < 10; i++) { clockSource().advance(interval); { stdx::unique_lock lk(mutex); cv.wait(lk, [&count, &i] { return count > i; }); } } auto numExecutionsBeforePause = count; handle->pause(); // Fast forward ten times, we shouldn't run anymore for (int i = 0; i < 10; i++) { clockSource().advance(interval); } ASSERT_TRUE(count == numExecutionsBeforePause || count == numExecutionsBeforePause + 1) << "Actual values: count: " << count << ", numExecutionsBeforePause: " << numExecutionsBeforePause; tearDown(); } TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { int count = 0; int numFastForwardsForIterationWhileActive = 10; Milliseconds interval{5}; stdx::mutex mutex; stdx::condition_variable cv; // Add a job, ensure that it runs once PeriodicRunner::PeriodicJob job("job", [&count, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); count++; } cv.notify_all(); }, interval); auto handle = runner().makeJob(std::move(job)); handle->start(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < numFastForwardsForIterationWhileActive; i++) { clockSource().advance(interval); { stdx::unique_lock lk(mutex); cv.wait(lk, [&count, &i] { return count > i; }); } } auto countBeforePause = count; ASSERT_TRUE(countBeforePause == numFastForwardsForIterationWhileActive || countBeforePause == numFastForwardsForIterationWhileActive + 1) << "Actual values: countBeforePause: " << countBeforePause << ", numFastForwardsForIterationWhileActive: " << numFastForwardsForIterationWhileActive; handle->pause(); // Fast forward ten times, we shouldn't run anymore for (int i = 0; i < 10; i++) { clockSource().advance(interval); } handle->resume(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < numFastForwardsForIterationWhileActive; i++) { clockSource().advance(interval); { stdx::unique_lock lk(mutex); cv.wait(lk, [&count, &countBeforePause, &i] { return count > countBeforePause + i; }); } } // This is slightly racy so once in a while count will be one extra ASSERT_TRUE(count == numFastForwardsForIterationWhileActive * 2 || count == numFastForwardsForIterationWhileActive * 2 + 1) << "Actual values: count: " << count << ", numFastForwardsForIterationWhileActive: " << numFastForwardsForIterationWhileActive; tearDown(); } TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) { int count = 0; Milliseconds interval{5}; stdx::mutex mutex; stdx::condition_variable cv; // Schedule a job before startup PeriodicRunner::PeriodicJob job("job", [&count, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); count++; } cv.notify_all(); }, interval); runner().scheduleJob(std::move(job)); // Start the runner, job should still run runner().startup(); clockSource().advance(interval); { stdx::unique_lock lk(mutex); cv.wait(lk, [&count] { return count > 0; }); } tearDown(); } TEST_F(PeriodicRunnerImplTest, TwoJobsTest) { int countA = 0; int countB = 0; Milliseconds intervalA{5}; Milliseconds intervalB{10}; stdx::mutex mutex; stdx::condition_variable cv; // Add two jobs, ensure they both run the proper number of times PeriodicRunner::PeriodicJob jobA("job", [&countA, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); countA++; } cv.notify_all(); }, intervalA); PeriodicRunner::PeriodicJob jobB("job", [&countB, &mutex, &cv](Client*) { { stdx::unique_lock lk(mutex); countB++; } cv.notify_all(); }, intervalB); runner().scheduleJob(std::move(jobA)); runner().scheduleJob(std::move(jobB)); // Fast forward and wait for both jobs to run the right number of times for (int i = 0; i <= 10; i++) { clockSource().advance(intervalA); { stdx::unique_lock lk(mutex); cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); }); } } tearDown(); } TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) { stdx::mutex mutex; stdx::condition_variable cv; stdx::condition_variable doneCv; bool a = false; bool b = false; PeriodicRunner::PeriodicJob jobA("job", [&](Client*) { stdx::unique_lock lk(mutex); a = true; cv.notify_one(); cv.wait(lk, [&] { return b; }); doneCv.notify_one(); }, Milliseconds(1)); PeriodicRunner::PeriodicJob jobB("job", [&](Client*) { stdx::unique_lock lk(mutex); b = true; cv.notify_one(); cv.wait(lk, [&] { return a; }); doneCv.notify_one(); }, Milliseconds(1)); runner().scheduleJob(std::move(jobA)); runner().scheduleJob(std::move(jobB)); clockSource().advance(Milliseconds(1)); { stdx::unique_lock lk(mutex); doneCv.wait(lk, [&] { return a && b; }); ASSERT(a); ASSERT(b); } tearDown(); } } // namespace } // namespace mongo