diff options
author | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2022-02-25 15:38:13 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-25 17:01:01 +0000 |
commit | 8336dbd7c7b7c6b542523c3f0228102574b71014 (patch) | |
tree | 9679841660178d1ac8c5db1e3c5662b57ec64786 | |
parent | 076b1393bae122bca1e18484990fa63ffc95f92a (diff) | |
download | mongo-8336dbd7c7b7c6b542523c3f0228102574b71014.tar.gz |
SERVER-63956 Refactor TicketHolder to accept multiple implementations
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 4 | ||||
-rw-r--r-- | src/mongo/dbtests/threadedtests.cpp | 9 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 47 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 57 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 46 |
6 files changed, 100 insertions, 73 deletions
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp index 37c59d08b41..9c1e131fdf6 100644 --- a/src/mongo/db/concurrency/d_concurrency_test.cpp +++ b/src/mongo/db/concurrency/d_concurrency_test.cpp @@ -69,19 +69,19 @@ const auto kMaxClockJitterMillis = Milliseconds(0); */ class UseGlobalThrottling { public: - explicit UseGlobalThrottling(OperationContext* opCtx, int numTickets) - : _opCtx(opCtx), _holder(numTickets) { - _opCtx->lockState()->setGlobalThrottling(&_holder, &_holder); + explicit UseGlobalThrottling(OperationContext* opCtx, int numTickets) : _opCtx(opCtx) { + _holder = std::make_unique<SemaphoreTicketHolder>(numTickets); + _opCtx->lockState()->setGlobalThrottling(_holder.get(), _holder.get()); } ~UseGlobalThrottling() noexcept(false) { // Reset the global setting as we're about to destroy the ticket holder. _opCtx->lockState()->setGlobalThrottling(nullptr, nullptr); - ASSERT_EQ(_holder.used(), 0); + ASSERT_EQ(_holder->used(), 0); } private: OperationContext* _opCtx; - TicketHolder _holder; + std::unique_ptr<TicketHolder> _holder; }; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 32fa2099c19..7eb3b74ab72 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -282,8 +282,8 @@ std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult } namespace { -TicketHolder openWriteTransaction(128); -TicketHolder openReadTransaction(128); +SemaphoreTicketHolder openWriteTransaction(128); +SemaphoreTicketHolder openReadTransaction(128); } // namespace OpenWriteTransactionParam::OpenWriteTransactionParam(StringData name, ServerParameterType spt) diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp index 27cdfc95df7..994682d2eca 100644 --- a/src/mongo/dbtests/threadedtests.cpp +++ b/src/mongo/dbtests/threadedtests.cpp @@ -231,7 +231,8 @@ class TicketHolderWaits : public ThreadedTest<10> { static const int rooms = 3; public: - TicketHolderWaits() : _hotel(rooms), _tickets(_hotel._nRooms) {} + TicketHolderWaits() + : _hotel(rooms), _tickets(std::make_unique<SemaphoreTicketHolder>(_hotel._nRooms)) {} private: class Hotel { @@ -259,15 +260,15 @@ private: }; Hotel _hotel; - TicketHolder _tickets; + std::unique_ptr<TicketHolder> _tickets; virtual void subthread(int x) { string threadName = (str::stream() << "ticketHolder" << x); Client::initThread(threadName.c_str()); for (int i = 0; i < checkIns; i++) { - _tickets.waitForTicket(); - TicketHolderReleaser whenDone(&_tickets); + _tickets->waitForTicket(); + TicketHolderReleaser whenDone(_tickets.get()); _hotel.checkIn(); diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index aecafa9ee4c..dfe2f2db0a2 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -40,6 +40,8 @@ namespace mongo { +TicketHolder::~TicketHolder() = default; + #if defined(__linux__) namespace { @@ -71,15 +73,15 @@ void tsFromDate(const Date_t& deadline, struct timespec& ts) { } } // namespace -TicketHolder::TicketHolder(int num) : _outof(num) { +SemaphoreTicketHolder::SemaphoreTicketHolder(int num) : _outof(num) { check(sem_init(&_sem, 0, num)); } -TicketHolder::~TicketHolder() { +SemaphoreTicketHolder::~SemaphoreTicketHolder() { check(sem_destroy(&_sem)); } -bool TicketHolder::tryAcquire() { +bool SemaphoreTicketHolder::tryAcquire() { while (0 != sem_trywait(&_sem)) { if (errno == EAGAIN) return false; @@ -89,11 +91,11 @@ bool TicketHolder::tryAcquire() { return true; } -void TicketHolder::waitForTicket(OperationContext* opCtx) { +void SemaphoreTicketHolder::waitForTicket(OperationContext* opCtx) { waitForTicketUntil(opCtx, Date_t::max()); } -bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) { +bool SemaphoreTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) { // Attempt to get a ticket without waiting in order to avoid expensive time calculations. if (sem_trywait(&_sem) == 0) { return true; @@ -129,11 +131,11 @@ bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) { return true; } -void TicketHolder::release() { +void SemaphoreTicketHolder::release() { check(sem_post(&_sem)); } -Status TicketHolder::resize(int newSize) { +Status SemaphoreTicketHolder::resize(int newSize) { stdx::lock_guard<Latch> lk(_resizeMutex); if (newSize < 5) @@ -151,7 +153,7 @@ Status TicketHolder::resize(int newSize) { } while (_outof.load() > newSize) { - waitForTicket(); + this->TicketHolder::waitForTicket(); _outof.subtractAndFetch(1); } @@ -159,32 +161,32 @@ Status TicketHolder::resize(int newSize) { return Status::OK(); } -int TicketHolder::available() const { +int SemaphoreTicketHolder::available() const { int val = 0; check(sem_getvalue(&_sem, &val)); return val; } -int TicketHolder::used() const { +int SemaphoreTicketHolder::used() const { return outof() - available(); } -int TicketHolder::outof() const { +int SemaphoreTicketHolder::outof() const { return _outof.load(); } #else -TicketHolder::TicketHolder(int num) : _outof(num), _num(num) {} +SemaphoreTicketHolder::SemaphoreTicketHolder(int num) : _outof(num), _num(num) {} -TicketHolder::~TicketHolder() = default; +SemaphoreTicketHolder::~SemaphoreTicketHolder() = default; -bool TicketHolder::tryAcquire() { +bool SemaphoreTicketHolder::tryAcquire() { stdx::lock_guard<Latch> lk(_mutex); return _tryAcquire(); } -void TicketHolder::waitForTicket(OperationContext* opCtx) { +void SemaphoreTicketHolder::waitForTicket(OperationContext* opCtx) { stdx::unique_lock<Latch> lk(_mutex); if (opCtx) { @@ -194,7 +196,7 @@ void TicketHolder::waitForTicket(OperationContext* opCtx) { } } -bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) { +bool SemaphoreTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) { stdx::unique_lock<Latch> lk(_mutex); if (opCtx) { @@ -206,7 +208,7 @@ bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) { } } -void TicketHolder::release() { +void SemaphoreTicketHolder::release() { { stdx::lock_guard<Latch> lk(_mutex); _num++; @@ -214,7 +216,7 @@ void TicketHolder::release() { _newTicket.notify_one(); } -Status TicketHolder::resize(int newSize) { +Status SemaphoreTicketHolder::resize(int newSize) { stdx::lock_guard<Latch> lk(_mutex); int used = _outof.load() - _num; @@ -236,19 +238,19 @@ Status TicketHolder::resize(int newSize) { return Status::OK(); } -int TicketHolder::available() const { +int SemaphoreTicketHolder::available() const { return _num; } -int TicketHolder::used() const { +int SemaphoreTicketHolder::used() const { return outof() - _num; } -int TicketHolder::outof() const { +int SemaphoreTicketHolder::outof() const { return _outof.load(); } -bool TicketHolder::_tryAcquire() { +bool SemaphoreTicketHolder::_tryAcquire() { if (_num <= 0) { if (_num < 0) { std::cerr << "DISASTER! in TicketHolder" << std::endl; @@ -259,4 +261,5 @@ bool TicketHolder::_tryAcquire() { return true; } #endif + } // namespace mongo diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index ad7b2e92a29..9849e17c27d 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -42,24 +42,24 @@ namespace mongo { class TicketHolder { - TicketHolder(const TicketHolder&) = delete; - TicketHolder& operator=(const TicketHolder&) = delete; - public: - explicit TicketHolder(int num); - ~TicketHolder(); + virtual ~TicketHolder() = 0; - bool tryAcquire(); + /** + * Attempts to acquire a ticket without blocking. + * Returns a boolean indicating whether the operation was successful or not. + */ + virtual bool tryAcquire() = 0; /** * Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext * 'opCtx' is killed, throwing an AssertionException. * If 'opCtx' is not provided or equal to nullptr, the wait is not interruptible. */ - void waitForTicket(OperationContext* opCtx); + virtual void waitForTicket(OperationContext* opCtx) = 0; void waitForTicket() { - waitForTicket(nullptr); - } + this->waitForTicket(nullptr); + }; /** * Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is @@ -68,19 +68,42 @@ public: * proceed. * If 'opCtx' is not provided or equal to nullptr, the wait is not interruptible. */ - bool waitForTicketUntil(OperationContext* opCtx, Date_t until); + virtual bool waitForTicketUntil(OperationContext* opCtx, Date_t until) = 0; bool waitForTicketUntil(Date_t until) { - return waitForTicketUntil(nullptr, until); - } - void release(); + return this->waitForTicketUntil(nullptr, until); + }; + + virtual void release() = 0; + + virtual Status resize(int newSize) = 0; + + virtual int available() const = 0; + + virtual int used() const = 0; + + virtual int outof() const = 0; +}; + +class SemaphoreTicketHolder final : public TicketHolder { +public: + explicit SemaphoreTicketHolder(int num); + ~SemaphoreTicketHolder() override final; + + bool tryAcquire() override final; + + void waitForTicket(OperationContext* opCtx) override final; + + bool waitForTicketUntil(OperationContext* opCtx, Date_t until) override final; + + void release() override final; - Status resize(int newSize); + Status resize(int newSize) override final; - int available() const; + int available() const override final; - int used() const; + int used() const override final; - int outof() const; + int outof() const override final; private: #if defined(__linux__) diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index 7e5cf20b415..81072b809a7 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -39,37 +39,37 @@ namespace { using namespace mongo; TEST(TicketholderTest, BasicTimeout) { - TicketHolder holder(1); - ASSERT_EQ(holder.used(), 0); - ASSERT_EQ(holder.available(), 1); - ASSERT_EQ(holder.outof(), 1); + std::unique_ptr<TicketHolder> holder = std::make_unique<SemaphoreTicketHolder>(1); + ASSERT_EQ(holder->used(), 0); + ASSERT_EQ(holder->available(), 1); + ASSERT_EQ(holder->outof(), 1); { - ScopedTicket ticket(&holder); - ASSERT_EQ(holder.used(), 1); - ASSERT_EQ(holder.available(), 0); - ASSERT_EQ(holder.outof(), 1); + ScopedTicket ticket(holder.get()); + ASSERT_EQ(holder->used(), 1); + ASSERT_EQ(holder->available(), 0); + ASSERT_EQ(holder->outof(), 1); - ASSERT_FALSE(holder.tryAcquire()); - ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now())); - ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(1))); - ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(42))); + ASSERT_FALSE(holder->tryAcquire()); + ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now())); + ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now() + Milliseconds(1))); + ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now() + Milliseconds(42))); } - ASSERT_EQ(holder.used(), 0); - ASSERT_EQ(holder.available(), 1); - ASSERT_EQ(holder.outof(), 1); + ASSERT_EQ(holder->used(), 0); + ASSERT_EQ(holder->available(), 1); + ASSERT_EQ(holder->outof(), 1); - ASSERT(holder.waitForTicketUntil(Date_t::now())); - holder.release(); + ASSERT(holder->waitForTicketUntil(Date_t::now())); + holder->release(); - ASSERT_EQ(holder.used(), 0); + ASSERT_EQ(holder->used(), 0); - ASSERT(holder.waitForTicketUntil(Date_t::now() + Milliseconds(20))); - ASSERT_EQ(holder.used(), 1); + ASSERT(holder->waitForTicketUntil(Date_t::now() + Milliseconds(20))); + ASSERT_EQ(holder->used(), 1); - ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(2))); - holder.release(); - ASSERT_EQ(holder.used(), 0); + ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now() + Milliseconds(2))); + holder->release(); + ASSERT_EQ(holder->used(), 0); } } // namespace |