summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabe Villasana <villagab4@gmail.com>2019-06-13 10:33:20 -0400
committerGabe Villasana <villagab4@gmail.com>2019-07-11 11:32:13 -0400
commitd9b6940984db00c428441139a33f19b207e35f30 (patch)
treeffe233d034018c0decc4b40561e4ffefb8594767
parent33fc66e686b4a48dc9527930c948f137d7628c14 (diff)
downloadmongo-d9b6940984db00c428441139a33f19b207e35f30.tar.gz
SERVER-40168 Pull the OplogTruncaterThread out of WiredTigerKVEngine and put it above the storage layer
-rw-r--r--jstests/replsets/oplog_rollover.js14
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp3
-rw-r--r--src/mongo/db/repl/storage_interface.h8
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp14
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h2
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h2
-rw-r--r--src/mongo/db/storage/SConscript18
-rw-r--r--src/mongo/db/storage/kv/kv_engine.h7
-rw-r--r--src/mongo/db/storage/oplog_cap_maintainer_thread.cpp120
-rw-r--r--src/mongo/db/storage/oplog_cap_maintainer_thread.h (renamed from src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mock.cpp)42
-rw-r--r--src/mongo/db/storage/record_store.h18
-rw-r--r--src/mongo/db/storage/storage_engine.h9
-rw-r--r--src/mongo/db/storage/storage_engine_impl.cpp4
-rw-r--r--src/mongo/db/storage/storage_engine_impl.h2
-rw-r--r--src/mongo/db/storage/wiredtiger/SConscript24
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp17
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h17
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp3
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h7
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp175
21 files changed, 259 insertions, 248 deletions
diff --git a/jstests/replsets/oplog_rollover.js b/jstests/replsets/oplog_rollover.js
index b16f54cf539..e5532585ab7 100644
--- a/jstests/replsets/oplog_rollover.js
+++ b/jstests/replsets/oplog_rollover.js
@@ -54,9 +54,9 @@
// happen when oplog size exceeds the configured maximum.
if (primary.getDB('admin').serverStatus().storageEngine.supportsCommittedReads) {
// Wait for checkpointing/stable timestamp to catch up with the second insert so oplog
- // entry of the first insert is allowed to be deleted by the oplog truncater thread when
- // a new oplog stone is created. "inMemory" WT engine does not run checkpoint thread and
- // lastStableRecoveryTimestamp is the stable timestamp in this case.
+ // entry of the first insert is allowed to be deleted by the oplog cap maintainer thread
+ // when a new oplog stone is created. "inMemory" WT engine does not run checkpoint
+ // thread and lastStableRecoveryTimestamp is the stable timestamp in this case.
assert.soon(
() => {
const primaryTimestamp =
@@ -81,14 +81,14 @@
2000);
// Insert the third document which will trigger a new oplog stone to be created. The
- // oplog truncater thread will then be unblocked on the creation of the new oplog stone
- // and will start truncating oplog entries. The oplog entry for the first insert will be
- // truncated after the oplog truncater thread finishes.
+ // oplog cap maintainer thread will then be unblocked on the creation of the new oplog
+ // stone and will start truncating oplog entries. The oplog entry for the first
+ // insert will be truncated after the oplog cap maintainer thread finishes.
assert.commandWorked(
coll.insert({_id: 2, longString: longString}, {writeConcern: {w: 2}}));
// Test that oplog entry of the initial insert rolls over on both primary and secondary.
- // Use assert.soon to wait for oplog truncater thread to run.
+ // Use assert.soon to wait for oplog cap maintainer thread to run.
assert.soon(() => {
return numInsertOplogEntry(primaryOplog) === 2;
}, "Timeout waiting for oplog to roll over on primary");
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 926f35031e2..5af7dda53c9 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -227,6 +227,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/database_holder',
+ '$BUILD_DIR/mongo/db/storage/oplog_cap_maintainer_thread',
'$BUILD_DIR/mongo/db/logical_clock',
],
)
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 2f18223276b..05bd9f78ff2 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -811,9 +811,12 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
_setConfigState_inlock(kConfigReplicationDisabled);
return;
}
+
invariant(_settings.usingReplSets());
invariant(!ReplSettings::shouldRecoverFromOplogAsStandalone());
+ _storage->initializeStorageControlsForReplication(opCtx->getServiceContext());
+
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
fassert(18822, !_inShutdown);
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 0dab0f4588f..a1cdd60ed92 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -374,6 +374,14 @@ public:
virtual bool supportsRecoveryTimestamp(ServiceContext* serviceCtx) const = 0;
/**
+ * Responsible for initializing independent processes for replication that manage
+ * and interact with the storage layer.
+ *
+ * Initializes the OplogCapMaintainerThread to control deletion of oplog stones.
+ */
+ virtual void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const = 0;
+
+ /**
* Returns the stable timestamp that the storage engine recovered to on startup. If the
* recovery point was not stable, returns "none".
*/
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index f1e80568f7a..7cfa35ba7af 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -75,7 +75,9 @@
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/durable_catalog.h"
+#include "mongo/db/storage/oplog_cap_maintainer_thread.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/background.h"
#include "mongo/util/log.h"
#include "mongo/util/str.h"
@@ -1091,6 +1093,18 @@ bool StorageInterfaceImpl::supportsRecoveryTimestamp(ServiceContext* serviceCtx)
return serviceCtx->getStorageEngine()->supportsRecoveryTimestamp();
}
+void StorageInterfaceImpl::initializeStorageControlsForReplication(
+ ServiceContext* serviceCtx) const {
+ // The storage engine may support the use of OplogStones to more finely control
+ // oplog history deletion, in which case we need to start the thread to
+ // periodically execute deletion via oplog stones. OplogStones are a replacement
+ // for capped collection deletion of the oplog collection history.
+ if (serviceCtx->getStorageEngine()->supportsOplogStones()) {
+ BackgroundJob* backgroundThread = new OplogCapMaintainerThread();
+ backgroundThread->go();
+ }
+}
+
boost::optional<Timestamp> StorageInterfaceImpl::getRecoveryTimestamp(
ServiceContext* serviceCtx) const {
return serviceCtx->getStorageEngine()->getRecoveryTimestamp();
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 51ed102028f..733d11bd5f8 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -166,6 +166,8 @@ public:
bool supportsRecoveryTimestamp(ServiceContext* serviceCtx) const override;
+ void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const override;
+
boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override;
bool supportsDocLocking(ServiceContext* serviceCtx) const override;
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 71508cd41fe..ebf54b3cc5d 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -304,6 +304,8 @@ public:
return false;
}
+ void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const override {}
+
boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override {
return boost::none;
}
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 3e66269ca93..4bc7dbaf540 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -93,6 +93,24 @@ env.Library(
'$BUILD_DIR/mongo/base',
]
)
+
+env.Library(
+ target='oplog_cap_maintainer_thread',
+ source=[
+ 'oplog_cap_maintainer_thread.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/util/background_job',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/catalog/collection',
+ '$BUILD_DIR/mongo/db/catalog/database_holder',
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/service_context',
+ ],
+)
env.Library(
target='storage_options',
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 3344908b525..0503c6f5e6f 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -399,6 +399,13 @@ public:
}
/**
+ * See `StorageEngine::supportsOplogStones`
+ */
+ virtual bool supportsOplogStones() const {
+ return false;
+ }
+
+ /**
* See `StorageEngine::replicationBatchIsComplete()`
*/
virtual void replicationBatchIsComplete() const {};
diff --git a/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp b/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp
new file mode 100644
index 00000000000..aa6b997f75f
--- /dev/null
+++ b/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp
@@ -0,0 +1,120 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "oplog_cap_maintainer_thread.h"
+
+#include "mongo/base/error_codes.h"
+#include "mongo/base/string_data.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/db/concurrency/locker.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/record_store.h"
+#include "mongo/logger/logstream_builder.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+bool OplogCapMaintainerThread::_deleteExcessDocuments() {
+ if (!getGlobalServiceContext()->getStorageEngine()) {
+ LOG(2) << "OplogCapMaintainerThread: no global storage engine yet";
+ return false;
+ }
+
+ const ServiceContext::UniqueOperationContext opCtx = cc().makeOperationContext();
+
+ try {
+ // A Global IX lock should be good enough to protect the oplog truncation from
+ // interruptions such as restartCatalog. PBWM, database lock or collection lock is not
+ // needed. This improves concurrency if oplog truncation takes long time.
+ ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
+ opCtx.get()->lockState());
+ Lock::GlobalLock lk(opCtx.get(), MODE_IX);
+
+ RecordStore* rs = nullptr;
+ NamespaceString oplogNss = NamespaceString::kRsOplogNamespace;
+ {
+ // Release the database lock right away because we don't want to
+ // block other operations on the local database and given the
+ // fact that oplog collection is so special, Global IX lock can
+ // make sure the collection exists.
+ Lock::DBLock dbLock(opCtx.get(), oplogNss.db(), MODE_IX);
+ auto databaseHolder = DatabaseHolder::get(opCtx.get());
+ auto db = databaseHolder->getDb(opCtx.get(), oplogNss.db());
+ if (!db) {
+ LOG(2) << "no local database yet";
+ return false;
+ }
+ // We need to hold the database lock while getting the collection. Otherwise a
+ // concurrent collection creation would write to the map in the Database object
+ // while we concurrently read the map.
+ Collection* collection = db->getCollection(opCtx.get(), oplogNss);
+ if (!collection) {
+ LOG(2) << "no collection " << oplogNss;
+ return false;
+ }
+ rs = collection->getRecordStore();
+ }
+ if (!rs->yieldAndAwaitOplogDeletionRequest(opCtx.get())) {
+ return false; // Oplog went away.
+ }
+ rs->reclaimOplog(opCtx.get());
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
+ return false;
+ } catch (const std::exception& e) {
+ severe() << "error in OplogCapMaintainerThread: " << e.what();
+ fassertFailedNoTrace(!"error in OplogCapMaintainerThread");
+ } catch (...) {
+ fassertFailedNoTrace(!"unknown error in OplogCapMaintainerThread");
+ }
+ return true;
+}
+
+void OplogCapMaintainerThread::run() {
+ ThreadClient tc(_name, getGlobalServiceContext());
+
+ while (!globalInShutdownDeprecated()) {
+ if (!_deleteExcessDocuments()) {
+ sleepmillis(1000); // Back off in case there were problems deleting.
+ }
+ }
+}
+} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mock.cpp b/src/mongo/db/storage/oplog_cap_maintainer_thread.h
index 3fe81676fa2..ac4ea75a8e8 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mock.cpp
+++ b/src/mongo/db/storage/oplog_cap_maintainer_thread.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,26 +27,36 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
+#pragma once
-#include "mongo/base/init.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
+#include <string>
-#include <memory>
+#include "mongo/db/namespace_string.h"
+#include "mongo/util/background.h"
namespace mongo {
-namespace {
-bool initRsOplogBackgroundThread(StringData ns) {
- return NamespaceString::oplog(ns);
-}
+/**
+ * Responsible for deleting oplog stones once their max capacity has been reached.
+ */
+class OplogCapMaintainerThread : public BackgroundJob {
+public:
+ OplogCapMaintainerThread() : BackgroundJob(true /* deleteSelf */) {}
+
+ virtual std::string name() const {
+ return _name;
+ }
+
+ virtual void run();
+
+private:
+ /**
+ * Returns true iff there was an oplog to delete from.
+ */
+ bool _deleteExcessDocuments();
-MONGO_INITIALIZER(SetInitRsOplogBackgroundThreadCallback)(InitializerContext* context) {
- WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback(initRsOplogBackgroundThread);
- return Status::OK();
-}
+ std::string _name =
+ std::string("OplogCapMaintainerThread-") + NamespaceString::kRsOplogNamespace.toString();
+};
-} // namespace
} // namespace mongo
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index 3da5078559b..66b7b713f6d 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -560,6 +560,24 @@ public:
"this storage engine does not support updateCappedSize");
}
+ /**
+ * Returns false if the oplog was dropped while waiting for a deletion request.
+ * This should only be called if StorageEngine::supportsOplogStones() is true.
+ * Storage engines supporting oplog stones must implement this function.
+ */
+ virtual bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx) {
+ MONGO_UNREACHABLE;
+ }
+
+ /**
+ * This should only be called if StorageEngine::supportsOplogStones() is true.
+ * Storage engines supporting oplog stones must implement this function.
+ */
+ virtual void reclaimOplog(OperationContext* opCtx) {
+ MONGO_UNREACHABLE;
+ }
+
+
protected:
std::string _ns;
};
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 3d1fe86e120..7722aaaa956 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -353,6 +353,15 @@ public:
}
/**
+ * Returns true if the storage engine uses oplog stones to more finely control
+ * deletion of oplog history, instead of the standard capped collection controls on
+ * the oplog collection size.
+ */
+ virtual bool supportsOplogStones() const {
+ return false;
+ }
+
+ /**
* Returns true if the storage engine supports deferring collection drops until the the storage
* engine determines that the storage layer artifacts for the pending drops are no longer needed
* based on the stable and oldest timestamps.
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index 35131d7be1c..1dbabe78ed7 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -788,6 +788,10 @@ bool StorageEngineImpl::supportsReadConcernMajority() const {
return _engine->supportsReadConcernMajority();
}
+bool StorageEngineImpl::supportsOplogStones() const {
+ return _engine->supportsOplogStones();
+}
+
bool StorageEngineImpl::supportsPendingDrops() const {
return supportsReadConcernMajority();
}
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 89ce276ddab..22edef205ad 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -149,6 +149,8 @@ public:
bool supportsReadConcernMajority() const final;
+ bool supportsOplogStones() const final;
+
bool supportsPendingDrops() const final;
void clearDropPendingState() final;
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index c3e886c866d..3a43de2eb8d 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -95,7 +95,6 @@ if wiredtiger:
source=[
'wiredtiger_init.cpp',
'wiredtiger_options_init.cpp',
- 'wiredtiger_record_store_mongod.cpp',
'wiredtiger_server_status.cpp',
env.Idlc('wiredtiger_global_options.idl')[0],
],
@@ -116,15 +115,6 @@ if wiredtiger:
],
)
- wtEnv.Library(
- target='storage_wiredtiger_mock',
- source=[
- 'wiredtiger_record_store_mock.cpp',
- ],
- LIBDEPS=['storage_wiredtiger_core',
- ]
- )
-
wtEnv.CppUnitTest(
target='storage_wiredtiger_init_test',
source=['wiredtiger_init_test.cpp',
@@ -147,7 +137,7 @@ if wiredtiger:
'wiredtiger_recovery_unit_test.cpp',
],
LIBDEPS=[
- 'storage_wiredtiger_mock',
+ 'storage_wiredtiger_core',
'$BUILD_DIR/mongo/db/storage/recovery_unit_test_harness',
'$BUILD_DIR/mongo/util/clock_source_mock',
],
@@ -168,7 +158,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/storage/durable_catalog_impl',
'$BUILD_DIR/mongo/db/storage/record_store_test_harness',
'$BUILD_DIR/mongo/util/clock_source_mock',
- 'storage_wiredtiger_mock',
+ 'storage_wiredtiger_core',
],
)
@@ -180,7 +170,7 @@ if wiredtiger:
LIBDEPS=[
'$BUILD_DIR/mongo/db/storage/durable_catalog_impl',
'$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness',
- 'storage_wiredtiger_mock',
+ 'storage_wiredtiger_core',
],
)
@@ -252,7 +242,7 @@ if wiredtiger:
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness',
- 'storage_wiredtiger_mock',
+ 'storage_wiredtiger_core',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
@@ -268,7 +258,7 @@ if wiredtiger:
LIBDEPS=[
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/storage/durable_catalog_impl',
- 'storage_wiredtiger_mock',
+ 'storage_wiredtiger_core',
],
)
@@ -277,7 +267,7 @@ if wiredtiger:
source=['wiredtiger_session_cache_test.cpp',
],
LIBDEPS=[
- 'storage_wiredtiger_mock',
+ 'storage_wiredtiger_core',
],
)
@@ -289,6 +279,6 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/storage/durable_catalog_impl',
'$BUILD_DIR/mongo/unittest/unittest',
'$BUILD_DIR/mongo/util/clock_source_mock',
- 'storage_wiredtiger_mock',
+ 'storage_wiredtiger_core',
],
)
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 72aa1d4dfcf..422f838ba5f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -500,10 +500,6 @@ Status OpenReadTransactionParam::setFromString(const std::string& str) {
namespace {
-std::function<bool(StringData)> initRsOplogBackgroundThreadCallback = [](StringData) -> bool {
- fassertFailed(40358);
-};
-
StatusWith<std::vector<std::string>> getDataFilesFromBackupCursor(WT_CURSOR* cursor,
std::string dbPath,
const char* statusPrefix) {
@@ -1549,15 +1545,6 @@ void WiredTigerKVEngine::setJournalListener(JournalListener* jl) {
return _sessionCache->setJournalListener(jl);
}
-void WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback(
- std::function<bool(StringData)> cb) {
- initRsOplogBackgroundThreadCallback = std::move(cb);
-}
-
-bool WiredTigerKVEngine::initRsOplogBackgroundThread(StringData ns) {
- return initRsOplogBackgroundThreadCallback(ns);
-}
-
namespace {
MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely);
@@ -1907,6 +1894,10 @@ bool WiredTigerKVEngine::supportsReadConcernMajority() const {
return _keepDataHistory;
}
+bool WiredTigerKVEngine::supportsOplogStones() const {
+ return true;
+}
+
void WiredTigerKVEngine::startOplogManager(OperationContext* opCtx,
const std::string& uri,
WiredTigerRecordStore* oplogRecordStore) {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index ba86106efec..625cb5a565a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -236,6 +236,8 @@ public:
bool supportsReadConcernSnapshot() const final override;
+ bool supportsOplogStones() const final override;
+
/*
* This function is called when replication has completed a batch. In this function, we
* refresh our oplog visiblity read-at-timestamp value.
@@ -289,21 +291,6 @@ public:
return _oplogManager.get();
}
- /**
- * Sets the implementation for `initRsOplogBackgroundThread` (allowing tests to skip the
- * background job, for example). Intended to be called from a MONGO_INITIALIZER and therefore in
- * a single threaded context.
- */
- static void setInitRsOplogBackgroundThreadCallback(std::function<bool(StringData)> cb);
-
- /**
- * Initializes a background job to remove excess documents in the oplog collections.
- * This applies to the capped collections in the local.oplog.* namespaces (specifically
- * local.oplog.rs for replica sets).
- * Returns true if a background job is running for the namespace.
- */
- static bool initRsOplogBackgroundThread(StringData ns);
-
static void appendGlobalStats(BSONObjBuilder& b);
Timestamp getStableTimestamp() const override;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index d711686fc42..554f830c7f6 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -767,7 +767,8 @@ void WiredTigerRecordStore::postConstructorInit(OperationContext* opCtx) {
if (_sizeStorer)
_sizeStorer->store(_uri, _sizeInfo);
- if (WiredTigerKVEngine::initRsOplogBackgroundThread(ns())) {
+ if (NamespaceString::oplog(ns()) &&
+ !(storageGlobalParams.repair || storageGlobalParams.readOnly)) {
_oplogStones = std::make_shared<OplogStones>(opCtx, this);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index 33365c96b36..d3d037ffcfc 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -241,7 +241,9 @@ public:
bool inShutdown() const;
- void reclaimOplog(OperationContext* opCtx);
+ bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx) override;
+
+ void reclaimOplog(OperationContext* opCtx) override;
/**
* The `recoveryTimestamp` is when replication recovery would need to replay from for
@@ -250,9 +252,6 @@ public:
*/
void reclaimOplog(OperationContext* opCtx, Timestamp recoveryTimestamp);
- // Returns false if the oplog was dropped while waiting for a deletion request.
- bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx);
-
bool haveCappedWaiters();
void notifyCappedWaitersIfNeeded();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
deleted file mode 100644
index cd6207313c9..00000000000
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
-
-#include "mongo/platform/basic.h"
-
-#include <set>
-
-#include "mongo/base/checked_cast.h"
-#include "mongo/base/init.h"
-#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog/database.h"
-#include "mongo/db/catalog/database_holder.h"
-#include "mongo/db/client.h"
-#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
-#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
-#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/background.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-namespace {
-
-std::set<NamespaceString> _backgroundThreadNamespaces;
-stdx::mutex _backgroundThreadMutex;
-
-class OplogTruncaterThread : public BackgroundJob {
-public:
- OplogTruncaterThread(const NamespaceString& ns)
- : BackgroundJob(true /* deleteSelf */), _ns(ns) {
- _name = std::string("WT-OplogTruncaterThread-") + _ns.toString();
- }
-
- virtual std::string name() const {
- return _name;
- }
-
- /**
- * Returns true iff there was an oplog to delete from.
- */
- bool _deleteExcessDocuments() {
- if (!getGlobalServiceContext()->getStorageEngine()) {
- LOG(2) << "no global storage engine yet";
- return false;
- }
-
- const ServiceContext::UniqueOperationContext opCtx = cc().makeOperationContext();
-
- try {
- // A Global IX lock should be good enough to protect the oplog truncation from
- // interruptions such as restartCatalog. PBWM, database lock or collection lock is not
- // needed. This improves concurrency if oplog truncation takes long time.
- ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
- opCtx.get()->lockState());
- Lock::GlobalLock lk(opCtx.get(), MODE_IX);
-
- WiredTigerRecordStore* rs = nullptr;
- {
- // Release the database lock right away because we don't want to
- // block other operations on the local database and given the
- // fact that oplog collection is so special, Global IX lock can
- // make sure the collection exists.
- Lock::DBLock dbLock(opCtx.get(), _ns.db(), MODE_IX);
- auto databaseHolder = DatabaseHolder::get(opCtx.get());
- auto db = databaseHolder->getDb(opCtx.get(), _ns.db());
- if (!db) {
- LOG(2) << "no local database yet";
- return false;
- }
- // We need to hold the database lock while getting the collection. Otherwise a
- // concurrent collection creation would write to the map in the Database object
- // while we concurrently read the map.
- Collection* collection = db->getCollection(opCtx.get(), _ns);
- if (!collection) {
- LOG(2) << "no collection " << _ns;
- return false;
- }
- rs = checked_cast<WiredTigerRecordStore*>(collection->getRecordStore());
- }
-
- if (!rs->yieldAndAwaitOplogDeletionRequest(opCtx.get())) {
- return false; // Oplog went away.
- }
- rs->reclaimOplog(opCtx.get());
- } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
- return false;
- } catch (const std::exception& e) {
- severe() << "error in OplogTruncaterThread: " << e.what();
- fassertFailedNoTrace(!"error in OplogTruncaterThread");
- } catch (...) {
- fassertFailedNoTrace(!"unknown error in OplogTruncaterThread");
- }
- return true;
- }
-
- virtual void run() {
- ThreadClient tc(_name, getGlobalServiceContext());
-
- while (!globalInShutdownDeprecated()) {
- if (!_deleteExcessDocuments()) {
- sleepmillis(1000); // Back off in case there were problems deleting.
- }
- }
- }
-
-private:
- NamespaceString _ns;
- std::string _name;
-};
-
-bool initRsOplogBackgroundThread(StringData ns) {
- if (!NamespaceString::oplog(ns)) {
- return false;
- }
-
- if (storageGlobalParams.repair || storageGlobalParams.readOnly) {
- LOG(1) << "not starting OplogTruncaterThread for " << ns
- << " because we are either in repair or read-only mode";
- return false;
- }
-
- stdx::lock_guard<stdx::mutex> lock(_backgroundThreadMutex);
- NamespaceString nss(ns);
- if (_backgroundThreadNamespaces.count(nss)) {
- log() << "OplogTruncaterThread " << ns << " already started";
- } else {
- log() << "Starting OplogTruncaterThread " << ns;
- BackgroundJob* backgroundThread = new OplogTruncaterThread(nss);
- backgroundThread->go();
- _backgroundThreadNamespaces.insert(nss);
- }
- return true;
-}
-
-MONGO_INITIALIZER(SetInitRsOplogBackgroundThreadCallback)(InitializerContext* context) {
- WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback(initRsOplogBackgroundThread);
- return Status::OK();
-}
-
-} // namespace
-} // namespace mongo