author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2021-04-01 23:06:23 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-02 00:01:02 +0000
commit | b55f6c86a1a47f28de7815ef942c3c422590dea9
tree | 3ad4664be27f03f0e601d1fb6fd32d4276ca25f6
parent | c8e74f9e82a9aab3f549ff77aa539fa6b3ab6b45
SERVER-55060 Direct modification must remove buckets from the time-series bucket catalog
22 files changed, 790 insertions, 151 deletions
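The core of this change is a small per-bucket state machine in the BucketCatalog: direct deletes and updates against a system.buckets collection now clear the corresponding in-memory bucket, and a bucket that is cleared while a commit is prepared signals a write conflict to the direct writer. The sketch below is a standalone model of those transition rules, not the server's code; the class name `BucketStates` and its methods are invented for illustration, while the actual logic lives in `BucketCatalog::_setBucketState` further down in this diff.

```cpp
// Standalone model (hypothetical names) of the bucket state machine added in this
// commit; the real implementation is BucketCatalog::_setBucketState.
#include <cassert>
#include <iostream>
#include <map>
#include <optional>
#include <string>

enum class BucketState { kNormal, kPrepared, kCleared, kPreparedAndCleared };

class BucketStates {
public:
    void track(const std::string& bucketId) {
        _states[bucketId] = BucketState::kNormal;
    }

    // The final state depends on both the current state and the requested target,
    // mirroring the switch statement in the patch.
    std::optional<BucketState> setState(const std::string& bucketId, BucketState target) {
        auto it = _states.find(bucketId);
        if (it == _states.end())
            return std::nullopt;

        BucketState& state = it->second;
        switch (target) {
            case BucketState::kPrepared:  // prepareCommit() on a write batch
                assert(state == BucketState::kNormal);
                state = BucketState::kPrepared;
                break;
            case BucketState::kNormal:  // finish()/abort() after a prepared commit
                if (state == BucketState::kPrepared)
                    state = BucketState::kNormal;
                else if (state == BucketState::kPreparedAndCleared)
                    state = BucketState::kCleared;
                break;
            case BucketState::kCleared:  // direct delete/update seen by the op observer
                if (state == BucketState::kNormal)
                    state = BucketState::kCleared;
                else if (state == BucketState::kPrepared)
                    state = BucketState::kPreparedAndCleared;
                break;
            case BucketState::kPreparedAndCleared:
                assert(false);  // never requested directly
        }
        return state;
    }

private:
    std::map<std::string, BucketState> _states;
};

int main() {
    BucketStates states;
    states.track("bucket-1");

    // A batch is prepared, then the bucket document is removed directly: the
    // combined state tells the direct writer to retry via a write conflict.
    states.setState("bucket-1", BucketState::kPrepared);
    auto result = states.setState("bucket-1", BucketState::kCleared);
    std::cout << (result == BucketState::kPreparedAndCleared
                      ? "direct writer should throw WriteConflictException and retry\n"
                      : "no conflict\n");
    return 0;
}
```

Running the model takes the conflict branch, which corresponds to the `hangTimeseriesDirectModificationBeforeWriteConflict` / `WriteConflictException` path exercised by the new `timeseries_direct_remove_conflict.js` and `timeseries_direct_update_conflict.js` tests below.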
diff --git a/jstests/noPassthrough/timeseries_direct_remove.js b/jstests/noPassthrough/timeseries_direct_remove.js new file mode 100644 index 00000000000..d7c009fe27e --- /dev/null +++ b/jstests/noPassthrough/timeseries_direct_remove.js @@ -0,0 +1,90 @@ +/** + * Tests that direct removal in a timeseries bucket collection close the relevant bucket, preventing + * further inserts from landing in that bucket. + */ +(function() { +'use strict'; + +load('jstests/core/timeseries/libs/timeseries.js'); +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallel_shell_helpers.js"); + +const conn = MongoRunner.runMongod(); + +if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +const dbName = jsTestName(); +const testDB = conn.getDB(dbName); +assert.commandWorked(testDB.dropDatabase()); + +const collName = 'test'; + +const timeFieldName = 'time'; +const times = [ + ISODate('2021-01-01T01:00:00Z'), + ISODate('2021-01-01T01:10:00Z'), + ISODate('2021-01-01T01:20:00Z') +]; +let docs = [ + {_id: 0, [timeFieldName]: times[0]}, + {_id: 1, [timeFieldName]: times[1]}, + {_id: 2, [timeFieldName]: times[2]} +]; + +const coll = testDB.getCollection(collName); +const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName()); +coll.drop(); + +assert.commandWorked( + testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}})); +assert.contains(bucketsColl.getName(), testDB.getCollectionNames()); + +assert.commandWorked(coll.insert(docs[0])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1)); + +let buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[0]); +assert.eq(buckets[0].control.max[timeFieldName], times[0]); + +let removeResult = assert.commandWorked(bucketsColl.remove({_id: buckets[0]._id})); +assert.eq(removeResult.nRemoved, 1); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 0); + +assert.commandWorked(coll.insert(docs[1])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(1, 2)); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[1]); +assert.eq(buckets[0].control.max[timeFieldName], times[1]); + +let fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeCommit"); +let awaitInsert = startParallelShell( + funWithArgs(function(dbName, collName, doc) { + assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc)); + }, dbName, coll.getName(), docs[2]), conn.port); + +fpInsert.wait(); + +removeResult = assert.commandWorked(bucketsColl.remove({_id: buckets[0]._id})); +assert.eq(removeResult.nRemoved, 1); + +fpInsert.off(); +awaitInsert(); + +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(2, 3)); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[2]); +assert.eq(buckets[0].control.max[timeFieldName], times[2]); + +MongoRunner.stopMongod(conn); +})(); diff --git a/jstests/noPassthrough/timeseries_direct_remove_conflict.js b/jstests/noPassthrough/timeseries_direct_remove_conflict.js new file mode 100644 index 00000000000..07af79937c0 --- /dev/null +++ b/jstests/noPassthrough/timeseries_direct_remove_conflict.js @@ -0,0 +1,95 @@ +/** + * 
Tests that direct removal in a timeseries bucket collection close the relevant bucket, preventing + * further inserts from landing in that bucket, including the case where a concurrent catalog write + * causes a write conflict. + */ +(function() { +'use strict'; + +load('jstests/core/timeseries/libs/timeseries.js'); +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallel_shell_helpers.js"); + +const conn = MongoRunner.runMongod(); + +if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +const dbName = jsTestName(); +const testDB = conn.getDB(dbName); +assert.commandWorked(testDB.dropDatabase()); + +const collName = 'test'; + +const timeFieldName = 'time'; +const times = [ + ISODate('2021-01-01T01:00:00Z'), + ISODate('2021-01-01T01:10:00Z'), + ISODate('2021-01-01T01:20:00Z') +]; +let docs = [ + {_id: 0, [timeFieldName]: times[0]}, + {_id: 1, [timeFieldName]: times[1]}, + {_id: 2, [timeFieldName]: times[2]} +]; + +const coll = testDB.getCollection(collName); +const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName()); +coll.drop(); + +assert.commandWorked( + testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}})); +assert.contains(bucketsColl.getName(), testDB.getCollectionNames()); + +assert.commandWorked(coll.insert(docs[0])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1)); + +let buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[0]); +assert.eq(buckets[0].control.max[timeFieldName], times[0]); + +const fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeWrite"); +const awaitInsert = startParallelShell( + funWithArgs(function(dbName, collName, doc) { + assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc)); + }, dbName, coll.getName(), docs[1]), conn.port); +fpInsert.wait(); + +const fpRemove = configureFailPoint(conn, "hangTimeseriesDirectModificationBeforeWriteConflict"); +const awaitRemove = startParallelShell( + funWithArgs(function(dbName, collName, id) { + const removeResult = assert.commandWorked( + db.getSiblingDB(dbName).getCollection('system.buckets.' + collName).remove({_id: id})); + assert.eq(removeResult.nRemoved, 1); + }, dbName, coll.getName(), buckets[0]._id), conn.port); +fpRemove.wait(); + +fpRemove.off(); +fpInsert.off(); +awaitRemove(); +awaitInsert(); + +// The expected ordering is that the insert finished, then the remove deleted the bucket document, +// so there should be no documents left. + +assert.docEq(coll.find().sort({_id: 1}).toArray().length, 0); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 0); + +// Now another insert should generate a new bucket. 
+ +assert.commandWorked(coll.insert(docs[2])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(2, 3)); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[2]); +assert.eq(buckets[0].control.max[timeFieldName], times[2]); + +MongoRunner.stopMongod(conn); +})(); diff --git a/jstests/noPassthrough/timeseries_direct_update.js b/jstests/noPassthrough/timeseries_direct_update.js new file mode 100644 index 00000000000..f22cad0b28e --- /dev/null +++ b/jstests/noPassthrough/timeseries_direct_update.js @@ -0,0 +1,102 @@ +/** + * Tests that direct updates to a timeseries bucket collection close the bucket, preventing further + * inserts to land in that bucket. + */ +(function() { +'use strict'; + +load('jstests/core/timeseries/libs/timeseries.js'); +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallel_shell_helpers.js"); + +const conn = MongoRunner.runMongod(); + +if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +const dbName = jsTestName(); +const testDB = conn.getDB(dbName); +assert.commandWorked(testDB.dropDatabase()); + +const collName = 'test'; + +const timeFieldName = 'time'; +const times = [ + ISODate('2021-01-01T01:00:00Z'), + ISODate('2021-01-01T01:10:00Z'), + ISODate('2021-01-01T01:20:00Z') +]; +let docs = [ + {_id: 0, [timeFieldName]: times[0]}, + {_id: 1, [timeFieldName]: times[1]}, + {_id: 2, [timeFieldName]: times[2]} +]; + +const coll = testDB.getCollection(collName); +const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName()); +coll.drop(); + +assert.commandWorked( + testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}})); +assert.contains(bucketsColl.getName(), testDB.getCollectionNames()); + +assert.commandWorked(coll.insert(docs[0])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1)); + +let buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[0]); +assert.eq(buckets[0].control.max[timeFieldName], times[0]); + +let modified = buckets[0]; +// This corrupts the bucket, but it's fine here. +modified.control.max[timeFieldName] = times[1]; +let updateResult = assert.commandWorked(bucketsColl.update({_id: buckets[0]._id}, modified)); +assert.eq(updateResult.nMatched, 1); +assert.eq(updateResult.nModified, 1); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[0]); +assert.eq(buckets[0].control.max[timeFieldName], times[1]); + +assert.commandWorked(coll.insert(docs[1])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 2)); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 2); +assert.eq(buckets[1].control.min[timeFieldName], times[1]); +assert.eq(buckets[1].control.max[timeFieldName], times[1]); + +let fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeCommit"); +let awaitInsert = startParallelShell( + funWithArgs(function(dbName, collName, doc) { + assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc)); + }, dbName, coll.getName(), docs[2]), conn.port); + +fpInsert.wait(); + +modified = buckets[1]; +// This corrupts the bucket, but it's fine here. 
+modified.control.max[timeFieldName] = times[2]; +updateResult = assert.commandWorked(bucketsColl.update({_id: buckets[1]._id}, modified)); +assert.eq(updateResult.nMatched, 1); +assert.eq(updateResult.nModified, 1); + +fpInsert.off(); +awaitInsert(); + +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 3)); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 3); +assert.eq(buckets[1].control.min[timeFieldName], times[1]); +assert.eq(buckets[1].control.max[timeFieldName], times[2]); +assert.eq(buckets[2].control.min[timeFieldName], times[2]); +assert.eq(buckets[2].control.max[timeFieldName], times[2]); + +MongoRunner.stopMongod(conn); +})(); diff --git a/jstests/noPassthrough/timeseries_direct_update_conflict.js b/jstests/noPassthrough/timeseries_direct_update_conflict.js new file mode 100644 index 00000000000..da70684a61f --- /dev/null +++ b/jstests/noPassthrough/timeseries_direct_update_conflict.js @@ -0,0 +1,105 @@ +/** + * Tests that direct updates to a timeseries bucket collection close the bucket, preventing further + * inserts to land in that bucket, including the case where a concurrent catalog write causes + * a write conflict. + */ +(function() { +'use strict'; + +load('jstests/core/timeseries/libs/timeseries.js'); +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallel_shell_helpers.js"); + +const conn = MongoRunner.runMongod(); + +if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +const dbName = jsTestName(); +const testDB = conn.getDB(dbName); +assert.commandWorked(testDB.dropDatabase()); + +const collName = 'test'; + +const timeFieldName = 'time'; +const times = [ + ISODate('2021-01-01T01:00:00Z'), + ISODate('2021-01-01T01:10:00Z'), + ISODate('2021-01-01T01:20:00Z') +]; +let docs = [ + {_id: 0, [timeFieldName]: times[0]}, + {_id: 1, [timeFieldName]: times[1]}, + {_id: 2, [timeFieldName]: times[2]} +]; + +const coll = testDB.getCollection(collName); +const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName()); +coll.drop(); + +assert.commandWorked( + testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}})); +assert.contains(bucketsColl.getName(), testDB.getCollectionNames()); + +assert.commandWorked(coll.insert(docs[0])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1)); + +let buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[0]); +assert.eq(buckets[0].control.max[timeFieldName], times[0]); + +const fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeWrite"); +const awaitInsert = startParallelShell( + funWithArgs(function(dbName, collName, doc) { + assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc)); + }, dbName, coll.getName(), docs[1]), conn.port); +fpInsert.wait(); + +const modified = buckets[0]; +// This corrupts the bucket, but it's fine here. +modified.control.max[timeFieldName] = times[1]; + +const fpUpdate = configureFailPoint(conn, "hangTimeseriesDirectModificationBeforeWriteConflict"); +const awaitUpdate = startParallelShell( + funWithArgs(function(dbName, collName, update) { + const updateResult = assert.commandWorked(db.getSiblingDB(dbName) + .getCollection('system.buckets.' 
+ collName) + .update({_id: update._id}, update)); + assert.eq(updateResult.nMatched, 1); + assert.eq(updateResult.nModified, 1); + }, dbName, coll.getName(), modified), conn.port); +fpUpdate.wait(); + +fpUpdate.off(); +fpInsert.off(); +awaitUpdate(); +awaitInsert(); + +// The expected ordering is that the insert finished, then the update overwrote the bucket document, +// so there should be one document, and a closed flag. + +assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1)); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 1); +assert.eq(buckets[0].control.min[timeFieldName], times[0]); +assert.eq(buckets[0].control.max[timeFieldName], times[1]); + +// Now another insert should generate a new bucket. + +assert.commandWorked(coll.insert(docs[2])); +assert.docEq(coll.find().sort({_id: 1}).toArray(), [docs[0], docs[2]]); + +buckets = bucketsColl.find().sort({_id: 1}).toArray(); +assert.eq(buckets.length, 2); +assert.eq(buckets[0].control.min[timeFieldName], times[0]); +assert.eq(buckets[0].control.max[timeFieldName], times[1]); +assert.eq(buckets[1].control.min[timeFieldName], times[2]); +assert.eq(buckets[1].control.max[timeFieldName], times[2]); + +MongoRunner.stopMongod(conn); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 8c9833b4b37..00c9a8cf7ac 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -823,6 +823,7 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/catalog/collection_catalog', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', + "$BUILD_DIR/mongo/db/timeseries/bucket_catalog", '$BUILD_DIR/mongo/s/coreshard', "$BUILD_DIR/mongo/s/grid", 'catalog/collection_options', diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index b5212dea20d..e95215b3832 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -40,6 +40,7 @@ #include "mongo/base/string_data.h" #include "mongo/bson/mutable/damage_vector.h" #include "mongo/bson/timestamp.h" +#include "mongo/db/catalog/collection_operation_source.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/logical_session_id.h" @@ -88,8 +89,8 @@ struct CollectionUpdateArgs { // Document containing the _id field of the doc being updated. BSONObj criteria; - // True if this update comes from a chunk migration. - bool fromMigrate = false; + // Type of update. See OperationSource definition for more details. + OperationSource source = OperationSource::kStandard; StoreDocOption storeDocOption = StoreDocOption::None; bool preImageRecordingEnabledForCollection = false; diff --git a/src/mongo/db/catalog/collection_operation_source.h b/src/mongo/db/catalog/collection_operation_source.h new file mode 100644 index 00000000000..21875fcfa95 --- /dev/null +++ b/src/mongo/db/catalog/collection_operation_source.h @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { + +/** + * Enum used to differentiate between types of insert/update operations based on how they were + * issued. + */ + +enum OperationSource { + kStandard, // Default case, use this if none of the others applies. + kFromMigrate, // From a chunk migration. + kTimeseries // From an internal operation on the BucketCatalog, not a direct operation on + // the underlying bucket collection. +}; + +} // namespace mongo diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index caf43d24d7f..33a5598bf02 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -77,6 +77,7 @@ namespace { MONGO_FAIL_POINT_DEFINE(hangWriteBeforeWaitingForMigrationDecision); MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeCommit); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeWrite); MONGO_FAIL_POINT_DEFINE(failTimeseriesInsert); void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) { @@ -563,7 +564,7 @@ public: write_ops::Insert::parse({"CmdInsert::_performTimeseriesInsert"}, request); return _getTimeseriesSingleWriteResult(write_ops_exec::performInserts( - opCtx, timeseriesInsertBatch, write_ops_exec::InsertType::kTimeseries)); + opCtx, timeseriesInsertBatch, OperationSource::kTimeseries)); } StatusWith<SingleWriteResult> _performTimeseriesUpdate( @@ -592,7 +593,7 @@ public: timeseriesUpdateBatch.setWriteCommandBase(std::move(writeCommandBase)); return _getTimeseriesSingleWriteResult(write_ops_exec::performUpdates( - opCtx, timeseriesUpdateBatch, write_ops_exec::UpdateType::kTimeseries)); + opCtx, timeseriesUpdateBatch, OperationSource::kTimeseries)); } void _commitTimeseriesBucket(OperationContext* opCtx, @@ -603,11 +604,23 @@ public: std::vector<BSONObj>* errors, boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, - std::vector<size_t>* updatesToRetry) const { + std::vector<size_t>* docsToRetry) const { auto& bucketCatalog = BucketCatalog::get(opCtx); auto metadata = bucketCatalog.getMetadata(batch->bucket()); - bucketCatalog.prepareCommit(batch); + bool prepared = bucketCatalog.prepareCommit(batch); + if (!prepared) { + invariant(batch->finished()); + invariant(batch->getResult().getStatus() == ErrorCodes::TimeseriesBucketCleared, + str::stream() + << "Got unexpected error (" << batch->getResult().getStatus() + << ") preparing time-series bucket to be committed for " << ns() + << ": " << redact(request().toBSON({}))); + docsToRetry->push_back(index); + return; + } + + hangTimeseriesInsertBeforeWrite.pauseWhileSet(); auto result = batch->numPreviouslyCommittedMeasurements() == 
0 ? _performTimeseriesInsert(opCtx, batch, metadata, stmtIds) @@ -624,7 +637,7 @@ public: // No document in the buckets collection was found to update, meaning that it was // removed. bucketCatalog.abort(batch); - updatesToRetry->push_back(index); + docsToRetry->push_back(index); return; } @@ -715,7 +728,7 @@ public: hangTimeseriesInsertBeforeCommit.pauseWhileSet(); - std::vector<size_t> updatesToRetry; + std::vector<size_t> docsToRetry; for (auto& [batch, index] : batches) { bool shouldCommit = batch->claimCommitRights(); @@ -739,7 +752,7 @@ public: errors, opTime, electionId, - &updatesToRetry); + &docsToRetry); batch.reset(); } } @@ -757,7 +770,7 @@ public: << ") waiting for time-series bucket to be committed for " << ns() << ": " << redact(request().toBSON({}))); - updatesToRetry.push_back(index); + docsToRetry.push_back(index); continue; } @@ -774,7 +787,7 @@ public: } } - return updatesToRetry; + return docsToRetry; } void _performTimeseriesWritesSubset(OperationContext* opCtx, @@ -784,17 +797,11 @@ public: boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, bool* containsRetry) const { - std::vector<size_t> updatesToRetry; + std::vector<size_t> docsToRetry; do { - updatesToRetry = _performUnorderedTimeseriesWrites(opCtx, - start, - numDocs, - errors, - opTime, - electionId, - containsRetry, - updatesToRetry); - } while (!updatesToRetry.empty()); + docsToRetry = _performUnorderedTimeseriesWrites( + opCtx, start, numDocs, errors, opTime, electionId, containsRetry, docsToRetry); + } while (!docsToRetry.empty()); } void _performTimeseriesWrites(OperationContext* opCtx, diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 3fc790e9bec..0d4f4bebeec 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -249,7 +249,9 @@ UpdateResult Helpers::upsert(OperationContext* opCtx, request.setQuery(filter); request.setUpdateModification(write_ops::UpdateModification::parseFromClassicUpdate(updateMod)); request.setUpsert(); - request.setFromMigration(fromMigrate); + if (fromMigrate) { + request.setSource(OperationSource::kFromMigrate); + } request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::NO_YIELD); return ::mongo::update(opCtx, context.db(), request); @@ -268,7 +270,9 @@ void Helpers::update(OperationContext* opCtx, request.setQuery(filter); request.setUpdateModification(write_ops::UpdateModification::parseFromClassicUpdate(updateMod)); - request.setFromMigration(fromMigrate); + if (fromMigrate) { + request.setSource(OperationSource::kFromMigrate); + } request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::NO_YIELD); ::mongo::update(opCtx, context.db(), request); diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 0856b46b5eb..4d400a2ddea 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -259,13 +259,19 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco uassert(16980, "Multi-update operations require all documents to have an '_id' field", !request->isMulti() || args.criteria.hasField("_id"_sd)); - args.fromMigrate = request->isFromMigration(); args.storeDocOption = getStoreDocMode(*request); if (args.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { args.preImageDoc = oldObj.value().getOwned(); } } + // Ensure we set the type correctly + if (request->isFromMigration()) { + args.source = OperationSource::kFromMigrate; + } else if (request->isTimeseries()) { + args.source = OperationSource::kTimeseries; + } 
+ if (inPlace) { if (!request->explain()) { newObj = oldObj.value(); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 25f598a332a..e542425452e 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -61,6 +61,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/storage/durable_catalog.h" +#include "mongo/db/timeseries/bucket_catalog.h" #include "mongo/db/transaction_participant.h" #include "mongo/db/transaction_participant_gen.h" #include "mongo/db/views/durable_view_catalog.h" @@ -201,7 +202,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, oplogEntry.setOpType(repl::OpTypeEnum::kUpdate); oplogEntry.setObject(args.updateArgs.update); oplogEntry.setObject2(args.updateArgs.criteria); - oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate); + oplogEntry.setFromMigrateIfTrue(args.updateArgs.source == OperationSource::kFromMigrate); // oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write. repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtIds); if (args.updateArgs.oplogSlot) { @@ -595,7 +596,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) { - if (!args.updateArgs.fromMigrate) { + if (args.updateArgs.source != OperationSource::kFromMigrate) { shardObserveUpdateOp(opCtx, args.nss, args.updateArgs.preImageDoc, @@ -618,6 +619,11 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } else if (args.nss == NamespaceString::kConfigSettingsNamespace) { ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc); + } else if (args.nss.isTimeseriesBucketsCollection()) { + if (args.updateArgs.source != OperationSource::kTimeseries) { + auto& bucketCatalog = BucketCatalog::get(opCtx); + bucketCatalog.clear(args.updateArgs.updatedDoc["_id"].OID()); + } } } @@ -634,6 +640,11 @@ void OpObserverImpl::aboutToDelete(OperationContext* opCtx, destinedRecipientDecoration(opCtx) = op.getDestinedRecipient(); shardObserveAboutToDelete(opCtx, nss, doc); + + if (nss.isTimeseriesBucketsCollection()) { + auto& bucketCatalog = BucketCatalog::get(opCtx); + bucketCatalog.clear(doc["_id"].OID()); + } } void OpObserverImpl::onDelete(OperationContext* opCtx, diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index 43060673b82..fd7421148f2 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -193,12 +193,20 @@ public: return _updateOp.getMulti(); } - void setFromMigration(bool value = true) { - _fromMigration = value; + void setSource(OperationSource source) { + _source = source; + } + + OperationSource source() const { + return _source; } bool isFromMigration() const { - return _fromMigration; + return _source == OperationSource::kFromMigrate; + } + + bool isTimeseries() const { + return _source == OperationSource::kTimeseries; } void setFromOplogApplication(bool value = true) { @@ -286,7 +294,8 @@ public: builder << " god: " << _god; builder << " upsert: " << isUpsert(); builder << " multi: " << isMulti(); - builder << " fromMigration: " << _fromMigration; + builder << " fromMigration: " << isFromMigration(); + builder << " timeseries: " << isTimeseries(); builder << " fromOplogApplication: " << _fromOplogApplication; builder << " 
isExplain: " << static_cast<bool>(_explain); return builder.str(); @@ -319,8 +328,8 @@ private: // updates, never user updates. bool _god = false; - // True if this update is on behalf of a chunk migration. - bool _fromMigration = false; + // See Source declaration + OperationSource _source = OperationSource::kStandard; // True if this update was triggered by the application of an oplog entry. bool _fromOplogApplication = false; diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index bb60434bcd1..b9e3c1e6883 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -36,7 +36,7 @@ #include "mongo/base/checked_cast.h" #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/collection_operation_source.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" @@ -543,7 +543,7 @@ SingleWriteResult makeWriteResultForInsertOrDeleteRetry() { WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp, - const InsertType& type) { + const OperationSource& source) { // Insert performs its own retries, so we should only be within a WriteUnitOfWork when run in a // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); @@ -615,7 +615,7 @@ WriteResult performInserts(OperationContext* opCtx, // A time-series insert can combine multiple writes into a single operation, and thus // can have multiple statement ids associated with it if it is retryable. - batch.emplace_back(type == InsertType::kTimeseries && wholeOp.getStmtIds() + batch.emplace_back(source == OperationSource::kTimeseries && wholeOp.getStmtIds() ? *wholeOp.getStmtIds() : std::vector<StmtId>{stmtId}, toInsert); @@ -627,7 +627,7 @@ WriteResult performInserts(OperationContext* opCtx, } bool canContinue = insertBatchAndHandleErrors( - opCtx, wholeOp, batch, &lastOpFixer, &out, type == InsertType::kFromMigrate); + opCtx, wholeOp, batch, &lastOpFixer, &out, source == OperationSource::kFromMigrate); batch.clear(); // We won't need the current batch any more. bytesInBatch = 0; @@ -772,7 +772,8 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( const std::vector<StmtId>& stmtIds, const write_ops::UpdateOpEntry& op, LegacyRuntimeConstants runtimeConstants, - const boost::optional<BSONObj>& letParams) { + const boost::optional<BSONObj>& letParams, + OperationSource source) { globalOpCounters.gotUpdate(); ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForUpdate(opCtx->getWriteConcern()); auto& curOp = *CurOp::get(opCtx); @@ -799,6 +800,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY : PlanYieldPolicy::YieldPolicy::YIELD_AUTO); + request.setSource(source); size_t numAttempts = 0; while (true) { @@ -834,7 +836,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& wholeOp, - const UpdateType& type) { + const OperationSource& source) { // Update performs its own retries, so we should not be in a WriteUnitOfWork unless run in a // transaction. 
auto txnParticipant = TransactionParticipant::get(opCtx); @@ -886,7 +888,7 @@ WriteResult performUpdates(OperationContext* opCtx, // A time-series insert can combine multiple writes into a single operation, and thus // can have multiple statement ids associated with it if it is retryable. - auto stmtIds = type == UpdateType::kTimeseries && wholeOp.getStmtIds() + auto stmtIds = source == OperationSource::kTimeseries && wholeOp.getStmtIds() ? *wholeOp.getStmtIds() : std::vector<StmtId>{stmtId}; @@ -895,7 +897,8 @@ WriteResult performUpdates(OperationContext* opCtx, stmtIds, singleOp, runtimeConstants, - wholeOp.getLet())); + wholeOp.getLet(), + source)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = handleError(opCtx, diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h index 0b2809251cf..87196b1345e 100644 --- a/src/mongo/db/ops/write_ops_exec.h +++ b/src/mongo/db/ops/write_ops_exec.h @@ -33,6 +33,7 @@ #include <vector> #include "mongo/base/status_with.h" +#include "mongo/db/catalog/collection_operation_source.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/single_write_result_gen.h" #include "mongo/db/ops/update_result.h" @@ -57,20 +58,6 @@ struct WriteResult { }; /** - * Enums used to differentiate between types of insert/update operations based on how they were - * issued. - */ -enum class InsertType { - kStandard, - kFromMigrate, // From a chunk migration. - kTimeseries, -}; -enum class UpdateType { - kStandard, - kTimeseries, -}; - -/** * Performs a batch of inserts, updates, or deletes. * * These functions handle all of the work of doing the writes, including locking, incrementing @@ -89,10 +76,10 @@ enum class UpdateType { */ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& op, - const InsertType& type = InsertType::kStandard); + const OperationSource& source = OperationSource::kStandard); WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& op, - const UpdateType& type = UpdateType::kStandard); + const OperationSource& source = OperationSource::kStandard); WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op); /** diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 80ad3ad1402..63bc8d36d6a 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -1072,8 +1072,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { return toInsert; }()); - const auto reply = write_ops_exec::performInserts( - opCtx, insertOp, write_ops_exec::InsertType::kFromMigrate); + const auto reply = + write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate); for (unsigned long i = 0; i < reply.results.size(); ++i) { uassertStatusOKWithContext( diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 9e8bf42a6b5..d358c438e4e 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -430,7 +430,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE *metadata->getChunkManager(), args.updateArgs.updatedDoc, args.updateArgs.updatedDoc.objsize(), - args.updateArgs.fromMigrate); + args.updateArgs.source == OperationSource::kFromMigrate); } } diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript index 
c2f1959a441..b1b35582fe2 100644 --- a/src/mongo/db/timeseries/SConscript +++ b/src/mongo/db/timeseries/SConscript @@ -23,7 +23,9 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/commands/server_status', + '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', '$BUILD_DIR/mongo/db/views/views', + '$BUILD_DIR/mongo/util/fail_point', 'timeseries_idl', ], ) diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 17fa905b8d9..a6fe7f89e14 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -35,13 +35,16 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/commands/server_status.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/operation_context.h" #include "mongo/db/views/view_catalog.h" #include "mongo/stdx/thread.h" +#include "mongo/util/fail_point.h" namespace mongo { namespace { const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>(); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict); uint8_t numDigits(uint32_t num) { uint8_t numDigits = 0; @@ -128,6 +131,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( auto time = timeElem.Date(); BucketAccess bucket{this, key, stats.get(), time}; + invariant(bucket); StringSet newFieldNamesToBeInserted; uint32_t newFieldNamesSize = 0; @@ -202,46 +206,59 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( return batch; } -void BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { - invariant(!batch->finished()); +bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { + if (batch->finished()) { + // In this case, someone else aborted the batch behind our back. Oops. + return false; + } _waitToCommitBatch(batch); BucketAccess bucket(this, batch->bucket()); - invariant(bucket); + if (!bucket) { + abort(batch); + return false; + } + + invariant(_setBucketState(bucket->_id, BucketState::kPrepared)); auto prevMemoryUsage = bucket->_memoryUsage; batch->_prepareCommit(); _memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage); bucket->_batches.erase(batch->_lsid); + + return true; } void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info) { invariant(!batch->finished()); invariant(!batch->active()); - auto stats = _getExecutionStats(batch->bucket()->_ns); - BucketAccess bucket(this, batch->bucket()); - invariant(bucket); batch->_finish(info); - bucket->_preparedBatch.reset(); + if (bucket) { + invariant(_setBucketState(bucket->_id, BucketState::kNormal)); + bucket->_preparedBatch.reset(); + } if (info.result.isOK()) { + auto& stats = batch->_stats; stats->numCommits.fetchAndAddRelaxed(1); - if (bucket->_numCommittedMeasurements == 0) { + if (batch->numPreviouslyCommittedMeasurements() == 0) { stats->numBucketInserts.fetchAndAddRelaxed(1); } else { stats->numBucketUpdates.fetchAndAddRelaxed(1); } stats->numMeasurementsCommitted.fetchAndAddRelaxed(batch->measurements().size()); - bucket->_numCommittedMeasurements += batch->measurements().size(); + if (bucket) { + bucket->_numCommittedMeasurements += batch->measurements().size(); + } } - if (bucket->allCommitted()) { + if (bucket && bucket->allCommitted()) { if (bucket->_full) { // Everything in the bucket has been committed, and nothing more will be added since the // bucket is full. Thus, we can remove it. 
@@ -255,6 +272,10 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& // happened in BucketAccess::rollover, and that there is already a new open bucket for // this metadata. _markBucketNotIdle(ptr, false /* locked */); + { + stdx::lock_guard statesLk{_statesMutex}; + _bucketStates.erase(ptr->_id); + } _allBuckets.erase(ptr); } else { _markBucketIdle(bucket); @@ -265,54 +286,26 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch) { invariant(batch); invariant(!batch->finished()); + invariant(batch->_commitRights.load()); Bucket* bucket = batch->bucket(); - while (true) { - std::vector<std::shared_ptr<WriteBatch>> batchesToWaitOn; - - { - auto lk = _lockExclusive(); - - // Before we access the bucket, make sure it's still there. - if (!_allBuckets.contains(bucket)) { - // Someone else already cleaned up the bucket while we didn't hold the lock. - invariant(batch->finished()); - return; - } - - stdx::unique_lock blk{bucket->_mutex}; - if (bucket->allCommitted()) { - // No uncommitted batches left to abort, go ahead and remove the bucket. - blk.unlock(); - _removeBucket(bucket, false /* expiringBuckets */); - break; - } - - // For any uncommitted batches that we need to abort, see if we already have the rights, - // otherwise try to claim the rights and abort it. If we don't get the rights, then wait - // for the other writer to resolve the batch. - for (const auto& [_, active] : bucket->_batches) { - if (active == batch || active->claimCommitRights()) { - active->_abort(); - } else { - batchesToWaitOn.push_back(active); - } - } - bucket->_batches.clear(); + // Before we access the bucket, make sure it's still there. + auto lk = _lockExclusive(); + if (!_allBuckets.contains(bucket)) { + // Special case, bucket has already been cleared, and we need only abort this batch. 
+ batch->_abort(); + return; + } - if (auto& prepared = bucket->_preparedBatch) { - if (prepared == batch) { - prepared->_abort(); - } else { - batchesToWaitOn.push_back(prepared); - } - prepared.reset(); - } - } + stdx::unique_lock blk{bucket->_mutex}; + _abort(blk, bucket, batch); +} - for (const auto& batchToWaitOn : batchesToWaitOn) { - batchToWaitOn->getResult().getStatus().ignore(); - } +void BucketCatalog::clear(const OID& oid) { + auto result = _setBucketState(oid, BucketState::kCleared); + if (result && *result == BucketState::kPreparedAndCleared) { + hangTimeseriesDirectModificationBeforeWriteConflict.pauseWhileSet(); + throw WriteConflictException(); } } @@ -328,10 +321,10 @@ void BucketCatalog::clear(const NamespaceString& ns) { auto nextIt = std::next(it); const auto& bucket = *it; - _verifyBucketIsUnused(bucket.get()); + stdx::unique_lock blk{bucket->_mutex}; if (shouldClear(bucket->_ns)) { _executionStats.erase(bucket->_ns); - _removeBucket(bucket.get(), false /* expiringBuckets */); + _abort(blk, bucket.get(), nullptr); } it = nextIt; @@ -394,7 +387,9 @@ BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) { while (true) { BucketAccess bucket{this, batch->bucket()}; - invariant(bucket); + if (!bucket) { + return; + } auto current = bucket->_preparedBatch; if (!current) { @@ -416,15 +411,42 @@ bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) { } invariant(bucket->_batches.empty()); + invariant(!bucket->_preparedBatch); _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); _markBucketNotIdle(bucket, expiringBuckets /* locked */); _openBuckets.erase({std::move(bucket->_ns), std::move(bucket->_metadata)}); + { + stdx::lock_guard statesLk{_statesMutex}; + _bucketStates.erase(bucket->_id); + } _allBuckets.erase(it); return true; } +void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk, + Bucket* bucket, + std::shared_ptr<WriteBatch> batch) { + // For any uncommitted batches that we need to abort, see if we already have the rights, + // otherwise try to claim the rights and abort it. If we don't get the rights, then wait + // for the other writer to resolve the batch. 
+ for (const auto& [_, current] : bucket->_batches) { + current->_abort(); + } + bucket->_batches.clear(); + + if (auto& prepared = bucket->_preparedBatch) { + if (prepared == batch) { + prepared->_abort(); + } + prepared.reset(); + } + + lk.unlock(); + _removeBucket(bucket, true /* bucketIsUnused */); +} + void BucketCatalog::_markBucketIdle(Bucket* bucket) { invariant(bucket); stdx::lock_guard lk{_idleMutex}; @@ -482,6 +504,7 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket( auto [it, inserted] = _allBuckets.insert(std::make_unique<Bucket>()); Bucket* bucket = it->get(); _setIdTimestamp(bucket, time); + _bucketStates.emplace(bucket->_id, BucketState::kNormal); _openBuckets[key] = bucket; if (openedDuetoMetadata) { @@ -518,7 +541,52 @@ const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutio } void BucketCatalog::_setIdTimestamp(Bucket* bucket, const Date_t& time) { + auto oldId = bucket->_id; bucket->_id.setTimestamp(durationCount<Seconds>(time.toDurationSinceEpoch())); + stdx::lock_guard statesLk{_statesMutex}; + _bucketStates.erase(oldId); + _bucketStates.emplace(bucket->_id, BucketState::kNormal); +} + +boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const OID& id, + BucketState target) { + stdx::lock_guard statesLk{_statesMutex}; + auto it = _bucketStates.find(id); + if (it == _bucketStates.end()) { + return boost::none; + } + + auto& [_, state] = *it; + switch (target) { + case BucketState::kNormal: { + if (state == BucketState::kPrepared) { + state = BucketState::kNormal; + } else if (state == BucketState::kPreparedAndCleared) { + state = BucketState::kCleared; + } else { + invariant(state != BucketState::kCleared); + } + break; + } + case BucketState::kPrepared: { + invariant(state == BucketState::kNormal); + state = BucketState::kPrepared; + break; + } + case BucketState::kCleared: { + if (state == BucketState::kNormal) { + state = BucketState::kCleared; + } else if (state == BucketState::kPrepared) { + state = BucketState::kPreparedAndCleared; + } + break; + } + case BucketState::kPreparedAndCleared: { + invariant(target != BucketState::kPreparedAndCleared); + } + } + + return state; } BucketCatalog::BucketMetadata::BucketMetadata(BSONObj&& obj, @@ -607,8 +675,8 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, { auto lk = _catalog->_lockShared(); - bool bucketExisted = _findOpenBucketAndLock(hash); - if (bucketExisted) { + auto bucketState = _findOpenBucketAndLock(hash); + if (bucketState == BucketState::kNormal || bucketState == BucketState::kPrepared) { return; } } @@ -620,12 +688,21 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, Bucket* bucket) : _catalog(catalog) { auto lk = _catalog->_lockShared(); - auto it = _catalog->_allBuckets.find(bucket); - if (it == _catalog->_allBuckets.end()) { + auto bucketIt = _catalog->_allBuckets.find(bucket); + if (bucketIt == _catalog->_allBuckets.end()) { return; } + _bucket = bucket; _acquire(); + + stdx::lock_guard statesLk{_catalog->_statesMutex}; + auto statesIt = _catalog->_bucketStates.find(_bucket->_id); + invariant(statesIt != _catalog->_bucketStates.end()); + auto& [_, state] = *statesIt; + if (state == BucketState::kCleared) { + release(); + } } BucketCatalog::BucketAccess::~BucketAccess() { @@ -634,18 +711,27 @@ BucketCatalog::BucketAccess::~BucketAccess() { } } -bool BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) { +BucketCatalog::BucketState 
BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) { auto it = _catalog->_openBuckets.find(*_key, hash); if (it == _catalog->_openBuckets.end()) { // Bucket does not exist. - return false; + return BucketState::kCleared; } _bucket = it->second; _acquire(); - _catalog->_markBucketNotIdle(_bucket, false /* locked */); - return true; + stdx::lock_guard statesLk{_catalog->_statesMutex}; + auto statesIt = _catalog->_bucketStates.find(_bucket->_id); + invariant(statesIt != _catalog->_bucketStates.end()); + auto& [_, state] = *statesIt; + if (state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) { + release(); + } else { + _catalog->_markBucketNotIdle(_bucket, false /* locked */); + } + + return state; } void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t hash) { @@ -658,7 +744,20 @@ void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t has _bucket = it->second; _acquire(); - _catalog->_markBucketNotIdle(_bucket, false /* locked */); + + { + stdx::lock_guard statesLk{_catalog->_statesMutex}; + auto statesIt = _catalog->_bucketStates.find(_bucket->_id); + invariant(statesIt != _catalog->_bucketStates.end()); + auto& [_, state] = *statesIt; + if (state == BucketState::kNormal || state == BucketState::kPrepared) { + _catalog->_markBucketNotIdle(_bucket, false /* locked */); + return; + } + } + + _catalog->_abort(_guard, _bucket, nullptr); + _create(); } void BucketCatalog::BucketAccess::_acquire() { @@ -985,7 +1084,6 @@ StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() con } BucketCatalog::Bucket* BucketCatalog::WriteBatch::bucket() const { - invariant(!finished()); return _bucket; } @@ -1083,7 +1181,6 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) { } void BucketCatalog::WriteBatch::_abort() { - invariant(_commitRights.load()); _active = false; _promise.setError({ErrorCodes::TimeseriesBucketCleared, str::stream() << "Time-series bucket " << _bucket->id() << " for " diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index b6fc129313f..66d762d5efe 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -190,9 +190,10 @@ public: /** * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have - * commit rights on batch. + * commit rights on batch. Returns true if the batch was successfully prepared, or false if the + * batch was aborted. */ - void prepareCommit(std::shared_ptr<WriteBatch> batch); + bool prepareCommit(std::shared_ptr<WriteBatch> batch); /** * Records the result of a batch commit. Caller must already have commit rights on batch, and @@ -207,6 +208,12 @@ public: void abort(std::shared_ptr<WriteBatch> batch); /** + * Marks any bucket with the specified OID as cleared and prevents any future inserts from + * landing in that bucket. + */ + void clear(const OID& oid); + + /** * Clears the buckets for the given namespace. */ void clear(const NamespaceString& ns); @@ -461,6 +468,20 @@ private: AtomicWord<long long> numMeasurementsCommitted; }; + enum class BucketState { + // Bucket can be inserted into, and does not have an outstanding prepared commit + kNormal, + // Bucket can be inserted into, and has a prepared commit outstanding. + kPrepared, + // Bucket can no longer be inserted into, does not have an outstanding prepared + // commit. 
+ kCleared, + // Bucket can no longer be inserted into, but still has an outstanding + // prepared commit. Any writer other than the one who prepared the + // commit should receive a WriteConflictException. + kPreparedAndCleared, + }; + /** * Helper class to handle all the locking necessary to lookup and lock a bucket for use. This * is intended primarily for using a single bucket, including replacing it when it becomes full. @@ -500,9 +521,13 @@ private: Date_t getTime() const; private: - // Helper to find and lock an open bucket for the given metadata if it exists. Requires a - // shared lock on the catalog. Returns true if the bucket exists and was locked. - bool _findOpenBucketAndLock(std::size_t hash); + /** + * Helper to find and lock an open bucket for the given metadata if it exists. Requires a + * shared lock on the catalog. Returns the state of the bucket if it is locked and usable. + * In case the bucket does not exist or was previously cleared and thus is not usable, the + * return value will be BucketState::kCleared. + */ + BucketState _findOpenBucketAndLock(std::size_t hash); // Helper to find an open bucket for the given metadata if it exists, create it if it // doesn't, and lock it. Requires an exclusive lock on the catalog. @@ -537,6 +562,12 @@ private: bool _removeBucket(Bucket* bucket, bool expiringBuckets); /** + * Aborts any batches it can for the given bucket, then removes the bucket. If batch is + * non-null, it is assumed that the caller has commit rights for that batch. + */ + void _abort(stdx::unique_lock<Mutex>& lk, Bucket* bucket, std::shared_ptr<WriteBatch> batch); + + /** * Adds the bucket to a list of idle buckets to be expired at a later date */ void _markBucketIdle(Bucket* bucket); @@ -572,6 +603,16 @@ private: void _setIdTimestamp(Bucket* bucket, const Date_t& time); /** + * Changes the bucket state, taking into account the current state, the specified target state, + * and allowed state transitions. The return value, if set, is the final state of the bucket + * with the given id; if no such bucket exists, the return value will not be set. + * + * Ex. For a bucket with state kPrepared, and a target of kCleared, the return will be + * kPreparedAndCleared. + */ + boost::optional<BucketState> _setBucketState(const OID& id, BucketState target); + + /** * You must hold a lock on _bucketMutex when accessing _allBuckets or _openBuckets. * While holding a lock on _bucketMutex, you can take a lock on an individual bucket, then * release _bucketMutex. Any iterators on the protected structures should be considered invalid @@ -595,6 +636,10 @@ private: // The current open bucket for each namespace and metadata pair. 
stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, Bucket*> _openBuckets; + // Bucket state + mutable Mutex _statesMutex = MONGO_MAKE_LATCH("BucketCatalog::_statesMutex"); + stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates; + // This mutex protects access to _idleBuckets mutable Mutex _idleMutex = MONGO_MAKE_LATCH("BucketCatalog::_idleMutex"); diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 5b0f398eebd..f21376f5eb9 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -32,6 +32,7 @@ #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/timeseries/bucket_catalog.h" #include "mongo/db/views/view_catalog.h" #include "mongo/stdx/future.h" @@ -445,7 +446,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { ASSERT(batch2->newFieldNamesToBeInserted().count("a")) << batch2->toBSON(); } -TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) { +TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { auto batch1 = _bucketCatalog ->insert(_opCtx, _ns1, @@ -470,27 +471,56 @@ TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) { .getValue(); ASSERT_NE(batch1, batch2); - ASSERT_EQ(0, _getNumWaits(_ns1)); - - // Aborting the batch will have to wait for the commit of batch1 to finish, then will proceed - // to abort batch2. ASSERT(batch2->claimCommitRights()); - auto task = Task{[&]() { _bucketCatalog->abort(batch2); }}; - // Add a little extra wait to make sure abort actually gets to the blocking point. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); - ASSERT(task.future().valid()); - ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1))) - << "clear finished before expected"; + _bucketCatalog->abort(batch2); + ASSERT(batch2->finished()); + ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); _bucketCatalog->finish(batch1, _commitInfo); ASSERT(batch1->finished()); + ASSERT_OK(batch1->getResult().getStatus()); +} - // Now the clear should be able to continue, and will eventually abort batch2. 
- task.future().wait(); - ASSERT_EQ(1, _getNumWaits(_ns1)); - ASSERT(batch2->finished()); - ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); - ASSERT_EQ(1, _getNumWaits(_ns1)); +TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { + auto batch = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + .getValue(); + ASSERT(batch->claimCommitRights()); + _bucketCatalog->prepareCommit(batch); + ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); + + ASSERT_THROWS(_bucketCatalog->clear(batch->bucket()->id()), WriteConflictException); + + _bucketCatalog->abort(batch); + ASSERT(batch->finished()); + ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); +} + +TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrows) { + auto batch = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + .getValue(); + ASSERT(batch->claimCommitRights()); + + _bucketCatalog->abort(batch); + ASSERT(batch->finished()); + ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + + bool prepared = _bucketCatalog->prepareCommit(batch); + ASSERT(!prepared); + ASSERT(batch->finished()); + ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); } TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 0af6c1e6782..468b0dfe8d8 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -277,7 +277,6 @@ void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequ CollectionUpdateArgs args; args.update = updateMod; args.criteria = toUpdateIdDoc; - args.fromMigrate = false; collection->updateDocument(opCtx, recordId, diff --git a/src/mongo/db/views/durable_view_catalog.cpp b/src/mongo/db/views/durable_view_catalog.cpp index e2e2d32dc75..e2c49040dc8 100644 --- a/src/mongo/db/views/durable_view_catalog.cpp +++ b/src/mongo/db/views/durable_view_catalog.cpp @@ -208,7 +208,6 @@ void DurableViewCatalogImpl::upsert(OperationContext* opCtx, CollectionUpdateArgs args; args.update = view; args.criteria = BSON("_id" << name.ns()); - args.fromMigrate = false; const bool assumeIndexesAreAffected = true; systemViews->updateDocument( |
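The other half of the change is on the insert path: `BucketCatalog::prepareCommit()` now returns a bool, and the time-series write path in write_commands.cpp collects the indexes of measurements whose bucket was cleared into `docsToRetry` and re-stages them against a fresh bucket instead of failing the write. The following is a standalone sketch of that retry loop under invented types; `FakeBucketCatalog` and `performUnorderedWrites` are illustrative only, not the server's API.

```cpp
// Standalone sketch (hypothetical types) of the retry loop added to the
// time-series insert path when a bucket is cleared by a direct modification.
#include <cstddef>
#include <iostream>
#include <vector>

struct FakeBucketCatalog {
    int clearedAttempts = 1;  // pretend the first attempt races with a direct remove

    // Mirrors the new bool-returning prepareCommit(): false means the bucket was
    // cleared out from under us and the measurement must be retried.
    bool prepareCommit(std::size_t /*docIndex*/) {
        if (clearedAttempts > 0) {
            --clearedAttempts;
            return false;
        }
        return true;
    }
};

std::vector<std::size_t> performUnorderedWrites(FakeBucketCatalog& catalog,
                                                const std::vector<std::size_t>& docs) {
    std::vector<std::size_t> docsToRetry;
    for (std::size_t index : docs) {
        if (!catalog.prepareCommit(index)) {
            docsToRetry.push_back(index);  // bucket cleared; stage the doc again
            continue;
        }
        // ... perform the insert or update against the buckets collection ...
    }
    return docsToRetry;
}

int main() {
    FakeBucketCatalog catalog;
    std::vector<std::size_t> docsToRetry{0, 1, 2};
    do {
        docsToRetry = performUnorderedWrites(catalog, docsToRetry);
    } while (!docsToRetry.empty());
    std::cout << "all measurements committed after retrying cleared buckets\n";
    return 0;
}
```

In the actual patch the do/while lives in `_performTimeseriesWritesSubset`, which keeps calling `_performUnorderedTimeseriesWrites` until `docsToRetry` is empty.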