diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-27 14:49:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-27 14:49:38 +0100 |
commit | ffd66027afbfbd880553aa8b82e6f29d6aa647eb (patch) | |
tree | fde05e00374b9aef313e60eea3ab7ae15a64265d | |
parent | 60ba89aabe86d23a629359d61d9d9fe8c383e69a (diff) | |
parent | 43cfb45a74ec85fd51303a0765ebd272937077fb (diff) | |
download | rabbitmq-server-git-ffd66027afbfbd880553aa8b82e6f29d6aa647eb.tar.gz |
Merge pull request #2506 from rabbitmq/stream-timestamp-offset
Support timestamp offsets for stream consumers
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 3 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 83 |
2 files changed, 86 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 8841e52ce3..223e19c713 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -182,6 +182,8 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) -> last; {_, <<"next">>} -> next; + {timestamp, V} -> + {timestamp, V * 1000}; {_, V} -> V end, @@ -219,6 +221,7 @@ begin_stream(#stream_client{readers = Readers0} = State, first -> NextOffset; last -> NextOffset; next -> NextOffset; + {timestamp, _} -> NextOffset; _ -> Offset end, Str0 = #stream{name = amqqueue:get_name(Q), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 401e470eea..25f3c05c66 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -74,6 +74,8 @@ all_tests() -> consume_without_qos, consume, consume_offset, + consume_timestamp_offset, + consume_timestamp_last_offset, basic_get, consume_with_autoack, consume_and_nack, @@ -599,6 +601,87 @@ consume_offset(Config) -> end) end, [], 25). +consume_timestamp_offset(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + Payload = <<"111">>, + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5), + + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + Offset = erlang:system_time(second) - 60, + amqp_channel:subscribe( + Ch1, + #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, timestamp, Offset}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + %% It has subscribed to a very old timestamp, so we will receive the whole stream + receive_batch(Ch1, 0, 99). + +consume_timestamp_last_offset(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, <<"111">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5), + + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + %% Subscribe from now/future + Offset = erlang:system_time(second) + 60, + amqp_channel:subscribe( + Ch1, + #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, timestamp, Offset}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + receive + {_, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}} + when S < 100 -> + exit({unexpected_offset, S}) + after 1000 -> + ok + end, + + %% Publish a few more + [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5), + + %% Yeah! we got them + receive_batch(Ch1, 100, 199). + basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |