author    Moustafa Maher <m.maher@10gen.com>          2021-10-01 22:42:09 +0000
committer Moustafa Maher Khalil <m.maher@mongodb.com> 2021-10-06 05:17:40 +0000
commit    d14c8c01e589b11340cb0b7dc9e33c32601cd5b6 (patch)
tree      0596c02d349e34bde4818c067e6adb470ec4df70
parent    ed1a1de3bbf155f9e14ad536f47771b9c8ba2561 (diff)
download  mongo-m.maher/Test.tar.gz

-rw-r--r--  jstests/replsets/test.js                     159
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp         501
-rw-r--r--  src/mongo/db/service_entry_point_common.cpp    7
3 files changed, 665 insertions(+), 2 deletions(-)
diff --git a/jstests/replsets/test.js b/jstests/replsets/test.js
new file mode 100644
index 00000000000..48bf8f3afed
--- /dev/null
+++ b/jstests/replsets/test.js
@@ -0,0 +1,159 @@
+/**
+ * Test the waiting logic of $backupCursorExtend. Given a timestamp T, when
+ * $backupCursorExtend returns, the oplog up to T must be majority committed
+ * and persistent on disk on that node.
+ *
+ * @tags: [
+ * requires_journaling,
+ * requires_persistence,
+ * requires_sharding,
+ * requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+load("jstests/replsets/rslib.js"); // For reconfig, isConfigCommitted and
+ // safeReconfigShouldFail.
+load("jstests/libs/backup_utils.js");
+load("jstests/libs/write_concern_util.js");
+
+const DEBUG = false;
+const dbName = "test";
+const collName = "coll";
+const restorePath = MongoRunner.dataPath + "forRestore/";
+const numDocs = 2;
+
+let addNodeConfig = function(rst, nodeId, conn, arbiter) {
+ const config = rst.getReplSetConfigFromNode();
+ if (arbiter) {
+ config.members.push({_id: nodeId, host: conn.host, arbiterOnly: true});
+ } else {
+ config.members.push({_id: nodeId, host: conn.host});
+ }
+
+ return config;
+};
+
+let removeNodeConfig = function(rst, conn) {
+ const config = rst.getReplSetConfigFromNode();
+ for (var i = 0; i < config.members.length; i++) {
+ if (config.members[i].host == conn.host) {
+ config.members.splice(i, 1);
+ break;
+ }
+ }
+
+ return config;
+};
+
+function testReconfig(rst, config, shouldSucceed, errCode, errMsg) {
+ if (shouldSucceed) {
+ reconfig(rst, config);
+ assert.soon(() => isConfigCommitted(rst.getPrimary()));
+ rst.waitForConfigReplication(rst.getPrimary());
+ rst.awaitReplication();
+ } else {
+ safeReconfigShouldFail(rst, config, false /* force */, errCode, errMsg);
+
+ // A force reconfig should also fail.
+ safeReconfigShouldFail(rst, config, true /* force */, errCode, errMsg);
+ }
+}
+
+function insertDoc(db, collName, doc) {
+ let res = assert.commandWorked(db.runCommand({insert: collName, documents: [doc]}));
+ assert(res.hasOwnProperty("operationTime"), tojson(res));
+ return res.operationTime;
+}
+
+/*
+ * Assert that a lagged secondary blocks in $backupCursorExtend while timestamp T
+ * has not yet been majority committed.
+ */
+function assertLaggedSecondaryGetBlocked() {
+ resetDbpath(restorePath);
+ let rst = new ReplSetTest({name: "test", nodes: 1});
+ rst.startSet();
+ rst.initiateWithHighElectionTimeout();
+ const primaryDB = rst.getPrimary().getDB(dbName);
+
+ print("Ahoo0 ==> Insert Docs to Primary");
+ for (let i = 0; i < 1000; i++) {
+ insertDoc(primaryDB, collName, {k: i});
+ }
+
+ print("Ahoo0 ==> AddSecondary");
+ testReconfig(rst,
+ addNodeConfig(rst, 1 /* nodeId */, rst.add() /* conn */, false /* arbiter */),
+ true /* shouldSucceed */);
+
+    let cursor = openBackupCursor(rst.getSecondary());
+    let firstBatch = undefined;
+    while (cursor.hasNext()) {
+        let batch = cursor.next();
+        print("Backup cursor batch: ", tojson(batch));
+        if (!firstBatch) {
+            firstBatch = batch;
+        }
+    }
+
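+    // The first batch's metadata carries the backup's identity and checkpoint,
+    // e.g. (illustrative shape):
+    //   {backupId: UUID(...), checkpointTimestamp: Timestamp(...), dbpath: "..."}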
+ print("Ahoo2 --> first batch: ", tojson(firstBatch));
+ let checkpointTimestamp = firstBatch.metadata["checkpointTimestamp"];
+ const backupId = firstBatch.metadata.backupId;
+ print("ahoo2 -> ", tojson(checkpointTimestamp), " "+ backupId);
+
+ jsTestLog("Start writes on primary");
+ let clusterTime;
+ for (let i = 0; i < numDocs - 1; i++) {
+ clusterTime = insertDoc(primaryDB, collName, {a: i});
+ }
+
+ print("Ahoo3 ==> clusterTime: ", tojson(clusterTime));
+    let extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
+    while (extendCursor.hasNext()) {
+        let batch = extendCursor.next();
+        print("Extend cursor batch: ", tojson(batch));
+    }
+
+ jsTestLog("Start writes on primary");
+ for (let i = 0; i < numDocs - 1; i++) {
+ clusterTime = insertDoc(primaryDB, collName, {b: i});
+ }
+
+ print("Ahoo4 ==> clusterTime: ", tojson(clusterTime));
+ extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
+ while(extendCursor.hasNext()) {
+ let batch = extendCursor.next();
+ print("Ahoo4 ==> ", tojson(batch));
+ }
+
+ jsTestLog("Start writes on primary");
+ for (let i = 0; i < numDocs - 1; i++) {
+ clusterTime = insertDoc(primaryDB, collName, {c: i});
+ }
+
+ print("Ahoo5 ==> clusterTime: ", tojson(clusterTime));
+ extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
+ while(extendCursor.hasNext()) {
+ let batch = extendCursor.next();
+ print("Ahoo5 ==> ", tojson(batch));
+ }
+
+ cursor.close();
+
+    cursor = openBackupCursor(rst.getSecondary());
+    while (cursor.hasNext()) {
+        let batch = cursor.next();
+        print("Second backup cursor batch: ", tojson(batch));
+    }
+
+ cursor.close();
+ rst.stopSet();
+}
+
+assertLaggedSecondaryGetBlocked();
+})();
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index f4810d8ce3d..5fd72435a2c 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -29,9 +29,10 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync
-#include "mongo/platform/basic.h"
-
#include "initial_syncer.h"
+#include "mongo/platform/basic.h"
+#include "mongo/util/future_util.h"
+#include <boost/optional.hpp>
#include <algorithm>
#include <memory>
@@ -77,6 +78,7 @@
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
#include "mongo/util/version/releases.h"
+#include "mongo/db/dbdirectclient.h"
namespace mongo {
namespace repl {
@@ -704,6 +706,434 @@ void InitialSyncer::_startInitialSyncAttemptCallback(
}
}
+
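+// Placeholder for the file-cloning step: for now it only prints the files that
+// a real implementation would copy from the sync source.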
+void _cloneFiles(const StringSet& files) {
+ std::cout << "Ahoo ->Start cloning files:" << std::endl;
+ for (auto str : files) {
+ std::cout << str << std::endl;
+ }
+}
+
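+// Tracks the state of a file-copy-based initial sync attempt: the backup
+// cursor's id and keep-alive task, the opTime already covered by the cloned
+// files, and the number of clone cycles run so far.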
+struct SyncingFilesState {
+ SyncingFilesState() = default;
+ void keepBackupCursorAlive(std::shared_ptr<executor::TaskExecutor> clonerExec,
+ const CancellationToken& parentToken,
+ const HostAndPort& host) {
+ backupCursorKeepAliveCancellation = CancellationSource(parentToken);
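+        // Periodically send a getMore on the backup cursor so the server does not
+        // time it out (and invalidate the backup) while files are being cloned.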
+ std::cout << "Ahoo --> Inside keepBackupCursorAlive " << std::endl;
+ return AsyncTry([this, host, clonerExec] {
+ if (backupId) {
+ executor::RemoteCommandRequest request(
+ host,
+ NamespaceString::kAdminDb.toString(),
+ std::move(BSON("getMore" << cursorId << "collection"
+ << "$cmd.aggregate")),
+ rpc::makeEmptyMetadata(),
+ nullptr);
+ // We're not expecting a response, set to fire and forget
+ request.fireAndForgetMode =
+ executor::RemoteCommandRequest::FireAndForgetMode::kOn;
+ return clonerExec
+ ->scheduleRemoteCommand(std::move(request),
+ backupCursorKeepAliveCancellation.token())
+ .getAsync([](auto&&) {});
+ }
+ })
+ .until([](Status) { return false; })
+ .withDelayBetweenIterations(Seconds(2))
+ .on(clonerExec, backupCursorKeepAliveCancellation.token())
+ .getAsync([](auto&&) {}); // Ignore the result Future;
+ }
+
+    void killBackupCursor(std::shared_ptr<executor::TaskExecutor> clonerExec,
+ const HostAndPort& host) {
+ if (backupId) {
+ std::cout << "Ahoo ->Inside InitialSync Cancelling backupCursorCancellation:"
+ << std::endl;
+ backupCursorKeepAliveCancellation.cancel();
+ // mongo::sleepsecs(20);
+ auto cmdObj = BSON("killCursors"
+ << "$cmd.aggregate"
+ << "cursors" << BSON_ARRAY(cursorId));
+
+ executor::RemoteCommandRequest request(host,
+ NamespaceString::kAdminDb.toString(),
+ std::move(cmdObj),
+ rpc::makeEmptyMetadata(),
+ nullptr);
+ // We're not expecting a response, set to fire and forget
+ request.fireAndForgetMode = executor::RemoteCommandRequest::FireAndForgetMode::kOn;
+ return clonerExec
+ ->scheduleRemoteCommand(std::move(request), CancellationToken::uncancelable())
+ .getAsync([](auto&&) {});
+ }
+ }
+
+    // The extended cursor returns all journal files created between the
+    // backupCursor's checkpointTimestamp and the extendTo timestamp, so we take
+    // the difference against the files returned by previous $backupCursorExtend
+    // calls in order to clone only the newly added journal files.
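+    // For example (illustrative): if the previous extend returned journal files
+    // {WiredTigerLog.1, WiredTigerLog.2} and this extend returns
+    // {WiredTigerLog.1, WiredTigerLog.2, WiredTigerLog.3}, only WiredTigerLog.3
+    // is new and needs to be cloned.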
+    StringSet getNewFilesToClone(const std::set<std::string>& backupCursorExtendFiles) {
+ StringSet newFilesToClone;
+ std::set_difference(backupCursorExtendFiles.begin(),
+ backupCursorExtendFiles.end(),
+ extendedCursorFiles.begin(),
+ extendedCursorFiles.end(),
+ std::inserter(newFilesToClone, newFilesToClone.begin()));
+ extendedCursorFiles.insert(newFilesToClone.begin(), newFilesToClone.end());
+ return newFilesToClone;
+ }
+
+ CancellationSource backupCursorKeepAliveCancellation;
+ boost::optional<mongo::UUID> backupId;
+ CursorId cursorId;
+ mongo::Timestamp lastSyncedOpTime;
+ mongo::Timestamp lastAppliedOpTimeOnSyncSrc;
+ std::set<std::string> extendedCursorFiles;
+ int fileBasedInitialSyncCycles = 1;
+};
+
+BSONElement _getBSONField(const BSONObj& obj,
+ const std::string& fieldName,
+ const std::string& objName) {
+    uassert(ErrorCodes::NoSuchKey,
+            str::stream() << "Missing " << fieldName << " field for " << objName << ".",
+            obj.hasField(fieldName));
+ return obj.getField(fieldName);
+}
+
+ExecutorFuture<mongo::Timestamp> _getLastAppliedOpTimeFromRemoteNode(
+ HostAndPort host,
+ std::shared_ptr<executor::TaskExecutor> clonerExec,
+ const CancellationToken& token) {
+ executor::RemoteCommandRequest request(std::move(host),
+ NamespaceString::kAdminDb.toString(),
+ std::move(BSON("replSetGetStatus" << 1)),
+ rpc::makeEmptyMetadata(),
+ nullptr);
+ return clonerExec->scheduleRemoteCommand(std::move(request), token)
+ .then([](const auto& response) {
+ uassertStatusOK(response.status);
+ auto& reply = response.data;
+ uassertStatusOK(getStatusFromCommandResult(reply));
+ // Parsing replSetGetStatus's reply to get lastAppliedOpTime.
+ // ReplSetGetStatus's reply example:
+ // {
+ // ...
+ // "optimes" : {
+ // ...
+ // "appliedOpTime" : {
+ // "ts" : Timestamp(1583385878, 1),
+ // "t" : NumberLong(3)
+ // },
+ // ...
+ // }
+ // ...
+ // }
+ auto lastAppliedOpTime = _getBSONField(
+ _getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(),
+ "appliedOpTime",
+ "replSetGetStatus's reply.optimes")
+ .Obj(),
+ "ts",
+ "replSetGetStatus's reply.optimes.appliedOpTime");
+ return lastAppliedOpTime.timestamp();
+ });
+}
+
+ExecutorFuture<void> _openBackupCursor(const HostAndPort& host,
+ std::shared_ptr<executor::TaskExecutor> clonerExec,
+ const CancellationToken& token,
+ SyncingFilesState& _syncingFilesState,
+ std::shared_ptr<StringSet> files) {
+
+ const auto cmdObj = [&] {
+ AggregateCommandRequest aggRequest(
+ NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
+ {BSON("$backupCursor" << BSONObj())});
+ // We must set a writeConcern on internal commands.
+ aggRequest.setWriteConcern(WriteConcernOptions());
+ return aggRequest.toBSON(BSONObj());
+ }();
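+    // The assembled command is roughly (illustrative shape):
+    //   {aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {}, writeConcern: {...}}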
+
+ std::cout << "Ahoo -> Inside Fetcher CMD -> " << cmdObj << std::endl;
+
+ auto fetcherCallback = [&_syncingFilesState, files, token](
+ const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ std::cout << "Ahoo -> Inside Fetcher callback-> " << dataStatus.getStatus().reason()
+ << std::endl;
+ // Throw out any accumulated results on error
+ uassertStatusOK(dataStatus.getStatus());
+        uassert(ErrorCodes::CallbackCanceled, "Initial sync interrupted", !token.isCanceled());
+
+ const auto& data = dataStatus.getValue();
+ for (const BSONObj& doc : data.documents) {
+ std::cout << "Ahoo -> Inside Fetcher -> " << doc << std::endl;
+ if (!_syncingFilesState.backupId) {
+                // The first batch must contain the metadata. Parse it to get the
+                // backupId and checkpointTimestamp for the backupCursor.
+ const auto& metaData =
+ _getBSONField(doc, "metadata", "backupCursor's first batch").Obj();
+ _syncingFilesState.backupId = UUID(uassertStatusOK(UUID::parse(
+ _getBSONField(metaData, "backupId", "backupCursor's first batch.metadata"))));
+ _syncingFilesState.lastSyncedOpTime =
+ _getBSONField(
+ metaData, "checkpointTimestamp", "backupCursor's first batch.metadata")
+ .timestamp();
+ _syncingFilesState.cursorId = data.cursorId;
+ std::cout << "AHoo -> _syncingFilesState.cursorId " << _syncingFilesState.cursorId
+ << std::endl;
+ } else {
+ files->insert(_getBSONField(doc, "filename", "backupCursor's batches").str());
+ }
+ }
+
+ if (!getMoreBob) {
+ return;
+ }
+
+ if (!data.documents.size()) {
+ *nextAction = Fetcher::NextAction::kExitAndKeepCursorAlive;
+ return;
+ }
+
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+ auto fetcher = std::make_shared<Fetcher>(
+ clonerExec.get(),
+ host,
+ NamespaceString::kAdminDb.toString(),
+ cmdObj,
+ fetcherCallback,
+ ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
+ executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
+ executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
+ RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
+ 3U, executor::RemoteCommandRequest::kNoTimeout),
+ transport::kGlobalSSLMode);
+ uassertStatusOK(fetcher->schedule());
+ return fetcher->onCompletion().thenRunOn(clonerExec).then([fetcher, files] {
+ std::cout << "Ahoo -> Inside Fetcher Finished -> " << std::endl;
+ for (const auto& file : *files) {
+ std::cout << file << std::endl;
+ }
+ });
+}
+
+ExecutorFuture<void> _extendBackupCursor(const HostAndPort& host,
+ std::shared_ptr<executor::TaskExecutor> clonerExec,
+ const CancellationToken& token,
+ SyncingFilesState& _syncingFilesState,
+ std::shared_ptr<StringSet> files,
+ const Timestamp& extendTo) {
+
+ const auto cmdObj = [&] {
+ AggregateCommandRequest aggRequest(
+ NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
+ {BSON("$backupCursorExtend" << BSON("backupId" << _syncingFilesState.backupId.get()
+ << "timestamp" << extendTo))});
+ // We must set a writeConcern on internal commands.
+ aggRequest.setWriteConcern(WriteConcernOptions());
+ // The command may not return immediately because it may wait for the node to have the full
+ // oplog history up to the backup point in time.
+ aggRequest.setMaxTimeMS(180 * 1000);
+ return aggRequest.toBSON(BSONObj());
+ }();
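+    // The assembled command is roughly (illustrative shape):
+    //   {aggregate: 1,
+    //    pipeline: [{$backupCursorExtend: {backupId: <backupId>, timestamp: <extendTo>}}],
+    //    cursor: {}, writeConcern: {...}, maxTimeMS: 180000}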
+
+ std::cout << "Ahoo -> Inside Fetcher Extend CMD -> " << cmdObj << std::endl;
+ auto exfiles = std::make_shared<std::set<std::string>>();
+ auto fetcherCallback =
+ [&_syncingFilesState, exfiles, token](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ std::cout << "Ahoo -> Inside Fetcher Extend callback-> "
+ << dataStatus.getStatus().reason() << std::endl;
+ // Throw out any accumulated results on error
+ uassertStatusOK(dataStatus.getStatus());
+            uassert(ErrorCodes::CallbackCanceled, "Initial sync interrupted", !token.isCanceled());
+
+ const auto& data = dataStatus.getValue();
+ for (const BSONObj& doc : data.documents) {
+ std::cout << "Ahoo -> Inside Fetcher Extend -> " << doc << std::endl;
+ exfiles->insert(_getBSONField(doc, "filename", "backupCursor's batches").str());
+ }
+
+ if (!getMoreBob || !data.documents.size()) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+
+ auto fetcher = std::make_shared<Fetcher>(
+ clonerExec.get(),
+ host,
+ NamespaceString::kAdminDb.toString(),
+ cmdObj,
+ fetcherCallback,
+ ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
+ executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
+ executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
+ RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
+ 3U, executor::RemoteCommandRequest::kNoTimeout),
+ transport::kGlobalSSLMode);
+ uassertStatusOK(fetcher->schedule());
+ return fetcher->onCompletion()
+ .thenRunOn(clonerExec)
+ .then([fetcher, files, exfiles, &_syncingFilesState] {
+ std::cout << "Ahoo -> Inside Fetcher Extend Finished -> " << std::endl;
+ *files = _syncingFilesState.getNewFilesToClone(*exfiles);
+ for (const auto& file : *files) {
+ std::cout << file << std::endl;
+ }
+ });
+}
+
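+// Runs one clone cycle: the first cycle opens a $backupCursor (full file copy)
+// and starts the keep-alive task; every subsequent cycle extends the cursor to
+// the sync source's last applied opTime and clones only the new journal files.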
+ExecutorFuture<void> _cloneFromCursor(const HostAndPort& host,
+ std::shared_ptr<executor::TaskExecutor> clonerExec,
+ const CancellationToken& token,
+ SyncingFilesState& _syncingFilesState) {
+ std::cout << "Ahoo ->Inside InitialSync fileBasedInitialSyncCycles: "
+ << _syncingFilesState.fileBasedInitialSyncCycles << std::endl;
+    if (_syncingFilesState.fileBasedInitialSyncCycles == 1) {
+ invariant(!_syncingFilesState.backupId);
+ auto files = std::make_shared<StringSet>();
+ return _openBackupCursor(host, clonerExec, token, _syncingFilesState, files)
+ .then([files, &_syncingFilesState, clonerExec, &host, token] {
+ _cloneFiles(*files);
+ _syncingFilesState.keepBackupCursorAlive(clonerExec, token, host);
+ });
+ }
+
+ invariant(_syncingFilesState.backupId);
+ auto files = std::make_shared<StringSet>();
+ return _extendBackupCursor(host,
+ clonerExec,
+ token,
+ _syncingFilesState,
+ files,
+ _syncingFilesState.lastAppliedOpTimeOnSyncSrc)
+ .then([files, &_syncingFilesState] {
+ _syncingFilesState.lastSyncedOpTime = _syncingFilesState.lastAppliedOpTimeOnSyncSrc;
+ _cloneFiles(*files);
+ });
+}
+
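+// Drives the clone cycles in a loop: after each cycle, refresh the sync source's
+// last applied opTime and keep extending the backup cursor until the cloned
+// files cover it, stopping after at most three cycles or on error, at which
+// point the backup cursor is killed.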
+ExecutorFuture<void> _startSyncingFiles(std::shared_ptr<executor::TaskExecutor> clonerExec,
+ std::unique_ptr<DBClientConnection>& client,
+ SyncingFilesState& _syncingFilesState,
+ const CancellationToken& token) {
+
+ return AsyncTry([&client, &_syncingFilesState, &clonerExec, &token]() mutable {
+ return _cloneFromCursor(
+ client->getServerHostAndPort(), clonerExec, token, _syncingFilesState)
+ .then([&client, &clonerExec, &token, &_syncingFilesState]() {
+ if (_syncingFilesState.fileBasedInitialSyncCycles == 1) {
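+                    // Test-only scaffolding: generate writes on the sync source so
+                    // later $backupCursorExtend cycles have new data to copy.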
+ std::cout << "Ahoo --> Inside InitialSync- start inserting "
+ << std::endl;
+ for (int i = 0; i < 2000; i++) {
+ auto insertCmd = BSON("insert"
+ << "hoppa"
+ << "documents" << BSON_ARRAY(BSON("a" << i))
+ << "writeConcern" << BSON("w" << 1));
+ client->runCommand(OpMsgRequest::fromDBAndBody("test", insertCmd));
+ }
+ mongo::sleepsecs(20);
+ }
+
+ return _getLastAppliedOpTimeFromRemoteNode(
+ client->getServerHostAndPort(), clonerExec, token);
+ })
+ .then([&_syncingFilesState](mongo::Timestamp result) {
+ _syncingFilesState.lastAppliedOpTimeOnSyncSrc = result;
+ });
+ })
+ .until([&client, &_syncingFilesState, &clonerExec, &token](Status status) mutable {
+ std::cout << "Ahoo ->Inside InitialSync Until: " << std::endl;
+
+ if (!status.isOK()) {
+            _syncingFilesState.killBackupCursor(clonerExec, client->getServerHostAndPort());
+ return true;
+ }
+
+        if (_syncingFilesState.lastAppliedOpTimeOnSyncSrc.getSecs() >=
+            _syncingFilesState.lastSyncedOpTime.getSecs()) {
+            if (++_syncingFilesState.fileBasedInitialSyncCycles <= 3) {
+                std::cout << "lastSyncedOpTime: "
+                          << _syncingFilesState.lastSyncedOpTime.toString()
+                          << ", sync source lastAppliedOpTime: "
+                          << _syncingFilesState.lastAppliedOpTimeOnSyncSrc.toString()
+                          << std::endl;
+                return false;
+            }
+        }
+
+        _syncingFilesState.killBackupCursor(clonerExec, client->getServerHostAndPort());
+
+ return true;
+ })
+ .on(clonerExec, token);
+}
+
+mongo::Timestamp _getLastAppliedOpTimeLocal(DBClientBase* conn) {
+    auto res = conn->runCommand(
+        OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, BSON("replSetGetStatus" << 1)));
+    auto reply = res->getCommandReply();
+
+ uassertStatusOK(getStatusFromCommandResult(reply));
+
+    // Parse replSetGetStatus's reply to get lastAppliedOpTime; see the reply
+    // example in _getLastAppliedOpTimeFromRemoteNode above.
+ auto lastAppliedOpTime = _getBSONField(
+ _getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(),
+ "appliedOpTime",
+ "replSetGetStatus's reply.optimes")
+ .Obj(),
+ "ts",
+ "replSetGetStatus's reply.optimes.appliedOpTime");
+ return lastAppliedOpTime.timestamp();
+}
+
void InitialSyncer::_chooseSyncSourceCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::uint32_t chooseSyncSourceAttempt,
@@ -795,6 +1225,73 @@ void InitialSyncer::_chooseSyncSourceCallback(
_syncSource = syncSource.getValue();
+ {
+        // Prototype: open a $backupCursor against the local node through a
+        // DBDirectClient. TODO: connect to _syncSource instead and drive
+        // _startSyncingFiles against it over the network.
+ AggregateCommandRequest aggRequest(
+ NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
+ {BSON("$backupCursor" << BSONObj())});
+ // We must set a writeConcern on internal commands.
+ aggRequest.setWriteConcern(WriteConcernOptions());
+        auto opCtx = makeOpCtx();
+        auto client = std::make_unique<DBDirectClient>(opCtx.get());
+        auto swCursor = DBClientCursor::fromAggregationRequest(
+            client.get(), aggRequest, true /* secondaryOk */, false /* useExhaust */);
+        uassertStatusOK(swCursor.getStatus());
+        auto cursor = swCursor.getValue().get();
+        bool seenFirstBatch = false;
+        auto backupId = UUID::gen();  // Placeholder; overwritten with the id from the metadata.
+        while (cursor->more()) {
+            auto batch = cursor->next();
+            if (!seenFirstBatch) {
+                seenFirstBatch = true;
+                backupId = UUID(uassertStatusOK(UUID::parse(batch["metadata"]["backupId"])));
+            }
+            std::cout << "$backupCursor batch: " << batch << std::endl;
+        }
+
+ // std::cout << "Ahoo --> backupId " << backupId << std::endl;
+
+ // auto lastOpTime = _getLastAppliedOpTimeLocal(client.get());
+
+ // std::cout << "Ahoo --> lastOpTime " << lastOpTime.toString() << std::endl;
+
+ // AggregateCommandRequest aggRequestEx(
+ // NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
+ // {BSON("$backupCursorExtend" << BSON("backupId" << backupId << "timestamp" << lastOpTime))});
+ // // We must set a writeConcern on internal commands.
+ // aggRequestEx.setWriteConcern(WriteConcernOptions());
+ // // The command may not return immediately because it may wait for the node to have the full
+ // // oplog history up to the backup point in time.
+ // aggRequestEx.setMaxTimeMS(180 * 1000);
+
+ // std::cout << "Ahoo --> open Extended Cursror " << std::endl;
+ // statusWith = DBClientCursor::fromAggregationRequest(
+ // client.get(), aggRequestEx, true /* secondaryOk */, false /* useExhaust */);
+
+ // std::cout << "Ahoo --> WE ARE HERE2 --> " << statusWith.isOK() << std::endl;
+ // auto exCursor = statusWith.getValue().get();
+ // while (exCursor->more()) {
+ // auto batch = exCursor->next();
+ // std::cout << "Ahoo ExtendedBackupCursor--> " << batch << std::endl;
+ // }
+
+ std::cout << "Ahoo --> Kill backupCursor " << std::endl;
+ cursor->kill();
+ }
+
// Schedule rollback ID checker.
_rollbackChecker = std::make_unique<RollbackChecker>(*_attemptExec, _syncSource);
auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 6d2793b1a3e..130e7ae2736 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -571,6 +571,10 @@ private:
auto command = _execContext->getCommand();
auto& request = _execContext->getRequest();
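+    // Temporary debug tracing of the backup cursor's aggregate/getMore/killCursors
+    // requests.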
+    if (command &&
+        (command->getName() == "aggregate" || command->getName() == "getMore" ||
+         command->getName() == "killCursors")) {
+        std::cout << "Backup cursor debug, request: " << request.body << std::endl;
+    }
const auto apiParamsFromClient = initializeAPIParameters(request.body, command);
Client* client = opCtx->getClient();
@@ -1865,6 +1869,9 @@ DbResponse makeCommandResponse(std::shared_ptr<HandleRequest::ExecutionContext>
}
}
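+    // Temporary debug tracing of replies to the backup cursor's commands.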
+    if (c &&
+        (c->getName() == "aggregate" || c->getName() == "getMore" ||
+         c->getName() == "killCursors")) {
+        std::cout << "Backup cursor debug, reply: " << replyBuilder->getBodyBuilder().asTempObj()
+                  << std::endl;
+    }
dbResponse.response = replyBuilder->done();
CurOp::get(opCtx)->debug().responseLength = dbResponse.response.header().dataLen();