summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenis Grebennicov <denis.grebennicov@mongodb.com>2022-09-13 13:26:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-13 13:57:05 +0000
commitd26ae542584d0e4d92f1d9906e49b6f5a24d260f (patch)
tree72f1854eb8bfe571b3527154d148c9d27d9a4137
parentfa94f5fb6216a1cc1e23f5ad4df05295b380070e (diff)
downloadmongo-d26ae542584d0e4d92f1d9906e49b6f5a24d260f.tar.gz
SERVER-66125 Verify that FTDC collects change collection purging job stats
-rw-r--r--jstests/serverless/change_collection_server_stats.js85
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.cpp24
2 files changed, 100 insertions, 9 deletions
diff --git a/jstests/serverless/change_collection_server_stats.js b/jstests/serverless/change_collection_server_stats.js
new file mode 100644
index 00000000000..a5992259d78
--- /dev/null
+++ b/jstests/serverless/change_collection_server_stats.js
@@ -0,0 +1,85 @@
+/**
+ * Tests that FTDC collects information about the change collection, including its purging job.
+ * @tags: [ requires_fcv_62 ]
+ */
+(function() {
+'use strict';
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/ftdc.js');
+
+const kExpiredChangeRemovalJobSleepSeconds = 5;
+const kExpireAfterSeconds = 1;
+
+const rst = new ReplSetTest({nodes: 1});
+rst.startSet({
+ setParameter: {
+ featureFlagServerlessChangeStreams: 1,
+ changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds,
+ }
+});
+rst.initiate();
+
+const primary = rst.getPrimary();
+const adminDb = primary.getDB('admin');
+const testDb = primary.getDB(jsTestName());
+const changeCollection = primary.getDB("config").system.change_collection;
+
+// Enable change streams to ensure the creation of change collections if run in serverless mode.
+assert.commandWorked(adminDb.runCommand({setChangeStreamState: 1, enabled: true}));
+assert.soon(() => {
+ // Ensure that server status diagnostics is collecting change collection statistics.
+ const serverStatusDiagnostics = verifyGetDiagnosticData(adminDb).serverStatus;
+ return serverStatusDiagnostics.hasOwnProperty('changeCollections') &&
+ serverStatusDiagnostics.changeCollections.hasOwnProperty('purgingJob');
+});
+const diagnosticsBeforeTestCollInsertions =
+ verifyGetDiagnosticData(adminDb).serverStatus.changeCollections.purgingJob;
+
+// Create collection and insert sample data.
+assert.commandWorked(testDb.createCollection("testColl"));
+const numberOfDocuments = 1000;
+for (let i = 0; i < numberOfDocuments; i++) {
+ assert.commandWorked(testDb.testColl.insert({x: i}));
+}
+const wallTimeOfTheFirstOplogEntry =
+ new NumberLong(changeCollection.find().sort({wall: 1}).limit(1).next().wall.getTime());
+const estimatedToBeRemovedDocsSize = changeCollection.find()
+ .sort({wall: -1})
+ .skip(1)
+ .toArray()
+ .map(doc => Object.bsonsize(doc))
+ .reduce((acc, size) => acc + size, 0);
+assert.gt(estimatedToBeRemovedDocsSize, 0);
+
+// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'.
+assert.commandWorked(adminDb.runCommand(
+ {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));
+
+// Ensure purging job deletes the expired oplog entries about insertion into test collection.
+assert.soon(() => {
+ // All change collection entries are removed but one.
+ return changeCollection.count() === 1;
+});
+
+// Ensure that FTDC collected the purging job information of the change collection.
+assert.soon(() => {
+ const diagnosticsAfterTestCollInsertions =
+ verifyGetDiagnosticData(adminDb).serverStatus.changeCollections.purgingJob;
+
+ return diagnosticsAfterTestCollInsertions.totalPass >
+ diagnosticsBeforeTestCollInsertions.totalPass &&
+ diagnosticsAfterTestCollInsertions.scannedCollections >
+ diagnosticsBeforeTestCollInsertions.scannedCollections &&
+ diagnosticsAfterTestCollInsertions.bytesDeleted >
+ diagnosticsBeforeTestCollInsertions.bytesDeleted + estimatedToBeRemovedDocsSize &&
+ diagnosticsAfterTestCollInsertions.docsDeleted >
+ diagnosticsBeforeTestCollInsertions.docsDeleted + numberOfDocuments - 1 &&
+ diagnosticsAfterTestCollInsertions.maxStartWallTimeMillis.tojson() >=
+ wallTimeOfTheFirstOplogEntry.tojson() &&
+ diagnosticsAfterTestCollInsertions.timeElapsedMillis >
+ diagnosticsBeforeTestCollInsertions.timeElapsedMillis;
+});
+
+rst.stopSet();
+}());
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
index 5eae8abdf76..7c6a9b62eae 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.cpp
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -68,6 +68,13 @@ boost::optional<int64_t> getExpireAfterSeconds(boost::optional<TenantId> tid) {
}
void removeExpiredDocuments(Client* client) {
+ // TODO SERVER-66717 Remove this logic from this method. Due to the delay in the feature flag
+ // activation it was placed here. The remover job should ultimately be initialized at the mongod
+ // startup when launched in serverless mode.
+ if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ return;
+ }
+
hangBeforeRemovingExpiredChanges.pauseWhileSet();
try {
@@ -123,7 +130,14 @@ void removeExpiredDocuments(Client* client) {
maxStartWallTime =
std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis);
}
- changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store(maxStartWallTime);
+
+ // The purging job metadata will be 'boost::none' if none of the change collections have
+ // more than one oplog entry, as such the 'maxStartWallTimeMillis' will be zero. Avoid
+ // reporting 0 as 'maxStartWallTimeMillis'.
+ if (maxStartWallTime > 0) {
+ changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store(
+ maxStartWallTime);
+ }
const auto jobDurationMillis = clock->now() - currentWallTime;
if (removedCount > 0) {
@@ -187,19 +201,11 @@ private:
} // namespace
void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
- return;
- }
-
LOGV2(6663507, "Starting the ChangeCollectionExpiredChangeRemover");
ChangeCollectionExpiredDocumentsRemover::start(serviceContext);
}
void shutdownChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
- return;
- }
-
LOGV2(6663508, "Shutting down the ChangeCollectionExpiredChangeRemover");
ChangeCollectionExpiredDocumentsRemover::shutdown(serviceContext);
}