diff options
author | Benety Goh <benety@mongodb.com> | 2016-06-13 16:58:05 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-06-14 18:33:50 -0400 |
commit | 0f744edcde0011533bc2d88b04581199cfee9070 (patch) | |
tree | 58145f4fd77e93f5fa3c2efb3ebd1f348b367da6 /src | |
parent | dbbad24bfbc6d391dffe5902977431d90201db3b (diff) | |
download | mongo-0f744edcde0011533bc2d88b04581199cfee9070.tar.gz |
SERVER-24560 replaced references to BlockingQueue in BackgroundSync with OplogBuffer
BackgroundSync now accepts an OplogBuffer at construction
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 45 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer.h | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 4 |
8 files changed, 146 insertions, 25 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 144f14f5fbe..9aecb8d1d94 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -143,20 +143,25 @@ static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count" static Counter64 bufferSizeGauge; static ServerStatusMetricField<Counter64> displayBufferSize("repl.buffer.sizeBytes", &bufferSizeGauge); -// The max size (bytes) of the buffer -static int bufferMaxSizeGauge = 256 * 1024 * 1024; -static ServerStatusMetricField<int> displayBufferMaxSize("repl.buffer.maxSizeBytes", - &bufferMaxSizeGauge); +// The max size (bytes) of the buffer. If the buffer does not have a size constraint, this is +// set to 0. +static Counter64 bufferMaxSizeGauge; +static ServerStatusMetricField<Counter64> displayBufferMaxSize("repl.buffer.maxSizeBytes", + &bufferMaxSizeGauge); -BackgroundSync::BackgroundSync() - : _buffer(bufferMaxSizeGauge, &getSize), +BackgroundSync::BackgroundSync(std::unique_ptr<OplogBuffer> oplogBuffer) + : _oplogBuffer(std::move(oplogBuffer)), _threadPoolTaskExecutor(makeThreadPool(), executor::makeNetworkInterface("NetworkInterfaceASIO-BGSync")), _replCoord(getGlobalReplicationCoordinator()), _syncSourceResolver(_replCoord), _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0), - std::numeric_limits<long long>::max()) {} + std::numeric_limits<long long>::max()) { + // Update "repl.buffer.maxSizeBytes" server status metric to reflect the current oplog buffer's + // max size. + bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get()); +} void BackgroundSync::shutdown() { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -176,10 +181,12 @@ void BackgroundSync::producerThread( Client::initThread("rsBackgroundSync"); AuthorizationSession::get(cc())->grantInternalAuthorization(); + _oplogBuffer->startup(); _threadPoolTaskExecutor.startup(); ON_BLOCK_EXIT([this]() { _threadPoolTaskExecutor.shutdown(); _threadPoolTaskExecutor.join(); + _oplogBuffer->shutdown(); }); while (!inShutdown()) { @@ -201,10 +208,10 @@ void BackgroundSync::producerThread( void BackgroundSync::_signalNoNewDataForApplier() { // Signal to consumers that we have entered the stopped state // if the signal isn't already in the queue. - const boost::optional<BSONObj> lastObjectPushed = _buffer.lastObjectPushed(); + const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed(); if (!lastObjectPushed || !lastObjectPushed->isEmpty()) { const BSONObj sentinelDoc; - _buffer.pushEvenIfFull(sentinelDoc); + _oplogBuffer->pushEvenIfFull(sentinelDoc); bufferCountGauge.increment(); bufferSizeGauge.increment(sentinelDoc.objsize()); } @@ -486,14 +493,14 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, } // Wait for enough space. - _buffer.waitForSpace(info.toApplyDocumentBytes); + _oplogBuffer->waitForSpace(info.toApplyDocumentBytes); OCCASIONALLY { - LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes"; + LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes"; } // Buffer docs for later application. - _buffer.pushAllNonBlocking(begin, end); + _oplogBuffer->pushAllNonBlocking(begin, end); // Update last fetched info. { @@ -507,20 +514,20 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, } bool BackgroundSync::peek(BSONObj* op) { - return _buffer.peek(*op); + return _oplogBuffer->peek(op); } void BackgroundSync::waitForMore() { BSONObj op; // Block for one second before timing out. // Ignore the value of the op we peeked at. - _buffer.blockingPeek(op, 1); + _oplogBuffer->blockingPeek(&op, Seconds(1)); } void BackgroundSync::consume() { // this is just to get the op off the queue, it's been peeked at // and queued for application already - BSONObj op = _buffer.blockingPop(); + BSONObj op = _oplogBuffer->blockingPop(); bufferCountGauge.decrement(1); bufferSizeGauge.decrement(getSize(op)); } @@ -540,7 +547,7 @@ void BackgroundSync::_rollback(OperationContext* txn, // Wait until the buffer is empty. // This is an indication that syncTail has removed the sentinal marker from the buffer // and reset its local lastAppliedOpTime via the replCoord. - while (!_buffer.empty()) { + while (!_oplogBuffer->isEmpty()) { sleepmillis(10); if (inShutdown()) { return; @@ -596,7 +603,7 @@ void BackgroundSync::stop() { } void BackgroundSync::start(OperationContext* txn) { - massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty()); + massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty()); long long lastFetchedHash = _readLastAppliedHash(txn); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -615,7 +622,7 @@ bool BackgroundSync::isStopped() const { } void BackgroundSync::clearBuffer() { - _buffer.clear(); + _oplogBuffer->clear(); const auto count = bufferCountGauge.get(); bufferCountGauge.decrement(count); const auto size = bufferSizeGauge.get(); @@ -684,7 +691,7 @@ bool BackgroundSync::shouldStopFetching() const { } void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) { - _buffer.push(op); + _oplogBuffer->push(op); bufferCountGauge.increment(); bufferSizeGauge.increment(op.objsize()); } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 14d1432aa31..817746350d8 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -34,6 +34,7 @@ #include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/sync_source_resolver.h" @@ -42,7 +43,6 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" -#include "mongo/util/queue.h" namespace mongo { @@ -62,7 +62,7 @@ class ReplicationCoordinatorExternalState; */ class BackgroundSync { public: - BackgroundSync(); + BackgroundSync(std::unique_ptr<OplogBuffer> oplogBuffer); MONGO_DISALLOW_COPYING(BackgroundSync); // stop syncing (when this node becomes a primary, e.g.) @@ -156,7 +156,7 @@ private: long long _readLastAppliedHash(OperationContext* txn); // Production thread - BlockingQueue<BSONObj> _buffer; + std::unique_ptr<OplogBuffer> _oplogBuffer; // Task executor used to run find/getMore commands on sync source. executor::ThreadPoolTaskExecutor _threadPoolTaskExecutor; diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h index 43ec18ce077..3c172cf650e 100644 --- a/src/mongo/db/repl/oplog_buffer.h +++ b/src/mongo/db/repl/oplog_buffer.h @@ -28,11 +28,13 @@ #pragma once +#include <boost/optional.hpp> #include <cstddef> #include <vector> #include "mongo/base/disallow_copying.h" #include "mongo/bson/bsonobj.h" +#include "mongo/util/time_support.h" namespace mongo { namespace repl { @@ -77,6 +79,21 @@ public: virtual void shutdown() = 0; /** + * Pushes operation into oplog buffer, ignoring any size constraints. Does not block. + * If the oplog buffer is already full, this will cause the size of the oplog buffer to exceed + * the limit returned by getMaxSize() but should not otherwise adversely affect normal + * functionality such as pushing and popping operations from the oplog buffer. + */ + virtual void pushEvenIfFull(const Value& value) = 0; + + /** + * Pushes operation into oplog buffer. + * If there are size constraints on the oplog buffer, this may block until sufficient space + * is made available (by popping) to complete this operation. + */ + virtual void push(const Value& value) = 0; + + /** * Pushes operations in the iterator range [begin, end) into the oplog buffer without blocking. * * Returns false if there is insufficient space to complete this operation successfully. @@ -89,7 +106,20 @@ public: virtual void waitForSpace(std::size_t size) = 0; /** - * Total size of all oplog entries in this oplog buffer as measured by the BSONObj::size() + * Returns true if oplog buffer is empty. + */ + virtual bool isEmpty() const = 0; + + /** + * Maximum size of all oplog entries that can be stored in this oplog buffer as measured by the + * BSONObj::objsize() function. + * + * Returns 0 if this oplog buffer has no size constraints. + */ + virtual std::size_t getMaxSize() const = 0; + + /** + * Total size of all oplog entries in this oplog buffer as measured by the BSONObj::objsize() * function. */ virtual std::size_t getSize() const = 0; @@ -111,10 +141,28 @@ public: virtual bool tryPop(Value* value) = 0; /** + * Pops the last operation in the oplog buffer. + * If the oplog buffer is empty, waits until an operation is pushed. + */ + virtual Value blockingPop() = 0; + + /** + * Waits "waitDuration" for an operation to be pushed into the oplog buffer. + * Returns false if oplog buffer is still empty after "waitDuration". + * Otherwise, returns true and sets "value" to last item in oplog buffer. + */ + virtual bool blockingPeek(Value* value, Seconds waitDuration) = 0; + + /** * Returns false if oplog buffer is empty. * Otherwise, returns true and sets "value" to last item in oplog buffer. */ virtual bool peek(Value* value) = 0; + + /** + * Returns the item most recently added to the oplog buffer or nothing if the buffer is empty. + */ + virtual boost::optional<Value> lastObjectPushed() const = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp index ffa01bfd2c4..ff575cae145 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp @@ -53,6 +53,14 @@ void OplogBufferBlockingQueue::shutdown() { clear(); } +void OplogBufferBlockingQueue::pushEvenIfFull(const Value& value) { + _queue.pushEvenIfFull(value); +} + +void OplogBufferBlockingQueue::push(const Value& value) { + _queue.push(value); +} + bool OplogBufferBlockingQueue::pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) { _queue.pushAllNonBlocking(begin, end); @@ -63,6 +71,14 @@ void OplogBufferBlockingQueue::waitForSpace(std::size_t size) { _queue.waitForSpace(size); } +bool OplogBufferBlockingQueue::isEmpty() const { + return _queue.empty(); +} + +std::size_t OplogBufferBlockingQueue::getMaxSize() const { + return kOplogBufferSize; +} + std::size_t OplogBufferBlockingQueue::getSize() const { return _queue.size(); } @@ -79,9 +95,21 @@ bool OplogBufferBlockingQueue::tryPop(Value* value) { return _queue.tryPop(*value); } +OplogBuffer::Value OplogBufferBlockingQueue::blockingPop() { + return _queue.blockingPop(); +} + +bool OplogBufferBlockingQueue::blockingPeek(Value* value, Seconds waitDuration) { + return _queue.blockingPeek(*value, static_cast<int>(durationCount<Seconds>(waitDuration))); +} + bool OplogBufferBlockingQueue::peek(Value* value) { return _queue.peek(*value); } +boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed() const { + return _queue.lastObjectPushed(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h index 5baf78fdf03..0c40c8f1256 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h @@ -43,13 +43,20 @@ public: void startup() override; void shutdown() override; + void pushEvenIfFull(const Value& value) override; + void push(const Value& value) override; bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override; void waitForSpace(std::size_t size) override; + bool isEmpty() const override; + std::size_t getMaxSize() const override; std::size_t getSize() const override; std::size_t getCount() const override; void clear() override; bool tryPop(Value* value) override; + Value blockingPop() override; + bool blockingPeek(Value* value, Seconds waitDuration) override; bool peek(Value* value) override; + boost::optional<Value> lastObjectPushed() const override; private: BlockingQueue<BSONObj> _queue; diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index 3c060e77443..7af1057e022 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -86,6 +86,10 @@ void OplogBufferCollection::startup() { void OplogBufferCollection::shutdown() {} +void OplogBufferCollection::pushEvenIfFull(const Value& value) {} + +void OplogBufferCollection::push(const Value& value) {} + bool OplogBufferCollection::pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) { return false; @@ -93,6 +97,14 @@ bool OplogBufferCollection::pushAllNonBlocking(Batch::const_iterator begin, void OplogBufferCollection::waitForSpace(std::size_t size) {} +bool OplogBufferCollection::isEmpty() const { + return true; +} + +std::size_t OplogBufferCollection::getMaxSize() const { + return 0; +} + std::size_t OplogBufferCollection::getSize() const { return 0; } @@ -107,9 +119,21 @@ bool OplogBufferCollection::tryPop(Value* value) { return false; } +OplogBuffer::Value OplogBufferCollection::blockingPop() { + return {}; +} + +bool OplogBufferCollection::blockingPeek(Value* value, Seconds waitDuration) { + return false; +} + bool OplogBufferCollection::peek(Value* value) { return false; } +boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed() const { + return {}; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index 99494f961be..c422c2f2c1c 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -56,13 +56,20 @@ public: void startup() override; void shutdown() override; + void pushEvenIfFull(const Value& value) override; + void push(const Value& value) override; bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override; void waitForSpace(std::size_t size) override; + bool isEmpty() const override; + std::size_t getMaxSize() const override; std::size_t getSize() const override; std::size_t getCount() const override; void clear() override; bool tryPop(Value* value) override; + Value blockingPop() override; + bool blockingPeek(Value* value, Seconds waitDuration) override; bool peek(Value* value) override; + boost::optional<Value> lastObjectPushed() const override; private: const NamespaceString _nss; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 51856de7e24..a42aa7ed2e4 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -134,7 +134,7 @@ void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFini log() << "Starting replication fetcher thread"; // Start bgsync. - _bgSync.reset(new BackgroundSync()); + _bgSync.reset(new BackgroundSync(makeSteadyStateOplogBuffer())); invariant(!_producerThread); // The producer thread should not be init'd before this. _producerThread.reset( new stdx::thread(stdx::bind(&BackgroundSync::producerThread, _bgSync.get(), this))); @@ -147,7 +147,7 @@ void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFini void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication() { if (!_producerThread) { log() << "Starting replication fetcher thread"; - _bgSync.reset(new BackgroundSync()); + _bgSync.reset(new BackgroundSync(makeSteadyStateOplogBuffer())); _producerThread.reset( new stdx::thread(stdx::bind(&BackgroundSync::producerThread, _bgSync.get(), this))); } |