diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-05-11 13:50:00 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-11 13:50:00 +0100 |
commit | 94e943692b509b770101ed80a9f439251defeb31 (patch) | |
tree | 6708f98ac3587a829a8df98de7e4c07f7f50d07e | |
parent | c13c2af6145cd4a0d13238e85f9a5680e470fb62 (diff) | |
parent | 5a5042521d78644fc84be5e2f127c5479aa4fe62 (diff) | |
download | rabbitmq-server-git-94e943692b509b770101ed80a9f439251defeb31.tar.gz |
Merge pull request #3022 from rabbitmq/relative-time-offset
Support relative time based offset specs
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 57 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 31 |
2 files changed, 65 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index d982c88469..85fc1539c8 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -182,33 +182,44 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) -> args := Args, ok_msg := OkMsg} = Spec, QName = amqqueue:get_name(Q), - Offset = case rabbit_misc:table_lookup(Args, <<"x-stream-offset">>) of - undefined -> - next; - {_, <<"first">>} -> - first; - {_, <<"last">>} -> - last; - {_, <<"next">>} -> - next; - {timestamp, V} -> - {timestamp, V * 1000}; - {_, V} -> - V - end, - rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, QName, - ConsumerPrefetchCount, false, - up, Args), - %% FIXME: reply needs to be sent before the stream begins sending - %% really it should be sent by the stream queue process like classic queues - %% do - maybe_send_reply(ChPid, OkMsg), - begin_stream(QState0, Q, ConsumerTag, Offset, ConsumerPrefetchCount); + case parse_offset(rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of + {error, _} = Err -> + Err; + OffsetSpec -> + rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck, QName, + ConsumerPrefetchCount, false, + up, Args), + %% FIXME: reply needs to be sent before the stream begins sending + %% really it should be sent by the stream queue process like classic queues + %% do + maybe_send_reply(ChPid, OkMsg), + begin_stream(QState0, Q, ConsumerTag, OffsetSpec, ConsumerPrefetchCount) + end; Err -> Err end. +parse_offset(undefined) -> + next; +parse_offset({_, <<"first">>}) -> + first; +parse_offset({_, <<"last">>}) -> + last; +parse_offset({_, <<"next">>}) -> + next; +parse_offset({timestamp, V}) -> + {timestamp, V * 1000}; +parse_offset({longstr, V}) -> + case rabbit_amqqueue:check_max_age(V) of + {error, _} = Err -> + Err; + Ms -> + {timestamp, erlang:system_time(millisecond) - Ms} + end; +parse_offset({_, V}) -> + V. + get_local_pid(#stream_client{local_pid = Pid} = State) when is_pid(Pid) -> {Pid, State}; diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index e87d460b77..806a5706a8 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -93,6 +93,7 @@ all_tests() -> consume_from_last, consume_from_next, consume_from_default, + consume_from_relative_time_offset, consume_credit, consume_credit_out_of_order_ack, consume_credit_multiple_ack, @@ -1099,6 +1100,36 @@ consume_from_next(Config, Args) -> receive_batch(Ch1, 100, 199), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +consume_from_relative_time_offset(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + amqp_channel:subscribe( + Ch1, #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, longstr, <<"100s">>}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + receive_batch(Ch1, 0, 99), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + consume_from_replica(Config) -> [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |