From d14c8c01e589b11340cb0b7dc9e33c32601cd5b6 Mon Sep 17 00:00:00 2001 From: Moustafa Maher Date: Fri, 1 Oct 2021 22:42:09 +0000 Subject: tst --- jstests/replsets/test.js | 159 +++++++++ src/mongo/db/repl/initial_syncer.cpp | 501 +++++++++++++++++++++++++++- src/mongo/db/service_entry_point_common.cpp | 7 + 3 files changed, 665 insertions(+), 2 deletions(-) create mode 100644 jstests/replsets/test.js diff --git a/jstests/replsets/test.js b/jstests/replsets/test.js new file mode 100644 index 00000000000..48bf8f3afed --- /dev/null +++ b/jstests/replsets/test.js @@ -0,0 +1,159 @@ +/** + * Test the waiting logic of $backupCursorExtend. Given a timestamp T, when + * $backupCursorExtend returns, oplog with T should be majority committed and + * persisent on the disk of that node. + * + * @tags: [ + * requires_journaling, + * requires_persistence, + * requires_sharding, + * requires_wiredtiger, + * ] + */ +(function() { +"use strict"; +load("jstests/replsets/rslib.js"); // For reconfig, isConfigCommitted and + // safeReconfigShouldFail. +load("jstests/libs/backup_utils.js"); +load("jstests/libs/write_concern_util.js"); + +const DEBUG = false; +const dbName = "test"; +const collName = "coll"; +const restorePath = MongoRunner.dataPath + "forRestore/"; +const numDocs = 2; + +let addNodeConfig = function(rst, nodeId, conn, arbiter) { + const config = rst.getReplSetConfigFromNode(); + if (arbiter) { + config.members.push({_id: nodeId, host: conn.host, arbiterOnly: true}); + } else { + config.members.push({_id: nodeId, host: conn.host}); + } + + return config; +}; + +let removeNodeConfig = function(rst, conn) { + const config = rst.getReplSetConfigFromNode(); + for (var i = 0; i < config.members.length; i++) { + if (config.members[i].host == conn.host) { + config.members.splice(i, 1); + break; + } + } + + return config; +}; + +function testReconfig(rst, config, shouldSucceed, errCode, errMsg) { + if (shouldSucceed) { + reconfig(rst, config); + assert.soon(() => isConfigCommitted(rst.getPrimary())); + rst.waitForConfigReplication(rst.getPrimary()); + rst.awaitReplication(); + // rst.await + } else { + safeReconfigShouldFail(rst, config, false /* force */, errCode, errMsg); + + // A force reconfig should also fail. + safeReconfigShouldFail(rst, config, true /* force */, errCode, errMsg); + } +} + +function insertDoc(db, collName, doc) { + let res = assert.commandWorked(db.runCommand({insert: collName, documents: [doc]})); + assert(res.hasOwnProperty("operationTime"), tojson(res)); + return res.operationTime; +} + +/* + * Assert that lagged secondary will block when Timestamp T has not been majority committed yet. + */ +function assertLaggedSecondaryGetBlocked() { + resetDbpath(restorePath); + let rst = new ReplSetTest({name: "test", nodes: 1}); + rst.startSet(); + rst.initiateWithHighElectionTimeout(); + const primaryDB = rst.getPrimary().getDB(dbName); + + print("Ahoo0 ==> Insert Docs to Primary"); + for (let i = 0; i < 1000; i++) { + insertDoc(primaryDB, collName, {k: i}); + } + + print("Ahoo0 ==> AddSecondary"); + testReconfig(rst, + addNodeConfig(rst, 1 /* nodeId */, rst.add() /* conn */, false /* arbiter */), + true /* shouldSucceed */); + rst.stopSet(); + return; + + let cursor = openBackupCursor(rst.getSecondary()); + // let firstBatch = cursor.next(); + let firstBatch = undefined; + while(cursor.hasNext()) { + let batch = cursor.next(); + print("Ahoo1 ==> ", tojson(batch)); + if (!firstBatch) { + firstBatch = batch; + } + } + + print("Ahoo2 --> first batch: ", tojson(firstBatch)); + let checkpointTimestamp = firstBatch.metadata["checkpointTimestamp"]; + const backupId = firstBatch.metadata.backupId; + print("ahoo2 -> ", tojson(checkpointTimestamp), " "+ backupId); + + jsTestLog("Start writes on primary"); + let clusterTime; + for (let i = 0; i < numDocs - 1; i++) { + clusterTime = insertDoc(primaryDB, collName, {a: i}); + } + + print("Ahoo3 ==> clusterTime: ", tojson(clusterTime)); + let extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime); + while(extendCursor.hasNext()) { + let batch = extendCursor.next(); + print("Ahoo3 ==> ", tojson(batch)); + } + + jsTestLog("Start writes on primary"); + for (let i = 0; i < numDocs - 1; i++) { + clusterTime = insertDoc(primaryDB, collName, {b: i}); + } + + print("Ahoo4 ==> clusterTime: ", tojson(clusterTime)); + extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime); + while(extendCursor.hasNext()) { + let batch = extendCursor.next(); + print("Ahoo4 ==> ", tojson(batch)); + } + + jsTestLog("Start writes on primary"); + for (let i = 0; i < numDocs - 1; i++) { + clusterTime = insertDoc(primaryDB, collName, {c: i}); + } + + print("Ahoo5 ==> clusterTime: ", tojson(clusterTime)); + extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime); + while(extendCursor.hasNext()) { + let batch = extendCursor.next(); + print("Ahoo5 ==> ", tojson(batch)); + } + + cursor.close(); + + cursor = openBackupCursor(rst.getSecondary()); + // let firstBatch = cursor.next(); + while(cursor.hasNext()) { + let batch = cursor.next(); + print("Ahoo6 ==> ", tojson(batch)); + } + + cursor.close(); + rst.stopSet(); +} + +assertLaggedSecondaryGetBlocked(); +})(); diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index f4810d8ce3d..5fd72435a2c 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -29,9 +29,10 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync -#include "mongo/platform/basic.h" - #include "initial_syncer.h" +#include "mongo/platform/basic.h" +#include "mongo/util/future_util.h" +#include #include #include @@ -77,6 +78,7 @@ #include "mongo/util/time_support.h" #include "mongo/util/timer.h" #include "mongo/util/version/releases.h" +#include "mongo/db/dbdirectclient.h" namespace mongo { namespace repl { @@ -704,6 +706,434 @@ void InitialSyncer::_startInitialSyncAttemptCallback( } } +// void _cloneFiles(const std::set &files) { +// std::cout << "Ahoo ->Start cloning files:" << std::endl; +// for (auto str : files) { +// std::cout << str << std::endl; +// } +// } + +void _cloneFiles(const StringSet& files) { + std::cout << "Ahoo ->Start cloning files:" << std::endl; + for (auto str : files) { + std::cout << str << std::endl; + } +} + +struct SyncingFilesState { + SyncingFilesState() = default; + void keepBackupCursorAlive(std::shared_ptr clonerExec, + const CancellationToken& parentToken, + const HostAndPort& host) { + backupCursorKeepAliveCancellation = CancellationSource(parentToken); + std::cout << "Ahoo --> Inside keepBackupCursorAlive " << std::endl; + return AsyncTry([this, host, clonerExec] { + if (backupId) { + executor::RemoteCommandRequest request( + host, + NamespaceString::kAdminDb.toString(), + std::move(BSON("getMore" << cursorId << "collection" + << "$cmd.aggregate")), + rpc::makeEmptyMetadata(), + nullptr); + // We're not expecting a response, set to fire and forget + request.fireAndForgetMode = + executor::RemoteCommandRequest::FireAndForgetMode::kOn; + std::cout << "Ahoo --> Inside keepBackupCursorAlive new command" + << std::endl; + return clonerExec + ->scheduleRemoteCommand(std::move(request), + backupCursorKeepAliveCancellation.token()) + .getAsync([](auto&&) {}); + } + }) + .until([](Status) { return false; }) + .withDelayBetweenIterations(Seconds(2)) + .on(clonerExec, backupCursorKeepAliveCancellation.token()) + .getAsync([](auto&&) {}); // Ignore the result Future; + } + + void KillBackupCursor(std::shared_ptr clonerExec, + const HostAndPort& host) { + if (backupId) { + std::cout << "Ahoo ->Inside InitialSync Cancelling backupCursorCancellation:" + << std::endl; + backupCursorKeepAliveCancellation.cancel(); + // mongo::sleepsecs(20); + auto cmdObj = BSON("killCursors" + << "$cmd.aggregate" + << "cursors" << BSON_ARRAY(cursorId)); + // std::cout << "Ahoo ->Inside InitialSync kill cursor :" << " " << cmdObj << std::endl; + + executor::RemoteCommandRequest request(host, + NamespaceString::kAdminDb.toString(), + std::move(cmdObj), + rpc::makeEmptyMetadata(), + nullptr); + // We're not expecting a response, set to fire and forget + request.fireAndForgetMode = executor::RemoteCommandRequest::FireAndForgetMode::kOn; + return clonerExec + ->scheduleRemoteCommand(std::move(request), CancellationToken::uncancelable()) + .getAsync([](auto&&) {}); + } + } + + // Extended cursor sends all log files created since the backupCursor's + // checkpointTimestamp till the extendTo timestamp, so we need to get the + // difference between the files returned by the consecutive backupCursorExtend to + // clone only the new log files added since the previous backupCursorExtend. + StringSet getNewFilesToClone(std::set& backupCursorExtendFiles) { + StringSet newFilesToClone; + std::set_difference(backupCursorExtendFiles.begin(), + backupCursorExtendFiles.end(), + extendedCursorFiles.begin(), + extendedCursorFiles.end(), + std::inserter(newFilesToClone, newFilesToClone.begin())); + extendedCursorFiles.insert(newFilesToClone.begin(), newFilesToClone.end()); + return newFilesToClone; + } + + CancellationSource backupCursorKeepAliveCancellation; + boost::optional backupId; + CursorId cursorId; + mongo::Timestamp lastSyncedOpTime; + mongo::Timestamp lastAppliedOpTimeOnSyncSrc; + std::set extendedCursorFiles; + int fileBasedInitialSyncCycles = 1; +}; + +BSONElement _getBSONField(const BSONObj& obj, + const std::string& fieldName, + const std::string& objName) { + uassert(ErrorCodes::NoSuchKey, + str::stream() << "Missing " << fieldName << "field for " << objName << ".", + obj.hasField(fieldName)); + return obj.getField(fieldName); +} + +ExecutorFuture _getLastAppliedOpTimeFromRemoteNode( + HostAndPort host, + std::shared_ptr clonerExec, + const CancellationToken& token) { + executor::RemoteCommandRequest request(std::move(host), + NamespaceString::kAdminDb.toString(), + std::move(BSON("replSetGetStatus" << 1)), + rpc::makeEmptyMetadata(), + nullptr); + return clonerExec->scheduleRemoteCommand(std::move(request), token) + .then([](const auto& response) { + uassertStatusOK(response.status); + auto& reply = response.data; + uassertStatusOK(getStatusFromCommandResult(reply)); + // Parsing replSetGetStatus's reply to get lastAppliedOpTime. + // ReplSetGetStatus's reply example: + // { + // ... + // "optimes" : { + // ... + // "appliedOpTime" : { + // "ts" : Timestamp(1583385878, 1), + // "t" : NumberLong(3) + // }, + // ... + // } + // ... + // } + auto lastAppliedOpTime = _getBSONField( + _getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(), + "appliedOpTime", + "replSetGetStatus's reply.optimes") + .Obj(), + "ts", + "replSetGetStatus's reply.optimes.appliedOpTime"); + return lastAppliedOpTime.timestamp(); + }); +} + +ExecutorFuture _openBackupCursor(const HostAndPort& host, + std::shared_ptr clonerExec, + const CancellationToken& token, + SyncingFilesState& _syncingFilesState, + std::shared_ptr files) { + + const auto cmdObj = [&] { + AggregateCommandRequest aggRequest( + NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb), + {BSON("$backupCursor" << BSONObj())}); + // We must set a writeConcern on internal commands. + aggRequest.setWriteConcern(WriteConcernOptions()); + return aggRequest.toBSON(BSONObj()); + }(); + + std::cout << "Ahoo -> Inside Fetcher CMD -> " << cmdObj << std::endl; + + auto fetcherCallback = [&_syncingFilesState, files, token]( + const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + std::cout << "Ahoo -> Inside Fetcher callback-> " << dataStatus.getStatus().reason() + << std::endl; + // Throw out any accumulated results on error + uassertStatusOK(dataStatus.getStatus()); + uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); + + const auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + std::cout << "Ahoo -> Inside Fetcher -> " << doc << std::endl; + if (!_syncingFilesState.backupId) { + // First batch must contain the metadata. + // Parsing the metadata to get backupId and checkpointTimestamp for the + // the backupCursor. + const auto& metaData = + _getBSONField(doc, "metadata", "backupCursor's first batch").Obj(); + _syncingFilesState.backupId = UUID(uassertStatusOK(UUID::parse( + _getBSONField(metaData, "backupId", "backupCursor's first batch.metadata")))); + _syncingFilesState.lastSyncedOpTime = + _getBSONField( + metaData, "checkpointTimestamp", "backupCursor's first batch.metadata") + .timestamp(); + _syncingFilesState.cursorId = data.cursorId; + std::cout << "AHoo -> _syncingFilesState.cursorId " << _syncingFilesState.cursorId + << std::endl; + } else { + files->insert(_getBSONField(doc, "filename", "backupCursor's batches").str()); + } + } + + if (!getMoreBob) { + return; + } + + if (!data.documents.size()) { + *nextAction = Fetcher::NextAction::kExitAndKeepCursorAlive; + return; + } + + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; + auto fetcher = std::make_shared( + clonerExec.get(), + host, + NamespaceString::kAdminDb.toString(), + cmdObj, + fetcherCallback, + ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(), + executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */ + executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */ + RemoteCommandRetryScheduler::makeRetryPolicy( + 3U, executor::RemoteCommandRequest::kNoTimeout), + transport::kGlobalSSLMode); + uassertStatusOK(fetcher->schedule()); + // std::cout << fetcher->getDiagnosticString() << std::endl; + // fetcher->join(); + // fetcher.join(); + return fetcher->onCompletion().thenRunOn(clonerExec).then([fetcher, files] { + std::cout << "Ahoo -> Inside Fetcher Finished -> " << std::endl; + for (const auto& file : *files) { + std::cout << file << std::endl; + } + }); +} + +ExecutorFuture _extendBackupCursor(const HostAndPort& host, + std::shared_ptr clonerExec, + const CancellationToken& token, + SyncingFilesState& _syncingFilesState, + std::shared_ptr files, + const Timestamp& extendTo) { + + const auto cmdObj = [&] { + AggregateCommandRequest aggRequest( + NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb), + {BSON("$backupCursorExtend" << BSON("backupId" << _syncingFilesState.backupId.get() + << "timestamp" << extendTo))}); + // We must set a writeConcern on internal commands. + aggRequest.setWriteConcern(WriteConcernOptions()); + // The command may not return immediately because it may wait for the node to have the full + // oplog history up to the backup point in time. + aggRequest.setMaxTimeMS(180 * 1000); + return aggRequest.toBSON(BSONObj()); + }(); + + std::cout << "Ahoo -> Inside Fetcher Extend CMD -> " << cmdObj << std::endl; + auto exfiles = std::make_shared>(); + auto fetcherCallback = + [&_syncingFilesState, exfiles, token](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + std::cout << "Ahoo -> Inside Fetcher Extend callback-> " + << dataStatus.getStatus().reason() << std::endl; + // Throw out any accumulated results on error + uassertStatusOK(dataStatus.getStatus()); + uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); + + const auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + std::cout << "Ahoo -> Inside Fetcher Extend -> " << doc << std::endl; + exfiles->insert(_getBSONField(doc, "filename", "backupCursor's batches").str()); + } + + if (!getMoreBob || !data.documents.size()) { + return; + } + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; + + auto fetcher = std::make_shared( + clonerExec.get(), + host, + NamespaceString::kAdminDb.toString(), + cmdObj, + fetcherCallback, + ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(), + executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */ + executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */ + RemoteCommandRetryScheduler::makeRetryPolicy( + 3U, executor::RemoteCommandRequest::kNoTimeout), + transport::kGlobalSSLMode); + uassertStatusOK(fetcher->schedule()); + // std::cout << fetcher->getDiagnosticString() << std::endl; + // fetcher->join(); + // fetcher.join(); + return fetcher->onCompletion() + .thenRunOn(clonerExec) + .then([fetcher, files, exfiles, &_syncingFilesState] { + std::cout << "Ahoo -> Inside Fetcher Extend Finished -> " << std::endl; + *files = _syncingFilesState.getNewFilesToClone(*exfiles); + for (const auto& file : *files) { + std::cout << file << std::endl; + } + }); +} + +ExecutorFuture _cloneFromCursor(const HostAndPort& host, + std::shared_ptr clonerExec, + const CancellationToken& token, + SyncingFilesState& _syncingFilesState) { + std::cout << "Ahoo ->Inside InitialSync fileBasedInitialSyncCycles: " + << _syncingFilesState.fileBasedInitialSyncCycles << std::endl; + if (_syncingFilesState.fileBasedInitialSyncCycles == 1) { + std::cout << "Ahoo ->Inside InitialSync start backup cursor: " << std::endl; + invariant(!_syncingFilesState.backupId); + auto files = std::make_shared(); + return _openBackupCursor(host, clonerExec, token, _syncingFilesState, files) + .then([files, &_syncingFilesState, clonerExec, &host, token] { + _cloneFiles(*files); + _syncingFilesState.keepBackupCursorAlive(clonerExec, token, host); + }); + } + + invariant(_syncingFilesState.backupId); + auto files = std::make_shared(); + return _extendBackupCursor(host, + clonerExec, + token, + _syncingFilesState, + files, + _syncingFilesState.lastAppliedOpTimeOnSyncSrc) + .then([files, &_syncingFilesState] { + _syncingFilesState.lastSyncedOpTime = _syncingFilesState.lastAppliedOpTimeOnSyncSrc; + _cloneFiles(*files); + }); +} + +ExecutorFuture _startSyncingFiles(std::shared_ptr clonerExec, + std::unique_ptr& client, + SyncingFilesState& _syncingFilesState, + const CancellationToken& token) { + + return AsyncTry([&client, &_syncingFilesState, &clonerExec, &token]() mutable { + return _cloneFromCursor( + client->getServerHostAndPort(), clonerExec, token, _syncingFilesState) + .then([&client, &clonerExec, &token, &_syncingFilesState]() { + if (_syncingFilesState.fileBasedInitialSyncCycles == 1) { + std::cout << "Ahoo --> Inside InitialSync- start inserting " + << std::endl; + for (int i = 0; i < 2000; i++) { + auto insertCmd = BSON("insert" + << "hoppa" + << "documents" << BSON_ARRAY(BSON("a" << i)) + << "writeConcern" << BSON("w" << 1)); + client->runCommand(OpMsgRequest::fromDBAndBody("test", insertCmd)); + } + mongo::sleepsecs(20); + } + + return _getLastAppliedOpTimeFromRemoteNode( + client->getServerHostAndPort(), clonerExec, token); + }) + .then([&_syncingFilesState](mongo::Timestamp result) { + _syncingFilesState.lastAppliedOpTimeOnSyncSrc = result; + }); + }) + .until([&client, &_syncingFilesState, &clonerExec, &token](Status status) mutable { + std::cout << "Ahoo ->Inside InitialSync Until: " << std::endl; + + if (!status.isOK()) { + _syncingFilesState.KillBackupCursor(clonerExec, client->getServerHostAndPort()); + return true; + } + + if (static_cast(_syncingFilesState.lastAppliedOpTimeOnSyncSrc.getSecs() - + _syncingFilesState.lastSyncedOpTime.getSecs()) >= 0) { + if (++_syncingFilesState.fileBasedInitialSyncCycles <= 3) { + std::cout << "Ahoo ->Inside InitialSync lastSyncedOpTime: " + << _syncingFilesState.lastSyncedOpTime.toString() << std::endl; + std::cout << "Ahoo ->Inside InitialSync syncSourceLastApliedOpTime: " + << _syncingFilesState.lastAppliedOpTimeOnSyncSrc.toString() + << std::endl; + std::cout << "Ahoo ->Inside InitialSync Diff: " + << (_syncingFilesState.lastAppliedOpTimeOnSyncSrc.getSecs() - + _syncingFilesState.lastSyncedOpTime.getSecs()) + << std::endl; + return false; + } + } + + _syncingFilesState.KillBackupCursor(clonerExec, client->getServerHostAndPort()); + + std::cout << "Ahoo ->Inside InitialSync stop looping: " << std::endl; + return true; + }) + .on(clonerExec, token); +} + +mongo::Timestamp _getLastAppliedOpTimeLocal( + DBClientBase* conn) { + std::cout << "here 1" << std::endl; + auto res = conn->runCommand( + OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, BSON("replSetGetStatus" << 1))); + auto reply = res->getCommandReply(); + + std::cout << "here 2" << reply << std::endl; + uassertStatusOK(getStatusFromCommandResult(reply)); + + // Parsing replSetGetStatus's reply to get lastAppliedOpTime. + // ReplSetGetStatus's reply example: + // { + // ... + // "optimes" : { + // ... + // "appliedOpTime" : { + // "ts" : Timestamp(1583385878, 1), + // "t" : NumberLong(3) + // }, + // ... + // } + // ... + // } + auto lastAppliedOpTime = _getBSONField( + _getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(), + "appliedOpTime", + "replSetGetStatus's reply.optimes") + .Obj(), + "ts", + "replSetGetStatus's reply.optimes.appliedOpTime"); + return lastAppliedOpTime.timestamp(); +} + void InitialSyncer::_chooseSyncSourceCallback( const executor::TaskExecutor::CallbackArgs& callbackArgs, std::uint32_t chooseSyncSourceAttempt, @@ -795,6 +1225,73 @@ void InitialSyncer::_chooseSyncSourceCallback( _syncSource = syncSource.getValue(); + { + /// AHOO + // auto client = std::make_unique(true /* autoReconnect */); + // _client; + // uassertStatusOK(client->connect(_syncSource, "FileCopyBasedInitialSyncer", boost::none)); + // std::cout << "AHOO ---> Inside InitialSync == client : " << client->toString() + // << std::endl; + // SyncingFilesState _syncingFilesState; + // auto primary = CancellationSource(); + // uassertStatusOK(_startSyncingFiles(_clonerExec, client, _syncingFilesState, primary.token()) + // .getNoThrow()); + // std::set files; + // uassertStatusOK(_openBackupCursor(_syncSource, _clonerExec, primary.token(), + // _syncingFilesState, files).getNoThrow()); + AggregateCommandRequest aggRequest( + NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb), + {BSON("$backupCursor" << BSONObj())}); + // We must set a writeConcern on internal commands. + aggRequest.setWriteConcern(WriteConcernOptions()); + auto opctx = makeOpCtx(); + auto client = std::make_unique(opctx.get()); + auto statusWith = DBClientCursor::fromAggregationRequest( + client.get(), aggRequest, true /* secondaryOk */, false /* useExhaust */); + + std::cout << "Ahoo --> WE ARE HERE --> " << statusWith.isOK() << std::endl; + auto cursor = statusWith.getValue().get(); + bool firstBatch = false; + auto backupId = UUID::gen(); + while (cursor->more()) { + auto batch = cursor->next(); + if(!firstBatch) { + firstBatch = true; + backupId = UUID(uassertStatusOK(UUID::parse(batch["metadata"]["backupId"]))); + } + std::cout << "Ahoo BackupCursor--> " << batch << std::endl; + } + + // std::cout << "Ahoo --> backupId " << backupId << std::endl; + + // auto lastOpTime = _getLastAppliedOpTimeLocal(client.get()); + + // std::cout << "Ahoo --> lastOpTime " << lastOpTime.toString() << std::endl; + + // AggregateCommandRequest aggRequestEx( + // NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb), + // {BSON("$backupCursorExtend" << BSON("backupId" << backupId << "timestamp" << lastOpTime))}); + // // We must set a writeConcern on internal commands. + // aggRequestEx.setWriteConcern(WriteConcernOptions()); + // // The command may not return immediately because it may wait for the node to have the full + // // oplog history up to the backup point in time. + // aggRequestEx.setMaxTimeMS(180 * 1000); + + // std::cout << "Ahoo --> open Extended Cursror " << std::endl; + // statusWith = DBClientCursor::fromAggregationRequest( + // client.get(), aggRequestEx, true /* secondaryOk */, false /* useExhaust */); + + // std::cout << "Ahoo --> WE ARE HERE2 --> " << statusWith.isOK() << std::endl; + // auto exCursor = statusWith.getValue().get(); + // while (exCursor->more()) { + // auto batch = exCursor->next(); + // std::cout << "Ahoo ExtendedBackupCursor--> " << batch << std::endl; + // } + + std::cout << "Ahoo --> Kill backupCursor " << std::endl; + cursor->kill(); + } + // Schedule rollback ID checker. _rollbackChecker = std::make_unique(*_attemptExec, _syncSource); auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) { diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 6d2793b1a3e..130e7ae2736 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -571,6 +571,10 @@ private: auto command = _execContext->getCommand(); auto& request = _execContext->getRequest(); + if (command && + (command->getName() == "aggregate" || command->getName() == "getMore" || + command->getName() == "killCursors")) + std::cout << "Ahoo --> new Request: " << request.body << std::endl; const auto apiParamsFromClient = initializeAPIParameters(request.body, command); Client* client = opCtx->getClient(); @@ -1865,6 +1869,9 @@ DbResponse makeCommandResponse(std::shared_ptr } } + if (c && + (c->getName() == "aggregate" || c->getName() == "getMore" || c->getName() == "killCursors")) + std::cout << "Ahoo ==> Reply: " << replyBuilder->getBodyBuilder().asTempObj() << std::endl; dbResponse.response = replyBuilder->done(); CurOp::get(opCtx)->debug().responseLength = dbResponse.response.header().dataLen(); -- cgit v1.2.1