diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2020-09-16 09:53:31 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-29 14:59:45 +0000 |
commit | 3237a1428c6b07cb8019bbb8d47301a680667033 (patch) | |
tree | d8301dfbe7e505f00013a55a11e2889119807a93 | |
parent | cd14c4aad38bae7fa0555f13ee3e9538d850d719 (diff) | |
download | mongo-3237a1428c6b07cb8019bbb8d47301a680667033.tar.gz |
SERVER-48808 Oplog Fetching in MigrationServiceInstance
-rw-r--r-- | src/mongo/db/repl/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/cloner_utils.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/cloner_utils.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 166 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 45 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 41 |
9 files changed, 312 insertions, 10 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index bff884e7844..cd279da1fbf 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1293,7 +1293,11 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/transaction', + 'cloner_utils', + 'oplog_buffer_collection', 'oplog_entry', + 'oplog_fetcher', + 'repl_server_parameters', 'tenant_migration_state_machine_idl', ] ) diff --git a/src/mongo/db/repl/cloner_utils.cpp b/src/mongo/db/repl/cloner_utils.cpp index ab63050d669..7f061a4f1ff 100644 --- a/src/mongo/db/repl/cloner_utils.cpp +++ b/src/mongo/db/repl/cloner_utils.cpp @@ -37,9 +37,13 @@ namespace mongo { namespace repl { +BSONObj ClonerUtils::makeTenantDatabaseRegex(StringData prefix) { + return BSON("$regexp" + << "^" + prefix + "_"); +} + BSONObj ClonerUtils::makeTenantDatabaseFilter(StringData prefix) { - return BSON("name" << BSON("$regexp" - << "^" + prefix + "_")); + return BSON("name" << makeTenantDatabaseRegex(prefix)); } BSONObj ClonerUtils::buildMajorityWaitRequest(Timestamp operationTime) { diff --git a/src/mongo/db/repl/cloner_utils.h b/src/mongo/db/repl/cloner_utils.h index b9acd75b2ee..8623b2fdd1f 100644 --- a/src/mongo/db/repl/cloner_utils.h +++ b/src/mongo/db/repl/cloner_utils.h @@ -47,6 +47,11 @@ class ClonerUtils { public: /** + * Builds a regex that matches database names prefixed with a specific tenantId. + */ + static BSONObj makeTenantDatabaseRegex(StringData prefix); + + /** * Builds a filter that matches database names prefixed with a specific tenantId. */ static BSONObj makeTenantDatabaseFilter(StringData prefix); @@ -64,4 +69,4 @@ public: } // namespace repl -} // namespace mongo
\ No newline at end of file +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index e9ec434ffd4..d9f3db55ae7 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -216,6 +216,12 @@ OplogFetcher::~OplogFetcher() { join(); } +void OplogFetcher::setConnection(std::unique_ptr<DBClientConnection>&& _connectedClient) { + // Can only call this once, before startup. + invariant(!_conn); + _conn = std::move(_connectedClient); +} + Status OplogFetcher::_doStartup_inlock() noexcept { return _scheduleWorkAndSaveHandle_inlock( [this](const executor::TaskExecutor::CallbackArgs& args) { @@ -345,18 +351,25 @@ void OplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbac return; } + bool hadExistingConnection = true; { stdx::lock_guard<Latch> lock(_mutex); - _conn = _createClientFn(); + if (!_conn) { + _conn = _createClientFn(); + hadExistingConnection = false; + } } hangAfterOplogFetcherCallbackScheduled.pauseWhileSet(); - auto connectStatus = _connect(); - // Error out if we failed to connect after exhausting the allowed retry attempts. - if (!connectStatus.isOK()) { - _finishCallback(connectStatus); - return; + if (!hadExistingConnection) { + auto connectStatus = _connect(); + + // Error out if we failed to connect after exhausting the allowed retry attempts. + if (!connectStatus.isOK()) { + _finishCallback(connectStatus); + return; + } } _setMetadataWriterAndReader(); diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index f8f80c34d76..459b2c42fe2 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -196,6 +196,11 @@ public: Timestamp lastTS, StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); + /** + * Allows the OplogFetcher to use an already-established connection from the caller. Ownership + * of the connection is taken by the OplogFetcher. Must be called before startup. + */ + void setConnection(std::unique_ptr<DBClientConnection>&& _connectedClient); /** * Prints out the status and settings of the oplog fetcher. diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 4153ad4ef0e..289bf286c9a 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -342,3 +342,24 @@ server_parameters: cpp_varname: tenantMigrationGarbageCollectionDelayMS default: expr: 48 * 60 * 60 * 1000 + + tenantMigrationOplogBufferPeekCacheSize: + description: >- + Set this to specify size of read ahead buffer in the OplogBufferCollection for tenant + migrations. + set_at: startup + cpp_vartype: int + cpp_varname: tenantMigrationOplogBufferPeekCacheSize + default: 10000 + + tenantMigrationOplogFetcherBatchSize: + description: >- + The batchSize to use for the find/getMore queries called by the OplogFetcher for + tenant migrations. + set_at: startup + cpp_vartype: int + cpp_varname: tenantMigrationOplogFetcherBatchSize + # 16MB max batch size / 12 byte min doc size * 10 (for good measure) = + # defaultBatchSize to use. + default: + expr: (16 * 1024 * 1024) / 12 * 10 diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 560ad7ba86e..ba7916eeda2 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -39,9 +39,13 @@ #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/cloner_utils.h" +#include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/oplog_buffer_collection.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h" #include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" @@ -53,15 +57,89 @@ namespace mongo { namespace repl { +namespace { +constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd; +} // namespace + +// A convenient place to set test-specific parameters. +MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance); // Fails before waiting for the state doc to be majority replicated. MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc); MONGO_FAIL_POINT_DEFINE(stopAfterPersistingTenantMigrationRecipientInstanceStateDoc); MONGO_FAIL_POINT_DEFINE(stopAfterConnectingTenantMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(stopAfterRetrievingStartOpTimesMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(stopAfterStartingOplogFetcherMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout); MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance); +namespace { +// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine +// and recover from there. So the restart decision is always "no". +class OplogFetcherRestartDecisionTenantMigration + : public OplogFetcher::OplogFetcherRestartDecision { +public: + ~OplogFetcherRestartDecisionTenantMigration(){}; + bool shouldContinue(OplogFetcher* fetcher, Status status) final { + return false; + } + void fetchSuccessful(OplogFetcher* fetcher) final {} +}; + +// The oplog fetcher requires some of the methods in DataReplicatorExternalState to operate. +class DataReplicatorExternalStateTenantMigration : public DataReplicatorExternalState { +public: + // The oplog fetcher is passed its executor directly and does not use the one from the + // DataReplicatorExternalState. + executor::TaskExecutor* getTaskExecutor() const final { + MONGO_UNREACHABLE; + } + std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const final { + MONGO_UNREACHABLE; + } + + // The oplog fetcher uses the current term and opTime to inform the sync source of term changes. + // As the term on the donor and the term on the recipient have nothing to do with each other, + // we do not want to do that. + OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() final { + return {OpTime::kUninitializedTerm, OpTime()}; + } + + // Tenant migration does not require the metadata from the oplog query. + void processMetadata(const rpc::ReplSetMetadata& replMetadata, + rpc::OplogQueryMetadata oqMetadata) final {} + + // Tenant migration does not change sync source depending on metadata. + ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) final { + return ChangeSyncSourceAction::kContinueSyncing; + } + + // The oplog fetcher should never call the rest of the methods. + std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const final { + MONGO_UNREACHABLE; + } + + std::unique_ptr<OplogApplier> makeOplogApplier( + OplogBuffer* oplogBuffer, + OplogApplier::Observer* observer, + ReplicationConsistencyMarkers* consistencyMarkers, + StorageInterface* storageInterface, + const OplogApplier::Options& options, + ThreadPool* writerPool) final { + MONGO_UNREACHABLE; + }; + + virtual StatusWith<ReplSetConfig> getCurrentConfig() const final { + MONGO_UNREACHABLE; + } +}; + + +} // namespace TenantMigrationRecipientService::TenantMigrationRecipientService(ServiceContext* serviceContext) : PrimaryOnlyService(serviceContext) {} @@ -292,6 +370,72 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo _stateDoc.setStartFetchingOpTime(startFetchingOpTime); } +void TenantMigrationRecipientService::Instance::_startOplogFetcher() { + auto opCtx = cc().makeOperationContext(); + OplogBufferCollection::Options options; + options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize); + options.dropCollectionAtStartup = false; + options.dropCollectionAtShutdown = false; + NamespaceString oplogBufferNs(NamespaceString::kConfigDb, + kOplogBufferPrefix + getMigrationUUID().toString()); + stdx::lock_guard lk(_mutex); + invariant(_stateDoc.getStartFetchingOpTime()); + _donorOplogBuffer = std::make_unique<OplogBufferCollection>( + StorageInterface::get(opCtx.get()), oplogBufferNs, options); + _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>(); + _donorOplogFetcher = (*_createOplogFetcherFn)( + (**_scopedExecutor).get(), + *_stateDoc.getStartFetchingOpTime(), + _oplogFetcherClient->getServerHostAndPort(), + // The config is only used for setting the awaitData timeout; the defaults are fine. + ReplSetConfig::parse(BSON("_id" + << "dummy" + << "version" << 1 << "members" << BSONObj())), + std::make_unique<OplogFetcherRestartDecisionTenantMigration>(), + // We do not need to check the rollback ID. + ReplicationProcess::kUninitializedRollbackId, + false /* requireFresherSyncSource */, + _dataReplicatorExternalState.get(), + [this](OplogFetcher::Documents::const_iterator first, + OplogFetcher::Documents::const_iterator last, + const OplogFetcher::DocumentsInfo& info) { + return _enqueueDocuments(first, last, info); + }, + [this](const Status& s, int rbid) { _oplogFetcherCallback(s); }, + tenantMigrationOplogFetcherBatchSize, + OplogFetcher::StartingPoint::kEnqueueFirstDoc, + _getOplogFetcherFilter(), + ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern), + "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString()); + _donorOplogFetcher->setConnection(std::move(_oplogFetcherClient)); + uassertStatusOK(_donorOplogFetcher->startup()); +} + +Status TenantMigrationRecipientService::Instance::_enqueueDocuments( + OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info) { + + invariant(_donorOplogBuffer); + + if (info.toApplyDocumentCount == 0) + return Status::OK(); + + auto opCtx = cc().makeOperationContext(); + // Wait for enough space. + _donorOplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes); + + // Buffer docs for later application. + _donorOplogBuffer->push(opCtx.get(), begin, end); + + return Status::OK(); +} + +void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status oplogFetcherStatus) { + // TODO(SERVER-48812): Abort the migration unless the error is CallbackCanceled and + // the migration has finished. +} + namespace { constexpr std::int32_t stopFailPointErrorCode = 4880402; void stopOnFailPoint(FailPoint* fp) { @@ -309,9 +453,25 @@ void TenantMigrationRecipientService::Instance::interrupt(Status status) { } } +BSONObj TenantMigrationRecipientService::Instance::_getOplogFetcherFilter() const { + // Either the namespace belongs to the tenant, or it's an applyOps in the admin namespace + // and the first operation belongs to the tenant. A transaction with mixed tenant/non-tenant + // operations should not be possible and will fail in the TenantOplogApplier. + // + // Commit of prepared transactions is not handled here; we'd need to handle them in the applier + // by allowing all commits through here and ignoring those not corresponding to active + // transactions. + BSONObj namespaceRegex = ClonerUtils::makeTenantDatabaseRegex(getTenantId()); + return BSON("$or" << BSON_ARRAY(BSON("ns" << namespaceRegex) + << BSON("ns" + << "admin.$cmd" + << "o.applyOps.0.ns" << namespaceRegex))); +} + void TenantMigrationRecipientService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { _scopedExecutor = executor; + pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet(); ExecutorFuture(**executor) .then([this]() { return _initializeStateDoc(); }) .then([this] { @@ -338,8 +498,12 @@ void TenantMigrationRecipientService::Instance::run( }) .then([this] { stopOnFailPoint(&stopAfterRetrievingStartOpTimesMigrationRecipientInstance); + _startOplogFetcher(); + }) + .then([this] { + stopOnFailPoint(&stopAfterStartingOplogFetcherMigrationRecipientInstance); // TODO SERVER-48808: Run cloners in MigrationServiceInstance - // TODO SERVER-48811: Oplog fetching in MigrationServiceInstance + // TODO SERVER-48811: Oplog application in MigrationServiceInstance }) .getAsync([this](Status status) { LOGV2(4878501, diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 2b3f1a0777b..debea0f5ba5 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -32,8 +32,10 @@ #include <boost/optional.hpp> #include <memory> +#include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" namespace mongo { @@ -43,6 +45,7 @@ class ReplicaSetMonitor; class ServiceContext; namespace repl { +class OplogBufferCollection; /** * TenantMigrationRecipientService is a primary only service to handle @@ -104,6 +107,14 @@ public: */ const std::string& getTenantId() const; + /* + * Set the oplog creator functor, to allow use of a mock oplog fetcher. + */ + void setCreateOplogFetcherFn_forTest( + std::unique_ptr<OplogFetcherFactory>&& createOplogFetcherFn) { + _createOplogFetcherFn = std::move(createOplogFetcherFn); + } + private: friend class TenantMigrationRecipientServiceTest; @@ -135,6 +146,33 @@ public: */ void _getStartOpTimesFromDonor(WithLock); + /** + * Pushes documents from oplog fetcher to oplog buffer. + * + * Returns a status even though it always returns OK, to conform the interface OplogFetcher + * expects for the EnqueueDocumentsFn. + */ + Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info); + + /** + * Starts the tenant oplog fetcher. + */ + + void _startOplogFetcher(); + + /** + * Called when the oplog fetcher finishes. Usually the oplog fetcher finishes only when + * cancelled or on error. + */ + void _oplogFetcherCallback(Status oplogFetcherStatus); + + /** + * Returns the filter used to get only oplog documents related to the appropriate tenant. + */ + BSONObj _getOplogFetcherFilter() const; + std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; // Protects below non-const data members. @@ -160,6 +198,13 @@ public: // optimes while the '_oplogFetcherClient' will be reserved for the oplog fetcher only. std::unique_ptr<DBClientConnection> _client; std::unique_ptr<DBClientConnection> _oplogFetcherClient; + + + std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn = + std::make_unique<CreateOplogFetcherFn>(); + std::unique_ptr<OplogBufferCollection> _donorOplogBuffer; + std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; + std::unique_ptr<OplogFetcher> _donorOplogFetcher; }; }; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index f3f1b517e2b..e83e7549258 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_fetcher_mock.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/repl/primary_only_service_op_observer.h" #include "mongo/db/repl/replication_coordinator_mock.h" @@ -213,6 +214,11 @@ protected: return instance->_oplogFetcherClient.get(); } + OplogFetcher* getDonorOplogFetcher( + const TenantMigrationRecipientService::Instance* instance) const { + return instance->_donorOplogFetcher.get(); + } + const TenantMigrationRecipientDocument& getStateDoc( const TenantMigrationRecipientService::Instance* instance) const { return instance->_stateDoc; @@ -652,5 +658,40 @@ TEST_F(TenantMigrationRecipientServiceTest, checkStateDocPersisted(opCtx.get(), instance.get()); } +TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFetcher) { + FailPointEnableBlock fp("stopAfterStartingOplogFetcherMigrationRecipientInstance"); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + // Wait for task completion success. + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + checkStateDocPersisted(opCtx.get(), instance.get()); + // The oplog fetcher should exist and be running. + auto oplogFetcher = getDonorOplogFetcher(instance.get()); + ASSERT_TRUE(oplogFetcher != nullptr); + ASSERT_TRUE(oplogFetcher->isActive()); +} + } // namespace repl } // namespace mongo |