summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r--src/mongo/util/concurrency/msg.h2
-rw-r--r--src/mongo/util/concurrency/mvar.h3
-rw-r--r--src/mongo/util/concurrency/thread_pool.cpp3
-rw-r--r--src/mongo/util/concurrency/thread_pool.h9
-rw-r--r--src/mongo/util/concurrency/ticketholder.h110
5 files changed, 124 insertions, 3 deletions
diff --git a/src/mongo/util/concurrency/msg.h b/src/mongo/util/concurrency/msg.h
index 0b9a7c5048c..6e8a3980246 100644
--- a/src/mongo/util/concurrency/msg.h
+++ b/src/mongo/util/concurrency/msg.h
@@ -19,6 +19,8 @@
#pragma once
#include <deque>
+
+#include <boost/thread/condition.hpp>
#include "task.h"
namespace mongo {
diff --git a/src/mongo/util/concurrency/mvar.h b/src/mongo/util/concurrency/mvar.h
index bc1855a85cc..d5ceb28ce30 100644
--- a/src/mongo/util/concurrency/mvar.h
+++ b/src/mongo/util/concurrency/mvar.h
@@ -17,6 +17,9 @@
#pragma once
+#include <boost/thread/recursive_mutex.hpp>
+#include <boost/thread/condition.hpp>
+
namespace mongo {
/* This is based on haskell's MVar synchronization primitive:
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
index b64a904cd4a..78eb05556a9 100644
--- a/src/mongo/util/concurrency/thread_pool.cpp
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -17,6 +17,9 @@
*/
#include "pch.h"
+
+#include <boost/thread/thread.hpp>
+
#include "thread_pool.h"
#include "mvar.h"
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index ea2f801b63c..62a9cac57d1 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -17,8 +17,11 @@
#pragma once
-#include <boost/function.hpp>
+#include <list>
+
#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/thread/condition.hpp>
namespace mongo {
@@ -63,8 +66,8 @@ namespace mongo {
mongo::mutex _mutex;
boost::condition _condition;
- list<Worker*> _freeWorkers; //used as LIFO stack (always front)
- list<Task> _tasks; //used as FIFO queue (push_back, pop_front)
+ std::list<Worker*> _freeWorkers; //used as LIFO stack (always front)
+ std::list<Task> _tasks; //used as FIFO queue (push_back, pop_front)
int _tasksRemaining; // in queue + currently processing
int _nThreads; // only used for sanity checking. could be removed in the future.
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
new file mode 100644
index 00000000000..036d33fb9ef
--- /dev/null
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -0,0 +1,110 @@
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <boost/thread/condition_variable.hpp>
+
+#include "mongo/util/concurrency/mutex.h"
+
+namespace mongo {
+
+ class TicketHolder {
+ public:
+ TicketHolder( int num ) : _mutex("TicketHolder") {
+ _outof = num;
+ _num = num;
+ }
+
+ bool tryAcquire() {
+ scoped_lock lk( _mutex );
+ return _tryAcquire();
+ }
+
+ void waitForTicket() {
+ scoped_lock lk( _mutex );
+
+ while( ! _tryAcquire() ) {
+ _newTicket.wait( lk.boost() );
+ }
+ }
+
+ void release() {
+ {
+ scoped_lock lk( _mutex );
+ _num++;
+ }
+ _newTicket.notify_one();
+ }
+
+ void resize( int newSize ) {
+ {
+ scoped_lock lk( _mutex );
+
+ int used = _outof - _num;
+ if ( used > newSize ) {
+ cout << "ERROR: can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << endl;
+ return;
+ }
+
+ _outof = newSize;
+ _num = _outof - used;
+ }
+
+ // Potentially wasteful, but easier to see is correct
+ _newTicket.notify_all();
+ }
+
+ int available() const {
+ return _num;
+ }
+
+ int used() const {
+ return _outof - _num;
+ }
+
+ int outof() const { return _outof; }
+
+ private:
+
+ bool _tryAcquire(){
+ if ( _num <= 0 ) {
+ if ( _num < 0 ) {
+ cerr << "DISASTER! in TicketHolder" << endl;
+ }
+ return false;
+ }
+ _num--;
+ return true;
+ }
+
+ int _outof;
+ int _num;
+ mongo::mutex _mutex;
+ boost::condition_variable_any _newTicket;
+ };
+
+ class TicketHolderReleaser {
+ public:
+ TicketHolderReleaser( TicketHolder * holder ) {
+ _holder = holder;
+ }
+
+ ~TicketHolderReleaser() {
+ _holder->release();
+ }
+ private:
+ TicketHolder * _holder;
+ };
+}