summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/rs_initialsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/rs_initialsync.cpp')
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp92
1 files changed, 46 insertions, 46 deletions
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 073afcc73ec..a6a5cf78baf 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -84,16 +84,16 @@ MONGO_EXPORT_SERVER_PARAMETER(num3Dot2InitialSyncAttempts, int, 10);
* Also resets the bgsync thread so that it reconnects its sync source after the oplog has been
* truncated.
*/
-void truncateAndResetOplog(OperationContext* txn,
+void truncateAndResetOplog(OperationContext* opCtx,
ReplicationCoordinator* replCoord,
BackgroundSync* bgsync) {
// Add field to minvalid document to tell us to restart initial sync if we crash
- StorageInterface::get(txn)->setInitialSyncFlag(txn);
+ StorageInterface::get(opCtx)->setInitialSyncFlag(opCtx);
- AutoGetDb autoDb(txn, "local", MODE_X);
+ AutoGetDb autoDb(opCtx, "local", MODE_X);
massert(28585, "no local database found", autoDb.getDb());
- invariant(txn->lockState()->isCollectionLockedForMode(rsOplogName, MODE_X));
+ invariant(opCtx->lockState()->isCollectionLockedForMode(rsOplogName, MODE_X));
// Note: the following order is important.
// The bgsync thread uses an empty optime as a sentinel to know to wait
// for initial sync; thus, we must
@@ -104,7 +104,7 @@ void truncateAndResetOplog(OperationContext* txn,
replCoord->resetMyLastOpTimes();
bgsync->stop(true);
bgsync->startProducerIfStopped();
- bgsync->clearBuffer(txn);
+ bgsync->clearBuffer(opCtx);
replCoord->clearSyncSourceBlacklist();
@@ -112,15 +112,15 @@ void truncateAndResetOplog(OperationContext* txn,
Collection* collection = autoDb.getDb()->getCollection(rsOplogName);
fassert(28565, collection);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(txn);
- Status status = collection->truncate(txn);
+ WriteUnitOfWork wunit(opCtx);
+ Status status = collection->truncate(opCtx);
fassert(28564, status);
wunit.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "truncate", collection->ns().ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "truncate", collection->ns().ns());
}
-bool _initialSyncClone(OperationContext* txn,
+bool _initialSyncClone(OperationContext* opCtx,
Cloner& cloner,
const std::string& host,
const std::string& db,
@@ -144,10 +144,10 @@ bool _initialSyncClone(OperationContext* txn,
options.createCollections = false;
// Make database stable
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWrite(txn->lockState(), db, MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ Lock::DBLock dbWrite(opCtx->lockState(), db, MODE_X);
- Status status = cloner.copyDb(txn, db, host, options, nullptr, collections);
+ Status status = cloner.copyDb(opCtx, db, host, options, nullptr, collections);
if (!status.isOK()) {
log() << "initial sync: error while " << (dataPass ? "cloning " : "indexing ") << db
<< ". " << redact(status);
@@ -155,7 +155,7 @@ bool _initialSyncClone(OperationContext* txn,
}
if (dataPass && (db == "admin")) {
- fassertNoTrace(28619, checkAdminDatabase(txn, dbHolder().get(txn, db)));
+ fassertNoTrace(28619, checkAdminDatabase(opCtx, dbHolder().get(opCtx, db)));
}
return true;
}
@@ -167,7 +167,7 @@ bool _initialSyncClone(OperationContext* txn,
* @param r the oplog reader.
* @return if applying the oplog succeeded.
*/
-bool _initialSyncApplyOplog(OperationContext* txn,
+bool _initialSyncApplyOplog(OperationContext* opCtx,
repl::InitialSync* syncer,
OplogReader* r,
BackgroundSync* bgsync) {
@@ -178,7 +178,7 @@ bool _initialSyncApplyOplog(OperationContext* txn,
if (MONGO_FAIL_POINT(failInitSyncWithBufferedEntriesLeft)) {
log() << "adding fake oplog entry to buffer.";
bgsync->pushTestOpToBuffer(
- txn,
+ opCtx,
BSON("ts" << startOpTime.getTimestamp() << "t" << startOpTime.getTerm() << "v" << 1
<< "op"
<< "n"));
@@ -222,7 +222,7 @@ bool _initialSyncApplyOplog(OperationContext* txn,
// apply till stopOpTime
try {
LOG(2) << "Applying oplog entries from " << startOpTime << " until " << stopOpTime;
- syncer->oplogApplication(txn, stopOpTime);
+ syncer->oplogApplication(opCtx, stopOpTime);
if (globalInShutdownDeprecated()) {
return false;
@@ -262,15 +262,15 @@ bool _initialSyncApplyOplog(OperationContext* txn,
* ErrorCode::InitialSyncOplogSourceMissing if the node fails to find an sync source, Status::OK
* if everything worked, and ErrorCode::InitialSyncFailure for all other error cases.
*/
-Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
+Status _initialSync(OperationContext* opCtx, BackgroundSync* bgsync) {
log() << "initial sync pending";
- txn->setReplicatedWrites(false);
- DisableDocumentValidation validationDisabler(txn);
+ opCtx->setReplicatedWrites(false);
+ DisableDocumentValidation validationDisabler(opCtx);
ReplicationCoordinator* replCoord(getGlobalReplicationCoordinator());
// reset state for initial sync
- truncateAndResetOplog(txn, replCoord, bgsync);
+ truncateAndResetOplog(opCtx, replCoord, bgsync);
OplogReader r;
@@ -278,7 +278,7 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
// of oplog position, by passing in null OpTime as the last op fetched time.
- r.connectToSyncSource(txn, OpTime(), OpTime(), replCoord);
+ r.connectToSyncSource(opCtx, OpTime(), OpTime(), replCoord);
if (r.getHost().empty()) {
std::string msg =
@@ -306,7 +306,7 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
}
log() << "initial sync drop all databases";
- dropAllDatabasesExceptLocal(txn);
+ dropAllDatabasesExceptLocal(opCtx);
if (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases)) {
log() << "initial sync - initialSyncHangBeforeCopyingDatabases fail point enabled. "
@@ -360,17 +360,17 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
createCollectionParams.push_back(params);
}
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWrite(txn->lockState(), db, MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ Lock::DBLock dbWrite(opCtx->lockState(), db, MODE_X);
- auto createStatus = cloner.createCollectionsForDb(txn, createCollectionParams, db);
+ auto createStatus = cloner.createCollectionsForDb(opCtx, createCollectionParams, db);
if (!createStatus.isOK()) {
return createStatus;
}
collectionsPerDb.emplace(db, std::move(collections));
}
for (auto&& dbCollsPair : collectionsPerDb) {
- if (!_initialSyncClone(txn,
+ if (!_initialSyncClone(opCtx,
cloner,
r.conn()->getServerAddress(),
dbCollsPair.first,
@@ -385,15 +385,15 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
// prime oplog, but don't need to actually apply the op as the cloned data already reflects it.
fassertStatusOK(
40142,
- StorageInterface::get(txn)->insertDocument(txn, NamespaceString(rsOplogName), lastOp));
+ StorageInterface::get(opCtx)->insertDocument(opCtx, NamespaceString(rsOplogName), lastOp));
OpTime lastOptime = OplogEntry(lastOp).getOpTime();
- ReplClientInfo::forClient(txn->getClient()).setLastOp(lastOptime);
+ ReplClientInfo::forClient(opCtx->getClient()).setLastOp(lastOptime);
replCoord->setMyLastAppliedOpTime(lastOptime);
setNewTimestamp(replCoord->getServiceContext(), lastOptime.getTimestamp());
std::string msg = "oplog sync 1 of 3";
log() << msg;
- if (!_initialSyncApplyOplog(txn, &init, &r, bgsync)) {
+ if (!_initialSyncApplyOplog(opCtx, &init, &r, bgsync)) {
return Status(ErrorCodes::InitialSyncFailure,
str::stream() << "initial sync failed: " << msg);
}
@@ -404,7 +404,7 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
// TODO: replace with "tail" instance below, since we don't need to retry/reclone missing docs.
msg = "oplog sync 2 of 3";
log() << msg;
- if (!_initialSyncApplyOplog(txn, &init, &r, bgsync)) {
+ if (!_initialSyncApplyOplog(opCtx, &init, &r, bgsync)) {
return Status(ErrorCodes::InitialSyncFailure,
str::stream() << "initial sync failed: " << msg);
}
@@ -413,7 +413,7 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
msg = "initial sync building indexes";
log() << msg;
for (auto&& dbCollsPair : collectionsPerDb) {
- if (!_initialSyncClone(txn,
+ if (!_initialSyncClone(opCtx,
cloner,
r.conn()->getServerAddress(),
dbCollsPair.first,
@@ -431,14 +431,14 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
log() << msg;
InitialSync tail(bgsync, multiSyncApply); // Use the non-initial sync apply code
- if (!_initialSyncApplyOplog(txn, &tail, &r, bgsync)) {
+ if (!_initialSyncApplyOplog(opCtx, &tail, &r, bgsync)) {
return Status(ErrorCodes::InitialSyncFailure,
str::stream() << "initial sync failed: " << msg);
}
// ---------
- Status status = getGlobalAuthorizationManager()->initialize(txn);
+ Status status = getGlobalAuthorizationManager()->initialize(opCtx);
if (!status.isOK()) {
warning() << "Failed to reinitialize auth data after initial sync. " << status;
return status;
@@ -448,7 +448,7 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
// Initial sync is now complete.
// Clear the initial sync flag -- cannot be done under a db lock, or recursive.
- StorageInterface::get(txn)->clearInitialSyncFlag(txn);
+ StorageInterface::get(opCtx)->clearInitialSyncFlag(opCtx);
// Clear maint. mode.
while (replCoord->getMaintenanceMode()) {
@@ -463,20 +463,20 @@ stdx::mutex _initialSyncMutex;
const auto kInitialSyncRetrySleepDuration = Seconds{5};
} // namespace
-Status checkAdminDatabase(OperationContext* txn, Database* adminDb) {
- // Assumes txn holds MODE_X or MODE_S lock on "admin" database.
+Status checkAdminDatabase(OperationContext* opCtx, Database* adminDb) {
+ // Assumes opCtx holds MODE_X or MODE_S lock on "admin" database.
if (!adminDb) {
return Status::OK();
}
Collection* const usersCollection =
adminDb->getCollection(AuthorizationManager::usersCollectionNamespace);
const bool hasUsers =
- usersCollection && !Helpers::findOne(txn, usersCollection, BSONObj(), false).isNull();
+ usersCollection && !Helpers::findOne(opCtx, usersCollection, BSONObj(), false).isNull();
Collection* const adminVersionCollection =
adminDb->getCollection(AuthorizationManager::versionCollectionNamespace);
BSONObj authSchemaVersionDocument;
if (!adminVersionCollection ||
- !Helpers::findOne(txn,
+ !Helpers::findOne(opCtx,
adminVersionCollection,
AuthorizationManager::versionDocumentQuery,
authSchemaVersionDocument)) {
@@ -518,7 +518,7 @@ Status checkAdminDatabase(OperationContext* txn, Database* adminDb) {
return Status::OK();
}
-void syncDoInitialSync(OperationContext* txn,
+void syncDoInitialSync(OperationContext* opCtx,
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) {
stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock);
if (!lk.try_lock()) {
@@ -530,21 +530,21 @@ void syncDoInitialSync(OperationContext* txn,
log() << "Starting replication fetcher thread for initial sync";
bgsync = stdx::make_unique<BackgroundSync>(
replicationCoordinatorExternalState,
- replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(txn));
- bgsync->startup(txn);
- createOplog(txn);
+ replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(opCtx));
+ bgsync->startup(opCtx);
+ createOplog(opCtx);
}
- ON_BLOCK_EXIT([txn, &bgsync]() {
+ ON_BLOCK_EXIT([opCtx, &bgsync]() {
log() << "Stopping replication fetcher thread for initial sync";
- bgsync->shutdown(txn);
- bgsync->join(txn);
+ bgsync->shutdown(opCtx);
+ bgsync->join(opCtx);
});
int failedAttempts = 0;
while (failedAttempts < num3Dot2InitialSyncAttempts.load()) {
try {
// leave loop when successful
- Status status = _initialSync(txn, bgsync.get());
+ Status status = _initialSync(opCtx, bgsync.get());
if (status.isOK()) {
break;
} else {