From 9caac5e1856eea2c9152a09f966c80b2a5b1c1c0 Mon Sep 17 00:00:00 2001
From: Gregory Noma
Date: Thu, 29 Apr 2021 12:41:24 -0400
Subject: SERVER-56292 Ensure time-series collections properly handle stepdown and stepup

---
 .../timeseries_insert_after_cycle_primary.js |  94 ++++++++++++++
 src/mongo/db/catalog/SConscript              |   1 -
 src/mongo/db/catalog/drop_collection.cpp     |  11 +-
 src/mongo/db/catalog/drop_database.cpp       |   3 -
 src/mongo/db/commands/write_commands.cpp     |   8 +-
 src/mongo/db/namespace_string.cpp            |   5 +
 src/mongo/db/namespace_string.h              |   5 +
 src/mongo/db/op_observer_impl.cpp            |   4 +
 src/mongo/db/ops/write_ops_exec.cpp          | 138 +++++++++------------
 9 files changed, 170 insertions(+), 99 deletions(-)
 create mode 100644 jstests/noPassthrough/timeseries_insert_after_cycle_primary.js

diff --git a/jstests/noPassthrough/timeseries_insert_after_cycle_primary.js b/jstests/noPassthrough/timeseries_insert_after_cycle_primary.js
new file mode 100644
index 00000000000..e53053d9832
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_insert_after_cycle_primary.js
@@ -0,0 +1,94 @@
+/**
+ * Tests that time-series inserts are properly handled when a node steps down from primary and then
+ * later steps back up.
+ *
+ * @tags: [
+ *   requires_replication,
+ * ]
+ */
+(function() {
+'use strict';
+
+load('jstests/core/timeseries/libs/timeseries.js');
+
+const replTest = new ReplSetTest({nodes: 2});
+replTest.startSet();
+replTest.initiate();
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(replTest.getPrimary())) {
+    jsTestLog('Skipping test because the time-series collection feature flag is disabled');
+    replTest.stopSet();
+    return;
+}
+
+const dbName = 'test';
+const numColls = 3;
+
+const testDB = function() {
+    return replTest.getPrimary().getDB(dbName);
+};
+
+const coll = function(num) {
+    return testDB()[jsTestName() + '_' + num];
+};
+
+const bucketsColl = function(num) {
+    return testDB()['system.buckets.' + coll(num).getName()];
+};
+
+const timeFieldName = 'time';
+const metaFieldName = 'meta';
+
+const createColl = function(num) {
+    assert.commandWorked(testDB().createCollection(
+        coll(num).getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+};
+
+for (let i = 0; i < numColls; i++) {
+    createColl(i);
+}
+
+const docs = [
+    {_id: 0, [timeFieldName]: ISODate(), [metaFieldName]: 0},
+    {_id: 1, [timeFieldName]: ISODate(), [metaFieldName]: 0},
+];
+
+for (let i = 0; i < numColls; i++) {
+    assert.commandWorked(coll(i).insert(docs[0]));
+}
+
+replTest.stepUp(replTest.getSecondary());
+
+// Manually update the bucket for collection 1.
+assert.commandWorked(bucketsColl(1).update({}, {$set: {meta: 1}}));
+assert.commandWorked(bucketsColl(1).update({}, {$set: {meta: 0}}));
+
+// Drop, recreate, and reinsert the bucket for collection 2.
+assert(coll(2).drop());
+createColl(2);
+assert.commandWorked(coll(2).insert(docs[0]));
+
+// Step back up the original primary.
+replTest.stepUp(replTest.getSecondary());
+
+for (let i = 0; i < numColls; i++) {
+    assert.commandWorked(coll(i).insert(docs[1]));
+}
+
+const checkColl = function(num, numBuckets) {
+    jsTestLog('Checking collection ' + num);
+    assert.docEq(coll(num).find().sort({_id: 1}).toArray(), docs);
+    const buckets = bucketsColl(num).find().toArray();
+    assert.eq(buckets.length,
+              numBuckets,
+              'Expected ' + numBuckets + ' bucket(s) but found: ' + tojson(buckets));
+};
+
+// For collection 0, the original bucket should still be usable.
+checkColl(0, 1);
+// For collections 1 and 2, the original bucket should have been closed.
+checkColl(1, 2);
+checkColl(2, 2);
+
+replTest.stopSet();
+})();
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 2a974339402..ca19d9b7770 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -475,7 +475,6 @@ env.Library(
     ],
     LIBDEPS_PRIVATE=[
         '$BUILD_DIR/mongo/db/repl/local_oplog_info',
-        '$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
         'database_holder',
     ],
 )
diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp
index 2e68723d39b..b11300a566b 100644
--- a/src/mongo/db/catalog/drop_collection.cpp
+++ b/src/mongo/db/catalog/drop_collection.cpp
@@ -45,7 +45,6 @@
 #include "mongo/db/s/collection_sharding_state.h"
 #include "mongo/db/server_options.h"
 #include "mongo/db/service_context.h"
-#include "mongo/db/timeseries/bucket_catalog.h"
 #include "mongo/db/views/view_catalog.h"
 #include "mongo/logv2/log.h"
 #include "mongo/util/fail_point.h"
@@ -72,8 +71,7 @@ Status _checkNssAndReplState(OperationContext* opCtx, const CollectionPtr& coll)
 Status _dropView(OperationContext* opCtx,
                  Database* db,
                  const NamespaceString& collectionName,
-                 DropReply* reply,
-                 bool clearBucketCatalog = false) {
+                 DropReply* reply) {
     if (!db) {
         Status status = Status(ErrorCodes::NamespaceNotFound, "ns not found");
         audit::logDropView(opCtx->getClient(), collectionName, "", {}, status.code());
@@ -125,10 +123,6 @@ Status _dropView(OperationContext* opCtx,
     }
     wunit.commit();
 
-    if (clearBucketCatalog) {
-        BucketCatalog::get(opCtx).clear(collectionName);
-    }
-
     reply->setNs(collectionName);
     return Status::OK();
 }
@@ -340,8 +334,7 @@ Status dropCollection(OperationContext* opCtx,
         [opCtx, dropView, &collectionName, &reply](Database* db,
                                                    const NamespaceString& bucketsNs) {
             if (dropView) {
-                auto status = _dropView(
-                    opCtx, db, collectionName, reply, true /* clearBucketCatalog */);
+                auto status = _dropView(opCtx, db, collectionName, reply);
                 if (!status.isOK()) {
                     return status;
                 }
diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp
index cbb84773fd6..3a177f3f907 100644
--- a/src/mongo/db/catalog/drop_database.cpp
+++ b/src/mongo/db/catalog/drop_database.cpp
@@ -45,7 +45,6 @@
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/service_context.h"
-#include "mongo/db/timeseries/bucket_catalog.h"
 #include "mongo/db/write_concern_options.h"
 #include "mongo/logv2/log.h"
 #include "mongo/util/duration.h"
@@ -115,8 +114,6 @@ void _finishDropDatabase(OperationContext* opCtx,
     databaseHolder->dropDb(opCtx, db);
     dropPendingGuard.dismiss();
 
-    BucketCatalog::get(opCtx).clear(dbName);
-
     LOGV2(20336,
           "dropDatabase {dbName} - finished, dropped {numCollections} collection(s)",
           "dropDatabase",
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 08abd5cd743..4a59d0c9030 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -667,13 +667,7 @@ public:
                 return;
             }
 
-            if (batch->numPreviouslyCommittedMeasurements() != 0 &&
-                result.getValue().getNModified() == 0) {
-                // No document in the buckets collection was found to update, meaning that it was
-                // removed.
-                docsToRetry->push_back(index);
-                return;
-            }
+            invariant(result.getValue().getN() == 1 || result.getValue().getNModified() == 1);
 
             getOpTimeAndElectionId(opCtx, opTime, electionId);
 
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 6f1b7c63622..9c6d9ae93bd 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -328,6 +328,11 @@ NamespaceString NamespaceString::makeTimeseriesBucketsNamespace() const {
     return {db(), kTimeseriesBucketsCollectionPrefix.toString() + coll()};
 }
 
+NamespaceString NamespaceString::getTimeseriesViewNamespace() const {
+    invariant(isTimeseriesBucketsCollection());
+    return {db(), coll().substr(kTimeseriesBucketsCollectionPrefix.size())};
+}
+
 bool NamespaceString::isReplicated() const {
     if (isLocal()) {
         return false;
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 7b3a78f1120..219bfc1e55b 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -363,6 +363,11 @@ public:
      */
    NamespaceString makeTimeseriesBucketsNamespace() const;
 
+    /**
+     * Returns the time-series view namespace for this buckets namespace.
+     */
+    NamespaceString getTimeseriesViewNamespace() const;
+
     /**
      * Returns whether a namespace is replicated, based only on its string value. One notable
      * omission is that map reduce `tmp.mr` collections may or may not be replicated. Callers must
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 103f658d9f4..2fcb858e6b3 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -827,6 +827,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
     if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) {
         MongoDSessionCatalog::invalidateAllSessions(opCtx);
     }
+
+    BucketCatalog::get(opCtx).clear(dbName);
 }
 
 repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
@@ -871,6 +873,8 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
         MongoDSessionCatalog::invalidateAllSessions(opCtx);
     } else if (collectionName == NamespaceString::kConfigSettingsNamespace) {
         ReadWriteConcernDefaults::get(opCtx).invalidate();
+    } else if (collectionName.isTimeseriesBucketsCollection()) {
+        BucketCatalog::get(opCtx).clear(collectionName.getTimeseriesViewNamespace());
     }
 
     return {};
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 929d8990828..a92256312f2 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -1124,7 +1124,7 @@ WriteResult performDeletes(OperationContext* opCtx,
 Status performAtomicTimeseriesWrites(
     OperationContext* opCtx,
     const std::vector<write_ops::Insert>& insertOps,
-    const std::vector<write_ops::Update>& updateOps) {
+    const std::vector<write_ops::Update>& updateOps) try {
     invariant(!opCtx->lockState()->inAWriteUnitOfWork());
     invariant(!opCtx->inMultiDocumentTransaction());
     invariant(!insertOps.empty() || !updateOps.empty());
@@ -1145,99 +1145,79 @@ Status performAtomicTimeseriesWrites(
 
     assertCanWrite_inlock(opCtx, ns);
 
-    try {
-        WriteUnitOfWork wuow{opCtx};
+    WriteUnitOfWork wuow{opCtx};
 
-        std::vector<OplogSlot> oplogSlots;
-        boost::optional<std::vector<OplogSlot>::iterator> slot;
-        if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) {
-            oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size());
-            slot = oplogSlots.begin();
-        }
+    std::vector<OplogSlot> oplogSlots;
+    boost::optional<std::vector<OplogSlot>::iterator> slot;
+    if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) {
+        oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size());
+        slot = oplogSlots.begin();
+    }
 
-        std::vector<InsertStatement> inserts;
-        inserts.reserve(insertOps.size());
+    std::vector<InsertStatement> inserts;
+    inserts.reserve(insertOps.size());
 
-        for (auto& op : insertOps) {
-            invariant(op.getDocuments().size() == 1);
+    for (auto& op : insertOps) {
+        invariant(op.getDocuments().size() == 1);
 
-            inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds()
-                                                 : std::vector<StmtId>{kUninitializedStmtId},
-                                 op.getDocuments().front(),
-                                 slot ? *(*slot)++ : OplogSlot{});
-        }
+        inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds()
+                                             : std::vector<StmtId>{kUninitializedStmtId},
+                             op.getDocuments().front(),
+                             slot ? *(*slot)++ : OplogSlot{});
+    }
 
-        if (!insertOps.empty()) {
-            auto status =
-                coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug());
-            if (!status.isOK()) {
-                return status;
-            }
+    if (!insertOps.empty()) {
+        auto status = coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug());
+        if (!status.isOK()) {
+            return status;
         }
+    }
 
-        for (auto& op : updateOps) {
-            invariant(op.getUpdates().size() == 1);
-            auto& update = op.getUpdates().front();
-
-            // TODO (SERVER-56270): Remove handling for non-clustered time-series collections.
-            auto recordId = coll->isClustered()
-                ? record_id_helpers::keyForOID(update.getQ()["_id"].OID())
-                : Helpers::findOne(opCtx, *coll, update.getQ(), false);
-            if (recordId.isNull()) {
-                return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"};
-            }
-
-            auto record = coll->getCursor(opCtx)->seekExact(recordId);
-            if (!record) {
-                return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"};
-            }
-
-            auto original = record->data.toBson();
-            const bool mustCheckExistenceForInsertOperations =
-                static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx));
-            auto [updated, indexesAffected] =
-                doc_diff::applyDiff(original,
-                                    update.getU().getDiff(),
-                                    &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
-                                    mustCheckExistenceForInsertOperations);
-
-            CollectionUpdateArgs args;
-            if (const auto& stmtIds = op.getStmtIds()) {
-                args.stmtIds = *stmtIds;
-            }
-            args.preImageDoc = original;
-            args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
-            args.criteria = update.getQ();
-            args.source = OperationSource::kTimeseries;
-            if (slot) {
-                args.oplogSlot = *(*slot)++;
-                fassert(5481600,
-                        opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp()));
-            }
-
-            coll->updateDocument(
-                opCtx,
-                recordId,
-                Snapshotted<BSONObj>{opCtx->recoveryUnit()->getSnapshotId(), std::move(original)},
-                updated,
-                indexesAffected,
-                &curOp->debug(),
-                &args);
+    for (auto& op : updateOps) {
+        invariant(op.getUpdates().size() == 1);
+        auto& update = op.getUpdates().front();
+
+        // TODO (SERVER-56270): Remove handling for non-clustered time-series collections.
+        auto recordId = coll->isClustered()
+            ? record_id_helpers::keyForOID(update.getQ()["_id"].OID())
+            : Helpers::findOne(opCtx, *coll, update.getQ(), false);
+
+        auto original = coll->docFor(opCtx, recordId);
+        auto [updated, indexesAffected] =
+            doc_diff::applyDiff(original.value(),
+                                update.getU().getDiff(),
+                                &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
+                                static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx)));
+
+        CollectionUpdateArgs args;
+        if (const auto& stmtIds = op.getStmtIds()) {
+            args.stmtIds = *stmtIds;
         }
-
-        if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) {
-            return {ErrorCodes::FailPointEnabled,
-                    "Failing time-series writes due to failAtomicTimeseriesWrites fail point"};
+        args.preImageDoc = original.value();
+        args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
+        args.criteria = update.getQ();
+        args.source = OperationSource::kTimeseries;
+        if (slot) {
+            args.oplogSlot = *(*slot)++;
+            fassert(5481600, opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp()));
         }
 
-        wuow.commit();
-    } catch (const DBException& ex) {
-        return ex.toStatus();
+        coll->updateDocument(
+            opCtx, recordId, original, updated, indexesAffected, &curOp->debug(), &args);
+    }
+
+    if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) {
+        return {ErrorCodes::FailPointEnabled,
+                "Failing time-series writes due to failAtomicTimeseriesWrites fail point"};
     }
 
+    wuow.commit();
+
     lastOpFixer.finishedOpSuccessfully();
 
     return Status::OK();
+} catch (const DBException& ex) {
+    return ex.toStatus();
 }
 
 void recordUpdateResultInOpDebug(const UpdateResult& updateResult, OpDebug* opDebug) {
-- 
cgit v1.2.1