summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/dtree.erl112
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl41
-rw-r--r--src/rabbit_basic.erl9
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl13
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_msg_store.erl3
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl11
11 files changed, 161 insertions, 76 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index 265bb340..0e0785bf 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -16,23 +16,23 @@
%% A dual-index tree.
%%
-%% Conceptually, what we want is a map that has two distinct sets of
-%% keys (referred to here as primary and secondary, although that
-%% shouldn't imply a hierarchy) pointing to one set of
-%% values. However, in practice what we'll always want to do is insert
-%% a value that's pointed at by (one primary, many secondaries) and
-%% remove values that are pointed at by (one secondary, many
-%% primaries) or (one secondary, all primaries). Thus the API.
+%% Entries have the following shape:
%%
-%% Entries exists while they have a non-empty secondary key set. The
-%% 'take' operations return the entries that got removed, i.e. that
-%% had no remaining secondary keys. take/3 expects entries to exist
-%% with the supplied primary keys and secondary key. take/2 can cope
-%% with the supplied secondary key having no entries.
+%% +----+--------------------+---+
+%% | PK | SK1, SK2, ..., SKN | V |
+%% +----+--------------------+---+
+%%
+%% i.e. a primary key, set of secondary keys, and a value.
+%%
+%% There can be only one entry per primary key, but secondary keys may
+%% appear in multiple entries.
+%%
+%% The set of secondary keys must be non-empty. Or, to put it another
+%% way, entries only exist while their secondary key set is non-empty.
-module(dtree).
--export([empty/0, insert/4, take/3, take/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -52,6 +52,7 @@
-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()).
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -63,6 +64,12 @@
empty() -> {gb_trees:empty(), gb_trees:empty()}.
+%% Insert an entry. Fails if there already is an entry with the given
+%% primary key.
+insert(PK, [], V, {P, S}) ->
+ %% dummy insert to force error if PK exists
+ gb_trees:insert(PK, {gb_sets:empty(), V}, P),
+ {P, S};
insert(PK, SKs, V, {P, S}) ->
{gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P),
lists:foldl(fun (SK, S0) ->
@@ -74,21 +81,45 @@ insert(PK, SKs, V, {P, S}) ->
end
end, S, SKs)}.
+%% Remove the given secondary key from the entries of the given
+%% primary keys, returning the primary-key/value pairs of any entries
+%% that were dropped as the result (i.e. due to their secondary key
+%% set becoming empty). It is ok for the given primary keys and/or
+%% secondary key to not exist.
take(PKs, SK, {P, S}) ->
- {KVs, P1} = take2(PKs, SK, P),
- PKS = gb_sets:difference(gb_trees:get(SK, S), gb_sets:from_list(PKs)),
- {KVs, {P1, case gb_sets:is_empty(PKS) of
- true -> gb_trees:delete(SK, S);
- false -> gb_trees:update(SK, PKS, S)
- end}}.
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> TakenPKS = gb_sets:from_list(PKs),
+ PKSInter = gb_sets:intersection(PKS, TakenPKS),
+ PKSDiff = gb_sets_difference_unsafe(PKS, TakenPKS),
+ {KVs, P1} = take2(PKSInter, SK, P),
+ {KVs, {P1, case gb_sets:is_empty(PKSDiff) of
+ true -> gb_trees:delete(SK, S);
+ false -> gb_trees:update(SK, PKSDiff, S)
+ end}}
+ end.
+%% Remove the given secondary key from all entries, returning the
+%% primary-key/value pairs of any entries that were dropped as the
+%% result (i.e. due to their secondary key set becoming empty). It is
+%% ok for the given secondary key to not exist.
take(SK, {P, S}) ->
case gb_trees:lookup(SK, S) of
none -> {[], {P, S}};
- {value, PKS} -> {KVs, P1} = take2(gb_sets:to_list(PKS), SK, P),
+ {value, PKS} -> {KVs, P1} = take2(PKS, SK, P),
{KVs, {P1, gb_trees:delete(SK, S)}}
end.
+%% Drop all entries which contain the given secondary key, returning
+%% the primary-key/value pairs of these entries. It is ok for the
+%% given secondary key to not exist.
+take_all(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P),
+ {KVs, {P1, prune(SKS, PKS, S)}}
+ end.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
@@ -100,12 +131,35 @@ size({P, _S}) -> gb_trees:size(P).
%%----------------------------------------------------------------------------
-take2(PKs, SK, P) ->
- lists:foldl(fun (PK, {KVs, P0}) ->
- {SKS, V} = gb_trees:get(PK, P0),
- SKS1 = gb_sets:delete(SK, SKS),
- case gb_sets:is_empty(SKS1) of
- true -> {[{PK, V} | KVs], gb_trees:delete(PK, P0)};
- false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
- end
- end, {[], P}, PKs).
+take2(PKS, SK, P) ->
+ gb_sets:fold(fun (PK, {KVs, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ SKS1 = gb_sets:delete(SK, SKS),
+ case gb_sets:is_empty(SKS1) of
+ true -> KVs1 = [{PK, V} | KVs],
+ {KVs1, gb_trees:delete(PK, P0)};
+ false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
+ end
+ end, {[], P}, PKS).
+
+take_all2(PKS, P) ->
+ gb_sets:fold(fun (PK, {KVs, SKS0, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ {[{PK, V} | KVs], gb_sets:union(SKS, SKS0),
+ gb_trees:delete(PK, P0)}
+ end, {[], gb_sets:empty(), P}, PKS).
+
+prune(SKS, PKS, S) ->
+ gb_sets:fold(fun (SK0, S0) ->
+ PKS1 = gb_trees:get(SK0, S0),
+ PKS2 = gb_sets_difference_unsafe(PKS1, PKS),
+ case gb_sets:is_empty(PKS2) of
+ true -> gb_trees:delete(SK0, S0);
+ false -> gb_trees:update(SK0, PKS2, S0)
+ end
+ end, S, SKS).
+
+%% This function assumes that all the elements we're deleting from the
+%% first set are present.
+gb_sets_difference_unsafe(S1, S2) ->
+ lists:foldl(fun gb_sets:delete/2, S1, gb_sets:to_list(S2)).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9ecbcbc3..75091692 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
- {<<"x-message-ttl">>, fun check_positive_int_arg/2},
+ {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
{<<"x-ha-policy">>, fun check_ha_policy_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
@@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) ->
check_string_arg({Type, _}, _) ->
{error, {unacceptable_type, Type}}.
-check_positive_int_arg({Type, Val}, _Args) ->
+check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
- false -> {error, {unacceptable_type, Type}};
- true when Val =< 0 -> {error, {value_zero_or_less, Val}};
- true -> ok
+ true -> ok;
+ false -> {error, {unacceptable_type, Type}}
+ end.
+
+check_positive_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
+ end.
+
+check_non_neg_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 88a014b5..222045b2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -538,17 +538,25 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
- {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
- case Delivered of
- true -> State2;
- false -> Props = message_properties(Confirm, State),
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ case attempt_delivery(Delivery, Confirm, State) of
+ {true, State1} ->
+ maybe_record_confirm_message(Confirm, State1);
+ %% the next two are optimisations
+ {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
+ discard_delivery(Delivery, State1);
+ {false, State1 = #q{ttl = 0, dlx = undefined}} ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ discard_delivery(Delivery, State1);
+ {false, State1} ->
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ Props = message_properties(Confirm, State2),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -757,15 +765,14 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
error ->
noreply(State);
{ok, _} ->
- rabbit_log:info("DLQ ~p (for ~s) died~n",
- [QPid, rabbit_misc:rs(qname(State))]),
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- case (MsgSeqNoAckTags =/= [] andalso
- rabbit_misc:is_abnormal_termination(Reason)) of
- true -> rabbit_log:warning("Dead queue lost ~p messages~n",
- [length(MsgSeqNoAckTags)]);
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
+ rabbit_log:warning(
+ "DLQ ~p for ~s died with ~p unconfirmed messages~n",
+ [QPid, rabbit_misc:rs(qname(State)), length(Lost)]);
false -> ok
end,
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
cleanup_after_confirm(
[AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
State#q{queue_monitors = dict:erase(QPid, QMons),
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index cc876cb4..8ad59016 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,7 @@
-export([publish/4, publish/6, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- map_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -61,6 +61,8 @@
-spec(append_table_header/3 ::
(binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
+-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
+
-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
-> rabbit_types:content()).
@@ -186,6 +188,11 @@ append_table_header(Name, Info, Headers) ->
end,
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+extract_headers(Content) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers.
+
map_headers(F, Content) ->
Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
#content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4a0e93be..0c1c11d8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1116,11 +1116,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
end.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(QPid, UC),
- (case rabbit_misc:is_abnormal_termination(Reason) of
- true -> fun send_nacks/2;
- false -> fun record_confirms/2
- end)(MXs, State#ch{unconfirmed = UC1}).
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {MXs, UC1} = dtree:take_all(QPid, UC),
+ send_nacks(MXs, State#ch{unconfirmed = UC1});
+ false -> {MXs, UC1} = dtree:take(QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
+ end.
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index bfdab487..a009d2b9 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -177,11 +177,12 @@ dropwhile(Pred, MsgFun,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
- Len = BQ:len(BQS),
+ Len = BQ:len(BQS),
BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
- Dropped = Len - BQ:len(BQS1),
+ Len1 = BQ:len(BQS1),
+ ok = gm:broadcast(GM, {set_length, Len1}),
+ Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 }.
@@ -241,20 +242,18 @@ ack(AckTags, State = #state { gm = GM,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
fold(MsgFun, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS}, AckTags) ->
- BQS1 = BQ:fold(MsgFun, BQS, AckTags),
- ok = gm:broadcast(GM, {fold, MsgFun, AckTags}),
- State #state { backing_queue_state = BQS1 }.
+ State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 98a80a26..eb1da1e8 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -843,11 +843,6 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({fold, MsgFun, AckTags},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- BQS1 = BQ:fold(AckTags, MsgFun, BQS),
- {ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c1be7613..e46c7150 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,7 @@
-export([multi_call/2]).
-export([quit/1]).
-export([os_cmd/1]).
+-export([gb_sets_difference/2]).
%%----------------------------------------------------------------------------
@@ -204,6 +205,7 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
-spec(os_cmd/1 :: (string()) -> string()).
+-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-endif.
@@ -912,3 +914,6 @@ os_cmd(Command) ->
false -> throw({command_not_found, Exec});
_ -> os:cmd(Command)
end.
+
+gb_sets_difference(S1, S2) ->
+ lists:foldl(fun gb_sets:delete_any/2, S1, gb_sets:to_list(S2)).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 56265136..8ffd5b13 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1240,7 +1240,8 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
case dict:find(CRef, CTM) of
{ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
ActionTaken),
- MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ MsgIds1 =
+ rabbit_misc:gb_sets_difference(Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
true -> dict:erase(CRef, CTM);
false -> dict:store(CRef, MsgIds1, CTM)
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e356d7ff..c74b8d5f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2306,8 +2306,8 @@ wait_for_confirms(Unconfirmed) ->
true -> ok;
false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
wait_for_confirms(
- gb_sets:difference(Unconfirmed,
- gb_sets:from_list(Confirmed)))
+ rabbit_misc:gb_sets_difference(
+ Unconfirmed, gb_sets:from_list(Confirmed)))
after 5000 -> exit(timeout_waiting_for_confirm)
end
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 46f6d6c1..6e8a1cca 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1291,10 +1291,13 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC,
confirmed = C }) ->
- State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
- msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
- unconfirmed = gb_sets:difference(UC, MsgIdSet),
- confirmed = gb_sets:union (C, MsgIdSet) }.
+ State #vqstate { msgs_on_disk =
+ rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
+ msg_indices_on_disk =
+ rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
+ unconfirmed =
+ rabbit_misc:gb_sets_difference(UC, MsgIdSet),
+ confirmed = gb_sets:union(C, MsgIdSet) }.
must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->