author    Arun Banala <arun.banala@mongodb.com>    2021-04-14 19:36:11 +0100
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-04-26 20:49:14 +0000
commit    b5e656c92a92a4d6086a10dad0c1aad6e922fdc3 (patch)
tree      3d845f2959b2598441a34b6951dfcc16eb0dab13
parent    6a6229335d7325836d1d5812caa1eade17a95e34 (diff)
download  mongo-b5e656c92a92a4d6086a10dad0c1aad6e922fdc3.tar.gz
SERVER-55672 Fix oplog timestamp logic in SBE
-rw-r--r--jstests/noPassthrough/change_stream_resume_before_add_shard.js1
-rw-r--r--jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js1
-rw-r--r--jstests/noPassthrough/change_streams_collation_chunk_migration.js1
-rw-r--r--jstests/noPassthrough/change_streams_resume_at_same_clustertime.js1
-rw-r--r--jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js1
-rw-r--r--jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js1
-rw-r--r--jstests/noPassthrough/change_streams_shell_helper_resume_token.js1
-rw-r--r--jstests/replsets/resume_after_against_oplog.js11
-rw-r--r--jstests/sharding/change_stream_chunk_migration.js1
-rw-r--r--jstests/sharding/change_stream_empty_apply_ops.js1
-rw-r--r--jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js1
-rw-r--r--jstests/sharding/change_stream_error_label.js1
-rw-r--r--jstests/sharding/change_stream_lookup_single_shard_cluster.js1
-rw-r--r--jstests/sharding/change_stream_metadata_notifications.js1
-rw-r--r--jstests/sharding/change_stream_read_preference.js1
-rw-r--r--jstests/sharding/change_stream_resume_from_different_mongos.js1
-rw-r--r--jstests/sharding/change_stream_shard_failover.js1
-rw-r--r--jstests/sharding/change_stream_transaction_sharded.js1
-rw-r--r--jstests/sharding/change_stream_update_lookup_collation.js1
-rw-r--r--jstests/sharding/change_stream_update_lookup_read_concern.js1
-rw-r--r--jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js1
-rw-r--r--jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js1
-rw-r--r--jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js1
-rw-r--r--jstests/sharding/change_streams/resume_change_stream.js1
-rw-r--r--jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js1
-rw-r--r--jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js1
-rw-r--r--jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js1
-rw-r--r--jstests/sharding/change_streams_new_shard_new_database.js1
-rw-r--r--jstests/sharding/change_streams_primary_shard_unaware.js1
-rw-r--r--jstests/sharding/change_streams_shards_start_in_sync.js1
-rw-r--r--jstests/sharding/change_streams_unsharded_becomes_sharded.js1
-rw-r--r--jstests/sharding/change_streams_unsharded_update_resume.js1
-rw-r--r--jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js1
-rw-r--r--jstests/sharding/change_streams_whole_db.js1
-rw-r--r--src/mongo/db/exec/sbe/expressions/expression.cpp2
-rw-r--r--src/mongo/db/exec/sbe/expressions/expression.h73
-rw-r--r--src/mongo/db/exec/sbe/parser/parser.cpp69
-rw-r--r--src/mongo/db/exec/sbe/parser/parser.h3
-rw-r--r--src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp36
-rw-r--r--src/mongo/db/exec/sbe/stages/scan.cpp24
-rw-r--r--src/mongo/db/exec/sbe/stages/scan.h6
-rw-r--r--src/mongo/db/exec/sbe_cmd.cpp5
-rw-r--r--src/mongo/db/query/plan_executor_sbe.cpp16
-rw-r--r--src/mongo/db/query/plan_executor_sbe.h2
-rw-r--r--src/mongo/db/query/sbe_stage_builder.cpp24
-rw-r--r--src/mongo/db/query/sbe_stage_builder.h2
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp99
47 files changed, 203 insertions, 202 deletions
diff --git a/jstests/noPassthrough/change_stream_resume_before_add_shard.js b/jstests/noPassthrough/change_stream_resume_before_add_shard.js
index ba813102fca..7a8a78fbbcb 100644
--- a/jstests/noPassthrough/change_stream_resume_before_add_shard.js
+++ b/jstests/noPassthrough/change_stream_resume_before_add_shard.js
@@ -3,7 +3,6 @@
* the cluster. Exercises the fix for SERVER-42232.
* @tags: [
* requires_sharding,
- * sbe_incompatible,
* uses_change_streams,
* ]
*/
diff --git a/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js b/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js
index a15ab37bce4..73ca9cbcdb3 100644
--- a/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js
+++ b/jstests/noPassthrough/change_stream_sharded_startafter_invalidate.js
@@ -4,7 +4,6 @@
// bug described in SERVER-41196.
// @tags: [
// requires_sharding,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/noPassthrough/change_streams_collation_chunk_migration.js b/jstests/noPassthrough/change_streams_collation_chunk_migration.js
index cb555d381ac..3386ba91480 100644
--- a/jstests/noPassthrough/change_streams_collation_chunk_migration.js
+++ b/jstests/noPassthrough/change_streams_collation_chunk_migration.js
@@ -4,7 +4,6 @@
* @tags: [
* requires_journaling,
* requires_replication,
- * sbe_incompatible,
* ]
*/
(function() {
diff --git a/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js
index b4cb69facdf..0832545786f 100644
--- a/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js
+++ b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js
@@ -6,7 +6,6 @@
* requires_journaling,
* requires_majority_read_concern,
* requires_replication,
- * sbe_incompatible,
* ]
*/
(function() {
diff --git a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
index dff3544eb7a..371b12137fe 100644
--- a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
+++ b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
@@ -4,7 +4,6 @@
* for the bug described in SERVER-40094.
* @tags: [
* requires_sharding,
- * sbe_incompatible,
* uses_change_streams,
* ]
*/
diff --git a/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js b/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js
index 744b312a528..4f0be761570 100644
--- a/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js
+++ b/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js
@@ -4,7 +4,6 @@
* bug described in SERVER-40094.
* @tags: [
* requires_sharding,
- * sbe_incompatible,
* uses_multi_shard_transaction,
* uses_transactions,
* ]
diff --git a/jstests/noPassthrough/change_streams_shell_helper_resume_token.js b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
index 029cd9b25ca..830bb0fd1fe 100644
--- a/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
+++ b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
@@ -5,7 +5,6 @@
* @tags: [
* requires_journaling,
* requires_majority_read_concern,
- * sbe_incompatible,
* ]
*/
(function() {
diff --git a/jstests/replsets/resume_after_against_oplog.js b/jstests/replsets/resume_after_against_oplog.js
index c9e55dc3ad2..6db643f2a1a 100644
--- a/jstests/replsets/resume_after_against_oplog.js
+++ b/jstests/replsets/resume_after_against_oplog.js
@@ -4,7 +4,6 @@
*
* @tags: [
* requires_fcv_47,
- * sbe_incompatible,
* ]
*/
@@ -55,7 +54,7 @@ jsTestLog("Running initial query on the oplog");
// Assert resume token is non-null.
const resumeToken1 = assertExpectedResumeTokenFormat(res);
- assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1);
+ assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1, res);
// Kill the cursor before attempting to resume.
assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [res.cursor.id]}));
@@ -88,7 +87,7 @@ jsTestLog("Running initial query on the oplog");
assert.eq(res2.cursor.firstBatch[0].o._id, 1, res);
const resumeToken2 = assertExpectedResumeTokenFormat(res2);
- assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1);
+ assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1, res2);
const res3 = assert.commandWorked(localDb.runCommand({
find: "oplog.rs",
@@ -102,7 +101,7 @@ jsTestLog("Running initial query on the oplog");
assert.eq(res3.cursor.firstBatch[0].o._id, 2, res);
const resumeToken3 = assertExpectedResumeTokenFormat(res3);
- assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1);
+ assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1, res3);
}
// ---------------------------------------------------------------------------------------
jsTestLog("Running initial tailable query on the oplog");
@@ -122,7 +121,7 @@ jsTestLog("Running initial tailable query on the oplog");
// Resume token should be non-null.
const resumeToken1 = assertExpectedResumeTokenFormat(res);
- assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1);
+ assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1, res);
const cursorId = res.cursor.id;
@@ -136,7 +135,7 @@ jsTestLog("Running initial tailable query on the oplog");
// Resume token should be greater than the find command's.
const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1);
- assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1);
+ assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1, resGetMore1);
jsTest.log(
"Ensure that postBatchResumeToken attribute is returned for getMore command with no results");
diff --git a/jstests/sharding/change_stream_chunk_migration.js b/jstests/sharding/change_stream_chunk_migration.js
index b9f1da2d9fd..0c4b77a2522 100644
--- a/jstests/sharding/change_stream_chunk_migration.js
+++ b/jstests/sharding/change_stream_chunk_migration.js
@@ -2,7 +2,6 @@
// it's migrating a chunk to a new shard.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_stream_empty_apply_ops.js b/jstests/sharding/change_stream_empty_apply_ops.js
index 80789d16058..836bf9fad69 100644
--- a/jstests/sharding/change_stream_empty_apply_ops.js
+++ b/jstests/sharding/change_stream_empty_apply_ops.js
@@ -4,7 +4,6 @@
// SERVER-50769.
// @tags: [
// requires_sharding,
-// sbe_incompatible,
// uses_change_streams,
// uses_multi_shard_transaction,
// uses_transactions,
diff --git a/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js b/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js
index 89fdf85c4ee..7102cfc0078 100644
--- a/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js
+++ b/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js
@@ -5,7 +5,6 @@
// shards.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_stream_error_label.js b/jstests/sharding/change_stream_error_label.js
index 1dded1dd0ec..c393b6254bc 100644
--- a/jstests/sharding/change_stream_error_label.js
+++ b/jstests/sharding/change_stream_error_label.js
@@ -4,7 +4,6 @@
* @tags: [
* requires_find_command,
* requires_sharding,
- * sbe_incompatible,
* uses_change_streams,
* ]
*/
diff --git a/jstests/sharding/change_stream_lookup_single_shard_cluster.js b/jstests/sharding/change_stream_lookup_single_shard_cluster.js
index 957b3d91b04..3d8117c619e 100644
--- a/jstests/sharding/change_stream_lookup_single_shard_cluster.js
+++ b/jstests/sharding/change_stream_lookup_single_shard_cluster.js
@@ -3,7 +3,6 @@
// sharded collection.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js
index 0413c96f36a..d5fa409b97e 100644
--- a/jstests/sharding/change_stream_metadata_notifications.js
+++ b/jstests/sharding/change_stream_metadata_notifications.js
@@ -3,7 +3,6 @@
// @tags: [
// requires_find_command,
// requires_majority_read_concern,
-// sbe_incompatible,
// ]
(function() {
"use strict";
diff --git a/jstests/sharding/change_stream_read_preference.js b/jstests/sharding/change_stream_read_preference.js
index feaa8263179..752f1726c2e 100644
--- a/jstests/sharding/change_stream_read_preference.js
+++ b/jstests/sharding/change_stream_read_preference.js
@@ -2,7 +2,6 @@
// user.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_stream_resume_from_different_mongos.js b/jstests/sharding/change_stream_resume_from_different_mongos.js
index 6bff49ffb46..27fca91e6b7 100644
--- a/jstests/sharding/change_stream_resume_from_different_mongos.js
+++ b/jstests/sharding/change_stream_resume_from_different_mongos.js
@@ -1,7 +1,6 @@
// Test resuming a change stream on a mongos other than the one the change stream was started on.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_stream_shard_failover.js b/jstests/sharding/change_stream_shard_failover.js
index ad12a627681..eadc821dec9 100644
--- a/jstests/sharding/change_stream_shard_failover.js
+++ b/jstests/sharding/change_stream_shard_failover.js
@@ -3,7 +3,6 @@
* by triggering a stepdown.
* @tags: [
* requires_majority_read_concern,
- * sbe_incompatible,
* uses_change_streams,
* ]
*/
diff --git a/jstests/sharding/change_stream_transaction_sharded.js b/jstests/sharding/change_stream_transaction_sharded.js
index 44cc029f0d2..96e15459ff1 100644
--- a/jstests/sharding/change_stream_transaction_sharded.js
+++ b/jstests/sharding/change_stream_transaction_sharded.js
@@ -1,7 +1,6 @@
// Confirms that change streams only see committed operations for sharded transactions.
// @tags: [
// requires_sharding,
-// sbe_incompatible,
// uses_change_streams,
// uses_multi_shard_transaction,
// uses_transactions,
diff --git a/jstests/sharding/change_stream_update_lookup_collation.js b/jstests/sharding/change_stream_update_lookup_collation.js
index ac9242c707d..be07efa032b 100644
--- a/jstests/sharding/change_stream_update_lookup_collation.js
+++ b/jstests/sharding/change_stream_update_lookup_collation.js
@@ -5,7 +5,6 @@
// @tags: [
// requires_find_command,
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js
index a793a4728f2..7cb8f5c667e 100644
--- a/jstests/sharding/change_stream_update_lookup_read_concern.js
+++ b/jstests/sharding/change_stream_update_lookup_read_concern.js
@@ -3,7 +3,6 @@
// change that we're doing the lookup for, and that change will be majority-committed.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js b/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js
index 55b610147df..97fb61631f3 100644
--- a/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js
+++ b/jstests/sharding/change_streams/lookup_change_stream_post_image_compound_shard_key.js
@@ -2,7 +2,6 @@
// sharded with a compound shard key.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js b/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js
index 6a7dba0b6c8..e945fcc3af8 100644
--- a/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js
+++ b/jstests/sharding/change_streams/lookup_change_stream_post_image_hashed_shard_key.js
@@ -2,7 +2,6 @@
// sharded with a hashed shard key.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js b/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js
index 6848e225672..f577ff932e3 100644
--- a/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js
+++ b/jstests/sharding/change_streams/lookup_change_stream_post_image_id_shard_key.js
@@ -2,7 +2,6 @@
// sharded with a key which is just the "_id" field.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams/resume_change_stream.js b/jstests/sharding/change_streams/resume_change_stream.js
index f6824a344f7..441c9e43a83 100644
--- a/jstests/sharding/change_streams/resume_change_stream.js
+++ b/jstests/sharding/change_streams/resume_change_stream.js
@@ -3,7 +3,6 @@
// @tags: [
// requires_find_command,
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js b/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js
index 2a0f48e9fff..75c8796aea8 100644
--- a/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js
+++ b/jstests/sharding/change_streams/resume_change_stream_from_stale_mongos.js
@@ -3,7 +3,6 @@
// a stale shard version.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js b/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js
index beaefb9f520..64783cb2a2e 100644
--- a/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js
+++ b/jstests/sharding/change_streams/resume_change_stream_on_subset_of_shards.js
@@ -2,7 +2,6 @@
// the collection.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js b/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js
index c5ce06fa9f8..634b8b39d1e 100644
--- a/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js
+++ b/jstests/sharding/change_streams_delete_in_txn_produces_correct_doc_key.js
@@ -2,7 +2,6 @@
// but only the shard key and _id in the 'documentKey' field. Exercises the fix for SERVER-45987.
// @tags: [
// multiversion_incompatible,
-// sbe_incompatible,
// uses_transactions,
// ]
diff --git a/jstests/sharding/change_streams_new_shard_new_database.js b/jstests/sharding/change_streams_new_shard_new_database.js
index 5057266d1ce..e4debdf534a 100644
--- a/jstests/sharding/change_streams_new_shard_new_database.js
+++ b/jstests/sharding/change_streams_new_shard_new_database.js
@@ -8,7 +8,6 @@
* @tags: [
* requires_find_command,
* requires_sharding,
- * sbe_incompatible,
* uses_change_streams,
* ]
*/
diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js
index a7b32e5fde8..0b233163384 100644
--- a/jstests/sharding/change_streams_primary_shard_unaware.js
+++ b/jstests/sharding/change_streams_primary_shard_unaware.js
@@ -7,7 +7,6 @@
// blacklist_from_rhel_67_s390x,
// requires_majority_read_concern,
// requires_persistence,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams_shards_start_in_sync.js b/jstests/sharding/change_streams_shards_start_in_sync.js
index ba9947b2afa..627a633c176 100644
--- a/jstests/sharding/change_streams_shards_start_in_sync.js
+++ b/jstests/sharding/change_streams_shards_start_in_sync.js
@@ -7,7 +7,6 @@
// and 'B' will be seen in the changestream before 'C'.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index 0b86b72e5ce..75e8f2a4388 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -5,7 +5,6 @@
// sharded.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/jstests/sharding/change_streams_unsharded_update_resume.js b/jstests/sharding/change_streams_unsharded_update_resume.js
index ed44e8f1ffb..96a8a0230da 100644
--- a/jstests/sharding/change_streams_unsharded_update_resume.js
+++ b/jstests/sharding/change_streams_unsharded_update_resume.js
@@ -2,7 +2,6 @@
* Tests that the post-image of an update which occurred while the collection was unsharded can
* still be looked up after the collection becomes sharded. Exercises the fix for SERVER-44484.
* @tags: [
- * sbe_incompatible,
* uses_change_streams,
* ]
*/
diff --git a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
index bba669f0f67..39dd6a7417c 100644
--- a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
+++ b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
@@ -3,7 +3,6 @@
* TooManyMatchingDocuments after fixing SERVER-44598.
*
* @tags: [
- * sbe_incompatible,
* uses_change_streams,
* ]
*/
diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js
index b087ce478f9..bd1bfe64f21 100644
--- a/jstests/sharding/change_streams_whole_db.js
+++ b/jstests/sharding/change_streams_whole_db.js
@@ -1,7 +1,6 @@
// Tests the behavior of a change stream on a whole database in a sharded cluster.
// @tags: [
// requires_majority_read_concern,
-// sbe_incompatible,
// uses_change_streams,
// ]
(function() {
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp
index bf50e71ce79..b0b67aea8d9 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.cpp
+++ b/src/mongo/db/exec/sbe/expressions/expression.cpp
@@ -864,7 +864,7 @@ void RuntimeEnvironment::resetSlot(value::SlotId slot,
uasserted(4946300, str::stream() << "undefined slot accessor:" << slot);
}
-value::SlotAccessor* RuntimeEnvironment::getAccessor(value::SlotId slot) {
+RuntimeEnvironment::Accessor* RuntimeEnvironment::getAccessor(value::SlotId slot) {
if (auto it = _accessors.find(slot); it != _accessors.end()) {
return &it->second;
}
diff --git a/src/mongo/db/exec/sbe/expressions/expression.h b/src/mongo/db/exec/sbe/expressions/expression.h
index f52acef32ca..a8f24c578e7 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.h
+++ b/src/mongo/db/exec/sbe/expressions/expression.h
@@ -70,6 +70,39 @@ public:
RuntimeEnvironment& operator=(const RuntimeEnvironment&&) = delete;
~RuntimeEnvironment();
+ class Accessor final : public value::SlotAccessor {
+ public:
+ Accessor(RuntimeEnvironment* env, size_t index) : _env{env}, _index{index} {}
+
+ std::pair<value::TypeTags, value::Value> getViewOfValue() const override {
+ return {_env->_state->typeTags[_index], _env->_state->vals[_index]};
+ }
+
+ std::pair<value::TypeTags, value::Value> copyOrMoveValue() override {
+ // Always make a copy.
+ return copyValue(_env->_state->typeTags[_index], _env->_state->vals[_index]);
+ }
+
+ void reset(bool owned, value::TypeTags tag, value::Value val) {
+ release();
+
+ _env->_state->typeTags[_index] = tag;
+ _env->_state->vals[_index] = val;
+ _env->_state->owned[_index] = owned;
+ }
+
+ private:
+ void release() {
+ if (_env->_state->owned[_index]) {
+ releaseValue(_env->_state->typeTags[_index], _env->_state->vals[_index]);
+ _env->_state->owned[_index] = false;
+ }
+ }
+
+ RuntimeEnvironment* const _env;
+ const size_t _index;
+ };
+
/**
* Registers and returns a SlotId for the given slot 'name'. The 'slotIdGenerator' is used
* to generate a new SlotId for the given slot 'name', which is then registered with this
@@ -112,7 +145,7 @@ public:
*
* A user exception is raised if the SlotId is not registered within this environment.
*/
- value::SlotAccessor* getAccessor(value::SlotId slot);
+ Accessor* getAccessor(value::SlotId slot);
/**
* Make a copy of his environment. The new environment will have its own set of SlotAccessors
@@ -151,39 +184,6 @@ private:
std::vector<bool> owned;
};
- class Accessor final : public value::SlotAccessor {
- public:
- Accessor(RuntimeEnvironment* env, size_t index) : _env{env}, _index{index} {}
-
- std::pair<value::TypeTags, value::Value> getViewOfValue() const override {
- return {_env->_state->typeTags[_index], _env->_state->vals[_index]};
- }
-
- std::pair<value::TypeTags, value::Value> copyOrMoveValue() override {
- // Always make a copy.
- return copyValue(_env->_state->typeTags[_index], _env->_state->vals[_index]);
- }
-
- void reset(bool owned, value::TypeTags tag, value::Value val) {
- release();
-
- _env->_state->typeTags[_index] = tag;
- _env->_state->vals[_index] = val;
- _env->_state->owned[_index] = owned;
- }
-
- private:
- void release() {
- if (_env->_state->owned[_index]) {
- releaseValue(_env->_state->typeTags[_index], _env->_state->vals[_index]);
- _env->_state->owned[_index] = false;
- }
- }
-
- RuntimeEnvironment* const _env;
- const size_t _index;
- };
-
void emplaceAccessor(value::SlotId slot, size_t index) {
_accessors.emplace(slot, Accessor{this, index});
}
@@ -200,6 +200,11 @@ struct CompileCtx {
CompileCtx(std::unique_ptr<RuntimeEnvironment> env) : env{std::move(env)} {}
value::SlotAccessor* getAccessor(value::SlotId slot);
+
+ RuntimeEnvironment::Accessor* getRuntimeEnvAccessor(value::SlotId slotId) {
+ return env->getAccessor(slotId);
+ }
+
std::shared_ptr<SpoolBuffer> getSpoolBuffer(SpoolId spool);
void pushCorrelated(value::SlotId slot, value::SlotAccessor* accessor);
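
The expression.h hunk moves RuntimeEnvironment::Accessor from the private section into the public interface and narrows getAccessor() to return the concrete Accessor* instead of value::SlotAccessor*. That matters because reset() exists only on the concrete type, and ScanStage (further below) needs it to publish the latest oplog timestamp. A minimal standalone sketch of the pattern, not part of the patch, using placeholder names (SimpleEnv, SimpleAccessor, integer values) rather than the real SBE value types:

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Placeholder stand-ins for sbe::value::TypeTags and sbe::value::Value.
using TypeTag = int;
using Value = std::uint64_t;

// Parallel arrays of slot values, as in RuntimeEnvironment's internal state.
struct SimpleEnv {
    std::vector<TypeTag> typeTags;
    std::vector<Value> vals;
    std::vector<bool> owned;
};

// Mirrors the shape of RuntimeEnvironment::Accessor: it indexes into the
// environment's arrays and releases any owned value before storing a new one.
class SimpleAccessor {
public:
    SimpleAccessor(SimpleEnv* env, std::size_t index) : _env(env), _index(index) {}

    std::pair<TypeTag, Value> getViewOfValue() const {
        return {_env->typeTags[_index], _env->vals[_index]};
    }

    // reset() is the member that only the concrete accessor type exposes; the
    // generic SlotAccessor interface has no equivalent, which is why the patch
    // returns Accessor* from getAccessor() and stores it in ScanStage.
    void reset(bool owned, TypeTag tag, Value val) {
        release();
        _env->typeTags[_index] = tag;
        _env->vals[_index] = val;
        _env->owned[_index] = owned;
    }

private:
    void release() {
        if (_env->owned[_index]) {
            // The real code calls releaseValue(tag, val) here; plain integers
            // in this sketch need no cleanup.
            _env->owned[_index] = false;
        }
    }

    SimpleEnv* const _env;
    const std::size_t _index;
};
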
diff --git a/src/mongo/db/exec/sbe/parser/parser.cpp b/src/mongo/db/exec/sbe/parser/parser.cpp
index 76d271b1e17..597229296f5 100644
--- a/src/mongo/db/exec/sbe/parser/parser.cpp
+++ b/src/mongo/db/exec/sbe/parser/parser.cpp
@@ -74,6 +74,8 @@ static constexpr auto kSyntax = R"(
FORWARD_FLAG <- <'true'> / <'false'>
+ NEED_SLOT_FOR_OPLOG_TS <- <'true'> / <'false'>
+
SCAN <- 'scan' IDENT? # optional variable name of the root object (record) delivered by the scan
IDENT? # optional variable name of the record id delivered by the scan
IDENT? # optional variable name of the snapshot id read by the scan
@@ -82,6 +84,7 @@ static constexpr auto kSyntax = R"(
IDENT_LIST_WITH_RENAMES # list of projected fields (may be empty)
IDENT # collection name to scan
FORWARD_FLAG # forward scan or not
+ NEED_SLOT_FOR_OPLOG_TS # Whether a slot needs to be created for oplog timestamp
PSCAN <- 'pscan' IDENT? # optional variable name of the root object (record) delivered by the scan
IDENT? # optional variable name of the record id delivered by the scan
@@ -100,6 +103,7 @@ static constexpr auto kSyntax = R"(
IDENT_LIST_WITH_RENAMES # list of projected fields (may be empty)
IDENT # collection name to scan
FORWARD_FLAG # forward scan or not
+ NEED_SLOT_FOR_OPLOG_TS # Whether a slot needs to be created for oplog timestamp
IXSCAN <- 'ixscan' IDENT? # optional variable name of the root object (record) delivered by the scan
IDENT? # optional variable name of the record id delivered by the scan
@@ -622,39 +626,40 @@ void Parser::walkScan(AstQuery& ast) {
std::string snapshotIdName;
std::string indexIdName;
std::string indexKeyName;
- std::string collName;
int projectsPos;
- int forwardPos;
- if (ast.nodes.size() == 8) {
+ if (ast.nodes.size() == 9) {
recordName = std::move(ast.nodes[0]->identifier);
recordIdName = std::move(ast.nodes[1]->identifier);
snapshotIdName = std::move(ast.nodes[2]->identifier);
indexIdName = std::move(ast.nodes[3]->identifier);
indexKeyName = std::move(ast.nodes[4]->identifier);
projectsPos = 5;
- collName = std::move(ast.nodes[6]->identifier);
- forwardPos = 7;
- } else if (ast.nodes.size() == 5) {
+ } else if (ast.nodes.size() == 6) {
recordName = std::move(ast.nodes[0]->identifier);
recordIdName = std::move(ast.nodes[1]->identifier);
projectsPos = 2;
- collName = std::move(ast.nodes[3]->identifier);
- forwardPos = 4;
- } else if (ast.nodes.size() == 4) {
+ } else if (ast.nodes.size() == 5) {
recordName = std::move(ast.nodes[0]->identifier);
projectsPos = 1;
- collName = std::move(ast.nodes[2]->identifier);
- forwardPos = 3;
- } else if (ast.nodes.size() == 3) {
+ } else if (ast.nodes.size() == 4) {
projectsPos = 0;
- collName = std::move(ast.nodes[1]->identifier);
- forwardPos = 2;
} else {
uasserted(5290715, "Wrong number of arguments for SCAN");
}
+ auto lastPos = ast.nodes.size() - 1;
- const auto forward = (ast.nodes[forwardPos]->token == "true") ? true : false;
+ // The 'collName' should be third from last.
+ auto collName = std::move(ast.nodes[lastPos - 2]->identifier);
+
+ // The 'FORWARD' should be second last.
+ const auto forward = (ast.nodes[lastPos - 1]->token == "true") ? true : false;
+
+ // The 'NEED_SLOT_FOR_OPLOG_TS' always comes at the end.
+ const auto oplogTs = (ast.nodes[lastPos]->token == "true")
+ ? boost::optional<value::SlotId>(_env->registerSlot(
+ "oplogTs"_sd, value::TypeTags::Nothing, 0, false, &_slotIdGenerator))
+ : boost::none;
ast.stage = makeS<ScanStage>(getCollectionUuid(collName),
lookupSlot(recordName),
@@ -662,6 +667,7 @@ void Parser::walkScan(AstQuery& ast) {
lookupSlot(snapshotIdName),
lookupSlot(indexIdName),
lookupSlot(indexKeyName),
+ oplogTs,
ast.nodes[projectsPos]->identifiers,
lookupSlots(ast.nodes[projectsPos]->renames),
boost::none,
@@ -727,39 +733,41 @@ void Parser::walkSeek(AstQuery& ast) {
std::string snapshotIdName;
std::string indexIdName;
std::string indexKeyName;
- std::string collName;
+
int projectsPos;
- int forwardPos;
- if (ast.nodes.size() == 9) {
+ if (ast.nodes.size() == 10) {
recordName = std::move(ast.nodes[1]->identifier);
recordIdName = std::move(ast.nodes[2]->identifier);
snapshotIdName = std::move(ast.nodes[3]->identifier);
indexIdName = std::move(ast.nodes[4]->identifier);
indexKeyName = std::move(ast.nodes[5]->identifier);
projectsPos = 6;
- collName = std::move(ast.nodes[7]->identifier);
- forwardPos = 8;
} else if (ast.nodes.size() == 6) {
recordName = std::move(ast.nodes[1]->identifier);
recordIdName = std::move(ast.nodes[2]->identifier);
projectsPos = 3;
- collName = std::move(ast.nodes[4]->identifier);
- forwardPos = 5;
} else if (ast.nodes.size() == 5) {
recordName = std::move(ast.nodes[1]->identifier);
projectsPos = 2;
- collName = std::move(ast.nodes[3]->identifier);
- forwardPos = 4;
} else if (ast.nodes.size() == 4) {
projectsPos = 1;
- collName = std::move(ast.nodes[2]->identifier);
- forwardPos = 3;
} else {
- uasserted(5290717, "Wrong number of arguments for SEEk");
+ uasserted(5290717, "Wrong number of arguments for SEEK");
}
+ auto lastPos = ast.nodes.size() - 1;
- const auto forward = (ast.nodes[forwardPos]->token == "true") ? true : false;
+ // The 'collName' should be third from last.
+ auto collName = std::move(ast.nodes[lastPos - 2]->identifier);
+
+ // The 'FORWARD' should be second last.
+ const auto forward = (ast.nodes[lastPos - 1]->token == "true") ? true : false;
+
+ // The 'NEED_SLOT_FOR_OPLOG_TS' always comes at the end.
+ const auto oplogTs = (ast.nodes[lastPos]->token == "true")
+ ? boost::optional<value::SlotId>(_env->registerSlot(
+ "oplogTs"_sd, value::TypeTags::Nothing, 0, false, &_slotIdGenerator))
+ : boost::none;
ast.stage = makeS<ScanStage>(getCollectionUuid(collName),
lookupSlot(recordName),
@@ -767,6 +775,7 @@ void Parser::walkSeek(AstQuery& ast) {
lookupSlot(snapshotIdName),
lookupSlot(indexIdName),
lookupSlot(indexKeyName),
+ oplogTs,
ast.nodes[projectsPos]->identifiers,
lookupSlots(ast.nodes[projectsPos]->renames),
lookupSlot(ast.nodes[0]->identifier),
@@ -1808,7 +1817,9 @@ void Parser::walk(AstQuery& ast) {
}
}
-Parser::Parser() {
+Parser::Parser(RuntimeEnvironment* env) : _env(env) {
+ invariant(_env);
+
_parser.log = [&](size_t ln, size_t col, const std::string& msg) {
LOGV2(4885902, "{msg}", "msg"_attr = format_error_message(ln, col, msg));
};
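
With the new NEED_SLOT_FOR_OPLOG_TS flag appended to the SCAN and SEEK grammar, the leading identifiers remain optional, so walkScan()/walkSeek() now index the fixed trailing arguments from the end of the node list rather than keeping per-arity collName/forwardPos positions. A small standalone sketch of that indexing scheme, not part of the patch, with hypothetical token values:

#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical: regardless of how many optional identifiers precede them, the
// last three tokens are always the collection name, the FORWARD_FLAG and the
// NEED_SLOT_FOR_OPLOG_TS flag.
struct TrailingArgs {
    std::string collName;
    bool forward;
    bool needOplogTs;
};

TrailingArgs parseTrailing(const std::vector<std::string>& nodes) {
    assert(nodes.size() >= 4);  // at least the projects list plus three trailing tokens
    const std::size_t lastPos = nodes.size() - 1;
    return {nodes[lastPos - 2],            // collection name: third from last
            nodes[lastPos - 1] == "true",  // forward scan flag: second from last
            nodes[lastPos] == "true"};     // oplog timestamp slot flag: last
}
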
diff --git a/src/mongo/db/exec/sbe/parser/parser.h b/src/mongo/db/exec/sbe/parser/parser.h
index 6b02814d171..dc99252bc4c 100644
--- a/src/mongo/db/exec/sbe/parser/parser.h
+++ b/src/mongo/db/exec/sbe/parser/parser.h
@@ -63,7 +63,7 @@ using AstQuery = peg::AstBase<ParsedQueryTree>;
class Parser {
public:
- Parser();
+ Parser(RuntimeEnvironment* env);
std::unique_ptr<PlanStage> parse(OperationContext* opCtx,
StringData defaultDb,
StringData line);
@@ -84,6 +84,7 @@ private:
value::SlotIdGenerator _slotIdGenerator;
value::SpoolIdGenerator _spoolIdGenerator;
FrameId _frameId{0};
+ RuntimeEnvironment* _env;
struct FrameSymbolTable {
FrameId id;
SymbolTable table;
diff --git a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp
index 38f72fe2eaf..3fdfec561a2 100644
--- a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp
+++ b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp
@@ -188,6 +188,7 @@ protected:
boost::none,
boost::none,
boost::none,
+ boost::none,
std::vector<std::string>{},
sbe::makeSV(),
boost::none,
@@ -202,6 +203,7 @@ protected:
boost::none,
boost::none,
boost::none,
+ boost::none,
std::vector<std::string>{},
sbe::makeSV(),
sbe::value::SlotId{3},
@@ -210,19 +212,21 @@ protected:
planNodeId,
sbe::ScanCallbacks({})),
// SCAN with all slots present.
- sbe::makeS<sbe::ScanStage>(fakeUuid,
- sbe::value::SlotId{1},
- sbe::value::SlotId{2},
- sbe::value::SlotId{3},
- sbe::value::SlotId{4},
- sbe::value::SlotId{5},
- std::vector<std::string>{},
- sbe::makeSV(),
- sbe::value::SlotId{6},
- true /* forward */,
- nullptr,
- planNodeId,
- sbe::ScanCallbacks({})),
+ sbe::makeS<sbe::ScanStage>(
+ fakeUuid,
+ sbe::value::SlotId{1},
+ sbe::value::SlotId{2},
+ sbe::value::SlotId{3},
+ sbe::value::SlotId{4},
+ sbe::value::SlotId{5},
+ sbe::value::SlotId{6},
+ std::vector<std::string>{repl::OpTime::kTimestampFieldName.toString()},
+ sbe::makeSV(sbe::value::SlotId{6}),
+ sbe::value::SlotId{7},
+ true /* forward */,
+ nullptr,
+ planNodeId,
+ sbe::ScanCallbacks({})),
// PSCAN with both 'recordSlot' and 'recordIdSlot' slots present.
sbe::makeS<sbe::ParallelScanStage>(fakeUuid,
sbe::value::SlotId{1},
@@ -580,7 +584,8 @@ TEST_F(SBEParserTest, TestIdenticalDebugOutputAfterParse) {
sbe::DebugPrinter printer;
for (const auto& stage : stages) {
- sbe::Parser parser;
+ auto env = std::make_unique<sbe::RuntimeEnvironment>();
+ sbe::Parser parser(env.get());
const auto stageText = printer.print(*stage);
const auto parsedStage = parser.parse(nullptr, "testDb", stageText);
@@ -592,7 +597,8 @@ TEST_F(SBEParserTest, TestIdenticalDebugOutputAfterParse) {
TEST_F(SBEParserTest, TestPlanNodeIdIsParsed) {
sbe::DebugPrinter printer;
- sbe::Parser parser;
+ auto env = std::make_unique<sbe::RuntimeEnvironment>();
+ sbe::Parser parser(env.get());
for (const auto& stage : stages) {
const auto stageText = printer.print(*stage);
diff --git a/src/mongo/db/exec/sbe/stages/scan.cpp b/src/mongo/db/exec/sbe/stages/scan.cpp
index fd1d371b9ee..2d7e6fabe65 100644
--- a/src/mongo/db/exec/sbe/stages/scan.cpp
+++ b/src/mongo/db/exec/sbe/stages/scan.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/trial_run_tracker.h"
#include "mongo/db/index/index_access_method.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/util/str.h"
namespace mongo {
@@ -44,6 +45,7 @@ ScanStage::ScanStage(CollectionUUID collectionUuid,
boost::optional<value::SlotId> snapshotIdSlot,
boost::optional<value::SlotId> indexIdSlot,
boost::optional<value::SlotId> indexKeySlot,
+ boost::optional<value::SlotId> oplogTsSlot,
std::vector<std::string> fields,
value::SlotVector vars,
boost::optional<value::SlotId> seekKeySlot,
@@ -58,6 +60,7 @@ ScanStage::ScanStage(CollectionUUID collectionUuid,
_snapshotIdSlot(snapshotIdSlot),
_indexIdSlot(indexIdSlot),
_indexKeySlot(indexKeySlot),
+ _oplogTsSlot(oplogTsSlot),
_fields(std::move(fields)),
_vars(std::move(vars)),
_seekKeySlot(seekKeySlot),
@@ -65,6 +68,11 @@ ScanStage::ScanStage(CollectionUUID collectionUuid,
_scanCallbacks(std::move(scanCallbacks)) {
invariant(_fields.size() == _vars.size());
invariant(!_seekKeySlot || _forward);
+ tassert(5567202,
+ "The '_oplogTsSlot' cannot be set without 'ts' field in '_fields'",
+ !_oplogTsSlot ||
+ (std::find(_fields.begin(), _fields.end(), repl::OpTime::kTimestampFieldName) !=
+ _fields.end()));
}
std::unique_ptr<PlanStage> ScanStage::clone() const {
@@ -74,6 +82,7 @@ std::unique_ptr<PlanStage> ScanStage::clone() const {
_snapshotIdSlot,
_indexIdSlot,
_indexKeySlot,
+ _oplogTsSlot,
_fields,
_vars,
_seekKeySlot,
@@ -116,6 +125,10 @@ void ScanStage::prepare(CompileCtx& ctx) {
_keyStringAccessor = ctx.getAccessor(*_indexKeySlot);
}
+ if (_oplogTsSlot) {
+ _oplogTsAccessor = ctx.getRuntimeEnvAccessor(*_oplogTsSlot);
+ }
+
std::tie(_collName, _catalogEpoch) =
acquireCollection(_opCtx, _collUuid, _scanCallbacks.lockAcquisitionCallback, _coll);
}
@@ -129,6 +142,10 @@ value::SlotAccessor* ScanStage::getAccessor(CompileCtx& ctx, value::SlotId slot)
return _recordIdAccessor.get();
}
+ if (_oplogTsSlot && *_oplogTsSlot == slot) {
+ return _oplogTsAccessor;
+ }
+
if (auto it = _varAccessors.find(slot); it != _varAccessors.end()) {
return it->second;
}
@@ -289,6 +306,11 @@ PlanState ScanStage::getNext() {
// Found the field so convert it to Value.
auto [tag, val] = bson::convertFrom(true, be, end, sv.size());
+ if (_oplogTsAccessor && it->first == repl::OpTime::kTimestampFieldName) {
+ auto&& [ownedTag, ownedVal] = value::copyValue(tag, val);
+ _oplogTsAccessor->reset(false, ownedTag, ownedVal);
+ }
+
it->second->reset(tag, val);
if ((--fieldsToMatch) == 0) {
@@ -413,6 +435,8 @@ std::vector<DebugPrinter::Block> ScanStage::debugPrint() const {
ret.emplace_back(_forward ? "true" : "false");
+ ret.emplace_back(_oplogTsAccessor ? "true" : "false");
+
return ret;
}
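
Inside getNext(), the scan already walks the BSON document once to fill the per-field accessors; the patch additionally copies the 'ts' value into the long-lived RuntimeEnvironment accessor whenever that field is seen, so the latest oplog timestamp stays readable after the batch is returned. A rough standalone model of "record the most recent value of one field while streaming", not part of the patch and using made-up types:

#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

struct Doc {
    long long ts;  // stand-in for the oplog 'ts' Timestamp field
};

// Hypothetical scanner: streams documents and, as a side effect, keeps the
// most recent 'ts' it has produced in a member that outlives each batch,
// analogous to _oplogTsAccessor->reset(...) in ScanStage::getNext().
class OplogScanModel {
public:
    explicit OplogScanModel(std::vector<Doc> docs) : _docs(std::move(docs)) {}

    std::optional<Doc> getNext() {
        if (_pos == _docs.size()) {
            return std::nullopt;
        }
        const Doc& d = _docs[_pos++];
        _latestTs = d.ts;
        return d;
    }

    std::optional<long long> latestTs() const {
        return _latestTs;
    }

private:
    std::vector<Doc> _docs;
    std::size_t _pos = 0;
    std::optional<long long> _latestTs;
};
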
diff --git a/src/mongo/db/exec/sbe/stages/scan.h b/src/mongo/db/exec/sbe/stages/scan.h
index 485afd63e0b..1f749d922dd 100644
--- a/src/mongo/db/exec/sbe/stages/scan.h
+++ b/src/mongo/db/exec/sbe/stages/scan.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/stages/collection_helpers.h"
#include "mongo/db/exec/sbe/stages/stages.h"
#include "mongo/db/exec/sbe/values/bson.h"
@@ -59,6 +60,7 @@ public:
boost::optional<value::SlotId> snapshotIdSlot,
boost::optional<value::SlotId> indexIdSlot,
boost::optional<value::SlotId> indexKeySlot,
+ boost::optional<value::SlotId> oplogTsSlot,
std::vector<std::string> fields,
value::SlotVector vars,
boost::optional<value::SlotId> seekKeySlot,
@@ -94,8 +96,11 @@ private:
const boost::optional<value::SlotId> _snapshotIdSlot;
const boost::optional<value::SlotId> _indexIdSlot;
const boost::optional<value::SlotId> _indexKeySlot;
+ const boost::optional<value::SlotId> _oplogTsSlot;
+
const std::vector<std::string> _fields;
const value::SlotVector _vars;
+
const boost::optional<value::SlotId> _seekKeySlot;
const bool _forward;
@@ -113,6 +118,7 @@ private:
value::SlotAccessor* _snapshotIdAccessor{nullptr};
value::SlotAccessor* _indexIdAccessor{nullptr};
value::SlotAccessor* _keyStringAccessor{nullptr};
+ RuntimeEnvironment::Accessor* _oplogTsAccessor{nullptr};
value::FieldAccessorMap _fieldAccessors;
value::SlotAccessorMap _varAccessors;
diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp
index 42b11636853..0d58f7731a9 100644
--- a/src/mongo/db/exec/sbe_cmd.cpp
+++ b/src/mongo/db/exec/sbe_cmd.cpp
@@ -69,7 +69,8 @@ public:
uassertStatusOK(CursorRequest::parseCommandCursorOptions(
cmdObj, query_request_helper::kDefaultBatchSize, &batchSize));
- sbe::Parser parser;
+ auto env = std::make_unique<sbe::RuntimeEnvironment>();
+ sbe::Parser parser(env.get());
auto root = parser.parse(opCtx, dbname, cmdObj["sbe"].String());
auto [resultSlot, recordIdSlot] = parser.getTopLevelSlots();
@@ -83,7 +84,7 @@ public:
CanonicalQuery::canonicalize(opCtx, std::make_unique<FindCommandRequest>(nss));
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
- stage_builder::PlanStageData data{std::make_unique<sbe::RuntimeEnvironment>()};
+ stage_builder::PlanStageData data{std::move(env)};
if (resultSlot) {
data.outputs.set(stage_builder::PlanStageSlots::kResult, *resultSlot);
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp
index bfae6954a90..822e28883d8 100644
--- a/src/mongo/db/query/plan_executor_sbe.cpp
+++ b/src/mongo/db/query/plan_executor_sbe.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/query/plan_insert_listener.h"
#include "mongo/db/query/sbe_stage_builder.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/resharding/resume_token_gen.h"
namespace mongo {
// This failpoint is defined by the classic executor but is also accessed here.
@@ -78,9 +79,8 @@ PlanExecutorSBE::PlanExecutorSBE(OperationContext* opCtx,
uassert(4822866, "Query does not have recordId slot.", _resultRecordId);
}
- if (auto slot = _rootData.outputs.getIfExists(stage_builder::PlanStageSlots::kOplogTs); slot) {
- _oplogTs = _root->getAccessor(_rootData.ctx, *slot);
- uassert(4822867, "Query does not have oplogTs slot.", _oplogTs);
+ if (_rootData.shouldTrackLatestOplogTimestamp) {
+ _oplogTs = _rootData.env->getAccessor(_rootData.env->getSlot("oplogTs"_sd));
}
if (winner.data.shouldUseTailableScan) {
@@ -270,7 +270,10 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut)
Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const {
if (_rootData.shouldTrackLatestOplogTimestamp) {
- invariant(_oplogTs);
+ tassert(5567201,
+ "The '_oplogTs' accessor should be populated when "
+ "'shouldTrackLatestOplogTimestamp' is true",
+ _oplogTs);
auto [tag, val] = _oplogTs->getViewOfValue();
if (tag != sbe::value::TypeTags::Nothing) {
@@ -301,6 +304,11 @@ BSONObj PlanExecutorSBE::getPostBatchResumeToken() const {
return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val));
}
}
+
+ if (_rootData.shouldTrackLatestOplogTimestamp) {
+ return ResumeTokenOplogTimestamp{getLatestOplogTimestamp()}.toBSON();
+ }
+
return {};
}
diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h
index ebb092927be..918b2c8670a 100644
--- a/src/mongo/db/query/plan_executor_sbe.h
+++ b/src/mongo/db/query/plan_executor_sbe.h
@@ -150,8 +150,8 @@ private:
sbe::value::SlotAccessor* _resultRecordId{nullptr};
sbe::value::TypeTags _tagLastRecordId{sbe::value::TypeTags::Nothing};
sbe::value::Value _valLastRecordId{0};
+ sbe::RuntimeEnvironment::Accessor* _oplogTs{nullptr};
- sbe::value::SlotAccessor* _oplogTs{nullptr};
boost::optional<sbe::value::SlotId> _resumeRecordIdSlot;
std::queue<std::pair<BSONObj, boost::optional<RecordId>>> _stash;
diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp
index 59923ef4f39..83d2c9b1f37 100644
--- a/src/mongo/db/query/sbe_stage_builder.cpp
+++ b/src/mongo/db/query/sbe_stage_builder.cpp
@@ -259,9 +259,6 @@ std::string PlanStageData::debugString() const {
if (auto slot = outputs.getIfExists(PlanStageSlots::kRecordId); slot) {
builder << "$$RID=s" << *slot << " ";
}
- if (auto slot = outputs.getIfExists(PlanStageSlots::kOplogTs); slot) {
- builder << "$$OPLOGTS=s" << *slot << " ";
- }
env->debugString(&builder);
@@ -429,13 +426,10 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::build(const QuerySolution
_buildHasStarted = true;
// We always produce a 'resultSlot' and conditionally produce a 'recordIdSlot' based on the
- // 'shouldProduceRecordIdSlot'. If the solution contains a CollectionScanNode with the
- // 'shouldTrackLatestOplogTimestamp' flag set to true, then we will also produce an
- // 'oplogTsSlot'.
+ // 'shouldProduceRecordIdSlot'.
PlanStageReqs reqs;
reqs.set(kResult);
reqs.setIf(kRecordId, _shouldProduceRecordIdSlot);
- reqs.setIf(kOplogTs, _data.shouldTrackLatestOplogTimestamp);
// Build the SBE plan stage tree.
auto [stage, outputs] = build(root, reqs);
@@ -445,7 +439,6 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::build(const QuerySolution
// it's needed.
invariant(outputs.has(kResult));
invariant(!_shouldProduceRecordIdSlot || outputs.has(kRecordId));
- invariant(!_data.shouldTrackLatestOplogTimestamp || outputs.has(kOplogTs));
_data.outputs = std::move(outputs);
@@ -477,9 +470,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
sbe::makeE<sbe::EFunction>("newObj", sbe::makeEs()));
}
- // Assert that generateCollScan() generated an oplogTsSlot if it's needed.
- invariant(!reqs.has(kOplogTs) || outputs.has(kOplogTs));
-
return {std::move(stage), std::move(outputs)};
}
@@ -493,9 +483,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
invariant(!reqs.getIndexKeyBitset());
}
- // Virtual scans cannot produce an oplogTsSlot, so assert that the caller doesn't need it.
- invariant(!reqs.has(kOplogTs));
-
auto [inputTag, inputVal] = sbe::value::makeNewArray();
sbe::value::ValueGuard inputGuard{inputTag, inputVal};
auto inputView = sbe::value::getArrayView(inputVal);
@@ -563,9 +550,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
auto ixn = static_cast<const IndexScanNode*>(root);
invariant(reqs.has(kReturnKey) || !ixn->addKeyMetadata);
- // Index scans cannot produce an oplogTsSlot, so assert that the caller doesn't need it.
- invariant(!reqs.has(kOplogTs));
-
sbe::IndexKeysInclusionSet indexKeyBitset;
if (reqs.has(PlanStageSlots::kReturnKey) || reqs.has(PlanStageSlots::kResult)) {
@@ -661,6 +645,7 @@ SlotBasedStageBuilder::makeLoopJoinForFetch(std::unique_ptr<sbe::PlanStage> inpu
snapshotIdSlot,
indexIdSlot,
keyStringSlot,
+ boost::none,
std::vector<std::string>{},
sbe::makeSV(),
seekKeySlot,
@@ -686,10 +671,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
const QuerySolutionNode* root, const PlanStageReqs& reqs) {
auto fn = static_cast<const FetchNode*>(root);
- // At present, makeLoopJoinForFetch() doesn't have the necessary logic for producing an
- // oplogTsSlot, so assert that the caller doesn't need oplogTsSlot.
- invariant(!reqs.has(kOplogTs));
-
// The child must produce all of the slots required by the parent of this FetchNode, except for
// 'resultSlot' which will be produced by the call to makeLoopJoinForFetch() below. In addition
// to that, the child must always produce a 'recordIdSlot' because it's needed for the call to
@@ -1486,7 +1467,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
const QuerySolutionNode* root, const PlanStageReqs& reqs) {
tassert(5432212, "no collection object", _collection);
tassert(5432213, "index keys requsted for text match node", !reqs.getIndexKeyBitset());
- tassert(5432214, "oplogTs requsted for text match node", !reqs.has(kOplogTs));
tassert(5432215,
str::stream() << "text match node must have one child, but got "
<< root->children.size(),
diff --git a/src/mongo/db/query/sbe_stage_builder.h b/src/mongo/db/query/sbe_stage_builder.h
index 2e7978b03c8..cc8baac65ba 100644
--- a/src/mongo/db/query/sbe_stage_builder.h
+++ b/src/mongo/db/query/sbe_stage_builder.h
@@ -59,7 +59,6 @@ public:
static constexpr StringData kResult = "result"_sd;
static constexpr StringData kRecordId = "recordId"_sd;
static constexpr StringData kReturnKey = "returnKey"_sd;
- static constexpr StringData kOplogTs = "oplogTs"_sd;
static constexpr StringData kSnapshotId = "snapshotId"_sd;
static constexpr StringData kIndexId = "indexId"_sd;
static constexpr StringData kIndexKey = "indexKey"_sd;
@@ -256,7 +255,6 @@ public:
static constexpr StringData kResult = PlanStageSlots::kResult;
static constexpr StringData kRecordId = PlanStageSlots::kRecordId;
static constexpr StringData kReturnKey = PlanStageSlots::kReturnKey;
- static constexpr StringData kOplogTs = PlanStageSlots::kOplogTs;
static constexpr StringData kSnapshotId = PlanStageSlots::kSnapshotId;
static constexpr StringData kIndexId = PlanStageSlots::kIndexId;
static constexpr StringData kIndexKey = PlanStageSlots::kIndexKey;
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index bccc263eace..890d443dae7 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -51,6 +51,33 @@
namespace mongo::stage_builder {
namespace {
+
+boost::optional<sbe::value::SlotId> registerOplogTs(sbe::RuntimeEnvironment* env,
+ sbe::value::SlotIdGenerator* slotIdGenerator) {
+ auto slotId = env->getSlotIfExists("oplogTs"_sd);
+ if (!slotId) {
+ return env->registerSlot(
+ "oplogTs"_sd, sbe::value::TypeTags::Nothing, 0, false, slotIdGenerator);
+ }
+ return slotId;
+}
+
+/**
+ * If 'shouldTrackLatestOplogTimestamp' is true, then returns a vector holding the name of the oplog
+ * 'ts' field along with another vector holding a SlotId to map this field to, as well as the
+ * standalone value of the same SlotId (the latter is returned purely for convenience purposes).
+ */
+std::tuple<std::vector<std::string>, sbe::value::SlotVector, boost::optional<sbe::value::SlotId>>
+makeOplogTimestampSlotsIfNeeded(sbe::RuntimeEnvironment* env,
+ sbe::value::SlotIdGenerator* slotIdGenerator,
+ bool shouldTrackLatestOplogTimestamp) {
+ if (shouldTrackLatestOplogTimestamp) {
+ auto slotId = registerOplogTs(env, slotIdGenerator);
+ return {{repl::OpTime::kTimestampFieldName.toString()}, sbe::makeSV(*slotId), slotId};
+ }
+ return {};
+}
+
/**
* Checks whether a callback function should be created for a ScanStage and returns it, if so. The
* logic in the provided callback will be executed when the ScanStage is opened or reopened.
@@ -83,24 +110,6 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const CollectionPtr& collection,
}
/**
- * If 'shouldTrackLatestOplogTimestamp' returns a vector holding the name of the oplog 'ts' field
- * along with another vector holding a SlotId to map this field to, as well as the standalone value
- * of the same SlotId (the latter is returned purely for convenience purposes).
- */
-std::tuple<std::vector<std::string>, sbe::value::SlotVector, boost::optional<sbe::value::SlotId>>
-makeOplogTimestampSlotsIfNeeded(const CollectionPtr& collection,
- sbe::value::SlotIdGenerator* slotIdGenerator,
- bool shouldTrackLatestOplogTimestamp) {
- if (shouldTrackLatestOplogTimestamp) {
- invariant(collection->ns().isOplog());
-
- auto tsSlot = slotIdGenerator->generate();
- return {{repl::OpTime::kTimestampFieldName.toString()}, sbe::makeSV(tsSlot), tsSlot};
- }
- return {};
-};
-
-/**
* Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan
* when there is a predicted on the 'ts' field of the oplog collection.
*
@@ -158,8 +167,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// of if we need to track the latest oplog timestamp.
const auto shouldTrackLatestOplogTimestamp =
(csn->maxRecord || csn->shouldTrackLatestOplogTimestamp);
- auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded(
- collection, slotIdGenerator, shouldTrackLatestOplogTimestamp);
+ auto&& [fields, slots, tsSlot] =
+ makeOplogTimestampSlotsIfNeeded(env, slotIdGenerator, shouldTrackLatestOplogTimestamp);
sbe::ScanCallbacks callbacks(
lockAcquisitionCallback, {}, makeOpenCallbackIfNeeded(collection, csn));
@@ -169,6 +178,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
boost::none /* snapshotIdSlot */,
boost::none /* indexIdSlot */,
boost::none /* indexKeySlot */,
+ tsSlot,
std::move(fields),
std::move(slots),
seekRecordIdSlot,
@@ -204,23 +214,18 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
if (csn->assertTsHasNotFallenOffOplog && !isTailableResumeBranch) {
invariant(csn->shouldTrackLatestOplogTimestamp);
- // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot'
- // here so that it does not shadow the 'tsSlot' which we allocated earlier.
- auto&& [fields, minTsSlots, minTsSlot] = makeOplogTimestampSlotsIfNeeded(
- collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
-
- // We should always have allocated a 'minTsSlot', and there should always be a 'tsSlot'
- // already allocated for the existing scan that we created previously.
- invariant(minTsSlot);
+ // There should always be a 'tsSlot' already allocated on the RuntimeEnvironment for the
+ // existing scan that we created previously.
invariant(tsSlot);
- // Our filter will also need to see the 'op' and 'o.msg' fields.
+ // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot'
+ // here so that it does not shadow the 'tsSlot' which we allocated earlier. Our filter will
+ // also need to see the 'op' and 'o.msg' fields.
auto opTypeSlot = slotIdGenerator->generate();
auto oObjSlot = slotIdGenerator->generate();
- minTsSlots.push_back(opTypeSlot);
- minTsSlots.push_back(oObjSlot);
- fields.push_back("op");
- fields.push_back("o");
+ auto minTsSlot = slotIdGenerator->generate();
+ sbe::value::SlotVector minTsSlots = {minTsSlot, opTypeSlot, oObjSlot};
+ std::vector<std::string> fields = {repl::OpTime::kTimestampFieldName.toString(), "op", "o"};
// If the first entry we see in the oplog is the replset initialization, then it doesn't
// matter if its timestamp is later than the specified minTs; no events earlier than the
@@ -251,6 +256,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
boost::none,
boost::none,
boost::none,
+ boost::none,
std::move(fields),
minTsSlots, /* don't move this */
boost::none,
@@ -262,7 +268,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
makeBinaryOp(
sbe::EPrimBinary::logicOr,
makeBinaryOp(sbe::EPrimBinary::lessEq,
- makeVariable(*minTsSlot),
+ makeVariable(minTsSlot),
makeConstant(sbe::value::TypeTags::Timestamp,
csn->assertTsHasNotFallenOffOplog->asULL())),
makeBinaryOp(
@@ -365,9 +371,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
invariant(csn->minRecord);
invariant(csn->direction == CollectionScanParams::FORWARD);
- std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded(
- collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
-
seekRecordIdSlot = recordIdSlot;
resultSlot = slotIdGenerator->generate();
recordIdSlot = slotIdGenerator->generate();
@@ -380,8 +383,9 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
boost::none,
boost::none,
boost::none,
- std::move(fields),
- std::move(slots),
+ boost::none,
+ std::vector<std::string>(),
+ sbe::makeSV(),
seekRecordIdSlot,
true /* forward */,
yieldPolicy,
@@ -401,10 +405,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
outputs.set(PlanStageSlots::kResult, resultSlot);
outputs.set(PlanStageSlots::kRecordId, recordIdSlot);
- if (csn->shouldTrackLatestOplogTimestamp) {
- outputs.set(PlanStageSlots::kOplogTs, *tsSlot);
- }
-
return {std::move(stage), std::move(outputs)};
}
@@ -443,8 +443,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
}();
// See if we need to project out an oplog latest timestamp.
- auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded(
- collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
+ auto&& [fields, slots, tsSlot] =
+ makeOplogTimestampSlotsIfNeeded(env, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
sbe::ScanCallbacks callbacks(
lockAcquisitionCallback, {}, makeOpenCallbackIfNeeded(collection, csn));
@@ -454,6 +454,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
boost::none,
boost::none,
boost::none,
+ tsSlot,
std::move(fields),
std::move(slots),
seekRecordIdSlot,
@@ -487,6 +488,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
boost::none,
boost::none,
boost::none,
+ boost::none,
std::vector<std::string>{},
sbe::makeSV(),
seekSlot,
@@ -542,9 +544,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
invariant(!csn->stopApplyingFilterAfterFirstMatch);
auto relevantSlots = sbe::makeSV(resultSlot, recordIdSlot);
- if (tsSlot) {
- relevantSlots.push_back(*tsSlot);
- }
std::tie(std::ignore, stage) = generateFilter(opCtx,
csn->filter.get(),
@@ -561,10 +560,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
outputs.set(PlanStageSlots::kResult, resultSlot);
outputs.set(PlanStageSlots::kRecordId, recordIdSlot);
- if (tsSlot) {
- outputs.set(PlanStageSlots::kOplogTs, *tsSlot);
- }
-
return {std::move(stage), std::move(outputs)};
}
} // namespace
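
registerOplogTs() above introduces a get-or-register idiom: the "oplogTs" slot lives on the RuntimeEnvironment, so the optimized oplog scan, the tailable resume branch and the generic scan all share one slot instead of each generating their own, which is also why the kOplogTs plan output slot could be removed from the stage builder. A tiny standalone sketch of the idiom, not part of the patch, with placeholder names:

#include <map>
#include <optional>
#include <string>

// Hypothetical registry of named slots, standing in for RuntimeEnvironment.
class SlotRegistry {
public:
    // Returns the existing slot id for 'name', or registers a fresh one.
    // Mirrors registerOplogTs(): look up the name first, register a new slot
    // only when the name has not been seen yet.
    int getOrRegister(const std::string& name) {
        if (auto it = _slots.find(name); it != _slots.end()) {
            return it->second;
        }
        const int slotId = _nextId++;
        _slots.emplace(name, slotId);
        return slotId;
    }

    std::optional<int> getIfExists(const std::string& name) const {
        if (auto it = _slots.find(name); it != _slots.end()) {
            return it->second;
        }
        return std::nullopt;
    }

private:
    std::map<std::string, int> _slots;
    int _nextId = 0;
};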