author    Gregory Noma <gregory.noma@gmail.com>  2021-04-29 12:41:24 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-04-29 17:04:20 +0000
commit    9caac5e1856eea2c9152a09f966c80b2a5b1c1c0 (patch)
tree      b7a0ea286d36efd7dec7d65fc5cd102bcc3f9462
parent    bbf49c8019ec253013ad2df96951ab3201872d6a (diff)
download  mongo-9caac5e1856eea2c9152a09f966c80b2a5b1c1c0.tar.gz
SERVER-56292 Ensure time-series collections properly handle stepdown and stepup
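
The core of the fix: BucketCatalog::clear(...) moves out of the dropCollection/dropDatabase command paths, which run only on the node executing the command, and into OpObserverImpl::onDropCollection/onDropDatabase, which also fire when a secondary applies the drop from the oplog. A node that steps down and later steps back up therefore cannot serve inserts out of stale in-memory buckets for a collection that was dropped while it was a secondary. Below is a minimal, self-contained sketch of that pattern; every name in it is hypothetical, not the real MongoDB types (the real change calls BucketCatalog::get(opCtx).clear(...) from the op observer):

    #include <string>
    #include <unordered_map>

    constexpr const char* kBucketsPrefix = "system.buckets.";

    bool isBucketsCollection(const std::string& coll) {
        return coll.rfind(kBucketsPrefix, 0) == 0;  // prefix check
    }

    std::string viewFor(const std::string& bucketsColl) {
        return bucketsColl.substr(std::string(kBucketsPrefix).size());
    }

    // Stand-in for the in-memory BucketCatalog: open buckets keyed by view name.
    struct BucketCache {
        std::unordered_map<std::string, int> openBuckets;
        void clear(const std::string& viewNs) {
            openBuckets.erase(viewNs);
        }
    };

    // An op observer fires for every drop a node learns about -- one executed
    // locally on a primary, or one applied from the oplog on a secondary -- so
    // the cache is invalidated regardless of replica-set role.
    struct DropObserver {
        BucketCache& cache;
        void onDropCollection(const std::string& coll) {
            if (isBucketsCollection(coll))
                cache.clear(viewFor(coll));
        }
    };
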
-rw-r--r--  jstests/noPassthrough/timeseries_insert_after_cycle_primary.js |  94
-rw-r--r--  src/mongo/db/catalog/SConscript                                |   1
-rw-r--r--  src/mongo/db/catalog/drop_collection.cpp                       |  11
-rw-r--r--  src/mongo/db/catalog/drop_database.cpp                         |   3
-rw-r--r--  src/mongo/db/commands/write_commands.cpp                       |   8
-rw-r--r--  src/mongo/db/namespace_string.cpp                              |   5
-rw-r--r--  src/mongo/db/namespace_string.h                                |   5
-rw-r--r--  src/mongo/db/op_observer_impl.cpp                              |   4
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp                            | 138
9 files changed, 170 insertions(+), 99 deletions(-)
diff --git a/jstests/noPassthrough/timeseries_insert_after_cycle_primary.js b/jstests/noPassthrough/timeseries_insert_after_cycle_primary.js
new file mode 100644
index 00000000000..e53053d9832
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_insert_after_cycle_primary.js
@@ -0,0 +1,94 @@
+/**
+ * Tests that time-series inserts are properly handled when a node steps down from primary and then
+ * later steps back up.
+ *
+ * @tags: [
+ * requires_replication,
+ * ]
+ */
+(function() {
+'use strict';
+
+load('jstests/core/timeseries/libs/timeseries.js');
+
+const replTest = new ReplSetTest({nodes: 2});
+replTest.startSet();
+replTest.initiate();
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(replTest.getPrimary())) {
+ jsTestLog('Skipping test because the time-series collection feature flag is disabled');
+ replTest.stopSet();
+ return;
+}
+
+const dbName = 'test';
+const numColls = 3;
+
+const testDB = function() {
+ return replTest.getPrimary().getDB(dbName);
+};
+
+const coll = function(num) {
+ return testDB()[jsTestName() + '_' + num];
+};
+
+const bucketsColl = function(num) {
+ return testDB()['system.buckets.' + coll(num).getName()];
+};
+
+const timeFieldName = 'time';
+const metaFieldName = 'meta';
+
+const createColl = function(num) {
+ assert.commandWorked(testDB().createCollection(
+ coll(num).getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+};
+
+for (let i = 0; i < numColls; i++) {
+ createColl(i);
+}
+
+const docs = [
+ {_id: 0, [timeFieldName]: ISODate(), [metaFieldName]: 0},
+ {_id: 1, [timeFieldName]: ISODate(), [metaFieldName]: 0},
+];
+
+for (let i = 0; i < numColls; i++) {
+ assert.commandWorked(coll(i).insert(docs[0]));
+}
+
+replTest.stepUp(replTest.getSecondary());
+
+// Manually update the bucket for collection 1.
+assert.commandWorked(bucketsColl(1).update({}, {$set: {meta: 1}}));
+assert.commandWorked(bucketsColl(1).update({}, {$set: {meta: 0}}));
+
+// Drop, recreate, and reinsert the bucket for collection 2.
+assert(coll(2).drop());
+createColl(2);
+assert.commandWorked(coll(2).insert(docs[0]));
+
+// Step back up the original primary.
+replTest.stepUp(replTest.getSecondary());
+
+for (let i = 0; i < numColls; i++) {
+ assert.commandWorked(coll(i).insert(docs[1]));
+}
+
+const checkColl = function(num, numBuckets) {
+ jsTestLog('Checking collection ' + num);
+ assert.docEq(coll(num).find().sort({_id: 1}).toArray(), docs);
+ const buckets = bucketsColl(num).find().toArray();
+ assert.eq(buckets.length,
+ numBuckets,
+ 'Expected ' + numBuckets + ' bucket(s) but found: ' + tojson(buckets));
+};
+
+// For collection 0, the original bucket should still be usable.
+checkColl(0, 1);
+// For collections 1 and 2, the original bucket should have been closed.
+checkColl(1, 2);
+checkColl(2, 2);
+
+replTest.stopSet();
+})();
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 2a974339402..ca19d9b7770 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -475,7 +475,6 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/repl/local_oplog_info',
- '$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
'database_holder',
],
)
diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp
index 2e68723d39b..b11300a566b 100644
--- a/src/mongo/db/catalog/drop_collection.cpp
+++ b/src/mongo/db/catalog/drop_collection.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -72,8 +71,7 @@ Status _checkNssAndReplState(OperationContext* opCtx, const CollectionPtr& coll)
Status _dropView(OperationContext* opCtx,
Database* db,
const NamespaceString& collectionName,
- DropReply* reply,
- bool clearBucketCatalog = false) {
+ DropReply* reply) {
if (!db) {
Status status = Status(ErrorCodes::NamespaceNotFound, "ns not found");
audit::logDropView(opCtx->getClient(), collectionName, "", {}, status.code());
@@ -125,10 +123,6 @@ Status _dropView(OperationContext* opCtx,
}
wunit.commit();
- if (clearBucketCatalog) {
- BucketCatalog::get(opCtx).clear(collectionName);
- }
-
reply->setNs(collectionName);
return Status::OK();
}
@@ -340,8 +334,7 @@ Status dropCollection(OperationContext* opCtx,
[opCtx, dropView, &collectionName, &reply](Database* db,
const NamespaceString& bucketsNs) {
if (dropView) {
- auto status = _dropView(
- opCtx, db, collectionName, reply, true /* clearBucketCatalog */);
+ auto status = _dropView(opCtx, db, collectionName, reply);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp
index cbb84773fd6..3a177f3f907 100644
--- a/src/mongo/db/catalog/drop_database.cpp
+++ b/src/mongo/db/catalog/drop_database.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/logv2/log.h"
#include "mongo/util/duration.h"
@@ -115,8 +114,6 @@ void _finishDropDatabase(OperationContext* opCtx,
databaseHolder->dropDb(opCtx, db);
dropPendingGuard.dismiss();
- BucketCatalog::get(opCtx).clear(dbName);
-
LOGV2(20336,
"dropDatabase {dbName} - finished, dropped {numCollections} collection(s)",
"dropDatabase",
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 08abd5cd743..4a59d0c9030 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -667,13 +667,7 @@ public:
return;
}
- if (batch->numPreviouslyCommittedMeasurements() != 0 &&
- result.getValue().getNModified() == 0) {
- // No document in the buckets collection was found to update, meaning that it was
- // removed.
- docsToRetry->push_back(index);
- return;
- }
+ invariant(result.getValue().getN() == 1 || result.getValue().getNModified() == 1);
getOpTimeAndElectionId(opCtx, opTime, electionId);
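
With drops now cleared through the op observer on every node, a bucket document that a previously committed batch wrote can no longer vanish between batches, so the retry path removed above is dead code; the patch tightens it into an invariant that each time-series bucket write upserted (getN() == 1) or modified (getNModified() == 1) exactly one document.
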
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 6f1b7c63622..9c6d9ae93bd 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -328,6 +328,11 @@ NamespaceString NamespaceString::makeTimeseriesBucketsNamespace() const {
return {db(), kTimeseriesBucketsCollectionPrefix.toString() + coll()};
}
+NamespaceString NamespaceString::getTimeseriesViewNamespace() const {
+ invariant(isTimeseriesBucketsCollection());
+ return {db(), coll().substr(kTimeseriesBucketsCollectionPrefix.size())};
+}
+
bool NamespaceString::isReplicated() const {
if (isLocal()) {
return false;
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 7b3a78f1120..219bfc1e55b 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -364,6 +364,11 @@ public:
NamespaceString makeTimeseriesBucketsNamespace() const;
/**
+ * Returns the time-series view namespace for this buckets namespace.
+ */
+ NamespaceString getTimeseriesViewNamespace() const;
+
+ /**
* Returns whether a namespace is replicated, based only on its string value. One notable
* omission is that map reduce `tmp.mr` collections may or may not be replicated. Callers must
* decide how to handle that case separately.
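
For reference, the new accessor is the inverse of the existing makeTimeseriesBucketsNamespace(). A usage sketch, as a fragment assuming #include "mongo/db/namespace_string.h" and the mongo namespace; the "db.weather" namespace is illustrative, not from the patch:

    void timeseriesNamespaceRoundTrip() {
        NamespaceString view("db", "weather");
        NamespaceString buckets = view.makeTimeseriesBucketsNamespace();
        // buckets.ns() == "db.system.buckets.weather"
        invariant(buckets.isTimeseriesBucketsCollection());
        // Strips the "system.buckets." prefix, recovering "db.weather".
        invariant(buckets.getTimeseriesViewNamespace() == view);
    }
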
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 103f658d9f4..2fcb858e6b3 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -827,6 +827,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) {
MongoDSessionCatalog::invalidateAllSessions(opCtx);
}
+
+ BucketCatalog::get(opCtx).clear(dbName);
}
repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
@@ -871,6 +873,8 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
MongoDSessionCatalog::invalidateAllSessions(opCtx);
} else if (collectionName == NamespaceString::kConfigSettingsNamespace) {
ReadWriteConcernDefaults::get(opCtx).invalidate();
+ } else if (collectionName.isTimeseriesBucketsCollection()) {
+ BucketCatalog::get(opCtx).clear(collectionName.getTimeseriesViewNamespace());
}
return {};
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 929d8990828..a92256312f2 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -1124,7 +1124,7 @@ WriteResult performDeletes(OperationContext* opCtx,
Status performAtomicTimeseriesWrites(
OperationContext* opCtx,
const std::vector<write_ops::InsertCommandRequest>& insertOps,
- const std::vector<write_ops::UpdateCommandRequest>& updateOps) {
+ const std::vector<write_ops::UpdateCommandRequest>& updateOps) try {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
invariant(!opCtx->inMultiDocumentTransaction());
invariant(!insertOps.empty() || !updateOps.empty());
@@ -1145,99 +1145,79 @@ Status performAtomicTimeseriesWrites(
assertCanWrite_inlock(opCtx, ns);
- try {
- WriteUnitOfWork wuow{opCtx};
+ WriteUnitOfWork wuow{opCtx};
- std::vector<repl::OpTime> oplogSlots;
- boost::optional<std::vector<repl::OpTime>::iterator> slot;
- if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) {
- oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size());
- slot = oplogSlots.begin();
- }
+ std::vector<repl::OpTime> oplogSlots;
+ boost::optional<std::vector<repl::OpTime>::iterator> slot;
+ if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) {
+ oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size());
+ slot = oplogSlots.begin();
+ }
- std::vector<InsertStatement> inserts;
- inserts.reserve(insertOps.size());
+ std::vector<InsertStatement> inserts;
+ inserts.reserve(insertOps.size());
- for (auto& op : insertOps) {
- invariant(op.getDocuments().size() == 1);
+ for (auto& op : insertOps) {
+ invariant(op.getDocuments().size() == 1);
- inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds()
- : std::vector<StmtId>{kUninitializedStmtId},
- op.getDocuments().front(),
- slot ? *(*slot)++ : OplogSlot{});
- }
+ inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds()
+ : std::vector<StmtId>{kUninitializedStmtId},
+ op.getDocuments().front(),
+ slot ? *(*slot)++ : OplogSlot{});
+ }
- if (!insertOps.empty()) {
- auto status =
- coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug());
- if (!status.isOK()) {
- return status;
- }
+ if (!insertOps.empty()) {
+ auto status = coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug());
+ if (!status.isOK()) {
+ return status;
}
+ }
- for (auto& op : updateOps) {
- invariant(op.getUpdates().size() == 1);
- auto& update = op.getUpdates().front();
-
- // TODO (SERVER-56270): Remove handling for non-clustered time-series collections.
- auto recordId = coll->isClustered()
- ? record_id_helpers::keyForOID(update.getQ()["_id"].OID())
- : Helpers::findOne(opCtx, *coll, update.getQ(), false);
- if (recordId.isNull()) {
- return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"};
- }
-
- auto record = coll->getCursor(opCtx)->seekExact(recordId);
- if (!record) {
- return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"};
- }
-
- auto original = record->data.toBson();
- const bool mustCheckExistenceForInsertOperations =
- static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx));
- auto [updated, indexesAffected] =
- doc_diff::applyDiff(original,
- update.getU().getDiff(),
- &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
- mustCheckExistenceForInsertOperations);
-
- CollectionUpdateArgs args;
- if (const auto& stmtIds = op.getStmtIds()) {
- args.stmtIds = *stmtIds;
- }
- args.preImageDoc = original;
- args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
- args.criteria = update.getQ();
- args.source = OperationSource::kTimeseries;
- if (slot) {
- args.oplogSlot = *(*slot)++;
- fassert(5481600,
- opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp()));
- }
-
- coll->updateDocument(
- opCtx,
- recordId,
- Snapshotted<BSONObj>{opCtx->recoveryUnit()->getSnapshotId(), std::move(original)},
- updated,
- indexesAffected,
- &curOp->debug(),
- &args);
+ for (auto& op : updateOps) {
+ invariant(op.getUpdates().size() == 1);
+ auto& update = op.getUpdates().front();
+
+ // TODO (SERVER-56270): Remove handling for non-clustered time-series collections.
+ auto recordId = coll->isClustered()
+ ? record_id_helpers::keyForOID(update.getQ()["_id"].OID())
+ : Helpers::findOne(opCtx, *coll, update.getQ(), false);
+
+ auto original = coll->docFor(opCtx, recordId);
+ auto [updated, indexesAffected] =
+ doc_diff::applyDiff(original.value(),
+ update.getU().getDiff(),
+ &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
+ static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx)));
+
+ CollectionUpdateArgs args;
+ if (const auto& stmtIds = op.getStmtIds()) {
+ args.stmtIds = *stmtIds;
}
-
- if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) {
- return {ErrorCodes::FailPointEnabled,
- "Failing time-series writes due to failAtomicTimeseriesWrites fail point"};
+ args.preImageDoc = original.value();
+ args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
+ args.criteria = update.getQ();
+ args.source = OperationSource::kTimeseries;
+ if (slot) {
+ args.oplogSlot = *(*slot)++;
+ fassert(5481600, opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp()));
}
- wuow.commit();
- } catch (const DBException& ex) {
- return ex.toStatus();
+ coll->updateDocument(
+ opCtx, recordId, original, updated, indexesAffected, &curOp->debug(), &args);
+ }
+
+ if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) {
+ return {ErrorCodes::FailPointEnabled,
+ "Failing time-series writes due to failAtomicTimeseriesWrites fail point"};
}
+ wuow.commit();
+
lastOpFixer.finishedOpSuccessfully();
return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
}
void recordUpdateResultInOpDebug(const UpdateResult& updateResult, OpDebug* opDebug) {
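
The restructuring of performAtomicTimeseriesWrites above also swaps an inner try/catch for a C++ function-try-block: the try now encloses the entire function body, so the early `return status;` paths lose a level of indentation while the trailing catch still converts any thrown DBException into a Status. A minimal standalone sketch of the idiom, with generic stand-in names rather than MongoDB types:

    #include <stdexcept>
    #include <string>

    struct Status {
        bool ok;
        std::string reason;
    };

    // Function-try-block: `try` encloses the whole body, so early error
    // returns need no extra nesting, and a single trailing catch converts
    // anything thrown into an error Status -- the same shape the patched
    // performAtomicTimeseriesWrites now has.
    Status doWrites(bool failEarly, bool throwLate) try {
        if (failEarly)
            return {false, "precondition failed"};  // early return, no nesting
        if (throwLate)
            throw std::runtime_error("storage engine error");
        return {true, ""};
    } catch (const std::exception& ex) {
        return {false, ex.what()};
    }
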