diff options
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/collection_metadata.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/s/collection_metadata.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 176 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 38 | ||||
-rw-r--r-- | src/mongo/db/s/op_observer_sharding_impl.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/op_observer_sharding_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.h | 4 |
9 files changed, 204 insertions, 56 deletions
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp index ea9f6f0c7c1..6b0fb689356 100644 --- a/src/mongo/db/s/collection_metadata.cpp +++ b/src/mongo/db/s/collection_metadata.cpp @@ -107,13 +107,13 @@ void CollectionMetadata::throwIfReshardingInProgress(NamespaceString const& nss) } } -BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const { +BSONObj CollectionMetadata::extractDocumentKey(const ShardKeyPattern* shardKeyPattern, + const BSONObj& doc) { BSONObj key; - if (isSharded()) { - auto const& pattern = _cm->getShardKeyPattern(); - key = dotted_path_support::extractElementsBasedOnTemplate(doc, pattern.toBSON()); - if (pattern.hasId()) { + if (shardKeyPattern) { + key = dotted_path_support::extractElementsBasedOnTemplate(doc, shardKeyPattern->toBSON()); + if (shardKeyPattern->hasId()) { return key; } // else, try to append an _id field from the document. @@ -127,6 +127,10 @@ BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const { return doc; } +BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const { + return extractDocumentKey(isSharded() ? &_cm->getShardKeyPattern() : nullptr, doc); +} + std::string CollectionMetadata::toStringBasic() const { if (isSharded()) { return str::stream() << "collection version: " << _cm->getVersion().toString() diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index b691f94ebe5..c2b323c0cc7 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -162,6 +162,12 @@ public: BSONObj extractDocumentKey(const BSONObj& doc) const; /** + * Static version of the function above. Only use this for internal sharding operations where + * shard key pattern is fixed and cannot change. + */ + static BSONObj extractDocumentKey(const ShardKeyPattern* shardKeyPattern, const BSONObj& doc); + + /** * String output of the collection and shard versions. */ std::string toStringBasic() const; diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 18e83a4b994..2acd0017ab7 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -208,6 +208,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) override {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 0b68b839712..2272599806e 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -33,20 +33,26 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" +#include <fmt/format.h> + #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_source_manager.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_key_index_util.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" @@ -69,6 +75,8 @@ namespace mongo { namespace { +using namespace fmt::literals; + const char kRecvChunkStatus[] = "_recvChunkStatus"; const char kRecvChunkCommit[] = "_recvChunkCommit"; const char kRecvChunkAbort[] = "_recvChunkAbort"; @@ -107,9 +115,8 @@ BSONObj createRequestWithSessionId(StringData commandName, return builder.obj(); } -BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation, - repl::OpTypeEnum opType) { - switch (opType) { +BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation) { + switch (replOperation.getOpType()) { case repl::OpTypeEnum::kInsert: case repl::OpTypeEnum::kDelete: return replOperation.getObject(); @@ -168,6 +175,30 @@ private: const repl::OpTime _opTime; }; +LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler( + LogicalSessionId lsid, + const std::vector<repl::OplogEntry>& stmts, + repl::OpTime prepareOrCommitOpTime) + : _lsid(std::move(lsid)), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) { + _stmts.reserve(stmts.size()); + _ownedReplBSONObj.reserve(stmts.size()); + + for (const auto& op : stmts) { + auto ownedBSON = op.getDurableReplOperation().toBSON().getOwned(); + _ownedReplBSONObj.push_back(ownedBSON); + _stmts.push_back( + repl::ReplOperation::parse({"MigrationChunkClonerSource_toReplOperation"}, ownedBSON)); + } +} + +LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler( + LogicalSessionId lsid, + const std::vector<repl::ReplOperation>& stmts, + repl::OpTime prepareOrCommitOpTime) + : _lsid(std::move(lsid)), + _stmts(stmts), + _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {} + void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) { std::set<NamespaceString> namespacesTouchedByTransaction; @@ -225,7 +256,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam continue; } - auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType); + auto preImageDocKey = getDocumentKeyFromReplOperation(stmt); auto idElement = preImageDocKey["_id"]; if (idElement.eoo()) { @@ -235,52 +266,35 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam continue; } - auto const& minKey = cloner->_args.getMin().get(); - auto const& maxKey = cloner->_args.getMax().get(); - auto const& shardKeyPattern = cloner->_shardKeyPattern; - - // Note: This assumes that prepared transactions will always have post document key - // set. There is a small window where create collection coordinator releases the critical - // section and before it writes down the chunks for non-empty collections. So in theory, - // it is possible to have a prepared transaction while collection is unsharded - // and becomes sharded midway. This doesn't happen in practice because the only way to - // have a prepared transactions without being sharded is by directly connecting to the - // shards and manually preparing the transaction. Another exception is when transaction - // is prepared on an older version that doesn't set the post image document key. - auto postImageDocKey = stmt.getPostImageDocumentKey(); - if (postImageDocKey.isEmpty()) { - LOGV2_WARNING( - 6836102, - "Migration encountered a transaction operation without a post image document key", - "preImageDocKey"_attr = preImageDocKey); - } else { - auto postShardKeyValues = - shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey); - fassert(6836100, !postShardKeyValues.isEmpty()); - - if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) { - // If the preImageDoc is not in range but the postImageDoc was, we know that the - // document has changed shard keys and no longer belongs in the chunk being cloned. - // We will model the deletion of the preImage document so that the destination chunk - // does not receive an outdated version of this document. - - auto preImageShardKeyValues = - shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); - fassert(6836101, !preImageShardKeyValues.isEmpty()); - - if (opType == repl::OpTypeEnum::kUpdate && - isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) { - opType = repl::OpTypeEnum::kDelete; - idElement = postImageDocKey["_id"]; - } else { + if (opType == repl::OpTypeEnum::kUpdate) { + auto const& shardKeyPattern = cloner->_shardKeyPattern; + auto preImageShardKeyValues = + shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); + + // If prepare was performed from another term, we will not have the post image doc key + // since it is not persisted in the oplog. + auto postImageDocKey = stmt.getPostImageDocumentKey(); + if (!postImageDocKey.isEmpty()) { + if (!cloner->_processUpdateForXferMod(preImageDocKey, postImageDocKey)) { + // We don't need to add this op to session migration if neither post or pre + // image doc falls within the chunk range. continue; } + } else { + // We can't perform reads here using the same recovery unit because the transaction + // is already committed. We instead defer performing the reads when xferMods command + // is called. Also allow this op to be added to session migration since we can't + // tell whether post image doc will fall within the chunk range. If it turns out + // both preImage and postImage doc don't fall into the chunk range, it is not wrong + // for this op to be added to session migration, but it will result in wasted work + // and unneccesary extra oplog storage on the destination. + cloner->_deferProcessingForXferMod(preImageDocKey); } + } else { + cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); } addToSessionMigrationOptimeQueueIfNeeded(cloner, nss, _prepareOrCommitOpTime); - - cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); } } @@ -800,11 +814,78 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, return Status::OK(); } +bool MigrationChunkClonerSourceLegacy::_processUpdateForXferMod(const BSONObj& preImageDocKey, + const BSONObj& postImageDocKey) { + auto const& minKey = _args.getMin().value(); + auto const& maxKey = _args.getMax().value(); + + auto postShardKeyValues = _shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey); + fassert(6836100, !postShardKeyValues.isEmpty()); + + auto opType = repl::OpTypeEnum::kUpdate; + auto idElement = preImageDocKey["_id"]; + + if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) { + // If the preImageDoc is not in range but the postImageDoc was, we know that the + // document has changed shard keys and no longer belongs in the chunk being cloned. + // We will model the deletion of the preImage document so that the destination chunk + // does not receive an outdated version of this document. + + auto preImageShardKeyValues = + _shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); + fassert(6836101, !preImageShardKeyValues.isEmpty()); + + if (!isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) { + return false; + } + + opType = repl::OpTypeEnum::kDelete; + idElement = postImageDocKey["_id"]; + } + + _addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); + + return true; +} + +void MigrationChunkClonerSourceLegacy::_deferProcessingForXferMod(const BSONObj& preImageDocKey) { + stdx::lock_guard<Latch> sl(_mutex); + _deferredReloadOrDeletePreImageDocKeys.push_back(preImageDocKey.getOwned()); + _deferredUntransferredOpsCounter++; +} + +void MigrationChunkClonerSourceLegacy::_processDeferredXferMods(OperationContext* opCtx, + Database* db) { + std::vector<BSONObj> deferredReloadOrDeletePreImageDocKeys; + + { + stdx::unique_lock lk(_mutex); + deferredReloadOrDeletePreImageDocKeys.swap(_deferredReloadOrDeletePreImageDocKeys); + } + + for (const auto& preImageDocKey : deferredReloadOrDeletePreImageDocKeys) { + auto idElement = preImageDocKey["_id"]; + BSONObj newerVersionDoc; + if (!Helpers::findById(opCtx, db, nss().ns(), BSON("_id" << idElement), newerVersionDoc)) { + // If the document can no longer be found, this means that another later op must have + // deleted it. That delete would have been captured by the xferMods so nothing else to + // do here. + continue; + } + + auto postImageDocKey = + CollectionMetadata::extractDocumentKey(&_shardKeyPattern, newerVersionDoc); + static_cast<void>(_processUpdateForXferMod(preImageDocKey, postImageDocKey)); + } +} + Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder) { dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS)); + _processDeferredXferMods(opCtx, db); + std::list<BSONObj> deleteList; std::list<BSONObj> updateList; @@ -849,6 +930,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, _untransferredDeletesCounter = _deleted.size(); _reload.splice(_reload.cbegin(), updateList); _untransferredUpsertsCounter = _reload.size(); + _deferredUntransferredOpsCounter = _deferredReloadOrDeletePreImageDocKeys.size(); return Status::OK(); } @@ -863,6 +945,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup() { _untransferredUpsertsCounter = 0; _deleted.clear(); _untransferredDeletesCounter = 0; + _deferredReloadOrDeletePreImageDocKeys.clear(); + _deferredUntransferredOpsCounter = 0; } StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx, @@ -1111,7 +1195,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC stdx::lock_guard<Latch> sl(_mutex); int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize + - _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs; + (_untransferredUpsertsCounter + _deferredUntransferredOpsCounter) * + _averageObjectSizeForCloneLocs; if (_forceJumbo && _jumboChunkCloneState) { LOGV2(21992, @@ -1177,6 +1262,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC "moveChunk data transfer within threshold to allow write blocking", "_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter, "_untransferredDeletesCounter"_attr = _untransferredDeletesCounter, + "_deferredUntransferredOpsCounter"_attr = _deferredUntransferredOpsCounter, "_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs, "_averageObjectIdSize"_attr = _averageObjectIdSize, "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes, diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 803c1f512af..4fff7da8d17 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -67,13 +67,13 @@ const long long kFixedCommandOverhead = 32 * 1024; */ class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change { public: - /** - * Invariant: idObj should belong to a document that is part of the active chunk being migrated - */ - LogTransactionOperationsForShardingHandler(const LogicalSessionId lsid, + LogTransactionOperationsForShardingHandler(LogicalSessionId lsid, + const std::vector<repl::OplogEntry>& stmts, + repl::OpTime prepareOrCommitOpTime); + + LogTransactionOperationsForShardingHandler(LogicalSessionId lsid, const std::vector<repl::ReplOperation>& stmts, - const repl::OpTime& prepareOrCommitOpTime) - : _lsid(lsid), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {} + repl::OpTime prepareOrCommitOpTime); void commit(boost::optional<Timestamp>) override; @@ -81,6 +81,8 @@ public: private: const LogicalSessionId _lsid; + // Use to keep BSON obj alive for the lifetime of this object. + std::vector<BSONObj> _ownedReplBSONObj; std::vector<repl::ReplOperation> _stmts; const repl::OpTime _prepareOrCommitOpTime; }; @@ -486,6 +488,23 @@ private: */ Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait); + /** + * Inspects the pre and post image document keys and determines which xferMods bucket to + * add a new entry. Returns false if neither pre or post image document keys fall into + * the chunk boundaries being migrated. + */ + bool _processUpdateForXferMod(const BSONObj& preImageDocKey, const BSONObj& postImageDocKey); + + /** + * Defer processing of update ops into xferMods entries to when nextModsBatch is called. + */ + void _deferProcessingForXferMod(const BSONObj& preImageDocKey); + + /** + * Converts all deferred update ops captured by the op observer into xferMods entries. + */ + void _processDeferredXferMods(OperationContext* opCtx, Database* database); + // The original move range request const ShardsvrMoveRange _args; @@ -547,6 +566,13 @@ private: // Amount of delete xfer mods that have not yet reached the recipient. size_t _untransferredDeletesCounter{0}; + // Amount of ops that are yet to be converted to update/delete xferMods. + size_t _deferredUntransferredOpsCounter{0}; + + // Stores document keys of document that needs to be examined if we need to put in to xferMods + // list later. + std::vector<BSONObj> _deferredReloadOrDeletePreImageDocKeys; + // Total bytes in _reload + _deleted (xfer mods) uint64_t _memoryUsed{0}; diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index ab8ce8ca5e8..860fe7ad050 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -241,4 +241,14 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit( *opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime)); } +void OpObserverShardingImpl::shardObserveNonPrimaryTransactionPrepare( + OperationContext* opCtx, + const std::vector<repl::OplogEntry>& stmts, + const repl::OpTime& prepareOrCommitOptime) { + + opCtx->recoveryUnit()->registerChange( + std::make_unique<LogTransactionOperationsForShardingHandler>( + *opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime)); +} + } // namespace mongo diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h index f9005497c57..d7295482332 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.h +++ b/src/mongo/db/s/op_observer_sharding_impl.h @@ -74,6 +74,10 @@ protected: OperationContext* opCtx, const std::vector<repl::ReplOperation>& stmts, const repl::OpTime& prepareOrCommitOptime) override; + void shardObserveNonPrimaryTransactionPrepare( + OperationContext* opCtx, + const std::vector<repl::OplogEntry>& stmts, + const repl::OpTime& prepareOrCommitOptime) override; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h index 30d319a041d..e8affe3ef4a 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.h +++ b/src/mongo/db/s/resharding/resharding_op_observer.h @@ -228,6 +228,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) override {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 5a0671254d6..78e84b664cf 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -207,6 +207,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) override {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} |