author     Benety Goh <benety@mongodb.com>    2017-03-09 13:18:40 -0500
committer  Benety Goh <benety@mongodb.com>    2017-03-23 17:51:21 -0400
commit     35a6f1920bf38d7b7df6962598e56fa456451152 (patch)
tree       8f6a0d108dec796005b617d5d5c37d5d4fdc209c
parent     f27a5631a6cb48d3e4a2b2888ede44edf66960cd (diff)
download   mongo-35a6f1920bf38d7b7df6962598e56fa456451152.tar.gz
SERVER-28204 added server parameter to select between 3.4 and 3.6 rollback implementations
This server parameter is set to use the 3.6 implementation as the default.
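The parameter can be set explicitly at mongod startup (MONGO_EXPORT_STARTUP_SERVER_PARAMETER, declared in bgsync.cpp below, makes it a startup-only parameter). A minimal sketch of the invocation; the replica set name "rs0" is illustrative:

    mongod --replSet rs0 --setParameter use3dot4Rollback=false   # use the new RollbackImpl-based rollback
    mongod --replSet rs0 --setParameter use3dot4Rollback=true    # force the 3.4 rollback implementation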
-rw-r--r--  src/mongo/db/repl/SConscript  |   3
-rw-r--r--  src/mongo/db/repl/bgsync.cpp  | 176
-rw-r--r--  src/mongo/db/repl/bgsync.h    |  16
3 files changed, 135 insertions, 60 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 149320004a6..08503eb92bd 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -10,9 +10,11 @@ env.Library(
'bgsync.cpp',
],
LIBDEPS=[
+ 'abstract_async_component',
'data_replicator_external_state_impl',
'oplog_interface_local',
'repl_coordinator_interface',
+ 'rollback_impl',
'rollback_source_impl',
'rs_rollback',
'storage_interface',
@@ -22,6 +24,7 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/commands/server_status_core',
'$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
+ '$BUILD_DIR/mongo/db/server_parameters',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/task_executor_interface',
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 79d7d294f63..0475e2fba9f 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
@@ -70,6 +71,9 @@ const int kSleepToAllowBatchingMillis = 2;
const int kSmallBatchLimitBytes = 40000;
const Milliseconds kRollbackOplogSocketTimeout(10 * 60 * 1000);
+// Set this to true to force rollbacks to use the 3.4 implementation.
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(use3dot4Rollback, bool, true);
+
/**
* Extends DataReplicatorExternalStateImpl to be member state aware.
*/
@@ -164,6 +168,10 @@ void BackgroundSync::shutdown(OperationContext* opCtx) {
_oplogFetcher->shutdown();
}
+ if (_rollback) {
+ _rollback->shutdown();
+ }
+
_inShutdown = true;
}
@@ -440,66 +448,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
// if it can't return a matching oplog start from the last fetch oplog ts field.
return;
} else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
- if (_replCoord->getMemberState().primary()) {
- // TODO: Abort catchup mode early if rollback detected.
- warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
- sleepsecs(1);
- return;
- }
-
- // Rollback is a synchronous operation that uses the task executor and may not be
- // executed inside the fetcher callback.
- const int messagingPortTags = 0;
- ConnectionPool connectionPool(messagingPortTags);
- std::unique_ptr<ConnectionPool::ConnectionPtr> connection;
- auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* {
- if (!connection.get()) {
- connection.reset(new ConnectionPool::ConnectionPtr(
- &connectionPool, source, Date_t::now(), kRollbackOplogSocketTimeout));
- };
- return connection->get();
- };
-
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- lastOpTimeFetched = _lastOpTimeFetched;
- }
-
- log() << "Starting rollback due to " << redact(fetcherReturnStatus);
-
- // TODO: change this to call into the Applier directly to block until the applier is
- // drained.
- //
- // Wait till all buffered oplog entries have drained and been applied.
- auto lastApplied = _replCoord->getMyLastAppliedOpTime();
- if (lastApplied != lastOpTimeFetched) {
- log() << "Waiting for all operations from " << lastApplied << " until "
- << lastOpTimeFetched << " to be applied before starting rollback.";
- while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) {
- sleepmillis(10);
- if (getState() != ProducerState::Running) {
- return;
- }
- }
- }
-
- if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) {
- // This log output is used in js tests so please leave it.
- log() << "rollback - rollbackHangBeforeStart fail point "
- "enabled. Blocking until fail point is disabled.";
- while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
- mongo::sleepsecs(1);
- }
- }
-
- OplogInterfaceLocal localOplog(opCtx, rsOplogName);
- RollbackSourceImpl rollbackSource(getConnection, source, rsOplogName);
- rollback(
- opCtx, localOplog, rollbackSource, syncSourceResp.rbid, _replCoord, storageInterface);
-
- // Reset the producer to clear the sync source and the last optime fetched.
- stop(true);
- startProducerIfStopped();
+ _runRollback(opCtx, fetcherReturnStatus, source, syncSourceResp.rbid, storageInterface);
} else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
Seconds blacklistDuration(60);
warning() << "Fetcher got invalid BSON while querying oplog. Blacklisting sync source "
@@ -590,6 +539,113 @@ void BackgroundSync::consume(OperationContext* opCtx) {
}
}
+void BackgroundSync::_runRollback(OperationContext* opCtx,
+ const Status& fetcherReturnStatus,
+ const HostAndPort& source,
+ int requiredRBID,
+ StorageInterface* storageInterface) {
+ if (_replCoord->getMemberState().primary()) {
+ // TODO: Abort catchup mode early if rollback detected.
+ warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
+ sleepsecs(1);
+ return;
+ }
+
+ // Rollback is a synchronous operation that uses the task executor and may not be
+ // executed inside the fetcher callback.
+
+ OpTime lastOpTimeFetched;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ lastOpTimeFetched = _lastOpTimeFetched;
+ }
+
+ log() << "Starting rollback due to " << redact(fetcherReturnStatus);
+
+ // TODO: change this to call into the Applier directly to block until the applier is
+ // drained.
+ //
+ // Wait till all buffered oplog entries have drained and been applied.
+ auto lastApplied = _replCoord->getMyLastAppliedOpTime();
+ if (lastApplied != lastOpTimeFetched) {
+ log() << "Waiting for all operations from " << lastApplied << " until " << lastOpTimeFetched
+ << " to be applied before starting rollback.";
+ while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) {
+ sleepmillis(10);
+ if (getState() != ProducerState::Running) {
+ return;
+ }
+ }
+ }
+
+ if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) {
+ // This log output is used in js tests so please leave it.
+ log() << "rollback - rollbackHangBeforeStart fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+ }
+
+ OplogInterfaceLocal localOplog(opCtx, rsOplogName);
+ if (use3dot4Rollback) {
+ const int messagingPortTags = 0;
+ ConnectionPool connectionPool(messagingPortTags);
+ std::unique_ptr<ConnectionPool::ConnectionPtr> connection;
+ auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* {
+ if (!connection.get()) {
+ connection.reset(new ConnectionPool::ConnectionPtr(
+ &connectionPool, source, Date_t::now(), kRollbackOplogSocketTimeout));
+ };
+ return connection->get();
+ };
+
+ RollbackSourceImpl rollbackSource(getConnection, source, rsOplogName);
+ rollback(opCtx, localOplog, rollbackSource, requiredRBID, _replCoord, storageInterface);
+ } else {
+ AbstractAsyncComponent* rollback;
+ StatusWith<OpTime> onRollbackShutdownResult =
+ Status(ErrorCodes::InternalError, "Rollback failed but didn't return an error message");
+ try {
+ auto executor = _replicationCoordinatorExternalState->getTaskExecutor();
+ auto onRollbackShutdownCallbackFn = [&onRollbackShutdownResult](
+ const StatusWith<OpTime>& lastApplied) noexcept {
+ onRollbackShutdownResult = lastApplied;
+ };
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _rollback = stdx::make_unique<RollbackImpl>(executor,
+ &localOplog,
+ source,
+ requiredRBID,
+ _replCoord,
+ storageInterface,
+ onRollbackShutdownCallbackFn);
+ rollback = _rollback.get();
+ } catch (...) {
+ fassertFailedWithStatus(40401, exceptionToStatus());
+ }
+
+ log() << "Scheduling rollback (sync source: " << source << ")";
+ auto scheduleStatus = rollback->startup();
+ if (!scheduleStatus.isOK()) {
+ warning() << "Unable to schedule rollback: " << scheduleStatus;
+ } else {
+ rollback->join();
+ if (!onRollbackShutdownResult.isOK()) {
+ warning() << "Rollback failed with error: " << onRollbackShutdownResult.getStatus();
+ } else {
+ log() << "Rollback successful. Last applied optime: "
+ << onRollbackShutdownResult.getValue();
+ }
+ }
+ }
+
+ // Reset the producer to clear the sync source and the last optime fetched.
+ stop(true);
+ startProducerIfStopped();
+}
+
HostAndPort BackgroundSync::getSyncTarget() const {
stdx::unique_lock<stdx::mutex> lock(_mutex);
return _syncSourceHost;
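For reference, the new (non-3.4) branch of _runRollback() above drives the rollback through the AbstractAsyncComponent lifecycle: construct the component with a completion callback, call startup() to schedule it on the task executor, then join() and inspect the result the callback captured. The standalone C++ sketch below illustrates only that calling pattern; DemoRollback and its std::thread-based scheduling are hypothetical stand-ins, not the RollbackImpl API.

    // Illustrative sketch only: the startup()/join()/completion-callback pattern used by
    // BackgroundSync::_runRollback() for the new rollback path. DemoRollback is hypothetical.
    #include <functional>
    #include <iostream>
    #include <string>
    #include <thread>
    #include <utility>

    class DemoRollback {
    public:
        using OnCompletionFn = std::function<void(const std::string& lastApplied)>;

        explicit DemoRollback(OnCompletionFn onCompletion)
            : _onCompletion(std::move(onCompletion)) {}

        // Schedules the asynchronous work; returns false if scheduling failed.
        bool startup() {
            _worker = std::thread([this] {
                // ... the real component would roll back to the common point here ...
                _onCompletion("{ ts: Timestamp(0, 0), t: -1 }");
            });
            return true;
        }

        // Blocks until the work, and therefore the completion callback, has finished.
        void join() {
            if (_worker.joinable()) {
                _worker.join();
            }
        }

    private:
        OnCompletionFn _onCompletion;
        std::thread _worker;
    };

    int main() {
        // Mirrors the InternalError default in _runRollback(): assume failure until the
        // callback overwrites the result.
        std::string result = "rollback did not report a result";
        DemoRollback rollback([&result](const std::string& lastApplied) { result = lastApplied; });

        if (!rollback.startup()) {
            std::cerr << "Unable to schedule rollback" << std::endl;
            return 1;
        }
        rollback.join();  // as in _runRollback(): wait for completion before resetting the producer
        std::cout << "Rollback result: " << result << std::endl;
        return 0;
    }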
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index f0c50f870e9..77d9e6618be 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -37,6 +37,7 @@
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/rollback_impl.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
@@ -53,6 +54,7 @@ namespace repl {
class ReplicationCoordinator;
class ReplicationCoordinatorExternalState;
+class StorageInterface;
class BackgroundSync {
MONGO_DISALLOW_COPYING(BackgroundSync);
@@ -160,6 +162,15 @@ private:
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info);
+ /**
+ * Executes a rollback.
+ */
+ void _runRollback(OperationContext* opCtx,
+ const Status& fetcherReturnStatus,
+ const HostAndPort& source,
+ int requiredRBID,
+ StorageInterface* storageInterface);
+
// restart syncing
void start(OperationContext* opCtx);
@@ -202,6 +213,11 @@ private:
// Current oplog fetcher tailing the oplog on the sync source.
std::unique_ptr<OplogFetcher> _oplogFetcher;
+
+ // Current rollback process. If this component is active, we are currently reverting local
+ // operations in the local oplog in order to bring this server to a consistent state relative
+ // to the sync source.
+ std::unique_ptr<RollbackImpl> _rollback;
};
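
The shutdown() change earlier in the diff (the new "if (_rollback) { _rollback->shutdown(); }" block) relies on the component honoring a shutdown request, which lets the join() inside _runRollback() return promptly. Extending the hypothetical DemoRollback sketch above with a cooperative shutdown flag, again as an illustration rather than the RollbackImpl API:

    // Illustrative only: cooperative shutdown for the hypothetical DemoRollback above.
    #include <atomic>
    #include <chrono>
    #include <thread>

    class DemoRollbackWithShutdown {
    public:
        // Schedules simulated work in small steps, checking the shutdown flag between steps.
        bool startup() {
            _worker = std::thread([this] {
                for (int step = 0; step < 100 && !_shutdownRequested.load(); ++step) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                }
            });
            return true;
        }

        // BackgroundSync::shutdown() calls the real component's shutdown() analogously, so
        // that the join() in _runRollback() does not block indefinitely during shutdown.
        void shutdown() {
            _shutdownRequested.store(true);
        }

        // Blocks until the worker observes the flag (or finishes its simulated work).
        void join() {
            if (_worker.joinable()) {
                _worker.join();
            }
        }

    private:
        std::atomic<bool> _shutdownRequested{false};
        std::thread _worker;
    };

    int main() {
        DemoRollbackWithShutdown rollback;
        rollback.startup();
        rollback.shutdown();  // request early termination, as BackgroundSync::shutdown() does
        rollback.join();      // returns quickly because the worker sees the flag
        return 0;
    }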