summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-05-11 13:50:00 +0100
committerGitHub <noreply@github.com>2021-05-11 13:50:00 +0100
commit94e943692b509b770101ed80a9f439251defeb31 (patch)
tree6708f98ac3587a829a8df98de7e4c07f7f50d07e
parentc13c2af6145cd4a0d13238e85f9a5680e470fb62 (diff)
parent5a5042521d78644fc84be5e2f127c5479aa4fe62 (diff)
downloadrabbitmq-server-git-94e943692b509b770101ed80a9f439251defeb31.tar.gz
Merge pull request #3022 from rabbitmq/relative-time-offset
Support relative time based offset specs
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl57
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl31
2 files changed, 65 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index d982c88469..85fc1539c8 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -182,33 +182,44 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
args := Args,
ok_msg := OkMsg} = Spec,
QName = amqqueue:get_name(Q),
- Offset = case rabbit_misc:table_lookup(Args, <<"x-stream-offset">>) of
- undefined ->
- next;
- {_, <<"first">>} ->
- first;
- {_, <<"last">>} ->
- last;
- {_, <<"next">>} ->
- next;
- {timestamp, V} ->
- {timestamp, V * 1000};
- {_, V} ->
- V
- end,
- rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, QName,
- ConsumerPrefetchCount, false,
- up, Args),
- %% FIXME: reply needs to be sent before the stream begins sending
- %% really it should be sent by the stream queue process like classic queues
- %% do
- maybe_send_reply(ChPid, OkMsg),
- begin_stream(QState0, Q, ConsumerTag, Offset, ConsumerPrefetchCount);
+ case parse_offset(rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
+ {error, _} = Err ->
+ Err;
+ OffsetSpec ->
+ rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, QName,
+ ConsumerPrefetchCount, false,
+ up, Args),
+ %% FIXME: reply needs to be sent before the stream begins sending
+ %% really it should be sent by the stream queue process like classic queues
+ %% do
+ maybe_send_reply(ChPid, OkMsg),
+ begin_stream(QState0, Q, ConsumerTag, OffsetSpec, ConsumerPrefetchCount)
+ end;
Err ->
Err
end.
+parse_offset(undefined) ->
+ next;
+parse_offset({_, <<"first">>}) ->
+ first;
+parse_offset({_, <<"last">>}) ->
+ last;
+parse_offset({_, <<"next">>}) ->
+ next;
+parse_offset({timestamp, V}) ->
+ {timestamp, V * 1000};
+parse_offset({longstr, V}) ->
+ case rabbit_amqqueue:check_max_age(V) of
+ {error, _} = Err ->
+ Err;
+ Ms ->
+ {timestamp, erlang:system_time(millisecond) - Ms}
+ end;
+parse_offset({_, V}) ->
+ V.
+
get_local_pid(#stream_client{local_pid = Pid} = State)
when is_pid(Pid) ->
{Pid, State};
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index e87d460b77..806a5706a8 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -93,6 +93,7 @@ all_tests() ->
consume_from_last,
consume_from_next,
consume_from_default,
+ consume_from_relative_time_offset,
consume_credit,
consume_credit_out_of_order_ack,
consume_credit_multiple_ack,
@@ -1099,6 +1100,36 @@ consume_from_next(Config, Args) ->
receive_batch(Ch1, 100, 199),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+consume_from_relative_time_offset(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ amqp_channel:subscribe(
+ Ch1, #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, longstr, <<"100s">>}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+
+ receive_batch(Ch1, 0, 99),
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+
consume_from_replica(Config) ->
[Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),