diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-12 15:24:25 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-12 15:24:25 +0000 |
commit | c8dc2769a5a3ff3f064f3a74779c0cc17149b9ed (patch) | |
tree | e260de52024bbfbe8a2d617cef606a61eda0ebc8 | |
parent | f91d1b93da97d304643ebd8dc1110180bdf9d46e (diff) | |
parent | 4667154f152fb1a262d181121b9fbe23890f6090 (diff) | |
download | rabbitmq-server-c8dc2769a5a3ff3f064f3a74779c0cc17149b9ed.tar.gz |
merge default into bug20337
All tests pass. This also fixes the rabbit_guid rename.
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 14 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 54 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 343 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 27 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 12 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 14 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 19 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 60 |
11 files changed, 470 insertions, 94 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 2a8cc13c..918d587a 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,6 +25,12 @@ -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). +-type(msg_lookup_result() :: {rabbit_types:basic_message(), state()}). + +-type(msg_lookup_fun() :: fun((state()) -> msg_lookup_result())). + +-type(msg_fun() :: fun((msg_lookup_fun(), state()) -> state())). + -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(), @@ -42,12 +48,14 @@ rabbit_types:message_properties(), pid(), state()) -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). --spec(dropwhile/2 :: - (fun ((rabbit_types:message_properties()) -> boolean()), state()) +-spec(dropwhile/3 :: + (fun ((rabbit_types:message_properties()) -> boolean()), + msg_fun(), state()) -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). --spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(ack/3 :: ([ack()], msg_fun(), state()) -> + {[rabbit_guid:guid()], state()}). -spec(requeue/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a7dfd535..d809e570 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -327,34 +327,60 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). -check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of +check_declare_arguments(QueueName = #resource{virtual_host = VHostPath}, + Args) -> + [case Fun(rabbit_misc:table_lookup(Args, Key), Args, VHostPath) of ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, "invalid arg '~s' for ~s: ~255p", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/2}, - {<<"x-message-ttl">>, fun check_integer_argument/2}, - {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]], + end || + {Key, Fun} <- + [{<<"x-expires">>, fun check_integer_argument/3}, + {<<"x-message-ttl">>, fun check_integer_argument/3}, + {<<"x-ha-policy">>, fun check_ha_policy_argument/3}, + {<<"x-dead-letter-exchange">>, + fun check_exchange_argument/3}, + {<<"x-dead-letter-routing-key">>, + fun check_string_argument/3}]], ok. -check_integer_argument(undefined, _Args) -> +check_string_argument(undefined, _Args, _VHostPath) -> ok; -check_integer_argument({Type, Val}, _Args) when Val > 0 -> +check_string_argument({longstr, _}, _Args, _VHostPath) -> + ok; +check_string_argument({Type, _}, _, _) -> + {error, {unacceptable_type, Type}}. + +check_integer_argument(undefined, _Args, _VHostPath) -> + ok; +check_integer_argument({Type, Val}, _Args, _VHostPath) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, Val}, _Args) -> +check_integer_argument({_Type, Val}, _Args, _VHostPath) -> {error, {value_zero_or_less, Val}}. -check_ha_policy_argument(undefined, _Args) -> +check_exchange_argument(undefined, Args, _VHostPath) -> + case rabbit_misc:table_lookup(Args, <<"x-dead-letter-routing-key">>) of + undefined -> ok; + _ -> {error, routing_key_but_no_dlx_defined} + end; +check_exchange_argument({longstr, Val}, _Args, VHostPath) -> + try rabbit_misc:r(VHostPath, exchange, Val) + of _Exchange -> ok + catch _:_ -> {error, {invalid_exchange_name, Val}} + end; +check_exchange_argument({Type, _Val}, _Args, _VHostPath) -> + {error, {unacceptable_type, Type}}. + +check_ha_policy_argument(undefined, _Args, _VHostPath) -> ok; -check_ha_policy_argument({longstr, <<"all">>}, _Args) -> +check_ha_policy_argument({longstr, <<"all">>}, _Args, _VHostPath) -> ok; -check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> +check_ha_policy_argument({longstr, <<"nodes">>}, Args, _VHostPath) -> case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of undefined -> {error, {require, 'x-ha-policy-params'}}; @@ -370,9 +396,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> {Type, _} -> {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} end; -check_ha_policy_argument({longstr, Policy}, _Args) -> +check_ha_policy_argument({longstr, Policy}, _Args, _VHostPath) -> {error, {invalid_ha_policy, Policy}}; -check_ha_policy_argument({Type, _}, _Args) -> +check_ha_policy_argument({Type, _}, _Args, _VHostPath) -> {error, {unacceptable_type, Type}}. list() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3a620fa..7ca00298 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,7 +49,14 @@ stats_timer, msg_id_to_channel, ttl, - ttl_timer_ref + ttl_timer_ref, + publish_seqno, + unconfirmed_mq, + unconfirmed_qm, + blocked_op, + queue_monitors, + dlx, + dlx_routing_key }). -record(consumer, {tag, ack_required}). @@ -128,6 +135,13 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + dlx = undefined, + dlx_routing_key = undefined, + publish_seqno = 1, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + blocked_op = undefined, + queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -149,6 +163,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + publish_seqno = 1, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + blocked_op = undefined, + queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -216,12 +235,22 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> undefined -> State1 end end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-dead-letter-exchange">>, fun init_dlx/2}, + {<<"x-dead-letter-routing-key">>, + fun init_dlx_routing_key/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). +init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{ + virtual_host = VHostPath}}}) -> + State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}. + +init_dlx_routing_key(RoutingKey, State) -> + State#q{dlx_routing_key = RoutingKey}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -456,7 +485,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> @@ -494,7 +523,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of - immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + immediately -> rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]); _ -> ok end, case BQ:is_duplicate(Message, BQS) of @@ -674,10 +703,11 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ}) -> Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + mk_dead_letter_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -694,6 +724,223 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +mk_dead_letter_fun(_Reason, #q{dlx = undefined}) -> + fun(_MsgLookupFun, _AckTag, BQS) -> BQS end; +mk_dead_letter_fun(Reason, _State) -> + fun(MsgLookupFun, AckTag, BQS) -> + {Msg, BQS1} = MsgLookupFun(BQS), + gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}), + BQS1 + end. + +dead_letter_deleted_queue(undefined, State = #q{dlx = undefined}) -> + {stop, normal, State}; +dead_letter_deleted_queue(_From, State = #q{dlx = undefined, + backing_queue_state = BQS, + backing_queue = BQ}) -> + {stop, normal, {ok, BQ:len(BQS)}, State}; +dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:len(BQS) of + 0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined}); + _ -> BQS1 = BQ:dropwhile(fun (_) -> true end, + mk_dead_letter_fun(queue_deleted, State), + BQS), + noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}}, + backing_queue_state = BQS1}) + end. + +dead_letter_msg(Msg, AckTag, Reason, + State = #q{publish_seqno = MsgSeqNo, + unconfirmed_mq = UMQ, + dlx = DLX, + backing_queue = BQ, + backing_queue_state = BQS}) -> + rabbit_exchange:lookup_or_die(DLX), + + {ok, _, QPids} = + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + MsgSeqNo)), + State1 = lists:foldl(fun monitor_queue/2, State, QPids), + State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + case QPids of + [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS), + cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); + _ -> State3 = + lists:foldl( + fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> + case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, + MsgSeqNos), + UQM1 = gb_trees:update(QPid, MsgSeqNos1, + UQM), + State0#q{unconfirmed_qm = UQM1}; + none -> + S = gb_sets:singleton(MsgSeqNo), + UQM1 = gb_trees:insert(QPid, S, UQM), + State0#q{unconfirmed_qm = UQM1} + end + end, State2, QPids), + noreply(State3#q{ + unconfirmed_mq = + gb_trees:insert( + MsgSeqNo, {gb_sets:from_list(QPids), + AckTag}, UMQ)}) + end. + +monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:is_key(QPid, QMons) of + true -> State; + false -> State#q{queue_monitors = + dict:store(QPid, erlang:monitor(process, QPid), + QMons)} + end. + +demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:find(QPid, QMons) of + {ok, MRef} -> erlang:demonitor(MRef), + State#q{queue_monitors = dict:erase(QPid, QMons)}; + error -> State + end. + +handle_queue_down(QPid, State = #q{queue_monitors = QMons, + unconfirmed_mq = UMQ}) -> + case dict:find(QPid, QMons) of + error -> + noreply(State); + {ok, _} -> + #resource{name = QName} = qname(State), + rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]), + MsgSeqNos = [MsgSeqNo || + {MsgSeqNo, {QPids, _}} <- gb_trees:to_list(UMQ), + gb_sets:is_member(QPid, QPids)], + handle_confirm(MsgSeqNos, QPid, + State#q{queue_monitors = dict:erase(QPid, QMons)}) + end. + +handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {BQS3, UMQ3} = + lists:foldl( + fun (MsgSeqNo, {BQS1, UMQ1}) -> + {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), + QPids1 = gb_sets:delete(QPid, QPids), + case gb_sets:is_empty(QPids1) of + true -> {_Guids, BQS2} = + BQ:ack([AckTag], undefined, BQS1), + {BQS2, gb_trees:delete(MsgSeqNo, UMQ1)}; + false -> {BQS1, gb_trees:update(MsgSeqNo, + {QPids1, AckTag}, UMQ1)} + end + end, {BQS, UMQ}, MsgSeqNos), + MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), + gb_sets:from_list(MsgSeqNos)), + State1 = case gb_sets:is_empty(MsgSeqNos1) of + false -> State#q{ + unconfirmed_qm = + gb_trees:update(QPid, MsgSeqNos1, UQM)}; + true -> demonitor_queue( + QPid, State#q{ + unconfirmed_qm = + gb_trees:delete(QPid, UQM)}) + end, + cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, + backing_queue_state = BQS3}). + +cleanup_after_confirm(State = #q{blocked_op = Op, + unconfirmed_mq = UMQ}) -> + State1 = State#q{blocked_op = undefined}, + case {gb_trees:is_empty(UMQ), Op} of + {true, {purge, {From, Count}}} -> + gen_server2:reply(From, {ok, Count}), + noreply(State1); + {true, {delete, {From, Count}}} -> + case From of + undefined -> ok; + _ -> gen_server2:reply(From, {ok, Count}) + end, + {stop, normal, State1}; + _ -> + noreply(State1#q{blocked_op = Op}) + end. + +already_been_here(#delivery{message = #basic_message{content = Content}}, + State) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + #resource{name = QueueName} = qname(State), + case Headers of + undefined -> + false; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, DeathTables} -> + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- DeathTables], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], + case lists:member(QueueName, OldQueues1) of + true -> [QueueName | OldQueues1]; + _ -> false + end; + _ -> + false + end + end. + +make_dead_letter_msg(DLX, Reason, + Msg = #basic_message{content = Content, + exchange_name = Exchange, + routing_keys = RoutingKeys}, + State = #q{dlx_routing_key = DlxRoutingKey}) -> + Content1 = #content{ + properties = Props = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + + #resource{name = QName} = qname(State), + + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + DeathTable = {table, [{<<"reason">>, longstr, + list_to_binary(atom_to_list(Reason))}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, + rabbit_misc:now_ms() div 1000}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, + [{longstr, Key} || Key <- RoutingKeys1]}]}, + Headers1 = + case Headers of + undefined -> + [{<<"x-death">>, array, [DeathTable]}]; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, Prior} -> + rabbit_misc:set_table_value( + Headers, <<"x-death">>, array, + [DeathTable | Prior]); + _ -> + [{<<"x-death">>, array, [DeathTable]} | Headers] + end + end, + {DeathRoutingKeys, Headers2} = + case DlxRoutingKey of + undefined -> {RoutingKeys, Headers1}; + _ -> {[DlxRoutingKey], + rabbit_misc:remove_table_value(Headers1, <<"CC">>)} + end, + Content2 = + rabbit_binary_generator:clear_encoded_content( + Content1#content{properties = Props#'P_basic'{headers = Headers2}}), + Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, content = Content2}. + + now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> @@ -988,7 +1235,7 @@ handle_call(stat, _From, State) -> drop_expired_messages(ensure_expiry_timer(State)), reply({ok, BQ:len(BQS), active_consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, _From, +handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), @@ -998,14 +1245,24 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, BQ:len(BQS)}, State} + dead_letter_deleted_queue(From, State) end; -handle_call(purge, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +handle_call(purge, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + dlx = undefined}) -> {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); +handle_call(purge, From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:dropwhile( + fun (_) -> true end, + mk_dead_letter_fun(queue_purged, State), + BQS), + noreply(State#q{backing_queue_state = BQS1, + blocked_op = {purge, {From, BQ:len(BQS)}}}); + handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(subtract_acks( @@ -1015,25 +1272,49 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender, + msg_seq_no = MsgSeqNo}, + Flow}, State = #q{dlx = DLX}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case Flow of - flow -> Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> ok - end, - noreply(deliver_or_enqueue(Delivery, State)); + ShouldDeliver = + case DLX of + undefined -> + true; + _ -> + case already_been_here(Delivery, State) of + false -> true; + Qs -> rabbit_log:warning( + "Message dropped. Dead-letter queues " ++ + "cycle detected: ~p~n", [Qs]), + rabbit_misc:confirm_to_sender(Sender, + [MsgSeqNo]), + false + end + end, + case ShouldDeliver of + false -> noreply(State); + true -> case Flow of + flow -> + Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, + Sender)); + _ -> ok + end, + credit_flow:ack(Sender); + noflow -> + ok + end, + noreply(deliver_or_enqueue(Delivery, State)) + end; handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = + BQ:ack(AckTags, undefined, BQS), State1#q{backing_queue_state = BQS1} end)); @@ -1044,13 +1325,15 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> backing_queue_state = BQS}) -> case Requeue of true -> requeue_and_run(AckTags, State1); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + false -> Fun = mk_dead_letter_fun(rejected, State), + {_Guids, BQS1} = + BQ:ack(AckTags, Fun, BQS), State1#q{backing_queue_state = BQS1} end end)); handle_cast(delete_immediately, State) -> - {stop, normal, State}; + dead_letter_deleted_queue(undefined, State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1101,11 +1384,17 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), emit_consumer_created(Ch, CTag, true, AckRequired) end, - noreply(State). + noreply(State); + +handle_cast({confirm, MsgSeqNos, QPid}, State) -> + handle_confirm(MsgSeqNos, QPid, State); + +handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> + dead_letter_msg(Msg, AckTag, Reason, State). handle_info(maybe_expire, State) -> case is_unused(State) of - true -> {stop, normal, State}; + true -> dead_letter_deleted_queue(undefined, State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1130,8 +1419,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, NewState} -> noreply(NewState); - {stop, NewState} -> {stop, normal, NewState} + {ok, State1} -> handle_queue_down(DownPid, State1); + {stop, State1} -> {stop, normal, State1} end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 364eb8f6..50e47462 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -95,15 +95,19 @@ behaviour_info(callbacks) -> {drain_confirmed, 1}, %% Drop messages from the head of the queue while the supplied - %% predicate returns true. - {dropwhile, 2}, + %% predicate returns true. A callback function is supplied + %% allowing callers access to messages that are about to be + %% dropped. + {dropwhile, 3}, %% Produce the next message. {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 msg_id per Ack, in the same order as Acks. - {ack, 2}, + %% about. Must return 1 msg_id per Ack, in the same order as + %% Acks. A callback function is supplied allowing callers to + %% access messages that are being acked. + {ack, 3}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b8211d43..2777714f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,7 +19,8 @@ -include("rabbit_framing.hrl"). -export([publish/4, publish/6, publish/1, - message/3, message/4, properties/1, delivery/4]). + message/3, message/4, properties/1, delivery/4, + header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a101886f..3098b621 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,7 @@ -behaviour(gen_server2). -export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2]). +-export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -87,7 +87,6 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -134,9 +133,6 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNos) -> - gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). - list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), rabbit_channel, list_local, []). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 64a4a737..8d7b9ded 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,8 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/3, + requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3]). @@ -172,12 +172,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Fun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = SetDelivered }) -> +dropwhile(Pred, MsgFun, + State = #state{gm = GM, + backing_queue = BQ, + set_delivered = SetDelivered, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Fun, BQS), + BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), Dropped = Len - BQ:len(BQS1), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), @@ -235,15 +236,15 @@ fetch(AckRequired, State = #state { gm = GM, ack_msg_id = AM1 }} end. -ack(AckTags, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> - {MsgIds, BQS1} = BQ:ack(AckTags, BQS), +ack(AckTags, MsgFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS), AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + _ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds}) end, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9bf89bce..29a2e8bd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -430,7 +430,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> Acc end end, {gb_trees:empty(), MS}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State #state { msg_id_status = MS1 }. handle_process_result({ok, State}) -> noreply(State); @@ -665,7 +665,7 @@ maybe_enqueue_message( {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { sender_queues = SQ1, msg_id_status = dict:erase(MsgId, MS) }; @@ -682,7 +682,7 @@ maybe_enqueue_message( msg_id_status = dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } @@ -744,7 +744,7 @@ process_instruction( {MQ2, PendingCh, dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), {MQ2, PendingCh, MS} end; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> @@ -834,12 +834,12 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, %% we must be shorter than the master State end}; -process_instruction({ack, MsgIds}, +process_instruction({ack, MsgFun, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), + {MsgIds1, BQS1} = BQ:ack(AckTags, MsgFun, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9a6879b1..e4f8b687 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -23,11 +23,12 @@ protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). --export([table_lookup/2, set_table_value/4]). +-export([table_lookup/2, set_table_value/4, remove_table_value/2]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). +-export([confirm_to_sender/2]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -108,6 +109,8 @@ (rabbit_framing:amqp_table(), binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()) -> rabbit_framing:amqp_table()). +-spec(remove_table_value/2 :: + (rabbit_framing:amqp_table(), binary()) -> rabbit_framing:amqp_table()). -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') @@ -299,6 +302,12 @@ set_table_value(Table, Key, Type, Value) -> sort_field_table( lists:keystore(Key, 1, Table, {Key, Type, Value})). +remove_table_value(Table, Key) -> + case lists:keytake(Key, 1, Table) of + false -> Table; + {value, _, Table2} -> Table2 + end. + r(#resource{virtual_host = VHostPath}, Kind, Name) when is_binary(Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}; @@ -372,6 +381,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +confirm_to_sender(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). + throw_on_error(E, Thunk) -> case Thunk() of {error, Reason} -> throw({E, Reason}); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7a96af26..ba0fffd6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2353,7 +2353,9 @@ test_dropwhile(VQ0) -> VQ2 = rabbit_variable_queue:dropwhile( fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 - end, VQ1), + end, + dummy_msg_fun(), + VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2367,13 +2369,17 @@ test_dropwhile(VQ0) -> VQ4. +dummy_msg_fun() -> fun(_Fun, _Extra, State) -> State end. + test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2), + VQ3 = rabbit_variable_queue:dropwhile( + fun(_) -> false end, dummy_msg_fun(), VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5). + rabbit_variable_queue:dropwhile( + fun(_) -> false end, dummy_msg_fun(), VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -2398,7 +2404,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, undefined, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2408,7 +2414,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], undefined, VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2442,7 +2448,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, + undefined, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 52eb168a..64285be9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, @@ -581,15 +581,22 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, State) -> +dropwhile(Pred, MsgFun, State) -> case queue_out(State) of {empty, State1} -> a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, State2); - false -> a(in_r(MsgStatus, State1)) + case {Pred(MsgProps), MsgFun} of + {true, undefined} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, MsgFun, State2); + {true, _} -> + {{_, _, AckTag, _}, State2} = + internal_fetch(true, MsgStatus, State1), + State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2), + dropwhile(Pred, MsgFun, State3); + {false, _} -> + a(in_r(MsgStatus, State1)) end end. @@ -605,9 +612,27 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. -ack([], State) -> +read_msg_callback(#msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }) -> + fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end; + +read_msg_callback(#msg_status{ msg = Msg }) -> + fun(State) -> {Msg, State} end; + +read_msg_callback({IsPersistent, MsgId, _MsgProps}) -> + fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end. + +read_msg_callback1(MsgId, IsPersistent, + State = #vqstate{ msg_store_clients = MSCState }) -> + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, State #vqstate { msg_store_clients = MSCState1 }}. + +ack([], _Fun, State) -> {[], State}; -ack(AckTags, State) -> + +ack(AckTags, undefined, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, @@ -616,7 +641,7 @@ ack(AckTags, State) -> lists:foldl( fun (SeqId, {Acc, State2}) -> {MsgStatus, State3} = remove_pending_ack(SeqId, State2), - {accumulate_ack(MsgStatus, Acc), State3} + {accumulate_ack(MsgStatus, Acc), State3} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) @@ -626,13 +651,20 @@ ack(AckTags, State) -> {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) })}. + ack_out_counter = AckOutCount + length(AckTags) })}; + +ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> + State2 = lists:foldl(fun(SeqId, State1) -> + AckEntry = gb_trees:get(SeqId, PA), + MsgFun(read_msg_callback(AckEntry), SeqId, State1) + end, State, AckTags), + {[], State2}. requeue(AckTags, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), fun publish_alpha/2, State), |