diff options
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 90 |
1 files changed, 81 insertions, 9 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 70a76e5e473..5d0db5dea0b 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -135,8 +135,14 @@ public: */ LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, const BSONObj& idObj, - const char op) - : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {} + const char op, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) + : _cloner(cloner), + _idObj(idObj.getOwned()), + _op(op), + _oplogTs(oplogTs), + _prePostImageTs(prePostImageTs) {} void commit() override { switch (_op) { @@ -156,6 +162,14 @@ public: default: MONGO_UNREACHABLE; } + + if (!_prePostImageTs.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs); + } + + if (!_oplogTs.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs); + } } void rollback() override {} @@ -164,6 +178,8 @@ private: MigrationChunkClonerSourceLegacy* const _cloner; const BSONObj _idObj; const char _op; + const Timestamp _oplogTs; + const Timestamp _prePostImageTs; }; MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, @@ -175,7 +191,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), _args.getToShardId().toString())), _donorConnStr(std::move(donorConnStr)), - _recipientHost(std::move(recipientHost)) {} + _recipientHost(std::move(recipientHost)), + _sessionCatalogSource(_args.getNss()) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -192,6 +209,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { return storeCurrentLocsStatus; } + // Prime up the session migration source if there are oplog entries to migrate. + _sessionCatalogSource.fetchNextOplog(opCtx); + // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; StartChunkCloneRequest::appendAsCommand(&cmdBuilder, @@ -314,6 +334,12 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { _cleanup(opCtx); + + if (_sessionCatalogSource.hasMoreOplog()) { + return {ErrorCodes::SessionTransferIncomplete, + "destination shard finished committing but there are still some session " + "metadata that needs to be transferred"}; + } return Status::OK(); } @@ -344,7 +370,8 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& } void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc) { + const BSONObj& insertedDoc, + const Timestamp& oplogTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; @@ -358,11 +385,19 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); + if (opCtx->getTxnNumber()) { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {})); + } else { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'i', {}, {})); + } } void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc) { + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; @@ -376,11 +411,19 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); + if (opCtx->getTxnNumber()) { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs)); + } else { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'u', {}, {})); + } } void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId) { + const BSONObj& deletedDocId, + const Timestamp& oplogTs, + const Timestamp& preImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; @@ -390,7 +433,13 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); + if (opCtx->getTxnNumber()) { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs)); + } else { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'd', {}, {})); + } } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -679,4 +728,27 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, arr.done(); } +void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx, + BSONArrayBuilder* arrBuilder) { + while (_sessionCatalogSource.hasMoreOplog()) { + auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog(); + + if (oplogDoc.isEmpty()) { + // Last fetched turned out empty, try to see if there are more + _sessionCatalogSource.fetchNextOplog(opCtx); + continue; + } + + // Use the builder size instead of accumulating the document sizes directly so that we + // take into consideration the overhead of BSONArray indices. + if (arrBuilder->arrSize() && + (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) { + break; + } + + arrBuilder->append(oplogDoc); + _sessionCatalogSource.fetchNextOplog(opCtx); + } +} + } // namespace mongo |