summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-12 15:24:25 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-12 15:24:25 +0000
commitc8dc2769a5a3ff3f064f3a74779c0cc17149b9ed (patch)
treee260de52024bbfbe8a2d617cef606a61eda0ebc8
parentf91d1b93da97d304643ebd8dc1110180bdf9d46e (diff)
parent4667154f152fb1a262d181121b9fbe23890f6090 (diff)
downloadrabbitmq-server-c8dc2769a5a3ff3f064f3a74779c0cc17149b9ed.tar.gz
merge default into bug20337
All tests pass. This also fixes the rabbit_guid rename.
-rw-r--r--include/rabbit_backing_queue_spec.hrl14
-rw-r--r--src/rabbit_amqqueue.erl54
-rw-r--r--src/rabbit_amqqueue_process.erl343
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_basic.erl3
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl27
-rw-r--r--src/rabbit_mirror_queue_slave.erl12
-rw-r--r--src/rabbit_misc.erl14
-rw-r--r--src/rabbit_tests.erl19
-rw-r--r--src/rabbit_variable_queue.erl60
11 files changed, 470 insertions, 94 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 2a8cc13c..918d587a 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -25,6 +25,12 @@
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
+-type(msg_lookup_result() :: {rabbit_types:basic_message(), state()}).
+
+-type(msg_lookup_fun() :: fun((state()) -> msg_lookup_result())).
+
+-type(msg_fun() :: fun((msg_lookup_fun(), state()) -> state())).
+
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(),
@@ -42,12 +48,14 @@
rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/2 ::
- (fun ((rabbit_types:message_properties()) -> boolean()), state())
+-spec(dropwhile/3 ::
+ (fun ((rabbit_types:message_properties()) -> boolean()),
+ msg_fun(), state())
-> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(ack/3 :: ([ack()], msg_fun(), state()) ->
+ {[rabbit_guid:guid()], state()}).
-spec(requeue/2 :: ([ack()], state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a7dfd535..d809e570 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -327,34 +327,60 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
Args, RequiredArgs, QueueName,
[<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
-check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of
+check_declare_arguments(QueueName = #resource{virtual_host = VHostPath},
+ Args) ->
+ [case Fun(rabbit_misc:table_lookup(Args, Key), Args, VHostPath) of
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
"invalid arg '~s' for ~s: ~255p",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/2},
- {<<"x-message-ttl">>, fun check_integer_argument/2},
- {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]],
+ end ||
+ {Key, Fun} <-
+ [{<<"x-expires">>, fun check_integer_argument/3},
+ {<<"x-message-ttl">>, fun check_integer_argument/3},
+ {<<"x-ha-policy">>, fun check_ha_policy_argument/3},
+ {<<"x-dead-letter-exchange">>,
+ fun check_exchange_argument/3},
+ {<<"x-dead-letter-routing-key">>,
+ fun check_string_argument/3}]],
ok.
-check_integer_argument(undefined, _Args) ->
+check_string_argument(undefined, _Args, _VHostPath) ->
ok;
-check_integer_argument({Type, Val}, _Args) when Val > 0 ->
+check_string_argument({longstr, _}, _Args, _VHostPath) ->
+ ok;
+check_string_argument({Type, _}, _, _) ->
+ {error, {unacceptable_type, Type}}.
+
+check_integer_argument(undefined, _Args, _VHostPath) ->
+ ok;
+check_integer_argument({Type, Val}, _Args, _VHostPath) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, Val}, _Args) ->
+check_integer_argument({_Type, Val}, _Args, _VHostPath) ->
{error, {value_zero_or_less, Val}}.
-check_ha_policy_argument(undefined, _Args) ->
+check_exchange_argument(undefined, Args, _VHostPath) ->
+ case rabbit_misc:table_lookup(Args, <<"x-dead-letter-routing-key">>) of
+ undefined -> ok;
+ _ -> {error, routing_key_but_no_dlx_defined}
+ end;
+check_exchange_argument({longstr, Val}, _Args, VHostPath) ->
+ try rabbit_misc:r(VHostPath, exchange, Val)
+ of _Exchange -> ok
+ catch _:_ -> {error, {invalid_exchange_name, Val}}
+ end;
+check_exchange_argument({Type, _Val}, _Args, _VHostPath) ->
+ {error, {unacceptable_type, Type}}.
+
+check_ha_policy_argument(undefined, _Args, _VHostPath) ->
ok;
-check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
+check_ha_policy_argument({longstr, <<"all">>}, _Args, _VHostPath) ->
ok;
-check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
+check_ha_policy_argument({longstr, <<"nodes">>}, Args, _VHostPath) ->
case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
undefined ->
{error, {require, 'x-ha-policy-params'}};
@@ -370,9 +396,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
{Type, _} ->
{error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
end;
-check_ha_policy_argument({longstr, Policy}, _Args) ->
+check_ha_policy_argument({longstr, Policy}, _Args, _VHostPath) ->
{error, {invalid_ha_policy, Policy}};
-check_ha_policy_argument({Type, _}, _Args) ->
+check_ha_policy_argument({Type, _}, _Args, _VHostPath) ->
{error, {unacceptable_type, Type}}.
list() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b3a620fa..7ca00298 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,7 +49,14 @@
stats_timer,
msg_id_to_channel,
ttl,
- ttl_timer_ref
+ ttl_timer_ref,
+ publish_seqno,
+ unconfirmed_mq,
+ unconfirmed_qm,
+ blocked_op,
+ queue_monitors,
+ dlx,
+ dlx_routing_key
}).
-record(consumer, {tag, ack_required}).
@@ -128,6 +135,13 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ dlx = undefined,
+ dlx_routing_key = undefined,
+ publish_seqno = 1,
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ blocked_op = undefined,
+ queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -149,6 +163,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ publish_seqno = 1,
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ blocked_op = undefined,
+ queue_monitors = dict:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -216,12 +235,22 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
undefined -> State1
end
end, State, [{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-dead-letter-exchange">>, fun init_dlx/2},
+ {<<"x-dead-letter-routing-key">>,
+ fun init_dlx_routing_key/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{
+ virtual_host = VHostPath}}}) ->
+ State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}.
+
+init_dlx_routing_key(RoutingKey, State) ->
+ State#q{dlx_routing_key = RoutingKey}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -456,7 +485,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
@@ -494,7 +523,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case Confirm of
- immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ immediately -> rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]);
_ -> ok
end,
case BQ:is_duplicate(Message, BQS) of
@@ -674,10 +703,11 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ}) ->
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ mk_dead_letter_fun(expired, State),
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -694,6 +724,223 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+mk_dead_letter_fun(_Reason, #q{dlx = undefined}) ->
+ fun(_MsgLookupFun, _AckTag, BQS) -> BQS end;
+mk_dead_letter_fun(Reason, _State) ->
+ fun(MsgLookupFun, AckTag, BQS) ->
+ {Msg, BQS1} = MsgLookupFun(BQS),
+ gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}),
+ BQS1
+ end.
+
+dead_letter_deleted_queue(undefined, State = #q{dlx = undefined}) ->
+ {stop, normal, State};
+dead_letter_deleted_queue(_From, State = #q{dlx = undefined,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ {stop, normal, {ok, BQ:len(BQS)}, State};
+dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:len(BQS) of
+ 0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined});
+ _ -> BQS1 = BQ:dropwhile(fun (_) -> true end,
+ mk_dead_letter_fun(queue_deleted, State),
+ BQS),
+ noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}},
+ backing_queue_state = BQS1})
+ end.
+
+dead_letter_msg(Msg, AckTag, Reason,
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed_mq = UMQ,
+ dlx = DLX,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ rabbit_exchange:lookup_or_die(DLX),
+
+ {ok, _, QPids} =
+ rabbit_basic:publish(
+ rabbit_basic:delivery(
+ false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
+ MsgSeqNo)),
+ State1 = lists:foldl(fun monitor_queue/2, State, QPids),
+ State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+ case QPids of
+ [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS),
+ cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
+ _ -> State3 =
+ lists:foldl(
+ fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
+ case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo,
+ MsgSeqNos),
+ UQM1 = gb_trees:update(QPid, MsgSeqNos1,
+ UQM),
+ State0#q{unconfirmed_qm = UQM1};
+ none ->
+ S = gb_sets:singleton(MsgSeqNo),
+ UQM1 = gb_trees:insert(QPid, S, UQM),
+ State0#q{unconfirmed_qm = UQM1}
+ end
+ end, State2, QPids),
+ noreply(State3#q{
+ unconfirmed_mq =
+ gb_trees:insert(
+ MsgSeqNo, {gb_sets:from_list(QPids),
+ AckTag}, UMQ)})
+ end.
+
+monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
+ case dict:is_key(QPid, QMons) of
+ true -> State;
+ false -> State#q{queue_monitors =
+ dict:store(QPid, erlang:monitor(process, QPid),
+ QMons)}
+ end.
+
+demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
+ case dict:find(QPid, QMons) of
+ {ok, MRef} -> erlang:demonitor(MRef),
+ State#q{queue_monitors = dict:erase(QPid, QMons)};
+ error -> State
+ end.
+
+handle_queue_down(QPid, State = #q{queue_monitors = QMons,
+ unconfirmed_mq = UMQ}) ->
+ case dict:find(QPid, QMons) of
+ error ->
+ noreply(State);
+ {ok, _} ->
+ #resource{name = QName} = qname(State),
+ rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]),
+ MsgSeqNos = [MsgSeqNo ||
+ {MsgSeqNo, {QPids, _}} <- gb_trees:to_list(UMQ),
+ gb_sets:is_member(QPid, QPids)],
+ handle_confirm(MsgSeqNos, QPid,
+ State#q{queue_monitors = dict:erase(QPid, QMons)})
+ end.
+
+handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {BQS3, UMQ3} =
+ lists:foldl(
+ fun (MsgSeqNo, {BQS1, UMQ1}) ->
+ {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
+ QPids1 = gb_sets:delete(QPid, QPids),
+ case gb_sets:is_empty(QPids1) of
+ true -> {_Guids, BQS2} =
+ BQ:ack([AckTag], undefined, BQS1),
+ {BQS2, gb_trees:delete(MsgSeqNo, UMQ1)};
+ false -> {BQS1, gb_trees:update(MsgSeqNo,
+ {QPids1, AckTag}, UMQ1)}
+ end
+ end, {BQS, UMQ}, MsgSeqNos),
+ MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
+ gb_sets:from_list(MsgSeqNos)),
+ State1 = case gb_sets:is_empty(MsgSeqNos1) of
+ false -> State#q{
+ unconfirmed_qm =
+ gb_trees:update(QPid, MsgSeqNos1, UQM)};
+ true -> demonitor_queue(
+ QPid, State#q{
+ unconfirmed_qm =
+ gb_trees:delete(QPid, UQM)})
+ end,
+ cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
+ backing_queue_state = BQS3}).
+
+cleanup_after_confirm(State = #q{blocked_op = Op,
+ unconfirmed_mq = UMQ}) ->
+ State1 = State#q{blocked_op = undefined},
+ case {gb_trees:is_empty(UMQ), Op} of
+ {true, {purge, {From, Count}}} ->
+ gen_server2:reply(From, {ok, Count}),
+ noreply(State1);
+ {true, {delete, {From, Count}}} ->
+ case From of
+ undefined -> ok;
+ _ -> gen_server2:reply(From, {ok, Count})
+ end,
+ {stop, normal, State1};
+ _ ->
+ noreply(State1#q{blocked_op = Op})
+ end.
+
+already_been_here(#delivery{message = #basic_message{content = Content}},
+ State) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ #resource{name = QueueName} = qname(State),
+ case Headers of
+ undefined ->
+ false;
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, DeathTables} ->
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- DeathTables],
+ OldQueues1 = [QName || {longstr, QName} <- OldQueues],
+ case lists:member(QueueName, OldQueues1) of
+ true -> [QueueName | OldQueues1];
+ _ -> false
+ end;
+ _ ->
+ false
+ end
+ end.
+
+make_dead_letter_msg(DLX, Reason,
+ Msg = #basic_message{content = Content,
+ exchange_name = Exchange,
+ routing_keys = RoutingKeys},
+ State = #q{dlx_routing_key = DlxRoutingKey}) ->
+ Content1 = #content{
+ properties = Props = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+
+ #resource{name = QName} = qname(State),
+
+ %% The first routing key is the one specified in the
+ %% basic.publish; all others are CC or BCC keys.
+ RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ DeathTable = {table, [{<<"reason">>, longstr,
+ list_to_binary(atom_to_list(Reason))},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp,
+ rabbit_misc:now_ms() div 1000},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array,
+ [{longstr, Key} || Key <- RoutingKeys1]}]},
+ Headers1 =
+ case Headers of
+ undefined ->
+ [{<<"x-death">>, array, [DeathTable]}];
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, Prior} ->
+ rabbit_misc:set_table_value(
+ Headers, <<"x-death">>, array,
+ [DeathTable | Prior]);
+ _ ->
+ [{<<"x-death">>, array, [DeathTable]} | Headers]
+ end
+ end,
+ {DeathRoutingKeys, Headers2} =
+ case DlxRoutingKey of
+ undefined -> {RoutingKeys, Headers1};
+ _ -> {[DlxRoutingKey],
+ rabbit_misc:remove_table_value(Headers1, <<"CC">>)}
+ end,
+ Content2 =
+ rabbit_binary_generator:clear_encoded_content(
+ Content1#content{properties = Props#'P_basic'{headers = Headers2}}),
+ Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
+ routing_keys = DeathRoutingKeys, content = Content2}.
+
+
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) ->
@@ -988,7 +1235,7 @@ handle_call(stat, _From, State) ->
drop_expired_messages(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, _From,
+handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
@@ -998,14 +1245,24 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- {stop, normal, {ok, BQ:len(BQS)}, State}
+ dead_letter_deleted_queue(From, State)
end;
-handle_call(purge, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+handle_call(purge, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ dlx = undefined}) ->
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
+handle_call(purge, From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:dropwhile(
+ fun (_) -> true end,
+ mk_dead_letter_fun(queue_purged, State),
+ BQS),
+ noreply(State#q{backing_queue_state = BQS1,
+ blocked_op = {purge, {From, BQ:len(BQS)}}});
+
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(subtract_acks(
@@ -1015,25 +1272,49 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ msg_seq_no = MsgSeqNo},
+ Flow}, State = #q{dlx = DLX}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- case Flow of
- flow -> Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow -> ok
- end,
- noreply(deliver_or_enqueue(Delivery, State));
+ ShouldDeliver =
+ case DLX of
+ undefined ->
+ true;
+ _ ->
+ case already_been_here(Delivery, State) of
+ false -> true;
+ Qs -> rabbit_log:warning(
+ "Message dropped. Dead-letter queues " ++
+ "cycle detected: ~p~n", [Qs]),
+ rabbit_misc:confirm_to_sender(Sender,
+ [MsgSeqNo]),
+ false
+ end
+ end,
+ case ShouldDeliver of
+ false -> noreply(State);
+ true -> case Flow of
+ flow ->
+ Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process,
+ Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow ->
+ ok
+ end,
+ noreply(deliver_or_enqueue(Delivery, State))
+ end;
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} =
+ BQ:ack(AckTags, undefined, BQS),
State1#q{backing_queue_state = BQS1}
end));
@@ -1044,13 +1325,15 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
backing_queue_state = BQS}) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ false -> Fun = mk_dead_letter_fun(rejected, State),
+ {_Guids, BQS1} =
+ BQ:ack(AckTags, Fun, BQS),
State1#q{backing_queue_state = BQS1}
end
end));
handle_cast(delete_immediately, State) ->
- {stop, normal, State};
+ dead_letter_deleted_queue(undefined, State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1101,11 +1384,17 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
emit_consumer_created(Ch, CTag, true, AckRequired)
end,
- noreply(State).
+ noreply(State);
+
+handle_cast({confirm, MsgSeqNos, QPid}, State) ->
+ handle_confirm(MsgSeqNos, QPid, State);
+
+handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
+ dead_letter_msg(Msg, AckTag, Reason, State).
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> {stop, normal, State};
+ true -> dead_letter_deleted_queue(undefined, State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1130,8 +1419,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
{stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, NewState} -> noreply(NewState);
- {stop, NewState} -> {stop, normal, NewState}
+ {ok, State1} -> handle_queue_down(DownPid, State1);
+ {stop, State1} -> {stop, normal, State1}
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 364eb8f6..50e47462 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -95,15 +95,19 @@ behaviour_info(callbacks) ->
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
- %% predicate returns true.
- {dropwhile, 2},
+ %% predicate returns true. A callback function is supplied
+ %% allowing callers access to messages that are about to be
+ %% dropped.
+ {dropwhile, 3},
%% Produce the next message.
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as Acks.
- {ack, 2},
+ %% about. Must return 1 msg_id per Ack, in the same order as
+ %% Acks. A callback function is supplied allowing callers to
+ %% access messages that are being acked.
+ {ack, 3},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index b8211d43..2777714f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,7 +19,8 @@
-include("rabbit_framing.hrl").
-export([publish/4, publish/6, publish/1,
- message/3, message/4, properties/1, delivery/4]).
+ message/3, message/4, properties/1, delivery/4,
+ header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a101886f..3098b621 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -87,7 +87,6 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -134,9 +133,6 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNos) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
rabbit_channel, list_local, []).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 64a4a737..8d7b9ded 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,8 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/3,
+ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@@ -172,12 +172,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Fun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+dropwhile(Pred, MsgFun,
+ State = #state{gm = GM,
+ backing_queue = BQ,
+ set_delivered = SetDelivered,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Fun, BQS),
+ BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
Dropped = Len - BQ:len(BQS1),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
@@ -235,15 +236,15 @@ fetch(AckRequired, State = #state { gm = GM,
ack_msg_id = AM1 }}
end.
-ack(AckTags, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- {MsgIds, BQS1} = BQ:ack(AckTags, BQS),
+ack(AckTags, MsgFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, MsgIds})
+ _ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds})
end,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9bf89bce..29a2e8bd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -430,7 +430,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
Acc
end
end, {gb_trees:empty(), MS}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State #state { msg_id_status = MS1 }.
handle_process_result({ok, State}) -> noreply(State);
@@ -665,7 +665,7 @@ maybe_enqueue_message(
{ok, {confirmed, ChPid}} ->
%% BQ has confirmed it but we didn't know what the
%% msg_seq_no was at the time. We do now!
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { sender_queues = SQ1,
msg_id_status = dict:erase(MsgId, MS) };
@@ -682,7 +682,7 @@ maybe_enqueue_message(
msg_id_status =
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
@@ -744,7 +744,7 @@ process_instruction(
{MQ2, PendingCh,
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
{MQ2, PendingCh, MS}
end;
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
@@ -834,12 +834,12 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
%% we must be shorter than the master
State
end};
-process_instruction({ack, MsgIds},
+process_instruction({ack, MsgFun, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
+ {MsgIds1, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9a6879b1..e4f8b687 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -23,11 +23,12 @@
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
--export([table_lookup/2, set_table_value/4]).
+-export([table_lookup/2, set_table_value/4, remove_table_value/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
+-export([confirm_to_sender/2]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -108,6 +109,8 @@
(rabbit_framing:amqp_table(), binary(),
rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value())
-> rabbit_framing:amqp_table()).
+-spec(remove_table_value/2 ::
+ (rabbit_framing:amqp_table(), binary()) -> rabbit_framing:amqp_table()).
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
@@ -299,6 +302,12 @@ set_table_value(Table, Key, Type, Value) ->
sort_field_table(
lists:keystore(Key, 1, Table, {Key, Type, Value})).
+remove_table_value(Table, Key) ->
+ case lists:keytake(Key, 1, Table) of
+ false -> Table;
+ {value, _, Table2} -> Table2
+ end.
+
r(#resource{virtual_host = VHostPath}, Kind, Name)
when is_binary(Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
@@ -372,6 +381,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
end,
Mod]).
+confirm_to_sender(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+
throw_on_error(E, Thunk) ->
case Thunk() of
{error, Reason} -> throw({E, Reason});
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7a96af26..ba0fffd6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2353,7 +2353,9 @@ test_dropwhile(VQ0) ->
VQ2 = rabbit_variable_queue:dropwhile(
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
- end, VQ1),
+ end,
+ dummy_msg_fun(),
+ VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2367,13 +2369,17 @@ test_dropwhile(VQ0) ->
VQ4.
+dummy_msg_fun() -> fun(_Fun, _Extra, State) -> State end.
+
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2),
+ VQ3 = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_fun(), VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5).
+ rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_fun(), VQ5).
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2398,7 +2404,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, undefined, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2408,7 +2414,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], undefined, VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2442,7 +2448,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
+ undefined, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 52eb168a..64285be9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -581,15 +581,22 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, State) ->
+dropwhile(Pred, MsgFun, State) ->
case queue_out(State) of
{empty, State1} ->
a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, State2);
- false -> a(in_r(MsgStatus, State1))
+ case {Pred(MsgProps), MsgFun} of
+ {true, undefined} ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, MsgFun, State2);
+ {true, _} ->
+ {{_, _, AckTag, _}, State2} =
+ internal_fetch(true, MsgStatus, State1),
+ State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2),
+ dropwhile(Pred, MsgFun, State3);
+ {false, _} ->
+ a(in_r(MsgStatus, State1))
end
end.
@@ -605,9 +612,27 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
-ack([], State) ->
+read_msg_callback(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent }) ->
+ fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end;
+
+read_msg_callback(#msg_status{ msg = Msg }) ->
+ fun(State) -> {Msg, State} end;
+
+read_msg_callback({IsPersistent, MsgId, _MsgProps}) ->
+ fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end.
+
+read_msg_callback1(MsgId, IsPersistent,
+ State = #vqstate{ msg_store_clients = MSCState }) ->
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, State #vqstate { msg_store_clients = MSCState1 }}.
+
+ack([], _Fun, State) ->
{[], State};
-ack(AckTags, State) ->
+
+ack(AckTags, undefined, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
@@ -616,7 +641,7 @@ ack(AckTags, State) ->
lists:foldl(
fun (SeqId, {Acc, State2}) ->
{MsgStatus, State3} = remove_pending_ack(SeqId, State2),
- {accumulate_ack(MsgStatus, Acc), State3}
+ {accumulate_ack(MsgStatus, Acc), State3}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
@@ -626,13 +651,20 @@ ack(AckTags, State) ->
{lists:reverse(AllMsgIds),
a(State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) })}.
+ ack_out_counter = AckOutCount + length(AckTags) })};
+
+ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
+ State2 = lists:foldl(fun(SeqId, State1) ->
+ AckEntry = gb_trees:get(SeqId, PA),
+ MsgFun(read_msg_callback(AckEntry), SeqId, State1)
+ end, State, AckTags),
+ {[], State2}.
requeue(AckTags, #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
fun publish_alpha/2, State),