summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2015-09-17 16:33:51 -0400
committerScott Hernandez <scotthernandez@gmail.com>2015-09-22 07:53:20 -0400
commit87a5a53a3b3e6d6d30657a846434811da8ecee6c (patch)
treec36dbedd6b3ef41222c68bf8c1319a68473f3b28
parent578eba8c80615f525147d27e7f4317b000d37da7 (diff)
downloadmongo-87a5a53a3b3e6d6d30657a846434811da8ecee6c.tar.gz
SERVER-20161: move snapshot notification to capped collection notifier
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/repl/oplog.cpp131
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp1
-rw-r--r--src/mongo/db/repl/snapshot_thread.cpp228
-rw-r--r--src/mongo/db/repl/snapshot_thread.h7
-rw-r--r--src/mongo/db/repl/vote_requester.cpp2
6 files changed, 236 insertions, 134 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 84859814324..81abc18e718 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -642,6 +642,7 @@ serverOnlyFiles = [
"repl/resync.cpp",
"repl/rs_initialsync.cpp",
"repl/rs_sync.cpp",
+ "repl/snapshot_thread.cpp",
"repl/sync_source_feedback.cpp",
"service_context_d.cpp",
"stats/fill_locker_info.cpp",
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 99e3dcf0d14..4ea0f3275db 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -105,8 +105,6 @@ std::string rsOplogName = "local.oplog.rs";
std::string masterSlaveOplogName = "local.oplog.$main";
int OPLOG_VERSION = 2;
-MONGO_FP_DECLARE(disableSnapshotting);
-
namespace {
// cached copies of these...so don't rename them, drop them, etc.!!!
Database* _localDB = nullptr;
@@ -156,8 +154,6 @@ std::pair<OpTime, long long> getNextOpTime(OperationContext* txn,
stdx::lock_guard<stdx::mutex> lk(newOpMutex);
Timestamp ts = getNextGlobalTimestamp();
- newTimestampNotifier.notify_all();
-
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
@@ -890,7 +886,6 @@ Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) {
void setNewTimestamp(const Timestamp& newTime) {
stdx::lock_guard<stdx::mutex> lk(newOpMutex);
setGlobalTimestamp(newTime);
- newTimestampNotifier.notify_all();
}
void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) {
@@ -910,131 +905,5 @@ void oplogCheckCloseDatabase(OperationContext* txn, Database* db) {
_localDB = nullptr;
_localOplogCollection = nullptr;
}
-
-
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000);
-
-SnapshotThread::SnapshotThread(SnapshotManager* manager)
- : _manager(manager), _thread([this] { run(); }) {}
-
-void SnapshotThread::run() {
- Client::initThread("SnapshotThread");
- auto& client = cc();
- auto serviceContext = client.getServiceContext();
- auto replCoord = ReplicationCoordinator::get(serviceContext);
-
- Timestamp lastTimestamp = {};
- while (true) {
- {
- // This block logically belongs at the end of the loop, but having it at the top
- // simplifies handling of the "continue" cases. It is harmless to do these before the
- // first run of the loop.
- _manager->cleanupUnneededSnapshots();
- sleepmicros(replSnapshotThreadThrottleMicros); // Throttle by sleeping.
- }
-
- {
- stdx::unique_lock<stdx::mutex> lock(newOpMutex);
- while (true) {
- if (_inShutdown)
- return;
-
- if (_forcedSnapshotPending || lastTimestamp != getLastSetTimestamp()) {
- _forcedSnapshotPending = false;
- lastTimestamp = getLastSetTimestamp();
- break;
- }
-
- newTimestampNotifier.wait(lock);
- }
- }
-
- while (MONGO_FAIL_POINT(disableSnapshotting)) {
- sleepsecs(1);
- stdx::unique_lock<stdx::mutex> lock(newOpMutex);
- if (_inShutdown) {
- return;
- }
- }
-
- try {
- auto txn = client.makeOperationContext();
- Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX);
-
- if (!replCoord->getMemberState().readable()) {
- // If our MemberState isn't readable, we may not be in a consistent state so don't
- // take snapshots. When we transition into a readable state from a non-readable
- // state, a snapshot is forced to ensure we don't miss the latest write. This must
- // be checked each time we acquire the global IS lock since that prevents the node
- // from transitioning to a !readable() state from a readable() one in the cases
- // where we shouldn't be creating a snapshot.
- continue;
- }
-
- SnapshotName name(0); // assigned real value in block.
- {
- // Make sure there are no in-flight capped inserts while we create our snapshot.
- Lock::ResourceLock cappedInsertLockForOtherDb(
- txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
- Lock::ResourceLock cappedInsertLockForLocalDb(
- txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
-
- // Reserve the name immediately before we take our snapshot. This ensures that all
- // names that compare lower must be from points in time visible to this named
- // snapshot.
- name = replCoord->reserveSnapshotName(nullptr);
-
- // This establishes the view that we will name.
- _manager->prepareForCreateSnapshot(txn.get());
- }
-
- auto opTimeOfSnapshot = OpTime();
- {
- AutoGetCollectionForRead oplog(txn.get(), rsOplogName);
- invariant(oplog.getCollection());
- // Read the latest op from the oplog.
- auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false);
- auto record = cursor->next();
- if (!record)
- continue; // oplog is completely empty.
-
- const auto op = record->data.releaseToBson();
- opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op));
- invariant(!opTimeOfSnapshot.isNull());
- }
-
- _manager->createSnapshot(txn.get(), name);
- replCoord->onSnapshotCreate(opTimeOfSnapshot, name);
- } catch (const WriteConflictException& wce) {
- log() << "skipping storage snapshot pass due to write conflict";
- continue;
- }
- }
-}
-
-void SnapshotThread::shutdown() {
- invariant(_thread.joinable());
- {
- stdx::lock_guard<stdx::mutex> lock(newOpMutex);
- invariant(!_inShutdown);
- _inShutdown = true;
- newTimestampNotifier.notify_all();
- }
- _thread.join();
-}
-
-void SnapshotThread::forceSnapshot() {
- stdx::lock_guard<stdx::mutex> lock(newOpMutex);
- _forcedSnapshotPending = true;
- newTimestampNotifier.notify_all();
-}
-
-std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) {
- if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) {
- return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager));
- }
- return {};
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 215e6d96e0a..2d120bda5f8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -283,6 +283,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) {
setNewTimestamp(newTime);
+ forceSnapshotCreation();
}
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) {
diff --git a/src/mongo/db/repl/snapshot_thread.cpp b/src/mongo/db/repl/snapshot_thread.cpp
new file mode 100644
index 00000000000..0143752c7bd
--- /dev/null
+++ b/src/mongo/db/repl/snapshot_thread.cpp
@@ -0,0 +1,228 @@
+/**
+* Copyright (C) 2008-2015 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/db/repl/snapshot_thread.h"
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/background.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/global_timestamp.h"
+#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/server_parameters.h"
+#include "mongo/db/service_context.h"
+#include "mongo/util/elapsed_tracker.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using std::endl;
+using std::string;
+using std::stringstream;
+
+namespace repl {
+MONGO_FP_DECLARE(disableSnapshotting);
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000);
+
+SnapshotThread::SnapshotThread(SnapshotManager* manager)
+ : _manager(manager), _thread([this] { run(); }) {}
+
+void SnapshotThread::run() {
+ Client::initThread("SnapshotThread");
+ auto& client = cc();
+ auto serviceContext = client.getServiceContext();
+ auto replCoord = ReplicationCoordinator::get(serviceContext);
+
+ Timestamp lastTimestamp = {};
+ while (true) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_inShutdown)
+ return;
+ }
+ {
+ // This block logically belongs at the end of the loop, but having it at the top
+ // simplifies handling of the "continue" cases. It is harmless to do these before the
+ // first run of the loop.
+ _manager->cleanupUnneededSnapshots();
+ sleepmicros(replSnapshotThreadThrottleMicros); // Throttle by sleeping.
+ }
+
+ {
+ std::shared_ptr<CappedInsertNotifier> notifier;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (!_notifier || _notifier->isDead()) {
+ lk.unlock();
+ auto txn = client.makeOperationContext();
+ AutoGetCollectionForRead oplog(txn.get(), rsOplogName);
+
+ if (!oplog.getCollection()) {
+ sleepmillis(200);
+ continue;
+ }
+
+ lk.lock();
+ _notifier = notifier = oplog.getCollection()->getCappedInsertNotifier();
+ invariant(notifier);
+ lk.unlock();
+ } else {
+ notifier = _notifier;
+ lk.unlock();
+ }
+
+ while (true) {
+ auto currentTimestamp = getLastSetTimestamp();
+
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ if (_inShutdown)
+ return;
+
+ if (_forcedSnapshotPending || lastTimestamp != currentTimestamp) {
+ _forcedSnapshotPending = false;
+ lastTimestamp = currentTimestamp;
+ break;
+ }
+
+ if (notifier->isDead()) {
+ notifier.reset();
+ _notifier.reset();
+ break;
+ }
+
+ lock.unlock();
+ notifier->waitForInsert(notifier->getCount());
+ }
+
+ // might need to re-acquire the notifier
+ if (!notifier)
+ continue;
+ }
+
+ while (MONGO_FAIL_POINT(disableSnapshotting)) {
+ sleepsecs(1);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_inShutdown) {
+ return;
+ }
+ }
+
+ try {
+ auto txn = client.makeOperationContext();
+ Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX);
+
+ if (!replCoord->getMemberState().readable()) {
+ // If our MemberState isn't readable, we may not be in a consistent state so don't
+ // take snapshots. When we transition into a readable state from a non-readable
+ // state, a snapshot is forced to ensure we don't miss the latest write. This must
+ // be checked each time we acquire the global IS lock since that prevents the node
+ // from transitioning to a !readable() state from a readable() one in the cases
+ // where we shouldn't be creating a snapshot.
+ continue;
+ }
+
+ SnapshotName name(0); // assigned real value in block.
+ {
+ // Make sure there are no in-flight capped inserts while we create our snapshot.
+ Lock::ResourceLock cappedInsertLockForOtherDb(
+ txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
+ Lock::ResourceLock cappedInsertLockForLocalDb(
+ txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
+
+ // Reserve the name immediately before we take our snapshot. This ensures that all
+ // names that compare lower must be from points in time visible to this named
+ // snapshot.
+ name = replCoord->reserveSnapshotName(nullptr);
+
+ // This establishes the view that we will name.
+ _manager->prepareForCreateSnapshot(txn.get());
+ }
+
+ auto opTimeOfSnapshot = OpTime();
+ {
+ AutoGetCollectionForRead oplog(txn.get(), rsOplogName);
+ invariant(oplog.getCollection());
+ // Read the latest op from the oplog.
+ auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false);
+ auto record = cursor->next();
+ if (!record)
+ continue; // oplog is completely empty.
+
+ const auto op = record->data.releaseToBson();
+ opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op));
+ invariant(!opTimeOfSnapshot.isNull());
+ }
+
+ _manager->createSnapshot(txn.get(), name);
+ replCoord->onSnapshotCreate(opTimeOfSnapshot, name);
+ } catch (const WriteConflictException& wce) {
+ log() << "skipping storage snapshot pass due to write conflict";
+ continue;
+ }
+ }
+}
+
+void SnapshotThread::shutdown() {
+ invariant(_thread.joinable());
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(!_inShutdown);
+ _inShutdown = true;
+ if (_notifier) {
+ _notifier->notifyAll();
+ }
+ }
+ _thread.join();
+}
+
+void SnapshotThread::forceSnapshot() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _forcedSnapshotPending = true;
+ if (_notifier) {
+ _notifier->notifyAll();
+ }
+}
+
+std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) {
+ if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) {
+ return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager));
+ }
+ return {};
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/snapshot_thread.h b/src/mongo/db/repl/snapshot_thread.h
index 81bf2212f52..1cbdf42e5fe 100644
--- a/src/mongo/db/repl/snapshot_thread.h
+++ b/src/mongo/db/repl/snapshot_thread.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/catalog/collection.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/snapshot_manager.h"
#include "mongo/stdx/functional.h"
@@ -72,9 +73,11 @@ private:
void run();
SnapshotManager* const _manager;
- bool _inShutdown = false; // guarded by newOpMutex in oplog.cpp.
- bool _forcedSnapshotPending = false; // guarded by newOpMutex in oplog.cpp.
+ bool _inShutdown = false; // guarded by _mutex.
+ bool _forcedSnapshotPending = false; // guarded by _mutex.
+ std::shared_ptr<CappedInsertNotifier> _notifier;
stdx::thread _thread;
+ stdx::mutex _mutex;
};
} // namespace repl
diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp
index 18f16baa2e8..39583153832 100644
--- a/src/mongo/db/repl/vote_requester.cpp
+++ b/src/mongo/db/repl/vote_requester.cpp
@@ -103,7 +103,7 @@ void VoteRequester::Algorithm::processResponse(const RemoteCommandRequest& reque
_votes++;
} else {
log() << "VoteRequester: Got no vote from " << request.target
- << " because: " << voteResponse.getReason();
+ << " because: " << voteResponse.getReason() << ", resp:" << voteResponse.toBSON();
}
if (voteResponse.getTerm() > _term) {