diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-09-14 13:21:20 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-09-22 09:57:39 +0100 |
commit | df084aabb81132ed05ee539fb7eb32fdd863ccbe (patch) | |
tree | 193eed90158a33bb243828babc15224f869e9be9 | |
parent | 88c3e89fc1bf7d437cd60d469537a5c3b495a456 (diff) | |
download | rabbitmq-server-git-df084aabb81132ed05ee539fb7eb32fdd863ccbe.tar.gz |
Quorum Queue Peek command
Allow peeking of messages into a quorum queue. This uses a dedicated aux
command to peek at a message at the given 1-indexed position in the
queue.
-rw-r--r-- | src/rabbit_basic.erl | 34 | ||||
-rw-r--r-- | src/rabbit_fifo.erl | 55 | ||||
-rw-r--r-- | src/rabbit_fifo.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 30 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 34 | ||||
-rw-r--r-- | test/rabbit_fifo_SUITE.erl | 12 |
6 files changed, 150 insertions, 17 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index f09583eb68..9c5a5c8775 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -15,7 +15,8 @@ header_routes/1, parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1, maybe_gc_large_msg/2]). --export([add_header/4]). +-export([add_header/4, + peek_fmt_message/1]). %%---------------------------------------------------------------------------- @@ -319,3 +320,34 @@ add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) -> rabbit_misc:set_table_value(Headers, Name, Type, Value) end, Content0), Msg#basic_message{content = Content}. + +peek_fmt_message(#basic_message{exchange_name = Ex, + routing_keys = RKeys, + content = + #content{payload_fragments_rev = Payl0, + properties = Props}}) -> + Fields = [atom_to_binary(F, utf8) || F <- record_info(fields, 'P_basic')], + T = lists:zip(Fields, tl(tuple_to_list(Props))), + lists:foldl( + fun ({<<"headers">>, Hdrs}, Acc) -> + case Hdrs of + [] -> + Acc; + _ -> + Acc ++ [{header_key(H), V} || {H, _T, V} <- Hdrs] + end; + ({_, undefined}, Acc) -> + Acc; + (KV, Acc) -> + [KV | Acc] + end, [], [{<<"payload (max 64 bytes)">>, + %% restric payload to 64 bytes + binary_prefix_64(iolist_to_binary(lists:reverse(Payl0)), 64)}, + {<<"exchange">>, Ex#resource.name}, + {<<"routing_keys">>, RKeys} | T]). + +header_key(A) -> + <<"header.", A/binary>>. + +binary_prefix_64(Bin, Len) -> + binary:part(Bin, 0, min(byte_size(Bin), Len)). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d03a997c85..79b2bb4f72 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -40,6 +40,7 @@ query_stat/1, query_single_active_consumer/1, query_in_memory_usage/1, + query_peek/2, usage/1, zero/1, @@ -320,7 +321,7 @@ apply(#{index := Index, end, {Reply, Effects2} = case Msg of - {RaftIdx, {Header, 'empty'}} -> + {RaftIdx, {Header, empty}} -> %% TODO add here new log effect with reply {'$ra_no_reply', [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) | @@ -763,20 +764,33 @@ handle_aux(leader, _, garbage_collection, State, Log, _MacState) -> handle_aux(follower, _, garbage_collection, State, Log, MacState) -> ra_log_wal:force_roll_over(ra_log_wal), {no_reply, force_eval_gc(Log, MacState, State), Log}; -handle_aux(_RaState, cast, Cmd, #aux{name = Name, - utilisation = Use0} = State0, +handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) -> + {no_reply, Aux0, Log}; +handle_aux(_RaState, cast, Cmd, #aux{utilisation = Use0} = Aux0, + Log, _MacState) + when Cmd == active orelse Cmd == inactive -> + {no_reply, Aux0#aux{utilisation = update_use(Use0, Cmd)}, Log}; +handle_aux(_RaState, cast, tick, #aux{name = Name, + utilisation = Use0} = State0, Log, MacState) -> - State = case Cmd of - _ when Cmd == active orelse Cmd == inactive -> - State0#aux{utilisation = update_use(Use0, Cmd)}; - tick -> - true = ets:insert(rabbit_fifo_usage, - {Name, utilisation(Use0)}), - eval_gc(Log, MacState, State0); - eval -> - State0 - end, - {no_reply, State, Log}. + true = ets:insert(rabbit_fifo_usage, + {Name, utilisation(Use0)}), + Aux = eval_gc(Log, MacState, State0), + {no_reply, Aux, Log}; +handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0, + Log0, MacState) -> + case rabbit_fifo:query_peek(Pos, MacState) of + {ok, {Idx, {Header, empty}}} -> + %% need to re-hydrate from the log + {{_, _, {_, _, Cmd, _}}, Log} = ra_log:fetch(Idx, Log0), + #enqueue{msg = Msg} = Cmd, + {reply, {ok, {Header, Msg}}, Aux0, Log}; + {ok, {_Idx, {Header, Msg}}} -> + {reply, {ok, {Header, Msg}}, Aux0, Log0}; + Err -> + {reply, Err, Aux0, Log0} + end. + eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> @@ -896,6 +910,7 @@ query_consumers(#?MODULE{consumers = Consumers, end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). + query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active}, consumers = Consumers}) -> case maps:size(Consumers) of @@ -917,6 +932,18 @@ query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes, msgs_ready_in_memory = Length}) -> {Length, Bytes}. +query_peek(Pos, State0) when Pos > 0 -> + case take_next_msg(State0) of + empty -> + {error, no_message_at_pos}; + {{_Seq, IdxMsg}, _State} + when Pos == 1 -> + {ok, IdxMsg}; + {_Msg, State} -> + query_peek(Pos-1, State) + end. + + -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> case ets:lookup(rabbit_fifo_usage, Name) of diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 4c87167ea1..3329915125 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -144,7 +144,7 @@ -record(rabbit_fifo, {cfg :: #cfg{}, % unassigned messages - messages = lqueue:new() :: lqueue:queue(), + messages = lqueue:new() :: lqueue:lqueue(), % defines the next message id next_msg_num = 1 :: msg_in_id(), % queue of returned msg_in_ids - when checking out it picks from diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index bb4c320b1f..33f8166da6 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -23,6 +23,7 @@ -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). +-export([peek/2, peek/3]). -export([add_member/4]). -export([delete_member/3]). -export([requeue/3]). @@ -1345,6 +1346,35 @@ leader(Q) when ?is_amqqueue(Q) -> false -> '' end. +peek(Vhost, Queue, Pos) -> + peek(Pos, rabbit_misc:r(Vhost, queue, Queue)). + +peek(Pos, #resource{} = QName) -> + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + peek(Pos, Q); + Err -> + Err + end; +peek(Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_quorum(Q) -> + LeaderPid = amqqueue:get_pid(Q), + case ra:aux_command(LeaderPid, {peek, Pos}) of + {ok, {MsgHeader, Msg0}} -> + Count = case MsgHeader of + #{delivery_count := C} -> C; + _ -> 0 + end, + Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, + Count, Msg0), + {ok, rabbit_basic:peek_fmt_message(Msg)}; + {error, Err} -> + {error, Err}; + Err -> + Err + end; +peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) -> + {error, classic_queue_not_supported}. + online(Q) when ?is_amqqueue(Q) -> Nodes = get_nodes(Q), {Name, _} = amqqueue:get_pid(Q), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 15f3558046..e871e7497d 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -131,7 +131,8 @@ all_tests() -> invalid_policy, delete_if_empty, delete_if_unused, - queue_ttl + queue_ttl, + peek ]. memory_tests() -> @@ -2397,6 +2398,37 @@ queue_length_in_memory_purge(Config) -> ?assertEqual([{0, 0}], dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). +peek(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 2}])), + + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + + QName = rabbit_misc:r(<<"/">>, queue, QQ), + ?assertMatch({error, no_message_at_pos}, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + peek, [1, QName])), + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertMatch({ok, [_|_]}, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + peek, [1, QName])), + ?assertMatch({ok, [_|_]}, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + peek, [2, QName])), + ?assertMatch({error, no_message_at_pos}, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + peek, [3, QName])), + ok. + in_memory(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 7778e04afb..8431dd8db7 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -1619,6 +1619,18 @@ queue_ttl_with_single_active_consumer_test(_) -> ok. +query_peek_test(_) -> + State0 = test_init(test), + ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(1, State0)), + {State1, _} = enq(1, 1, first, State0), + {State2, _} = enq(2, 2, second, State1), + ?assertMatch({ok, {_, {_, first}}}, rabbit_fifo:query_peek(1, State1)), + ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(2, State1)), + ?assertMatch({ok, {_, {_, first}}}, rabbit_fifo:query_peek(1, State2)), + ?assertMatch({ok, {_, {_, second}}}, rabbit_fifo:query_peek(2, State2)), + ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(3, State2)), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf). |