From 25ff1dbbe2a1f7cce66ad7571212dace2956a455 Mon Sep 17 00:00:00 2001
From: Matthew Russotto <matthew.russotto@mongodb.com>
Date: Tue, 22 Sep 2020 11:03:30 -0400
Subject: SERVER-48806 Obtain and persist donor optimes in
 MigrationServerInstance

---
 src/mongo/db/repl/SConscript                        |   2 +
 .../tenant_migration_recipient_entry_helpers.cpp    |  24 ++
 .../tenant_migration_recipient_entry_helpers.h      |   8 +
 .../db/repl/tenant_migration_recipient_service.cpp  |  89 +++++++-
 .../db/repl/tenant_migration_recipient_service.h    |   5 +
 .../tenant_migration_recipient_service_test.cpp     | 241 +++++++++++++++++++++
 src/mongo/dbtests/SConscript                        |   6 +
 src/mongo/dbtests/mock/mock_remote_db_server.cpp    |  27 ++-
 src/mongo/dbtests/mock/mock_remote_db_server.h      |  16 ++
 9 files changed, 416 insertions(+), 2 deletions(-)

diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d7adc53133e..de85cf9021e 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1291,6 +1291,8 @@ env.Library(
     ],
     LIBDEPS_PRIVATE=[
         '$BUILD_DIR/mongo/client/clientdriver_network',
+        '$BUILD_DIR/mongo/db/transaction',
+        'oplog_entry',
         'tenant_migration_state_machine_idl',
     ]
 )
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
index 69b181302f7..357d38ae466 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
@@ -39,6 +39,8 @@
 #include "mongo/db/index_build_entry_helpers.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/ops/update.h"
+#include "mongo/db/ops/update_request.h"
 #include "mongo/db/repl/tenant_migration_state_machine_gen.h"
 #include "mongo/db/storage/write_unit_of_work.h"
 #include "mongo/util/str.h"
@@ -101,6 +103,28 @@ Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDoc
     });
 }
 
+Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) {
+    const auto nss = NamespaceString::kTenantMigrationRecipientsNamespace;
+    AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+    if (!collection) {
+        return Status(ErrorCodes::NamespaceNotFound,
+                      str::stream() << nss.ns() << " does not exist");
+    }
+    auto updateReq = UpdateRequest();
+    updateReq.setNamespaceString(nss);
+    updateReq.setQuery(BSON("_id" << stateDoc.getId()));
+    updateReq.setUpdateModification(
+        write_ops::UpdateModification::parseFromClassicUpdate(stateDoc.toBSON()));
+    auto updateResult = update(opCtx, collection.getDb(), updateReq);
+    if (updateResult.numMatched == 0) {
+        return {ErrorCodes::NoSuchKey,
+                str::stream() << "Existing Tenant Migration State Document not found for id: "
+                              << stateDoc.getId()};
+    }
+    return Status::OK();
+}
+
 StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx,
                                                          const UUID& migrationUUID) {
     // Read the most up to date data.
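Since `updateStateDoc` deliberately performs a plain update rather than an upsert, a `numMatched` of zero surfaces as `NoSuchKey` instead of silently creating a document. A caller that cannot assume the initial insert already happened might branch on that code — a minimal sketch assuming only the helpers above; `persistStateDoc` is a hypothetical wrapper, not part of this patch:

```cpp
#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"

namespace mongo::repl {
// Hypothetical wrapper (illustration only): try the update first, fall back to
// the insert path when no document with this _id exists yet.
Status persistStateDoc(OperationContext* opCtx,
                       const TenantMigrationRecipientDocument& stateDoc) {
    namespace helpers = tenantMigrationRecipientEntryHelpers;
    auto status = helpers::updateStateDoc(opCtx, stateDoc);
    if (status.code() == ErrorCodes::NoSuchKey) {
        return helpers::insertStateDoc(opCtx, stateDoc);
    }
    return status;
}
}  // namespace mongo::repl
```

The service itself never needs this fallback: `run()` always inserts the document via `_initializeStateDoc` before the update path below executes.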
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
index 7bd3ece2ab2..3a6d88beb61 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
@@ -49,6 +49,14 @@ namespace tenantMigrationRecipientEntryHelpers {
  */
 Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc);
 
+/**
+ * Updates the state doc in the database.
+ *
+ * Returns 'NoSuchKey' error code if no state document exists on disk with the same
+ * 'migrationUUID'.
+ */
+Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc);
+
 /**
  * Returns the state doc matching the document with 'migrationUUID' from the disk if it
  * exists. Reads at "no" timestamp i.e, reading with the "latest" snapshot reflecting up to date
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index bdf3df5a086..560ad7ba86e 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -39,11 +39,14 @@
 #include "mongo/db/commands/test_commands_enabled.h"
 #include "mongo/db/dbdirectclient.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/read_concern_args.h"
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
 #include "mongo/db/repl/tenant_migration_recipient_service.h"
 #include "mongo/db/repl/tenant_migration_state_machine_gen.h"
 #include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/session_txn_record_gen.h"
 #include "mongo/db/write_concern_options.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/get_status_from_command_result.h"
@@ -55,7 +58,9 @@ namespace repl {
 MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc);
 MONGO_FAIL_POINT_DEFINE(stopAfterPersistingTenantMigrationRecipientInstanceStateDoc);
 MONGO_FAIL_POINT_DEFINE(stopAfterConnectingTenantMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(stopAfterRetrievingStartOpTimesMigrationRecipientInstance);
 MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
+MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
 
 TenantMigrationRecipientService::TenantMigrationRecipientService(ServiceContext* serviceContext)
     : PrimaryOnlyService(serviceContext) {}
@@ -213,6 +218,80 @@ SharedSemiFuture<void> TenantMigrationRecipientService::Instance::_initializeSta
     return WaitForMajorityService::get(opCtx->getServiceContext()).waitUntilMajority(insertOpTime);
 }
 
+void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock) {
+    // Get the last oplog entry at the read concern majority optime in the remote oplog.  It
+    // does not matter which tenant it is for.
+    auto oplogOpTimeFields =
+        BSON(OplogEntry::kTimestampFieldName << 1 << OplogEntry::kTermFieldName << 1);
+    auto lastOplogEntry1Bson =
+        _client->findOne(NamespaceString::kRsOplogNamespace.ns(),
+                         Query().sort("$natural", -1),
+                         &oplogOpTimeFields,
+                         QueryOption_SlaveOk,
+                         ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+    uassert(4880601, "Found no entries in the remote oplog", !lastOplogEntry1Bson.isEmpty());
+    LOGV2_DEBUG(4880600,
+                2,
+                "Found last oplog entry at read concern majority optime on remote node",
+                "migrationId"_attr = getMigrationUUID(),
+                "tenantId"_attr = _stateDoc.getDatabasePrefix(),
+                "lastOplogEntry"_attr = lastOplogEntry1Bson);
+    auto lastOplogEntry1OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry1Bson));
+
+    // Get the optime of the earliest transaction that was open at the read concern majority
+    // optime.  As with the last oplog entry, it does not matter that this may be for a different
+    // tenant; an optime that is too early does not result in incorrect behavior.
+    const auto preparedState = DurableTxnState_serializer(DurableTxnStateEnum::kPrepared);
+    const auto inProgressState = DurableTxnState_serializer(DurableTxnStateEnum::kInProgress);
+    auto transactionTableOpTimeFields = BSON(SessionTxnRecord::kStartOpTimeFieldName << 1);
+    auto earliestOpenTransactionBson = _client->findOne(
+        NamespaceString::kSessionTransactionsTableNamespace.ns(),
+        QUERY("state" << BSON("$in" << BSON_ARRAY(preparedState << inProgressState)))
+            .sort(SessionTxnRecord::kStartOpTimeFieldName.toString(), 1),
+        &transactionTableOpTimeFields,
+        QueryOption_SlaveOk,
+        ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+    LOGV2_DEBUG(4880602,
+                2,
+                "Transaction table entry for earliest transaction that was open at the read "
+                "concern majority optime on remote node (may be empty)",
+                "migrationId"_attr = getMigrationUUID(),
+                "tenantId"_attr = _stateDoc.getDatabasePrefix(),
+                "earliestOpenTransaction"_attr = earliestOpenTransactionBson);
+
+    pauseAfterRetrievingLastTxnMigrationRecipientInstance.pauseWhileSet();
+
+    // We need to fetch the last oplog entry both before and after getting the transaction
+    // table entry, as otherwise there is a potential race where we may try to apply
+    // a commit for which we have not fetched a previous transaction oplog entry.
+    auto lastOplogEntry2Bson =
+        _client->findOne(NamespaceString::kRsOplogNamespace.ns(),
+                         Query().sort("$natural", -1),
+                         &oplogOpTimeFields,
+                         QueryOption_SlaveOk,
+                         ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+    uassert(4880603, "Found no entries in the remote oplog", !lastOplogEntry2Bson.isEmpty());
+    LOGV2_DEBUG(4880604,
+                2,
+                "Found last oplog entry at the read concern majority optime (after reading txn "
+                "table) on remote node",
+                "migrationId"_attr = getMigrationUUID(),
+                "tenantId"_attr = _stateDoc.getDatabasePrefix(),
+                "lastOplogEntry"_attr = lastOplogEntry2Bson);
+    auto lastOplogEntry2OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry2Bson));
+    _stateDoc.setStartApplyingOpTime(lastOplogEntry2OpTime);
+
+    OpTime startFetchingOpTime = lastOplogEntry1OpTime;
+    if (!earliestOpenTransactionBson.isEmpty()) {
+        auto startOpTimeField =
+            earliestOpenTransactionBson[SessionTxnRecord::kStartOpTimeFieldName];
+        if (startOpTimeField.isABSONObj()) {
+            startFetchingOpTime = OpTime::parse(startOpTimeField.Obj());
+        }
+    }
+    _stateDoc.setStartFetchingOpTime(startFetchingOpTime);
+}
+
 namespace {
 constexpr std::int32_t stopFailPointErrorCode = 4880402;
 void stopOnFailPoint(FailPoint* fp) {
@@ -250,7 +329,15 @@ void TenantMigrationRecipientService::Instance::run(
                                 "for garbage collect for migration uuid: "
                              << getMigrationUUID(),
                 !_stateDoc.getGarbageCollect());
-
+            _getStartOpTimesFromDonor(lk);
+            auto opCtx = cc().makeOperationContext();
+            uassertStatusOK(
+                tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
+            return WaitForMajorityService::get(opCtx->getServiceContext())
+                .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
+        })
+        .then([this] {
+            stopOnFailPoint(&stopAfterRetrievingStartOpTimesMigrationRecipientInstance);
             // TODO SERVER-48808: Run cloners in MigrationServiceInstance
             // TODO SERVER-48811: Oplog fetching in MigrationServiceInstance
         })
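The selection rule implemented by the two oplog reads can be stated compactly: `startApplying` is the donor's majority-committed top of oplog taken *after* the transaction-table read, while `startFetching` backs up to the `startOpTime` of the earliest open transaction when one exists, and otherwise uses the *first* top-of-oplog read. A standalone sketch of just that decision, with an illustrative stand-in for `repl::OpTime` (assumption: all three inputs come from majority reads on the same donor, so the first read never exceeds the second):

```cpp
#include <optional>

// Illustrative stand-in for repl::OpTime: (timestamp, term).
struct SimpleOpTime {
    unsigned long long ts;
    long long term;
};

struct StartOpTimes {
    SimpleOpTime startFetching;  // where oplog fetching must begin
    SimpleOpTime startApplying;  // where oplog application may begin
};

// lastOplog1: top of oplog read *before* the config.transactions query.
// lastOplog2: top of oplog read *after* it.
// earliestOpenTxnStart: startOpTime of the earliest prepared/in-progress
// transaction at the majority snapshot, if any.
StartOpTimes chooseStartOpTimes(SimpleOpTime lastOplog1,
                                SimpleOpTime lastOplog2,
                                std::optional<SimpleOpTime> earliestOpenTxnStart) {
    return {earliestOpenTxnStart.value_or(lastOplog1), lastOplog2};
}
```

Fetching from the transaction's `startOpTime` guarantees that every oplog entry of a transaction that may commit after `startApplying` has been fetched; an unnecessarily early `startFetching` costs only extra fetched entries, never correctness — which is why a record belonging to another tenant is harmless.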
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 8b483255d3f..2b3f1a0777b 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -130,6 +130,11 @@ public:
      */
     SemiFuture<void> _createAndConnectClients();
 
+    /**
+     * Retrieves the start optimes from the donor and updates the in-memory state accordingly.
+     */
+    void _getStartOpTimesFromDonor(WithLock);
+
     std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;
 
     // Protects below non-const data members.
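`WithLock` is MongoDB's compile-time marker for "caller must hold the mutex": the private helper cannot be invoked without a lock token, which both documents and enforces that `_stateDoc` is only touched under the instance's mutex. A generic sketch of the idiom (simplified stand-ins, not the service's actual members):

```cpp
#include <mutex>

// Simplified version of mongo's WithLock: constructible only from a held lock.
class WithLock {
public:
    WithLock(const std::lock_guard<std::mutex>&) {}
};

class Instance {
public:
    void run() {
        std::lock_guard<std::mutex> lk(_mutex);
        _getStartOpTimesFromDonor(lk);  // lk converts to WithLock; compiles only under the lock
    }

private:
    void _getStartOpTimesFromDonor(WithLock) { /* may safely read and write guarded state */ }
    std::mutex _mutex;
};
```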
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index 27e7d77f9c8..219635c1dd7 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -27,6 +27,7 @@
  * it in the license file.
  */
 
+#include <boost/optional/optional_io.hpp>
 #include <memory>
 
 #include "mongo/client/connpool.h"
@@ -40,10 +41,12 @@
 #include "mongo/db/repl/primary_only_service.h"
 #include "mongo/db/repl/primary_only_service_op_observer.h"
 #include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
 #include "mongo/db/repl/tenant_migration_recipient_service.h"
 #include "mongo/db/repl/tenant_migration_state_machine_gen.h"
 #include "mongo/db/repl/wait_for_majority_service.h"
 #include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_txn_record_gen.h"
 #include "mongo/dbtests/mock/mock_conn_registry.h"
 #include "mongo/dbtests/mock/mock_replica_set.h"
 #include "mongo/executor/network_interface.h"
@@ -59,6 +62,34 @@
 namespace mongo {
 namespace repl {
 
+namespace {
+OplogEntry makeOplogEntry(OpTime opTime,
+                          OpTypeEnum opType,
+                          NamespaceString nss,
+                          OptionalCollectionUUID uuid,
+                          BSONObj o,
+                          boost::optional<BSONObj> o2) {
+    return OplogEntry(opTime,                     // optime
+                      boost::none,                // hash
+                      opType,                     // opType
+                      nss,                        // namespace
+                      uuid,                       // uuid
+                      boost::none,                // fromMigrate
+                      OplogEntry::kOplogVersion,  // version
+                      o,                          // o
+                      o2,                         // o2
+                      {},                         // sessionInfo
+                      boost::none,                // upsert
+                      Date_t(),                   // wall clock time
+                      boost::none,                // statement id
+                      boost::none,                // optime of previous write within same transaction
+                      boost::none,                // pre-image optime
+                      boost::none);               // ShardId of resharding recipient
+}
+
+}  // namespace
+
 class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest {
 public:
     void setUp() override {
@@ -137,6 +168,41 @@ protected:
     PrimaryOnlyService* _service;
     long long _term = 0;
 
+    void checkStateDocPersisted(const TenantMigrationRecipientService::Instance* instance) {
+        auto opCtx = cc().makeOperationContext();
+        auto memoryStateDoc = getStateDoc(instance);
+        auto persistedStateDocWithStatus =
+            tenantMigrationRecipientEntryHelpers::getStateDoc(opCtx.get(), memoryStateDoc.getId());
+        ASSERT_OK(persistedStateDocWithStatus.getStatus());
+        ASSERT_BSONOBJ_EQ(memoryStateDoc.toBSON(), persistedStateDocWithStatus.getValue().toBSON());
+    }
+    void insertToAllNodes(MockReplicaSet* replSet, const std::string& nss, BSONObj obj) {
+        for (const auto& host : replSet->getHosts()) {
+            replSet->getNode(host.toString())->insert(nss, obj);
+        }
+    }
+
+    void clearCollectionAllNodes(MockReplicaSet* replSet, const std::string& nss) {
+        for (const auto& host : replSet->getHosts()) {
+            replSet->getNode(host.toString())->remove(nss, Query());
+        }
+    }
+
+    void insertTopOfOplog(MockReplicaSet* replSet, const OpTime& topOfOplogOpTime) {
+        // The MockRemoteDBServer does not actually implement the database, so to make our
+        // find work correctly we must make sure there's only one document to find.
+        clearCollectionAllNodes(replSet, NamespaceString::kRsOplogNamespace.ns());
+        insertToAllNodes(replSet,
+                         NamespaceString::kRsOplogNamespace.ns(),
+                         makeOplogEntry(topOfOplogOpTime,
+                                        OpTypeEnum::kNoop,
+                                        {} /* namespace */,
+                                        boost::none /* uuid */,
+                                        BSONObj() /* o */,
+                                        boost::none /* o2 */)
+                             .toBSON());
+    }
+
     // Accessors to class private members
     DBClientConnection* getClient(const TenantMigrationRecipientService::Instance* instance) const {
         return instance->_client.get();
@@ -147,6 +213,11 @@ protected:
         return instance->_oplogFetcherClient.get();
    }
 
+    const TenantMigrationRecipientDocument& getStateDoc(
+        const TenantMigrationRecipientService::Instance* instance) const {
+        return instance->_stateDoc;
+    }
+
 private:
     unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
         logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
@@ -398,5 +469,175 @@ TEST_F(TenantMigrationRecipientServiceTest,
     ASSERT_EQUALS(ErrorCodes::FailedToParse, instance->getCompletionFuture().getNoThrow().code());
 }
 
+TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTime_NoTransaction) {
+    FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Create and start the instance.
+    auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+        _service, initialStateDocument.toBSON());
+    ASSERT(instance.get());
+
+    // Wait for task completion success.
+    ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+    ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+    ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+    checkStateDocPersisted(instance.get());
+}
+
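The tests that follow lean on two failpoint facilities: `FailPointEnableBlock` scopes a failpoint to the test body, and `waitForTimesEntered(timesEntered + 1)` blocks until the service thread has entered the pause point one more time than when the test armed it. Condensed to its essentials — a sketch using only calls that appear in this patch, with a worker lambda standing in for the service thread:

```cpp
// Test-side choreography for racing a paused service thread.
auto* fp = globalFailPointRegistry().find("pauseAfterRetrievingLastTxnMigrationRecipientInstance");
const auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);  // arm; returns prior entry count

stdx::thread worker([&] {
    fp->pauseWhileSet();  // the service side parks here while the failpoint is on
});

fp->waitForTimesEntered(timesEntered + 1);  // worker is now provably inside the pause
// ... mutate the mock donor state here; the worker cannot observe it mid-step ...
fp->setMode(FailPoint::off, 0);  // release
worker.join();
```

This is what lets the "Advances" tests below assert that `startFetching` comes from the first oplog read while `startApplying` reflects the second.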
+TEST_F(TenantMigrationRecipientServiceTest,
+       TenantMigrationRecipientGetStartOpTime_Advances_NoTransaction) {
+    FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+    auto pauseFailPoint =
+        globalFailPointRegistry().find("pauseAfterRetrievingLastTxnMigrationRecipientInstance");
+    auto timesEntered = pauseFailPoint->setMode(FailPoint::alwaysOn, 0);
+
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+    const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Create and start the instance.
+    auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+        _service, initialStateDocument.toBSON());
+    ASSERT(instance.get());
+
+    pauseFailPoint->waitForTimesEntered(timesEntered + 1);
+    insertTopOfOplog(&replSet, newTopOfOplogOpTime);
+    pauseFailPoint->setMode(FailPoint::off, 0);
+
+    // Wait for task completion success.
+    ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+    ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+    ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+    checkStateDocPersisted(instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTime_Transaction) {
+    FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+
+    const UUID migrationUUID = UUID::gen();
+    const OpTime txnStartOpTime(Timestamp(3, 1), 1);
+    const OpTime txnLastWriteOpTime(Timestamp(4, 1), 1);
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+    SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
+    lastTxn.setStartOpTime(txnStartOpTime);
+    lastTxn.setState(DurableTxnStateEnum::kInProgress);
+    insertToAllNodes(
+        &replSet, NamespaceString::kSessionTransactionsTableNamespace.ns(), lastTxn.toBSON());
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Create and start the instance.
+    auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+        _service, initialStateDocument.toBSON());
+    ASSERT(instance.get());
+
+    // Wait for task completion success.
+    ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+    ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+    ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+    checkStateDocPersisted(instance.get());
+}
+
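In the transaction variants, the decisive input is the `config.transactions` entry served by the mock: `_getStartOpTimesFromDonor` filters on `state` in `{prepared, inProgress}` and takes the smallest `startOpTime`. For reference, roughly what `lastTxn.toBSON()` above serializes to (illustrative; the authoritative field names live in the `SessionTxnRecord` IDL, and the real `_id` is the generated logical session id):

```cpp
// Hand-built equivalent of the record the Transaction tests insert (_id omitted).
BSONObj txnRecord =
    BSON("txnNum" << 100LL
                  << "lastWriteOpTime" << BSON("ts" << Timestamp(4, 1) << "t" << 1LL)
                  << "lastWriteDate" << Date_t()
                  << "state" << "inProgress"
                  << "startOpTime" << BSON("ts" << Timestamp(3, 1) << "t" << 1LL));
```

Because `startOpTime` (3, 1) precedes the top of the oplog (5, 1), the tests expect `startFetching` to land on the transaction's start while `startApplying` stays at the top of the oplog.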
+TEST_F(TenantMigrationRecipientServiceTest,
+       TenantMigrationRecipientGetStartOpTime_Advances_Transaction) {
+    FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+    auto pauseFailPoint =
+        globalFailPointRegistry().find("pauseAfterRetrievingLastTxnMigrationRecipientInstance");
+    auto timesEntered = pauseFailPoint->setMode(FailPoint::alwaysOn, 0);
+
+    const UUID migrationUUID = UUID::gen();
+    const OpTime txnStartOpTime(Timestamp(3, 1), 1);
+    const OpTime txnLastWriteOpTime(Timestamp(4, 1), 1);
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+    const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+    SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
+    lastTxn.setStartOpTime(txnStartOpTime);
+    lastTxn.setState(DurableTxnStateEnum::kInProgress);
+    insertToAllNodes(
+        &replSet, NamespaceString::kSessionTransactionsTableNamespace.ns(), lastTxn.toBSON());
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Create and start the instance.
+    auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+        _service, initialStateDocument.toBSON());
+    ASSERT(instance.get());
+
+    pauseFailPoint->waitForTimesEntered(timesEntered + 1);
+    insertTopOfOplog(&replSet, newTopOfOplogOpTime);
+    pauseFailPoint->setMode(FailPoint::off, 0);
+
+    // Wait for task completion success.
+    ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+    ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingOpTime());
+    ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingOpTime());
+    checkStateDocPersisted(instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest,
+       TenantMigrationRecipientGetStartOpTimes_RemoteOplogQueryFails) {
+    FailPointEnableBlock fp("stopAfterRetrievingStartOpTimesMigrationRecipientInstance");
+
+    const UUID migrationUUID = UUID::gen();
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Create and start the instance. The remote oplog mock is deliberately left unpopulated.
+    auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+        _service, initialStateDocument.toBSON());
+    ASSERT(instance.get());
+
+    // Wait for task completion failure.
+    ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+
+    // Even though we failed, the memory state should still match the on-disk state.
+    checkStateDocPersisted(instance.get());
+}
+
 }  // namespace repl
 }  // namespace mongo
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 8358a63c54e..d96d9c9fa66 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -57,6 +57,12 @@ env.Library(
         '$BUILD_DIR/mongo/client/dbclient_mockcursor',
         '$BUILD_DIR/mongo/db/repl/replica_set_messages'
     ],
+    LIBDEPS_PRIVATE=[
+        '$BUILD_DIR/mongo/db/exec/projection_executor',
+        '$BUILD_DIR/mongo/db/pipeline/expression_context',
+        '$BUILD_DIR/mongo/db/query/projection_ast',
+        '$BUILD_DIR/mongo/db/query/query_test_service_context',
+    ],
 )
 
 if not has_option('noshell') and usemozjs:
diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.cpp b/src/mongo/dbtests/mock/mock_remote_db_server.cpp
index 9e37c54c23e..d96c72a3799 100644
--- a/src/mongo/dbtests/mock/mock_remote_db_server.cpp
+++ b/src/mongo/dbtests/mock/mock_remote_db_server.cpp
@@ -34,6 +34,9 @@
 #include <memory>
 #include <tuple>
 
+#include "mongo/db/exec/projection_executor_builder.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/query/projection_parser.h"
 #include "mongo/dbtests/mock/mock_dbclient_connection.h"
 #include "mongo/rpc/metadata.h"
 #include "mongo/rpc/op_msg_rpc_impls.h"
@@ -174,6 +177,24 @@ rpc::UniqueReply MockRemoteDBServer::runCommand(InstanceID id, const OpMsgReques
     return rpc::UniqueReply(std::move(message), std::move(replyView));
 }
 
+std::unique_ptr<projection_executor::ProjectionExecutor>
+MockRemoteDBServer::createProjectionExecutor(const BSONObj& projectionSpec) {
+    const boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+    ProjectionPolicies defaultPolicies;
+    auto projection = projection_ast::parse(expCtx, projectionSpec, defaultPolicies);
+    return projection_executor::buildProjectionExecutor(
+        expCtx, &projection, defaultPolicies, projection_executor::kDefaultBuilderParams);
+}
+
+BSONObj MockRemoteDBServer::project(projection_executor::ProjectionExecutor* projectionExecutor,
+                                    const BSONObj& o) {
+    if (!projectionExecutor)
+        return o.copy();
+    Document doc(o);
+    auto projectedDoc = projectionExecutor->applyTransformation(doc);
+    return projectedDoc.toBson().getOwned();
+}
+
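With these two helpers, the mock's `query()` can honor `fieldsToReturn` for the first time — which the recipient service depends on, since it projects the donor's oplog entries down to just `ts` and `t`. A hedged usage sketch (test-style; `MockDBClientConnection` usage modeled on the existing dbtests, exact signatures may vary):

```cpp
// The mock now applies fieldsToReturn as a real projection.
MockRemoteDBServer server("test-server:27017");
server.insert("local.oplog.rs",
              BSON("ts" << Timestamp(5, 1) << "t" << 1LL << "op"
                        << "n"));

MockDBClientConnection conn(&server);
const BSONObj fields = BSON("ts" << 1 << "t" << 1);
BSONObj doc = conn.findOne("local.oplog.rs", Query(), &fields);
// 'doc' now carries only the projected fields; "op" has been stripped away,
// leaving exactly what OpTime::parseFromOplogEntry needs.
```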
@@ -191,6 +212,10 @@ mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id,
 
     checkIfUp(id);
 
+    std::unique_ptr<projection_executor::ProjectionExecutor> projectionExecutor;
+    if (fieldsToReturn) {
+        projectionExecutor = createProjectionExecutor(*fieldsToReturn);
+    }
     scoped_spinlock sLock(_lock);
     _queryCount++;
 
@@ -198,7 +223,7 @@ mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id,
     const vector<BSONObj>& coll = _dataMgr[ns];
     BSONArrayBuilder result;
     for (vector<BSONObj>::const_iterator iter = coll.begin(); iter != coll.end(); ++iter) {
-        result.append(iter->copy());
+        result.append(project(projectionExecutor.get(), *iter));
     }
 
     return BSONArray(result.obj());
diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.h b/src/mongo/dbtests/mock/mock_remote_db_server.h
index e346923acf2..2f08f1e207d 100644
--- a/src/mongo/dbtests/mock/mock_remote_db_server.h
+++ b/src/mongo/dbtests/mock/mock_remote_db_server.h
@@ -40,6 +40,9 @@
 #include "mongo/util/concurrency/spin_lock.h"
 
 namespace mongo {
+namespace projection_executor {
+class ProjectionExecutor;
+}  // namespace projection_executor
 
 const std::string IdentityNS("local.me");
 const BSONField<std::string> HostField("host");
@@ -219,6 +222,19 @@ private:
      */
     void checkIfUp(InstanceID id) const;
 
+    /**
+     * Creates a ProjectionExecutor to handle fieldsToReturn.
+     */
+    std::unique_ptr<projection_executor::ProjectionExecutor> createProjectionExecutor(
+        const BSONObj& projectionSpec);
+
+    /**
+     * Projects the object, unless the projectionExecutor is null, in which case returns a
+     * copy of the object.
+     */
+    BSONObj project(projection_executor::ProjectionExecutor* projectionExecutor, const BSONObj& o);
+
     typedef stdx::unordered_map<std::string, std::shared_ptr<CircularBSONIterator>> CmdToReplyObj;
     typedef stdx::unordered_map<std::string, std::vector<BSONObj>> MockDataMgr;
     typedef stdx::unordered_map<UUID, std::string, UUID::Hash> UUIDMap;
-- 
cgit v1.2.1