summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Larkin-York <dan.larkin-york@mongodb.com>2021-04-01 23:06:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-02 00:01:02 +0000
commitb55f6c86a1a47f28de7815ef942c3c422590dea9 (patch)
tree3ad4664be27f03f0e601d1fb6fd32d4276ca25f6
parentc8e74f9e82a9aab3f549ff77aa539fa6b3ab6b45 (diff)
downloadmongo-b55f6c86a1a47f28de7815ef942c3c422590dea9.tar.gz
SERVER-55060 Direct modification must remove buckets from the time-series bucket catalog
-rw-r--r--jstests/noPassthrough/timeseries_direct_remove.js90
-rw-r--r--jstests/noPassthrough/timeseries_direct_remove_conflict.js95
-rw-r--r--jstests/noPassthrough/timeseries_direct_update.js102
-rw-r--r--jstests/noPassthrough/timeseries_direct_update_conflict.js105
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/catalog/collection.h5
-rw-r--r--src/mongo/db/catalog/collection_operation_source.h46
-rw-r--r--src/mongo/db/commands/write_commands.cpp45
-rw-r--r--src/mongo/db/dbhelpers.cpp8
-rw-r--r--src/mongo/db/exec/update_stage.cpp8
-rw-r--r--src/mongo/db/op_observer_impl.cpp15
-rw-r--r--src/mongo/db/ops/update_request.h21
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp19
-rw-r--r--src/mongo/db/ops/write_ops_exec.h19
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp4
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp2
-rw-r--r--src/mongo/db/timeseries/SConscript2
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp233
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h55
-rw-r--r--src/mongo/db/timeseries/bucket_catalog_test.cpp64
-rw-r--r--src/mongo/db/transaction_participant.cpp1
-rw-r--r--src/mongo/db/views/durable_view_catalog.cpp1
22 files changed, 790 insertions, 151 deletions
diff --git a/jstests/noPassthrough/timeseries_direct_remove.js b/jstests/noPassthrough/timeseries_direct_remove.js
new file mode 100644
index 00000000000..d7c009fe27e
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_direct_remove.js
@@ -0,0 +1,90 @@
+/**
+ * Tests that direct removal in a timeseries bucket collection close the relevant bucket, preventing
+ * further inserts from landing in that bucket.
+ */
+(function() {
+'use strict';
+
+load('jstests/core/timeseries/libs/timeseries.js');
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const conn = MongoRunner.runMongod();
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+const dbName = jsTestName();
+const testDB = conn.getDB(dbName);
+assert.commandWorked(testDB.dropDatabase());
+
+const collName = 'test';
+
+const timeFieldName = 'time';
+const times = [
+ ISODate('2021-01-01T01:00:00Z'),
+ ISODate('2021-01-01T01:10:00Z'),
+ ISODate('2021-01-01T01:20:00Z')
+];
+let docs = [
+ {_id: 0, [timeFieldName]: times[0]},
+ {_id: 1, [timeFieldName]: times[1]},
+ {_id: 2, [timeFieldName]: times[2]}
+];
+
+const coll = testDB.getCollection(collName);
+const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+coll.drop();
+
+assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+assert.commandWorked(coll.insert(docs[0]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1));
+
+let buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[0]);
+assert.eq(buckets[0].control.max[timeFieldName], times[0]);
+
+let removeResult = assert.commandWorked(bucketsColl.remove({_id: buckets[0]._id}));
+assert.eq(removeResult.nRemoved, 1);
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 0);
+
+assert.commandWorked(coll.insert(docs[1]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(1, 2));
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[1]);
+assert.eq(buckets[0].control.max[timeFieldName], times[1]);
+
+let fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeCommit");
+let awaitInsert = startParallelShell(
+ funWithArgs(function(dbName, collName, doc) {
+ assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc));
+ }, dbName, coll.getName(), docs[2]), conn.port);
+
+fpInsert.wait();
+
+removeResult = assert.commandWorked(bucketsColl.remove({_id: buckets[0]._id}));
+assert.eq(removeResult.nRemoved, 1);
+
+fpInsert.off();
+awaitInsert();
+
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(2, 3));
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[2]);
+assert.eq(buckets[0].control.max[timeFieldName], times[2]);
+
+MongoRunner.stopMongod(conn);
+})();
diff --git a/jstests/noPassthrough/timeseries_direct_remove_conflict.js b/jstests/noPassthrough/timeseries_direct_remove_conflict.js
new file mode 100644
index 00000000000..07af79937c0
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_direct_remove_conflict.js
@@ -0,0 +1,95 @@
+/**
+ * Tests that direct removal in a timeseries bucket collection close the relevant bucket, preventing
+ * further inserts from landing in that bucket, including the case where a concurrent catalog write
+ * causes a write conflict.
+ */
+(function() {
+'use strict';
+
+load('jstests/core/timeseries/libs/timeseries.js');
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const conn = MongoRunner.runMongod();
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+const dbName = jsTestName();
+const testDB = conn.getDB(dbName);
+assert.commandWorked(testDB.dropDatabase());
+
+const collName = 'test';
+
+const timeFieldName = 'time';
+const times = [
+ ISODate('2021-01-01T01:00:00Z'),
+ ISODate('2021-01-01T01:10:00Z'),
+ ISODate('2021-01-01T01:20:00Z')
+];
+let docs = [
+ {_id: 0, [timeFieldName]: times[0]},
+ {_id: 1, [timeFieldName]: times[1]},
+ {_id: 2, [timeFieldName]: times[2]}
+];
+
+const coll = testDB.getCollection(collName);
+const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+coll.drop();
+
+assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+assert.commandWorked(coll.insert(docs[0]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1));
+
+let buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[0]);
+assert.eq(buckets[0].control.max[timeFieldName], times[0]);
+
+const fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeWrite");
+const awaitInsert = startParallelShell(
+ funWithArgs(function(dbName, collName, doc) {
+ assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc));
+ }, dbName, coll.getName(), docs[1]), conn.port);
+fpInsert.wait();
+
+const fpRemove = configureFailPoint(conn, "hangTimeseriesDirectModificationBeforeWriteConflict");
+const awaitRemove = startParallelShell(
+ funWithArgs(function(dbName, collName, id) {
+ const removeResult = assert.commandWorked(
+ db.getSiblingDB(dbName).getCollection('system.buckets.' + collName).remove({_id: id}));
+ assert.eq(removeResult.nRemoved, 1);
+ }, dbName, coll.getName(), buckets[0]._id), conn.port);
+fpRemove.wait();
+
+fpRemove.off();
+fpInsert.off();
+awaitRemove();
+awaitInsert();
+
+// The expected ordering is that the insert finished, then the remove deleted the bucket document,
+// so there should be no documents left.
+
+assert.docEq(coll.find().sort({_id: 1}).toArray().length, 0);
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 0);
+
+// Now another insert should generate a new bucket.
+
+assert.commandWorked(coll.insert(docs[2]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(2, 3));
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[2]);
+assert.eq(buckets[0].control.max[timeFieldName], times[2]);
+
+MongoRunner.stopMongod(conn);
+})();
diff --git a/jstests/noPassthrough/timeseries_direct_update.js b/jstests/noPassthrough/timeseries_direct_update.js
new file mode 100644
index 00000000000..f22cad0b28e
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_direct_update.js
@@ -0,0 +1,102 @@
+/**
+ * Tests that direct updates to a timeseries bucket collection close the bucket, preventing further
+ * inserts to land in that bucket.
+ */
+(function() {
+'use strict';
+
+load('jstests/core/timeseries/libs/timeseries.js');
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const conn = MongoRunner.runMongod();
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+const dbName = jsTestName();
+const testDB = conn.getDB(dbName);
+assert.commandWorked(testDB.dropDatabase());
+
+const collName = 'test';
+
+const timeFieldName = 'time';
+const times = [
+ ISODate('2021-01-01T01:00:00Z'),
+ ISODate('2021-01-01T01:10:00Z'),
+ ISODate('2021-01-01T01:20:00Z')
+];
+let docs = [
+ {_id: 0, [timeFieldName]: times[0]},
+ {_id: 1, [timeFieldName]: times[1]},
+ {_id: 2, [timeFieldName]: times[2]}
+];
+
+const coll = testDB.getCollection(collName);
+const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+coll.drop();
+
+assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+assert.commandWorked(coll.insert(docs[0]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1));
+
+let buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[0]);
+assert.eq(buckets[0].control.max[timeFieldName], times[0]);
+
+let modified = buckets[0];
+// This corrupts the bucket, but it's fine here.
+modified.control.max[timeFieldName] = times[1];
+let updateResult = assert.commandWorked(bucketsColl.update({_id: buckets[0]._id}, modified));
+assert.eq(updateResult.nMatched, 1);
+assert.eq(updateResult.nModified, 1);
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[0]);
+assert.eq(buckets[0].control.max[timeFieldName], times[1]);
+
+assert.commandWorked(coll.insert(docs[1]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 2));
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 2);
+assert.eq(buckets[1].control.min[timeFieldName], times[1]);
+assert.eq(buckets[1].control.max[timeFieldName], times[1]);
+
+let fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeCommit");
+let awaitInsert = startParallelShell(
+ funWithArgs(function(dbName, collName, doc) {
+ assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc));
+ }, dbName, coll.getName(), docs[2]), conn.port);
+
+fpInsert.wait();
+
+modified = buckets[1];
+// This corrupts the bucket, but it's fine here.
+modified.control.max[timeFieldName] = times[2];
+updateResult = assert.commandWorked(bucketsColl.update({_id: buckets[1]._id}, modified));
+assert.eq(updateResult.nMatched, 1);
+assert.eq(updateResult.nModified, 1);
+
+fpInsert.off();
+awaitInsert();
+
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 3));
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 3);
+assert.eq(buckets[1].control.min[timeFieldName], times[1]);
+assert.eq(buckets[1].control.max[timeFieldName], times[2]);
+assert.eq(buckets[2].control.min[timeFieldName], times[2]);
+assert.eq(buckets[2].control.max[timeFieldName], times[2]);
+
+MongoRunner.stopMongod(conn);
+})();
diff --git a/jstests/noPassthrough/timeseries_direct_update_conflict.js b/jstests/noPassthrough/timeseries_direct_update_conflict.js
new file mode 100644
index 00000000000..da70684a61f
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_direct_update_conflict.js
@@ -0,0 +1,105 @@
+/**
+ * Tests that direct updates to a timeseries bucket collection close the bucket, preventing further
+ * inserts to land in that bucket, including the case where a concurrent catalog write causes
+ * a write conflict.
+ */
+(function() {
+'use strict';
+
+load('jstests/core/timeseries/libs/timeseries.js');
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const conn = MongoRunner.runMongod();
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+const dbName = jsTestName();
+const testDB = conn.getDB(dbName);
+assert.commandWorked(testDB.dropDatabase());
+
+const collName = 'test';
+
+const timeFieldName = 'time';
+const times = [
+ ISODate('2021-01-01T01:00:00Z'),
+ ISODate('2021-01-01T01:10:00Z'),
+ ISODate('2021-01-01T01:20:00Z')
+];
+let docs = [
+ {_id: 0, [timeFieldName]: times[0]},
+ {_id: 1, [timeFieldName]: times[1]},
+ {_id: 2, [timeFieldName]: times[2]}
+];
+
+const coll = testDB.getCollection(collName);
+const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+coll.drop();
+
+assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+assert.commandWorked(coll.insert(docs[0]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1));
+
+let buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[0]);
+assert.eq(buckets[0].control.max[timeFieldName], times[0]);
+
+const fpInsert = configureFailPoint(conn, "hangTimeseriesInsertBeforeWrite");
+const awaitInsert = startParallelShell(
+ funWithArgs(function(dbName, collName, doc) {
+ assert.commandWorked(db.getSiblingDB(dbName).getCollection(collName).insert(doc));
+ }, dbName, coll.getName(), docs[1]), conn.port);
+fpInsert.wait();
+
+const modified = buckets[0];
+// This corrupts the bucket, but it's fine here.
+modified.control.max[timeFieldName] = times[1];
+
+const fpUpdate = configureFailPoint(conn, "hangTimeseriesDirectModificationBeforeWriteConflict");
+const awaitUpdate = startParallelShell(
+ funWithArgs(function(dbName, collName, update) {
+ const updateResult = assert.commandWorked(db.getSiblingDB(dbName)
+ .getCollection('system.buckets.' + collName)
+ .update({_id: update._id}, update));
+ assert.eq(updateResult.nMatched, 1);
+ assert.eq(updateResult.nModified, 1);
+ }, dbName, coll.getName(), modified), conn.port);
+fpUpdate.wait();
+
+fpUpdate.off();
+fpInsert.off();
+awaitUpdate();
+awaitInsert();
+
+// The expected ordering is that the insert finished, then the update overwrote the bucket document,
+// so there should be one document, and a closed flag.
+
+assert.docEq(coll.find().sort({_id: 1}).toArray(), docs.slice(0, 1));
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 1);
+assert.eq(buckets[0].control.min[timeFieldName], times[0]);
+assert.eq(buckets[0].control.max[timeFieldName], times[1]);
+
+// Now another insert should generate a new bucket.
+
+assert.commandWorked(coll.insert(docs[2]));
+assert.docEq(coll.find().sort({_id: 1}).toArray(), [docs[0], docs[2]]);
+
+buckets = bucketsColl.find().sort({_id: 1}).toArray();
+assert.eq(buckets.length, 2);
+assert.eq(buckets[0].control.min[timeFieldName], times[0]);
+assert.eq(buckets[0].control.max[timeFieldName], times[1]);
+assert.eq(buckets[1].control.min[timeFieldName], times[2]);
+assert.eq(buckets[1].control.max[timeFieldName], times[2]);
+
+MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 8c9833b4b37..00c9a8cf7ac 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -823,6 +823,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/catalog/collection_catalog',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
+ "$BUILD_DIR/mongo/db/timeseries/bucket_catalog",
'$BUILD_DIR/mongo/s/coreshard',
"$BUILD_DIR/mongo/s/grid",
'catalog/collection_options',
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index b5212dea20d..e95215b3832 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -40,6 +40,7 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/mutable/damage_vector.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/catalog/collection_operation_source.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/logical_session_id.h"
@@ -88,8 +89,8 @@ struct CollectionUpdateArgs {
// Document containing the _id field of the doc being updated.
BSONObj criteria;
- // True if this update comes from a chunk migration.
- bool fromMigrate = false;
+ // Type of update. See OperationSource definition for more details.
+ OperationSource source = OperationSource::kStandard;
StoreDocOption storeDocOption = StoreDocOption::None;
bool preImageRecordingEnabledForCollection = false;
diff --git a/src/mongo/db/catalog/collection_operation_source.h b/src/mongo/db/catalog/collection_operation_source.h
new file mode 100644
index 00000000000..21875fcfa95
--- /dev/null
+++ b/src/mongo/db/catalog/collection_operation_source.h
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+/**
+ * Enum used to differentiate between types of insert/update operations based on how they were
+ * issued.
+ */
+
+enum OperationSource {
+ kStandard, // Default case, use this if none of the others applies.
+ kFromMigrate, // From a chunk migration.
+ kTimeseries // From an internal operation on the BucketCatalog, not a direct operation on
+ // the underlying bucket collection.
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index caf43d24d7f..33a5598bf02 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -77,6 +77,7 @@ namespace {
MONGO_FAIL_POINT_DEFINE(hangWriteBeforeWaitingForMigrationDecision);
MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeCommit);
+MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeWrite);
MONGO_FAIL_POINT_DEFINE(failTimeseriesInsert);
void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) {
@@ -563,7 +564,7 @@ public:
write_ops::Insert::parse({"CmdInsert::_performTimeseriesInsert"}, request);
return _getTimeseriesSingleWriteResult(write_ops_exec::performInserts(
- opCtx, timeseriesInsertBatch, write_ops_exec::InsertType::kTimeseries));
+ opCtx, timeseriesInsertBatch, OperationSource::kTimeseries));
}
StatusWith<SingleWriteResult> _performTimeseriesUpdate(
@@ -592,7 +593,7 @@ public:
timeseriesUpdateBatch.setWriteCommandBase(std::move(writeCommandBase));
return _getTimeseriesSingleWriteResult(write_ops_exec::performUpdates(
- opCtx, timeseriesUpdateBatch, write_ops_exec::UpdateType::kTimeseries));
+ opCtx, timeseriesUpdateBatch, OperationSource::kTimeseries));
}
void _commitTimeseriesBucket(OperationContext* opCtx,
@@ -603,11 +604,23 @@ public:
std::vector<BSONObj>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
- std::vector<size_t>* updatesToRetry) const {
+ std::vector<size_t>* docsToRetry) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
auto metadata = bucketCatalog.getMetadata(batch->bucket());
- bucketCatalog.prepareCommit(batch);
+ bool prepared = bucketCatalog.prepareCommit(batch);
+ if (!prepared) {
+ invariant(batch->finished());
+ invariant(batch->getResult().getStatus() == ErrorCodes::TimeseriesBucketCleared,
+ str::stream()
+ << "Got unexpected error (" << batch->getResult().getStatus()
+ << ") preparing time-series bucket to be committed for " << ns()
+ << ": " << redact(request().toBSON({})));
+ docsToRetry->push_back(index);
+ return;
+ }
+
+ hangTimeseriesInsertBeforeWrite.pauseWhileSet();
auto result = batch->numPreviouslyCommittedMeasurements() == 0
? _performTimeseriesInsert(opCtx, batch, metadata, stmtIds)
@@ -624,7 +637,7 @@ public:
// No document in the buckets collection was found to update, meaning that it was
// removed.
bucketCatalog.abort(batch);
- updatesToRetry->push_back(index);
+ docsToRetry->push_back(index);
return;
}
@@ -715,7 +728,7 @@ public:
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
- std::vector<size_t> updatesToRetry;
+ std::vector<size_t> docsToRetry;
for (auto& [batch, index] : batches) {
bool shouldCommit = batch->claimCommitRights();
@@ -739,7 +752,7 @@ public:
errors,
opTime,
electionId,
- &updatesToRetry);
+ &docsToRetry);
batch.reset();
}
}
@@ -757,7 +770,7 @@ public:
<< ") waiting for time-series bucket to be committed for " << ns()
<< ": " << redact(request().toBSON({})));
- updatesToRetry.push_back(index);
+ docsToRetry.push_back(index);
continue;
}
@@ -774,7 +787,7 @@ public:
}
}
- return updatesToRetry;
+ return docsToRetry;
}
void _performTimeseriesWritesSubset(OperationContext* opCtx,
@@ -784,17 +797,11 @@ public:
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
- std::vector<size_t> updatesToRetry;
+ std::vector<size_t> docsToRetry;
do {
- updatesToRetry = _performUnorderedTimeseriesWrites(opCtx,
- start,
- numDocs,
- errors,
- opTime,
- electionId,
- containsRetry,
- updatesToRetry);
- } while (!updatesToRetry.empty());
+ docsToRetry = _performUnorderedTimeseriesWrites(
+ opCtx, start, numDocs, errors, opTime, electionId, containsRetry, docsToRetry);
+ } while (!docsToRetry.empty());
}
void _performTimeseriesWrites(OperationContext* opCtx,
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 3fc790e9bec..0d4f4bebeec 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -249,7 +249,9 @@ UpdateResult Helpers::upsert(OperationContext* opCtx,
request.setQuery(filter);
request.setUpdateModification(write_ops::UpdateModification::parseFromClassicUpdate(updateMod));
request.setUpsert();
- request.setFromMigration(fromMigrate);
+ if (fromMigrate) {
+ request.setSource(OperationSource::kFromMigrate);
+ }
request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::NO_YIELD);
return ::mongo::update(opCtx, context.db(), request);
@@ -268,7 +270,9 @@ void Helpers::update(OperationContext* opCtx,
request.setQuery(filter);
request.setUpdateModification(write_ops::UpdateModification::parseFromClassicUpdate(updateMod));
- request.setFromMigration(fromMigrate);
+ if (fromMigrate) {
+ request.setSource(OperationSource::kFromMigrate);
+ }
request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::NO_YIELD);
::mongo::update(opCtx, context.db(), request);
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 0856b46b5eb..4d400a2ddea 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -259,13 +259,19 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
uassert(16980,
"Multi-update operations require all documents to have an '_id' field",
!request->isMulti() || args.criteria.hasField("_id"_sd));
- args.fromMigrate = request->isFromMigration();
args.storeDocOption = getStoreDocMode(*request);
if (args.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
args.preImageDoc = oldObj.value().getOwned();
}
}
+ // Ensure we set the type correctly
+ if (request->isFromMigration()) {
+ args.source = OperationSource::kFromMigrate;
+ } else if (request->isTimeseries()) {
+ args.source = OperationSource::kTimeseries;
+ }
+
if (inPlace) {
if (!request->explain()) {
newObj = oldObj.value();
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 25f598a332a..e542425452e 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -61,6 +61,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/storage/durable_catalog.h"
+#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/transaction_participant_gen.h"
#include "mongo/db/views/durable_view_catalog.h"
@@ -201,7 +202,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
oplogEntry.setOpType(repl::OpTypeEnum::kUpdate);
oplogEntry.setObject(args.updateArgs.update);
oplogEntry.setObject2(args.updateArgs.criteria);
- oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate);
+ oplogEntry.setFromMigrateIfTrue(args.updateArgs.source == OperationSource::kFromMigrate);
// oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write.
repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtIds);
if (args.updateArgs.oplogSlot) {
@@ -595,7 +596,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) {
- if (!args.updateArgs.fromMigrate) {
+ if (args.updateArgs.source != OperationSource::kFromMigrate) {
shardObserveUpdateOp(opCtx,
args.nss,
args.updateArgs.preImageDoc,
@@ -618,6 +619,11 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
} else if (args.nss == NamespaceString::kConfigSettingsNamespace) {
ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc);
+ } else if (args.nss.isTimeseriesBucketsCollection()) {
+ if (args.updateArgs.source != OperationSource::kTimeseries) {
+ auto& bucketCatalog = BucketCatalog::get(opCtx);
+ bucketCatalog.clear(args.updateArgs.updatedDoc["_id"].OID());
+ }
}
}
@@ -634,6 +640,11 @@ void OpObserverImpl::aboutToDelete(OperationContext* opCtx,
destinedRecipientDecoration(opCtx) = op.getDestinedRecipient();
shardObserveAboutToDelete(opCtx, nss, doc);
+
+ if (nss.isTimeseriesBucketsCollection()) {
+ auto& bucketCatalog = BucketCatalog::get(opCtx);
+ bucketCatalog.clear(doc["_id"].OID());
+ }
}
void OpObserverImpl::onDelete(OperationContext* opCtx,
diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h
index 43060673b82..fd7421148f2 100644
--- a/src/mongo/db/ops/update_request.h
+++ b/src/mongo/db/ops/update_request.h
@@ -193,12 +193,20 @@ public:
return _updateOp.getMulti();
}
- void setFromMigration(bool value = true) {
- _fromMigration = value;
+ void setSource(OperationSource source) {
+ _source = source;
+ }
+
+ OperationSource source() const {
+ return _source;
}
bool isFromMigration() const {
- return _fromMigration;
+ return _source == OperationSource::kFromMigrate;
+ }
+
+ bool isTimeseries() const {
+ return _source == OperationSource::kTimeseries;
}
void setFromOplogApplication(bool value = true) {
@@ -286,7 +294,8 @@ public:
builder << " god: " << _god;
builder << " upsert: " << isUpsert();
builder << " multi: " << isMulti();
- builder << " fromMigration: " << _fromMigration;
+ builder << " fromMigration: " << isFromMigration();
+ builder << " timeseries: " << isTimeseries();
builder << " fromOplogApplication: " << _fromOplogApplication;
builder << " isExplain: " << static_cast<bool>(_explain);
return builder.str();
@@ -319,8 +328,8 @@ private:
// updates, never user updates.
bool _god = false;
- // True if this update is on behalf of a chunk migration.
- bool _fromMigration = false;
+ // See Source declaration
+ OperationSource _source = OperationSource::kStandard;
// True if this update was triggered by the application of an oplog entry.
bool _fromOplogApplication = false;
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index bb60434bcd1..b9e3c1e6883 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -36,7 +36,7 @@
#include "mongo/base/checked_cast.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/collection_operation_source.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
@@ -543,7 +543,7 @@ SingleWriteResult makeWriteResultForInsertOrDeleteRetry() {
WriteResult performInserts(OperationContext* opCtx,
const write_ops::Insert& wholeOp,
- const InsertType& type) {
+ const OperationSource& source) {
// Insert performs its own retries, so we should only be within a WriteUnitOfWork when run in a
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -615,7 +615,7 @@ WriteResult performInserts(OperationContext* opCtx,
// A time-series insert can combine multiple writes into a single operation, and thus
// can have multiple statement ids associated with it if it is retryable.
- batch.emplace_back(type == InsertType::kTimeseries && wholeOp.getStmtIds()
+ batch.emplace_back(source == OperationSource::kTimeseries && wholeOp.getStmtIds()
? *wholeOp.getStmtIds()
: std::vector<StmtId>{stmtId},
toInsert);
@@ -627,7 +627,7 @@ WriteResult performInserts(OperationContext* opCtx,
}
bool canContinue = insertBatchAndHandleErrors(
- opCtx, wholeOp, batch, &lastOpFixer, &out, type == InsertType::kFromMigrate);
+ opCtx, wholeOp, batch, &lastOpFixer, &out, source == OperationSource::kFromMigrate);
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;
@@ -772,7 +772,8 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
const std::vector<StmtId>& stmtIds,
const write_ops::UpdateOpEntry& op,
LegacyRuntimeConstants runtimeConstants,
- const boost::optional<BSONObj>& letParams) {
+ const boost::optional<BSONObj>& letParams,
+ OperationSource source) {
globalOpCounters.gotUpdate();
ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForUpdate(opCtx->getWriteConcern());
auto& curOp = *CurOp::get(opCtx);
@@ -799,6 +800,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
request.setYieldPolicy(opCtx->inMultiDocumentTransaction()
? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY
: PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
+ request.setSource(source);
size_t numAttempts = 0;
while (true) {
@@ -834,7 +836,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
WriteResult performUpdates(OperationContext* opCtx,
const write_ops::Update& wholeOp,
- const UpdateType& type) {
+ const OperationSource& source) {
// Update performs its own retries, so we should not be in a WriteUnitOfWork unless run in a
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -886,7 +888,7 @@ WriteResult performUpdates(OperationContext* opCtx,
// A time-series insert can combine multiple writes into a single operation, and thus
// can have multiple statement ids associated with it if it is retryable.
- auto stmtIds = type == UpdateType::kTimeseries && wholeOp.getStmtIds()
+ auto stmtIds = source == OperationSource::kTimeseries && wholeOp.getStmtIds()
? *wholeOp.getStmtIds()
: std::vector<StmtId>{stmtId};
@@ -895,7 +897,8 @@ WriteResult performUpdates(OperationContext* opCtx,
stmtIds,
singleOp,
runtimeConstants,
- wholeOp.getLet()));
+ wholeOp.getLet(),
+ source));
lastOpFixer.finishedOpSuccessfully();
} catch (const DBException& ex) {
const bool canContinue = handleError(opCtx,
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index 0b2809251cf..87196b1345e 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -33,6 +33,7 @@
#include <vector>
#include "mongo/base/status_with.h"
+#include "mongo/db/catalog/collection_operation_source.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/single_write_result_gen.h"
#include "mongo/db/ops/update_result.h"
@@ -57,20 +58,6 @@ struct WriteResult {
};
/**
- * Enums used to differentiate between types of insert/update operations based on how they were
- * issued.
- */
-enum class InsertType {
- kStandard,
- kFromMigrate, // From a chunk migration.
- kTimeseries,
-};
-enum class UpdateType {
- kStandard,
- kTimeseries,
-};
-
-/**
* Performs a batch of inserts, updates, or deletes.
*
* These functions handle all of the work of doing the writes, including locking, incrementing
@@ -89,10 +76,10 @@ enum class UpdateType {
*/
WriteResult performInserts(OperationContext* opCtx,
const write_ops::Insert& op,
- const InsertType& type = InsertType::kStandard);
+ const OperationSource& source = OperationSource::kStandard);
WriteResult performUpdates(OperationContext* opCtx,
const write_ops::Update& op,
- const UpdateType& type = UpdateType::kStandard);
+ const OperationSource& source = OperationSource::kStandard);
WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op);
/**
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 80ad3ad1402..63bc8d36d6a 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -1072,8 +1072,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
return toInsert;
}());
- const auto reply = write_ops_exec::performInserts(
- opCtx, insertOp, write_ops_exec::InsertType::kFromMigrate);
+ const auto reply =
+ write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate);
for (unsigned long i = 0; i < reply.results.size(); ++i) {
uassertStatusOKWithContext(
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 9e8bf42a6b5..d358c438e4e 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -430,7 +430,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
*metadata->getChunkManager(),
args.updateArgs.updatedDoc,
args.updateArgs.updatedDoc.objsize(),
- args.updateArgs.fromMigrate);
+ args.updateArgs.source == OperationSource::kFromMigrate);
}
}
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
index c2f1959a441..b1b35582fe2 100644
--- a/src/mongo/db/timeseries/SConscript
+++ b/src/mongo/db/timeseries/SConscript
@@ -23,7 +23,9 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/commands/server_status',
+ '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
'$BUILD_DIR/mongo/db/views/views',
+ '$BUILD_DIR/mongo/util/fail_point',
'timeseries_idl',
],
)
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 17fa905b8d9..a6fe7f89e14 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -35,13 +35,16 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/commands/server_status.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/stdx/thread.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
namespace {
const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>();
+MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict);
uint8_t numDigits(uint32_t num) {
uint8_t numDigits = 0;
@@ -128,6 +131,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
auto time = timeElem.Date();
BucketAccess bucket{this, key, stats.get(), time};
+ invariant(bucket);
StringSet newFieldNamesToBeInserted;
uint32_t newFieldNamesSize = 0;
@@ -202,46 +206,59 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
return batch;
}
-void BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
- invariant(!batch->finished());
+bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
+ if (batch->finished()) {
+ // In this case, someone else aborted the batch behind our back. Oops.
+ return false;
+ }
_waitToCommitBatch(batch);
BucketAccess bucket(this, batch->bucket());
- invariant(bucket);
+ if (!bucket) {
+ abort(batch);
+ return false;
+ }
+
+ invariant(_setBucketState(bucket->_id, BucketState::kPrepared));
auto prevMemoryUsage = bucket->_memoryUsage;
batch->_prepareCommit();
_memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage);
bucket->_batches.erase(batch->_lsid);
+
+ return true;
}
void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info) {
invariant(!batch->finished());
invariant(!batch->active());
- auto stats = _getExecutionStats(batch->bucket()->_ns);
-
BucketAccess bucket(this, batch->bucket());
- invariant(bucket);
batch->_finish(info);
- bucket->_preparedBatch.reset();
+ if (bucket) {
+ invariant(_setBucketState(bucket->_id, BucketState::kNormal));
+ bucket->_preparedBatch.reset();
+ }
if (info.result.isOK()) {
+ auto& stats = batch->_stats;
stats->numCommits.fetchAndAddRelaxed(1);
- if (bucket->_numCommittedMeasurements == 0) {
+ if (batch->numPreviouslyCommittedMeasurements() == 0) {
stats->numBucketInserts.fetchAndAddRelaxed(1);
} else {
stats->numBucketUpdates.fetchAndAddRelaxed(1);
}
stats->numMeasurementsCommitted.fetchAndAddRelaxed(batch->measurements().size());
- bucket->_numCommittedMeasurements += batch->measurements().size();
+ if (bucket) {
+ bucket->_numCommittedMeasurements += batch->measurements().size();
+ }
}
- if (bucket->allCommitted()) {
+ if (bucket && bucket->allCommitted()) {
if (bucket->_full) {
// Everything in the bucket has been committed, and nothing more will be added since the
// bucket is full. Thus, we can remove it.
@@ -255,6 +272,10 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
// happened in BucketAccess::rollover, and that there is already a new open bucket for
// this metadata.
_markBucketNotIdle(ptr, false /* locked */);
+ {
+ stdx::lock_guard statesLk{_statesMutex};
+ _bucketStates.erase(ptr->_id);
+ }
_allBuckets.erase(ptr);
} else {
_markBucketIdle(bucket);
@@ -265,54 +286,26 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch) {
invariant(batch);
invariant(!batch->finished());
+ invariant(batch->_commitRights.load());
Bucket* bucket = batch->bucket();
- while (true) {
- std::vector<std::shared_ptr<WriteBatch>> batchesToWaitOn;
-
- {
- auto lk = _lockExclusive();
-
- // Before we access the bucket, make sure it's still there.
- if (!_allBuckets.contains(bucket)) {
- // Someone else already cleaned up the bucket while we didn't hold the lock.
- invariant(batch->finished());
- return;
- }
-
- stdx::unique_lock blk{bucket->_mutex};
- if (bucket->allCommitted()) {
- // No uncommitted batches left to abort, go ahead and remove the bucket.
- blk.unlock();
- _removeBucket(bucket, false /* expiringBuckets */);
- break;
- }
-
- // For any uncommitted batches that we need to abort, see if we already have the rights,
- // otherwise try to claim the rights and abort it. If we don't get the rights, then wait
- // for the other writer to resolve the batch.
- for (const auto& [_, active] : bucket->_batches) {
- if (active == batch || active->claimCommitRights()) {
- active->_abort();
- } else {
- batchesToWaitOn.push_back(active);
- }
- }
- bucket->_batches.clear();
+ // Before we access the bucket, make sure it's still there.
+ auto lk = _lockExclusive();
+ if (!_allBuckets.contains(bucket)) {
+ // Special case, bucket has already been cleared, and we need only abort this batch.
+ batch->_abort();
+ return;
+ }
- if (auto& prepared = bucket->_preparedBatch) {
- if (prepared == batch) {
- prepared->_abort();
- } else {
- batchesToWaitOn.push_back(prepared);
- }
- prepared.reset();
- }
- }
+ stdx::unique_lock blk{bucket->_mutex};
+ _abort(blk, bucket, batch);
+}
- for (const auto& batchToWaitOn : batchesToWaitOn) {
- batchToWaitOn->getResult().getStatus().ignore();
- }
+void BucketCatalog::clear(const OID& oid) {
+ auto result = _setBucketState(oid, BucketState::kCleared);
+ if (result && *result == BucketState::kPreparedAndCleared) {
+ hangTimeseriesDirectModificationBeforeWriteConflict.pauseWhileSet();
+ throw WriteConflictException();
}
}
@@ -328,10 +321,10 @@ void BucketCatalog::clear(const NamespaceString& ns) {
auto nextIt = std::next(it);
const auto& bucket = *it;
- _verifyBucketIsUnused(bucket.get());
+ stdx::unique_lock blk{bucket->_mutex};
if (shouldClear(bucket->_ns)) {
_executionStats.erase(bucket->_ns);
- _removeBucket(bucket.get(), false /* expiringBuckets */);
+ _abort(blk, bucket.get(), nullptr);
}
it = nextIt;
@@ -394,7 +387,9 @@ BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const
void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) {
while (true) {
BucketAccess bucket{this, batch->bucket()};
- invariant(bucket);
+ if (!bucket) {
+ return;
+ }
auto current = bucket->_preparedBatch;
if (!current) {
@@ -416,15 +411,42 @@ bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) {
}
invariant(bucket->_batches.empty());
+ invariant(!bucket->_preparedBatch);
_memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
_markBucketNotIdle(bucket, expiringBuckets /* locked */);
_openBuckets.erase({std::move(bucket->_ns), std::move(bucket->_metadata)});
+ {
+ stdx::lock_guard statesLk{_statesMutex};
+ _bucketStates.erase(bucket->_id);
+ }
_allBuckets.erase(it);
return true;
}
+void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk,
+ Bucket* bucket,
+ std::shared_ptr<WriteBatch> batch) {
+ // For any uncommitted batches that we need to abort, see if we already have the rights,
+ // otherwise try to claim the rights and abort it. If we don't get the rights, then wait
+ // for the other writer to resolve the batch.
+ for (const auto& [_, current] : bucket->_batches) {
+ current->_abort();
+ }
+ bucket->_batches.clear();
+
+ if (auto& prepared = bucket->_preparedBatch) {
+ if (prepared == batch) {
+ prepared->_abort();
+ }
+ prepared.reset();
+ }
+
+ lk.unlock();
+ _removeBucket(bucket, true /* bucketIsUnused */);
+}
+
void BucketCatalog::_markBucketIdle(Bucket* bucket) {
invariant(bucket);
stdx::lock_guard lk{_idleMutex};
@@ -482,6 +504,7 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(
auto [it, inserted] = _allBuckets.insert(std::make_unique<Bucket>());
Bucket* bucket = it->get();
_setIdTimestamp(bucket, time);
+ _bucketStates.emplace(bucket->_id, BucketState::kNormal);
_openBuckets[key] = bucket;
if (openedDuetoMetadata) {
@@ -518,7 +541,52 @@ const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutio
}
void BucketCatalog::_setIdTimestamp(Bucket* bucket, const Date_t& time) {
+ auto oldId = bucket->_id;
bucket->_id.setTimestamp(durationCount<Seconds>(time.toDurationSinceEpoch()));
+ stdx::lock_guard statesLk{_statesMutex};
+ _bucketStates.erase(oldId);
+ _bucketStates.emplace(bucket->_id, BucketState::kNormal);
+}
+
+boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const OID& id,
+ BucketState target) {
+ stdx::lock_guard statesLk{_statesMutex};
+ auto it = _bucketStates.find(id);
+ if (it == _bucketStates.end()) {
+ return boost::none;
+ }
+
+ auto& [_, state] = *it;
+ switch (target) {
+ case BucketState::kNormal: {
+ if (state == BucketState::kPrepared) {
+ state = BucketState::kNormal;
+ } else if (state == BucketState::kPreparedAndCleared) {
+ state = BucketState::kCleared;
+ } else {
+ invariant(state != BucketState::kCleared);
+ }
+ break;
+ }
+ case BucketState::kPrepared: {
+ invariant(state == BucketState::kNormal);
+ state = BucketState::kPrepared;
+ break;
+ }
+ case BucketState::kCleared: {
+ if (state == BucketState::kNormal) {
+ state = BucketState::kCleared;
+ } else if (state == BucketState::kPrepared) {
+ state = BucketState::kPreparedAndCleared;
+ }
+ break;
+ }
+ case BucketState::kPreparedAndCleared: {
+ invariant(target != BucketState::kPreparedAndCleared);
+ }
+ }
+
+ return state;
}
BucketCatalog::BucketMetadata::BucketMetadata(BSONObj&& obj,
@@ -607,8 +675,8 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
{
auto lk = _catalog->_lockShared();
- bool bucketExisted = _findOpenBucketAndLock(hash);
- if (bucketExisted) {
+ auto bucketState = _findOpenBucketAndLock(hash);
+ if (bucketState == BucketState::kNormal || bucketState == BucketState::kPrepared) {
return;
}
}
@@ -620,12 +688,21 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, Bucket* bucket)
: _catalog(catalog) {
auto lk = _catalog->_lockShared();
- auto it = _catalog->_allBuckets.find(bucket);
- if (it == _catalog->_allBuckets.end()) {
+ auto bucketIt = _catalog->_allBuckets.find(bucket);
+ if (bucketIt == _catalog->_allBuckets.end()) {
return;
}
+
_bucket = bucket;
_acquire();
+
+ stdx::lock_guard statesLk{_catalog->_statesMutex};
+ auto statesIt = _catalog->_bucketStates.find(_bucket->_id);
+ invariant(statesIt != _catalog->_bucketStates.end());
+ auto& [_, state] = *statesIt;
+ if (state == BucketState::kCleared) {
+ release();
+ }
}
BucketCatalog::BucketAccess::~BucketAccess() {
@@ -634,18 +711,27 @@ BucketCatalog::BucketAccess::~BucketAccess() {
}
}
-bool BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) {
+BucketCatalog::BucketState BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) {
auto it = _catalog->_openBuckets.find(*_key, hash);
if (it == _catalog->_openBuckets.end()) {
// Bucket does not exist.
- return false;
+ return BucketState::kCleared;
}
_bucket = it->second;
_acquire();
- _catalog->_markBucketNotIdle(_bucket, false /* locked */);
- return true;
+ stdx::lock_guard statesLk{_catalog->_statesMutex};
+ auto statesIt = _catalog->_bucketStates.find(_bucket->_id);
+ invariant(statesIt != _catalog->_bucketStates.end());
+ auto& [_, state] = *statesIt;
+ if (state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) {
+ release();
+ } else {
+ _catalog->_markBucketNotIdle(_bucket, false /* locked */);
+ }
+
+ return state;
}
void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t hash) {
@@ -658,7 +744,20 @@ void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t has
_bucket = it->second;
_acquire();
- _catalog->_markBucketNotIdle(_bucket, false /* locked */);
+
+ {
+ stdx::lock_guard statesLk{_catalog->_statesMutex};
+ auto statesIt = _catalog->_bucketStates.find(_bucket->_id);
+ invariant(statesIt != _catalog->_bucketStates.end());
+ auto& [_, state] = *statesIt;
+ if (state == BucketState::kNormal || state == BucketState::kPrepared) {
+ _catalog->_markBucketNotIdle(_bucket, false /* locked */);
+ return;
+ }
+ }
+
+ _catalog->_abort(_guard, _bucket, nullptr);
+ _create();
}
void BucketCatalog::BucketAccess::_acquire() {
@@ -985,7 +1084,6 @@ StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() con
}
BucketCatalog::Bucket* BucketCatalog::WriteBatch::bucket() const {
- invariant(!finished());
return _bucket;
}
@@ -1083,7 +1181,6 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) {
}
void BucketCatalog::WriteBatch::_abort() {
- invariant(_commitRights.load());
_active = false;
_promise.setError({ErrorCodes::TimeseriesBucketCleared,
str::stream() << "Time-series bucket " << _bucket->id() << " for "
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index b6fc129313f..66d762d5efe 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -190,9 +190,10 @@ public:
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
- * commit rights on batch.
+ * commit rights on batch. Returns true if the batch was successfully prepared, or false if the
+ * batch was aborted.
*/
- void prepareCommit(std::shared_ptr<WriteBatch> batch);
+ bool prepareCommit(std::shared_ptr<WriteBatch> batch);
/**
* Records the result of a batch commit. Caller must already have commit rights on batch, and
@@ -207,6 +208,12 @@ public:
void abort(std::shared_ptr<WriteBatch> batch);
/**
+ * Marks any bucket with the specified OID as cleared and prevents any future inserts from
+ * landing in that bucket.
+ */
+ void clear(const OID& oid);
+
+ /**
* Clears the buckets for the given namespace.
*/
void clear(const NamespaceString& ns);
@@ -461,6 +468,20 @@ private:
AtomicWord<long long> numMeasurementsCommitted;
};
+ enum class BucketState {
+ // Bucket can be inserted into, and does not have an outstanding prepared commit
+ kNormal,
+ // Bucket can be inserted into, and has a prepared commit outstanding.
+ kPrepared,
+ // Bucket can no longer be inserted into, does not have an outstanding prepared
+ // commit.
+ kCleared,
+ // Bucket can no longer be inserted into, but still has an outstanding
+ // prepared commit. Any writer other than the one who prepared the
+ // commit should receive a WriteConflictException.
+ kPreparedAndCleared,
+ };
+
/**
* Helper class to handle all the locking necessary to lookup and lock a bucket for use. This
* is intended primarily for using a single bucket, including replacing it when it becomes full.
@@ -500,9 +521,13 @@ private:
Date_t getTime() const;
private:
- // Helper to find and lock an open bucket for the given metadata if it exists. Requires a
- // shared lock on the catalog. Returns true if the bucket exists and was locked.
- bool _findOpenBucketAndLock(std::size_t hash);
+ /**
+ * Helper to find and lock an open bucket for the given metadata if it exists. Requires a
+ * shared lock on the catalog. Returns the state of the bucket if it is locked and usable.
+ * In case the bucket does not exist or was previously cleared and thus is not usable, the
+ * return value will be BucketState::kCleared.
+ */
+ BucketState _findOpenBucketAndLock(std::size_t hash);
// Helper to find an open bucket for the given metadata if it exists, create it if it
// doesn't, and lock it. Requires an exclusive lock on the catalog.
@@ -537,6 +562,12 @@ private:
bool _removeBucket(Bucket* bucket, bool expiringBuckets);
/**
+ * Aborts any batches it can for the given bucket, then removes the bucket. If batch is
+ * non-null, it is assumed that the caller has commit rights for that batch.
+ */
+ void _abort(stdx::unique_lock<Mutex>& lk, Bucket* bucket, std::shared_ptr<WriteBatch> batch);
+
+ /**
* Adds the bucket to a list of idle buckets to be expired at a later date
*/
void _markBucketIdle(Bucket* bucket);
@@ -572,6 +603,16 @@ private:
void _setIdTimestamp(Bucket* bucket, const Date_t& time);
/**
+ * Changes the bucket state, taking into account the current state, the specified target state,
+ * and allowed state transitions. The return value, if set, is the final state of the bucket
+ * with the given id; if no such bucket exists, the return value will not be set.
+ *
+ * Ex. For a bucket with state kPrepared, and a target of kCleared, the return will be
+ * kPreparedAndCleared.
+ */
+ boost::optional<BucketState> _setBucketState(const OID& id, BucketState target);
+
+ /**
* You must hold a lock on _bucketMutex when accessing _allBuckets or _openBuckets.
* While holding a lock on _bucketMutex, you can take a lock on an individual bucket, then
* release _bucketMutex. Any iterators on the protected structures should be considered invalid
@@ -595,6 +636,10 @@ private:
// The current open bucket for each namespace and metadata pair.
stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, Bucket*> _openBuckets;
+ // Bucket state
+ mutable Mutex _statesMutex = MONGO_MAKE_LATCH("BucketCatalog::_statesMutex");
+ stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates;
+
// This mutex protects access to _idleBuckets
mutable Mutex _idleMutex = MONGO_MAKE_LATCH("BucketCatalog::_idleMutex");
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 5b0f398eebd..f21376f5eb9 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/catalog/catalog_test_fixture.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/stdx/future.h"
@@ -445,7 +446,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
ASSERT(batch2->newFieldNamesToBeInserted().count("a")) << batch2->toBSON();
}
-TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) {
+TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
auto batch1 = _bucketCatalog
->insert(_opCtx,
_ns1,
@@ -470,27 +471,56 @@ TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) {
.getValue();
ASSERT_NE(batch1, batch2);
- ASSERT_EQ(0, _getNumWaits(_ns1));
-
- // Aborting the batch will have to wait for the commit of batch1 to finish, then will proceed
- // to abort batch2.
ASSERT(batch2->claimCommitRights());
- auto task = Task{[&]() { _bucketCatalog->abort(batch2); }};
- // Add a little extra wait to make sure abort actually gets to the blocking point.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
- ASSERT(task.future().valid());
- ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
- << "clear finished before expected";
+ _bucketCatalog->abort(batch2);
+ ASSERT(batch2->finished());
+ ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
_bucketCatalog->finish(batch1, _commitInfo);
ASSERT(batch1->finished());
+ ASSERT_OK(batch1->getResult().getStatus());
+}
- // Now the clear should be able to continue, and will eventually abort batch2.
- task.future().wait();
- ASSERT_EQ(1, _getNumWaits(_ns1));
- ASSERT(batch2->finished());
- ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
- ASSERT_EQ(1, _getNumWaits(_ns1));
+TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
+ auto batch = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
+ .getValue();
+ ASSERT(batch->claimCommitRights());
+ _bucketCatalog->prepareCommit(batch);
+ ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
+
+ ASSERT_THROWS(_bucketCatalog->clear(batch->bucket()->id()), WriteConflictException);
+
+ _bucketCatalog->abort(batch);
+ ASSERT(batch->finished());
+ ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+}
+
+TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrows) {
+ auto batch = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
+ .getValue();
+ ASSERT(batch->claimCommitRights());
+
+ _bucketCatalog->abort(batch);
+ ASSERT(batch->finished());
+ ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+
+ bool prepared = _bucketCatalog->prepareCommit(batch);
+ ASSERT(!prepared);
+ ASSERT(batch->finished());
+ ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 0af6c1e6782..468b0dfe8d8 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -277,7 +277,6 @@ void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequ
CollectionUpdateArgs args;
args.update = updateMod;
args.criteria = toUpdateIdDoc;
- args.fromMigrate = false;
collection->updateDocument(opCtx,
recordId,
diff --git a/src/mongo/db/views/durable_view_catalog.cpp b/src/mongo/db/views/durable_view_catalog.cpp
index e2e2d32dc75..e2c49040dc8 100644
--- a/src/mongo/db/views/durable_view_catalog.cpp
+++ b/src/mongo/db/views/durable_view_catalog.cpp
@@ -208,7 +208,6 @@ void DurableViewCatalogImpl::upsert(OperationContext* opCtx,
CollectionUpdateArgs args;
args.update = view;
args.criteria = BSON("_id" << name.ns());
- args.fromMigrate = false;
const bool assumeIndexesAreAffected = true;
systemViews->updateDocument(