diff options
-rw-r--r-- | jstests/noPassthrough/resize_tickets.js | 27 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.h | 2 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticket.h | 8 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 17 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 4 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 29 |
7 files changed, 70 insertions, 22 deletions
diff --git a/jstests/noPassthrough/resize_tickets.js b/jstests/noPassthrough/resize_tickets.js new file mode 100644 index 00000000000..8b35f13ef12 --- /dev/null +++ b/jstests/noPassthrough/resize_tickets.js @@ -0,0 +1,27 @@ +/** + * Tests that tickets can be resized during runtime. This test exercises both increase and decrease + * of tickets. + */ +(function() { +'use strict'; + +var replTest = new ReplSetTest({name: "test_ticket_resize", nodes: 1}); +replTest.startSet(); +replTest.initiate(); +var mongod = replTest.getPrimary(); +// The 20, 10, 30 sequence of ticket resizes are just arbitrary numbers in order to test a decrease +// (20 -> 10) and an increase (10 -> 30) of tickets. +assert.commandWorked( + mongod.adminCommand({setParameter: 1, wiredTigerConcurrentWriteTransactions: 20})); +assert.commandWorked( + mongod.adminCommand({setParameter: 1, wiredTigerConcurrentWriteTransactions: 10})); +assert.commandWorked( + mongod.adminCommand({setParameter: 1, wiredTigerConcurrentWriteTransactions: 30})); +assert.commandWorked( + mongod.adminCommand({setParameter: 1, wiredTigerConcurrentReadTransactions: 20})); +assert.commandWorked( + mongod.adminCommand({setParameter: 1, wiredTigerConcurrentReadTransactions: 10})); +assert.commandWorked( + mongod.adminCommand({setParameter: 1, wiredTigerConcurrentReadTransactions: 30})); +replTest.stopSet(); +}()); diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index bd88d4f3d4a..e53c23a09f9 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -312,7 +312,7 @@ LockerImpl::~LockerImpl() { // to delete with unaccounted locks anyways. invariant(!inAWriteUnitOfWork()); invariant(_numResourcesToUnlockAtEndUnitOfWork == 0); - invariant(!_ticket.valid()); + invariant(!_ticket || !_ticket->valid()); if (!_requests.empty()) { _dumpLockerAndLockManagerRequests(); @@ -1077,8 +1077,9 @@ void LockerImpl::_releaseTicket() { auto& ticketHolders = ticketHoldersDecoration(getGlobalServiceContext()); auto holder = shouldAcquireTicket() ? ticketHolders.getTicketHolder(_modeForTicket) : nullptr; if (holder) { - holder->release(&_admCtx, std::move(_ticket)); + holder->release(&_admCtx, std::move(*_ticket)); } + _ticket.reset(); _clientState.store(kInactive); } diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h index d44ba681756..31096da3cf8 100644 --- a/src/mongo/db/concurrency/lock_state.h +++ b/src/mongo/db/concurrency/lock_state.h @@ -382,7 +382,7 @@ private: AdmissionContext _admCtx; // This will only be valid when holding a ticket. - Ticket _ticket; + boost::optional<Ticket> _ticket; // Tracks the global lock modes ever acquired in this Locker's life. This value should only ever // be accessed from the thread that owns the Locker. diff --git a/src/mongo/util/concurrency/ticket.h b/src/mongo/util/concurrency/ticket.h index de4d7d78016..3f10db3e6a1 100644 --- a/src/mongo/util/concurrency/ticket.h +++ b/src/mongo/util/concurrency/ticket.h @@ -71,14 +71,8 @@ public: return _valid; } - Ticket() : _valid(false) {} - private: - static Ticket makeValid() { - Ticket ticket; - ticket._valid = true; - return ticket; - } + Ticket() : _valid(true) {} void release() { invariant(_valid); diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index 6c4ee52bf5f..aecf15065a6 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -91,7 +91,7 @@ boost::optional<Ticket> SemaphoreTicketHolder::tryAcquire(AdmissionContext* admC if (errno != EINTR) failWithErrno(errno); } - return boost::make_optional(Ticket::makeValid()); + return Ticket{}; } Ticket SemaphoreTicketHolder::waitForTicket(OperationContext* opCtx, @@ -106,11 +106,10 @@ boost::optional<Ticket> SemaphoreTicketHolder::waitForTicketUntil(OperationConte AdmissionContext* admCtx, Date_t until, WaitMode waitMode) { - invariant(opCtx); // Attempt to get a ticket without waiting in order to avoid expensive time calculations. if (sem_trywait(&_sem) == 0) { - return boost::make_optional(Ticket::makeValid()); + return Ticket{}; } const Milliseconds intervalMs(500); @@ -140,7 +139,7 @@ boost::optional<Ticket> SemaphoreTicketHolder::waitForTicketUntil(OperationConte if (waitMode == WaitMode::kInterruptible) opCtx->checkForInterrupt(); } - return boost::make_optional(Ticket::makeValid()); + return Ticket{}; } void SemaphoreTicketHolder::release(AdmissionContext* admCtx, Ticket&& ticket) { @@ -169,6 +168,7 @@ Status SemaphoreTicketHolder::resize(int newSize) { while (_outof.load() > newSize) { auto ticket = waitForTicket(nullptr, &admCtx, WaitMode::kUninterruptible); + ticket.release(); _outof.subtractAndFetch(1); } @@ -380,7 +380,7 @@ boost::optional<Ticket> FifoTicketHolder::tryAcquire(AdmissionContext* admCtx) { } admCtx->start(_serviceContext->getTickSource()); _totalStartedProcessing.fetchAndAddRelaxed(1); - return boost::make_optional(Ticket::makeValid()); + return Ticket{}; } Ticket FifoTicketHolder::waitForTicket(OperationContext* opCtx, @@ -436,7 +436,7 @@ boost::optional<Ticket> FifoTicketHolder::waitForTicketUntil(OperationContext* o if (remaining >= 0) { _enqueuedElements.subtractAndFetch(1); startProcessing(); - return boost::make_optional(Ticket::makeValid()); + return Ticket{}; } _ticketsAvailable.addAndFetch(1); // We copy-construct the shared_ptr here as the waiting element needs to be alive in both @@ -458,7 +458,7 @@ boost::optional<Ticket> FifoTicketHolder::waitForTicketUntil(OperationContext* o // To cover the edge case of getting a ticket assigned before cancelling the ticket // request. As we have been granted a ticket we must release it. startProcessing(); - release(admCtx, Ticket()); + release(admCtx, Ticket{}); } }); @@ -485,7 +485,7 @@ boost::optional<Ticket> FifoTicketHolder::waitForTicketUntil(OperationContext* o if (assigned) { cancelWait.dismiss(); startProcessing(); - return boost::make_optional(Ticket::makeValid()); + return Ticket{}; } else { return boost::none; } @@ -508,6 +508,7 @@ Status FifoTicketHolder::resize(int newSize) { while (_capacity.load() > newSize) { Ticket ticket = waitForTicket(nullptr, &admCtx, WaitMode::kUninterruptible); + ticket.release(); _capacity.subtractAndFetch(1); } diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index d24530ffac6..a03518f908b 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -227,10 +227,6 @@ class TicketHolderReleaser { TicketHolderReleaser& operator=(const TicketHolderReleaser&) = delete; public: - TicketHolderReleaser() { - _holder = nullptr; - } - explicit TicketHolderReleaser(Ticket&& ticket, AdmissionContext* admCtx, TicketHolder* holder) : _holder(holder), _ticket(std::move(ticket)), _admCtx(admCtx) {} diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index d60a39d02dc..5f0fed6769b 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -100,6 +100,35 @@ void basicTimeout(OperationContext* opCtx) { ASSERT_FALSE(holder->waitForTicketUntil(opCtx, &admCtx, Date_t::now() + Milliseconds(2), mode)); holder->release(&admCtx, std::move(*ticket)); ASSERT_EQ(holder->used(), 0); + + // + // Test resize + // + ASSERT(holder->resize(6).isOK()); + ticket = holder->waitForTicket(opCtx, &admCtx, mode); + ASSERT(ticket); + ASSERT_EQ(holder->used(), 1); + ASSERT_EQ(holder->outof(), 6); + + std::array<boost::optional<Ticket>, 5> tickets; + for (int i = 0; i < 5; ++i) { + tickets[i] = holder->waitForTicket(opCtx, &admCtx, mode); + ASSERT_EQ(holder->used(), 2 + i); + ASSERT_EQ(holder->outof(), 6); + } + + ASSERT_FALSE(holder->waitForTicketUntil(opCtx, &admCtx, Date_t::now() + Milliseconds(1), mode)); + + holder->release(&admCtx, std::move(*ticket)); + + ASSERT(holder->resize(5).isOK()); + ASSERT_EQ(holder->used(), 5); + ASSERT_EQ(holder->outof(), 5); + ASSERT_FALSE(holder->waitForTicketUntil(opCtx, &admCtx, Date_t::now() + Milliseconds(1), mode)); + + for (int i = 0; i < 5; ++i) { + holder->release(&admCtx, std::move(*tickets[i])); + } } TEST_F(TicketHolderTest, BasicTimeoutFifo) { |