summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp176
1 files changed, 131 insertions, 45 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 0b68b839712..2272599806e 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -33,20 +33,26 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
+#include <fmt/format.h>
+
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_source_manager.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_key_index_util.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -69,6 +75,8 @@
namespace mongo {
namespace {
+using namespace fmt::literals;
+
const char kRecvChunkStatus[] = "_recvChunkStatus";
const char kRecvChunkCommit[] = "_recvChunkCommit";
const char kRecvChunkAbort[] = "_recvChunkAbort";
@@ -107,9 +115,8 @@ BSONObj createRequestWithSessionId(StringData commandName,
return builder.obj();
}
-BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation,
- repl::OpTypeEnum opType) {
- switch (opType) {
+BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation) {
+ switch (replOperation.getOpType()) {
case repl::OpTypeEnum::kInsert:
case repl::OpTypeEnum::kDelete:
return replOperation.getObject();
@@ -168,6 +175,30 @@ private:
const repl::OpTime _opTime;
};
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ LogicalSessionId lsid,
+ const std::vector<repl::OplogEntry>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _lsid(std::move(lsid)), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {
+ _stmts.reserve(stmts.size());
+ _ownedReplBSONObj.reserve(stmts.size());
+
+ for (const auto& op : stmts) {
+ auto ownedBSON = op.getDurableReplOperation().toBSON().getOwned();
+ _ownedReplBSONObj.push_back(ownedBSON);
+ _stmts.push_back(
+ repl::ReplOperation::parse({"MigrationChunkClonerSource_toReplOperation"}, ownedBSON));
+ }
+}
+
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ LogicalSessionId lsid,
+ const std::vector<repl::ReplOperation>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _lsid(std::move(lsid)),
+ _stmts(stmts),
+ _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {}
+
void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) {
std::set<NamespaceString> namespacesTouchedByTransaction;
@@ -225,7 +256,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType);
+ auto preImageDocKey = getDocumentKeyFromReplOperation(stmt);
auto idElement = preImageDocKey["_id"];
if (idElement.eoo()) {
@@ -235,52 +266,35 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto const& minKey = cloner->_args.getMin().get();
- auto const& maxKey = cloner->_args.getMax().get();
- auto const& shardKeyPattern = cloner->_shardKeyPattern;
-
- // Note: This assumes that prepared transactions will always have post document key
- // set. There is a small window where create collection coordinator releases the critical
- // section and before it writes down the chunks for non-empty collections. So in theory,
- // it is possible to have a prepared transaction while collection is unsharded
- // and becomes sharded midway. This doesn't happen in practice because the only way to
- // have a prepared transactions without being sharded is by directly connecting to the
- // shards and manually preparing the transaction. Another exception is when transaction
- // is prepared on an older version that doesn't set the post image document key.
- auto postImageDocKey = stmt.getPostImageDocumentKey();
- if (postImageDocKey.isEmpty()) {
- LOGV2_WARNING(
- 6836102,
- "Migration encountered a transaction operation without a post image document key",
- "preImageDocKey"_attr = preImageDocKey);
- } else {
- auto postShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
- fassert(6836100, !postShardKeyValues.isEmpty());
-
- if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
- // If the preImageDoc is not in range but the postImageDoc was, we know that the
- // document has changed shard keys and no longer belongs in the chunk being cloned.
- // We will model the deletion of the preImage document so that the destination chunk
- // does not receive an outdated version of this document.
-
- auto preImageShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
- fassert(6836101, !preImageShardKeyValues.isEmpty());
-
- if (opType == repl::OpTypeEnum::kUpdate &&
- isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
- opType = repl::OpTypeEnum::kDelete;
- idElement = postImageDocKey["_id"];
- } else {
+ if (opType == repl::OpTypeEnum::kUpdate) {
+ auto const& shardKeyPattern = cloner->_shardKeyPattern;
+ auto preImageShardKeyValues =
+ shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+
+ // If prepare was performed from another term, we will not have the post image doc key
+ // since it is not persisted in the oplog.
+ auto postImageDocKey = stmt.getPostImageDocumentKey();
+ if (!postImageDocKey.isEmpty()) {
+ if (!cloner->_processUpdateForXferMod(preImageDocKey, postImageDocKey)) {
+ // We don't need to add this op to session migration if neither post or pre
+ // image doc falls within the chunk range.
continue;
}
+ } else {
+ // We can't perform reads here using the same recovery unit because the transaction
+ // is already committed. We instead defer performing the reads when xferMods command
+ // is called. Also allow this op to be added to session migration since we can't
+ // tell whether post image doc will fall within the chunk range. If it turns out
+ // both preImage and postImage doc don't fall into the chunk range, it is not wrong
+ // for this op to be added to session migration, but it will result in wasted work
+ // and unneccesary extra oplog storage on the destination.
+ cloner->_deferProcessingForXferMod(preImageDocKey);
}
+ } else {
+ cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
}
addToSessionMigrationOptimeQueueIfNeeded(cloner, nss, _prepareOrCommitOpTime);
-
- cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
}
}
@@ -800,11 +814,78 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
return Status::OK();
}
+bool MigrationChunkClonerSourceLegacy::_processUpdateForXferMod(const BSONObj& preImageDocKey,
+ const BSONObj& postImageDocKey) {
+ auto const& minKey = _args.getMin().value();
+ auto const& maxKey = _args.getMax().value();
+
+ auto postShardKeyValues = _shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
+ fassert(6836100, !postShardKeyValues.isEmpty());
+
+ auto opType = repl::OpTypeEnum::kUpdate;
+ auto idElement = preImageDocKey["_id"];
+
+ if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
+ // If the preImageDoc is not in range but the postImageDoc was, we know that the
+ // document has changed shard keys and no longer belongs in the chunk being cloned.
+ // We will model the deletion of the preImage document so that the destination chunk
+ // does not receive an outdated version of this document.
+
+ auto preImageShardKeyValues =
+ _shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+ fassert(6836101, !preImageShardKeyValues.isEmpty());
+
+ if (!isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
+ return false;
+ }
+
+ opType = repl::OpTypeEnum::kDelete;
+ idElement = postImageDocKey["_id"];
+ }
+
+ _addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
+
+ return true;
+}
+
+void MigrationChunkClonerSourceLegacy::_deferProcessingForXferMod(const BSONObj& preImageDocKey) {
+ stdx::lock_guard<Latch> sl(_mutex);
+ _deferredReloadOrDeletePreImageDocKeys.push_back(preImageDocKey.getOwned());
+ _deferredUntransferredOpsCounter++;
+}
+
+void MigrationChunkClonerSourceLegacy::_processDeferredXferMods(OperationContext* opCtx,
+ Database* db) {
+ std::vector<BSONObj> deferredReloadOrDeletePreImageDocKeys;
+
+ {
+ stdx::unique_lock lk(_mutex);
+ deferredReloadOrDeletePreImageDocKeys.swap(_deferredReloadOrDeletePreImageDocKeys);
+ }
+
+ for (const auto& preImageDocKey : deferredReloadOrDeletePreImageDocKeys) {
+ auto idElement = preImageDocKey["_id"];
+ BSONObj newerVersionDoc;
+ if (!Helpers::findById(opCtx, db, nss().ns(), BSON("_id" << idElement), newerVersionDoc)) {
+ // If the document can no longer be found, this means that another later op must have
+ // deleted it. That delete would have been captured by the xferMods so nothing else to
+ // do here.
+ continue;
+ }
+
+ auto postImageDocKey =
+ CollectionMetadata::extractDocumentKey(&_shardKeyPattern, newerVersionDoc);
+ static_cast<void>(_processUpdateForXferMod(preImageDocKey, postImageDocKey));
+ }
+}
+
Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
Database* db,
BSONObjBuilder* builder) {
dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS));
+ _processDeferredXferMods(opCtx, db);
+
std::list<BSONObj> deleteList;
std::list<BSONObj> updateList;
@@ -849,6 +930,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
_untransferredDeletesCounter = _deleted.size();
_reload.splice(_reload.cbegin(), updateList);
_untransferredUpsertsCounter = _reload.size();
+ _deferredUntransferredOpsCounter = _deferredReloadOrDeletePreImageDocKeys.size();
return Status::OK();
}
@@ -863,6 +945,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup() {
_untransferredUpsertsCounter = 0;
_deleted.clear();
_untransferredDeletesCounter = 0;
+ _deferredReloadOrDeletePreImageDocKeys.clear();
+ _deferredUntransferredOpsCounter = 0;
}
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx,
@@ -1111,7 +1195,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
stdx::lock_guard<Latch> sl(_mutex);
int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize +
- _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
+ (_untransferredUpsertsCounter + _deferredUntransferredOpsCounter) *
+ _averageObjectSizeForCloneLocs;
if (_forceJumbo && _jumboChunkCloneState) {
LOGV2(21992,
@@ -1177,6 +1262,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"moveChunk data transfer within threshold to allow write blocking",
"_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter,
"_untransferredDeletesCounter"_attr = _untransferredDeletesCounter,
+ "_deferredUntransferredOpsCounter"_attr = _deferredUntransferredOpsCounter,
"_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs,
"_averageObjectIdSize"_attr = _averageObjectIdSize,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes,