summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2022-02-25 15:38:13 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-25 17:01:01 +0000
commit8336dbd7c7b7c6b542523c3f0228102574b71014 (patch)
tree9679841660178d1ac8c5db1e3c5662b57ec64786
parent076b1393bae122bca1e18484990fa63ffc95f92a (diff)
downloadmongo-8336dbd7c7b7c6b542523c3f0228102574b71014.tar.gz
SERVER-63956 Refactor TicketHolder to accept multiple implementations
-rw-r--r--src/mongo/db/concurrency/d_concurrency_test.cpp10
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp4
-rw-r--r--src/mongo/dbtests/threadedtests.cpp9
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp47
-rw-r--r--src/mongo/util/concurrency/ticketholder.h57
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp46
6 files changed, 100 insertions, 73 deletions
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index 37c59d08b41..9c1e131fdf6 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -69,19 +69,19 @@ const auto kMaxClockJitterMillis = Milliseconds(0);
*/
class UseGlobalThrottling {
public:
- explicit UseGlobalThrottling(OperationContext* opCtx, int numTickets)
- : _opCtx(opCtx), _holder(numTickets) {
- _opCtx->lockState()->setGlobalThrottling(&_holder, &_holder);
+ explicit UseGlobalThrottling(OperationContext* opCtx, int numTickets) : _opCtx(opCtx) {
+ _holder = std::make_unique<SemaphoreTicketHolder>(numTickets);
+ _opCtx->lockState()->setGlobalThrottling(_holder.get(), _holder.get());
}
~UseGlobalThrottling() noexcept(false) {
// Reset the global setting as we're about to destroy the ticket holder.
_opCtx->lockState()->setGlobalThrottling(nullptr, nullptr);
- ASSERT_EQ(_holder.used(), 0);
+ ASSERT_EQ(_holder->used(), 0);
}
private:
OperationContext* _opCtx;
- TicketHolder _holder;
+ std::unique_ptr<TicketHolder> _holder;
};
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 32fa2099c19..7eb3b74ab72 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -282,8 +282,8 @@ std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult
}
namespace {
-TicketHolder openWriteTransaction(128);
-TicketHolder openReadTransaction(128);
+SemaphoreTicketHolder openWriteTransaction(128);
+SemaphoreTicketHolder openReadTransaction(128);
} // namespace
OpenWriteTransactionParam::OpenWriteTransactionParam(StringData name, ServerParameterType spt)
diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp
index 27cdfc95df7..994682d2eca 100644
--- a/src/mongo/dbtests/threadedtests.cpp
+++ b/src/mongo/dbtests/threadedtests.cpp
@@ -231,7 +231,8 @@ class TicketHolderWaits : public ThreadedTest<10> {
static const int rooms = 3;
public:
- TicketHolderWaits() : _hotel(rooms), _tickets(_hotel._nRooms) {}
+ TicketHolderWaits()
+ : _hotel(rooms), _tickets(std::make_unique<SemaphoreTicketHolder>(_hotel._nRooms)) {}
private:
class Hotel {
@@ -259,15 +260,15 @@ private:
};
Hotel _hotel;
- TicketHolder _tickets;
+ std::unique_ptr<TicketHolder> _tickets;
virtual void subthread(int x) {
string threadName = (str::stream() << "ticketHolder" << x);
Client::initThread(threadName.c_str());
for (int i = 0; i < checkIns; i++) {
- _tickets.waitForTicket();
- TicketHolderReleaser whenDone(&_tickets);
+ _tickets->waitForTicket();
+ TicketHolderReleaser whenDone(_tickets.get());
_hotel.checkIn();
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index aecafa9ee4c..dfe2f2db0a2 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -40,6 +40,8 @@
namespace mongo {
+TicketHolder::~TicketHolder() = default;
+
#if defined(__linux__)
namespace {
@@ -71,15 +73,15 @@ void tsFromDate(const Date_t& deadline, struct timespec& ts) {
}
} // namespace
-TicketHolder::TicketHolder(int num) : _outof(num) {
+SemaphoreTicketHolder::SemaphoreTicketHolder(int num) : _outof(num) {
check(sem_init(&_sem, 0, num));
}
-TicketHolder::~TicketHolder() {
+SemaphoreTicketHolder::~SemaphoreTicketHolder() {
check(sem_destroy(&_sem));
}
-bool TicketHolder::tryAcquire() {
+bool SemaphoreTicketHolder::tryAcquire() {
while (0 != sem_trywait(&_sem)) {
if (errno == EAGAIN)
return false;
@@ -89,11 +91,11 @@ bool TicketHolder::tryAcquire() {
return true;
}
-void TicketHolder::waitForTicket(OperationContext* opCtx) {
+void SemaphoreTicketHolder::waitForTicket(OperationContext* opCtx) {
waitForTicketUntil(opCtx, Date_t::max());
}
-bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) {
+bool SemaphoreTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) {
// Attempt to get a ticket without waiting in order to avoid expensive time calculations.
if (sem_trywait(&_sem) == 0) {
return true;
@@ -129,11 +131,11 @@ bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) {
return true;
}
-void TicketHolder::release() {
+void SemaphoreTicketHolder::release() {
check(sem_post(&_sem));
}
-Status TicketHolder::resize(int newSize) {
+Status SemaphoreTicketHolder::resize(int newSize) {
stdx::lock_guard<Latch> lk(_resizeMutex);
if (newSize < 5)
@@ -151,7 +153,7 @@ Status TicketHolder::resize(int newSize) {
}
while (_outof.load() > newSize) {
- waitForTicket();
+ this->TicketHolder::waitForTicket();
_outof.subtractAndFetch(1);
}
@@ -159,32 +161,32 @@ Status TicketHolder::resize(int newSize) {
return Status::OK();
}
-int TicketHolder::available() const {
+int SemaphoreTicketHolder::available() const {
int val = 0;
check(sem_getvalue(&_sem, &val));
return val;
}
-int TicketHolder::used() const {
+int SemaphoreTicketHolder::used() const {
return outof() - available();
}
-int TicketHolder::outof() const {
+int SemaphoreTicketHolder::outof() const {
return _outof.load();
}
#else
-TicketHolder::TicketHolder(int num) : _outof(num), _num(num) {}
+SemaphoreTicketHolder::SemaphoreTicketHolder(int num) : _outof(num), _num(num) {}
-TicketHolder::~TicketHolder() = default;
+SemaphoreTicketHolder::~SemaphoreTicketHolder() = default;
-bool TicketHolder::tryAcquire() {
+bool SemaphoreTicketHolder::tryAcquire() {
stdx::lock_guard<Latch> lk(_mutex);
return _tryAcquire();
}
-void TicketHolder::waitForTicket(OperationContext* opCtx) {
+void SemaphoreTicketHolder::waitForTicket(OperationContext* opCtx) {
stdx::unique_lock<Latch> lk(_mutex);
if (opCtx) {
@@ -194,7 +196,7 @@ void TicketHolder::waitForTicket(OperationContext* opCtx) {
}
}
-bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) {
+bool SemaphoreTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) {
stdx::unique_lock<Latch> lk(_mutex);
if (opCtx) {
@@ -206,7 +208,7 @@ bool TicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) {
}
}
-void TicketHolder::release() {
+void SemaphoreTicketHolder::release() {
{
stdx::lock_guard<Latch> lk(_mutex);
_num++;
@@ -214,7 +216,7 @@ void TicketHolder::release() {
_newTicket.notify_one();
}
-Status TicketHolder::resize(int newSize) {
+Status SemaphoreTicketHolder::resize(int newSize) {
stdx::lock_guard<Latch> lk(_mutex);
int used = _outof.load() - _num;
@@ -236,19 +238,19 @@ Status TicketHolder::resize(int newSize) {
return Status::OK();
}
-int TicketHolder::available() const {
+int SemaphoreTicketHolder::available() const {
return _num;
}
-int TicketHolder::used() const {
+int SemaphoreTicketHolder::used() const {
return outof() - _num;
}
-int TicketHolder::outof() const {
+int SemaphoreTicketHolder::outof() const {
return _outof.load();
}
-bool TicketHolder::_tryAcquire() {
+bool SemaphoreTicketHolder::_tryAcquire() {
if (_num <= 0) {
if (_num < 0) {
std::cerr << "DISASTER! in TicketHolder" << std::endl;
@@ -259,4 +261,5 @@ bool TicketHolder::_tryAcquire() {
return true;
}
#endif
+
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index ad7b2e92a29..9849e17c27d 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -42,24 +42,24 @@
namespace mongo {
class TicketHolder {
- TicketHolder(const TicketHolder&) = delete;
- TicketHolder& operator=(const TicketHolder&) = delete;
-
public:
- explicit TicketHolder(int num);
- ~TicketHolder();
+ virtual ~TicketHolder() = 0;
- bool tryAcquire();
+ /**
+ * Attempts to acquire a ticket without blocking.
+ * Returns a boolean indicating whether the operation was successful or not.
+ */
+ virtual bool tryAcquire() = 0;
/**
* Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext
* 'opCtx' is killed, throwing an AssertionException.
* If 'opCtx' is not provided or equal to nullptr, the wait is not interruptible.
*/
- void waitForTicket(OperationContext* opCtx);
+ virtual void waitForTicket(OperationContext* opCtx) = 0;
void waitForTicket() {
- waitForTicket(nullptr);
- }
+ this->waitForTicket(nullptr);
+ };
/**
* Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is
@@ -68,19 +68,42 @@ public:
* proceed.
* If 'opCtx' is not provided or equal to nullptr, the wait is not interruptible.
*/
- bool waitForTicketUntil(OperationContext* opCtx, Date_t until);
+ virtual bool waitForTicketUntil(OperationContext* opCtx, Date_t until) = 0;
bool waitForTicketUntil(Date_t until) {
- return waitForTicketUntil(nullptr, until);
- }
- void release();
+ return this->waitForTicketUntil(nullptr, until);
+ };
+
+ virtual void release() = 0;
+
+ virtual Status resize(int newSize) = 0;
+
+ virtual int available() const = 0;
+
+ virtual int used() const = 0;
+
+ virtual int outof() const = 0;
+};
+
+class SemaphoreTicketHolder final : public TicketHolder {
+public:
+ explicit SemaphoreTicketHolder(int num);
+ ~SemaphoreTicketHolder() override final;
+
+ bool tryAcquire() override final;
+
+ void waitForTicket(OperationContext* opCtx) override final;
+
+ bool waitForTicketUntil(OperationContext* opCtx, Date_t until) override final;
+
+ void release() override final;
- Status resize(int newSize);
+ Status resize(int newSize) override final;
- int available() const;
+ int available() const override final;
- int used() const;
+ int used() const override final;
- int outof() const;
+ int outof() const override final;
private:
#if defined(__linux__)
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
index 7e5cf20b415..81072b809a7 100644
--- a/src/mongo/util/concurrency/ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -39,37 +39,37 @@ namespace {
using namespace mongo;
TEST(TicketholderTest, BasicTimeout) {
- TicketHolder holder(1);
- ASSERT_EQ(holder.used(), 0);
- ASSERT_EQ(holder.available(), 1);
- ASSERT_EQ(holder.outof(), 1);
+ std::unique_ptr<TicketHolder> holder = std::make_unique<SemaphoreTicketHolder>(1);
+ ASSERT_EQ(holder->used(), 0);
+ ASSERT_EQ(holder->available(), 1);
+ ASSERT_EQ(holder->outof(), 1);
{
- ScopedTicket ticket(&holder);
- ASSERT_EQ(holder.used(), 1);
- ASSERT_EQ(holder.available(), 0);
- ASSERT_EQ(holder.outof(), 1);
+ ScopedTicket ticket(holder.get());
+ ASSERT_EQ(holder->used(), 1);
+ ASSERT_EQ(holder->available(), 0);
+ ASSERT_EQ(holder->outof(), 1);
- ASSERT_FALSE(holder.tryAcquire());
- ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now()));
- ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(1)));
- ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(42)));
+ ASSERT_FALSE(holder->tryAcquire());
+ ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now()));
+ ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now() + Milliseconds(1)));
+ ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now() + Milliseconds(42)));
}
- ASSERT_EQ(holder.used(), 0);
- ASSERT_EQ(holder.available(), 1);
- ASSERT_EQ(holder.outof(), 1);
+ ASSERT_EQ(holder->used(), 0);
+ ASSERT_EQ(holder->available(), 1);
+ ASSERT_EQ(holder->outof(), 1);
- ASSERT(holder.waitForTicketUntil(Date_t::now()));
- holder.release();
+ ASSERT(holder->waitForTicketUntil(Date_t::now()));
+ holder->release();
- ASSERT_EQ(holder.used(), 0);
+ ASSERT_EQ(holder->used(), 0);
- ASSERT(holder.waitForTicketUntil(Date_t::now() + Milliseconds(20)));
- ASSERT_EQ(holder.used(), 1);
+ ASSERT(holder->waitForTicketUntil(Date_t::now() + Milliseconds(20)));
+ ASSERT_EQ(holder->used(), 1);
- ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(2)));
- holder.release();
- ASSERT_EQ(holder.used(), 0);
+ ASSERT_FALSE(holder->waitForTicketUntil(Date_t::now() + Milliseconds(2)));
+ holder->release();
+ ASSERT_EQ(holder->used(), 0);
}
} // namespace