diff options
author | Randolph Tan <randolph@10gen.com> | 2017-09-28 10:45:49 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-09-28 10:45:49 -0400 |
commit | f24fbb0011c6ded9101f08574e7cd07e63690a9b (patch) | |
tree | e1452828e142748f1f03be61a00c32dbb3ed6bc1 /src | |
parent | d293f6857bcb36b26ca8fa03d90299714fe060de (diff) | |
download | mongo-f24fbb0011c6ded9101f08574e7cd07e63690a9b.tar.gz |
Revert "Revert "Revert "SERVER-30894 Implement command for transferring session information during migration"""
This reverts commit aaa0c96532ba6a8ea9146e4298c6bf1cc6b27f9e.
Diffstat (limited to 'src')
22 files changed, 139 insertions, 420 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 24825fb09ba..80a17b9ccf1 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -225,7 +225,6 @@ error_code("JSONSchemaNotAllowed", 224) error_code("TransactionTooOld", 225) error_code("AtomicityFailure", 226) error_code("CannotImplicitlyCreateCollection", 227); -error_code("SessionTransferIncomplete", 228) # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index b466b70c3bb..9a6bd8e5be7 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -115,15 +115,10 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd, return cmdObjBuilder.obj(); } -struct OpTimeBundle { - repl::OpTime writeOpTime; - repl::OpTime prePostImageOpTime; -}; - /** * Write oplog entry(ies) for the update operation. */ -OpTimeBundle replLogUpdate(OperationContext* opCtx, +repl::OpTime replLogUpdate(OperationContext* opCtx, Session* session, const OplogUpdateEntryArgs& args) { BSONObj storeObj; @@ -143,8 +138,6 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } - OpTimeBundle opTimes; - if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { auto noteUpdateOpTime = repl::logOp(opCtx, "n", @@ -157,8 +150,6 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, args.stmtId, {}); - opTimes.prePostImageOpTime = noteUpdateOpTime; - if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { oplogLink.preImageTs = noteUpdateOpTime.getTimestamp(); } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) { @@ -166,24 +157,22 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, } } - opTimes.writeOpTime = repl::logOp(opCtx, - "u", - args.nss, - args.uuid, - args.update, - &args.criteria, - args.fromMigrate, - sessionInfo, - args.stmtId, - oplogLink); - - return opTimes; + return repl::logOp(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + sessionInfo, + args.stmtId, + oplogLink); } /** * Write oplog entry(ies) for the delete operation. */ -OpTimeBundle replLogDelete(OperationContext* opCtx, +repl::OpTime replLogDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, Session* session, @@ -200,26 +189,22 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } - OpTimeBundle opTimes; - if (deletedDoc && opCtx->getTxnNumber()) { auto noteOplog = repl::logOp( opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {}); - opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageTs = noteOplog.getTimestamp(); } - opTimes.writeOpTime = repl::logOp(opCtx, - "d", - nss, - uuid, - deleteState.documentKey, - nullptr, - fromMigrate, - sessionInfo, - stmtId, - oplogLink); - return opTimes; + return repl::logOp(opCtx, + "d", + nss, + uuid, + deleteState.documentKey, + nullptr, + fromMigrate, + sessionInfo, + stmtId, + oplogLink); } } // namespace @@ -268,7 +253,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, systemIndexes); if (!fromMigrate) { - css->onInsertOp(opCtx, indexDoc, {}); + css->onInsertOp(opCtx, indexDoc); } } @@ -279,20 +264,15 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator end, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - - const size_t count = end - begin; - auto timestamps = stdx::make_unique<Timestamp[]>(count); - const auto lastOpTime = - repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate); + const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate); auto css = CollectionShardingState::get(opCtx, nss.ns()); - size_t index = 0; - for (auto it = begin; it != end; it++, index++) { + for (auto it = begin; it != end; it++) { AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", nss, it->doc, nullptr); if (!fromMigrate) { - css->onInsertOp(opCtx, it->doc, timestamps[index]); + css->onInsertOp(opCtx, it->doc); } } @@ -332,12 +312,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg auto css = CollectionShardingState::get(opCtx, args.nss); if (!args.fromMigrate) { - css->onUpdateOp(opCtx, - args.criteria, - args.update, - args.updatedDoc, - opTime.writeOpTime.getTimestamp(), - opTime.prePostImageOpTime.getTimestamp()); + css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc); } if (args.nss.coll() == "system.js") { @@ -347,13 +322,11 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc); } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && - !opTime.writeOpTime.isNull()) { + !opTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - - onWriteOpCompleted( - opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime); + onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime); } auto OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -383,10 +356,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, nss.ns()); if (!fromMigrate) { - css->onDeleteOp(opCtx, - deleteState, - opTime.writeOpTime.getTimestamp(), - opTime.prePostImageOpTime.getTimestamp()); + css->onDeleteOp(opCtx, deleteState); } if (nss.coll() == "system.js") { @@ -395,12 +365,11 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, DurableViewCatalog::onExternalChange(opCtx, nss); } else if (nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey); - } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && - !opTime.writeOpTime.isNull()) { + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } - onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime); + onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime); } void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 3033daa68f9..13d3a1cae56 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -90,6 +90,16 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, << ", oplog: " << redact(oplogEntry.toBSON()), opType == repl::OpTypeEnum::kUpdate); + uassert( + 40610, + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) + << " is not compatible with previous write in the transaction of type: " + << OpType_serializer(oplogEntry.getOpType()) + << ", oplogTs: " + << ts.toString() + << ", oplog: " + << redact(oplogEntry.toBSON()), + !request.isUpsert()); if (request.shouldReturnNew()) { uassert(40611, diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index a50c51eba58..f3415ae2427 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -273,6 +273,23 @@ TEST_F(FindAndModifyRetryability, NestedUpsert) { ASSERT_BSONOBJ_EQ(BSON("x" << 1), result.getValue()); } +TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpsertButOplogIsUpdate) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setUpsert(true); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + + insertOplogEntry(noteOplog); + + repl::OplogEntry oplog( + repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1), BSON("y" << 1)); + oplog.setPreImageTs(imageTs); + + ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException); +} + TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithUpdateWithoutUpsertErrors) { auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); request.setUpsert(false); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index f91ae11097b..6fa9c678fbd 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -452,7 +452,6 @@ repl::OpTime logInsertOps(OperationContext* opCtx, Session* session, std::vector<InsertStatement>::const_iterator begin, std::vector<InsertStatement>::const_iterator end, - Timestamp timestamps[], bool fromMigrate) { invariant(begin != end); @@ -481,6 +480,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } + auto timestamps = stdx::make_unique<Timestamp[]>(count); OpTime lastOpTime; for (size_t i = 0; i < count; i++) { // Make a mutable copy. @@ -512,7 +512,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx, basePtrs[i] = &writers[i]; } invariant(!lastOpTime.isNull()); - _logOpsInner(opCtx, nss, basePtrs.get(), timestamps, count, oplog, lastOpTime); + _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime); wuow.commit(); return lastOpTime; } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 34a96b23b05..a5eff8ac1aa 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -103,8 +103,6 @@ extern int OPLOG_VERSION; /** * Log insert(s) to the local oplog. * Returns the OpTime of the last insert. - * The timestamps parameter can also be modified and contain the individual timestamps for each - * insert after the oplog entries were created. */ OpTime logInsertOps(OperationContext* opCtx, const NamespaceString& nss, @@ -112,7 +110,6 @@ OpTime logInsertOps(OperationContext* opCtx, Session* session, std::vector<InsertStatement>::const_iterator begin, std::vector<InsertStatement>::const_iterator end, - Timestamp timestamps[], bool fromMigrate); /** diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 575c442ac74..9e6477c1d4b 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -267,9 +267,7 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co return _metadataManager->getNextOrphanRange(from); } -void CollectionShardingState::onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) { +void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -293,16 +291,14 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs); + _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc); } } void CollectionShardingState::onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) { + const BSONObj& updatedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -319,7 +315,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs); + _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc); } } @@ -328,10 +324,7 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState _sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)}; } -void CollectionShardingState::onDeleteOp(OperationContext* opCtx, - const DeleteState& deleteState, - const Timestamp& oplogTs, - const Timestamp& preImageTs) { +void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -372,7 +365,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { - _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs); + _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey); } } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 6f9722dceb9..5aa44336f70 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -49,7 +49,6 @@ struct ChunkVersion; class CollectionMetadata; class MigrationSourceManager; class OperationContext; -class Timestamp; /** * Contains all sharding-related runtime state for a given collection. One such object is assigned @@ -224,17 +223,12 @@ public: * * The global exclusive lock is expected to be held by the caller of any of these functions. */ - void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs); + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc); void onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs); - void onDeleteOp(OperationContext* opCtx, - const DeleteState& deleteState, - const Timestamp& oplogTs, - const Timestamp& preImageTs); + const BSONObj& updatedDoc); + void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState); void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); private: diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index 9ac9f4a9702..844727e72c4 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { { WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); } @@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument ShardIdentityType shardIdentity; shardIdentity.setShardName("a"); - ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}), + ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()), AssertionException); } @@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot NamespaceString::kServerConfigurationNamespace); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {}); + collShardingState.onInsertOp(operationContext(), BSON("_id" << 1)); ASSERT_EQ(0, getInitCallCount()); diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 9d9a1407f1d..750bcdcda80 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -36,7 +36,6 @@ namespace mongo { class BSONObj; class OperationContext; class Status; -class Timestamp; /** * This class is responsible for producing chunk documents to be moved from donor to a recipient @@ -119,9 +118,7 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) = 0; + virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0; /** * Notifies this cloner that an update happened to the collection, which it owns. It is up to @@ -130,10 +127,7 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) = 0; + virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0; /** * Notifies this cloner that a delede happened to the collection, which it owns. It is up to the @@ -142,10 +136,7 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) = 0; + virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0; protected: MigrationChunkClonerSource(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 65a75e4c9e0..70a76e5e473 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -135,14 +135,8 @@ public: */ LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, const BSONObj& idObj, - const char op, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) - : _cloner(cloner), - _idObj(idObj.getOwned()), - _op(op), - _oplogTs(oplogTs), - _prePostImageTs(prePostImageTs) {} + const char op) + : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {} void commit() override { switch (_op) { @@ -162,14 +156,6 @@ public: default: MONGO_UNREACHABLE; } - - if (!_prePostImageTs.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs); - } - - if (!_oplogTs.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs); - } } void rollback() override {} @@ -178,8 +164,6 @@ private: MigrationChunkClonerSourceLegacy* const _cloner; const BSONObj _idObj; const char _op; - const Timestamp _oplogTs; - const Timestamp _prePostImageTs; }; MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, @@ -191,8 +175,7 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), _args.getToShardId().toString())), _donorConnStr(std::move(donorConnStr)), - _recipientHost(std::move(recipientHost)), - _sessionCatalogSource(_args.getNss()) {} + _recipientHost(std::move(recipientHost)) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -209,9 +192,6 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { return storeCurrentLocsStatus; } - // Prime up the session migration source if there are oplog entries to migrate. - _sessionCatalogSource.fetchNextOplog(opCtx); - // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; StartChunkCloneRequest::appendAsCommand(&cmdBuilder, @@ -334,12 +314,6 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { _cleanup(opCtx); - - if (_sessionCatalogSource.hasMoreOplog()) { - return {ErrorCodes::SessionTransferIncomplete, - "destination shard finished committing but there are still some session " - "metadata that needs to be transferred"}; - } return Status::OK(); } @@ -370,8 +344,7 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& } void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) { + const BSONObj& insertedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; @@ -385,14 +358,11 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {})); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); } void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) { + const BSONObj& updatedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; @@ -406,14 +376,11 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs)); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); } void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) { + const BSONObj& deletedDocId) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; @@ -423,8 +390,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs)); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -713,27 +679,4 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, arr.done(); } -void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx, - BSONArrayBuilder* arrBuilder) { - while (_sessionCatalogSource.hasMoreOplog()) { - auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog(); - - if (oplogDoc.isEmpty()) { - // Last fetched turned out empty, try to see if there are more - _sessionCatalogSource.fetchNextOplog(opCtx); - continue; - } - - // Use the builder size instead of accumulating the document sizes directly so that we - // take into consideration the overhead of BSONArray indices. - if (arrBuilder->arrSize() && - (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) { - break; - } - - arrBuilder->append(oplogDoc); - _sessionCatalogSource.fetchNextOplog(opCtx); - } -} - } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 72d299b58e4..c072718475a 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -37,7 +37,6 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" -#include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/s/move_chunk_request.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/memory.h" @@ -73,19 +72,11 @@ public: bool isDocumentInMigratingChunk(const BSONObj& doc) override; - void onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) override; + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override; - void onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) override; + void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override; - void onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) override; + void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override; // Legacy cloner specific functionality @@ -130,8 +121,6 @@ public: */ Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder); - void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder); - private: friend class DeleteNotificationStage; friend class LogOpForShardingHandler; @@ -194,8 +183,6 @@ private: // during the cloning stage std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec; - SessionCatalogMigrationSource _sessionCatalogSource; - // Protects the entries below stdx::mutex _mutex; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index d42c6bdaa99..9e3e83774ee 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -216,57 +216,5 @@ public: } transferModsCommand; -/** - * Command for extracting the oplog entries that needs to be migrated for the given migration - * session id. - * Note: this command is not stateless. Calling this command has a side-effect of gradually - * depleting the buffer that contains the oplog entries to be transfered. - */ -class MigrateSessionCommand : public BasicCommand { -public: - MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {} - - void help(std::stringstream& h) const { - h << "internal"; - } - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } - - virtual bool slaveOk() const { - return false; - } - - virtual bool adminOnly() const { - return true; - } - - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - - bool run(OperationContext* opCtx, - const std::string&, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - const MigrationSessionId migrationSessionId( - uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); - - BSONArrayBuilder arrBuilder; - - AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); - autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder); - - result.appendArray("oplog", arrBuilder.arr()); - return true; - } - -} migrateSessionCommand; - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index a15f6487a33..193c237db5c 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { WriteUnitOfWork wuow(operationContext()); - cloner.onInsertOp(operationContext(), createCollectionDocument(90), {}); - cloner.onInsertOp(operationContext(), createCollectionDocument(150), {}); - cloner.onInsertOp(operationContext(), createCollectionDocument(151), {}); - cloner.onInsertOp(operationContext(), createCollectionDocument(210), {}); - - cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {}); - cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {}); - cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(90)); + cloner.onInsertOp(operationContext(), createCollectionDocument(150)); + cloner.onInsertOp(operationContext(), createCollectionDocument(151)); + cloner.onInsertOp(operationContext(), createCollectionDocument(210)); + + cloner.onDeleteOp(operationContext(), createCollectionDocument(80)); + cloner.onDeleteOp(operationContext(), createCollectionDocument(199)); + cloner.onDeleteOp(operationContext(), createCollectionDocument(220)); wuow.commit(); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 86688ba08ca..ca2c736a6b1 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -57,8 +57,6 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" #include "mongo/util/concurrency/notification.h" @@ -233,8 +231,6 @@ void MigrationDestinationManager::setStateFail(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } - - _sessionMigration->forceFail(msg); } void MigrationDestinationManager::setStateFailWarn(std::string msg) { @@ -244,8 +240,6 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } - - _sessionMigration->forceFail(msg); } bool MigrationDestinationManager::isActive() const { @@ -340,9 +334,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, _migrateThreadHandle.join(); } - _sessionMigration = - stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId); - _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() { _migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); @@ -406,7 +397,6 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio << _sessionId->toString()}; } - _sessionMigration->finish(); _state = COMMIT_START; auto const deadline = Date_t::now() + Seconds(30); @@ -421,7 +411,6 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio if (_state != DONE) { return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"}; } - return Status::OK(); } @@ -683,8 +672,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 3. Initial bulk clone setState(CLONE); - _sessionMigration->start(opCtx->getServiceContext()); - const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId); _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. @@ -842,15 +829,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, break; } - opCtx->sleepFor(Seconds(1)); + sleepsecs(1); } if (t.minutes() >= 600) { setStateFail("Cannot go to critical section because secondaries cannot keep up"); return; } - - _sessionMigration->waitUntilReadyToCommit(opCtx); } { @@ -910,12 +895,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5); } - _sessionMigration->join(); - if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) { - setStateFail(_sessionMigration->getErrMsg()); - return; - } - setState(DONE); timing.done(6); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 49c6f1e5089..610f70196c1 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -39,7 +39,6 @@ #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_session_id.h" -#include "mongo/db/s/session_catalog_migration_destination.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" @@ -212,8 +211,6 @@ private: State _state{READY}; std::string _errmsg; - - std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration; }; } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 1c4c250dac1..c9344031555 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -201,6 +201,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, */ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const BSONObj& oplogBSON, + // const Timestamp& prePostImageTs, const ProcessOplogResult& lastResult) { ProcessOplogResult result; auto oplogEntry = parseOplog(oplogBSON); @@ -239,62 +240,50 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); result.sessionId = sessionInfo.getSessionId().value(); result.txnNum = sessionInfo.getTxnNumber().value(); - const auto stmtId = *oplogEntry.getStatementId(); auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); - if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { - return lastResult; - } - BSONObj object(result.isPrePostImage ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum); - writeConflictRetry( - opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - // Need to take global lock here so repl::logOp will not unlock it and trigger the - // invariant that disallows unlocking global lock while inside a WUOW. - // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush - // lock will be lock/unlocked correctly. Take the transaction table db lock to - // ensure the same lock ordering with normal replicated updates to the table. - Lock::DBLock lk( - opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); - WriteUnitOfWork wunit(opCtx); - - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNamespace(), - oplogEntry.getUuid(), - object, - &object2, - true, - sessionInfo, - stmtId, - oplogLink); - - auto oplogTs = result.oplogTime.getTimestamp(); - uassert(40633, - str::stream() << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() - << ": " - << redact(oplogBSON), - !oplogTs.isNull()); - - // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because - // the next oplog will contain the real operation. - if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs); - } - - wunit.commit(); - }); + writeConflictRetry(opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + sessionInfo, + *oplogEntry.getStatementId(), + oplogLink); + + auto oplogTs = result.oplogTime.getTimestamp(); + uassert(40633, + str::stream() + << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogTs.isNull()); + + if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary( + opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs); + } + + wunit.commit(); + }); return result; } @@ -318,7 +307,6 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_state == State::NotStarted); _state = State::Migrating; - _isStateChanged.notify_all(); } _thread = stdx::thread(stdx::bind( @@ -328,7 +316,6 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { void SessionCatalogMigrationDestination::finish() { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Committing; - _isStateChanged.notify_all(); } void SessionCatalogMigrationDestination::join() { @@ -358,7 +345,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // Timestamp prePostImageTs; ProcessOplogResult lastResult; - repl::OpTime lastOpTimeWaited; while (true) { { @@ -395,17 +381,8 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // Note: only transition to "ready to commit" if state is not error/force stop. if (_state == State::Migrating) { _state = State::ReadyToCommit; - _isStateChanged.notify_all(); } } - - if (lastOpTimeWaited == lastResult.oplogTime) { - // We got an empty result at least twice in a row from the source shard so - // space it out a little bit so we don't hammer the shard. - opCtx->sleepFor(Milliseconds(200)); - } - - lastOpTimeWaited = lastResult.oplogTime; } while (oplogIter.more()) { @@ -434,7 +411,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Done; - _isStateChanged.notify_all(); } } @@ -447,8 +423,6 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::ErrorOccurred; _errMsg = errMsg.toString(); - - _isStateChanged.notify_all(); } SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() { @@ -456,15 +430,4 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge return _state; } -void SessionCatalogMigrationDestination::forceFail(std::string& errMsg) { - _errorOccurred(errMsg); -} - -void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (_state == State::Migrating) { - opCtx->waitForConditionOrInterrupt(_isStateChanged, lk); - } -} - } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h index a67bd8012f6..2e740f2f6bb 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.h +++ b/src/mongo/db/s/session_catalog_migration_destination.h @@ -37,7 +37,6 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/shard_id.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/with_lock.h" @@ -89,17 +88,6 @@ public: void join(); /** - * Forces this into an error state which will also stop session transfer thread. - */ - void forceFail(std::string& errMsg); - - /** - * Blocks until state changes is not Migrating. In other words, can return when state - * becomes ReadyToCommit/Done/ErrorOccurred, etc. - */ - void waitUntilReadyToCommit(OperationContext* opCtx); - - /** * Returns the current state. */ State getState(); @@ -121,7 +109,6 @@ private: // Protects _state and _errMsg. stdx::mutex _mutex; - stdx::condition_variable _isStateChanged; State _state = State::NotStarted; std::string _errMsg; // valid only if _state == ErrorOccurred. }; diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 1ec90ca88b1..a1a76dfbd1d 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -1303,59 +1303,6 @@ TEST_F(SessionCatalogMigrationDestinationTest, ASSERT_FALSE(sessionMigration.getErrMsg().empty()); } -TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) { - const NamespaceString kNs("a.b"); - const auto sessionId = makeLogicalSessionIdForTest(); - - auto opCtx = operationContext(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(19); - - insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30); - - SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); - sessionMigration.start(getServiceContext()); - sessionMigration.finish(); - - OplogEntry oplog1( - OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); - oplog1.setOperationSessionInfo(sessionInfo); - oplog1.setStatementId(23); - - OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); - oplog2.setOperationSessionInfo(sessionInfo); - oplog2.setStatementId(30); - - OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); - oplog3.setOperationSessionInfo(sessionInfo); - oplog3.setStatementId(45); - - returnOplog({oplog1, oplog2, oplog3}); - returnOplog({}); - - sessionMigration.join(); - - ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); - - auto session = getSessionWithTxn(opCtx, sessionId, 19); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19)); - - ASSERT_TRUE(historyIter.hasNext()); - checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); - - ASSERT_TRUE(historyIter.hasNext()); - checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); - - ASSERT_TRUE(historyIter.hasNext()); - auto firstInsertOplog = historyIter.next(opCtx); - - ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject()); - ASSERT_TRUE(firstInsertOplog.getStatementId()); - ASSERT_EQ(30, *firstInsertOplog.getStatementId()); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 829d862b370..e79656ba94b 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -76,6 +76,7 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + invariant(!_lastFetchedNewWriteOplog.isEmpty()); return _lastFetchedNewWriteOplog; } } @@ -100,13 +101,11 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte return false; } - auto nextOplogBSON = nextOplog.toBSON().getOwned(); + _lastFetchedOplog = nextOplog.toBSON().getOwned(); + auto doc = fetchPrePostImageOplog(opCtx, nextOplog); if (!doc.isEmpty()) { - _lastFetchedOplogBuffer.push_back(nextOplogBSON); - _lastFetchedOplog = doc; - } else { - _lastFetchedOplog = nextOplogBSON; + _lastFetchedOplogBuffer.push_back(doc); } return true; diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 5e44531d31c..42a3e71e372 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -68,7 +68,6 @@ public: /** * Returns the oplog document that was last fetched by the fetchNextOplog call. - * Returns an empty object if there are no oplog to fetch. */ BSONObj getLastFetchedOplog(); diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index ff43cce5d0b..ad4942292ea 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd SessionCatalogMigrationSource migrationSource(kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()}; + auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()}; for (auto oplogDoc : expectedSequece) { ASSERT_TRUE(migrationSource.hasMoreOplog()); |