Diffstat (limited to 'src/mongo/db/repl/initial_syncer.cpp')
-rw-r--r--   src/mongo/db/repl/initial_syncer.cpp   88
1 file changed, 66 insertions, 22 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index b074341d4a3..b41a20510c6 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -58,6 +58,7 @@
 #include "mongo/db/repl/replication_process.h"
 #include "mongo/db/repl/storage_interface.h"
 #include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/db/session_txn_record_gen.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -306,6 +307,7 @@ void InitialSyncer::_cancelRemainingWork_inlock() {
     _shutdownComponent_inlock(_applier);
     _shutdownComponent_inlock(_fCVFetcher);
     _shutdownComponent_inlock(_lastOplogEntryFetcher);
+    _shutdownComponent_inlock(_beginFetchingOpTimeFetcher);
 }
 
 void InitialSyncer::join() {
@@ -644,21 +646,44 @@ Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() {
 
 Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
     std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
-    // We will ultimately be getting oldestActiveOplogEntryOpTime from the transactions field in
-    // serverStatus to use as the beginFetchingTimestamp. Also, we project out the metrics and
-    // wiredTiger fields because we don't need them and they're very large.
-    executor::RemoteCommandRequest request(
-        _syncSource,
-        "admin",
-        BSON("serverStatus" << 1 << "transactions" << 1 << "metrics" << 0 << "wiredTiger" << 0),
-        nullptr);
-
-    auto scheduleResultSW = _exec->scheduleRemoteCommand(
-        request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackData) {
-            _getBeginFetchingOpTimeCallback(callbackData.response, onCompletionGuard);
-        });
-    return scheduleResultSW.getStatus();
+    const auto preparedState = DurableTxnState_serializer(DurableTxnStateEnum::kPrepared);
+    const auto inProgressState = DurableTxnState_serializer(DurableTxnStateEnum::kInProgress);
+
+    // Obtain the oldest active transaction timestamp from the remote by querying their
+    // transactions table.
+    BSONObjBuilder cmd;
+    cmd.append("find", NamespaceString::kSessionTransactionsTableNamespace.coll().toString());
+    cmd.append("filter",
+               BSON("state" << BSON("$in" << BSON_ARRAY(preparedState << inProgressState))));
+    cmd.append("sort", BSON(SessionTxnRecord::kStartOpTimeFieldName << 1));
+    cmd.append("readConcern",
+               BSON("level"
+                    << "local"));
+    cmd.append("limit", 1);
+
+    _beginFetchingOpTimeFetcher = stdx::make_unique<Fetcher>(
+        _exec,
+        _syncSource,
+        NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
+        cmd.obj(),
+        [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+            mongo::Fetcher::NextAction*,
+            mongo::BSONObjBuilder*) mutable {
+            _getBeginFetchingOpTimeCallback(response, onCompletionGuard);
+        },
+        ReadPreferenceSetting::secondaryPreferredMetadata(),
+        RemoteCommandRequest::kNoTimeout /* find network timeout */,
+        RemoteCommandRequest::kNoTimeout /* getMore network timeout */,
+        RemoteCommandRetryScheduler::makeRetryPolicy(
+            numInitialSyncOplogFindAttempts.load(),
+            executor::RemoteCommandRequest::kNoTimeout,
+            RemoteCommandRetryScheduler::kAllRetriableErrors));
+    Status scheduleStatus = _beginFetchingOpTimeFetcher->schedule();
+    if (!scheduleStatus.isOK()) {
+        _beginFetchingOpTimeFetcher.reset();
+    }
+    return scheduleStatus;
 }
 
 void InitialSyncer::_rollbackCheckerResetCallback(
@@ -679,24 +704,42 @@ void InitialSyncer::_rollbackCheckerResetCallback(
 }
 
 void InitialSyncer::_getBeginFetchingOpTimeCallback(
-    const executor::TaskExecutor::ResponseStatus& response,
+    const StatusWith<Fetcher::QueryResponse>& result,
     std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     auto status = _checkForShutdownAndConvertStatus_inlock(
-        response.status,
+        result.getStatus(),
        "error while getting oldest active transaction timestamp for begin fetching timestamp");
     if (!status.isOK()) {
         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
         return;
     }
 
-    invariant(response.data.hasField("transactions"));
+    const auto docs = result.getValue().documents;
+    if (docs.size() > 1) {
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+            lock,
+            Status(ErrorCodes::TooManyMatchingDocuments,
+                   str::stream() << "Expected to receive one document for the oldest active "
+                                    "transaction entry, but received: "
+                                 << docs.size()
+                                 << ". First: "
+                                 << redact(docs.front())
+                                 << ". Last: "
+                                 << redact(docs.back())));
+        return;
+    }
 
-    // Only set beginFetchingOpTime if the oldestActiveOplogEntryTimestamp actually exists.
+    // Only set beginFetchingOpTime if the oldestActiveOplogEntryOpTime actually exists.
     OpTime beginFetchingOpTime = OpTime();
-    if (response.data["transactions"].embeddedObject().hasField("oldestActiveOplogEntryOpTime")) {
-        beginFetchingOpTime = repl::OpTime::parse(
-            response.data["transactions"]["oldestActiveOplogEntryOpTime"].Obj());
+    if (docs.size() != 0) {
+        auto entry = SessionTxnRecord::parse(
+            IDLParserErrorContext("oldest active transaction optime for initial sync"),
+            docs.front());
+        auto optime = entry.getStartOpTime();
+        if (optime) {
+            beginFetchingOpTime = optime.get();
+        }
     }
 
     status = _scheduleLastOplogEntryFetcher_inlock(
@@ -788,7 +831,8 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
         onCompletionGuard->setResultAndCancelRemainingWork_inlock(
             lock,
             Status(ErrorCodes::TooManyMatchingDocuments,
-                   str::stream() << "Expected to receive one document, but received: "
+                   str::stream() << "Expected to receive one feature compatibility version "
+                                    "document, but received: "
                                  << docs.size()
                                  << ". First: "
                                  << redact(docs.front())
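
The core of this change replaces the serverStatus-based lookup with a direct find on the sync source's config.transactions collection. To see that query in isolation, here is a minimal standalone sketch using the mongocxx driver; it is not part of the patch, the connection URI is a placeholder, and the collection and field names are taken from the diff above. The explicit readConcern "local" from the patch is omitted because it is the driver default here.

// Standalone sketch (not from the patch): the same query the new Fetcher
// issues, returning the config.transactions entry whose startOpTime is oldest
// among prepared/in-progress transactions. That optime becomes the
// beginFetchingTimestamp for initial sync.
#include <bsoncxx/builder/basic/array.hpp>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/json.hpp>
#include <iostream>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/options/find.hpp>
#include <mongocxx/uri.hpp>

int main() {
    using bsoncxx::builder::basic::kvp;
    using bsoncxx::builder::basic::make_array;
    using bsoncxx::builder::basic::make_document;

    mongocxx::instance inst{};
    // Placeholder URI; in the server this is the selected sync source.
    mongocxx::client client{mongocxx::uri{"mongodb://localhost:27017"}};

    // NamespaceString::kSessionTransactionsTableNamespace is config.transactions.
    auto coll = client["config"]["transactions"];

    mongocxx::options::find opts;
    opts.sort(make_document(kvp("startOpTime", 1)));  // oldest active transaction first
    opts.limit(1);

    auto filter = make_document(
        kvp("state", make_document(kvp("$in", make_array("prepared", "inProgress")))));

    for (auto&& doc : coll.find(filter.view(), opts)) {
        std::cout << bsoncxx::to_json(doc) << "\n";  // at most one document
    }
    return 0;
}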
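On the response side, the callback expects at most one document and, in the patch, parses it through the IDL-generated SessionTxnRecord, using its optional startOpTime as the begin-fetching optime. The following sketch illustrates that extraction outside the server tree; SimpleOpTime and parseStartOpTime are hypothetical stand-ins for repl::OpTime and OpTime::parse, while the field names ("startOpTime", "ts", "t") follow the wire format shown in the diff.

// Sketch of the extraction the callback performs on the single returned
// config.transactions document. When no startOpTime is present, the result is
// empty, matching the callback's behavior of leaving beginFetchingOpTime null.
#include <bsoncxx/document/view.hpp>
#include <bsoncxx/types.hpp>
#include <cstdint>
#include <optional>

struct SimpleOpTime {           // hypothetical stand-in for repl::OpTime
    std::uint32_t tsSecs = 0;   // timestamp seconds
    std::uint32_t tsInc = 0;    // timestamp increment
    std::int64_t term = -1;     // election term
};

std::optional<SimpleOpTime> parseStartOpTime(bsoncxx::document::view txnRecord) {
    auto elem = txnRecord["startOpTime"];
    if (!elem) {
        // No active oplog entry pinned by this record; the caller falls back
        // to the last oplog entry as the begin fetching point.
        return std::nullopt;
    }
    auto opTimeDoc = elem.get_document().view();
    SimpleOpTime out;
    auto ts = opTimeDoc["ts"].get_timestamp();
    out.tsSecs = ts.timestamp;
    out.tsInc = ts.increment;
    out.term = opTimeDoc["t"].get_int64();
    return out;
}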