diff options
Diffstat (limited to 'src/mongo')
22 files changed, 420 insertions, 139 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 80a17b9ccf1..24825fb09ba 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -225,6 +225,7 @@ error_code("JSONSchemaNotAllowed", 224) error_code("TransactionTooOld", 225) error_code("AtomicityFailure", 226) error_code("CannotImplicitlyCreateCollection", 227); +error_code("SessionTransferIncomplete", 228) # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 9a6bd8e5be7..b466b70c3bb 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -115,10 +115,15 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd, return cmdObjBuilder.obj(); } +struct OpTimeBundle { + repl::OpTime writeOpTime; + repl::OpTime prePostImageOpTime; +}; + /** * Write oplog entry(ies) for the update operation. */ -repl::OpTime replLogUpdate(OperationContext* opCtx, +OpTimeBundle replLogUpdate(OperationContext* opCtx, Session* session, const OplogUpdateEntryArgs& args) { BSONObj storeObj; @@ -138,6 +143,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } + OpTimeBundle opTimes; + if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { auto noteUpdateOpTime = repl::logOp(opCtx, "n", @@ -150,6 +157,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx, args.stmtId, {}); + opTimes.prePostImageOpTime = noteUpdateOpTime; + if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { oplogLink.preImageTs = noteUpdateOpTime.getTimestamp(); } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) { @@ -157,22 +166,24 @@ repl::OpTime replLogUpdate(OperationContext* opCtx, } } - return repl::logOp(opCtx, - "u", - args.nss, - args.uuid, - args.update, - &args.criteria, - args.fromMigrate, - sessionInfo, - args.stmtId, - oplogLink); + opTimes.writeOpTime = repl::logOp(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + sessionInfo, + args.stmtId, + oplogLink); + + return opTimes; } /** * Write oplog entry(ies) for the delete operation. */ -repl::OpTime replLogDelete(OperationContext* opCtx, +OpTimeBundle replLogDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, Session* session, @@ -189,22 +200,26 @@ repl::OpTime replLogDelete(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } + OpTimeBundle opTimes; + if (deletedDoc && opCtx->getTxnNumber()) { auto noteOplog = repl::logOp( opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {}); + opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageTs = noteOplog.getTimestamp(); } - return repl::logOp(opCtx, - "d", - nss, - uuid, - deleteState.documentKey, - nullptr, - fromMigrate, - sessionInfo, - stmtId, - oplogLink); + opTimes.writeOpTime = repl::logOp(opCtx, + "d", + nss, + uuid, + deleteState.documentKey, + nullptr, + fromMigrate, + sessionInfo, + stmtId, + oplogLink); + return opTimes; } } // namespace @@ -253,7 +268,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, systemIndexes); if (!fromMigrate) { - css->onInsertOp(opCtx, indexDoc); + css->onInsertOp(opCtx, indexDoc, {}); } } @@ -264,15 +279,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator end, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate); + + const size_t count = end - begin; + auto timestamps = stdx::make_unique<Timestamp[]>(count); + const auto lastOpTime = + repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate); auto css = CollectionShardingState::get(opCtx, nss.ns()); - for (auto it = begin; it != end; it++) { + size_t index = 0; + for (auto it = begin; it != end; it++, index++) { AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", nss, it->doc, nullptr); if (!fromMigrate) { - css->onInsertOp(opCtx, it->doc); + css->onInsertOp(opCtx, it->doc, timestamps[index]); } } @@ -312,7 +332,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg auto css = CollectionShardingState::get(opCtx, args.nss); if (!args.fromMigrate) { - css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc); + css->onUpdateOp(opCtx, + args.criteria, + args.update, + args.updatedDoc, + opTime.writeOpTime.getTimestamp(), + opTime.prePostImageOpTime.getTimestamp()); } if (args.nss.coll() == "system.js") { @@ -322,11 +347,13 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc); } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && - !opTime.isNull()) { + !opTime.writeOpTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime); + + onWriteOpCompleted( + opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime); } auto OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -356,7 +383,10 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, nss.ns()); if (!fromMigrate) { - css->onDeleteOp(opCtx, deleteState); + css->onDeleteOp(opCtx, + deleteState, + opTime.writeOpTime.getTimestamp(), + opTime.prePostImageOpTime.getTimestamp()); } if (nss.coll() == "system.js") { @@ -365,11 +395,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, DurableViewCatalog::onExternalChange(opCtx, nss); } else if (nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey); - } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) { + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && + !opTime.writeOpTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } - onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime); + onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime); } void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 13d3a1cae56..3033daa68f9 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -90,16 +90,6 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, << ", oplog: " << redact(oplogEntry.toBSON()), opType == repl::OpTypeEnum::kUpdate); - uassert( - 40610, - str::stream() << "findAndModify retry request: " << redact(request.toBSON()) - << " is not compatible with previous write in the transaction of type: " - << OpType_serializer(oplogEntry.getOpType()) - << ", oplogTs: " - << ts.toString() - << ", oplog: " - << redact(oplogEntry.toBSON()), - !request.isUpsert()); if (request.shouldReturnNew()) { uassert(40611, diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index f3415ae2427..a50c51eba58 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -273,23 +273,6 @@ TEST_F(FindAndModifyRetryability, NestedUpsert) { ASSERT_BSONOBJ_EQ(BSON("x" << 1), result.getValue()); } -TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpsertButOplogIsUpdate) { - auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); - request.setUpsert(true); - - Timestamp imageTs(120, 3); - repl::OplogEntry noteOplog( - repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); - - insertOplogEntry(noteOplog); - - repl::OplogEntry oplog( - repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1), BSON("y" << 1)); - oplog.setPreImageTs(imageTs); - - ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException); -} - TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithUpdateWithoutUpsertErrors) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setUpsert(false); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 6fa9c678fbd..f91ae11097b 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -452,6 +452,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx, Session* session, std::vector<InsertStatement>::const_iterator begin, std::vector<InsertStatement>::const_iterator end, + Timestamp timestamps[], bool fromMigrate) { invariant(begin != end); @@ -480,7 +481,6 @@ repl::OpTime logInsertOps(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } - auto timestamps = stdx::make_unique<Timestamp[]>(count); OpTime lastOpTime; for (size_t i = 0; i < count; i++) { // Make a mutable copy. @@ -512,7 +512,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx, basePtrs[i] = &writers[i]; } invariant(!lastOpTime.isNull()); - _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime); + _logOpsInner(opCtx, nss, basePtrs.get(), timestamps, count, oplog, lastOpTime); wuow.commit(); return lastOpTime; } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index a5eff8ac1aa..34a96b23b05 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -103,6 +103,8 @@ extern int OPLOG_VERSION; /** * Log insert(s) to the local oplog. * Returns the OpTime of the last insert. + * The timestamps parameter can also be modified and contain the individual timestamps for each + * insert after the oplog entries were created. */ OpTime logInsertOps(OperationContext* opCtx, const NamespaceString& nss, @@ -110,6 +112,7 @@ OpTime logInsertOps(OperationContext* opCtx, Session* session, std::vector<InsertStatement>::const_iterator begin, std::vector<InsertStatement>::const_iterator end, + Timestamp timestamps[], bool fromMigrate); /** diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 9e6477c1d4b..575c442ac74 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -267,7 +267,9 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co return _metadataManager->getNextOrphanRange(from); } -void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) { +void CollectionShardingState::onInsertOp(OperationContext* opCtx, + const BSONObj& insertedDoc, + const Timestamp& oplogTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -291,14 +293,16 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc); + _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs); } } void CollectionShardingState::onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc) { + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -315,7 +319,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc); + _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs); } } @@ -324,7 +328,10 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState _sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)}; } -void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) { +void CollectionShardingState::onDeleteOp(OperationContext* opCtx, + const DeleteState& deleteState, + const Timestamp& oplogTs, + const Timestamp& preImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -365,7 +372,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteSt checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { - _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey); + _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs); } } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 5aa44336f70..6f9722dceb9 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -49,6 +49,7 @@ struct ChunkVersion; class CollectionMetadata; class MigrationSourceManager; class OperationContext; +class Timestamp; /** * Contains all sharding-related runtime state for a given collection. One such object is assigned @@ -223,12 +224,17 @@ public: * * The global exclusive lock is expected to be held by the caller of any of these functions. */ - void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc); + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs); void onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc); - void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState); + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs); + void onDeleteOp(OperationContext* opCtx, + const DeleteState& deleteState, + const Timestamp& oplogTs, + const Timestamp& preImageTs); void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); private: diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index 844727e72c4..9ac9f4a9702 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); ASSERT_EQ(0, getInitCallCount()); @@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { { WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); ASSERT_EQ(0, getInitCallCount()); } @@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); ASSERT_EQ(0, getInitCallCount()); @@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument ShardIdentityType shardIdentity; shardIdentity.setShardName("a"); - ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()), + ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}), AssertionException); } @@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot NamespaceString::kServerConfigurationNamespace); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), BSON("_id" << 1)); + collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {}); ASSERT_EQ(0, getInitCallCount()); diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 750bcdcda80..9d9a1407f1d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -36,6 +36,7 @@ namespace mongo { class BSONObj; class OperationContext; class Status; +class Timestamp; /** * This class is responsible for producing chunk documents to be moved from donor to a recipient @@ -118,7 +119,9 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0; + virtual void onInsertOp(OperationContext* opCtx, + const BSONObj& insertedDoc, + const Timestamp& oplogTs) = 0; /** * Notifies this cloner that an update happened to the collection, which it owns. It is up to @@ -127,7 +130,10 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0; + virtual void onUpdateOp(OperationContext* opCtx, + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) = 0; /** * Notifies this cloner that a delede happened to the collection, which it owns. It is up to the @@ -136,7 +142,10 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0; + virtual void onDeleteOp(OperationContext* opCtx, + const BSONObj& deletedDocId, + const Timestamp& oplogTs, + const Timestamp& preImageTs) = 0; protected: MigrationChunkClonerSource(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 70a76e5e473..65a75e4c9e0 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -135,8 +135,14 @@ public: */ LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, const BSONObj& idObj, - const char op) - : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {} + const char op, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) + : _cloner(cloner), + _idObj(idObj.getOwned()), + _op(op), + _oplogTs(oplogTs), + _prePostImageTs(prePostImageTs) {} void commit() override { switch (_op) { @@ -156,6 +162,14 @@ public: default: MONGO_UNREACHABLE; } + + if (!_prePostImageTs.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs); + } + + if (!_oplogTs.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs); + } } void rollback() override {} @@ -164,6 +178,8 @@ private: MigrationChunkClonerSourceLegacy* const _cloner; const BSONObj _idObj; const char _op; + const Timestamp _oplogTs; + const Timestamp _prePostImageTs; }; MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, @@ -175,7 +191,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), _args.getToShardId().toString())), _donorConnStr(std::move(donorConnStr)), - _recipientHost(std::move(recipientHost)) {} + _recipientHost(std::move(recipientHost)), + _sessionCatalogSource(_args.getNss()) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -192,6 +209,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { return storeCurrentLocsStatus; } + // Prime up the session migration source if there are oplog entries to migrate. + _sessionCatalogSource.fetchNextOplog(opCtx); + // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; StartChunkCloneRequest::appendAsCommand(&cmdBuilder, @@ -314,6 +334,12 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { _cleanup(opCtx); + + if (_sessionCatalogSource.hasMoreOplog()) { + return {ErrorCodes::SessionTransferIncomplete, + "destination shard finished committing but there are still some session " + "metadata that needs to be transferred"}; + } return Status::OK(); } @@ -344,7 +370,8 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& } void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc) { + const BSONObj& insertedDoc, + const Timestamp& oplogTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; @@ -358,11 +385,14 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {})); } void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc) { + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; @@ -376,11 +406,14 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs)); } void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId) { + const BSONObj& deletedDocId, + const Timestamp& oplogTs, + const Timestamp& preImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; @@ -390,7 +423,8 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs)); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -679,4 +713,27 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, arr.done(); } +void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx, + BSONArrayBuilder* arrBuilder) { + while (_sessionCatalogSource.hasMoreOplog()) { + auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog(); + + if (oplogDoc.isEmpty()) { + // Last fetched turned out empty, try to see if there are more + _sessionCatalogSource.fetchNextOplog(opCtx); + continue; + } + + // Use the builder size instead of accumulating the document sizes directly so that we + // take into consideration the overhead of BSONArray indices. + if (arrBuilder->arrSize() && + (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) { + break; + } + + arrBuilder->append(oplogDoc); + _sessionCatalogSource.fetchNextOplog(opCtx); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index c072718475a..72d299b58e4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -37,6 +37,7 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" +#include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/s/move_chunk_request.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/memory.h" @@ -72,11 +73,19 @@ public: bool isDocumentInMigratingChunk(const BSONObj& doc) override; - void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override; + void onInsertOp(OperationContext* opCtx, + const BSONObj& insertedDoc, + const Timestamp& oplogTs) override; - void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override; + void onUpdateOp(OperationContext* opCtx, + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) override; - void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override; + void onDeleteOp(OperationContext* opCtx, + const BSONObj& deletedDocId, + const Timestamp& oplogTs, + const Timestamp& preImageTs) override; // Legacy cloner specific functionality @@ -121,6 +130,8 @@ public: */ Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder); + void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder); + private: friend class DeleteNotificationStage; friend class LogOpForShardingHandler; @@ -183,6 +194,8 @@ private: // during the cloning stage std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec; + SessionCatalogMigrationSource _sessionCatalogSource; + // Protects the entries below stdx::mutex _mutex; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index 9e3e83774ee..d42c6bdaa99 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -216,5 +216,57 @@ public: } transferModsCommand; +/** + * Command for extracting the oplog entries that needs to be migrated for the given migration + * session id. + * Note: this command is not stateless. Calling this command has a side-effect of gradually + * depleting the buffer that contains the oplog entries to be transfered. + */ +class MigrateSessionCommand : public BasicCommand { +public: + MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {} + + void help(std::stringstream& h) const { + h << "internal"; + } + + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + + virtual bool slaveOk() const { + return false; + } + + virtual bool adminOnly() const { + return true; + } + + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + + bool run(OperationContext* opCtx, + const std::string&, + const BSONObj& cmdObj, + BSONObjBuilder& result) { + const MigrationSessionId migrationSessionId( + uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + + BSONArrayBuilder arrBuilder; + + AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); + autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder); + + result.appendArray("oplog", arrBuilder.arr()); + return true; + } + +} migrateSessionCommand; + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 193c237db5c..a15f6487a33 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { WriteUnitOfWork wuow(operationContext()); - cloner.onInsertOp(operationContext(), createCollectionDocument(90)); - cloner.onInsertOp(operationContext(), createCollectionDocument(150)); - cloner.onInsertOp(operationContext(), createCollectionDocument(151)); - cloner.onInsertOp(operationContext(), createCollectionDocument(210)); - - cloner.onDeleteOp(operationContext(), createCollectionDocument(80)); - cloner.onDeleteOp(operationContext(), createCollectionDocument(199)); - cloner.onDeleteOp(operationContext(), createCollectionDocument(220)); + cloner.onInsertOp(operationContext(), createCollectionDocument(90), {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(150), {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(151), {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(210), {}); + + cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {}); + cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {}); + cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {}); wuow.commit(); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index ca2c736a6b1..86688ba08ca 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -57,6 +57,8 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" #include "mongo/util/concurrency/notification.h" @@ -231,6 +233,8 @@ void MigrationDestinationManager::setStateFail(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } + + _sessionMigration->forceFail(msg); } void MigrationDestinationManager::setStateFailWarn(std::string msg) { @@ -240,6 +244,8 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } + + _sessionMigration->forceFail(msg); } bool MigrationDestinationManager::isActive() const { @@ -334,6 +340,9 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, _migrateThreadHandle.join(); } + _sessionMigration = + stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId); + _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() { _migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); @@ -397,6 +406,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio << _sessionId->toString()}; } + _sessionMigration->finish(); _state = COMMIT_START; auto const deadline = Date_t::now() + Seconds(30); @@ -411,6 +421,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio if (_state != DONE) { return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"}; } + return Status::OK(); } @@ -672,6 +683,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 3. Initial bulk clone setState(CLONE); + _sessionMigration->start(opCtx->getServiceContext()); + const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId); _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. @@ -829,13 +842,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, break; } - sleepsecs(1); + opCtx->sleepFor(Seconds(1)); } if (t.minutes() >= 600) { setStateFail("Cannot go to critical section because secondaries cannot keep up"); return; } + + _sessionMigration->waitUntilReadyToCommit(opCtx); } { @@ -895,6 +910,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5); } + _sessionMigration->join(); + if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) { + setStateFail(_sessionMigration->getErrMsg()); + return; + } + setState(DONE); timing.done(6); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 610f70196c1..49c6f1e5089 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -39,6 +39,7 @@ #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_session_id.h" +#include "mongo/db/s/session_catalog_migration_destination.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" @@ -211,6 +212,8 @@ private: State _state{READY}; std::string _errmsg; + + std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration; }; } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index c9344031555..1c4c250dac1 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -201,7 +201,6 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, */ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const BSONObj& oplogBSON, - // const Timestamp& prePostImageTs, const ProcessOplogResult& lastResult) { ProcessOplogResult result; auto oplogEntry = parseOplog(oplogBSON); @@ -240,50 +239,62 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); result.sessionId = sessionInfo.getSessionId().value(); result.txnNum = sessionInfo.getTxnNumber().value(); + const auto stmtId = *oplogEntry.getStatementId(); auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); + if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { + return lastResult; + } + BSONObj object(result.isPrePostImage ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum); - writeConflictRetry(opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); - WriteUnitOfWork wunit(opCtx); - - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNamespace(), - oplogEntry.getUuid(), - object, - &object2, - true, - sessionInfo, - *oplogEntry.getStatementId(), - oplogLink); - - auto oplogTs = result.oplogTime.getTimestamp(); - uassert(40633, - str::stream() - << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() - << ": " - << redact(oplogBSON), - !oplogTs.isNull()); - - if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary( - opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs); - } - - wunit.commit(); - }); + writeConflictRetry( + opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + // Need to take global lock here so repl::logOp will not unlock it and trigger the + // invariant that disallows unlocking global lock while inside a WUOW. + // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush + // lock will be lock/unlocked correctly. Take the transaction table db lock to + // ensure the same lock ordering with normal replicated updates to the table. + Lock::DBLock lk( + opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + sessionInfo, + stmtId, + oplogLink); + + auto oplogTs = result.oplogTime.getTimestamp(); + uassert(40633, + str::stream() << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogTs.isNull()); + + // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because + // the next oplog will contain the real operation. + if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs); + } + + wunit.commit(); + }); return result; } @@ -307,6 +318,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_state == State::NotStarted); _state = State::Migrating; + _isStateChanged.notify_all(); } _thread = stdx::thread(stdx::bind( @@ -316,6 +328,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { void SessionCatalogMigrationDestination::finish() { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Committing; + _isStateChanged.notify_all(); } void SessionCatalogMigrationDestination::join() { @@ -345,6 +358,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // Timestamp prePostImageTs; ProcessOplogResult lastResult; + repl::OpTime lastOpTimeWaited; while (true) { { @@ -381,8 +395,17 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // Note: only transition to "ready to commit" if state is not error/force stop. if (_state == State::Migrating) { _state = State::ReadyToCommit; + _isStateChanged.notify_all(); } } + + if (lastOpTimeWaited == lastResult.oplogTime) { + // We got an empty result at least twice in a row from the source shard so + // space it out a little bit so we don't hammer the shard. + opCtx->sleepFor(Milliseconds(200)); + } + + lastOpTimeWaited = lastResult.oplogTime; } while (oplogIter.more()) { @@ -411,6 +434,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Done; + _isStateChanged.notify_all(); } } @@ -423,6 +447,8 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::ErrorOccurred; _errMsg = errMsg.toString(); + + _isStateChanged.notify_all(); } SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() { @@ -430,4 +456,15 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge return _state; } +void SessionCatalogMigrationDestination::forceFail(std::string& errMsg) { + _errorOccurred(errMsg); +} + +void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (_state == State::Migrating) { + opCtx->waitForConditionOrInterrupt(_isStateChanged, lk); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h index 2e740f2f6bb..a67bd8012f6 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.h +++ b/src/mongo/db/s/session_catalog_migration_destination.h @@ -37,6 +37,7 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/shard_id.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/with_lock.h" @@ -88,6 +89,17 @@ public: void join(); /** + * Forces this into an error state which will also stop session transfer thread. + */ + void forceFail(std::string& errMsg); + + /** + * Blocks until state changes is not Migrating. In other words, can return when state + * becomes ReadyToCommit/Done/ErrorOccurred, etc. + */ + void waitUntilReadyToCommit(OperationContext* opCtx); + + /** * Returns the current state. */ State getState(); @@ -109,6 +121,7 @@ private: // Protects _state and _errMsg. stdx::mutex _mutex; + stdx::condition_variable _isStateChanged; State _state = State::NotStarted; std::string _errMsg; // valid only if _state == ErrorOccurred. }; diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index a1a76dfbd1d..1ec90ca88b1 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -1303,6 +1303,59 @@ TEST_F(SessionCatalogMigrationDestinationTest, ASSERT_FALSE(sessionMigration.getErrMsg().empty()); } +TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + auto opCtx = operationContext(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(19); + + insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OplogEntry oplog1( + OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(30); + + OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog3.setOperationSessionInfo(sessionInfo); + oplog3.setStatementId(45); + + returnOplog({oplog1, oplog2, oplog3}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto session = getSessionWithTxn(opCtx, sessionId, 19); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + auto firstInsertOplog = historyIter.next(opCtx); + + ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject()); + ASSERT_TRUE(firstInsertOplog.getStatementId()); + ASSERT_EQ(30, *firstInsertOplog.getStatementId()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index e79656ba94b..829d862b370 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -76,7 +76,6 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); - invariant(!_lastFetchedNewWriteOplog.isEmpty()); return _lastFetchedNewWriteOplog; } } @@ -101,11 +100,13 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte return false; } - _lastFetchedOplog = nextOplog.toBSON().getOwned(); - + auto nextOplogBSON = nextOplog.toBSON().getOwned(); auto doc = fetchPrePostImageOplog(opCtx, nextOplog); if (!doc.isEmpty()) { - _lastFetchedOplogBuffer.push_back(doc); + _lastFetchedOplogBuffer.push_back(nextOplogBSON); + _lastFetchedOplog = doc; + } else { + _lastFetchedOplog = nextOplogBSON; } return true; diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 42a3e71e372..5e44531d31c 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -68,6 +68,7 @@ public: /** * Returns the oplog document that was last fetched by the fetchNextOplog call. + * Returns an empty object if there are no oplog to fetch. */ BSONObj getLastFetchedOplog(); diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index ad4942292ea..ff43cce5d0b 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd SessionCatalogMigrationSource migrationSource(kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()}; + auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()}; for (auto oplogDoc : expectedSequece) { ASSERT_TRUE(migrationSource.hasMoreOplog()); |