author    Randolph Tan <randolph@10gen.com>   2019-03-25 16:44:32 -0400
committer Randolph Tan <randolph@10gen.com>   2019-04-03 16:08:25 -0400
commit    47153dbf7aebeba9a5b9086a82709adfa6fd7226 (patch)
tree      79fbadde42332add71593f572be0ea9cf4ec000a /src/mongo/db/s
parent    dd589aa07a0155cdeaa70f0403466aabcfaa5186 (diff)
download  mongo-47153dbf7aebeba9a5b9086a82709adfa6fd7226.tar.gz
SERVER-40301 Don't hold mutex while doing queries in nextCloneBatch and nextModsBatch
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 121
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h   |  39
2 files changed, 103 insertions(+), 57 deletions(-)
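At a high level, the patch applies one pattern in both methods: copy the next work item while still holding the mutex, release the mutex for the duration of the storage query, and re-acquire it before touching shared state again. A minimal sketch of that pattern, using the standard library rather than MongoDB's stdx wrappers (the WorkQueue and slowQuery names are illustrative, not from the patch):

#include <mutex>
#include <set>

class WorkQueue {
public:
    // Drains items one at a time without holding the mutex across slowQuery().
    void drain() {
        std::unique_lock<std::mutex> lk(_mutex);
        auto it = _items.begin();
        for (; it != _items.end(); ++it) {
            const long item = *it;  // Copy the item out while still locked.

            lk.unlock();  // Drop the lock around the expensive query.
            slowQuery(item);
            lk.lock();    // Reacquire before advancing the iterator.
        }
        _items.erase(_items.begin(), it);  // Consumed entries leave the set.
    }

private:
    static void slowQuery(long /*item*/) { /* stand-in for collection->findDoc() */ }

    std::mutex _mutex;
    std::set<long> _items;
};

As in the patch, this is only safe under a single-consumer contract: inserts into a std::set do not invalidate iterators, but a concurrent erase of the current element would.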
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 393a99c00fa..bd599fc74b1 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -516,21 +516,24 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
- stdx::lock_guard<stdx::mutex> sl(_mutex);
-
- std::set<RecordId>::iterator it;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ auto iter = _cloneLocs.begin();
- for (it = _cloneLocs.begin(); it != _cloneLocs.end(); ++it) {
+ for (; iter != _cloneLocs.end(); ++iter) {
// We must always make progress in this method by at least one document, because an empty
// return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
break;
}
+ auto nextRecordId = *iter;
+
+ lk.unlock();
+
Snapshotted<BSONObj> doc;
- if (collection->findDoc(opCtx, *it, &doc)) {
- // Use the builder size instead of accumulating the document sizes directly so that we
- // take into consideration the overhead of BSONArray indices.
+ if (collection->findDoc(opCtx, nextRecordId, &doc)) {
+ // Use the builder size instead of accumulating the document sizes directly so
+ // that we take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
(arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) {
break;
@@ -539,9 +542,11 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
arrBuilder->append(doc.value());
ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
+
+ lk.lock();
}
- _cloneLocs.erase(_cloneLocs.begin(), it);
+ _cloneLocs.erase(_cloneLocs.begin(), iter);
return Status::OK();
}
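Two termination rules govern the loop above, both preserved from the old code: the batch must always contain at least one document (an empty batch is the end-of-clone signal), and the size check uses the builder's length rather than a sum of document sizes so the BSONArray index overhead is counted. A hedged sketch of just that check, with a 16 MB constant standing in for BSONObjMaxUserSize and the same 1024-byte slack as the patch:

#include <cstddef>

constexpr std::size_t kMaxUserSize = 16 * 1024 * 1024;  // stands in for BSONObjMaxUserSize

// Returns true if a document of docSize bytes may be appended to a batch that
// already holds numDocs documents serialized into batchLen bytes.
bool fitsInBatch(std::size_t numDocs, std::size_t batchLen, std::size_t docSize) {
    if (numDocs == 0) {
        return true;  // Always make progress: an empty batch means "no more data".
    }
    // Check the builder's length rather than a running sum of objsize() values,
    // so the per-element overhead of the BSONArray indices is accounted for.
    return batchLen + docSize + 1024 <= kMaxUserSize;
}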
@@ -551,17 +556,33 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
BSONObjBuilder* builder) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS));
- stdx::lock_guard<stdx::mutex> sl(_mutex);
+ std::list<BSONObj> deleteList;
+ std::list<BSONObj> updateList;
- // All clone data must have been drained before starting to fetch the incremental changes
- invariant(_cloneLocs.empty());
+ {
+ // All clone data must have been drained before starting to fetch the incremental changes.
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_cloneLocs.empty());
- long long docSizeAccumulator = 0;
+ // The "snapshot" of the delete and update lists must be taken under a single lock. This
+ // ensures that we preserve the causal order of writes: always consume the delete buffer
+ // first, before the update buffer. If a delete is causally before an update to the same
+ // doc, there is no problem, since we consume the delete buffer first. If the delete is
+ // causally after, we will simply not see the document when we attempt to fetch it, which
+ // is also fine.
+ deleteList.splice(deleteList.cbegin(), _deleted);
+ updateList.splice(updateList.cbegin(), _reload);
+ }
+
+ auto totalDocSize = _xferDeletes(builder, &deleteList, 0);
+ totalDocSize = _xferUpdates(opCtx, db, builder, &updateList, totalDocSize);
- _xfer(opCtx, db, &_deleted, builder, "deleted", &docSizeAccumulator, false);
- _xfer(opCtx, db, &_reload, builder, "reload", &docSizeAccumulator, true);
+ builder->append("size", totalDocSize);
- builder->append("size", docSizeAccumulator);
+ // Put back the remaining ids that we did not consume.
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _deleted.splice(_deleted.cbegin(), deleteList);
+ _reload.splice(_reload.cbegin(), updateList);
return Status::OK();
}
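The splice-out/splice-back shape of the new nextModsBatch can be sketched as follows; std::list::splice moves nodes in O(1) without copying, and taking both lists in one lock acquisition is what preserves the delete-before-update ordering described in the comment above. The names here (snapshotModsQueues, restoreModsQueues, the string payloads) are illustrative stand-ins, not from the patch:

#include <list>
#include <mutex>
#include <string>

std::mutex gMutex;                         // stands in for _mutex
std::list<std::string> gDeleted, gReload;  // stand in for _deleted / _reload

// Move both queues out under a single lock acquisition so the snapshot is
// causally consistent; the delete list is always consumed before the update list.
void snapshotModsQueues(std::list<std::string>* deleteList,
                        std::list<std::string>* updateList) {
    std::lock_guard<std::mutex> lk(gMutex);
    deleteList->splice(deleteList->cbegin(), gDeleted);  // O(1) node transfer, no copies.
    updateList->splice(updateList->cbegin(), gReload);
}

// Return unconsumed entries to the front of the shared queues so the next
// batch sees them first, in their original order.
void restoreModsQueues(std::list<std::string>* deleteList,
                       std::list<std::string>* updateList) {
    std::lock_guard<std::mutex> lk(gMutex);
    gDeleted.splice(gDeleted.cbegin(), *deleteList);
    gReload.splice(gReload.cbegin(), *updateList);
}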
@@ -719,41 +740,61 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
return Status::OK();
}
-void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
- Database* db,
- std::list<BSONObj>* docIdList,
- BSONObjBuilder* builder,
- const char* fieldName,
- long long* sizeAccumulator,
- bool explode) {
+long long MigrationChunkClonerSourceLegacy::_xferDeletes(BSONObjBuilder* builder,
+ std::list<BSONObj>* removeList,
+ long long initialSize) {
const long long maxSize = 1024 * 1024;
- if (docIdList->size() == 0 || *sizeAccumulator > maxSize) {
- return;
+ if (removeList->empty() || initialSize > maxSize) {
+ return initialSize;
}
- const std::string& ns = _args.getNss().ns();
+ long long totalSize = initialSize;
+ BSONArrayBuilder arr(builder->subarrayStart("deleted"));
- BSONArrayBuilder arr(builder->subarrayStart(fieldName));
-
- std::list<BSONObj>::iterator docIdIter = docIdList->begin();
- while (docIdIter != docIdList->end() && *sizeAccumulator < maxSize) {
+ auto docIdIter = removeList->begin();
+ for (; docIdIter != removeList->end() && totalSize < maxSize; ++docIdIter) {
BSONObj idDoc = *docIdIter;
- if (explode) {
- BSONObj fullDoc;
- if (Helpers::findById(opCtx, db, ns.c_str(), idDoc, fullDoc)) {
- arr.append(fullDoc);
- *sizeAccumulator += fullDoc.objsize();
- }
- } else {
- arr.append(idDoc);
- *sizeAccumulator += idDoc.objsize();
- }
+ arr.append(idDoc);
+ totalSize += idDoc.objsize();
+ }
+
+ removeList->erase(removeList->begin(), docIdIter);
+
+ arr.done();
+ return totalSize;
+}
+
+long long MigrationChunkClonerSourceLegacy::_xferUpdates(OperationContext* opCtx,
+ Database* db,
+ BSONObjBuilder* builder,
+ std::list<BSONObj>* updateList,
+ long long initialSize) {
+ const long long maxSize = 1024 * 1024;
- docIdIter = docIdList->erase(docIdIter);
+ if (updateList->empty() || initialSize > maxSize) {
+ return initialSize;
}
+ const auto& nss = _args.getNss();
+ BSONArrayBuilder arr(builder->subarrayStart("reload"));
+ long long totalSize = initialSize;
+
+ auto iter = updateList->begin();
+ for (; iter != updateList->end() && totalSize < maxSize; ++iter) {
+ auto idDoc = *iter;
+
+ BSONObj fullDoc;
+ if (Helpers::findById(opCtx, db, nss.ns().c_str(), idDoc, fullDoc)) {
+ arr.append(fullDoc);
+ totalSize += fullDoc.objsize();
+ }
+ }
+
+ updateList->erase(updateList->begin(), iter);
+
arr.done();
+ return totalSize;
}
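_xferDeletes and _xferUpdates share the same size-capped accumulation shape, condensed below. The 1 MB cap matches the patch; xferUpTo and the std::string payloads are illustrative stand-ins for the BSON machinery. Returning the running total is what lets the update pass continue against the budget already spent by the delete pass:

#include <list>
#include <string>

constexpr long long kMaxXferSize = 1024 * 1024;  // 1 MB soft cap per batch.

// Appends documents to out until the running size crosses the cap, erases the
// consumed prefix from docs, and returns the new total so a subsequent pass
// can keep accumulating against the same budget.
long long xferUpTo(std::list<std::string>* docs, std::string* out, long long initialSize) {
    if (docs->empty() || initialSize > kMaxXferSize) {
        return initialSize;
    }

    long long totalSize = initialSize;
    auto it = docs->begin();
    for (; it != docs->end() && totalSize < kMaxXferSize; ++it) {
        out->append(*it);
        totalSize += static_cast<long long>(it->size());
    }
    docs->erase(docs->begin(), it);  // Entries appended are removed from the list.
    return totalSize;
}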
boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 3874f03e76a..1fd34944a8b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -120,7 +120,8 @@ public:
/**
* Called by the recipient shard. Populates the passed BSONArrayBuilder with a set of documents,
- * which are part of the initial clone sequence.
+ * which are part of the initial clone sequence. Assumes that there is only one active caller
+ * of this method at a time (concurrent callers can cause corruption or crashes).
*
* Returns OK status on success. If there were documents returned in the result argument, this
* method should be called more times until the result is empty. If it returns failure, it is
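The contract above implies a recipient-side loop of roughly the following shape; fetchNextCloneBatch and applyBatch are hypothetical stand-ins for the RPC path that ultimately reaches nextCloneBatch, and per the single-caller note they must not run concurrently:

#include <vector>

std::vector<char> fetchNextCloneBatch();    // hypothetical RPC wrapper
void applyBatch(const std::vector<char>&);  // hypothetical batch insert

// Keep requesting batches until an empty result signals that the initial
// clone data is exhausted.
void runInitialClone() {
    while (true) {
        const std::vector<char> batch = fetchNextCloneBatch();
        if (batch.empty()) {
            break;  // Empty batch: no more initial clone data.
        }
        applyBatch(batch);
    }
}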
@@ -201,22 +202,6 @@ private:
*/
Status _storeCurrentLocs(OperationContext* opCtx);
- /**
- * Insert items from docIdList to a new array with the given fieldName in the given builder. If
- * explode is true, the inserted object will be the full version of the document. Note that
- * whenever an item from the docList is inserted to the array, it will also be removed from
- * docList.
- *
- * Should be holding the collection lock for ns if explode is true.
- */
- void _xfer(OperationContext* opCtx,
- Database* db,
- std::list<BSONObj>* docIdList,
- BSONObjBuilder* builder,
- const char* fieldName,
- long long* sizeAccumulator,
- bool explode);
-
/*
* Consumes the operation track request and appends the relevant document changes to
* the appropriate internal data structures (known colloquially as the 'transfer mods queue').
@@ -266,6 +251,26 @@ private:
*/
void _drainAllOutstandingOperationTrackRequests(stdx::unique_lock<stdx::mutex>& lk);
+ /**
+ * Appends to the builder the _id values of documents that were deleted during the
+ * migration. Entries appended to the builder are removed from the list.
+ * Returns initialSize plus the total size of the documents appended.
+ */
+ long long _xferDeletes(BSONObjBuilder* builder,
+ std::list<BSONObj>* removeList,
+ long long initialSize);
+
+ /**
+ * Appends to the builder the full documents that were modified or inserted during the
+ * migration. Entries appended to the builder are removed from the list.
+ * Returns initialSize plus the total size of the documents appended.
+ */
+ long long _xferUpdates(OperationContext* opCtx,
+ Database* db,
+ BSONObjBuilder* builder,
+ std::list<BSONObj>* updateList,
+ long long initialSize);
+
// The original move chunk request
const MoveChunkRequest _args;