diff options
author | Arun Banala <arun.banala@mongodb.com> | 2021-04-14 19:36:11 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-26 20:49:14 +0000 |
commit | b5e656c92a92a4d6086a10dad0c1aad6e922fdc3 (patch) | |
tree | 3d845f2959b2598441a34b6951dfcc16eb0dab13 | |
parent | 6a6229335d7325836d1d5812caa1eade17a95e34 (diff) | |
download | mongo-b5e656c92a92a4d6086a10dad0c1aad6e922fdc3.tar.gz |
SERVER-55672 Fix oplog timestamp logic in SBE
47 files changed, 203 insertions, 202 deletions
diff --git a/jstests/noPassthrough/change_stream_resume_before_add_shard.js b/jstests/noPassthrough/change_stream_resume_before_add_shard.js index ba813102fca..7a8a78fbbcb 100644 --- a/jstests/noPassthrough/change_stream_resume_before_add_shard.js +++ b/jstests/noPassthrough/change_stream_resume_before_add_shard.js @@ -3,7 +3,6 @@ * the cluster. Exercises the fix for SERVER-42232. * @tags: [ * requires_sharding, - * sbe_incompatible, * uses_change_streams, * ] */ diff --git a/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js b/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js index a15ab37bce4..73ca9cbcdb3 100644 --- a/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js +++ b/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js @@ -4,7 +4,6 @@ // bug described in SERVER-41196. // @tags: [ // requires_sharding, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/noPassthrough/change_streams_collation_chunk_migration.js b/jstests/noPassthrough/change_streams_collation_chunk_migration.js index cb555d381ac..3386ba91480 100644 --- a/jstests/noPassthrough/change_streams_collation_chunk_migration.js +++ b/jstests/noPassthrough/change_streams_collation_chunk_migration.js @@ -4,7 +4,6 @@ * @tags: [ * requires_journaling, * requires_replication, - * sbe_incompatible, * ] */ (function() { diff --git a/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js index b4cb69facdf..0832545786f 100644 --- a/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js +++ b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js @@ -6,7 +6,6 @@ * requires_journaling, * requires_majority_read_concern, * requires_replication, - * sbe_incompatible, * ] */ (function() { diff --git a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js index dff3544eb7a..371b12137fe 100644 --- a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js +++ b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js @@ -4,7 +4,6 @@ * for the bug described in SERVER-40094. * @tags: [ * requires_sharding, - * sbe_incompatible, * uses_change_streams, * ] */ diff --git a/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js b/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js index 744b312a528..4f0be761570 100644 --- a/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js +++ b/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js @@ -4,7 +4,6 @@ * bug described in SERVER-40094. * @tags: [ * requires_sharding, - * sbe_incompatible, * uses_multi_shard_transaction, * uses_transactions, * ] diff --git a/jstests/noPassthrough/change_streams_shell_helper_resume_token.js b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js index 029cd9b25ca..830bb0fd1fe 100644 --- a/jstests/noPassthrough/change_streams_shell_helper_resume_token.js +++ b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js @@ -5,7 +5,6 @@ * @tags: [ * requires_journaling, * requires_majority_read_concern, - * sbe_incompatible, * ] */ (function() { diff --git a/jstests/replsets/resume_after_against_oplog.js b/jstests/replsets/resume_after_against_oplog.js index c9e55dc3ad2..6db643f2a1a 100644 --- a/jstests/replsets/resume_after_against_oplog.js +++ b/jstests/replsets/resume_after_against_oplog.js @@ -4,7 +4,6 @@ * * @tags: [ * requires_fcv_47, - * sbe_incompatible, * ] */ @@ -55,7 +54,7 @@ jsTestLog("Running initial query on the oplog"); // Assert resume token is non-null. const resumeToken1 = assertExpectedResumeTokenFormat(res); - assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1); + assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1, res); // Kill the cursor before attempting to resume. assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [res.cursor.id]})); @@ -88,7 +87,7 @@ jsTestLog("Running initial query on the oplog"); assert.eq(res2.cursor.firstBatch[0].o._id, 1, res); const resumeToken2 = assertExpectedResumeTokenFormat(res2); - assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1); + assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1, res2); const res3 = assert.commandWorked(localDb.runCommand({ find: "oplog.rs", @@ -102,7 +101,7 @@ jsTestLog("Running initial query on the oplog"); assert.eq(res3.cursor.firstBatch[0].o._id, 2, res); const resumeToken3 = assertExpectedResumeTokenFormat(res3); - assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1); + assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1, res3); } // --------------------------------------------------------------------------------------- jsTestLog("Running initial tailable query on the oplog"); @@ -122,7 +121,7 @@ jsTestLog("Running initial tailable query on the oplog"); // Resume token should be non-null. const resumeToken1 = assertExpectedResumeTokenFormat(res); - assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1); + assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1, res); const cursorId = res.cursor.id; @@ -136,7 +135,7 @@ jsTestLog("Running initial tailable query on the oplog"); // Resume token should be greater than the find command's. const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1); - assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1); + assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1, resGetMore1); jsTest.log( "Ensure that postBatchResumeToken attribute is returned for getMore command with no results"); diff --git a/jstests/sharding/change_stream_chunk_migration.js b/jstests/sharding/change_stream_chunk_migration.js index b9f1da2d9fd..0c4b77a2522 100644 --- a/jstests/sharding/change_stream_chunk_migration.js +++ b/jstests/sharding/change_stream_chunk_migration.js @@ -2,7 +2,6 @@ // it's migrating a chunk to a new shard. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_stream_empty_apply_ops.js b/jstests/sharding/change_stream_empty_apply_ops.js index 80789d16058..836bf9fad69 100644 --- a/jstests/sharding/change_stream_empty_apply_ops.js +++ b/jstests/sharding/change_stream_empty_apply_ops.js @@ -4,7 +4,6 @@ // SERVER-50769. // @tags: [ // requires_sharding, -// sbe_incompatible, // uses_change_streams, // uses_multi_shard_transaction, // uses_transactions, diff --git a/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js b/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js index 89fdf85c4ee..7102cfc0078 100644 --- a/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js +++ b/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js @@ -5,7 +5,6 @@ // shards. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_stream_error_label.js b/jstests/sharding/change_stream_error_label.js index 1dded1dd0ec..c393b6254bc 100644 --- a/jstests/sharding/change_stream_error_label.js +++ b/jstests/sharding/change_stream_error_label.js @@ -4,7 +4,6 @@ * @tags: [ * requires_find_command, * requires_sharding, - * sbe_incompatible, * uses_change_streams, * ] */ diff --git a/jstests/sharding/change_stream_lookup_single_shard_cluster.js b/jstests/sharding/change_stream_lookup_single_shard_cluster.js index 957b3d91b04..3d8117c619e 100644 --- a/jstests/sharding/change_stream_lookup_single_shard_cluster.js +++ b/jstests/sharding/change_stream_lookup_single_shard_cluster.js @@ -3,7 +3,6 @@ // sharded collection. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js index 0413c96f36a..d5fa409b97e 100644 --- a/jstests/sharding/change_stream_metadata_notifications.js +++ b/jstests/sharding/change_stream_metadata_notifications.js @@ -3,7 +3,6 @@ // @tags: [ // requires_find_command, // requires_majority_read_concern, -// sbe_incompatible, // ] (function() { "use strict"; diff --git a/jstests/sharding/change_stream_read_preference.js b/jstests/sharding/change_stream_read_preference.js index feaa8263179..752f1726c2e 100644 --- a/jstests/sharding/change_stream_read_preference.js +++ b/jstests/sharding/change_stream_read_preference.js @@ -2,7 +2,6 @@ // user. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_stream_resume_from_different_mongos.js b/jstests/sharding/change_stream_resume_from_different_mongos.js index 6bff49ffb46..27fca91e6b7 100644 --- a/jstests/sharding/change_stream_resume_from_different_mongos.js +++ b/jstests/sharding/change_stream_resume_from_different_mongos.js @@ -1,7 +1,6 @@ // Test resuming a change stream on a mongos other than the one the change stream was started on. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_stream_shard_failover.js b/jstests/sharding/change_stream_shard_failover.js index ad12a627681..eadc821dec9 100644 --- a/jstests/sharding/change_stream_shard_failover.js +++ b/jstests/sharding/change_stream_shard_failover.js @@ -3,7 +3,6 @@ * by triggering a stepdown. * @tags: [ * requires_majority_read_concern, - * sbe_incompatible, * uses_change_streams, * ] */ diff --git a/jstests/sharding/change_stream_transaction_sharded.js b/jstests/sharding/change_stream_transaction_sharded.js index 44cc029f0d2..96e15459ff1 100644 --- a/jstests/sharding/change_stream_transaction_sharded.js +++ b/jstests/sharding/change_stream_transaction_sharded.js @@ -1,7 +1,6 @@ // Confirms that change streams only see committed operations for sharded transactions. // @tags: [ // requires_sharding, -// sbe_incompatible, // uses_change_streams, // uses_multi_shard_transaction, // uses_transactions, diff --git a/jstests/sharding/change_stream_update_lookup_collation.js b/jstests/sharding/change_stream_update_lookup_collation.js index ac9242c707d..be07efa032b 100644 --- a/jstests/sharding/change_stream_update_lookup_collation.js +++ b/jstests/sharding/change_stream_update_lookup_collation.js @@ -5,7 +5,6 @@ // @tags: [ // requires_find_command, // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js index a793a4728f2..7cb8f5c667e 100644 --- a/jstests/sharding/change_stream_update_lookup_read_concern.js +++ b/jstests/sharding/change_stream_update_lookup_read_concern.js @@ -3,7 +3,6 @@ // change that we're doing the lookup for, and that change will be majority-committed. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js b/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js index 55b610147df..97fb61631f3 100644 --- a/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js +++ b/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js @@ -2,7 +2,6 @@ // sharded with a compound shard key. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js b/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js index 6a7dba0b6c8..e945fcc3af8 100644 --- a/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js +++ b/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js @@ -2,7 +2,6 @@ // sharded with a hashed shard key. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js b/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js index 6848e225672..f577ff932e3 100644 --- a/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js +++ b/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js @@ -2,7 +2,6 @@ // sharded with a key which is just the "_id" field. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams/resume_change_stream.js b/jstests/sharding/change_streams/resume_change_stream.js index f6824a344f7..441c9e43a83 100644 --- a/jstests/sharding/change_streams/resume_change_stream.js +++ b/jstests/sharding/change_streams/resume_change_stream.js @@ -3,7 +3,6 @@ // @tags: [ // requires_find_command, // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js b/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js index 2a0f48e9fff..75c8796aea8 100644 --- a/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js +++ b/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js @@ -3,7 +3,6 @@ // a stale shard version. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js b/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js index beaefb9f520..64783cb2a2e 100644 --- a/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js +++ b/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js @@ -2,7 +2,6 @@ // the collection. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js b/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js index c5ce06fa9f8..634b8b39d1e 100644 --- a/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js +++ b/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js @@ -2,7 +2,6 @@ // but only the shard key and _id in the 'documentKey' field. Exercises the fix for SERVER-45987. // @tags: [ // multiversion_incompatible, -// sbe_incompatible, // uses_transactions, // ] diff --git a/jstests/sharding/change_streams_new_shard_new_database.js b/jstests/sharding/change_streams_new_shard_new_database.js index 5057266d1ce..e4debdf534a 100644 --- a/jstests/sharding/change_streams_new_shard_new_database.js +++ b/jstests/sharding/change_streams_new_shard_new_database.js @@ -8,7 +8,6 @@ * @tags: [ * requires_find_command, * requires_sharding, - * sbe_incompatible, * uses_change_streams, * ] */ diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js index a7b32e5fde8..0b233163384 100644 --- a/jstests/sharding/change_streams_primary_shard_unaware.js +++ b/jstests/sharding/change_streams_primary_shard_unaware.js @@ -7,7 +7,6 @@ // blacklist_from_rhel_67_s390x, // requires_majority_read_concern, // requires_persistence, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams_shards_start_in_sync.js b/jstests/sharding/change_streams_shards_start_in_sync.js index ba9947b2afa..627a633c176 100644 --- a/jstests/sharding/change_streams_shards_start_in_sync.js +++ b/jstests/sharding/change_streams_shards_start_in_sync.js @@ -7,7 +7,6 @@ // and 'B' will be seen in the changestream before 'C'. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js index 0b86b72e5ce..75e8f2a4388 100644 --- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js +++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js @@ -5,7 +5,6 @@ // sharded. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/jstests/sharding/change_streams_unsharded_update_resume.js b/jstests/sharding/change_streams_unsharded_update_resume.js index ed44e8f1ffb..96a8a0230da 100644 --- a/jstests/sharding/change_streams_unsharded_update_resume.js +++ b/jstests/sharding/change_streams_unsharded_update_resume.js @@ -2,7 +2,6 @@ * Tests that the post-image of an update which occurred while the collection was unsharded can * still be looked up after the collection becomes sharded. Exercises the fix for SERVER-44484. * @tags: [ - * sbe_incompatible, * uses_change_streams, * ] */ diff --git a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js index bba669f0f67..39dd6a7417c 100644 --- a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js +++ b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js @@ -3,7 +3,6 @@ * TooManyMatchingDocuments after fixing SERVER-44598. * * @tags: [ - * sbe_incompatible, * uses_change_streams, * ] */ diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js index b087ce478f9..bd1bfe64f21 100644 --- a/jstests/sharding/change_streams_whole_db.js +++ b/jstests/sharding/change_streams_whole_db.js @@ -1,7 +1,6 @@ // Tests the behavior of a change stream on a whole database in a sharded cluster. // @tags: [ // requires_majority_read_concern, -// sbe_incompatible, // uses_change_streams, // ] (function() { diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index bf50e71ce79..b0b67aea8d9 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -864,7 +864,7 @@ void RuntimeEnvironment::resetSlot(value::SlotId slot, uasserted(4946300, str::stream() << "undefined slot accessor:" << slot); } -value::SlotAccessor* RuntimeEnvironment::getAccessor(value::SlotId slot) { +RuntimeEnvironment::Accessor* RuntimeEnvironment::getAccessor(value::SlotId slot) { if (auto it = _accessors.find(slot); it != _accessors.end()) { return &it->second; } diff --git a/src/mongo/db/exec/sbe/expressions/expression.h b/src/mongo/db/exec/sbe/expressions/expression.h index f52acef32ca..a8f24c578e7 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.h +++ b/src/mongo/db/exec/sbe/expressions/expression.h @@ -70,6 +70,39 @@ public: RuntimeEnvironment& operator=(const RuntimeEnvironment&&) = delete; ~RuntimeEnvironment(); + class Accessor final : public value::SlotAccessor { + public: + Accessor(RuntimeEnvironment* env, size_t index) : _env{env}, _index{index} {} + + std::pair<value::TypeTags, value::Value> getViewOfValue() const override { + return {_env->_state->typeTags[_index], _env->_state->vals[_index]}; + } + + std::pair<value::TypeTags, value::Value> copyOrMoveValue() override { + // Always make a copy. + return copyValue(_env->_state->typeTags[_index], _env->_state->vals[_index]); + } + + void reset(bool owned, value::TypeTags tag, value::Value val) { + release(); + + _env->_state->typeTags[_index] = tag; + _env->_state->vals[_index] = val; + _env->_state->owned[_index] = owned; + } + + private: + void release() { + if (_env->_state->owned[_index]) { + releaseValue(_env->_state->typeTags[_index], _env->_state->vals[_index]); + _env->_state->owned[_index] = false; + } + } + + RuntimeEnvironment* const _env; + const size_t _index; + }; + /** * Registers and returns a SlotId for the given slot 'name'. The 'slotIdGenerator' is used * to generate a new SlotId for the given slot 'name', which is then registered with this @@ -112,7 +145,7 @@ public: * * A user exception is raised if the SlotId is not registered within this environment. */ - value::SlotAccessor* getAccessor(value::SlotId slot); + Accessor* getAccessor(value::SlotId slot); /** * Make a copy of his environment. The new environment will have its own set of SlotAccessors @@ -151,39 +184,6 @@ private: std::vector<bool> owned; }; - class Accessor final : public value::SlotAccessor { - public: - Accessor(RuntimeEnvironment* env, size_t index) : _env{env}, _index{index} {} - - std::pair<value::TypeTags, value::Value> getViewOfValue() const override { - return {_env->_state->typeTags[_index], _env->_state->vals[_index]}; - } - - std::pair<value::TypeTags, value::Value> copyOrMoveValue() override { - // Always make a copy. - return copyValue(_env->_state->typeTags[_index], _env->_state->vals[_index]); - } - - void reset(bool owned, value::TypeTags tag, value::Value val) { - release(); - - _env->_state->typeTags[_index] = tag; - _env->_state->vals[_index] = val; - _env->_state->owned[_index] = owned; - } - - private: - void release() { - if (_env->_state->owned[_index]) { - releaseValue(_env->_state->typeTags[_index], _env->_state->vals[_index]); - _env->_state->owned[_index] = false; - } - } - - RuntimeEnvironment* const _env; - const size_t _index; - }; - void emplaceAccessor(value::SlotId slot, size_t index) { _accessors.emplace(slot, Accessor{this, index}); } @@ -200,6 +200,11 @@ struct CompileCtx { CompileCtx(std::unique_ptr<RuntimeEnvironment> env) : env{std::move(env)} {} value::SlotAccessor* getAccessor(value::SlotId slot); + + RuntimeEnvironment::Accessor* getRuntimeEnvAccessor(value::SlotId slotId) { + return env->getAccessor(slotId); + } + std::shared_ptr<SpoolBuffer> getSpoolBuffer(SpoolId spool); void pushCorrelated(value::SlotId slot, value::SlotAccessor* accessor); diff --git a/src/mongo/db/exec/sbe/parser/parser.cpp b/src/mongo/db/exec/sbe/parser/parser.cpp index 76d271b1e17..597229296f5 100644 --- a/src/mongo/db/exec/sbe/parser/parser.cpp +++ b/src/mongo/db/exec/sbe/parser/parser.cpp @@ -74,6 +74,8 @@ static constexpr auto kSyntax = R"( FORWARD_FLAG <- <'true'> / <'false'> + NEED_SLOT_FOR_OPLOG_TS <- <'true'> / <'false'> + SCAN <- 'scan' IDENT? # optional variable name of the root object (record) delivered by the scan IDENT? # optional variable name of the record id delivered by the scan IDENT? # optional variable name of the snapshot id read by the scan @@ -82,6 +84,7 @@ static constexpr auto kSyntax = R"( IDENT_LIST_WITH_RENAMES # list of projected fields (may be empty) IDENT # collection name to scan FORWARD_FLAG # forward scan or not + NEED_SLOT_FOR_OPLOG_TS # Whether a slot needs to be created for oplog timestamp PSCAN <- 'pscan' IDENT? # optional variable name of the root object (record) delivered by the scan IDENT? # optional variable name of the record id delivered by the scan @@ -100,6 +103,7 @@ static constexpr auto kSyntax = R"( IDENT_LIST_WITH_RENAMES # list of projected fields (may be empty) IDENT # collection name to scan FORWARD_FLAG # forward scan or not + NEED_SLOT_FOR_OPLOG_TS # Whether a slot needs to be created for oplog timestamp IXSCAN <- 'ixscan' IDENT? # optional variable name of the root object (record) delivered by the scan IDENT? # optional variable name of the record id delivered by the scan @@ -622,39 +626,40 @@ void Parser::walkScan(AstQuery& ast) { std::string snapshotIdName; std::string indexIdName; std::string indexKeyName; - std::string collName; int projectsPos; - int forwardPos; - if (ast.nodes.size() == 8) { + if (ast.nodes.size() == 9) { recordName = std::move(ast.nodes[0]->identifier); recordIdName = std::move(ast.nodes[1]->identifier); snapshotIdName = std::move(ast.nodes[2]->identifier); indexIdName = std::move(ast.nodes[3]->identifier); indexKeyName = std::move(ast.nodes[4]->identifier); projectsPos = 5; - collName = std::move(ast.nodes[6]->identifier); - forwardPos = 7; - } else if (ast.nodes.size() == 5) { + } else if (ast.nodes.size() == 6) { recordName = std::move(ast.nodes[0]->identifier); recordIdName = std::move(ast.nodes[1]->identifier); projectsPos = 2; - collName = std::move(ast.nodes[3]->identifier); - forwardPos = 4; - } else if (ast.nodes.size() == 4) { + } else if (ast.nodes.size() == 5) { recordName = std::move(ast.nodes[0]->identifier); projectsPos = 1; - collName = std::move(ast.nodes[2]->identifier); - forwardPos = 3; - } else if (ast.nodes.size() == 3) { + } else if (ast.nodes.size() == 4) { projectsPos = 0; - collName = std::move(ast.nodes[1]->identifier); - forwardPos = 2; } else { uasserted(5290715, "Wrong number of arguments for SCAN"); } + auto lastPos = ast.nodes.size() - 1; - const auto forward = (ast.nodes[forwardPos]->token == "true") ? true : false; + // The 'collName' should be third from last. + auto collName = std::move(ast.nodes[lastPos - 2]->identifier); + + // The 'FORWARD' should be second last. + const auto forward = (ast.nodes[lastPos - 1]->token == "true") ? true : false; + + // The 'NEED_SLOT_FOR_OPLOG_TS' always comes at the end. + const auto oplogTs = (ast.nodes[lastPos]->token == "true") + ? boost::optional<value::SlotId>(_env->registerSlot( + "oplogTs"_sd, value::TypeTags::Nothing, 0, false, &_slotIdGenerator)) + : boost::none; ast.stage = makeS<ScanStage>(getCollectionUuid(collName), lookupSlot(recordName), @@ -662,6 +667,7 @@ void Parser::walkScan(AstQuery& ast) { lookupSlot(snapshotIdName), lookupSlot(indexIdName), lookupSlot(indexKeyName), + oplogTs, ast.nodes[projectsPos]->identifiers, lookupSlots(ast.nodes[projectsPos]->renames), boost::none, @@ -727,39 +733,41 @@ void Parser::walkSeek(AstQuery& ast) { std::string snapshotIdName; std::string indexIdName; std::string indexKeyName; - std::string collName; + int projectsPos; - int forwardPos; - if (ast.nodes.size() == 9) { + if (ast.nodes.size() == 10) { recordName = std::move(ast.nodes[1]->identifier); recordIdName = std::move(ast.nodes[2]->identifier); snapshotIdName = std::move(ast.nodes[3]->identifier); indexIdName = std::move(ast.nodes[4]->identifier); indexKeyName = std::move(ast.nodes[5]->identifier); projectsPos = 6; - collName = std::move(ast.nodes[7]->identifier); - forwardPos = 8; } else if (ast.nodes.size() == 6) { recordName = std::move(ast.nodes[1]->identifier); recordIdName = std::move(ast.nodes[2]->identifier); projectsPos = 3; - collName = std::move(ast.nodes[4]->identifier); - forwardPos = 5; } else if (ast.nodes.size() == 5) { recordName = std::move(ast.nodes[1]->identifier); projectsPos = 2; - collName = std::move(ast.nodes[3]->identifier); - forwardPos = 4; } else if (ast.nodes.size() == 4) { projectsPos = 1; - collName = std::move(ast.nodes[2]->identifier); - forwardPos = 3; } else { - uasserted(5290717, "Wrong number of arguments for SEEk"); + uasserted(5290717, "Wrong number of arguments for SEEK"); } + auto lastPos = ast.nodes.size() - 1; - const auto forward = (ast.nodes[forwardPos]->token == "true") ? true : false; + // The 'collName' should be third from last. + auto collName = std::move(ast.nodes[lastPos - 2]->identifier); + + // The 'FORWARD' should be second last. + const auto forward = (ast.nodes[lastPos - 1]->token == "true") ? true : false; + + // The 'NEED_SLOT_FOR_OPLOG_TS' always comes at the end. + const auto oplogTs = (ast.nodes[lastPos]->token == "true") + ? boost::optional<value::SlotId>(_env->registerSlot( + "oplogTs"_sd, value::TypeTags::Nothing, 0, false, &_slotIdGenerator)) + : boost::none; ast.stage = makeS<ScanStage>(getCollectionUuid(collName), lookupSlot(recordName), @@ -767,6 +775,7 @@ void Parser::walkSeek(AstQuery& ast) { lookupSlot(snapshotIdName), lookupSlot(indexIdName), lookupSlot(indexKeyName), + oplogTs, ast.nodes[projectsPos]->identifiers, lookupSlots(ast.nodes[projectsPos]->renames), lookupSlot(ast.nodes[0]->identifier), @@ -1808,7 +1817,9 @@ void Parser::walk(AstQuery& ast) { } } -Parser::Parser() { +Parser::Parser(RuntimeEnvironment* env) : _env(env) { + invariant(_env); + _parser.log = [&](size_t ln, size_t col, const std::string& msg) { LOGV2(4885902, "{msg}", "msg"_attr = format_error_message(ln, col, msg)); }; diff --git a/src/mongo/db/exec/sbe/parser/parser.h b/src/mongo/db/exec/sbe/parser/parser.h index 6b02814d171..dc99252bc4c 100644 --- a/src/mongo/db/exec/sbe/parser/parser.h +++ b/src/mongo/db/exec/sbe/parser/parser.h @@ -63,7 +63,7 @@ using AstQuery = peg::AstBase<ParsedQueryTree>; class Parser { public: - Parser(); + Parser(RuntimeEnvironment* env); std::unique_ptr<PlanStage> parse(OperationContext* opCtx, StringData defaultDb, StringData line); @@ -84,6 +84,7 @@ private: value::SlotIdGenerator _slotIdGenerator; value::SpoolIdGenerator _spoolIdGenerator; FrameId _frameId{0}; + RuntimeEnvironment* _env; struct FrameSymbolTable { FrameId id; SymbolTable table; diff --git a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp index 38f72fe2eaf..3fdfec561a2 100644 --- a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp +++ b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp @@ -188,6 +188,7 @@ protected: boost::none, boost::none, boost::none, + boost::none, std::vector<std::string>{}, sbe::makeSV(), boost::none, @@ -202,6 +203,7 @@ protected: boost::none, boost::none, boost::none, + boost::none, std::vector<std::string>{}, sbe::makeSV(), sbe::value::SlotId{3}, @@ -210,19 +212,21 @@ protected: planNodeId, sbe::ScanCallbacks({})), // SCAN with all slots present. - sbe::makeS<sbe::ScanStage>(fakeUuid, - sbe::value::SlotId{1}, - sbe::value::SlotId{2}, - sbe::value::SlotId{3}, - sbe::value::SlotId{4}, - sbe::value::SlotId{5}, - std::vector<std::string>{}, - sbe::makeSV(), - sbe::value::SlotId{6}, - true /* forward */, - nullptr, - planNodeId, - sbe::ScanCallbacks({})), + sbe::makeS<sbe::ScanStage>( + fakeUuid, + sbe::value::SlotId{1}, + sbe::value::SlotId{2}, + sbe::value::SlotId{3}, + sbe::value::SlotId{4}, + sbe::value::SlotId{5}, + sbe::value::SlotId{6}, + std::vector<std::string>{repl::OpTime::kTimestampFieldName.toString()}, + sbe::makeSV(sbe::value::SlotId{6}), + sbe::value::SlotId{7}, + true /* forward */, + nullptr, + planNodeId, + sbe::ScanCallbacks({})), // PSCAN with both 'recordSlot' and 'recordIdSlot' slots present. sbe::makeS<sbe::ParallelScanStage>(fakeUuid, sbe::value::SlotId{1}, @@ -580,7 +584,8 @@ TEST_F(SBEParserTest, TestIdenticalDebugOutputAfterParse) { sbe::DebugPrinter printer; for (const auto& stage : stages) { - sbe::Parser parser; + auto env = std::make_unique<sbe::RuntimeEnvironment>(); + sbe::Parser parser(env.get()); const auto stageText = printer.print(*stage); const auto parsedStage = parser.parse(nullptr, "testDb", stageText); @@ -592,7 +597,8 @@ TEST_F(SBEParserTest, TestIdenticalDebugOutputAfterParse) { TEST_F(SBEParserTest, TestPlanNodeIdIsParsed) { sbe::DebugPrinter printer; - sbe::Parser parser; + auto env = std::make_unique<sbe::RuntimeEnvironment>(); + sbe::Parser parser(env.get()); for (const auto& stage : stages) { const auto stageText = printer.print(*stage); diff --git a/src/mongo/db/exec/sbe/stages/scan.cpp b/src/mongo/db/exec/sbe/stages/scan.cpp index fd1d371b9ee..2d7e6fabe65 100644 --- a/src/mongo/db/exec/sbe/stages/scan.cpp +++ b/src/mongo/db/exec/sbe/stages/scan.cpp @@ -34,6 +34,7 @@ #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/trial_run_tracker.h" #include "mongo/db/index/index_access_method.h" +#include "mongo/db/repl/optime.h" #include "mongo/util/str.h" namespace mongo { @@ -44,6 +45,7 @@ ScanStage::ScanStage(CollectionUUID collectionUuid, boost::optional<value::SlotId> snapshotIdSlot, boost::optional<value::SlotId> indexIdSlot, boost::optional<value::SlotId> indexKeySlot, + boost::optional<value::SlotId> oplogTsSlot, std::vector<std::string> fields, value::SlotVector vars, boost::optional<value::SlotId> seekKeySlot, @@ -58,6 +60,7 @@ ScanStage::ScanStage(CollectionUUID collectionUuid, _snapshotIdSlot(snapshotIdSlot), _indexIdSlot(indexIdSlot), _indexKeySlot(indexKeySlot), + _oplogTsSlot(oplogTsSlot), _fields(std::move(fields)), _vars(std::move(vars)), _seekKeySlot(seekKeySlot), @@ -65,6 +68,11 @@ ScanStage::ScanStage(CollectionUUID collectionUuid, _scanCallbacks(std::move(scanCallbacks)) { invariant(_fields.size() == _vars.size()); invariant(!_seekKeySlot || _forward); + tassert(5567202, + "The '_oplogTsSlot' cannot be set without 'ts' field in '_fields'", + !_oplogTsSlot || + (std::find(_fields.begin(), _fields.end(), repl::OpTime::kTimestampFieldName) != + _fields.end())); } std::unique_ptr<PlanStage> ScanStage::clone() const { @@ -74,6 +82,7 @@ std::unique_ptr<PlanStage> ScanStage::clone() const { _snapshotIdSlot, _indexIdSlot, _indexKeySlot, + _oplogTsSlot, _fields, _vars, _seekKeySlot, @@ -116,6 +125,10 @@ void ScanStage::prepare(CompileCtx& ctx) { _keyStringAccessor = ctx.getAccessor(*_indexKeySlot); } + if (_oplogTsSlot) { + _oplogTsAccessor = ctx.getRuntimeEnvAccessor(*_oplogTsSlot); + } + std::tie(_collName, _catalogEpoch) = acquireCollection(_opCtx, _collUuid, _scanCallbacks.lockAcquisitionCallback, _coll); } @@ -129,6 +142,10 @@ value::SlotAccessor* ScanStage::getAccessor(CompileCtx& ctx, value::SlotId slot) return _recordIdAccessor.get(); } + if (_oplogTsSlot && *_oplogTsSlot == slot) { + return _oplogTsAccessor; + } + if (auto it = _varAccessors.find(slot); it != _varAccessors.end()) { return it->second; } @@ -289,6 +306,11 @@ PlanState ScanStage::getNext() { // Found the field so convert it to Value. auto [tag, val] = bson::convertFrom(true, be, end, sv.size()); + if (_oplogTsAccessor && it->first == repl::OpTime::kTimestampFieldName) { + auto&& [ownedTag, ownedVal] = value::copyValue(tag, val); + _oplogTsAccessor->reset(false, ownedTag, ownedVal); + } + it->second->reset(tag, val); if ((--fieldsToMatch) == 0) { @@ -413,6 +435,8 @@ std::vector<DebugPrinter::Block> ScanStage::debugPrint() const { ret.emplace_back(_forward ? "true" : "false"); + ret.emplace_back(_oplogTsAccessor ? "true" : "false"); + return ret; } diff --git a/src/mongo/db/exec/sbe/stages/scan.h b/src/mongo/db/exec/sbe/stages/scan.h index 485afd63e0b..1f749d922dd 100644 --- a/src/mongo/db/exec/sbe/stages/scan.h +++ b/src/mongo/db/exec/sbe/stages/scan.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/stages/collection_helpers.h" #include "mongo/db/exec/sbe/stages/stages.h" #include "mongo/db/exec/sbe/values/bson.h" @@ -59,6 +60,7 @@ public: boost::optional<value::SlotId> snapshotIdSlot, boost::optional<value::SlotId> indexIdSlot, boost::optional<value::SlotId> indexKeySlot, + boost::optional<value::SlotId> oplogTsSlot, std::vector<std::string> fields, value::SlotVector vars, boost::optional<value::SlotId> seekKeySlot, @@ -94,8 +96,11 @@ private: const boost::optional<value::SlotId> _snapshotIdSlot; const boost::optional<value::SlotId> _indexIdSlot; const boost::optional<value::SlotId> _indexKeySlot; + const boost::optional<value::SlotId> _oplogTsSlot; + const std::vector<std::string> _fields; const value::SlotVector _vars; + const boost::optional<value::SlotId> _seekKeySlot; const bool _forward; @@ -113,6 +118,7 @@ private: value::SlotAccessor* _snapshotIdAccessor{nullptr}; value::SlotAccessor* _indexIdAccessor{nullptr}; value::SlotAccessor* _keyStringAccessor{nullptr}; + RuntimeEnvironment::Accessor* _oplogTsAccessor{nullptr}; value::FieldAccessorMap _fieldAccessors; value::SlotAccessorMap _varAccessors; diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp index 42b11636853..0d58f7731a9 100644 --- a/src/mongo/db/exec/sbe_cmd.cpp +++ b/src/mongo/db/exec/sbe_cmd.cpp @@ -69,7 +69,8 @@ public: uassertStatusOK(CursorRequest::parseCommandCursorOptions( cmdObj, query_request_helper::kDefaultBatchSize, &batchSize)); - sbe::Parser parser; + auto env = std::make_unique<sbe::RuntimeEnvironment>(); + sbe::Parser parser(env.get()); auto root = parser.parse(opCtx, dbname, cmdObj["sbe"].String()); auto [resultSlot, recordIdSlot] = parser.getTopLevelSlots(); @@ -83,7 +84,7 @@ public: CanonicalQuery::canonicalize(opCtx, std::make_unique<FindCommandRequest>(nss)); std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - stage_builder::PlanStageData data{std::make_unique<sbe::RuntimeEnvironment>()}; + stage_builder::PlanStageData data{std::move(env)}; if (resultSlot) { data.outputs.set(stage_builder::PlanStageSlots::kResult, *resultSlot); diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index bfae6954a90..822e28883d8 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -40,6 +40,7 @@ #include "mongo/db/query/plan_insert_listener.h" #include "mongo/db/query/sbe_stage_builder.h" #include "mongo/logv2/log.h" +#include "mongo/s/resharding/resume_token_gen.h" namespace mongo { // This failpoint is defined by the classic executor but is also accessed here. @@ -78,9 +79,8 @@ PlanExecutorSBE::PlanExecutorSBE(OperationContext* opCtx, uassert(4822866, "Query does not have recordId slot.", _resultRecordId); } - if (auto slot = _rootData.outputs.getIfExists(stage_builder::PlanStageSlots::kOplogTs); slot) { - _oplogTs = _root->getAccessor(_rootData.ctx, *slot); - uassert(4822867, "Query does not have oplogTs slot.", _oplogTs); + if (_rootData.shouldTrackLatestOplogTimestamp) { + _oplogTs = _rootData.env->getAccessor(_rootData.env->getSlot("oplogTs"_sd)); } if (winner.data.shouldUseTailableScan) { @@ -270,7 +270,10 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut) Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const { if (_rootData.shouldTrackLatestOplogTimestamp) { - invariant(_oplogTs); + tassert(5567201, + "The '_oplogTs' accessor should be populated when " + "'shouldTrackLatestOplogTimestamp' is true", + _oplogTs); auto [tag, val] = _oplogTs->getViewOfValue(); if (tag != sbe::value::TypeTags::Nothing) { @@ -301,6 +304,11 @@ BSONObj PlanExecutorSBE::getPostBatchResumeToken() const { return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val)); } } + + if (_rootData.shouldTrackLatestOplogTimestamp) { + return ResumeTokenOplogTimestamp{getLatestOplogTimestamp()}.toBSON(); + } + return {}; } diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h index ebb092927be..918b2c8670a 100644 --- a/src/mongo/db/query/plan_executor_sbe.h +++ b/src/mongo/db/query/plan_executor_sbe.h @@ -150,8 +150,8 @@ private: sbe::value::SlotAccessor* _resultRecordId{nullptr}; sbe::value::TypeTags _tagLastRecordId{sbe::value::TypeTags::Nothing}; sbe::value::Value _valLastRecordId{0}; + sbe::RuntimeEnvironment::Accessor* _oplogTs{nullptr}; - sbe::value::SlotAccessor* _oplogTs{nullptr}; boost::optional<sbe::value::SlotId> _resumeRecordIdSlot; std::queue<std::pair<BSONObj, boost::optional<RecordId>>> _stash; diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp index 59923ef4f39..83d2c9b1f37 100644 --- a/src/mongo/db/query/sbe_stage_builder.cpp +++ b/src/mongo/db/query/sbe_stage_builder.cpp @@ -259,9 +259,6 @@ std::string PlanStageData::debugString() const { if (auto slot = outputs.getIfExists(PlanStageSlots::kRecordId); slot) { builder << "$$RID=s" << *slot << " "; } - if (auto slot = outputs.getIfExists(PlanStageSlots::kOplogTs); slot) { - builder << "$$OPLOGTS=s" << *slot << " "; - } env->debugString(&builder); @@ -429,13 +426,10 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::build(const QuerySolution _buildHasStarted = true; // We always produce a 'resultSlot' and conditionally produce a 'recordIdSlot' based on the - // 'shouldProduceRecordIdSlot'. If the solution contains a CollectionScanNode with the - // 'shouldTrackLatestOplogTimestamp' flag set to true, then we will also produce an - // 'oplogTsSlot'. + // 'shouldProduceRecordIdSlot'. PlanStageReqs reqs; reqs.set(kResult); reqs.setIf(kRecordId, _shouldProduceRecordIdSlot); - reqs.setIf(kOplogTs, _data.shouldTrackLatestOplogTimestamp); // Build the SBE plan stage tree. auto [stage, outputs] = build(root, reqs); @@ -445,7 +439,6 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::build(const QuerySolution // it's needed. invariant(outputs.has(kResult)); invariant(!_shouldProduceRecordIdSlot || outputs.has(kRecordId)); - invariant(!_data.shouldTrackLatestOplogTimestamp || outputs.has(kOplogTs)); _data.outputs = std::move(outputs); @@ -477,9 +470,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder sbe::makeE<sbe::EFunction>("newObj", sbe::makeEs())); } - // Assert that generateCollScan() generated an oplogTsSlot if it's needed. - invariant(!reqs.has(kOplogTs) || outputs.has(kOplogTs)); - return {std::move(stage), std::move(outputs)}; } @@ -493,9 +483,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder invariant(!reqs.getIndexKeyBitset()); } - // Virtual scans cannot produce an oplogTsSlot, so assert that the caller doesn't need it. - invariant(!reqs.has(kOplogTs)); - auto [inputTag, inputVal] = sbe::value::makeNewArray(); sbe::value::ValueGuard inputGuard{inputTag, inputVal}; auto inputView = sbe::value::getArrayView(inputVal); @@ -563,9 +550,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder auto ixn = static_cast<const IndexScanNode*>(root); invariant(reqs.has(kReturnKey) || !ixn->addKeyMetadata); - // Index scans cannot produce an oplogTsSlot, so assert that the caller doesn't need it. - invariant(!reqs.has(kOplogTs)); - sbe::IndexKeysInclusionSet indexKeyBitset; if (reqs.has(PlanStageSlots::kReturnKey) || reqs.has(PlanStageSlots::kResult)) { @@ -661,6 +645,7 @@ SlotBasedStageBuilder::makeLoopJoinForFetch(std::unique_ptr<sbe::PlanStage> inpu snapshotIdSlot, indexIdSlot, keyStringSlot, + boost::none, std::vector<std::string>{}, sbe::makeSV(), seekKeySlot, @@ -686,10 +671,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder const QuerySolutionNode* root, const PlanStageReqs& reqs) { auto fn = static_cast<const FetchNode*>(root); - // At present, makeLoopJoinForFetch() doesn't have the necessary logic for producing an - // oplogTsSlot, so assert that the caller doesn't need oplogTsSlot. - invariant(!reqs.has(kOplogTs)); - // The child must produce all of the slots required by the parent of this FetchNode, except for // 'resultSlot' which will be produced by the call to makeLoopJoinForFetch() below. In addition // to that, the child must always produce a 'recordIdSlot' because it's needed for the call to @@ -1486,7 +1467,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder const QuerySolutionNode* root, const PlanStageReqs& reqs) { tassert(5432212, "no collection object", _collection); tassert(5432213, "index keys requsted for text match node", !reqs.getIndexKeyBitset()); - tassert(5432214, "oplogTs requsted for text match node", !reqs.has(kOplogTs)); tassert(5432215, str::stream() << "text match node must have one child, but got " << root->children.size(), diff --git a/src/mongo/db/query/sbe_stage_builder.h b/src/mongo/db/query/sbe_stage_builder.h index 2e7978b03c8..cc8baac65ba 100644 --- a/src/mongo/db/query/sbe_stage_builder.h +++ b/src/mongo/db/query/sbe_stage_builder.h @@ -59,7 +59,6 @@ public: static constexpr StringData kResult = "result"_sd; static constexpr StringData kRecordId = "recordId"_sd; static constexpr StringData kReturnKey = "returnKey"_sd; - static constexpr StringData kOplogTs = "oplogTs"_sd; static constexpr StringData kSnapshotId = "snapshotId"_sd; static constexpr StringData kIndexId = "indexId"_sd; static constexpr StringData kIndexKey = "indexKey"_sd; @@ -256,7 +255,6 @@ public: static constexpr StringData kResult = PlanStageSlots::kResult; static constexpr StringData kRecordId = PlanStageSlots::kRecordId; static constexpr StringData kReturnKey = PlanStageSlots::kReturnKey; - static constexpr StringData kOplogTs = PlanStageSlots::kOplogTs; static constexpr StringData kSnapshotId = PlanStageSlots::kSnapshotId; static constexpr StringData kIndexId = PlanStageSlots::kIndexId; static constexpr StringData kIndexKey = PlanStageSlots::kIndexKey; diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index bccc263eace..890d443dae7 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -51,6 +51,33 @@ namespace mongo::stage_builder { namespace { + +boost::optional<sbe::value::SlotId> registerOplogTs(sbe::RuntimeEnvironment* env, + sbe::value::SlotIdGenerator* slotIdGenerator) { + auto slotId = env->getSlotIfExists("oplogTs"_sd); + if (!slotId) { + return env->registerSlot( + "oplogTs"_sd, sbe::value::TypeTags::Nothing, 0, false, slotIdGenerator); + } + return slotId; +} + +/** + * If 'shouldTrackLatestOplogTimestamp' is true, then returns a vector holding the name of the oplog + * 'ts' field along with another vector holding a SlotId to map this field to, as well as the + * standalone value of the same SlotId (the latter is returned purely for convenience purposes). + */ +std::tuple<std::vector<std::string>, sbe::value::SlotVector, boost::optional<sbe::value::SlotId>> +makeOplogTimestampSlotsIfNeeded(sbe::RuntimeEnvironment* env, + sbe::value::SlotIdGenerator* slotIdGenerator, + bool shouldTrackLatestOplogTimestamp) { + if (shouldTrackLatestOplogTimestamp) { + auto slotId = registerOplogTs(env, slotIdGenerator); + return {{repl::OpTime::kTimestampFieldName.toString()}, sbe::makeSV(*slotId), slotId}; + } + return {}; +} + /** * Checks whether a callback function should be created for a ScanStage and returns it, if so. The * logic in the provided callback will be executed when the ScanStage is opened or reopened. @@ -83,24 +110,6 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const CollectionPtr& collection, } /** - * If 'shouldTrackLatestOplogTimestamp' returns a vector holding the name of the oplog 'ts' field - * along with another vector holding a SlotId to map this field to, as well as the standalone value - * of the same SlotId (the latter is returned purely for convenience purposes). - */ -std::tuple<std::vector<std::string>, sbe::value::SlotVector, boost::optional<sbe::value::SlotId>> -makeOplogTimestampSlotsIfNeeded(const CollectionPtr& collection, - sbe::value::SlotIdGenerator* slotIdGenerator, - bool shouldTrackLatestOplogTimestamp) { - if (shouldTrackLatestOplogTimestamp) { - invariant(collection->ns().isOplog()); - - auto tsSlot = slotIdGenerator->generate(); - return {{repl::OpTime::kTimestampFieldName.toString()}, sbe::makeSV(tsSlot), tsSlot}; - } - return {}; -}; - -/** * Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan * when there is a predicted on the 'ts' field of the oplog collection. * @@ -158,8 +167,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // of if we need to track the latest oplog timestamp. const auto shouldTrackLatestOplogTimestamp = (csn->maxRecord || csn->shouldTrackLatestOplogTimestamp); - auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded( - collection, slotIdGenerator, shouldTrackLatestOplogTimestamp); + auto&& [fields, slots, tsSlot] = + makeOplogTimestampSlotsIfNeeded(env, slotIdGenerator, shouldTrackLatestOplogTimestamp); sbe::ScanCallbacks callbacks( lockAcquisitionCallback, {}, makeOpenCallbackIfNeeded(collection, csn)); @@ -169,6 +178,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo boost::none /* snapshotIdSlot */, boost::none /* indexIdSlot */, boost::none /* indexKeySlot */, + tsSlot, std::move(fields), std::move(slots), seekRecordIdSlot, @@ -204,23 +214,18 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo if (csn->assertTsHasNotFallenOffOplog && !isTailableResumeBranch) { invariant(csn->shouldTrackLatestOplogTimestamp); - // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot' - // here so that it does not shadow the 'tsSlot' which we allocated earlier. - auto&& [fields, minTsSlots, minTsSlot] = makeOplogTimestampSlotsIfNeeded( - collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); - - // We should always have allocated a 'minTsSlot', and there should always be a 'tsSlot' - // already allocated for the existing scan that we created previously. - invariant(minTsSlot); + // There should always be a 'tsSlot' already allocated on the RuntimeEnvironment for the + // existing scan that we created previously. invariant(tsSlot); - // Our filter will also need to see the 'op' and 'o.msg' fields. + // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot' + // here so that it does not shadow the 'tsSlot' which we allocated earlier. Our filter will + // also need to see the 'op' and 'o.msg' fields. auto opTypeSlot = slotIdGenerator->generate(); auto oObjSlot = slotIdGenerator->generate(); - minTsSlots.push_back(opTypeSlot); - minTsSlots.push_back(oObjSlot); - fields.push_back("op"); - fields.push_back("o"); + auto minTsSlot = slotIdGenerator->generate(); + sbe::value::SlotVector minTsSlots = {minTsSlot, opTypeSlot, oObjSlot}; + std::vector<std::string> fields = {repl::OpTime::kTimestampFieldName.toString(), "op", "o"}; // If the first entry we see in the oplog is the replset initialization, then it doesn't // matter if its timestamp is later than the specified minTs; no events earlier than the @@ -251,6 +256,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo boost::none, boost::none, boost::none, + boost::none, std::move(fields), minTsSlots, /* don't move this */ boost::none, @@ -262,7 +268,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo makeBinaryOp( sbe::EPrimBinary::logicOr, makeBinaryOp(sbe::EPrimBinary::lessEq, - makeVariable(*minTsSlot), + makeVariable(minTsSlot), makeConstant(sbe::value::TypeTags::Timestamp, csn->assertTsHasNotFallenOffOplog->asULL())), makeBinaryOp( @@ -365,9 +371,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo invariant(csn->minRecord); invariant(csn->direction == CollectionScanParams::FORWARD); - std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded( - collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); - seekRecordIdSlot = recordIdSlot; resultSlot = slotIdGenerator->generate(); recordIdSlot = slotIdGenerator->generate(); @@ -380,8 +383,9 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo boost::none, boost::none, boost::none, - std::move(fields), - std::move(slots), + boost::none, + std::vector<std::string>(), + sbe::makeSV(), seekRecordIdSlot, true /* forward */, yieldPolicy, @@ -401,10 +405,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo outputs.set(PlanStageSlots::kResult, resultSlot); outputs.set(PlanStageSlots::kRecordId, recordIdSlot); - if (csn->shouldTrackLatestOplogTimestamp) { - outputs.set(PlanStageSlots::kOplogTs, *tsSlot); - } - return {std::move(stage), std::move(outputs)}; } @@ -443,8 +443,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc }(); // See if we need to project out an oplog latest timestamp. - auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded( - collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); + auto&& [fields, slots, tsSlot] = + makeOplogTimestampSlotsIfNeeded(env, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); sbe::ScanCallbacks callbacks( lockAcquisitionCallback, {}, makeOpenCallbackIfNeeded(collection, csn)); @@ -454,6 +454,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc boost::none, boost::none, boost::none, + tsSlot, std::move(fields), std::move(slots), seekRecordIdSlot, @@ -487,6 +488,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc boost::none, boost::none, boost::none, + boost::none, std::vector<std::string>{}, sbe::makeSV(), seekSlot, @@ -542,9 +544,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc invariant(!csn->stopApplyingFilterAfterFirstMatch); auto relevantSlots = sbe::makeSV(resultSlot, recordIdSlot); - if (tsSlot) { - relevantSlots.push_back(*tsSlot); - } std::tie(std::ignore, stage) = generateFilter(opCtx, csn->filter.get(), @@ -561,10 +560,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc outputs.set(PlanStageSlots::kResult, resultSlot); outputs.set(PlanStageSlots::kRecordId, recordIdSlot); - if (tsSlot) { - outputs.set(PlanStageSlots::kOplogTs, *tsSlot); - } - return {std::move(stage), std::move(outputs)}; } } // namespace |