diff options
author | Eliot Horowitz <eliot@10gen.com> | 2015-01-27 18:52:48 -0500 |
---|---|---|
committer | Dan Pasette <dan@mongodb.com> | 2015-01-27 23:44:14 -0500 |
commit | 124386bc6077c1a9e56578fe800b31d43e15a648 (patch) | |
tree | a3a4bfe86d68f3773c18e3b961e29daa436168f4 | |
parent | cbe2315f1a511478d51968ca0f09aa7775540903 (diff) | |
download | mongo-124386bc6077c1a9e56578fe800b31d43e15a648.tar.gz |
SERVER-16951: Improve TicketHolder and add semaphore version
(cherry picked from commit 8299d7435855820d16b25f4a66b572ddf6a11cf5)
-rw-r--r-- | src/mongo/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/util/concurrency/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 201 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 105 |
4 files changed, 252 insertions, 62 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index b51b1238fd6..21f0b53a975 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -273,7 +273,8 @@ env.Library('network', [ "util/net/message.cpp", "util/net/message_port.cpp", "util/net/listen.cpp" ], - LIBDEPS=['$BUILD_DIR/mongo/util/options_parser/options_parser', + LIBDEPS=['$BUILD_DIR/mongo/util/concurrency/ticketholder', + '$BUILD_DIR/mongo/util/options_parser/options_parser', 'background_job', 'fail_point', 'foundation', diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index c80f4d9b49b..ef9b8e66a3d 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -6,3 +6,8 @@ env.Library('thread_name', ['thread_name.cpp'], LIBDEPS=['$BUILD_DIR/mongo/base/base', '$BUILD_DIR/third_party/shim_boost']) + +env.Library('ticketholder', + ['ticketholder.cpp'], + LIBDEPS=['$BUILD_DIR/mongo/base/base', + '$BUILD_DIR/third_party/shim_boost']) diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp new file mode 100644 index 00000000000..44793197daf --- /dev/null +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -0,0 +1,201 @@ +/* Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/util/concurrency/ticketholder.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +#if defined(__linux__) + namespace { + void _check(int ret) { + if (ret == 0) + return; + int err = errno; + severe() << "error in Ticketholder: " << errnoWithDescription(err); + fassertFailed(28604); + } + } + + TicketHolder::TicketHolder(int num) + : _outof(num) { + _check(sem_init(&_sem, 0, num)); + } + + TicketHolder::~TicketHolder(){ + _check(sem_destroy(&_sem)); + } + + bool TicketHolder::tryAcquire() { + while (0 != sem_trywait(&_sem)) { + switch(errno) { + case EAGAIN: return false; + case EINTR: break; + default: _check(-1); + } + } + return true; + } + + void TicketHolder::waitForTicket() { + while (0 != sem_wait(&_sem)) { + switch(errno) { + case EINTR: break; + default: _check(-1); + } + } + } + + void TicketHolder::release() { + _check(sem_post(&_sem)); + } + + Status TicketHolder::resize(int newSize) { + boost::mutex::scoped_lock lk(_resizeMutex); + + if (newSize < 5) + return Status(ErrorCodes::BadValue, + str::stream() << "Minimum value for semaphore is 5; given " + << newSize); + + if (newSize > SEM_VALUE_MAX) + return Status(ErrorCodes::BadValue, + str::stream() << "Maximum value for semaphore is " + << SEM_VALUE_MAX << "; given " << newSize ); + + while (_outof.load() < newSize) { + release(); + _outof.fetchAndAdd(1); + } + + while (_outof.load() > newSize) { + waitForTicket(); + _outof.subtractAndFetch(1); + } + + invariant(_outof.load() == newSize); + return Status::OK(); + } + + int TicketHolder::available() const { + int val = 0; + _check(sem_getvalue(&_sem, &val)); + return val; + } + + int TicketHolder::used() const { + return outof() - available(); + } + + int TicketHolder::outof() const { + return _outof.load(); + } + +#else + + TicketHolder::TicketHolder( int num ) + : _outof(num), + _num(num), + _mutex("TicketHolder") { + } + + TicketHolder::~TicketHolder(){ + } + + bool TicketHolder::tryAcquire() { + scoped_lock lk( _mutex ); + return _tryAcquire(); + } + + void TicketHolder::waitForTicket() { + scoped_lock lk( _mutex ); + + while( ! _tryAcquire() ) { + _newTicket.wait( lk.boost() ); + } + } + + void TicketHolder::release() { + { + scoped_lock lk( _mutex ); + _num++; + } + _newTicket.notify_one(); + } + + Status TicketHolder::resize( int newSize ) { + scoped_lock lk( _mutex ); + + int used = _outof.load() - _num; + if ( used > newSize ) { + std::stringstream ss; + ss << "can't resize since we're using (" << used << ") " + << "more than newSize(" << newSize << ")"; + + std::string errmsg = ss.str(); + log() << errmsg; + return Status(ErrorCodes::BadValue, errmsg); + } + + _outof.store(newSize); + _num = _outof.load() - used; + + // Potentially wasteful, but easier to see is correct + _newTicket.notify_all(); + return Status::OK(); + } + + int TicketHolder::available() const { + return _num; + } + + int TicketHolder::used() const { + return outof() - _num; + } + + int TicketHolder::outof() const { + return _outof.load(); + } + + bool TicketHolder::_tryAcquire(){ + if ( _num <= 0 ) { + if ( _num < 0 ) { + std::cerr << "DISASTER! in TicketHolder" << std::endl; + } + return false; + } + _num--; + return true; + } +#endif + +} diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index cc8037378f9..e9058675ec9 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -1,4 +1,4 @@ -/* Copyright 2009 10gen Inc. +/* Copyright 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -26,86 +26,52 @@ */ #pragma once +#if defined(__linux__) +#include <semaphore.h> +#endif + #include <boost/thread/condition_variable.hpp> -#include <iostream> +#include "mongo/base/disallow_copying.h" #include "mongo/util/concurrency/mutex.h" namespace mongo { class TicketHolder { + MONGO_DISALLOW_COPYING(TicketHolder); public: - TicketHolder( int num ) : _mutex("TicketHolder") { - _outof = num; - _num = num; - } - - bool tryAcquire() { - scoped_lock lk( _mutex ); - return _tryAcquire(); - } - - void waitForTicket() { - scoped_lock lk( _mutex ); - - while( ! _tryAcquire() ) { - _newTicket.wait( lk.boost() ); - } - } + explicit TicketHolder(int num); + ~TicketHolder(); - void release() { - { - scoped_lock lk( _mutex ); - _num++; - } - _newTicket.notify_one(); - } + bool tryAcquire(); - void resize( int newSize ) { - { - scoped_lock lk( _mutex ); + void waitForTicket(); - int used = _outof - _num; - if ( used > newSize ) { - std::cout << "can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << std::endl; - return; - } + void release(); - _outof = newSize; - _num = _outof - used; - } + Status resize(int newSize); - // Potentially wasteful, but easier to see is correct - _newTicket.notify_all(); - } + int available() const; - int available() const { - return _num; - } + int used() const; - int used() const { - return _outof - _num; - } - - int outof() const { return _outof; } + int outof() const; private: +#if defined(__linux__) + mutable sem_t _sem; - bool _tryAcquire(){ - if ( _num <= 0 ) { - if ( _num < 0 ) { - std::cerr << "DISASTER! in TicketHolder" << std::endl; - } - return false; - } - _num--; - return true; - } + // You can read _outof without a lock, but have to hold _resizeMutex to change. + AtomicInt32 _outof; + boost::mutex _resizeMutex; +#else + bool _tryAcquire(); - int _outof; + AtomicInt32 _outof; int _num; mongo::mutex _mutex; boost::condition_variable_any _newTicket; +#endif }; class ScopedTicket { @@ -124,14 +90,31 @@ namespace mongo { }; class TicketHolderReleaser { + MONGO_DISALLOW_COPYING(TicketHolderReleaser); public: - TicketHolderReleaser( TicketHolder * holder ) { + TicketHolderReleaser() { + _holder = NULL; + } + + explicit TicketHolderReleaser(TicketHolder* holder) { _holder = holder; } ~TicketHolderReleaser() { - _holder->release(); + if (_holder) { + _holder->release(); + } } + + bool hasTicket() const { return _holder != NULL; } + + void reset(TicketHolder* holder = NULL) { + if (_holder) { + _holder->release(); + } + _holder = holder; + } + private: TicketHolder * _holder; }; |