diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2015-09-17 16:33:51 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2015-09-22 07:53:20 -0400 |
commit | 87a5a53a3b3e6d6d30657a846434811da8ecee6c (patch) | |
tree | c36dbedd6b3ef41222c68bf8c1319a68473f3b28 | |
parent | 578eba8c80615f525147d27e7f4317b000d37da7 (diff) | |
download | mongo-87a5a53a3b3e6d6d30657a846434811da8ecee6c.tar.gz |
SERVER-20161: move snapshot notification to capped collection notifier
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 131 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/snapshot_thread.cpp | 228 | ||||
-rw-r--r-- | src/mongo/db/repl/snapshot_thread.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester.cpp | 2 |
6 files changed, 236 insertions, 134 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 84859814324..81abc18e718 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -642,6 +642,7 @@ serverOnlyFiles = [ "repl/resync.cpp", "repl/rs_initialsync.cpp", "repl/rs_sync.cpp", + "repl/snapshot_thread.cpp", "repl/sync_source_feedback.cpp", "service_context_d.cpp", "stats/fill_locker_info.cpp", diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 99e3dcf0d14..4ea0f3275db 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -105,8 +105,6 @@ std::string rsOplogName = "local.oplog.rs"; std::string masterSlaveOplogName = "local.oplog.$main"; int OPLOG_VERSION = 2; -MONGO_FP_DECLARE(disableSnapshotting); - namespace { // cached copies of these...so don't rename them, drop them, etc.!!! Database* _localDB = nullptr; @@ -156,8 +154,6 @@ std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, stdx::lock_guard<stdx::mutex> lk(newOpMutex); Timestamp ts = getNextGlobalTimestamp(); - newTimestampNotifier.notify_all(); - fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts)); // Set hash if we're in replset mode, otherwise it remains 0 in master/slave. @@ -890,7 +886,6 @@ Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) { void setNewTimestamp(const Timestamp& newTime) { stdx::lock_guard<stdx::mutex> lk(newOpMutex); setGlobalTimestamp(newTime); - newTimestampNotifier.notify_all(); } void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) { @@ -910,131 +905,5 @@ void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { _localDB = nullptr; _localOplogCollection = nullptr; } - - -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000); - -SnapshotThread::SnapshotThread(SnapshotManager* manager) - : _manager(manager), _thread([this] { run(); }) {} - -void SnapshotThread::run() { - Client::initThread("SnapshotThread"); - auto& client = cc(); - auto serviceContext = client.getServiceContext(); - auto replCoord = ReplicationCoordinator::get(serviceContext); - - Timestamp lastTimestamp = {}; - while (true) { - { - // This block logically belongs at the end of the loop, but having it at the top - // simplifies handling of the "continue" cases. It is harmless to do these before the - // first run of the loop. - _manager->cleanupUnneededSnapshots(); - sleepmicros(replSnapshotThreadThrottleMicros); // Throttle by sleeping. - } - - { - stdx::unique_lock<stdx::mutex> lock(newOpMutex); - while (true) { - if (_inShutdown) - return; - - if (_forcedSnapshotPending || lastTimestamp != getLastSetTimestamp()) { - _forcedSnapshotPending = false; - lastTimestamp = getLastSetTimestamp(); - break; - } - - newTimestampNotifier.wait(lock); - } - } - - while (MONGO_FAIL_POINT(disableSnapshotting)) { - sleepsecs(1); - stdx::unique_lock<stdx::mutex> lock(newOpMutex); - if (_inShutdown) { - return; - } - } - - try { - auto txn = client.makeOperationContext(); - Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX); - - if (!replCoord->getMemberState().readable()) { - // If our MemberState isn't readable, we may not be in a consistent state so don't - // take snapshots. When we transition into a readable state from a non-readable - // state, a snapshot is forced to ensure we don't miss the latest write. This must - // be checked each time we acquire the global IS lock since that prevents the node - // from transitioning to a !readable() state from a readable() one in the cases - // where we shouldn't be creating a snapshot. - continue; - } - - SnapshotName name(0); // assigned real value in block. - { - // Make sure there are no in-flight capped inserts while we create our snapshot. - Lock::ResourceLock cappedInsertLockForOtherDb( - txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X); - Lock::ResourceLock cappedInsertLockForLocalDb( - txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X); - - // Reserve the name immediately before we take our snapshot. This ensures that all - // names that compare lower must be from points in time visible to this named - // snapshot. - name = replCoord->reserveSnapshotName(nullptr); - - // This establishes the view that we will name. - _manager->prepareForCreateSnapshot(txn.get()); - } - - auto opTimeOfSnapshot = OpTime(); - { - AutoGetCollectionForRead oplog(txn.get(), rsOplogName); - invariant(oplog.getCollection()); - // Read the latest op from the oplog. - auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false); - auto record = cursor->next(); - if (!record) - continue; // oplog is completely empty. - - const auto op = record->data.releaseToBson(); - opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op)); - invariant(!opTimeOfSnapshot.isNull()); - } - - _manager->createSnapshot(txn.get(), name); - replCoord->onSnapshotCreate(opTimeOfSnapshot, name); - } catch (const WriteConflictException& wce) { - log() << "skipping storage snapshot pass due to write conflict"; - continue; - } - } -} - -void SnapshotThread::shutdown() { - invariant(_thread.joinable()); - { - stdx::lock_guard<stdx::mutex> lock(newOpMutex); - invariant(!_inShutdown); - _inShutdown = true; - newTimestampNotifier.notify_all(); - } - _thread.join(); -} - -void SnapshotThread::forceSnapshot() { - stdx::lock_guard<stdx::mutex> lock(newOpMutex); - _forcedSnapshotPending = true; - newTimestampNotifier.notify_all(); -} - -std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) { - if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) { - return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager)); - } - return {}; -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 215e6d96e0a..2d120bda5f8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -283,6 +283,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) { setNewTimestamp(newTime); + forceSnapshotCreation(); } StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) { diff --git a/src/mongo/db/repl/snapshot_thread.cpp b/src/mongo/db/repl/snapshot_thread.cpp new file mode 100644 index 00000000000..0143752c7bd --- /dev/null +++ b/src/mongo/db/repl/snapshot_thread.cpp @@ -0,0 +1,228 @@ +/** +* Copyright (C) 2008-2015 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/db/repl/snapshot_thread.h" + +#include "mongo/platform/basic.h" + +#include "mongo/db/background.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/global_timestamp.h" +#include "mongo/db/operation_context_impl.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" +#include "mongo/util/elapsed_tracker.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +using std::endl; +using std::string; +using std::stringstream; + +namespace repl { +MONGO_FP_DECLARE(disableSnapshotting); +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000); + +SnapshotThread::SnapshotThread(SnapshotManager* manager) + : _manager(manager), _thread([this] { run(); }) {} + +void SnapshotThread::run() { + Client::initThread("SnapshotThread"); + auto& client = cc(); + auto serviceContext = client.getServiceContext(); + auto replCoord = ReplicationCoordinator::get(serviceContext); + + Timestamp lastTimestamp = {}; + while (true) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_inShutdown) + return; + } + { + // This block logically belongs at the end of the loop, but having it at the top + // simplifies handling of the "continue" cases. It is harmless to do these before the + // first run of the loop. + _manager->cleanupUnneededSnapshots(); + sleepmicros(replSnapshotThreadThrottleMicros); // Throttle by sleeping. + } + + { + std::shared_ptr<CappedInsertNotifier> notifier; + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_notifier || _notifier->isDead()) { + lk.unlock(); + auto txn = client.makeOperationContext(); + AutoGetCollectionForRead oplog(txn.get(), rsOplogName); + + if (!oplog.getCollection()) { + sleepmillis(200); + continue; + } + + lk.lock(); + _notifier = notifier = oplog.getCollection()->getCappedInsertNotifier(); + invariant(notifier); + lk.unlock(); + } else { + notifier = _notifier; + lk.unlock(); + } + + while (true) { + auto currentTimestamp = getLastSetTimestamp(); + + stdx::unique_lock<stdx::mutex> lock(_mutex); + if (_inShutdown) + return; + + if (_forcedSnapshotPending || lastTimestamp != currentTimestamp) { + _forcedSnapshotPending = false; + lastTimestamp = currentTimestamp; + break; + } + + if (notifier->isDead()) { + notifier.reset(); + _notifier.reset(); + break; + } + + lock.unlock(); + notifier->waitForInsert(notifier->getCount()); + } + + // might need to re-acquire the notifier + if (!notifier) + continue; + } + + while (MONGO_FAIL_POINT(disableSnapshotting)) { + sleepsecs(1); + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_inShutdown) { + return; + } + } + + try { + auto txn = client.makeOperationContext(); + Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX); + + if (!replCoord->getMemberState().readable()) { + // If our MemberState isn't readable, we may not be in a consistent state so don't + // take snapshots. When we transition into a readable state from a non-readable + // state, a snapshot is forced to ensure we don't miss the latest write. This must + // be checked each time we acquire the global IS lock since that prevents the node + // from transitioning to a !readable() state from a readable() one in the cases + // where we shouldn't be creating a snapshot. + continue; + } + + SnapshotName name(0); // assigned real value in block. + { + // Make sure there are no in-flight capped inserts while we create our snapshot. + Lock::ResourceLock cappedInsertLockForOtherDb( + txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X); + Lock::ResourceLock cappedInsertLockForLocalDb( + txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X); + + // Reserve the name immediately before we take our snapshot. This ensures that all + // names that compare lower must be from points in time visible to this named + // snapshot. + name = replCoord->reserveSnapshotName(nullptr); + + // This establishes the view that we will name. + _manager->prepareForCreateSnapshot(txn.get()); + } + + auto opTimeOfSnapshot = OpTime(); + { + AutoGetCollectionForRead oplog(txn.get(), rsOplogName); + invariant(oplog.getCollection()); + // Read the latest op from the oplog. + auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false); + auto record = cursor->next(); + if (!record) + continue; // oplog is completely empty. + + const auto op = record->data.releaseToBson(); + opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op)); + invariant(!opTimeOfSnapshot.isNull()); + } + + _manager->createSnapshot(txn.get(), name); + replCoord->onSnapshotCreate(opTimeOfSnapshot, name); + } catch (const WriteConflictException& wce) { + log() << "skipping storage snapshot pass due to write conflict"; + continue; + } + } +} + +void SnapshotThread::shutdown() { + invariant(_thread.joinable()); + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(!_inShutdown); + _inShutdown = true; + if (_notifier) { + _notifier->notifyAll(); + } + } + _thread.join(); +} + +void SnapshotThread::forceSnapshot() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _forcedSnapshotPending = true; + if (_notifier) { + _notifier->notifyAll(); + } +} + +std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) { + if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) { + return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager)); + } + return {}; +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/snapshot_thread.h b/src/mongo/db/repl/snapshot_thread.h index 81bf2212f52..1cbdf42e5fe 100644 --- a/src/mongo/db/repl/snapshot_thread.h +++ b/src/mongo/db/repl/snapshot_thread.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/snapshot_manager.h" #include "mongo/stdx/functional.h" @@ -72,9 +73,11 @@ private: void run(); SnapshotManager* const _manager; - bool _inShutdown = false; // guarded by newOpMutex in oplog.cpp. - bool _forcedSnapshotPending = false; // guarded by newOpMutex in oplog.cpp. + bool _inShutdown = false; // guarded by _mutex. + bool _forcedSnapshotPending = false; // guarded by _mutex. + std::shared_ptr<CappedInsertNotifier> _notifier; stdx::thread _thread; + stdx::mutex _mutex; }; } // namespace repl diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp index 18f16baa2e8..39583153832 100644 --- a/src/mongo/db/repl/vote_requester.cpp +++ b/src/mongo/db/repl/vote_requester.cpp @@ -103,7 +103,7 @@ void VoteRequester::Algorithm::processResponse(const RemoteCommandRequest& reque _votes++; } else { log() << "VoteRequester: Got no vote from " << request.target - << " because: " << voteResponse.getReason(); + << " because: " << voteResponse.getReason() << ", resp:" << voteResponse.toBSON(); } if (voteResponse.getTerm() > _term) { |