Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 90
1 file changed, 81 insertions(+), 9 deletions(-)
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 70a76e5e473..5d0db5dea0b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -135,8 +135,14 @@ public:
*/
LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
const BSONObj& idObj,
- const char op)
- : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {}
+ const char op,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs)
+ : _cloner(cloner),
+ _idObj(idObj.getOwned()),
+ _op(op),
+ _oplogTs(oplogTs),
+ _prePostImageTs(prePostImageTs) {}
void commit() override {
switch (_op) {
@@ -156,6 +162,14 @@ public:
default:
MONGO_UNREACHABLE;
}
+
+ if (!_prePostImageTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs);
+ }
+
+ if (!_oplogTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs);
+ }
}
void rollback() override {}
@@ -164,6 +178,8 @@ private:
MigrationChunkClonerSourceLegacy* const _cloner;
const BSONObj _idObj;
const char _op;
+ const Timestamp _oplogTs;
+ const Timestamp _prePostImageTs;
};
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
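
The handler above follows the RecoveryUnit::Change pattern: the session-catalog notification is deferred to commit(), so it only fires once the storage transaction is actually durable, and it is silently dropped on rollback. The stripped-down sketch below illustrates that pattern in isolation; NotifyOnCommit and MigrationSource are illustrative stand-ins, not types from this patch.

// Illustrative sketch only: shows the commit-hook shape used by
// LogOpForShardingHandler above. `MigrationSource` is a hypothetical stand-in
// for the session catalog source; only RecoveryUnit::Change, Timestamp, and
// registerChange() come from the real code base.
class NotifyOnCommit final : public RecoveryUnit::Change {
public:
    NotifyOnCommit(MigrationSource* source, Timestamp ts) : _source(source), _ts(ts) {}

    void commit() override {
        // Runs only after the WriteUnitOfWork commits, so the migration source
        // never learns about timestamps from writes that end up rolled back.
        if (!_ts.isNull()) {
            _source->notifyNewWriteTS(_ts);
        }
    }

    void rollback() override {}  // Nothing to undo; the notification never fired.

private:
    MigrationSource* const _source;
    const Timestamp _ts;
};

// Registered the same way as in the patch, inside an open WriteUnitOfWork:
//   opCtx->recoveryUnit()->registerChange(new NotifyOnCommit(source, oplogTs));
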
@@ -175,7 +191,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
_args.getToShardId().toString())),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)) {}
+ _recipientHost(std::move(recipientHost)),
+ _sessionCatalogSource(_args.getNss()) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -192,6 +209,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
return storeCurrentLocsStatus;
}
+ // Prime up the session migration source if there are oplog entries to migrate.
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
@@ -314,6 +334,12 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(opCtx);
+
+ if (_sessionCatalogSource.hasMoreOplog()) {
+ return {ErrorCodes::SessionTransferIncomplete,
+ "destination shard finished committing but there are still some session "
+ "metadata that needs to be transferred"};
+ }
return Status::OK();
}
@@ -344,7 +370,8 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj&
}
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc) {
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
@@ -358,11 +385,19 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {}));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', {}, {}));
+ }
}
void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
- const BSONObj& updatedDoc) {
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = updatedDoc["_id"];
@@ -376,11 +411,19 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', {}, {}));
+ }
}
void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
- const BSONObj& deletedDocId) {
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
@@ -390,7 +433,13 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', {}, {}));
+ }
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
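
All three write observers above gate on opCtx->getTxnNumber(): only retryable writes (those carrying a transaction number) forward their oplog and pre/post-image timestamps to the session migration machinery, while ordinary writes pass null Timestamps. A minimal way to express that gate, using only the OperationContext and Timestamp calls already visible in the diff, would be:

// Hypothetical helper (not part of the patch) capturing the gate used by
// onInsertOp/onUpdateOp/onDeleteOp: forward the timestamp only for writes that
// belong to a retryable-write session, i.e. those that carry a txnNumber.
inline Timestamp sessionMigrationTimestamp(OperationContext* opCtx, const Timestamp& oplogTs) {
    return opCtx->getTxnNumber() ? oplogTs : Timestamp();
}
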
@@ -679,4 +728,27 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
arr.done();
}
+void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx,
+ BSONArrayBuilder* arrBuilder) {
+ while (_sessionCatalogSource.hasMoreOplog()) {
+ auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog();
+
+ if (oplogDoc.isEmpty()) {
+ // Last fetched turned out empty, try to see if there are more
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ continue;
+ }
+
+ // Use the builder size instead of accumulating the document sizes directly so that we
+ // take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) {
+ break;
+ }
+
+ arrBuilder->append(oplogDoc);
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ }
+}
+
} // namespace mongo
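
The batching loop in nextSessionMigrationBatch() budgets against the serialized size of the whole response array rather than summing document sizes, since each BSONArray element also pays for its numeric index key. Below is a self-contained sketch of that size check, assuming BSONObjMaxUserSize is the 16 MB user-document limit and reusing the 1024-byte pad from the patch; the constant and function names are illustrative only.

#include <cstddef>

namespace {
// Assumed values: kMaxBatchBytes stands in for BSONObjMaxUserSize (16 MB) and
// the 1 KB pad mirrors the hard-coded 1024 in nextSessionMigrationBatch().
constexpr std::size_t kMaxBatchBytes = 16 * 1024 * 1024;
constexpr std::size_t kResponseOverheadBytes = 1024;

// Returns true if a document of `docBytes` may be appended to a batch whose
// builder already holds `builderBytes` across `docsInBatch` documents. The
// first document is always admitted so the loop makes progress even when a
// single oplog entry is close to the cap.
bool fitsInSessionBatch(std::size_t builderBytes, std::size_t docBytes, std::size_t docsInBatch) {
    if (docsInBatch == 0) {
        return true;
    }
    return builderBytes + docBytes + kResponseOverheadBytes <= kMaxBatchBytes;
}
}  // namespace
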