summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-27 14:49:38 +0100
committerGitHub <noreply@github.com>2020-11-27 14:49:38 +0100
commitffd66027afbfbd880553aa8b82e6f29d6aa647eb (patch)
treefde05e00374b9aef313e60eea3ab7ae15a64265d
parent60ba89aabe86d23a629359d61d9d9fe8c383e69a (diff)
parent43cfb45a74ec85fd51303a0765ebd272937077fb (diff)
downloadrabbitmq-server-git-ffd66027afbfbd880553aa8b82e6f29d6aa647eb.tar.gz
Merge pull request #2506 from rabbitmq/stream-timestamp-offset
Support timestamp offsets for stream consumers
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl3
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl83
2 files changed, 86 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 8841e52ce3..223e19c713 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -182,6 +182,8 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
last;
{_, <<"next">>} ->
next;
+ {timestamp, V} ->
+ {timestamp, V * 1000};
{_, V} ->
V
end,
@@ -219,6 +221,7 @@ begin_stream(#stream_client{readers = Readers0} = State,
first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
+ {timestamp, _} -> NextOffset;
_ -> Offset
end,
Str0 = #stream{name = amqqueue:get_name(Q),
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 401e470eea..25f3c05c66 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -74,6 +74,8 @@ all_tests() ->
consume_without_qos,
consume,
consume_offset,
+ consume_timestamp_offset,
+ consume_timestamp_last_offset,
basic_get,
consume_with_autoack,
consume_and_nack,
@@ -599,6 +601,87 @@ consume_offset(Config) ->
end)
end, [], 25).
+consume_timestamp_offset(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ Payload = <<"111">>,
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5),
+
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ Offset = erlang:system_time(second) - 60,
+ amqp_channel:subscribe(
+ Ch1,
+ #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, timestamp, Offset}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+
+ %% It has subscribed to a very old timestamp, so we will receive the whole stream
+ receive_batch(Ch1, 0, 99).
+
+consume_timestamp_last_offset(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, <<"111">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5),
+
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ %% Subscribe from now/future
+ Offset = erlang:system_time(second) + 60,
+ amqp_channel:subscribe(
+ Ch1,
+ #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, timestamp, Offset}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+
+ receive
+ {_,
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
+ when S < 100 ->
+ exit({unexpected_offset, S})
+ after 1000 ->
+ ok
+ end,
+
+ %% Publish a few more
+ [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5),
+
+ %% Yeah! we got them
+ receive_batch(Ch1, 100, 199).
+
basic_get(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),