summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-06-24 15:06:21 -0400
committerBenety Goh <benety@mongodb.com>2016-06-27 17:02:35 -0400
commit7fb4f0c57126bab6a0d2ea01d7e76926f64ca1e3 (patch)
tree463a5d96c5742c189f196c877125512da24c152a
parent00e103fc3a5264c098536f303b6bbda2bf5c868e (diff)
downloadmongo-7fb4f0c57126bab6a0d2ea01d7e76926f64ca1e3.tar.gz
SERVER-24784 syncDoInitialSync maintains its own instance of BackgroundSync
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp26
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp26
-rw-r--r--src/mongo/db/repl/rs_initialsync.h4
3 files changed, 29 insertions, 27 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ef5fe93d4f4..58a83b41cb6 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -151,22 +151,8 @@ bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationCont
void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFinishedFn finished) {
_initialSyncThread.reset(new stdx::thread{[finished, this]() {
Client::initThreadIfNotAlready("initial sync");
-
- // "_bgSync" should not be initialized before this.
- invariant(!_bgSync);
- {
- auto txn = cc().makeOperationContext();
- invariant(txn);
- invariant(txn->getClient());
- log() << "Starting replication fetcher thread";
-
- // Start bgsync.
- _bgSync.reset(new BackgroundSync(this, makeSteadyStateOplogBuffer(txn.get())));
- _bgSync->startup(txn.get());
- }
-
// Do initial sync.
- syncDoInitialSync(_bgSync.get());
+ syncDoInitialSync(this);
finished();
}});
}
@@ -183,11 +169,11 @@ void ReplicationCoordinatorExternalStateImpl::runOnInitialSyncThread(
}
void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(OperationContext* txn) {
- if (!_bgSync) {
- log() << "Starting replication fetcher thread";
- _bgSync.reset(new BackgroundSync(this, makeSteadyStateOplogBuffer(txn)));
- _bgSync->startup(txn);
- }
+ invariant(!_bgSync);
+ log() << "Starting replication fetcher thread";
+ _bgSync = stdx::make_unique<BackgroundSync>(this, makeSteadyStateOplogBuffer(txn));
+ _bgSync->startup(txn);
+
log() << "Starting replication applier threads";
invariant(!_applierThread);
_applierThread.reset(new stdx::thread(stdx::bind(&runSyncThread, _bgSync.get())));
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 4a3f3e721a1..387a7f9a391 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/repl/rs_initialsync.h"
+#include <memory>
+
#include "mongo/bson/timestamp.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_manager.h"
@@ -50,14 +52,17 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
@@ -513,23 +518,34 @@ Status checkAdminDatabase(OperationContext* txn, Database* adminDb) {
return Status::OK();
}
-void syncDoInitialSync(BackgroundSync* bgsync) {
+void syncDoInitialSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) {
stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock);
if (!lk.try_lock()) {
uasserted(34474, "Initial Sync Already Active.");
}
+ std::unique_ptr<BackgroundSync> bgsync;
{
- const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
- OperationContext& txn = *txnPtr;
- createOplog(&txn);
+ log() << "Starting replication fetcher thread for initial sync";
+ auto txn = cc().makeOperationContext();
+ bgsync = stdx::make_unique<BackgroundSync>(
+ replicationCoordinatorExternalState,
+ replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(txn.get()));
+ bgsync->startup(txn.get());
+ createOplog(txn.get());
}
+ ON_BLOCK_EXIT([&bgsync]() {
+ log() << "Stopping replication fetcher thread for initial sync";
+ auto txn = cc().makeOperationContext();
+ bgsync->shutdown(txn.get());
+ bgsync->join(txn.get());
+ });
int failedAttempts = 0;
while (failedAttempts < kMaxFailedAttempts) {
try {
// leave loop when successful
- Status status = _initialSync(bgsync);
+ Status status = _initialSync(bgsync.get());
if (status.isOK()) {
break;
} else {
diff --git a/src/mongo/db/repl/rs_initialsync.h b/src/mongo/db/repl/rs_initialsync.h
index 4f8cd5e5017..204f492ef0d 100644
--- a/src/mongo/db/repl/rs_initialsync.h
+++ b/src/mongo/db/repl/rs_initialsync.h
@@ -35,13 +35,13 @@ class OperationContext;
class Database;
namespace repl {
-class BackgroundSync;
+class ReplicationCoordinatorExternalState;
/**
* Begins an initial sync of a node. This drops all data, chooses a sync source,
* and runs the cloner from that sync source. The node's state is not changed.
*/
-void syncDoInitialSync(BackgroundSync* bgsync);
+void syncDoInitialSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
/**
* Checks that the "admin" database contains a supported version of the auth data schema.