diff options
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.h | 4 |
3 files changed, 29 insertions, 27 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ef5fe93d4f4..58a83b41cb6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -151,22 +151,8 @@ bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationCont void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFinishedFn finished) { _initialSyncThread.reset(new stdx::thread{[finished, this]() { Client::initThreadIfNotAlready("initial sync"); - - // "_bgSync" should not be initialized before this. - invariant(!_bgSync); - { - auto txn = cc().makeOperationContext(); - invariant(txn); - invariant(txn->getClient()); - log() << "Starting replication fetcher thread"; - - // Start bgsync. - _bgSync.reset(new BackgroundSync(this, makeSteadyStateOplogBuffer(txn.get()))); - _bgSync->startup(txn.get()); - } - // Do initial sync. - syncDoInitialSync(_bgSync.get()); + syncDoInitialSync(this); finished(); }}); } @@ -183,11 +169,11 @@ void ReplicationCoordinatorExternalStateImpl::runOnInitialSyncThread( } void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(OperationContext* txn) { - if (!_bgSync) { - log() << "Starting replication fetcher thread"; - _bgSync.reset(new BackgroundSync(this, makeSteadyStateOplogBuffer(txn))); - _bgSync->startup(txn); - } + invariant(!_bgSync); + log() << "Starting replication fetcher thread"; + _bgSync = stdx::make_unique<BackgroundSync>(this, makeSteadyStateOplogBuffer(txn)); + _bgSync->startup(txn); + log() << "Starting replication applier threads"; invariant(!_applierThread); _applierThread.reset(new stdx::thread(stdx::bind(&runSyncThread, _bgSync.get()))); diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 4a3f3e721a1..387a7f9a391 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -32,6 +32,8 @@ #include "mongo/db/repl/rs_initialsync.h" +#include <memory> + #include "mongo/bson/timestamp.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_manager.h" @@ -50,14 +52,17 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" +#include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/socket_exception.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { @@ -513,23 +518,34 @@ Status checkAdminDatabase(OperationContext* txn, Database* adminDb) { return Status::OK(); } -void syncDoInitialSync(BackgroundSync* bgsync) { +void syncDoInitialSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) { stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock); if (!lk.try_lock()) { uasserted(34474, "Initial Sync Already Active."); } + std::unique_ptr<BackgroundSync> bgsync; { - const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); - OperationContext& txn = *txnPtr; - createOplog(&txn); + log() << "Starting replication fetcher thread for initial sync"; + auto txn = cc().makeOperationContext(); + bgsync = stdx::make_unique<BackgroundSync>( + replicationCoordinatorExternalState, + replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(txn.get())); + bgsync->startup(txn.get()); + createOplog(txn.get()); } + ON_BLOCK_EXIT([&bgsync]() { + log() << "Stopping replication fetcher thread for initial sync"; + auto txn = cc().makeOperationContext(); + bgsync->shutdown(txn.get()); + bgsync->join(txn.get()); + }); int failedAttempts = 0; while (failedAttempts < kMaxFailedAttempts) { try { // leave loop when successful - Status status = _initialSync(bgsync); + Status status = _initialSync(bgsync.get()); if (status.isOK()) { break; } else { diff --git a/src/mongo/db/repl/rs_initialsync.h b/src/mongo/db/repl/rs_initialsync.h index 4f8cd5e5017..204f492ef0d 100644 --- a/src/mongo/db/repl/rs_initialsync.h +++ b/src/mongo/db/repl/rs_initialsync.h @@ -35,13 +35,13 @@ class OperationContext; class Database; namespace repl { -class BackgroundSync; +class ReplicationCoordinatorExternalState; /** * Begins an initial sync of a node. This drops all data, chooses a sync source, * and runs the cloner from that sync source. The node's state is not changed. */ -void syncDoInitialSync(BackgroundSync* bgsync); +void syncDoInitialSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState); /** * Checks that the "admin" database contains a supported version of the auth data schema. |