summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/initial_syncer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/initial_syncer.cpp')
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp88
1 files changed, 66 insertions, 22 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index b074341d4a3..b41a20510c6 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -306,6 +307,7 @@ void InitialSyncer::_cancelRemainingWork_inlock() {
_shutdownComponent_inlock(_applier);
_shutdownComponent_inlock(_fCVFetcher);
_shutdownComponent_inlock(_lastOplogEntryFetcher);
+ _shutdownComponent_inlock(_beginFetchingOpTimeFetcher);
}
void InitialSyncer::join() {
@@ -644,21 +646,44 @@ Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() {
Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- // We will ultimately be getting oldestActiveOplogEntryOpTime from the transactions field in
- // serverStatus to use as the beginFetchingTimestamp. Also, we project out the metrics and
- // wiredTiger fields because we don't need them and they're very large.
- executor::RemoteCommandRequest request(
- _syncSource,
- "admin",
- BSON("serverStatus" << 1 << "transactions" << 1 << "metrics" << 0 << "wiredTiger" << 0),
- nullptr);
-
- auto scheduleResultSW = _exec->scheduleRemoteCommand(
- request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackData) {
- _getBeginFetchingOpTimeCallback(callbackData.response, onCompletionGuard);
- });
- return scheduleResultSW.getStatus();
+ const auto preparedState = DurableTxnState_serializer(DurableTxnStateEnum::kPrepared);
+ const auto inProgressState = DurableTxnState_serializer(DurableTxnStateEnum::kInProgress);
+
+ // Obtain the oldest active transaction timestamp from the remote by querying their
+ // transactions table.
+ BSONObjBuilder cmd;
+ cmd.append("find", NamespaceString::kSessionTransactionsTableNamespace.coll().toString());
+ cmd.append("filter",
+ BSON("state" << BSON("$in" << BSON_ARRAY(preparedState << inProgressState))));
+ cmd.append("sort", BSON(SessionTxnRecord::kStartOpTimeFieldName << 1));
+ cmd.append("readConcern",
+ BSON("level"
+ << "local"));
+ cmd.append("limit", 1);
+
+ _beginFetchingOpTimeFetcher = stdx::make_unique<Fetcher>(
+ _exec,
+ _syncSource,
+ NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
+ cmd.obj(),
+ [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) mutable {
+ _getBeginFetchingOpTimeCallback(response, onCompletionGuard);
+ },
+ ReadPreferenceSetting::secondaryPreferredMetadata(),
+ RemoteCommandRequest::kNoTimeout /* find network timeout */,
+ RemoteCommandRequest::kNoTimeout /* getMore network timeout */,
+ RemoteCommandRetryScheduler::makeRetryPolicy(
+ numInitialSyncOplogFindAttempts.load(),
+ executor::RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::kAllRetriableErrors));
+ Status scheduleStatus = _beginFetchingOpTimeFetcher->schedule();
+ if (!scheduleStatus.isOK()) {
+ _beginFetchingOpTimeFetcher.reset();
+ }
+ return scheduleStatus;
}
void InitialSyncer::_rollbackCheckerResetCallback(
@@ -679,24 +704,42 @@ void InitialSyncer::_rollbackCheckerResetCallback(
}
void InitialSyncer::_getBeginFetchingOpTimeCallback(
- const executor::TaskExecutor::ResponseStatus& response,
+ const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
- response.status,
+ result.getStatus(),
"error while getting oldest active transaction timestamp for begin fetching timestamp");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
- invariant(response.data.hasField("transactions"));
+ const auto docs = result.getValue().documents;
+ if (docs.size() > 1) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock,
+ Status(ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "Expected to receive one document for the oldest active "
+ "transaction entry, but received: "
+ << docs.size()
+ << ". First: "
+ << redact(docs.front())
+ << ". Last: "
+ << redact(docs.back())));
+ return;
+ }
- // Only set beginFetchingOpTime if the oldestActiveOplogEntryTimestamp actually exists.
+ // Only set beginFetchingOpTime if the oldestActiveOplogEntryOpTime actually exists.
OpTime beginFetchingOpTime = OpTime();
- if (response.data["transactions"].embeddedObject().hasField("oldestActiveOplogEntryOpTime")) {
- beginFetchingOpTime = repl::OpTime::parse(
- response.data["transactions"]["oldestActiveOplogEntryOpTime"].Obj());
+ if (docs.size() != 0) {
+ auto entry = SessionTxnRecord::parse(
+ IDLParserErrorContext("oldest active transaction optime for initial sync"),
+ docs.front());
+ auto optime = entry.getStartOpTime();
+ if (optime) {
+ beginFetchingOpTime = optime.get();
+ }
}
status = _scheduleLastOplogEntryFetcher_inlock(
@@ -788,7 +831,8 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
onCompletionGuard->setResultAndCancelRemainingWork_inlock(
lock,
Status(ErrorCodes::TooManyMatchingDocuments,
- str::stream() << "Expected to receive one document, but received: "
+ str::stream() << "Expected to receive one feature compatibility version "
+ "document, but received: "
<< docs.size()
<< ". First: "
<< redact(docs.front())