summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2020-09-22 11:03:30 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-23 14:25:06 +0000
commit25ff1dbbe2a1f7cce66ad7571212dace2956a455 (patch)
treebee4e252a36cec17a1202ceaf018f7dbebb1815e /src/mongo
parentec76a8c2a0c2324978f17191bf17e9b8efb2e604 (diff)
downloadmongo-25ff1dbbe2a1f7cce66ad7571212dace2956a455.tar.gz
SERVER-48806 Obtain and persist donor optimes in MigrationServerInstance
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp24
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h8
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp89
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h5
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp241
-rw-r--r--src/mongo/dbtests/SConscript6
-rw-r--r--src/mongo/dbtests/mock/mock_remote_db_server.cpp27
-rw-r--r--src/mongo/dbtests/mock/mock_remote_db_server.h16
9 files changed, 416 insertions, 2 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d7adc53133e..de85cf9021e 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1291,6 +1291,8 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/client/clientdriver_network',
+ '$BUILD_DIR/mongo/db/transaction',
+ 'oplog_entry',
'tenant_migration_state_machine_idl',
]
)
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
index 69b181302f7..357d38ae466 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
@@ -39,6 +39,8 @@
#include "mongo/db/index_build_entry_helpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/update.h"
+#include "mongo/db/ops/update_request.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/util/str.h"
@@ -101,6 +103,28 @@ Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDoc
});
}
+Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) {
+ const auto nss = NamespaceString::kTenantMigrationRecipientsNamespace;
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ if (!collection) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << nss.ns() << " does not exist");
+ }
+ auto updateReq = UpdateRequest();
+ updateReq.setNamespaceString(nss);
+ updateReq.setQuery(BSON("_id" << stateDoc.getId()));
+ updateReq.setUpdateModification(
+ write_ops::UpdateModification::parseFromClassicUpdate(stateDoc.toBSON()));
+ auto updateResult = update(opCtx, collection.getDb(), updateReq);
+ if (updateResult.numMatched == 0) {
+ return {ErrorCodes::NoSuchKey,
+ str::stream() << "Existing Tenant Migration State Document not found for id: "
+ << stateDoc.getId()};
+ }
+ return Status::OK();
+}
+
StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx,
const UUID& migrationUUID) {
// Read the most up to date data.
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
index 7bd3ece2ab2..3a6d88beb61 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
@@ -50,6 +50,14 @@ namespace tenantMigrationRecipientEntryHelpers {
Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc);
/**
+ * Updates the state doc in the database.
+ *
+ * Returns 'NoSuchKey' error code if no state document already exists on the disk with the same
+ * 'migrationUUID'.
+ */
+Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc);
+
+/**
* Returns the state doc matching the document with 'migrationUUID' from the disk if it
* exists. Reads at "no" timestamp i.e, reading with the "latest" snapshot reflecting up to date
* data.
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index bdf3df5a086..560ad7ba86e 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -39,11 +39,14 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -55,7 +58,9 @@ namespace repl {
MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc);
MONGO_FAIL_POINT_DEFINE(stopAfterPersistingTenantMigrationRecipientInstanceStateDoc);
MONGO_FAIL_POINT_DEFINE(stopAfterConnectingTenantMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(stopAfterRetrievingStartOpTimesMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
+MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
TenantMigrationRecipientService::TenantMigrationRecipientService(ServiceContext* serviceContext)
: PrimaryOnlyService(serviceContext) {}
@@ -213,6 +218,80 @@ SharedSemiFuture<void> TenantMigrationRecipientService::Instance::_initializeSta
return WaitForMajorityService::get(opCtx->getServiceContext()).waitUntilMajority(insertOpTime);
}
+void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock) {
+ // Get the last oplog entry at the read concern majority optime in the remote oplog. It
+ // does not matter which tenant it is for.
+ auto oplogOpTimeFields =
+ BSON(OplogEntry::kTimestampFieldName << 1 << OplogEntry::kTermFieldName << 1);
+ auto lastOplogEntry1Bson =
+ _client->findOne(NamespaceString::kRsOplogNamespace.ns(),
+ Query().sort("$natural", -1),
+ &oplogOpTimeFields,
+ QueryOption_SlaveOk,
+ ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+ uassert(4880601, "Found no entries in the remote oplog", !lastOplogEntry1Bson.isEmpty());
+ LOGV2_DEBUG(4880600,
+ 2,
+ "Found last oplog entry at read concern majority optime on remote node",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = _stateDoc.getDatabasePrefix(),
+ "lastOplogEntry"_attr = lastOplogEntry1Bson);
+ auto lastOplogEntry1OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry1Bson));
+
+ // Get the optime of the earliest transaction that was open at the read concern majority optime
+ // As with the last oplog entry, it does not matter that this may be for a different tenant; an
+ // optime that is too early does not result in incorrect behavior.
+ const auto preparedState = DurableTxnState_serializer(DurableTxnStateEnum::kPrepared);
+ const auto inProgressState = DurableTxnState_serializer(DurableTxnStateEnum::kInProgress);
+ auto transactionTableOpTimeFields = BSON(SessionTxnRecord::kStartOpTimeFieldName << 1);
+ auto earliestOpenTransactionBson = _client->findOne(
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ QUERY("state" << BSON("$in" << BSON_ARRAY(preparedState << inProgressState)))
+ .sort(SessionTxnRecord::kStartOpTimeFieldName.toString(), 1),
+ &transactionTableOpTimeFields,
+ QueryOption_SlaveOk,
+ ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+ LOGV2_DEBUG(4880602,
+ 2,
+ "Transaction table entry for earliest transaction that was open at the read "
+ "concern majority optime on remote node (may be empty)",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = _stateDoc.getDatabasePrefix(),
+ "earliestOpenTransaction"_attr = earliestOpenTransactionBson);
+
+ pauseAfterRetrievingLastTxnMigrationRecipientInstance.pauseWhileSet();
+
+ // We need to fetch the last oplog entry both before and after getting the transaction
+ // table entry, as otherwise there is a potential race where we may try to apply
+ // a commit for which we have not fetched a previous transaction oplog entry.
+ auto lastOplogEntry2Bson =
+ _client->findOne(NamespaceString::kRsOplogNamespace.ns(),
+ Query().sort("$natural", -1),
+ &oplogOpTimeFields,
+ QueryOption_SlaveOk,
+ ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+ uassert(4880603, "Found no entries in the remote oplog", !lastOplogEntry2Bson.isEmpty());
+ LOGV2_DEBUG(4880604,
+ 2,
+ "Found last oplog entry at the read concern majority optime (after reading txn "
+ "table) on remote node",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = _stateDoc.getDatabasePrefix(),
+ "lastOplogEntry"_attr = lastOplogEntry2Bson);
+ auto lastOplogEntry2OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry2Bson));
+ _stateDoc.setStartApplyingOpTime(lastOplogEntry2OpTime);
+
+ OpTime startFetchingOpTime = lastOplogEntry1OpTime;
+ if (!earliestOpenTransactionBson.isEmpty()) {
+ auto startOpTimeField =
+ earliestOpenTransactionBson[SessionTxnRecord::kStartOpTimeFieldName];
+ if (startOpTimeField.isABSONObj()) {
+ startFetchingOpTime = OpTime::parse(startOpTimeField.Obj());
+ }
+ }
+ _stateDoc.setStartFetchingOpTime(startFetchingOpTime);
+}
+
namespace {
constexpr std::int32_t stopFailPointErrorCode = 4880402;
void stopOnFailPoint(FailPoint* fp) {
@@ -250,7 +329,15 @@ void TenantMigrationRecipientService::Instance::run(
"for garbage collect for migration uuid: "
<< getMigrationUUID(),
!_stateDoc.getGarbageCollect());
-
+ _getStartOpTimesFromDonor(lk);
+ auto opCtx = cc().makeOperationContext();
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
+ })
+ .then([this] {
+ stopOnFailPoint(&stopAfterRetrievingStartOpTimesMigrationRecipientInstance);
// TODO SERVER-48808: Run cloners in MigrationServiceInstance
// TODO SERVER-48811: Oplog fetching in MigrationServiceInstance
})
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 8b483255d3f..2b3f1a0777b 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -130,6 +130,11 @@ public:
*/
SemiFuture<void> _createAndConnectClients();
+ /**
+ * Retrieves the start optimes from the donor and updates the in-memory state accordingly.
+ */
+ void _getStartOpTimesFromDonor(WithLock);
+
std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;
// Protects below non-const data members.
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index 27e7d77f9c8..219635c1dd7 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -27,6 +27,7 @@
* it in the license file.
*/
+#include <boost/optional/optional_io.hpp>
#include <memory>
#include "mongo/client/connpool.h"
@@ -40,10 +41,12 @@
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/primary_only_service_op_observer.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/dbtests/mock/mock_conn_registry.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
#include "mongo/executor/network_interface.h"
@@ -59,6 +62,34 @@
namespace mongo {
namespace repl {
+namespace {
+OplogEntry makeOplogEntry(OpTime opTime,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ OptionalCollectionUUID uuid,
+ BSONObj o,
+ boost::optional<BSONObj> o2) {
+ return OplogEntry(opTime, // optime
+ boost::none, // hash
+ opType, // opType
+ nss, // namespace
+ uuid, // uuid
+ boost::none, // fromMigrate
+ OplogEntry::kOplogVersion, // version
+ o, // o
+ o2, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ Date_t(), // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none, // post-image optime
+ boost::none); // ShardId of resharding recipient
+}
+
+} // namespace
+
class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest {
public:
void setUp() override {
@@ -137,6 +168,41 @@ protected:
PrimaryOnlyService* _service;
long long _term = 0;
+ void checkStateDocPersisted(const TenantMigrationRecipientService::Instance* instance) {
+ auto opCtx = cc().makeOperationContext();
+ auto memoryStateDoc = getStateDoc(instance);
+ auto persistedStateDocWithStatus =
+ tenantMigrationRecipientEntryHelpers::getStateDoc(opCtx.get(), memoryStateDoc.getId());
+ ASSERT_OK(persistedStateDocWithStatus.getStatus());
+ ASSERT_BSONOBJ_EQ(memoryStateDoc.toBSON(), persistedStateDocWithStatus.getValue().toBSON());
+ }
+ void insertToAllNodes(MockReplicaSet* replSet, const std::string& nss, BSONObj obj) {
+ for (const auto& host : replSet->getHosts()) {
+ replSet->getNode(host.toString())->insert(nss, obj);
+ }
+ }
+
+ void clearCollectionAllNodes(MockReplicaSet* replSet, const std::string& nss) {
+ for (const auto& host : replSet->getHosts()) {
+ replSet->getNode(host.toString())->remove(nss, Query());
+ }
+ }
+
+ void insertTopOfOplog(MockReplicaSet* replSet, const OpTime& topOfOplogOpTime) {
+ // The MockRemoteDBService does not actually implement the database, so to make our
+ // find work correctly we must make sure there's only one document to find.
+ clearCollectionAllNodes(replSet, NamespaceString::kRsOplogNamespace.ns());
+ insertToAllNodes(replSet,
+ NamespaceString::kRsOplogNamespace.ns(),
+ makeOplogEntry(topOfOplogOpTime,
+ OpTypeEnum::kNoop,
+ {} /* namespace */,
+ boost::none /* uuid */,
+ BSONObj() /* o */,
+ boost::none /* o2 */)
+ .toBSON());
+ }
+
// Accessors to class private members
DBClientConnection* getClient(const TenantMigrationRecipientService::Instance* instance) const {
return instance->_client.get();
@@ -147,6 +213,11 @@ protected:
return instance->_oplogFetcherClient.get();
}
+ const TenantMigrationRecipientDocument& getStateDoc(
+ const TenantMigrationRecipientService::Instance* instance) const {
+ return instance->_stateDoc;
+ }
+
private:
unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
@@ -398,5 +469,175 @@ TEST_F(TenantMigrationRecipientServiceTest,
ASSERT_EQUALS(ErrorCodes::FailedToParse, instance->getCompletionFuture().getNoThrow().code());
}
+TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTime_NoTransaction) {
+ FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ // Wait for task completion success.
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+ ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+ checkStateDocPersisted(instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest,
+ TenantMigrationRecipientGetStartOpTime_Advances_NoTransaction) {
+ FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+ auto pauseFailPoint =
+ globalFailPointRegistry().find("pauseAfterRetrievingLastTxnMigrationRecipientInstance");
+ auto timesEntered = pauseFailPoint->setMode(FailPoint::alwaysOn, 0);
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+ const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ pauseFailPoint->waitForTimesEntered(timesEntered + 1);
+ insertTopOfOplog(&replSet, newTopOfOplogOpTime);
+ pauseFailPoint->setMode(FailPoint::off, 0);
+
+ // Wait for task completion success.
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+ ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+ checkStateDocPersisted(instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTime_Transaction) {
+ FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime txnStartOpTime(Timestamp(3, 1), 1);
+ const OpTime txnLastWriteOpTime(Timestamp(4, 1), 1);
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+ SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
+ lastTxn.setStartOpTime(txnStartOpTime);
+ lastTxn.setState(DurableTxnStateEnum::kInProgress);
+ insertToAllNodes(
+ &replSet, NamespaceString::kSessionTransactionsTableNamespace.ns(), lastTxn.toBSON());
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ // Wait for task completion success.
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+ ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+ checkStateDocPersisted(instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest,
+ TenantMigrationRecipientGetStartOpTime_Advances_Transaction) {
+ FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+ auto pauseFailPoint =
+ globalFailPointRegistry().find("pauseAfterRetrievingLastTxnMigrationRecipientInstance");
+ auto timesEntered = pauseFailPoint->setMode(FailPoint::alwaysOn, 0);
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime txnStartOpTime(Timestamp(3, 1), 1);
+ const OpTime txnLastWriteOpTime(Timestamp(4, 1), 1);
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+ const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+ SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
+ lastTxn.setStartOpTime(txnStartOpTime);
+ lastTxn.setState(DurableTxnStateEnum::kInProgress);
+ insertToAllNodes(
+ &replSet, NamespaceString::kSessionTransactionsTableNamespace.ns(), lastTxn.toBSON());
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ pauseFailPoint->waitForTimesEntered(timesEntered + 1);
+ insertTopOfOplog(&replSet, newTopOfOplogOpTime);
+ pauseFailPoint->setMode(FailPoint::off, 0);
+
+ // Wait for task completion success.
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+ ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+ checkStateDocPersisted(instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest,
+ TenantMigrationRecipientGetStartOpTimes_RemoteOplogQueryFails) {
+ FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+
+ const UUID migrationUUID = UUID::gen();
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance. Fail to populate the remote oplog mock.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ // Wait for task completion success.
+ ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+
+ // Even though we failed, the memory state should still match the on-disk state.
+ checkStateDocPersisted(instance.get());
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 8358a63c54e..d96d9c9fa66 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -57,6 +57,12 @@ env.Library(
'$BUILD_DIR/mongo/client/dbclient_mockcursor',
'$BUILD_DIR/mongo/db/repl/replica_set_messages'
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/exec/projection_executor',
+ '$BUILD_DIR/mongo/db/pipeline/expression_context',
+ '$BUILD_DIR/mongo/db/query/projection_ast',
+ '$BUILD_DIR/mongo/db/query/query_test_service_context',
+ ],
)
if not has_option('noshell') and usemozjs:
diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.cpp b/src/mongo/dbtests/mock/mock_remote_db_server.cpp
index 9e37c54c23e..d96c72a3799 100644
--- a/src/mongo/dbtests/mock/mock_remote_db_server.cpp
+++ b/src/mongo/dbtests/mock/mock_remote_db_server.cpp
@@ -34,6 +34,9 @@
#include <memory>
#include <tuple>
+#include "mongo/db/exec/projection_executor_builder.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/query/projection_parser.h"
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
@@ -174,6 +177,24 @@ rpc::UniqueReply MockRemoteDBServer::runCommand(InstanceID id, const OpMsgReques
return rpc::UniqueReply(std::move(message), std::move(replyView));
}
+std::unique_ptr<projection_executor::ProjectionExecutor>
+MockRemoteDBServer::createProjectionExecutor(const BSONObj& projectionSpec) {
+ const boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ ProjectionPolicies defaultPolicies;
+ auto projection = projection_ast::parse(expCtx, projectionSpec, defaultPolicies);
+ return projection_executor::buildProjectionExecutor(
+ expCtx, &projection, defaultPolicies, projection_executor::kDefaultBuilderParams);
+}
+
+BSONObj MockRemoteDBServer::project(projection_executor::ProjectionExecutor* projectionExecutor,
+ const BSONObj& o) {
+ if (!projectionExecutor)
+ return o.copy();
+ Document doc(o);
+ auto projectedDoc = projectionExecutor->applyTransformation(doc);
+ return projectedDoc.toBson().getOwned();
+}
+
mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id,
const NamespaceStringOrUUID& nsOrUuid,
mongo::Query query,
@@ -191,6 +212,10 @@ mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id,
checkIfUp(id);
+ std::unique_ptr<projection_executor::ProjectionExecutor> projectionExecutor;
+ if (fieldsToReturn) {
+ projectionExecutor = createProjectionExecutor(*fieldsToReturn);
+ }
scoped_spinlock sLock(_lock);
_queryCount++;
@@ -198,7 +223,7 @@ mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id,
const vector<BSONObj>& coll = _dataMgr[ns];
BSONArrayBuilder result;
for (vector<BSONObj>::const_iterator iter = coll.begin(); iter != coll.end(); ++iter) {
- result.append(iter->copy());
+ result.append(project(projectionExecutor.get(), *iter));
}
return BSONArray(result.obj());
diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.h b/src/mongo/dbtests/mock/mock_remote_db_server.h
index e346923acf2..2f08f1e207d 100644
--- a/src/mongo/dbtests/mock/mock_remote_db_server.h
+++ b/src/mongo/dbtests/mock/mock_remote_db_server.h
@@ -40,6 +40,9 @@
#include "mongo/util/concurrency/spin_lock.h"
namespace mongo {
+namespace projection_executor {
+class ProjectionExecutor;
+} // namespace projection_executor
const std::string IdentityNS("local.me");
const BSONField<std::string> HostField("host");
@@ -219,6 +222,19 @@ private:
*/
void checkIfUp(InstanceID id) const;
+ /**
+ * Creates a ProjectionExecutor to handle fieldsToReturn.
+ */
+ std::unique_ptr<projection_executor::ProjectionExecutor> createProjectionExecutor(
+ const BSONObj& projectionSpec);
+
+ /**
+ * Projects the object, unless the projectionExecutor is null, in which case returns a
+ * copy of the object.
+ */
+ BSONObj project(projection_executor::ProjectionExecutor* projectionExecutor, const BSONObj& o);
+
+
typedef stdx::unordered_map<std::string, std::shared_ptr<CircularBSONIterator>> CmdToReplyObj;
typedef stdx::unordered_map<std::string, std::vector<BSONObj>> MockDataMgr;
typedef stdx::unordered_map<mongo::UUID, std::string, UUID::Hash> UUIDMap;