summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Mathias Stearn <mathias@10gen.com> 2016-08-16 17:30:47 -0400
committer Mathias Stearn <redbeard0531@gmail.com> 2016-10-17 14:36:31 -0400
commit e207e1a4809742a5cd0bb456202c82ff82548a44 (patch)
tree 3e0cea4f1c0d9ca25191de97580732dc330e1036
parent 51db91df77d948ce72cf72c7f07ccbfe3a11071f (diff)
download mongo-e207e1a4809742a5cd0bb456202c82ff82548a44.tar.gz
SERVER-7200 Limit secondary apply batches to 10% of the oplog size
(cherry picked from commit b06901cd83b2a985aa50f9a699f3d63dcd28476d)
-rw-r--r-- src/mongo/SConscript 1
-rw-r--r-- src/mongo/db/SConscript 1
-rw-r--r-- src/mongo/db/repl/SConscript 12
-rw-r--r-- src/mongo/db/repl/initial_sync.cpp 8
-rw-r--r-- src/mongo/db/repl/rs_sync.cpp 4
-rw-r--r-- src/mongo/db/repl/storage_interface.h 11
-rw-r--r-- src/mongo/db/repl/storage_interface_impl.cpp 18
-rw-r--r-- src/mongo/db/repl/storage_interface_impl.h 1
-rw-r--r-- src/mongo/db/repl/storage_interface_mock.h 4
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 57
-rw-r--r-- src/mongo/db/repl/sync_tail.h 24
11 files changed, 88 insertions, 53 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index ebc3e3cd2d9..e4b6c1dc770 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -246,7 +246,6 @@ mongodLibDeps = [
"db/mongodandmongos",
"db/mongodwebserver",
"db/serveronly",
- "db/repl/storage_interface_impl",
"executor/network_interface_factory",
's/commands/shared_cluster_commands',
"util/ntservice",
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index f3012e2a04f..628bb239813 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -615,6 +615,7 @@ serverOnlyFiles = [
"repl/resync.cpp",
"repl/rs_initialsync.cpp",
"repl/rs_sync.cpp",
+ "repl/storage_interface_impl.cpp",
"repl/sync_source_feedback.cpp",
"service_context_d.cpp",
"stats/fill_locker_info.cpp",
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5d2d8f1fe06..2563de2d9ea 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -54,17 +54,6 @@ env.Library('storage_interface',
])
env.Library(
- target='storage_interface_impl',
- source=[
- 'storage_interface_impl.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl
- '$BUILD_DIR/mongo/db/service_context',
- 'storage_interface',
- ])
-
-env.Library(
target='replication_executor',
source=[
'replication_executor.cpp',
@@ -217,6 +206,7 @@ env.Library(
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
'repl_coordinator_global',
+ 'storage_interface',
],
LIBDEPS_TAGS=[
# Many undefined symbols in sync_tail.cpp
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp
index f600d7432d6..b0773769e0e 100644
--- a/src/mongo/db/repl/initial_sync.cpp
+++ b/src/mongo/db/repl/initial_sync.cpp
@@ -73,7 +73,7 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
OpQueue ops;
auto replCoord = repl::ReplicationCoordinator::get(txn);
- while (!tryPopAndWaitForMore(txn, &ops)) {
+ while (!tryPopAndWaitForMore(txn, &ops, BatchLimits{})) {
if (inShutdown()) {
return;
}
@@ -95,12 +95,6 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
<< " without seeing it. Rollback?";
fassertFailedNoTrace(18693);
}
-
- // apply replication batch limits
- if (ops.getSize() > replBatchLimitBytes)
- break;
- if (ops.getDeque().size() > replBatchLimitOperations)
- break;
};
if (ops.empty()) {
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 9e5c0e7e344..f86fe64135a 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/rs_initialsync.h"
+#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/timer_stats.h"
@@ -118,7 +119,8 @@ void runSyncThread() {
/* we have some data. continue tailing. */
SyncTail tail(BackgroundSync::get(), multiSyncApply);
- tail.oplogApplication();
+ StorageInterfaceImpl storageInterface;
+ tail.oplogApplication(&storageInterface);
} catch (...) {
std::terminate();
}
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 1f9d0576741..eeb3fedecb2 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -29,6 +29,8 @@
#pragma once
+#include "mongo/base/status_with.h"
+#include "mongo/db/namespace_string.h"
namespace mongo {
@@ -49,6 +51,15 @@ public:
*/
virtual OperationContext* createOperationContext() = 0;
+ /**
+ * Returns the configured maximum size of the oplog.
+ *
+ * Implementations are allowed to be "fuzzy" and delete documents when the actual size is
+ * slightly above or below this, so callers should not rely on its exact value.
+ */
+ virtual StatusWith<size_t> getOplogMaxSize(OperationContext* txn,
+ const NamespaceString& nss) = 0;
+
protected:
StorageInterface();
};
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index a58f85964b4..8a54a5897d3 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -33,7 +33,10 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/operation_context_impl.h"
namespace mongo {
@@ -50,5 +53,20 @@ OperationContext* StorageInterfaceImpl::createOperationContext() {
return new OperationContextImpl();
}
+StatusWith<size_t> StorageInterfaceImpl::getOplogMaxSize(OperationContext* txn,
+ const NamespaceString& nss) {
+ AutoGetCollectionForRead collection(txn, nss);
+ if (!collection.getCollection()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "Your oplog doesn't exist: " << nss.ns()};
+ }
+
+ const auto options = collection.getCollection()->getCatalogEntry()->getCollectionOptions(txn);
+ if (!options.capped)
+ return {ErrorCodes::BadValue, str::stream() << nss.ns() << " isn't capped"};
+
+ return options.cappedSize;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index fa378e537fd..af4ca8029ce 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -43,6 +43,7 @@ public:
virtual ~StorageInterfaceImpl();
OperationContext* createOperationContext() override;
+ StatusWith<size_t> getOplogMaxSize(OperationContext* txn, const NamespaceString& nss) override;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 8ce76adb642..a26196f667b 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -43,6 +43,10 @@ public:
virtual ~StorageInterfaceMock();
OperationContext* createOperationContext() override;
+
+ StatusWith<size_t> getOplogMaxSize(OperationContext* txn, const NamespaceString& nss) override {
+ return 1024 * 1024 * 1024;
+ }
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 8b9bc9038bc..7c929b6eae1 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -602,7 +602,8 @@ class SyncTail::OpQueueBatcher {
MONGO_DISALLOW_COPYING(OpQueueBatcher);
public:
- explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {}
+ explicit OpQueueBatcher(SyncTail* syncTail, StorageInterface* storageInterface)
+ : _syncTail(syncTail), _storageInterface(storageInterface), _thread([&] { run(); }) {}
~OpQueueBatcher() {
_inShutdown.store(true);
_cv.notify_all();
@@ -630,26 +631,23 @@ private:
OperationContextImpl txn;
auto replCoord = ReplicationCoordinator::get(&txn);
+ const auto oplogMaxSize = fassertStatusOK(
+ 40301, _storageInterface->getOplogMaxSize(&txn, NamespaceString(rsOplogName)));
+
+ // Batches are limited to 10% of the oplog.
+ BatchLimits batchLimits;
+ batchLimits.ops = replBatchLimitOperations;
+ batchLimits.bytes = std::min(oplogMaxSize / 10, size_t(replBatchLimitBytes));
while (!_inShutdown.load()) {
- const auto batchStartTime = Date_t::now();
const auto slaveDelay = replCoord->getSlaveDelaySecs();
- const auto slaveDelayLimit = (slaveDelay > Seconds(0)) ? (batchStartTime - slaveDelay)
- : boost::optional<Date_t>();
+ batchLimits.slaveDelayLatestTimestamp = (slaveDelay > Seconds(0))
+ ? (Date_t::now() - slaveDelay)
+ : boost::optional<Date_t>();
OpQueue ops;
- // tryPopAndWaitForMore returns true when we need to end a batch early
- while (!_inShutdown.load()) {
- if (_syncTail->tryPopAndWaitForMore(&txn, &ops, slaveDelayLimit)) {
- break; // We need to end this batch early, even if there is more room.
- }
-
- if (!ops.empty()) {
- if (ops.getSize() >= replBatchLimitBytes)
- break;
- if (ops.getDeque().size() >= replBatchLimitOperations)
- break;
- }
- // keep fetching more ops as long as we haven't hit any batch-ending conditions
+ // tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early.
+ while (!_inShutdown.load() &&
+ !_syncTail->tryPopAndWaitForMore(&txn, &ops, batchLimits)) {
}
// For pausing replication in tests
@@ -675,6 +673,7 @@ private:
AtomicWord<bool> _inShutdown;
SyncTail* const _syncTail;
+ StorageInterface* const _storageInterface;
stdx::mutex _mutex; // Guards _ops.
stdx::condition_variable _cv;
@@ -684,8 +683,8 @@ private:
};
/* tail an oplog. ok to return, will be re-called. */
-void SyncTail::oplogApplication() {
- OpQueueBatcher batcher(this);
+void SyncTail::oplogApplication(StorageInterface* storageInterface) {
+ OpQueueBatcher batcher(this, storageInterface);
OperationContextImpl txn;
auto replCoord = ReplicationCoordinator::get(&txn);
@@ -812,11 +811,11 @@ SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwne
// Batch should end early if we encounter a command, or if
// there are no further ops in the bgsync queue to read.
// This function also blocks 1 second waiting for new ops to appear in the bgsync
-// queue. We can't block forever because there are maintenance things we need
-// to periodically check in the loop.
+// queue. We don't block forever so that we can periodically check for things like shutdown or
+// reconfigs.
bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
SyncTail::OpQueue* ops,
- boost::optional<Date_t> slaveDelayLimit) {
+ const BatchLimits& limits) {
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = peek(&op);
@@ -832,6 +831,13 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
return true;
}
+ // If this op would put us over the byte limit don't include it unless the batch is empty.
+ // We allow single-op batches to exceed the byte limit so that large ops are able to be
+ // processed.
+ if (!ops->empty() && (ops->getSize() + size_t(op.objsize())) > limits.bytes) {
+ return true; // Return before wasting time parsing the op.
+ }
+
auto entry = OplogEntry(op);
if (!entry.raw.isEmpty()) {
@@ -851,7 +857,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
}
}
- if (slaveDelayLimit && entry.ts.timestampTime() > *slaveDelayLimit) {
+ if (limits.slaveDelayLatestTimestamp &&
+ entry.ts.timestampTime() > *limits.slaveDelayLatestTimestamp) {
if (ops->empty()) {
// Sleep if we've got nothing to do. Only sleep for 1 second at a time to allow
// reconfigs and shutdown to occur.
@@ -880,8 +887,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
ops->push_back(std::move(entry));
_networkQueue->consume();
- // Go back for more ops
- return false;
+ // Go back for more ops, unless we've hit the limit.
+ return ops->getDeque().size() >= limits.ops;
}
void SyncTail::setHostname(const std::string& hostname) {
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 07941c74634..539a00d933d 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -34,6 +34,7 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/repl/minvalid.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/storage/mmap_v1/dur.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/concurrency/old_thread_pool.h"
@@ -94,7 +95,7 @@ public:
static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert);
- void oplogApplication();
+ void oplogApplication(StorageInterface* storageInterface);
bool peek(BSONObj* obj);
/**
@@ -147,17 +148,24 @@ public:
size_t _size;
};
+ struct BatchLimits {
+ size_t bytes = replBatchLimitBytes;
+ size_t ops = replBatchLimitOperations;
+
+ // If provided, the batch will not include any operations with timestamps after this point.
+ // This is intended for implementing slaveDelay, so it should be some number of seconds
+ // before now.
+ boost::optional<Date_t> slaveDelayLatestTimestamp = {};
+ };
+
/**
* Attempts to pop an OplogEntry off the BGSync queue and add it to ops.
*
- * If slaveDelayLimit is provided, only operations with a timestamp <= the provided Date_t will
- * be included in the batch. Returns true if the (possibly empty) batch in ops should be ended
- * and a new one started. If ops is empty on entry and nothing can be added yet, will wait up to
- * a second before returning.
+ * Returns true if the (possibly empty) batch in ops should be ended and a new one started.
+ * If ops is empty on entry and nothing can be added yet, will wait up to a second before
+ * returning true.
*/
- bool tryPopAndWaitForMore(OperationContext* txn,
- OpQueue* ops,
- boost::optional<Date_t> slaveDelayLimit = {});
+ bool tryPopAndWaitForMore(OperationContext* txn, OpQueue* ops, const BatchLimits& limits);
/**
* Fetch a single document referenced in the operation from the sync source.