diff options
author | Geert Bosch <geert@mongodb.com> | 2018-02-08 20:25:27 -0500 |
---|---|---|
committer | Geert Bosch <geert@mongodb.com> | 2018-02-26 11:56:49 -0500 |
commit | f23bcbfa6d08c24b5570b3b29641f96babfc6a34 (patch) | |
tree | 5ae2feeb298f22a9ee86cef57c90327b84c81147 /src/mongo/db | |
parent | 17ae3d8ac9a6aa8d3664cebe791692781f3f9ca4 (diff) | |
download | mongo-f23bcbfa6d08c24b5570b3b29641f96babfc6a34.tar.gz |
SERVER-31679 Limit journaling frequency without oplog waiters
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/catalog/SConscript | 13 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.h | 6 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_test.cpp | 241 | ||||
-rw-r--r-- | src/mongo/db/storage/capped_callback.h | 5 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 1 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h | 2 |
13 files changed, 319 insertions, 4 deletions
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 1c45edcd9d3..662f7a22612 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -28,6 +28,19 @@ env.Library( env.CppUnitTest('collection_options_test', ['collection_options_test.cpp'], LIBDEPS=['collection_options']) +env.CppUnitTest( + target = 'collection_test', + source = 'collection_test.cpp', + LIBDEPS=[ + 'catalog_helpers', + '$BUILD_DIR/mongo/db/db_raii', + '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/db/repl/replmocks', + '$BUILD_DIR/mongo/db/repl/storage_interface_impl', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + ], +) env.Library( target='document_validation', diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 1426251fd28..755da32b65e 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -321,6 +321,8 @@ public: virtual void setMinimumVisibleSnapshot(Timestamp name) = 0; + virtual bool haveCappedWaiters() = 0; + virtual void notifyCappedWaitersIfNeeded() = 0; virtual const CollatorInterface* getDefaultCollator() const = 0; @@ -713,6 +715,10 @@ public: return this->_impl().setMinimumVisibleSnapshot(name); } + inline bool haveCappedWaiters() { + return this->_impl().haveCappedWaiters(); + } + /** * Notify (capped collection) waiters of data changes, like an insert. */ diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index ff791409f28..28a16f136db 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -506,11 +506,16 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, return status; } +bool CollectionImpl::haveCappedWaiters() { + // Waiters keep a shared_ptr to '_cappedNotifier', so there are waiters if this CollectionImpl's + // shared_ptr is not unique (use_count > 1). + return _cappedNotifier.use_count() > 1; +} + void CollectionImpl::notifyCappedWaitersIfNeeded() { // If there is a notifier object and another thread is waiting on it, then we notify - // waiters of this document insert. Waiters keep a shared_ptr to '_cappedNotifier', so - // there are waiters if this CollectionImpl's shared_ptr is not unique (use_count > 1). - if (_cappedNotifier && !_cappedNotifier.unique()) + // waiters of this document insert. + if (haveCappedWaiters()) _cappedNotifier->notifyAll(); } diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index ea74bbf6419..1950e1570b9 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -359,6 +359,8 @@ public: _minVisibleSnapshot = name; } + bool haveCappedWaiters() final; + /** * Notify (capped collection) waiters of data changes, like an insert. */ diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index ba7851ebc15..785256358aa 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -280,6 +280,10 @@ public: std::abort(); } + bool haveCappedWaiters() { + return false; + } + void notifyCappedWaitersIfNeeded() { std::abort(); } diff --git a/src/mongo/db/catalog/collection_test.cpp b/src/mongo/db/catalog/collection_test.cpp new file mode 100644 index 00000000000..0d4a1bc5090 --- /dev/null +++ b/src/mongo/db/catalog/collection_test.cpp @@ -0,0 +1,241 @@ +/** + * Copyright 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog/capped_utils.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/collection_catalog_entry.h" +#include "mongo/db/client.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/unittest.h" + +namespace { + +using namespace mongo; + +class CollectionTest : public ServiceContextMongoDTest { +private: + void setUp() override; + void tearDown() override; + +protected: + void makeCapped(NamespaceString nss, long long cappedSize = 8192); + // Use StorageInterface to access storage features below catalog interface. + std::unique_ptr<repl::StorageInterface> _storage; + ServiceContext::UniqueOperationContext _opCtxOwner; + OperationContext* _opCtx = nullptr; +}; + +void CollectionTest::setUp() { + // Set up mongod. + ServiceContextMongoDTest::setUp(); + + auto service = getServiceContext(); + + // Set up ReplicationCoordinator and ensure that we are primary. + auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + repl::ReplicationCoordinator::set(service, std::move(replCoord)); + + _storage = stdx::make_unique<repl::StorageInterfaceImpl>(); + _opCtxOwner = cc().makeOperationContext(); + _opCtx = _opCtxOwner.get(); +} + +void CollectionTest::tearDown() { + _storage = {}; + _opCtxOwner = {}; + + // Tear down mongod. + ServiceContextMongoDTest::tearDown(); +} + +void CollectionTest::makeCapped(NamespaceString nss, long long cappedSize) { + CollectionOptions options; + options.capped = true; + options.cappedSize = cappedSize; // Maximum size of capped collection in bytes. + ASSERT_OK(_storage->createCollection(_opCtx, nss, options)); +} + +TEST_F(CollectionTest, CappedNotifierKillAndIsDead) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + auto notifier = col->getCappedInsertNotifier(); + ASSERT_FALSE(notifier->isDead()); + notifier->kill(); + ASSERT(notifier->isDead()); +} + +TEST_F(CollectionTest, CappedNotifierTimeouts) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + auto notifier = col->getCappedInsertNotifier(); + ASSERT_EQ(notifier->getVersion(), 0u); + + auto before = Date_t::now(); + notifier->waitUntil(0u, before + Milliseconds(25)); + auto after = Date_t::now(); + ASSERT_GTE(after - before, Milliseconds(25)); + ASSERT_EQ(notifier->getVersion(), 0u); +} + +TEST_F(CollectionTest, CappedNotifierWaitAfterNotifyIsImmediate) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + auto notifier = col->getCappedInsertNotifier(); + + auto prevVersion = notifier->getVersion(); + notifier->notifyAll(); + auto thisVersion = prevVersion + 1; + ASSERT_EQ(notifier->getVersion(), thisVersion); + + auto before = Date_t::now(); + notifier->waitUntil(prevVersion, before + Seconds(25)); + auto after = Date_t::now(); + ASSERT_LT(after - before, Seconds(25)); +} + +TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousNotifyAll) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + auto notifier = col->getCappedInsertNotifier(); + auto prevVersion = notifier->getVersion(); + auto thisVersion = prevVersion + 1; + + auto before = Date_t::now(); + stdx::thread thread([before, prevVersion, ¬ifier] { + notifier->waitUntil(prevVersion, before + Milliseconds(25)); + auto after = Date_t::now(); + ASSERT_GTE(after - before, Milliseconds(25)); + notifier->notifyAll(); + }); + notifier->waitUntil(prevVersion, before + Seconds(25)); + auto after = Date_t::now(); + ASSERT_LT(after - before, Seconds(25)); + ASSERT_GTE(after - before, Milliseconds(25)); + thread.join(); + ASSERT_EQ(notifier->getVersion(), thisVersion); +} + +TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousKill) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + auto notifier = col->getCappedInsertNotifier(); + auto prevVersion = notifier->getVersion(); + + auto before = Date_t::now(); + stdx::thread thread([before, prevVersion, ¬ifier] { + notifier->waitUntil(prevVersion, before + Milliseconds(25)); + auto after = Date_t::now(); + ASSERT_GTE(after - before, Milliseconds(25)); + notifier->kill(); + }); + notifier->waitUntil(prevVersion, before + Seconds(25)); + auto after = Date_t::now(); + ASSERT_LT(after - before, Seconds(25)); + ASSERT_GTE(after - before, Milliseconds(25)); + thread.join(); + ASSERT_EQ(notifier->getVersion(), prevVersion); +} + +TEST_F(CollectionTest, HaveCappedWaiters) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + ASSERT_FALSE(col->haveCappedWaiters()); + { + auto notifier = col->getCappedInsertNotifier(); + ASSERT(col->haveCappedWaiters()); + } + ASSERT_FALSE(col->haveCappedWaiters()); +} + +TEST_F(CollectionTest, NotifyCappedWaitersIfNeeded) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + col->notifyCappedWaitersIfNeeded(); + { + auto notifier = col->getCappedInsertNotifier(); + ASSERT_EQ(notifier->getVersion(), 0u); + col->notifyCappedWaitersIfNeeded(); + ASSERT_EQ(notifier->getVersion(), 1u); + } +} + +TEST_F(CollectionTest, AsynchronouslyNotifyCappedWaitersIfNeeded) { + NamespaceString nss("test.t"); + makeCapped(nss); + + AutoGetCollectionForRead acfr(_opCtx, nss); + Collection* col = acfr.getCollection(); + auto notifier = col->getCappedInsertNotifier(); + auto prevVersion = notifier->getVersion(); + auto thisVersion = prevVersion + 1; + + auto before = Date_t::now(); + notifier->waitUntil(prevVersion, before + Milliseconds(25)); + stdx::thread thread([before, prevVersion, col] { + auto after = Date_t::now(); + ASSERT_GTE(after - before, Milliseconds(25)); + col->notifyCappedWaitersIfNeeded(); + }); + notifier->waitUntil(prevVersion, before + Seconds(25)); + auto after = Date_t::now(); + ASSERT_LT(after - before, Seconds(25)); + ASSERT_GTE(after - before, Milliseconds(25)); + thread.join(); + ASSERT_EQ(notifier->getVersion(), thisVersion); +} +} // namespace diff --git a/src/mongo/db/storage/capped_callback.h b/src/mongo/db/storage/capped_callback.h index dab688ba711..4fcbeecd8e2 100644 --- a/src/mongo/db/storage/capped_callback.h +++ b/src/mongo/db/storage/capped_callback.h @@ -55,6 +55,11 @@ public: RecordData data) = 0; /** + * Returns true if there may be waiters. + */ + virtual bool haveCappedWaiters() = 0; + + /** * Used to notify any waiters when new documents may be visible in the capped collection. */ virtual void notifyCappedWaitersIfNeeded() = 0; diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp index 64c9ad1fbd7..280ad6ccee0 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp @@ -56,6 +56,9 @@ public: return Status::OK(); } + bool haveCappedWaiters() { + return false; + } void notifyCappedWaitersIfNeeded() {} vector<DiskLoc> deleted; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 6aa40aad041..5c99b5de7ab 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -93,6 +93,8 @@ using std::string; namespace dps = ::mongo::dotted_path_support; +const int WiredTigerKVEngine::kDefaultJournalDelayMillis = 100; + class WiredTigerKVEngine::WiredTigerJournalFlusher : public BackgroundJob { public: explicit WiredTigerJournalFlusher(WiredTigerSessionCache* sessionCache) @@ -118,7 +120,7 @@ public: int ms = storageGlobalParams.journalCommitIntervalMs.load(); if (!ms) { - ms = 100; + ms = kDefaultJournalDelayMillis; } MONGO_IDLE_THREAD_BLOCK; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index fba921fea2d..bd4e10a4366 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -56,6 +56,7 @@ class WiredTigerSizeStorer; class WiredTigerKVEngine final : public KVEngine { public: + static const int kDefaultJournalDelayMillis; WiredTigerKVEngine(const std::string& canonicalName, const std::string& path, ClockSource* cs, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp index 7d7a7900c82..37881acd173 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp @@ -33,6 +33,7 @@ #include <cstring> +#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/idle_thread_block.h" @@ -163,6 +164,30 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses MONGO_IDLE_THREAD_BLOCK; _opsWaitingForJournalCV.wait(lk, [&] { return _shuttingDown || _opsWaitingForJournal; }); + + // If we're not shutting down and nobody is actively waiting for the oplog to become + // durable, delay journaling a bit to reduce the sync rate. + auto journalDelay = Milliseconds(storageGlobalParams.journalCommitIntervalMs.load()); + if (journalDelay == Milliseconds(0)) { + journalDelay = Milliseconds(WiredTigerKVEngine::kDefaultJournalDelayMillis); + } + auto now = Date_t::now(); + auto deadline = now + journalDelay; + auto shouldSyncOpsWaitingForJournal = [&] { + return _shuttingDown || oplogRecordStore->haveCappedWaiters(); + }; + + // Eventually it would be more optimal to merge this with the normal journal flushing + // and block for oplog tailers to show up. For now this loop will poll once a + // millisecond up to the journalDelay to see if we have any waiters yet. This reduces + // sync-related I/O on the primary when secondaries are lagged, but will avoid + // significant delays in confirming majority writes on replica sets with infrequent + // writes. + while (now < deadline && + !_opsWaitingForJournalCV.wait_until( + lk, now.toSystemTimePoint(), shouldSyncOpsWaitingForJournal)) { + now += Milliseconds(1); + } } while (!_shuttingDown && MONGO_FAIL_POINT(WTPausePrimaryOplogDurabilityLoop)) { @@ -175,6 +200,7 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses log() << "oplog journal thread loop shutting down"; return; } + invariant(_opsWaitingForJournal); _opsWaitingForJournal = false; lk.unlock(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index ec9c63c266a..646a6784a08 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1191,6 +1191,11 @@ bool WiredTigerRecordStore::isOpHidden_forTest(const RecordId& id) const { static_cast<std::uint64_t>(id.repr()); } +bool WiredTigerRecordStore::haveCappedWaiters() { + stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex); + return _cappedCallback && _cappedCallback->haveCappedWaiters(); +} + void WiredTigerRecordStore::notifyCappedWaitersIfNeeded() { stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex); // This wakes up cursors blocking in await_data. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index f02f161e720..3863cd7ae8b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -260,6 +260,8 @@ public: // Returns false if the oplog was dropped while waiting for a deletion request. bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx); + bool haveCappedWaiters(); + void notifyCappedWaitersIfNeeded(); class OplogStones; |