diff options
author | Benety Goh <benety@mongodb.com> | 2017-03-09 13:18:40 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-03-23 17:51:21 -0400 |
commit | 35a6f1920bf38d7b7df6962598e56fa456451152 (patch) | |
tree | 8f6a0d108dec796005b617d5d5c37d5d4fdc209c | |
parent | f27a5631a6cb48d3e4a2b2888ede44edf66960cd (diff) | |
download | mongo-35a6f1920bf38d7b7df6962598e56fa456451152.tar.gz |
SERVER-28204 added server parameter to select between 3.4 and 3.6 rollback implementations
This server parameter (use3dot4Rollback) defaults to true, i.e. the 3.4 rollback implementation remains the default; set it to false to use the new 3.6 implementation.
-rw-r--r-- | src/mongo/db/repl/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 176 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 16 |
3 files changed, 135 insertions, 60 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 149320004a6..08503eb92bd 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -10,9 +10,11 @@ env.Library( 'bgsync.cpp', ], LIBDEPS=[ + 'abstract_async_component', 'data_replicator_external_state_impl', 'oplog_interface_local', 'repl_coordinator_interface', + 'rollback_impl', 'rollback_source_impl', 'rs_rollback', 'storage_interface', @@ -22,6 +24,7 @@ env.Library( '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/commands/server_status_core', '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', + '$BUILD_DIR/mongo/db/server_parameters', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/executor/task_executor_interface', diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 79d7d294f63..0475e2fba9f 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -50,6 +50,7 @@ #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/s/shard_identity_rollback_notifier.h" +#include "mongo/db/server_parameters.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/memory.h" @@ -70,6 +71,9 @@ const int kSleepToAllowBatchingMillis = 2; const int kSmallBatchLimitBytes = 40000; const Milliseconds kRollbackOplogSocketTimeout(10 * 60 * 1000); +// Set this to true to force rollbacks to use the 3.4 implementation. +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(use3dot4Rollback, bool, true); + /** * Extends DataReplicatorExternalStateImpl to be member state aware. 
*/ @@ -164,6 +168,10 @@ void BackgroundSync::shutdown(OperationContext* opCtx) { _oplogFetcher->shutdown(); } + if (_rollback) { + _rollback->shutdown(); + } + _inShutdown = true; } @@ -440,66 +448,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) { // if it can't return a matching oplog start from the last fetch oplog ts field. return; } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) { - if (_replCoord->getMemberState().primary()) { - // TODO: Abort catchup mode early if rollback detected. - warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; - sleepsecs(1); - return; - } - - // Rollback is a synchronous operation that uses the task executor and may not be - // executed inside the fetcher callback. - const int messagingPortTags = 0; - ConnectionPool connectionPool(messagingPortTags); - std::unique_ptr<ConnectionPool::ConnectionPtr> connection; - auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* { - if (!connection.get()) { - connection.reset(new ConnectionPool::ConnectionPtr( - &connectionPool, source, Date_t::now(), kRollbackOplogSocketTimeout)); - }; - return connection->get(); - }; - - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - lastOpTimeFetched = _lastOpTimeFetched; - } - - log() << "Starting rollback due to " << redact(fetcherReturnStatus); - - // TODO: change this to call into the Applier directly to block until the applier is - // drained. - // - // Wait till all buffered oplog entries have drained and been applied. 
- auto lastApplied = _replCoord->getMyLastAppliedOpTime(); - if (lastApplied != lastOpTimeFetched) { - log() << "Waiting for all operations from " << lastApplied << " until " - << lastOpTimeFetched << " to be applied before starting rollback."; - while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) { - sleepmillis(10); - if (getState() != ProducerState::Running) { - return; - } - } - } - - if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) { - // This log output is used in js tests so please leave it. - log() << "rollback - rollbackHangBeforeStart fail point " - "enabled. Blocking until fail point is disabled."; - while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) { - mongo::sleepsecs(1); - } - } - - OplogInterfaceLocal localOplog(opCtx, rsOplogName); - RollbackSourceImpl rollbackSource(getConnection, source, rsOplogName); - rollback( - opCtx, localOplog, rollbackSource, syncSourceResp.rbid, _replCoord, storageInterface); - - // Reset the producer to clear the sync source and the last optime fetched. - stop(true); - startProducerIfStopped(); + _runRollback(opCtx, fetcherReturnStatus, source, syncSourceResp.rbid, storageInterface); } else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) { Seconds blacklistDuration(60); warning() << "Fetcher got invalid BSON while querying oplog. Blacklisting sync source " @@ -590,6 +539,113 @@ void BackgroundSync::consume(OperationContext* opCtx) { } } +void BackgroundSync::_runRollback(OperationContext* opCtx, + const Status& fetcherReturnStatus, + const HostAndPort& source, + int requiredRBID, + StorageInterface* storageInterface) { + if (_replCoord->getMemberState().primary()) { + // TODO: Abort catchup mode early if rollback detected. + warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; + sleepsecs(1); + return; + } + + // Rollback is a synchronous operation that uses the task executor and may not be + // executed inside the fetcher callback. 
+ + OpTime lastOpTimeFetched; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + lastOpTimeFetched = _lastOpTimeFetched; + } + + log() << "Starting rollback due to " << redact(fetcherReturnStatus); + + // TODO: change this to call into the Applier directly to block until the applier is + // drained. + // + // Wait till all buffered oplog entries have drained and been applied. + auto lastApplied = _replCoord->getMyLastAppliedOpTime(); + if (lastApplied != lastOpTimeFetched) { + log() << "Waiting for all operations from " << lastApplied << " until " << lastOpTimeFetched + << " to be applied before starting rollback."; + while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) { + sleepmillis(10); + if (getState() != ProducerState::Running) { + return; + } + } + } + + if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) { + // This log output is used in js tests so please leave it. + log() << "rollback - rollbackHangBeforeStart fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) { + mongo::sleepsecs(1); + } + } + + OplogInterfaceLocal localOplog(opCtx, rsOplogName); + if (use3dot4Rollback) { + const int messagingPortTags = 0; + ConnectionPool connectionPool(messagingPortTags); + std::unique_ptr<ConnectionPool::ConnectionPtr> connection; + auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* { + if (!connection.get()) { + connection.reset(new ConnectionPool::ConnectionPtr( + &connectionPool, source, Date_t::now(), kRollbackOplogSocketTimeout)); + }; + return connection->get(); + }; + + RollbackSourceImpl rollbackSource(getConnection, source, rsOplogName); + rollback(opCtx, localOplog, rollbackSource, requiredRBID, _replCoord, storageInterface); + } else { + AbstractAsyncComponent* rollback; + StatusWith<OpTime> onRollbackShutdownResult = + Status(ErrorCodes::InternalError, "Rollback failed but didn’t return an error message"); + try { + auto 
executor = _replicationCoordinatorExternalState->getTaskExecutor(); + auto onRollbackShutdownCallbackFn = [&onRollbackShutdownResult]( + const StatusWith<OpTime>& lastApplied) noexcept { + onRollbackShutdownResult = lastApplied; + }; + + stdx::lock_guard<stdx::mutex> lock(_mutex); + _rollback = stdx::make_unique<RollbackImpl>(executor, + &localOplog, + source, + requiredRBID, + _replCoord, + storageInterface, + onRollbackShutdownCallbackFn); + rollback = _rollback.get(); + } catch (...) { + fassertFailedWithStatus(40401, exceptionToStatus()); + } + + log() << "Scheduling rollback (sync source: " << source << ")"; + auto scheduleStatus = rollback->startup(); + if (!scheduleStatus.isOK()) { + warning() << "Unable to schedule rollback: " << scheduleStatus; + } else { + rollback->join(); + if (!onRollbackShutdownResult.isOK()) { + warning() << "Rollback failed with error: " << onRollbackShutdownResult.getStatus(); + } else { + log() << "Rollback successful. Last applied optime: " + << onRollbackShutdownResult.getValue(); + } + } + } + + // Reset the producer to clear the sync source and the last optime fetched. 
+ stop(true); + startProducerIfStopped(); +} + HostAndPort BackgroundSync::getSyncTarget() const { stdx::unique_lock<stdx::mutex> lock(_mutex); return _syncSourceHost; diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index f0c50f870e9..77d9e6618be 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -37,6 +37,7 @@ #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/rollback_impl.h" #include "mongo/db/repl/sync_source_resolver.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" @@ -53,6 +54,7 @@ namespace repl { class ReplicationCoordinator; class ReplicationCoordinatorExternalState; +class StorageInterface; class BackgroundSync { MONGO_DISALLOW_COPYING(BackgroundSync); @@ -160,6 +162,15 @@ private: Fetcher::Documents::const_iterator end, const OplogFetcher::DocumentsInfo& info); + /** + * Executes a rollback. + */ + void _runRollback(OperationContext* opCtx, + const Status& fetcherReturnStatus, + const HostAndPort& source, + int requiredRBID, + StorageInterface* storageInterface); + // restart syncing void start(OperationContext* opCtx); @@ -202,6 +213,11 @@ private: // Current oplog fetcher tailing the oplog on the sync source. std::unique_ptr<OplogFetcher> _oplogFetcher; + + // Current rollback process. If this component is active, we are currently reverting local + // operations in the local oplog in order to bring this server to a consistent state relative + // to the sync source. + std::unique_ptr<RollbackImpl> _rollback; }; |