summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2015-01-27 18:52:48 -0500
committerDan Pasette <dan@mongodb.com>2015-01-27 23:44:14 -0500
commit124386bc6077c1a9e56578fe800b31d43e15a648 (patch)
treea3a4bfe86d68f3773c18e3b961e29daa436168f4
parentcbe2315f1a511478d51968ca0f09aa7775540903 (diff)
downloadmongo-124386bc6077c1a9e56578fe800b31d43e15a648.tar.gz
SERVER-16951: Improve TicketHolder and add semaphore version
(cherry picked from commit 8299d7435855820d16b25f4a66b572ddf6a11cf5)
-rw-r--r--src/mongo/SConscript3
-rw-r--r--src/mongo/util/concurrency/SConscript5
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp201
-rw-r--r--src/mongo/util/concurrency/ticketholder.h105
4 files changed, 252 insertions, 62 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index b51b1238fd6..21f0b53a975 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -273,7 +273,8 @@ env.Library('network', [
"util/net/message.cpp",
"util/net/message_port.cpp",
"util/net/listen.cpp" ],
- LIBDEPS=['$BUILD_DIR/mongo/util/options_parser/options_parser',
+ LIBDEPS=['$BUILD_DIR/mongo/util/concurrency/ticketholder',
+ '$BUILD_DIR/mongo/util/options_parser/options_parser',
'background_job',
'fail_point',
'foundation',
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index c80f4d9b49b..ef9b8e66a3d 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -6,3 +6,8 @@ env.Library('thread_name',
['thread_name.cpp'],
LIBDEPS=['$BUILD_DIR/mongo/base/base',
'$BUILD_DIR/third_party/shim_boost'])
+
+env.Library('ticketholder',
+ ['ticketholder.cpp'],
+ LIBDEPS=['$BUILD_DIR/mongo/base/base',
+ '$BUILD_DIR/third_party/shim_boost'])
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
new file mode 100644
index 00000000000..44793197daf
--- /dev/null
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -0,0 +1,201 @@
+/* Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+#if defined(__linux__)
+ namespace {
+ void _check(int ret) {
+ if (ret == 0)
+ return;
+ int err = errno;
+ severe() << "error in Ticketholder: " << errnoWithDescription(err);
+ fassertFailed(28604);
+ }
+ }
+
+ TicketHolder::TicketHolder(int num)
+ : _outof(num) {
+ _check(sem_init(&_sem, 0, num));
+ }
+
+ TicketHolder::~TicketHolder(){
+ _check(sem_destroy(&_sem));
+ }
+
+ bool TicketHolder::tryAcquire() {
+ while (0 != sem_trywait(&_sem)) {
+ switch(errno) {
+ case EAGAIN: return false;
+ case EINTR: break;
+ default: _check(-1);
+ }
+ }
+ return true;
+ }
+
+ void TicketHolder::waitForTicket() {
+ while (0 != sem_wait(&_sem)) {
+ switch(errno) {
+ case EINTR: break;
+ default: _check(-1);
+ }
+ }
+ }
+
+ void TicketHolder::release() {
+ _check(sem_post(&_sem));
+ }
+
+ Status TicketHolder::resize(int newSize) {
+ boost::mutex::scoped_lock lk(_resizeMutex);
+
+ if (newSize < 5)
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Minimum value for semaphore is 5; given "
+ << newSize);
+
+ if (newSize > SEM_VALUE_MAX)
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Maximum value for semaphore is "
+ << SEM_VALUE_MAX << "; given " << newSize );
+
+ while (_outof.load() < newSize) {
+ release();
+ _outof.fetchAndAdd(1);
+ }
+
+ while (_outof.load() > newSize) {
+ waitForTicket();
+ _outof.subtractAndFetch(1);
+ }
+
+ invariant(_outof.load() == newSize);
+ return Status::OK();
+ }
+
+ int TicketHolder::available() const {
+ int val = 0;
+ _check(sem_getvalue(&_sem, &val));
+ return val;
+ }
+
+ int TicketHolder::used() const {
+ return outof() - available();
+ }
+
+ int TicketHolder::outof() const {
+ return _outof.load();
+ }
+
+#else
+
+ TicketHolder::TicketHolder( int num )
+ : _outof(num),
+ _num(num),
+ _mutex("TicketHolder") {
+ }
+
+ TicketHolder::~TicketHolder(){
+ }
+
+ bool TicketHolder::tryAcquire() {
+ scoped_lock lk( _mutex );
+ return _tryAcquire();
+ }
+
+ void TicketHolder::waitForTicket() {
+ scoped_lock lk( _mutex );
+
+ while( ! _tryAcquire() ) {
+ _newTicket.wait( lk.boost() );
+ }
+ }
+
+ void TicketHolder::release() {
+ {
+ scoped_lock lk( _mutex );
+ _num++;
+ }
+ _newTicket.notify_one();
+ }
+
+ Status TicketHolder::resize( int newSize ) {
+ scoped_lock lk( _mutex );
+
+ int used = _outof.load() - _num;
+ if ( used > newSize ) {
+ std::stringstream ss;
+ ss << "can't resize since we're using (" << used << ") "
+ << "more than newSize(" << newSize << ")";
+
+ std::string errmsg = ss.str();
+ log() << errmsg;
+ return Status(ErrorCodes::BadValue, errmsg);
+ }
+
+ _outof.store(newSize);
+ _num = _outof.load() - used;
+
+ // Potentially wasteful, but easier to see is correct
+ _newTicket.notify_all();
+ return Status::OK();
+ }
+
+ int TicketHolder::available() const {
+ return _num;
+ }
+
+ int TicketHolder::used() const {
+ return outof() - _num;
+ }
+
+ int TicketHolder::outof() const {
+ return _outof.load();
+ }
+
+ bool TicketHolder::_tryAcquire(){
+ if ( _num <= 0 ) {
+ if ( _num < 0 ) {
+ std::cerr << "DISASTER! in TicketHolder" << std::endl;
+ }
+ return false;
+ }
+ _num--;
+ return true;
+ }
+#endif
+
+}
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index cc8037378f9..e9058675ec9 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -1,4 +1,4 @@
-/* Copyright 2009 10gen Inc.
+/* Copyright 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
@@ -26,86 +26,52 @@
*/
#pragma once
+#if defined(__linux__)
+#include <semaphore.h>
+#endif
+
#include <boost/thread/condition_variable.hpp>
-#include <iostream>
+#include "mongo/base/disallow_copying.h"
#include "mongo/util/concurrency/mutex.h"
namespace mongo {
class TicketHolder {
+ MONGO_DISALLOW_COPYING(TicketHolder);
public:
- TicketHolder( int num ) : _mutex("TicketHolder") {
- _outof = num;
- _num = num;
- }
-
- bool tryAcquire() {
- scoped_lock lk( _mutex );
- return _tryAcquire();
- }
-
- void waitForTicket() {
- scoped_lock lk( _mutex );
-
- while( ! _tryAcquire() ) {
- _newTicket.wait( lk.boost() );
- }
- }
+ explicit TicketHolder(int num);
+ ~TicketHolder();
- void release() {
- {
- scoped_lock lk( _mutex );
- _num++;
- }
- _newTicket.notify_one();
- }
+ bool tryAcquire();
- void resize( int newSize ) {
- {
- scoped_lock lk( _mutex );
+ void waitForTicket();
- int used = _outof - _num;
- if ( used > newSize ) {
- std::cout << "can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << std::endl;
- return;
- }
+ void release();
- _outof = newSize;
- _num = _outof - used;
- }
+ Status resize(int newSize);
- // Potentially wasteful, but easier to see is correct
- _newTicket.notify_all();
- }
+ int available() const;
- int available() const {
- return _num;
- }
+ int used() const;
- int used() const {
- return _outof - _num;
- }
-
- int outof() const { return _outof; }
+ int outof() const;
private:
+#if defined(__linux__)
+ mutable sem_t _sem;
- bool _tryAcquire(){
- if ( _num <= 0 ) {
- if ( _num < 0 ) {
- std::cerr << "DISASTER! in TicketHolder" << std::endl;
- }
- return false;
- }
- _num--;
- return true;
- }
+ // You can read _outof without a lock, but have to hold _resizeMutex to change.
+ AtomicInt32 _outof;
+ boost::mutex _resizeMutex;
+#else
+ bool _tryAcquire();
- int _outof;
+ AtomicInt32 _outof;
int _num;
mongo::mutex _mutex;
boost::condition_variable_any _newTicket;
+#endif
};
class ScopedTicket {
@@ -124,14 +90,31 @@ namespace mongo {
};
class TicketHolderReleaser {
+ MONGO_DISALLOW_COPYING(TicketHolderReleaser);
public:
- TicketHolderReleaser( TicketHolder * holder ) {
+ TicketHolderReleaser() {
+ _holder = NULL;
+ }
+
+ explicit TicketHolderReleaser(TicketHolder* holder) {
_holder = holder;
}
~TicketHolderReleaser() {
- _holder->release();
+ if (_holder) {
+ _holder->release();
+ }
}
+
+ bool hasTicket() const { return _holder != NULL; }
+
+ void reset(TicketHolder* holder = NULL) {
+ if (_holder) {
+ _holder->release();
+ }
+ _holder = holder;
+ }
+
private:
TicketHolder * _holder;
};