summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/collection_metadata.cpp14
-rw-r--r--src/mongo/db/s/collection_metadata.h6
-rw-r--r--src/mongo/db/s/config_server_op_observer.h4
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp176
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h38
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp10
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.h4
-rw-r--r--src/mongo/db/s/resharding/resharding_op_observer.h4
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h4
9 files changed, 204 insertions, 56 deletions
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index ea9f6f0c7c1..6b0fb689356 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -107,13 +107,13 @@ void CollectionMetadata::throwIfReshardingInProgress(NamespaceString const& nss)
}
}
-BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
+BSONObj CollectionMetadata::extractDocumentKey(const ShardKeyPattern* shardKeyPattern,
+ const BSONObj& doc) {
BSONObj key;
- if (isSharded()) {
- auto const& pattern = _cm->getShardKeyPattern();
- key = dotted_path_support::extractElementsBasedOnTemplate(doc, pattern.toBSON());
- if (pattern.hasId()) {
+ if (shardKeyPattern) {
+ key = dotted_path_support::extractElementsBasedOnTemplate(doc, shardKeyPattern->toBSON());
+ if (shardKeyPattern->hasId()) {
return key;
}
// else, try to append an _id field from the document.
@@ -127,6 +127,10 @@ BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
return doc;
}
+BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
+ return extractDocumentKey(isSharded() ? &_cm->getShardKeyPattern() : nullptr, doc);
+}
+
std::string CollectionMetadata::toStringBasic() const {
if (isSharded()) {
return str::stream() << "collection version: " << _cm->getVersion().toString()
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index b691f94ebe5..c2b323c0cc7 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -162,6 +162,12 @@ public:
BSONObj extractDocumentKey(const BSONObj& doc) const;
/**
+ * Static version of the function above. Only use this for internal sharding operations where
+ * the shard key pattern is fixed and cannot change.
+ */
+ static BSONObj extractDocumentKey(const ShardKeyPattern* shardKeyPattern, const BSONObj& doc);
+
+ /**
* String output of the collection and shard versions.
*/
std::string toStringBasic() const;
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 18e83a4b994..2acd0017ab7 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -208,6 +208,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 0b68b839712..2272599806e 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -33,20 +33,26 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
+#include <fmt/format.h>
+
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_source_manager.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_key_index_util.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -69,6 +75,8 @@
namespace mongo {
namespace {
+using namespace fmt::literals;
+
const char kRecvChunkStatus[] = "_recvChunkStatus";
const char kRecvChunkCommit[] = "_recvChunkCommit";
const char kRecvChunkAbort[] = "_recvChunkAbort";
@@ -107,9 +115,8 @@ BSONObj createRequestWithSessionId(StringData commandName,
return builder.obj();
}
-BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation,
- repl::OpTypeEnum opType) {
- switch (opType) {
+BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation) {
+ switch (replOperation.getOpType()) {
case repl::OpTypeEnum::kInsert:
case repl::OpTypeEnum::kDelete:
return replOperation.getObject();
@@ -168,6 +175,30 @@ private:
const repl::OpTime _opTime;
};
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ LogicalSessionId lsid,
+ const std::vector<repl::OplogEntry>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _lsid(std::move(lsid)), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {
+ _stmts.reserve(stmts.size());
+ _ownedReplBSONObj.reserve(stmts.size());
+
+ for (const auto& op : stmts) {
+ auto ownedBSON = op.getDurableReplOperation().toBSON().getOwned();
+ _ownedReplBSONObj.push_back(ownedBSON);
+ _stmts.push_back(
+ repl::ReplOperation::parse({"MigrationChunkClonerSource_toReplOperation"}, ownedBSON));
+ }
+}
+
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ LogicalSessionId lsid,
+ const std::vector<repl::ReplOperation>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _lsid(std::move(lsid)),
+ _stmts(stmts),
+ _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {}
+
void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) {
std::set<NamespaceString> namespacesTouchedByTransaction;
@@ -225,7 +256,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType);
+ auto preImageDocKey = getDocumentKeyFromReplOperation(stmt);
auto idElement = preImageDocKey["_id"];
if (idElement.eoo()) {
@@ -235,52 +266,35 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto const& minKey = cloner->_args.getMin().get();
- auto const& maxKey = cloner->_args.getMax().get();
- auto const& shardKeyPattern = cloner->_shardKeyPattern;
-
- // Note: This assumes that prepared transactions will always have post document key
- // set. There is a small window where create collection coordinator releases the critical
- // section and before it writes down the chunks for non-empty collections. So in theory,
- // it is possible to have a prepared transaction while collection is unsharded
- // and becomes sharded midway. This doesn't happen in practice because the only way to
- // have a prepared transactions without being sharded is by directly connecting to the
- // shards and manually preparing the transaction. Another exception is when transaction
- // is prepared on an older version that doesn't set the post image document key.
- auto postImageDocKey = stmt.getPostImageDocumentKey();
- if (postImageDocKey.isEmpty()) {
- LOGV2_WARNING(
- 6836102,
- "Migration encountered a transaction operation without a post image document key",
- "preImageDocKey"_attr = preImageDocKey);
- } else {
- auto postShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
- fassert(6836100, !postShardKeyValues.isEmpty());
-
- if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
- // If the preImageDoc is not in range but the postImageDoc was, we know that the
- // document has changed shard keys and no longer belongs in the chunk being cloned.
- // We will model the deletion of the preImage document so that the destination chunk
- // does not receive an outdated version of this document.
-
- auto preImageShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
- fassert(6836101, !preImageShardKeyValues.isEmpty());
-
- if (opType == repl::OpTypeEnum::kUpdate &&
- isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
- opType = repl::OpTypeEnum::kDelete;
- idElement = postImageDocKey["_id"];
- } else {
+ if (opType == repl::OpTypeEnum::kUpdate) {
+ auto const& shardKeyPattern = cloner->_shardKeyPattern;
+ auto preImageShardKeyValues =
+ shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+
+ // If prepare was performed from another term, we will not have the post image doc key
+ // since it is not persisted in the oplog.
+ auto postImageDocKey = stmt.getPostImageDocumentKey();
+ if (!postImageDocKey.isEmpty()) {
+ if (!cloner->_processUpdateForXferMod(preImageDocKey, postImageDocKey)) {
+                // We don't need to add this op to session migration if neither the post nor
+                // pre image doc falls within the chunk range.
continue;
}
+ } else {
+ // We can't perform reads here using the same recovery unit because the transaction
+ // is already committed. We instead defer performing the reads when xferMods command
+ // is called. Also allow this op to be added to session migration since we can't
+ // tell whether post image doc will fall within the chunk range. If it turns out
+ // both preImage and postImage doc don't fall into the chunk range, it is not wrong
+ // for this op to be added to session migration, but it will result in wasted work
+            // and unnecessary extra oplog storage on the destination.
+ cloner->_deferProcessingForXferMod(preImageDocKey);
}
+ } else {
+ cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
}
addToSessionMigrationOptimeQueueIfNeeded(cloner, nss, _prepareOrCommitOpTime);
-
- cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
}
}
@@ -800,11 +814,78 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
return Status::OK();
}
+bool MigrationChunkClonerSourceLegacy::_processUpdateForXferMod(const BSONObj& preImageDocKey,
+ const BSONObj& postImageDocKey) {
+ auto const& minKey = _args.getMin().value();
+ auto const& maxKey = _args.getMax().value();
+
+ auto postShardKeyValues = _shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
+ fassert(6836100, !postShardKeyValues.isEmpty());
+
+ auto opType = repl::OpTypeEnum::kUpdate;
+ auto idElement = preImageDocKey["_id"];
+
+ if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
+ // If the preImageDoc is not in range but the postImageDoc was, we know that the
+ // document has changed shard keys and no longer belongs in the chunk being cloned.
+ // We will model the deletion of the preImage document so that the destination chunk
+ // does not receive an outdated version of this document.
+
+ auto preImageShardKeyValues =
+ _shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+ fassert(6836101, !preImageShardKeyValues.isEmpty());
+
+ if (!isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
+ return false;
+ }
+
+ opType = repl::OpTypeEnum::kDelete;
+ idElement = postImageDocKey["_id"];
+ }
+
+ _addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
+
+ return true;
+}
+
+void MigrationChunkClonerSourceLegacy::_deferProcessingForXferMod(const BSONObj& preImageDocKey) {
+ stdx::lock_guard<Latch> sl(_mutex);
+ _deferredReloadOrDeletePreImageDocKeys.push_back(preImageDocKey.getOwned());
+ _deferredUntransferredOpsCounter++;
+}
+
+void MigrationChunkClonerSourceLegacy::_processDeferredXferMods(OperationContext* opCtx,
+ Database* db) {
+ std::vector<BSONObj> deferredReloadOrDeletePreImageDocKeys;
+
+ {
+ stdx::unique_lock lk(_mutex);
+ deferredReloadOrDeletePreImageDocKeys.swap(_deferredReloadOrDeletePreImageDocKeys);
+ }
+
+ for (const auto& preImageDocKey : deferredReloadOrDeletePreImageDocKeys) {
+ auto idElement = preImageDocKey["_id"];
+ BSONObj newerVersionDoc;
+ if (!Helpers::findById(opCtx, db, nss().ns(), BSON("_id" << idElement), newerVersionDoc)) {
+ // If the document can no longer be found, this means that another later op must have
+ // deleted it. That delete would have been captured by the xferMods so nothing else to
+ // do here.
+ continue;
+ }
+
+ auto postImageDocKey =
+ CollectionMetadata::extractDocumentKey(&_shardKeyPattern, newerVersionDoc);
+ static_cast<void>(_processUpdateForXferMod(preImageDocKey, postImageDocKey));
+ }
+}
+
Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
Database* db,
BSONObjBuilder* builder) {
dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS));
+ _processDeferredXferMods(opCtx, db);
+
std::list<BSONObj> deleteList;
std::list<BSONObj> updateList;
@@ -849,6 +930,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
_untransferredDeletesCounter = _deleted.size();
_reload.splice(_reload.cbegin(), updateList);
_untransferredUpsertsCounter = _reload.size();
+ _deferredUntransferredOpsCounter = _deferredReloadOrDeletePreImageDocKeys.size();
return Status::OK();
}
@@ -863,6 +945,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup() {
_untransferredUpsertsCounter = 0;
_deleted.clear();
_untransferredDeletesCounter = 0;
+ _deferredReloadOrDeletePreImageDocKeys.clear();
+ _deferredUntransferredOpsCounter = 0;
}
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx,
@@ -1111,7 +1195,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
stdx::lock_guard<Latch> sl(_mutex);
int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize +
- _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
+ (_untransferredUpsertsCounter + _deferredUntransferredOpsCounter) *
+ _averageObjectSizeForCloneLocs;
if (_forceJumbo && _jumboChunkCloneState) {
LOGV2(21992,
@@ -1177,6 +1262,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"moveChunk data transfer within threshold to allow write blocking",
"_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter,
"_untransferredDeletesCounter"_attr = _untransferredDeletesCounter,
+ "_deferredUntransferredOpsCounter"_attr = _deferredUntransferredOpsCounter,
"_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs,
"_averageObjectIdSize"_attr = _averageObjectIdSize,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes,
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 803c1f512af..4fff7da8d17 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -67,13 +67,13 @@ const long long kFixedCommandOverhead = 32 * 1024;
*/
class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change {
public:
- /**
- * Invariant: idObj should belong to a document that is part of the active chunk being migrated
- */
- LogTransactionOperationsForShardingHandler(const LogicalSessionId lsid,
+ LogTransactionOperationsForShardingHandler(LogicalSessionId lsid,
+ const std::vector<repl::OplogEntry>& stmts,
+ repl::OpTime prepareOrCommitOpTime);
+
+ LogTransactionOperationsForShardingHandler(LogicalSessionId lsid,
const std::vector<repl::ReplOperation>& stmts,
- const repl::OpTime& prepareOrCommitOpTime)
- : _lsid(lsid), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {}
+ repl::OpTime prepareOrCommitOpTime);
void commit(boost::optional<Timestamp>) override;
@@ -81,6 +81,8 @@ public:
private:
const LogicalSessionId _lsid;
+ // Use to keep BSON obj alive for the lifetime of this object.
+ std::vector<BSONObj> _ownedReplBSONObj;
std::vector<repl::ReplOperation> _stmts;
const repl::OpTime _prepareOrCommitOpTime;
};
@@ -486,6 +488,23 @@ private:
*/
Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait);
+ /**
+     * Inspects the pre and post image document keys and determines which xferMods bucket to
+     * add a new entry to. Returns false if neither the pre nor post image document key falls
+     * into the chunk boundaries being migrated.
+ */
+ bool _processUpdateForXferMod(const BSONObj& preImageDocKey, const BSONObj& postImageDocKey);
+
+ /**
+ * Defer processing of update ops into xferMods entries to when nextModsBatch is called.
+ */
+ void _deferProcessingForXferMod(const BSONObj& preImageDocKey);
+
+ /**
+ * Converts all deferred update ops captured by the op observer into xferMods entries.
+ */
+ void _processDeferredXferMods(OperationContext* opCtx, Database* database);
+
// The original move range request
const ShardsvrMoveRange _args;
@@ -547,6 +566,13 @@ private:
// Amount of delete xfer mods that have not yet reached the recipient.
size_t _untransferredDeletesCounter{0};
+ // Amount of ops that are yet to be converted to update/delete xferMods.
+ size_t _deferredUntransferredOpsCounter{0};
+
+    // Stores document keys of documents that need to be examined to decide whether they should
+    // be added to the xferMods list later.
+ std::vector<BSONObj> _deferredReloadOrDeletePreImageDocKeys;
+
// Total bytes in _reload + _deleted (xfer mods)
uint64_t _memoryUsed{0};
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index ab8ce8ca5e8..860fe7ad050 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -241,4 +241,14 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit(
*opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime));
}
+void OpObserverShardingImpl::shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) {
+
+ opCtx->recoveryUnit()->registerChange(
+ std::make_unique<LogTransactionOperationsForShardingHandler>(
+ *opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h
index f9005497c57..d7295482332 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.h
+++ b/src/mongo/db/s/op_observer_sharding_impl.h
@@ -74,6 +74,10 @@ protected:
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) override;
+ void shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) override;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h
index 30d319a041d..e8affe3ef4a 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.h
+++ b/src/mongo/db/s/resharding/resharding_op_observer.h
@@ -228,6 +228,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 5a0671254d6..78e84b664cf 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -207,6 +207,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}