diff options
Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r-- | src/mongo/util/concurrency/msg.h | 2 | ||||
-rw-r--r-- | src/mongo/util/concurrency/mvar.h | 3 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.cpp | 3 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.h | 9 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 110 |
5 files changed, 124 insertions, 3 deletions
diff --git a/src/mongo/util/concurrency/msg.h b/src/mongo/util/concurrency/msg.h index 0b9a7c5048c..6e8a3980246 100644 --- a/src/mongo/util/concurrency/msg.h +++ b/src/mongo/util/concurrency/msg.h @@ -19,6 +19,8 @@ #pragma once #include <deque> + +#include <boost/thread/condition.hpp> #include "task.h" namespace mongo { diff --git a/src/mongo/util/concurrency/mvar.h b/src/mongo/util/concurrency/mvar.h index bc1855a85cc..d5ceb28ce30 100644 --- a/src/mongo/util/concurrency/mvar.h +++ b/src/mongo/util/concurrency/mvar.h @@ -17,6 +17,9 @@ #pragma once +#include <boost/thread/recursive_mutex.hpp> +#include <boost/thread/condition.hpp> + namespace mongo { /* This is based on haskell's MVar synchronization primitive: diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp index b64a904cd4a..78eb05556a9 100644 --- a/src/mongo/util/concurrency/thread_pool.cpp +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -17,6 +17,9 @@ */ #include "pch.h" + +#include <boost/thread/thread.hpp> + #include "thread_pool.h" #include "mvar.h" diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index ea2f801b63c..62a9cac57d1 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -17,8 +17,11 @@ #pragma once -#include <boost/function.hpp> +#include <list> + #include <boost/bind.hpp> +#include <boost/function.hpp> +#include <boost/thread/condition.hpp> namespace mongo { @@ -63,8 +66,8 @@ namespace mongo { mongo::mutex _mutex; boost::condition _condition; - list<Worker*> _freeWorkers; //used as LIFO stack (always front) - list<Task> _tasks; //used as FIFO queue (push_back, pop_front) + std::list<Worker*> _freeWorkers; //used as LIFO stack (always front) + std::list<Task> _tasks; //used as FIFO queue (push_back, pop_front) int _tasksRemaining; // in queue + currently processing int _nThreads; // only used for sanity checking. could be removed in the future. diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h new file mode 100644 index 00000000000..036d33fb9ef --- /dev/null +++ b/src/mongo/util/concurrency/ticketholder.h @@ -0,0 +1,110 @@ +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <boost/thread/condition_variable.hpp> + +#include "mongo/util/concurrency/mutex.h" + +namespace mongo { + + class TicketHolder { + public: + TicketHolder( int num ) : _mutex("TicketHolder") { + _outof = num; + _num = num; + } + + bool tryAcquire() { + scoped_lock lk( _mutex ); + return _tryAcquire(); + } + + void waitForTicket() { + scoped_lock lk( _mutex ); + + while( ! _tryAcquire() ) { + _newTicket.wait( lk.boost() ); + } + } + + void release() { + { + scoped_lock lk( _mutex ); + _num++; + } + _newTicket.notify_one(); + } + + void resize( int newSize ) { + { + scoped_lock lk( _mutex ); + + int used = _outof - _num; + if ( used > newSize ) { + cout << "ERROR: can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << endl; + return; + } + + _outof = newSize; + _num = _outof - used; + } + + // Potentially wasteful, but easier to see is correct + _newTicket.notify_all(); + } + + int available() const { + return _num; + } + + int used() const { + return _outof - _num; + } + + int outof() const { return _outof; } + + private: + + bool _tryAcquire(){ + if ( _num <= 0 ) { + if ( _num < 0 ) { + cerr << "DISASTER! in TicketHolder" << endl; + } + return false; + } + _num--; + return true; + } + + int _outof; + int _num; + mongo::mutex _mutex; + boost::condition_variable_any _newTicket; + }; + + class TicketHolderReleaser { + public: + TicketHolderReleaser( TicketHolder * holder ) { + _holder = holder; + } + + ~TicketHolderReleaser() { + _holder->release(); + } + private: + TicketHolder * _holder; + }; +} |