summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-06-13 16:58:05 -0400
committerBenety Goh <benety@mongodb.com>2016-06-14 18:33:50 -0400
commit0f744edcde0011533bc2d88b04581199cfee9070 (patch)
tree58145f4fd77e93f5fa3c2efb3ebd1f348b367da6 /src
parentdbbad24bfbc6d391dffe5902977431d90201db3b (diff)
downloadmongo-0f744edcde0011533bc2d88b04581199cfee9070.tar.gz
SERVER-24560 replaced references to BlockingQueue in BackgroundSync with OplogBuffer
BackgroundSync now accepts an OplogBuffer at construction
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/bgsync.cpp45
-rw-r--r--src/mongo/db/repl/bgsync.h6
-rw-r--r--src/mongo/db/repl/oplog_buffer.h50
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.cpp28
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.h7
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp24
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
8 files changed, 146 insertions, 25 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 144f14f5fbe..9aecb8d1d94 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -143,20 +143,25 @@ static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count"
static Counter64 bufferSizeGauge;
static ServerStatusMetricField<Counter64> displayBufferSize("repl.buffer.sizeBytes",
&bufferSizeGauge);
-// The max size (bytes) of the buffer
-static int bufferMaxSizeGauge = 256 * 1024 * 1024;
-static ServerStatusMetricField<int> displayBufferMaxSize("repl.buffer.maxSizeBytes",
- &bufferMaxSizeGauge);
+// The max size (bytes) of the buffer. If the buffer does not have a size constraint, this is
+// set to 0.
+static Counter64 bufferMaxSizeGauge;
+static ServerStatusMetricField<Counter64> displayBufferMaxSize("repl.buffer.maxSizeBytes",
+ &bufferMaxSizeGauge);
-BackgroundSync::BackgroundSync()
- : _buffer(bufferMaxSizeGauge, &getSize),
+BackgroundSync::BackgroundSync(std::unique_ptr<OplogBuffer> oplogBuffer)
+ : _oplogBuffer(std::move(oplogBuffer)),
_threadPoolTaskExecutor(makeThreadPool(),
executor::makeNetworkInterface("NetworkInterfaceASIO-BGSync")),
_replCoord(getGlobalReplicationCoordinator()),
_syncSourceResolver(_replCoord),
_lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0),
- std::numeric_limits<long long>::max()) {}
+ std::numeric_limits<long long>::max()) {
+ // Update "repl.buffer.maxSizeBytes" server status metric to reflect the current oplog buffer's
+ // max size.
+ bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get());
+}
void BackgroundSync::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -176,10 +181,12 @@ void BackgroundSync::producerThread(
Client::initThread("rsBackgroundSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
+ _oplogBuffer->startup();
_threadPoolTaskExecutor.startup();
ON_BLOCK_EXIT([this]() {
_threadPoolTaskExecutor.shutdown();
_threadPoolTaskExecutor.join();
+ _oplogBuffer->shutdown();
});
while (!inShutdown()) {
@@ -201,10 +208,10 @@ void BackgroundSync::producerThread(
void BackgroundSync::_signalNoNewDataForApplier() {
// Signal to consumers that we have entered the stopped state
// if the signal isn't already in the queue.
- const boost::optional<BSONObj> lastObjectPushed = _buffer.lastObjectPushed();
+ const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed();
if (!lastObjectPushed || !lastObjectPushed->isEmpty()) {
const BSONObj sentinelDoc;
- _buffer.pushEvenIfFull(sentinelDoc);
+ _oplogBuffer->pushEvenIfFull(sentinelDoc);
bufferCountGauge.increment();
bufferSizeGauge.increment(sentinelDoc.objsize());
}
@@ -486,14 +493,14 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
}
// Wait for enough space.
- _buffer.waitForSpace(info.toApplyDocumentBytes);
+ _oplogBuffer->waitForSpace(info.toApplyDocumentBytes);
OCCASIONALLY {
- LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes";
+ LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes";
}
// Buffer docs for later application.
- _buffer.pushAllNonBlocking(begin, end);
+ _oplogBuffer->pushAllNonBlocking(begin, end);
// Update last fetched info.
{
@@ -507,20 +514,20 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
}
bool BackgroundSync::peek(BSONObj* op) {
- return _buffer.peek(*op);
+ return _oplogBuffer->peek(op);
}
void BackgroundSync::waitForMore() {
BSONObj op;
// Block for one second before timing out.
// Ignore the value of the op we peeked at.
- _buffer.blockingPeek(op, 1);
+ _oplogBuffer->blockingPeek(&op, Seconds(1));
}
void BackgroundSync::consume() {
// this is just to get the op off the queue, it's been peeked at
// and queued for application already
- BSONObj op = _buffer.blockingPop();
+ BSONObj op = _oplogBuffer->blockingPop();
bufferCountGauge.decrement(1);
bufferSizeGauge.decrement(getSize(op));
}
@@ -540,7 +547,7 @@ void BackgroundSync::_rollback(OperationContext* txn,
// Wait until the buffer is empty.
// This is an indication that syncTail has removed the sentinal marker from the buffer
// and reset its local lastAppliedOpTime via the replCoord.
- while (!_buffer.empty()) {
+ while (!_oplogBuffer->isEmpty()) {
sleepmillis(10);
if (inShutdown()) {
return;
@@ -596,7 +603,7 @@ void BackgroundSync::stop() {
}
void BackgroundSync::start(OperationContext* txn) {
- massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty());
+ massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty());
long long lastFetchedHash = _readLastAppliedHash(txn);
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -615,7 +622,7 @@ bool BackgroundSync::isStopped() const {
}
void BackgroundSync::clearBuffer() {
- _buffer.clear();
+ _oplogBuffer->clear();
const auto count = bufferCountGauge.get();
bufferCountGauge.decrement(count);
const auto size = bufferSizeGauge.get();
@@ -684,7 +691,7 @@ bool BackgroundSync::shouldStopFetching() const {
}
void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) {
- _buffer.push(op);
+ _oplogBuffer->push(op);
bufferCountGauge.increment();
bufferSizeGauge.increment(op.objsize());
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 14d1432aa31..817746350d8 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -34,6 +34,7 @@
#include "mongo/base/status_with.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/data_replicator_external_state.h"
+#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/sync_source_resolver.h"
@@ -42,7 +43,6 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
-#include "mongo/util/queue.h"
namespace mongo {
@@ -62,7 +62,7 @@ class ReplicationCoordinatorExternalState;
*/
class BackgroundSync {
public:
- BackgroundSync();
+ BackgroundSync(std::unique_ptr<OplogBuffer> oplogBuffer);
MONGO_DISALLOW_COPYING(BackgroundSync);
// stop syncing (when this node becomes a primary, e.g.)
@@ -156,7 +156,7 @@ private:
long long _readLastAppliedHash(OperationContext* txn);
// Production thread
- BlockingQueue<BSONObj> _buffer;
+ std::unique_ptr<OplogBuffer> _oplogBuffer;
// Task executor used to run find/getMore commands on sync source.
executor::ThreadPoolTaskExecutor _threadPoolTaskExecutor;
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index 43ec18ce077..3c172cf650e 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -28,11 +28,13 @@
#pragma once
+#include <boost/optional.hpp>
#include <cstddef>
#include <vector>
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/util/time_support.h"
namespace mongo {
namespace repl {
@@ -77,6 +79,21 @@ public:
virtual void shutdown() = 0;
/**
+ * Pushes operation into oplog buffer, ignoring any size constraints. Does not block.
+ * If the oplog buffer is already full, this will cause the size of the oplog buffer to exceed
+ * the limit returned by getMaxSize() but should not otherwise adversely affect normal
+ * functionality such as pushing and popping operations from the oplog buffer.
+ */
+ virtual void pushEvenIfFull(const Value& value) = 0;
+
+ /**
+ * Pushes operation into oplog buffer.
+ * If there are size constraints on the oplog buffer, this may block until sufficient space
+ * is made available (by popping) to complete this operation.
+ */
+ virtual void push(const Value& value) = 0;
+
+ /**
* Pushes operations in the iterator range [begin, end) into the oplog buffer without blocking.
*
* Returns false if there is insufficient space to complete this operation successfully.
@@ -89,7 +106,20 @@ public:
virtual void waitForSpace(std::size_t size) = 0;
/**
- * Total size of all oplog entries in this oplog buffer as measured by the BSONObj::size()
+ * Returns true if oplog buffer is empty.
+ */
+ virtual bool isEmpty() const = 0;
+
+ /**
+ * Maximum size of all oplog entries that can be stored in this oplog buffer as measured by the
+ * BSONObj::objsize() function.
+ *
+ * Returns 0 if this oplog buffer has no size constraints.
+ */
+ virtual std::size_t getMaxSize() const = 0;
+
+ /**
+ * Total size of all oplog entries in this oplog buffer as measured by the BSONObj::objsize()
* function.
*/
virtual std::size_t getSize() const = 0;
@@ -111,10 +141,28 @@ public:
virtual bool tryPop(Value* value) = 0;
/**
+ * Pops the last operation in the oplog buffer.
+ * If the oplog buffer is empty, waits until an operation is pushed.
+ */
+ virtual Value blockingPop() = 0;
+
+ /**
+ * Waits "waitDuration" for an operation to be pushed into the oplog buffer.
+ * Returns false if oplog buffer is still empty after "waitDuration".
+ * Otherwise, returns true and sets "value" to last item in oplog buffer.
+ */
+ virtual bool blockingPeek(Value* value, Seconds waitDuration) = 0;
+
+ /**
* Returns false if oplog buffer is empty.
* Otherwise, returns true and sets "value" to last item in oplog buffer.
*/
virtual bool peek(Value* value) = 0;
+
+ /**
+ * Returns the item most recently added to the oplog buffer or nothing if the buffer is empty.
+ */
+ virtual boost::optional<Value> lastObjectPushed() const = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
index ffa01bfd2c4..ff575cae145 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
@@ -53,6 +53,14 @@ void OplogBufferBlockingQueue::shutdown() {
clear();
}
+void OplogBufferBlockingQueue::pushEvenIfFull(const Value& value) {
+ _queue.pushEvenIfFull(value);
+}
+
+void OplogBufferBlockingQueue::push(const Value& value) {
+ _queue.push(value);
+}
+
bool OplogBufferBlockingQueue::pushAllNonBlocking(Batch::const_iterator begin,
Batch::const_iterator end) {
_queue.pushAllNonBlocking(begin, end);
@@ -63,6 +71,14 @@ void OplogBufferBlockingQueue::waitForSpace(std::size_t size) {
_queue.waitForSpace(size);
}
+bool OplogBufferBlockingQueue::isEmpty() const {
+ return _queue.empty();
+}
+
+std::size_t OplogBufferBlockingQueue::getMaxSize() const {
+ return kOplogBufferSize;
+}
+
std::size_t OplogBufferBlockingQueue::getSize() const {
return _queue.size();
}
@@ -79,9 +95,21 @@ bool OplogBufferBlockingQueue::tryPop(Value* value) {
return _queue.tryPop(*value);
}
+OplogBuffer::Value OplogBufferBlockingQueue::blockingPop() {
+ return _queue.blockingPop();
+}
+
+bool OplogBufferBlockingQueue::blockingPeek(Value* value, Seconds waitDuration) {
+ return _queue.blockingPeek(*value, static_cast<int>(durationCount<Seconds>(waitDuration)));
+}
+
bool OplogBufferBlockingQueue::peek(Value* value) {
return _queue.peek(*value);
}
+boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed() const {
+ return _queue.lastObjectPushed();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
index 5baf78fdf03..0c40c8f1256 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
@@ -43,13 +43,20 @@ public:
void startup() override;
void shutdown() override;
+ void pushEvenIfFull(const Value& value) override;
+ void push(const Value& value) override;
bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override;
void waitForSpace(std::size_t size) override;
+ bool isEmpty() const override;
+ std::size_t getMaxSize() const override;
std::size_t getSize() const override;
std::size_t getCount() const override;
void clear() override;
bool tryPop(Value* value) override;
+ Value blockingPop() override;
+ bool blockingPeek(Value* value, Seconds waitDuration) override;
bool peek(Value* value) override;
+ boost::optional<Value> lastObjectPushed() const override;
private:
BlockingQueue<BSONObj> _queue;
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index 3c060e77443..7af1057e022 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -86,6 +86,10 @@ void OplogBufferCollection::startup() {
void OplogBufferCollection::shutdown() {}
+void OplogBufferCollection::pushEvenIfFull(const Value& value) {}
+
+void OplogBufferCollection::push(const Value& value) {}
+
bool OplogBufferCollection::pushAllNonBlocking(Batch::const_iterator begin,
Batch::const_iterator end) {
return false;
@@ -93,6 +97,14 @@ bool OplogBufferCollection::pushAllNonBlocking(Batch::const_iterator begin,
void OplogBufferCollection::waitForSpace(std::size_t size) {}
+bool OplogBufferCollection::isEmpty() const {
+ return true;
+}
+
+std::size_t OplogBufferCollection::getMaxSize() const {
+ return 0;
+}
+
std::size_t OplogBufferCollection::getSize() const {
return 0;
}
@@ -107,9 +119,21 @@ bool OplogBufferCollection::tryPop(Value* value) {
return false;
}
+OplogBuffer::Value OplogBufferCollection::blockingPop() {
+ return {};
+}
+
+bool OplogBufferCollection::blockingPeek(Value* value, Seconds waitDuration) {
+ return false;
+}
+
bool OplogBufferCollection::peek(Value* value) {
return false;
}
+boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed() const {
+ return {};
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 99494f961be..c422c2f2c1c 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -56,13 +56,20 @@ public:
void startup() override;
void shutdown() override;
+ void pushEvenIfFull(const Value& value) override;
+ void push(const Value& value) override;
bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override;
void waitForSpace(std::size_t size) override;
+ bool isEmpty() const override;
+ std::size_t getMaxSize() const override;
std::size_t getSize() const override;
std::size_t getCount() const override;
void clear() override;
bool tryPop(Value* value) override;
+ Value blockingPop() override;
+ bool blockingPeek(Value* value, Seconds waitDuration) override;
bool peek(Value* value) override;
+ boost::optional<Value> lastObjectPushed() const override;
private:
const NamespaceString _nss;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 51856de7e24..a42aa7ed2e4 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -134,7 +134,7 @@ void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFini
log() << "Starting replication fetcher thread";
// Start bgsync.
- _bgSync.reset(new BackgroundSync());
+ _bgSync.reset(new BackgroundSync(makeSteadyStateOplogBuffer()));
invariant(!_producerThread); // The producer thread should not be init'd before this.
_producerThread.reset(
new stdx::thread(stdx::bind(&BackgroundSync::producerThread, _bgSync.get(), this)));
@@ -147,7 +147,7 @@ void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFini
void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication() {
if (!_producerThread) {
log() << "Starting replication fetcher thread";
- _bgSync.reset(new BackgroundSync());
+ _bgSync.reset(new BackgroundSync(makeSteadyStateOplogBuffer()));
_producerThread.reset(
new stdx::thread(stdx::bind(&BackgroundSync::producerThread, _bgSync.get(), this)));
}