summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeert Bosch <geert@mongodb.com>2018-02-08 20:25:27 -0500
committerGeert Bosch <geert@mongodb.com>2018-02-26 11:56:49 -0500
commitf23bcbfa6d08c24b5570b3b29641f96babfc6a34 (patch)
tree5ae2feeb298f22a9ee86cef57c90327b84c81147
parent17ae3d8ac9a6aa8d3664cebe791692781f3f9ca4 (diff)
downloadmongo-f23bcbfa6d08c24b5570b3b29641f96babfc6a34.tar.gz
SERVER-31679 Limit journaling frequency without oplog waiters
-rw-r--r--src/mongo/db/catalog/SConscript13
-rw-r--r--src/mongo/db/catalog/collection.h6
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp11
-rw-r--r--src/mongo/db/catalog/collection_impl.h2
-rw-r--r--src/mongo/db/catalog/collection_mock.h4
-rw-r--r--src/mongo/db/catalog/collection_test.cpp241
-rw-r--r--src/mongo/db/storage/capped_callback.h5
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp3
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp4
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h1
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp26
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h2
13 files changed, 319 insertions, 4 deletions
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 1c45edcd9d3..662f7a22612 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -28,6 +28,19 @@ env.Library(
env.CppUnitTest('collection_options_test', ['collection_options_test.cpp'],
LIBDEPS=['collection_options'])
+env.CppUnitTest(
+ target = 'collection_test',
+ source = 'collection_test.cpp',
+ LIBDEPS=[
+ 'catalog_helpers',
+ '$BUILD_DIR/mongo/db/db_raii',
+ '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
+ '$BUILD_DIR/mongo/db/repl/storage_interface_impl',
+ '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ ],
+)
env.Library(
target='document_validation',
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 1426251fd28..755da32b65e 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -321,6 +321,8 @@ public:
virtual void setMinimumVisibleSnapshot(Timestamp name) = 0;
+ virtual bool haveCappedWaiters() = 0;
+
virtual void notifyCappedWaitersIfNeeded() = 0;
virtual const CollatorInterface* getDefaultCollator() const = 0;
@@ -713,6 +715,10 @@ public:
return this->_impl().setMinimumVisibleSnapshot(name);
}
+ inline bool haveCappedWaiters() {
+ return this->_impl().haveCappedWaiters();
+ }
+
/**
* Notify (capped collection) waiters of data changes, like an insert.
*/
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index ff791409f28..28a16f136db 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -506,11 +506,16 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
return status;
}
+bool CollectionImpl::haveCappedWaiters() {
+ // Waiters keep a shared_ptr to '_cappedNotifier', so there are waiters if this CollectionImpl's
+ // shared_ptr is not unique (use_count > 1).
+ return _cappedNotifier.use_count() > 1;
+}
+
void CollectionImpl::notifyCappedWaitersIfNeeded() {
// If there is a notifier object and another thread is waiting on it, then we notify
- // waiters of this document insert. Waiters keep a shared_ptr to '_cappedNotifier', so
- // there are waiters if this CollectionImpl's shared_ptr is not unique (use_count > 1).
- if (_cappedNotifier && !_cappedNotifier.unique())
+ // waiters of this document insert.
+ if (haveCappedWaiters())
_cappedNotifier->notifyAll();
}
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index ea74bbf6419..1950e1570b9 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -359,6 +359,8 @@ public:
_minVisibleSnapshot = name;
}
+ bool haveCappedWaiters() final;
+
/**
* Notify (capped collection) waiters of data changes, like an insert.
*/
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index ba7851ebc15..785256358aa 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -280,6 +280,10 @@ public:
std::abort();
}
+ bool haveCappedWaiters() {
+ return false;
+ }
+
void notifyCappedWaitersIfNeeded() {
std::abort();
}
diff --git a/src/mongo/db/catalog/collection_test.cpp b/src/mongo/db/catalog/collection_test.cpp
new file mode 100644
index 00000000000..0d4a1bc5090
--- /dev/null
+++ b/src/mongo/db/catalog/collection_test.cpp
@@ -0,0 +1,241 @@
+/**
+ * Copyright 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/capped_utils.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/collection_catalog_entry.h"
+#include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+
+namespace {
+
+using namespace mongo;
+
+class CollectionTest : public ServiceContextMongoDTest {
+private:
+ void setUp() override;
+ void tearDown() override;
+
+protected:
+ void makeCapped(NamespaceString nss, long long cappedSize = 8192);
+ // Use StorageInterface to access storage features below catalog interface.
+ std::unique_ptr<repl::StorageInterface> _storage;
+ ServiceContext::UniqueOperationContext _opCtxOwner;
+ OperationContext* _opCtx = nullptr;
+};
+
+void CollectionTest::setUp() {
+ // Set up mongod.
+ ServiceContextMongoDTest::setUp();
+
+ auto service = getServiceContext();
+
+ // Set up ReplicationCoordinator and ensure that we are primary.
+ auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service);
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ repl::ReplicationCoordinator::set(service, std::move(replCoord));
+
+ _storage = stdx::make_unique<repl::StorageInterfaceImpl>();
+ _opCtxOwner = cc().makeOperationContext();
+ _opCtx = _opCtxOwner.get();
+}
+
+void CollectionTest::tearDown() {
+ _storage = {};
+ _opCtxOwner = {};
+
+ // Tear down mongod.
+ ServiceContextMongoDTest::tearDown();
+}
+
+void CollectionTest::makeCapped(NamespaceString nss, long long cappedSize) {
+ CollectionOptions options;
+ options.capped = true;
+ options.cappedSize = cappedSize; // Maximum size of capped collection in bytes.
+ ASSERT_OK(_storage->createCollection(_opCtx, nss, options));
+}
+
+TEST_F(CollectionTest, CappedNotifierKillAndIsDead) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ auto notifier = col->getCappedInsertNotifier();
+ ASSERT_FALSE(notifier->isDead());
+ notifier->kill();
+ ASSERT(notifier->isDead());
+}
+
+TEST_F(CollectionTest, CappedNotifierTimeouts) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ auto notifier = col->getCappedInsertNotifier();
+ ASSERT_EQ(notifier->getVersion(), 0u);
+
+ auto before = Date_t::now();
+ notifier->waitUntil(0u, before + Milliseconds(25));
+ auto after = Date_t::now();
+ ASSERT_GTE(after - before, Milliseconds(25));
+ ASSERT_EQ(notifier->getVersion(), 0u);
+}
+
+TEST_F(CollectionTest, CappedNotifierWaitAfterNotifyIsImmediate) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ auto notifier = col->getCappedInsertNotifier();
+
+ auto prevVersion = notifier->getVersion();
+ notifier->notifyAll();
+ auto thisVersion = prevVersion + 1;
+ ASSERT_EQ(notifier->getVersion(), thisVersion);
+
+ auto before = Date_t::now();
+ notifier->waitUntil(prevVersion, before + Seconds(25));
+ auto after = Date_t::now();
+ ASSERT_LT(after - before, Seconds(25));
+}
+
+TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousNotifyAll) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ auto notifier = col->getCappedInsertNotifier();
+ auto prevVersion = notifier->getVersion();
+ auto thisVersion = prevVersion + 1;
+
+ auto before = Date_t::now();
+ stdx::thread thread([before, prevVersion, &notifier] {
+ notifier->waitUntil(prevVersion, before + Milliseconds(25));
+ auto after = Date_t::now();
+ ASSERT_GTE(after - before, Milliseconds(25));
+ notifier->notifyAll();
+ });
+ notifier->waitUntil(prevVersion, before + Seconds(25));
+ auto after = Date_t::now();
+ ASSERT_LT(after - before, Seconds(25));
+ ASSERT_GTE(after - before, Milliseconds(25));
+ thread.join();
+ ASSERT_EQ(notifier->getVersion(), thisVersion);
+}
+
+TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousKill) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ auto notifier = col->getCappedInsertNotifier();
+ auto prevVersion = notifier->getVersion();
+
+ auto before = Date_t::now();
+ stdx::thread thread([before, prevVersion, &notifier] {
+ notifier->waitUntil(prevVersion, before + Milliseconds(25));
+ auto after = Date_t::now();
+ ASSERT_GTE(after - before, Milliseconds(25));
+ notifier->kill();
+ });
+ notifier->waitUntil(prevVersion, before + Seconds(25));
+ auto after = Date_t::now();
+ ASSERT_LT(after - before, Seconds(25));
+ ASSERT_GTE(after - before, Milliseconds(25));
+ thread.join();
+ ASSERT_EQ(notifier->getVersion(), prevVersion);
+}
+
+TEST_F(CollectionTest, HaveCappedWaiters) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ ASSERT_FALSE(col->haveCappedWaiters());
+ {
+ auto notifier = col->getCappedInsertNotifier();
+ ASSERT(col->haveCappedWaiters());
+ }
+ ASSERT_FALSE(col->haveCappedWaiters());
+}
+
+TEST_F(CollectionTest, NotifyCappedWaitersIfNeeded) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ col->notifyCappedWaitersIfNeeded();
+ {
+ auto notifier = col->getCappedInsertNotifier();
+ ASSERT_EQ(notifier->getVersion(), 0u);
+ col->notifyCappedWaitersIfNeeded();
+ ASSERT_EQ(notifier->getVersion(), 1u);
+ }
+}
+
+TEST_F(CollectionTest, AsynchronouslyNotifyCappedWaitersIfNeeded) {
+ NamespaceString nss("test.t");
+ makeCapped(nss);
+
+ AutoGetCollectionForRead acfr(_opCtx, nss);
+ Collection* col = acfr.getCollection();
+ auto notifier = col->getCappedInsertNotifier();
+ auto prevVersion = notifier->getVersion();
+ auto thisVersion = prevVersion + 1;
+
+ auto before = Date_t::now();
+ notifier->waitUntil(prevVersion, before + Milliseconds(25));
+ stdx::thread thread([before, prevVersion, col] {
+ auto after = Date_t::now();
+ ASSERT_GTE(after - before, Milliseconds(25));
+ col->notifyCappedWaitersIfNeeded();
+ });
+ notifier->waitUntil(prevVersion, before + Seconds(25));
+ auto after = Date_t::now();
+ ASSERT_LT(after - before, Seconds(25));
+ ASSERT_GTE(after - before, Milliseconds(25));
+ thread.join();
+ ASSERT_EQ(notifier->getVersion(), thisVersion);
+}
+} // namespace
diff --git a/src/mongo/db/storage/capped_callback.h b/src/mongo/db/storage/capped_callback.h
index dab688ba711..4fcbeecd8e2 100644
--- a/src/mongo/db/storage/capped_callback.h
+++ b/src/mongo/db/storage/capped_callback.h
@@ -55,6 +55,11 @@ public:
RecordData data) = 0;
/**
+ * Returns true if there may be waiters.
+ */
+ virtual bool haveCappedWaiters() = 0;
+
+ /**
* Used to notify any waiters when new documents may be visible in the capped collection.
*/
virtual void notifyCappedWaitersIfNeeded() = 0;
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp
index 64c9ad1fbd7..280ad6ccee0 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp
@@ -56,6 +56,9 @@ public:
return Status::OK();
}
+ bool haveCappedWaiters() {
+ return false;
+ }
void notifyCappedWaitersIfNeeded() {}
vector<DiskLoc> deleted;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 6aa40aad041..5c99b5de7ab 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -93,6 +93,8 @@ using std::string;
namespace dps = ::mongo::dotted_path_support;
+const int WiredTigerKVEngine::kDefaultJournalDelayMillis = 100;
+
class WiredTigerKVEngine::WiredTigerJournalFlusher : public BackgroundJob {
public:
explicit WiredTigerJournalFlusher(WiredTigerSessionCache* sessionCache)
@@ -118,7 +120,7 @@ public:
int ms = storageGlobalParams.journalCommitIntervalMs.load();
if (!ms) {
- ms = 100;
+ ms = kDefaultJournalDelayMillis;
}
MONGO_IDLE_THREAD_BLOCK;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index fba921fea2d..bd4e10a4366 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -56,6 +56,7 @@ class WiredTigerSizeStorer;
class WiredTigerKVEngine final : public KVEngine {
public:
+ static const int kDefaultJournalDelayMillis;
WiredTigerKVEngine(const std::string& canonicalName,
const std::string& path,
ClockSource* cs,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
index 7d7a7900c82..37881acd173 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
@@ -33,6 +33,7 @@
#include <cstring>
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/idle_thread_block.h"
@@ -163,6 +164,30 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
MONGO_IDLE_THREAD_BLOCK;
_opsWaitingForJournalCV.wait(lk,
[&] { return _shuttingDown || _opsWaitingForJournal; });
+
+ // If we're not shutting down and nobody is actively waiting for the oplog to become
+ // durable, delay journaling a bit to reduce the sync rate.
+ auto journalDelay = Milliseconds(storageGlobalParams.journalCommitIntervalMs.load());
+ if (journalDelay == Milliseconds(0)) {
+ journalDelay = Milliseconds(WiredTigerKVEngine::kDefaultJournalDelayMillis);
+ }
+ auto now = Date_t::now();
+ auto deadline = now + journalDelay;
+ auto shouldSyncOpsWaitingForJournal = [&] {
+ return _shuttingDown || oplogRecordStore->haveCappedWaiters();
+ };
+
+ // Eventually it would be more optimal to merge this with the normal journal flushing
+ // and block for oplog tailers to show up. For now this loop will poll once a
+ // millisecond up to the journalDelay to see if we have any waiters yet. This reduces
+ // sync-related I/O on the primary when secondaries are lagged, but will avoid
+ // significant delays in confirming majority writes on replica sets with infrequent
+ // writes.
+ while (now < deadline &&
+ !_opsWaitingForJournalCV.wait_until(
+ lk, now.toSystemTimePoint(), shouldSyncOpsWaitingForJournal)) {
+ now += Milliseconds(1);
+ }
}
while (!_shuttingDown && MONGO_FAIL_POINT(WTPausePrimaryOplogDurabilityLoop)) {
@@ -175,6 +200,7 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
log() << "oplog journal thread loop shutting down";
return;
}
+ invariant(_opsWaitingForJournal);
_opsWaitingForJournal = false;
lk.unlock();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index ec9c63c266a..646a6784a08 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1191,6 +1191,11 @@ bool WiredTigerRecordStore::isOpHidden_forTest(const RecordId& id) const {
static_cast<std::uint64_t>(id.repr());
}
+bool WiredTigerRecordStore::haveCappedWaiters() {
+ stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
+ return _cappedCallback && _cappedCallback->haveCappedWaiters();
+}
+
void WiredTigerRecordStore::notifyCappedWaitersIfNeeded() {
stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
// This wakes up cursors blocking in await_data.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index f02f161e720..3863cd7ae8b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -260,6 +260,8 @@ public:
// Returns false if the oplog was dropped while waiting for a deletion request.
bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx);
+ bool haveCappedWaiters();
+
void notifyCappedWaitersIfNeeded();
class OplogStones;