summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-09-14 13:21:20 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-22 09:57:39 +0100
commitdf084aabb81132ed05ee539fb7eb32fdd863ccbe (patch)
tree193eed90158a33bb243828babc15224f869e9be9
parent88c3e89fc1bf7d437cd60d469537a5c3b495a456 (diff)
downloadrabbitmq-server-git-df084aabb81132ed05ee539fb7eb32fdd863ccbe.tar.gz
Quorum Queue Peek command
Allow peeking of messages into a quorum queue. This uses a dedicated aux command to peek at a message at the given 1-indexed position in the queue.
-rw-r--r--src/rabbit_basic.erl34
-rw-r--r--src/rabbit_fifo.erl55
-rw-r--r--src/rabbit_fifo.hrl2
-rw-r--r--src/rabbit_quorum_queue.erl30
-rw-r--r--test/quorum_queue_SUITE.erl34
-rw-r--r--test/rabbit_fifo_SUITE.erl12
6 files changed, 150 insertions, 17 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index f09583eb68..9c5a5c8775 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -15,7 +15,8 @@
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1,
maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
--export([add_header/4]).
+-export([add_header/4,
+ peek_fmt_message/1]).
%%----------------------------------------------------------------------------
@@ -319,3 +320,34 @@ add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
rabbit_misc:set_table_value(Headers, Name, Type, Value)
end, Content0),
Msg#basic_message{content = Content}.
+
+peek_fmt_message(#basic_message{exchange_name = Ex,
+ routing_keys = RKeys,
+ content =
+ #content{payload_fragments_rev = Payl0,
+ properties = Props}}) ->
+ Fields = [atom_to_binary(F, utf8) || F <- record_info(fields, 'P_basic')],
+ T = lists:zip(Fields, tl(tuple_to_list(Props))),
+ lists:foldl(
+ fun ({<<"headers">>, Hdrs}, Acc) ->
+ case Hdrs of
+ [] ->
+ Acc;
+ _ ->
+ Acc ++ [{header_key(H), V} || {H, _T, V} <- Hdrs]
+ end;
+ ({_, undefined}, Acc) ->
+ Acc;
+ (KV, Acc) ->
+ [KV | Acc]
+ end, [], [{<<"payload (max 64 bytes)">>,
+ %% restric payload to 64 bytes
+ binary_prefix_64(iolist_to_binary(lists:reverse(Payl0)), 64)},
+ {<<"exchange">>, Ex#resource.name},
+ {<<"routing_keys">>, RKeys} | T]).
+
+header_key(A) ->
+ <<"header.", A/binary>>.
+
+binary_prefix_64(Bin, Len) ->
+ binary:part(Bin, 0, min(byte_size(Bin), Len)).
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d03a997c85..79b2bb4f72 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -40,6 +40,7 @@
query_stat/1,
query_single_active_consumer/1,
query_in_memory_usage/1,
+ query_peek/2,
usage/1,
zero/1,
@@ -320,7 +321,7 @@ apply(#{index := Index,
end,
{Reply, Effects2} =
case Msg of
- {RaftIdx, {Header, 'empty'}} ->
+ {RaftIdx, {Header, empty}} ->
%% TODO add here new log effect with reply
{'$ra_no_reply',
[reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) |
@@ -763,20 +764,33 @@ handle_aux(leader, _, garbage_collection, State, Log, _MacState) ->
handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
-handle_aux(_RaState, cast, Cmd, #aux{name = Name,
- utilisation = Use0} = State0,
+handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) ->
+ {no_reply, Aux0, Log};
+handle_aux(_RaState, cast, Cmd, #aux{utilisation = Use0} = Aux0,
+ Log, _MacState)
+ when Cmd == active orelse Cmd == inactive ->
+ {no_reply, Aux0#aux{utilisation = update_use(Use0, Cmd)}, Log};
+handle_aux(_RaState, cast, tick, #aux{name = Name,
+ utilisation = Use0} = State0,
Log, MacState) ->
- State = case Cmd of
- _ when Cmd == active orelse Cmd == inactive ->
- State0#aux{utilisation = update_use(Use0, Cmd)};
- tick ->
- true = ets:insert(rabbit_fifo_usage,
- {Name, utilisation(Use0)}),
- eval_gc(Log, MacState, State0);
- eval ->
- State0
- end,
- {no_reply, State, Log}.
+ true = ets:insert(rabbit_fifo_usage,
+ {Name, utilisation(Use0)}),
+ Aux = eval_gc(Log, MacState, State0),
+ {no_reply, Aux, Log};
+handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
+ Log0, MacState) ->
+ case rabbit_fifo:query_peek(Pos, MacState) of
+ {ok, {Idx, {Header, empty}}} ->
+ %% need to re-hydrate from the log
+ {{_, _, {_, _, Cmd, _}}, Log} = ra_log:fetch(Idx, Log0),
+ #enqueue{msg = Msg} = Cmd,
+ {reply, {ok, {Header, Msg}}, Aux0, Log};
+ {ok, {_Idx, {Header, Msg}}} ->
+ {reply, {ok, {Header, Msg}}, Aux0, Log0};
+ Err ->
+ {reply, Err, Aux0, Log0}
+ end.
+
eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
#aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
@@ -896,6 +910,7 @@ query_consumers(#?MODULE{consumers = Consumers,
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
+
query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active},
consumers = Consumers}) ->
case maps:size(Consumers) of
@@ -917,6 +932,18 @@ query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes,
msgs_ready_in_memory = Length}) ->
{Length, Bytes}.
+query_peek(Pos, State0) when Pos > 0 ->
+ case take_next_msg(State0) of
+ empty ->
+ {error, no_message_at_pos};
+ {{_Seq, IdxMsg}, _State}
+ when Pos == 1 ->
+ {ok, IdxMsg};
+ {_Msg, State} ->
+ query_peek(Pos-1, State)
+ end.
+
+
-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
case ets:lookup(rabbit_fifo_usage, Name) of
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 4c87167ea1..3329915125 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -144,7 +144,7 @@
-record(rabbit_fifo,
{cfg :: #cfg{},
% unassigned messages
- messages = lqueue:new() :: lqueue:queue(),
+ messages = lqueue:new() :: lqueue:lqueue(),
% defines the next message id
next_msg_num = 1 :: msg_in_id(),
% queue of returned msg_in_ids - when checking out it picks from
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index bb4c320b1f..33f8166da6 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -23,6 +23,7 @@
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
+-export([peek/2, peek/3]).
-export([add_member/4]).
-export([delete_member/3]).
-export([requeue/3]).
@@ -1345,6 +1346,35 @@ leader(Q) when ?is_amqqueue(Q) ->
false -> ''
end.
+peek(Vhost, Queue, Pos) ->
+ peek(Pos, rabbit_misc:r(Vhost, queue, Queue)).
+
+peek(Pos, #resource{} = QName) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} ->
+ peek(Pos, Q);
+ Err ->
+ Err
+ end;
+peek(Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_quorum(Q) ->
+ LeaderPid = amqqueue:get_pid(Q),
+ case ra:aux_command(LeaderPid, {peek, Pos}) of
+ {ok, {MsgHeader, Msg0}} ->
+ Count = case MsgHeader of
+ #{delivery_count := C} -> C;
+ _ -> 0
+ end,
+ Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long,
+ Count, Msg0),
+ {ok, rabbit_basic:peek_fmt_message(Msg)};
+ {error, Err} ->
+ {error, Err};
+ Err ->
+ Err
+ end;
+peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) ->
+ {error, classic_queue_not_supported}.
+
online(Q) when ?is_amqqueue(Q) ->
Nodes = get_nodes(Q),
{Name, _} = amqqueue:get_pid(Q),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 15f3558046..e871e7497d 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -131,7 +131,8 @@ all_tests() ->
invalid_policy,
delete_if_empty,
delete_if_unused,
- queue_ttl
+ queue_ttl,
+ peek
].
memory_tests() ->
@@ -2397,6 +2398,37 @@ queue_length_in_memory_purge(Config) ->
?assertEqual([{0, 0}],
dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+peek(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 2}])),
+
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+
+ QName = rabbit_misc:r(<<"/">>, queue, QQ),
+ ?assertMatch({error, no_message_at_pos},
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
+ peek, [1, QName])),
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertMatch({ok, [_|_]},
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
+ peek, [1, QName])),
+ ?assertMatch({ok, [_|_]},
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
+ peek, [2, QName])),
+ ?assertMatch({error, no_message_at_pos},
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
+ peek, [3, QName])),
+ ok.
+
in_memory(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 7778e04afb..8431dd8db7 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -1619,6 +1619,18 @@ queue_ttl_with_single_active_consumer_test(_) ->
ok.
+query_peek_test(_) ->
+ State0 = test_init(test),
+ ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(1, State0)),
+ {State1, _} = enq(1, 1, first, State0),
+ {State2, _} = enq(2, 2, second, State1),
+ ?assertMatch({ok, {_, {_, first}}}, rabbit_fifo:query_peek(1, State1)),
+ ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(2, State1)),
+ ?assertMatch({ok, {_, {_, first}}}, rabbit_fifo:query_peek(1, State2)),
+ ?assertMatch({ok, {_, {_, second}}}, rabbit_fifo:query_peek(2, State2)),
+ ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(3, State2)),
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).