diff options
author | Gabe Villasana <villagab4@gmail.com> | 2019-06-13 10:33:20 -0400 |
---|---|---|
committer | Gabe Villasana <villagab4@gmail.com> | 2019-07-11 11:32:13 -0400 |
commit | d9b6940984db00c428441139a33f19b207e35f30 (patch) | |
tree | ffe233d034018c0decc4b40561e4ffefb8594767 | |
parent | 33fc66e686b4a48dc9527930c948f137d7628c14 (diff) | |
download | mongo-d9b6940984db00c428441139a33f19b207e35f30.tar.gz |
SERVER-40168 Pull the OplogTruncaterThread out of WiredTigerKVEngine and put it above the storage layer
21 files changed, 259 insertions, 248 deletions
diff --git a/jstests/replsets/oplog_rollover.js b/jstests/replsets/oplog_rollover.js index b16f54cf539..e5532585ab7 100644 --- a/jstests/replsets/oplog_rollover.js +++ b/jstests/replsets/oplog_rollover.js @@ -54,9 +54,9 @@ // happen when oplog size exceeds the configured maximum. if (primary.getDB('admin').serverStatus().storageEngine.supportsCommittedReads) { // Wait for checkpointing/stable timestamp to catch up with the second insert so oplog - // entry of the first insert is allowed to be deleted by the oplog truncater thread when - // a new oplog stone is created. "inMemory" WT engine does not run checkpoint thread and - // lastStableRecoveryTimestamp is the stable timestamp in this case. + // entry of the first insert is allowed to be deleted by the oplog cap maintainer thread + // when a new oplog stone is created. "inMemory" WT engine does not run checkpoint + // thread and lastStableRecoveryTimestamp is the stable timestamp in this case. assert.soon( () => { const primaryTimestamp = @@ -81,14 +81,14 @@ 2000); // Insert the third document which will trigger a new oplog stone to be created. The - // oplog truncater thread will then be unblocked on the creation of the new oplog stone - // and will start truncating oplog entries. The oplog entry for the first insert will be - // truncated after the oplog truncater thread finishes. + // oplog cap maintainer thread will then be unblocked on the creation of the new oplog + // stone and will start truncating oplog entries. The oplog entry for the first + // insert will be truncated after the oplog cap maintainer thread finishes. assert.commandWorked( coll.insert({_id: 2, longString: longString}, {writeConcern: {w: 2}})); // Test that oplog entry of the initial insert rolls over on both primary and secondary. - // Use assert.soon to wait for oplog truncater thread to run. + // Use assert.soon to wait for oplog cap maintainer thread to run. assert.soon(() => { return numInsertOplogEntry(primaryOplog) === 2; }, "Timeout waiting for oplog to roll over on primary"); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 926f35031e2..5af7dda53c9 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -227,6 +227,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/database_holder', + '$BUILD_DIR/mongo/db/storage/oplog_cap_maintainer_thread', '$BUILD_DIR/mongo/db/logical_clock', ], ) diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 2f18223276b..05bd9f78ff2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -811,9 +811,12 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { _setConfigState_inlock(kConfigReplicationDisabled); return; } + invariant(_settings.usingReplSets()); invariant(!ReplSettings::shouldRecoverFromOplogAsStandalone()); + _storage->initializeStorageControlsForReplication(opCtx->getServiceContext()); + { stdx::lock_guard<stdx::mutex> lk(_mutex); fassert(18822, !_inShutdown); diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 0dab0f4588f..a1cdd60ed92 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -374,6 +374,14 @@ public: virtual bool supportsRecoveryTimestamp(ServiceContext* serviceCtx) const = 0; /** + * Responsible for initializing independent processes for replication that manage + * and interact with the storage layer. + * + * Initializes the OplogCapMaintainerThread to control deletion of oplog stones. + */ + virtual void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const = 0; + + /** * Returns the stable timestamp that the storage engine recovered to on startup. If the * recovery point was not stable, returns "none". */ diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index f1e80568f7a..7cfa35ba7af 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -75,7 +75,9 @@ #include "mongo/db/repl/rollback_gen.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/durable_catalog.h" +#include "mongo/db/storage/oplog_cap_maintainer_thread.h" #include "mongo/util/assert_util.h" +#include "mongo/util/background.h" #include "mongo/util/log.h" #include "mongo/util/str.h" @@ -1091,6 +1093,18 @@ bool StorageInterfaceImpl::supportsRecoveryTimestamp(ServiceContext* serviceCtx) return serviceCtx->getStorageEngine()->supportsRecoveryTimestamp(); } +void StorageInterfaceImpl::initializeStorageControlsForReplication( + ServiceContext* serviceCtx) const { + // The storage engine may support the use of OplogStones to more finely control + // oplog history deletion, in which case we need to start the thread to + // periodically execute deletion via oplog stones. OplogStones are a replacement + // for capped collection deletion of the oplog collection history. + if (serviceCtx->getStorageEngine()->supportsOplogStones()) { + BackgroundJob* backgroundThread = new OplogCapMaintainerThread(); + backgroundThread->go(); + } +} + boost::optional<Timestamp> StorageInterfaceImpl::getRecoveryTimestamp( ServiceContext* serviceCtx) const { return serviceCtx->getStorageEngine()->getRecoveryTimestamp(); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 51ed102028f..733d11bd5f8 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -166,6 +166,8 @@ public: bool supportsRecoveryTimestamp(ServiceContext* serviceCtx) const override; + void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const override; + boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override; bool supportsDocLocking(ServiceContext* serviceCtx) const override; diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 71508cd41fe..ebf54b3cc5d 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -304,6 +304,8 @@ public: return false; } + void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const override {} + boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override { return boost::none; } diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 3e66269ca93..4bc7dbaf540 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -93,6 +93,24 @@ env.Library( '$BUILD_DIR/mongo/base', ] ) + +env.Library( + target='oplog_cap_maintainer_thread', + source=[ + 'oplog_cap_maintainer_thread.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/util/background_job', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/catalog/collection', + '$BUILD_DIR/mongo/db/catalog/database_holder', + '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/db/service_context', + ], +) env.Library( target='storage_options', diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index 3344908b525..0503c6f5e6f 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -399,6 +399,13 @@ public: } /** + * See `StorageEngine::supportsOplogStones` + */ + virtual bool supportsOplogStones() const { + return false; + } + + /** * See `StorageEngine::replicationBatchIsComplete()` */ virtual void replicationBatchIsComplete() const {}; diff --git a/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp b/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp new file mode 100644 index 00000000000..aa6b997f75f --- /dev/null +++ b/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "oplog_cap_maintainer_thread.h" + +#include "mongo/base/error_codes.h" +#include "mongo/base/string_data.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/lock_manager_defs.h" +#include "mongo/db/concurrency/locker.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage/record_store.h" +#include "mongo/logger/logstream_builder.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +bool OplogCapMaintainerThread::_deleteExcessDocuments() { + if (!getGlobalServiceContext()->getStorageEngine()) { + LOG(2) << "OplogCapMaintainerThread: no global storage engine yet"; + return false; + } + + const ServiceContext::UniqueOperationContext opCtx = cc().makeOperationContext(); + + try { + // A Global IX lock should be good enough to protect the oplog truncation from + // interruptions such as restartCatalog. PBWM, database lock or collection lock is not + // needed. This improves concurrency if oplog truncation takes long time. + ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( + opCtx.get()->lockState()); + Lock::GlobalLock lk(opCtx.get(), MODE_IX); + + RecordStore* rs = nullptr; + NamespaceString oplogNss = NamespaceString::kRsOplogNamespace; + { + // Release the database lock right away because we don't want to + // block other operations on the local database and given the + // fact that oplog collection is so special, Global IX lock can + // make sure the collection exists. + Lock::DBLock dbLock(opCtx.get(), oplogNss.db(), MODE_IX); + auto databaseHolder = DatabaseHolder::get(opCtx.get()); + auto db = databaseHolder->getDb(opCtx.get(), oplogNss.db()); + if (!db) { + LOG(2) << "no local database yet"; + return false; + } + // We need to hold the database lock while getting the collection. Otherwise a + // concurrent collection creation would write to the map in the Database object + // while we concurrently read the map. + Collection* collection = db->getCollection(opCtx.get(), oplogNss); + if (!collection) { + LOG(2) << "no collection " << oplogNss; + return false; + } + rs = collection->getRecordStore(); + } + if (!rs->yieldAndAwaitOplogDeletionRequest(opCtx.get())) { + return false; // Oplog went away. + } + rs->reclaimOplog(opCtx.get()); + } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { + return false; + } catch (const std::exception& e) { + severe() << "error in OplogCapMaintainerThread: " << e.what(); + fassertFailedNoTrace(!"error in OplogCapMaintainerThread"); + } catch (...) { + fassertFailedNoTrace(!"unknown error in OplogCapMaintainerThread"); + } + return true; +} + +void OplogCapMaintainerThread::run() { + ThreadClient tc(_name, getGlobalServiceContext()); + + while (!globalInShutdownDeprecated()) { + if (!_deleteExcessDocuments()) { + sleepmillis(1000); // Back off in case there were problems deleting. + } + } +} +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mock.cpp b/src/mongo/db/storage/oplog_cap_maintainer_thread.h index 3fe81676fa2..ac4ea75a8e8 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mock.cpp +++ b/src/mongo/db/storage/oplog_cap_maintainer_thread.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2019-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -27,26 +27,36 @@ * it in the license file. */ -#include "mongo/platform/basic.h" +#pragma once -#include "mongo/base/init.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/service_context.h" -#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" +#include <string> -#include <memory> +#include "mongo/db/namespace_string.h" +#include "mongo/util/background.h" namespace mongo { -namespace { -bool initRsOplogBackgroundThread(StringData ns) { - return NamespaceString::oplog(ns); -} +/** + * Responsible for deleting oplog stones once their max capacity has been reached. + */ +class OplogCapMaintainerThread : public BackgroundJob { +public: + OplogCapMaintainerThread() : BackgroundJob(true /* deleteSelf */) {} + + virtual std::string name() const { + return _name; + } + + virtual void run(); + +private: + /** + * Returns true iff there was an oplog to delete from. + */ + bool _deleteExcessDocuments(); -MONGO_INITIALIZER(SetInitRsOplogBackgroundThreadCallback)(InitializerContext* context) { - WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback(initRsOplogBackgroundThread); - return Status::OK(); -} + std::string _name = + std::string("OplogCapMaintainerThread-") + NamespaceString::kRsOplogNamespace.toString(); +}; -} // namespace } // namespace mongo diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index 3da5078559b..66b7b713f6d 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -560,6 +560,24 @@ public: "this storage engine does not support updateCappedSize"); } + /** + * Returns false if the oplog was dropped while waiting for a deletion request. + * This should only be called if StorageEngine::supportsOplogStones() is true. + * Storage engines supporting oplog stones must implement this function. + */ + virtual bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx) { + MONGO_UNREACHABLE; + } + + /** + * This should only be called if StorageEngine::supportsOplogStones() is true. + * Storage engines supporting oplog stones must implement this function. + */ + virtual void reclaimOplog(OperationContext* opCtx) { + MONGO_UNREACHABLE; + } + + protected: std::string _ns; }; diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 3d1fe86e120..7722aaaa956 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -353,6 +353,15 @@ public: } /** + * Returns true if the storage engine uses oplog stones to more finely control + * deletion of oplog history, instead of the standard capped collection controls on + * the oplog collection size. + */ + virtual bool supportsOplogStones() const { + return false; + } + + /** * Returns true if the storage engine supports deferring collection drops until the the storage * engine determines that the storage layer artifacts for the pending drops are no longer needed * based on the stable and oldest timestamps. diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp index 35131d7be1c..1dbabe78ed7 100644 --- a/src/mongo/db/storage/storage_engine_impl.cpp +++ b/src/mongo/db/storage/storage_engine_impl.cpp @@ -788,6 +788,10 @@ bool StorageEngineImpl::supportsReadConcernMajority() const { return _engine->supportsReadConcernMajority(); } +bool StorageEngineImpl::supportsOplogStones() const { + return _engine->supportsOplogStones(); +} + bool StorageEngineImpl::supportsPendingDrops() const { return supportsReadConcernMajority(); } diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h index 89ce276ddab..22edef205ad 100644 --- a/src/mongo/db/storage/storage_engine_impl.h +++ b/src/mongo/db/storage/storage_engine_impl.h @@ -149,6 +149,8 @@ public: bool supportsReadConcernMajority() const final; + bool supportsOplogStones() const final; + bool supportsPendingDrops() const final; void clearDropPendingState() final; diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript index c3e886c866d..3a43de2eb8d 100644 --- a/src/mongo/db/storage/wiredtiger/SConscript +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -95,7 +95,6 @@ if wiredtiger: source=[ 'wiredtiger_init.cpp', 'wiredtiger_options_init.cpp', - 'wiredtiger_record_store_mongod.cpp', 'wiredtiger_server_status.cpp', env.Idlc('wiredtiger_global_options.idl')[0], ], @@ -116,15 +115,6 @@ if wiredtiger: ], ) - wtEnv.Library( - target='storage_wiredtiger_mock', - source=[ - 'wiredtiger_record_store_mock.cpp', - ], - LIBDEPS=['storage_wiredtiger_core', - ] - ) - wtEnv.CppUnitTest( target='storage_wiredtiger_init_test', source=['wiredtiger_init_test.cpp', @@ -147,7 +137,7 @@ if wiredtiger: 'wiredtiger_recovery_unit_test.cpp', ], LIBDEPS=[ - 'storage_wiredtiger_mock', + 'storage_wiredtiger_core', '$BUILD_DIR/mongo/db/storage/recovery_unit_test_harness', '$BUILD_DIR/mongo/util/clock_source_mock', ], @@ -168,7 +158,7 @@ if wiredtiger: '$BUILD_DIR/mongo/db/storage/durable_catalog_impl', '$BUILD_DIR/mongo/db/storage/record_store_test_harness', '$BUILD_DIR/mongo/util/clock_source_mock', - 'storage_wiredtiger_mock', + 'storage_wiredtiger_core', ], ) @@ -180,7 +170,7 @@ if wiredtiger: LIBDEPS=[ '$BUILD_DIR/mongo/db/storage/durable_catalog_impl', '$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness', - 'storage_wiredtiger_mock', + 'storage_wiredtiger_core', ], ) @@ -252,7 +242,7 @@ if wiredtiger: LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness', - 'storage_wiredtiger_mock', + 'storage_wiredtiger_core', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', @@ -268,7 +258,7 @@ if wiredtiger: LIBDEPS=[ '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/storage/durable_catalog_impl', - 'storage_wiredtiger_mock', + 'storage_wiredtiger_core', ], ) @@ -277,7 +267,7 @@ if wiredtiger: source=['wiredtiger_session_cache_test.cpp', ], LIBDEPS=[ - 'storage_wiredtiger_mock', + 'storage_wiredtiger_core', ], ) @@ -289,6 +279,6 @@ if wiredtiger: '$BUILD_DIR/mongo/db/storage/durable_catalog_impl', '$BUILD_DIR/mongo/unittest/unittest', '$BUILD_DIR/mongo/util/clock_source_mock', - 'storage_wiredtiger_mock', + 'storage_wiredtiger_core', ], ) diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 72aa1d4dfcf..422f838ba5f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -500,10 +500,6 @@ Status OpenReadTransactionParam::setFromString(const std::string& str) { namespace { -std::function<bool(StringData)> initRsOplogBackgroundThreadCallback = [](StringData) -> bool { - fassertFailed(40358); -}; - StatusWith<std::vector<std::string>> getDataFilesFromBackupCursor(WT_CURSOR* cursor, std::string dbPath, const char* statusPrefix) { @@ -1549,15 +1545,6 @@ void WiredTigerKVEngine::setJournalListener(JournalListener* jl) { return _sessionCache->setJournalListener(jl); } -void WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback( - std::function<bool(StringData)> cb) { - initRsOplogBackgroundThreadCallback = std::move(cb); -} - -bool WiredTigerKVEngine::initRsOplogBackgroundThread(StringData ns) { - return initRsOplogBackgroundThreadCallback(ns); -} - namespace { MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely); @@ -1907,6 +1894,10 @@ bool WiredTigerKVEngine::supportsReadConcernMajority() const { return _keepDataHistory; } +bool WiredTigerKVEngine::supportsOplogStones() const { + return true; +} + void WiredTigerKVEngine::startOplogManager(OperationContext* opCtx, const std::string& uri, WiredTigerRecordStore* oplogRecordStore) { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index ba86106efec..625cb5a565a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -236,6 +236,8 @@ public: bool supportsReadConcernSnapshot() const final override; + bool supportsOplogStones() const final override; + /* * This function is called when replication has completed a batch. In this function, we * refresh our oplog visiblity read-at-timestamp value. @@ -289,21 +291,6 @@ public: return _oplogManager.get(); } - /** - * Sets the implementation for `initRsOplogBackgroundThread` (allowing tests to skip the - * background job, for example). Intended to be called from a MONGO_INITIALIZER and therefore in - * a single threaded context. - */ - static void setInitRsOplogBackgroundThreadCallback(std::function<bool(StringData)> cb); - - /** - * Initializes a background job to remove excess documents in the oplog collections. - * This applies to the capped collections in the local.oplog.* namespaces (specifically - * local.oplog.rs for replica sets). - * Returns true if a background job is running for the namespace. - */ - static bool initRsOplogBackgroundThread(StringData ns); - static void appendGlobalStats(BSONObjBuilder& b); Timestamp getStableTimestamp() const override; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index d711686fc42..554f830c7f6 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -767,7 +767,8 @@ void WiredTigerRecordStore::postConstructorInit(OperationContext* opCtx) { if (_sizeStorer) _sizeStorer->store(_uri, _sizeInfo); - if (WiredTigerKVEngine::initRsOplogBackgroundThread(ns())) { + if (NamespaceString::oplog(ns()) && + !(storageGlobalParams.repair || storageGlobalParams.readOnly)) { _oplogStones = std::make_shared<OplogStones>(opCtx, this); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 33365c96b36..d3d037ffcfc 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -241,7 +241,9 @@ public: bool inShutdown() const; - void reclaimOplog(OperationContext* opCtx); + bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx) override; + + void reclaimOplog(OperationContext* opCtx) override; /** * The `recoveryTimestamp` is when replication recovery would need to replay from for @@ -250,9 +252,6 @@ public: */ void reclaimOplog(OperationContext* opCtx, Timestamp recoveryTimestamp); - // Returns false if the oplog was dropped while waiting for a deletion request. - bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx); - bool haveCappedWaiters(); void notifyCappedWaitersIfNeeded(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp deleted file mode 100644 index cd6207313c9..00000000000 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage - -#include "mongo/platform/basic.h" - -#include <set> - -#include "mongo/base/checked_cast.h" -#include "mongo/base/init.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/client.h" -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/service_context.h" -#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" -#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" -#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/background.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" - -namespace mongo { - -namespace { - -std::set<NamespaceString> _backgroundThreadNamespaces; -stdx::mutex _backgroundThreadMutex; - -class OplogTruncaterThread : public BackgroundJob { -public: - OplogTruncaterThread(const NamespaceString& ns) - : BackgroundJob(true /* deleteSelf */), _ns(ns) { - _name = std::string("WT-OplogTruncaterThread-") + _ns.toString(); - } - - virtual std::string name() const { - return _name; - } - - /** - * Returns true iff there was an oplog to delete from. - */ - bool _deleteExcessDocuments() { - if (!getGlobalServiceContext()->getStorageEngine()) { - LOG(2) << "no global storage engine yet"; - return false; - } - - const ServiceContext::UniqueOperationContext opCtx = cc().makeOperationContext(); - - try { - // A Global IX lock should be good enough to protect the oplog truncation from - // interruptions such as restartCatalog. PBWM, database lock or collection lock is not - // needed. This improves concurrency if oplog truncation takes long time. - ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( - opCtx.get()->lockState()); - Lock::GlobalLock lk(opCtx.get(), MODE_IX); - - WiredTigerRecordStore* rs = nullptr; - { - // Release the database lock right away because we don't want to - // block other operations on the local database and given the - // fact that oplog collection is so special, Global IX lock can - // make sure the collection exists. - Lock::DBLock dbLock(opCtx.get(), _ns.db(), MODE_IX); - auto databaseHolder = DatabaseHolder::get(opCtx.get()); - auto db = databaseHolder->getDb(opCtx.get(), _ns.db()); - if (!db) { - LOG(2) << "no local database yet"; - return false; - } - // We need to hold the database lock while getting the collection. Otherwise a - // concurrent collection creation would write to the map in the Database object - // while we concurrently read the map. - Collection* collection = db->getCollection(opCtx.get(), _ns); - if (!collection) { - LOG(2) << "no collection " << _ns; - return false; - } - rs = checked_cast<WiredTigerRecordStore*>(collection->getRecordStore()); - } - - if (!rs->yieldAndAwaitOplogDeletionRequest(opCtx.get())) { - return false; // Oplog went away. - } - rs->reclaimOplog(opCtx.get()); - } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { - return false; - } catch (const std::exception& e) { - severe() << "error in OplogTruncaterThread: " << e.what(); - fassertFailedNoTrace(!"error in OplogTruncaterThread"); - } catch (...) { - fassertFailedNoTrace(!"unknown error in OplogTruncaterThread"); - } - return true; - } - - virtual void run() { - ThreadClient tc(_name, getGlobalServiceContext()); - - while (!globalInShutdownDeprecated()) { - if (!_deleteExcessDocuments()) { - sleepmillis(1000); // Back off in case there were problems deleting. - } - } - } - -private: - NamespaceString _ns; - std::string _name; -}; - -bool initRsOplogBackgroundThread(StringData ns) { - if (!NamespaceString::oplog(ns)) { - return false; - } - - if (storageGlobalParams.repair || storageGlobalParams.readOnly) { - LOG(1) << "not starting OplogTruncaterThread for " << ns - << " because we are either in repair or read-only mode"; - return false; - } - - stdx::lock_guard<stdx::mutex> lock(_backgroundThreadMutex); - NamespaceString nss(ns); - if (_backgroundThreadNamespaces.count(nss)) { - log() << "OplogTruncaterThread " << ns << " already started"; - } else { - log() << "Starting OplogTruncaterThread " << ns; - BackgroundJob* backgroundThread = new OplogTruncaterThread(nss); - backgroundThread->go(); - _backgroundThreadNamespaces.insert(nss); - } - return true; -} - -MONGO_INITIALIZER(SetInitRsOplogBackgroundThreadCallback)(InitializerContext* context) { - WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback(initRsOplogBackgroundThread); - return Status::OK(); -} - -} // namespace -} // namespace mongo |