From 2f70889bbfd4dea77cd26cec2dde28193b06905d Mon Sep 17 00:00:00 2001 From: Mathias Stearn Date: Thu, 29 Oct 2015 17:02:55 -0400 Subject: SERVER-21154 Batch and parse oplog entries in parallel with applying them This includes the start of SERVER-21155. --- src/mongo/util/queue.h | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) (limited to 'src/mongo/util/queue.h') diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index a1b8caac1a0..6cdb62538ef 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -32,9 +32,10 @@ #include #include +#include "mongo/base/disallow_copying.h" #include "mongo/stdx/chrono.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/base/disallow_copying.h" +#include "mongo/stdx/mutex.h" namespace mongo { @@ -62,6 +63,11 @@ public: BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {} BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {} + void pushEvenIfFull(T const& t) { + stdx::unique_lock l(_lock); + pushImpl_inlock(t, _getSize(t)); + } + void push(T const& t) { stdx::unique_lock l(_lock); _clearing = false; @@ -69,9 +75,7 @@ public: while (_currentSize + tSize > _maxSize) { _cvNoLongerFull.wait(l); } - _queue.push(t); - _currentSize += tSize; - _cvNoLongerEmpty.notify_one(); + pushImpl_inlock(t, tSize); } bool empty() const { @@ -198,6 +202,14 @@ public: } private: + void pushImpl_inlock(const T& obj, size_t objSize) { + _clearing = false; + _queue.push(obj); + _currentSize += objSize; + if (_queue.size() == 1) // We were empty. + _cvNoLongerEmpty.notify_one(); + } + mutable stdx::mutex _lock; std::queue _queue; const size_t _maxSize; -- cgit v1.2.1