diff options
author | Mathias Stearn <mathias@10gen.com> | 2016-08-16 17:30:47 -0400 |
---|---|---|
committer | Mathias Stearn <redbeard0531@gmail.com> | 2016-10-17 14:36:31 -0400 |
commit | e207e1a4809742a5cd0bb456202c82ff82548a44 (patch) | |
tree | 3e0cea4f1c0d9ca25191de97580732dc330e1036 | |
parent | 51db91df77d948ce72cf72c7f07ccbfe3a11071f (diff) | |
download | mongo-e207e1a4809742a5cd0bb456202c82ff82548a44.tar.gz |
SERVER-7200 Limit secondary apply batches to 10% of the oplog size
(cherry picked from commit b06901cd83b2a985aa50f9a699f3d63dcd28476d)
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 57 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 24 |
11 files changed, 88 insertions, 53 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index ebc3e3cd2d9..e4b6c1dc770 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -246,7 +246,6 @@ mongodLibDeps = [ "db/mongodandmongos", "db/mongodwebserver", "db/serveronly", - "db/repl/storage_interface_impl", "executor/network_interface_factory", 's/commands/shared_cluster_commands', "util/ntservice", diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f3012e2a04f..628bb239813 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -615,6 +615,7 @@ serverOnlyFiles = [ "repl/resync.cpp", "repl/rs_initialsync.cpp", "repl/rs_sync.cpp", + "repl/storage_interface_impl.cpp", "repl/sync_source_feedback.cpp", "service_context_d.cpp", "stats/fill_locker_info.cpp", diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5d2d8f1fe06..2563de2d9ea 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -54,17 +54,6 @@ env.Library('storage_interface', ]) env.Library( - target='storage_interface_impl', - source=[ - 'storage_interface_impl.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl - '$BUILD_DIR/mongo/db/service_context', - 'storage_interface', - ]) - -env.Library( target='replication_executor', source=[ 'replication_executor.cpp', @@ -217,6 +206,7 @@ env.Library( '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/util/concurrency/thread_pool', 'repl_coordinator_global', + 'storage_interface', ], LIBDEPS_TAGS=[ # Many undefined symbols in sync_tail.cpp diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index f600d7432d6..b0773769e0e 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -73,7 +73,7 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim OpQueue ops; auto replCoord = repl::ReplicationCoordinator::get(txn); - while (!tryPopAndWaitForMore(txn, &ops)) { + while (!tryPopAndWaitForMore(txn, &ops, BatchLimits{})) { if (inShutdown()) { return; } @@ -95,12 +95,6 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim << " without seeing it. Rollback?"; fassertFailedNoTrace(18693); } - - // apply replication batch limits - if (ops.getSize() > replBatchLimitBytes) - break; - if (ops.getDeque().size() > replBatchLimitOperations) - break; }; if (ops.empty()) { diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 9e5c0e7e344..f86fe64135a 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -50,6 +50,7 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/rs_initialsync.h" +#include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/timer_stats.h" @@ -118,7 +119,8 @@ void runSyncThread() { /* we have some data. continue tailing. */ SyncTail tail(BackgroundSync::get(), multiSyncApply); - tail.oplogApplication(); + StorageInterfaceImpl storageInterface; + tail.oplogApplication(&storageInterface); } catch (...) { std::terminate(); } diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 1f9d0576741..eeb3fedecb2 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -29,6 +29,8 @@ #pragma once +#include "mongo/base/status_with.h" +#include "mongo/db/namespace_string.h" namespace mongo { @@ -49,6 +51,15 @@ public: */ virtual OperationContext* createOperationContext() = 0; + /** + * Returns the configured maximum size of the oplog. + * + * Implementations are allowed to be "fuzzy" and delete documents when the actual size is + * slightly above or below this, so callers should not rely on its exact value. + */ + virtual StatusWith<size_t> getOplogMaxSize(OperationContext* txn, + const NamespaceString& nss) = 0; + protected: StorageInterface(); }; diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index a58f85964b4..8a54a5897d3 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -33,7 +33,10 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/client.h" +#include "mongo/db/db_raii.h" #include "mongo/db/operation_context_impl.h" namespace mongo { @@ -50,5 +53,20 @@ OperationContext* StorageInterfaceImpl::createOperationContext() { return new OperationContextImpl(); } +StatusWith<size_t> StorageInterfaceImpl::getOplogMaxSize(OperationContext* txn, + const NamespaceString& nss) { + AutoGetCollectionForRead collection(txn, nss); + if (!collection.getCollection()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "Your oplog doesn't exist: " << nss.ns()}; + } + + const auto options = collection.getCollection()->getCatalogEntry()->getCollectionOptions(txn); + if (!options.capped) + return {ErrorCodes::BadValue, str::stream() << nss.ns() << " isn't capped"}; + + return options.cappedSize; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index fa378e537fd..af4ca8029ce 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -43,6 +43,7 @@ public: virtual ~StorageInterfaceImpl(); OperationContext* createOperationContext() override; + StatusWith<size_t> getOplogMaxSize(OperationContext* txn, const NamespaceString& nss) override; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 8ce76adb642..a26196f667b 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -43,6 +43,10 @@ public: virtual ~StorageInterfaceMock(); OperationContext* createOperationContext() override; + + StatusWith<size_t> getOplogMaxSize(OperationContext* txn, const NamespaceString& nss) override { + return 1024 * 1024 * 1024; + } }; } // namespace repl diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 8b9bc9038bc..7c929b6eae1 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -602,7 +602,8 @@ class SyncTail::OpQueueBatcher { MONGO_DISALLOW_COPYING(OpQueueBatcher); public: - explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {} + explicit OpQueueBatcher(SyncTail* syncTail, StorageInterface* storageInterface) + : _syncTail(syncTail), _storageInterface(storageInterface), _thread([&] { run(); }) {} ~OpQueueBatcher() { _inShutdown.store(true); _cv.notify_all(); @@ -630,26 +631,23 @@ private: OperationContextImpl txn; auto replCoord = ReplicationCoordinator::get(&txn); + const auto oplogMaxSize = fassertStatusOK( + 40301, _storageInterface->getOplogMaxSize(&txn, NamespaceString(rsOplogName))); + + // Batches are limited to 10% of the oplog. + BatchLimits batchLimits; + batchLimits.ops = replBatchLimitOperations; + batchLimits.bytes = std::min(oplogMaxSize / 10, size_t(replBatchLimitBytes)); while (!_inShutdown.load()) { - const auto batchStartTime = Date_t::now(); const auto slaveDelay = replCoord->getSlaveDelaySecs(); - const auto slaveDelayLimit = (slaveDelay > Seconds(0)) ? (batchStartTime - slaveDelay) - : boost::optional<Date_t>(); + batchLimits.slaveDelayLatestTimestamp = (slaveDelay > Seconds(0)) + ? (Date_t::now() - slaveDelay) + : boost::optional<Date_t>(); OpQueue ops; - // tryPopAndWaitForMore returns true when we need to end a batch early - while (!_inShutdown.load()) { - if (_syncTail->tryPopAndWaitForMore(&txn, &ops, slaveDelayLimit)) { - break; // We need to end this batch early, even if there is more room. - } - - if (!ops.empty()) { - if (ops.getSize() >= replBatchLimitBytes) - break; - if (ops.getDeque().size() >= replBatchLimitOperations) - break; - } - // keep fetching more ops as long as we haven't hit any batch-ending conditions + // tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early. + while (!_inShutdown.load() && + !_syncTail->tryPopAndWaitForMore(&txn, &ops, batchLimits)) { } // For pausing replication in tests @@ -675,6 +673,7 @@ private: AtomicWord<bool> _inShutdown; SyncTail* const _syncTail; + StorageInterface* const _storageInterface; stdx::mutex _mutex; // Guards _ops. stdx::condition_variable _cv; @@ -684,8 +683,8 @@ private: }; /* tail an oplog. ok to return, will be re-called. */ -void SyncTail::oplogApplication() { - OpQueueBatcher batcher(this); +void SyncTail::oplogApplication(StorageInterface* storageInterface) { + OpQueueBatcher batcher(this, storageInterface); OperationContextImpl txn; auto replCoord = ReplicationCoordinator::get(&txn); @@ -812,11 +811,11 @@ SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwne // Batch should end early if we encounter a command, or if // there are no further ops in the bgsync queue to read. // This function also blocks 1 second waiting for new ops to appear in the bgsync -// queue. We can't block forever because there are maintenance things we need -// to periodically check in the loop. +// queue. We don't block forever so that we can periodically check for things like shutdown or +// reconfigs. bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops, - boost::optional<Date_t> slaveDelayLimit) { + const BatchLimits& limits) { BSONObj op; // Check to see if there are ops waiting in the bgsync queue bool peek_success = peek(&op); @@ -832,6 +831,13 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, return true; } + // If this op would put us over the byte limit don't include it unless the batch is empty. + // We allow single-op batches to exceed the byte limit so that large ops are able to be + // processed. + if (!ops->empty() && (ops->getSize() + size_t(op.objsize())) > limits.bytes) { + return true; // Return before wasting time parsing the op. + } + auto entry = OplogEntry(op); if (!entry.raw.isEmpty()) { @@ -851,7 +857,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, } } - if (slaveDelayLimit && entry.ts.timestampTime() > *slaveDelayLimit) { + if (limits.slaveDelayLatestTimestamp && + entry.ts.timestampTime() > *limits.slaveDelayLatestTimestamp) { if (ops->empty()) { // Sleep if we've got nothing to do. Only sleep for 1 second at a time to allow // reconfigs and shutdown to occur. @@ -880,8 +887,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, ops->push_back(std::move(entry)); _networkQueue->consume(); - // Go back for more ops - return false; + // Go back for more ops, unless we've hit the limit. + return ops->getDeque().size() >= limits.ops; } void SyncTail::setHostname(const std::string& hostname) { diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 07941c74634..539a00d933d 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -34,6 +34,7 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/repl/minvalid.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/stdx/functional.h" #include "mongo/util/concurrency/old_thread_pool.h" @@ -94,7 +95,7 @@ public: static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert); - void oplogApplication(); + void oplogApplication(StorageInterface* storageInterface); bool peek(BSONObj* obj); /** @@ -147,17 +148,24 @@ public: size_t _size; }; + struct BatchLimits { + size_t bytes = replBatchLimitBytes; + size_t ops = replBatchLimitOperations; + + // If provided, the batch will not include any operations with timestamps after this point. + // This is intended for implementing slaveDelay, so it should be some number of seconds + // before now. + boost::optional<Date_t> slaveDelayLatestTimestamp = {}; + }; + /** * Attempts to pop an OplogEntry off the BGSync queue and add it to ops. * - * If slaveDelayLimit is provided, only operations with a timestamp <= the provided Date_t will - * be included in the batch. Returns true if the (possibly empty) batch in ops should be ended - * and a new one started. If ops is empty on entry and nothing can be added yet, will wait up to - * a second before returning. + * Returns true if the (possibly empty) batch in ops should be ended and a new one started. + * If ops is empty on entry and nothing can be added yet, will wait up to a second before + * returning true. */ - bool tryPopAndWaitForMore(OperationContext* txn, - OpQueue* ops, - boost::optional<Date_t> slaveDelayLimit = {}); + bool tryPopAndWaitForMore(OperationContext* txn, OpQueue* ops, const BatchLimits& limits); /** * Fetch a single document referenced in the operation from the sync source. |