diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-01-04 12:12:11 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-01-04 12:12:11 +0000 |
commit | 3c29df8f13138df0a600ca5f642f0048784d6317 (patch) | |
tree | 9d5f531673399f0ef860b9aaf82a4dedea7964b0 | |
parent | 3179a6148228279d5aa491cbec401d4976375179 (diff) | |
parent | 653d599979edb2f0a38c6916c2de49a793d4fb56 (diff) | |
download | rabbitmq-server-3c29df8f13138df0a600ca5f642f0048784d6317.tar.gz |
Merged default into bug19375
-rw-r--r-- | src/dtree.erl | 24 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 5 |
5 files changed, 85 insertions, 13 deletions
diff --git a/src/dtree.erl b/src/dtree.erl index ca2d30cf..c59243bb 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -32,7 +32,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, take_all/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, take_prim/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -53,6 +53,7 @@ -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take_prim/2 :: (pk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -120,6 +121,13 @@ take_all(SK, {P, S}) -> {KVs, {P1, prune(SKS, PKS, S)}} end. +%% Drop the entry with the given primary key +take_prim(PK, {P, S} = DTree) -> + case gb_trees:lookup(PK, P) of + none -> {[], DTree}; + {value, {SKS, V}} -> {[{PK, V}], take_prim2(PK, SKS, DTree)} + end. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). @@ -149,6 +157,20 @@ take_all2(PKS, P) -> gb_trees:delete(PK, P0)} end, {[], gb_sets:empty(), P}, PKS). +take_prim2(PK, SKS, {P, S}) -> + {gb_trees:delete(PK, P), + rabbit_misc:gb_trees_fold( + fun (SK0, PKS, S1) -> + case gb_sets:is_member(SK0, SKS) of + false -> S1; + true -> PKS1 = gb_sets:delete(PK, PKS), + case gb_sets:is_empty(PKS1) of + true -> gb_trees:delete(SK0, S1); + false -> gb_trees:update(SK0, PKS1, S1) + end + end + end, S, S)}. + prune(SKS, PKS, S) -> gb_sets:fold(fun (SK0, S0) -> PKS1 = gb_trees:get(SK0, S0), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a270364..dacf4f0a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -389,7 +389,8 @@ check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-maxdepth">>, fun check_maxdepth_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; TypeVal -> case Fun(TypeVal, Args) of @@ -412,6 +413,13 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_maxdepth_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val > 0 -> ok; + ok -> {error, {value_not_positive, Val}}; + Error -> Error + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f9614517..f588c024 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + max_depth }). -record(consumer, {tag, ack_required}). @@ -134,6 +135,7 @@ init(Q) -> senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, + max_depth = undefined, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, @@ -159,6 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + max_depth = undefined, senders = Senders, publish_seqno = 1, unconfirmed = dtree:empty(), @@ -258,7 +261,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-maxdepth">>, fun init_maxdepth/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -270,6 +274,9 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_maxdepth(MaxDepth, State) -> + State#q{max_depth = MaxDepth}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -553,7 +560,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, {false, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, +deliver_or_enqueue(Delivery = #delivery{message = Message}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), @@ -563,10 +570,37 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + {false, State2} -> + case publish_max(Delivery, Props, Delivered, State2) of + nopub -> State2; + BQS1 -> ensure_ttl_timer(Props#message_properties.expiry, + State2#q{backing_queue_state = BQS1}) + end + end. + +publish_max(#delivery{message = Message, + sender = SenderPid}, + Props, Delivered, #q{backing_queue = BQ, + backing_queue_state = BQS, + max_depth = undefined}) -> + BQ:publish(Message, Props, Delivered, SenderPid, BQS); +publish_max(#delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, + Props, Delivered, #q{backing_queue = BQ, + backing_queue_state = BQS, + max_depth = MaxDepth}) -> + case {BQ:depth(BQS) >= MaxDepth, BQ:len(BQS) =:= 0} of + {false, _} -> + BQ:publish(Message, Props, Delivered, SenderPid, BQS); + {true, true} -> + (dead_letter_fun(maxdepth))([{Message, undefined}]), + rabbit_misc:confirm_all(SenderPid, MsgSeqNo), + nopub; + {true, false} -> + {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS), + (dead_letter_fun(maxdepth))([{Msg, AckTag}]), + BQ:publish(Message, Props, Delivered, SenderPid, BQS1) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, @@ -800,7 +834,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, unconfirmed = UC, backing_queue = BQ, backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack([Ack || Ack <- AckTags, Ack /= undefined], BQS), State1 = State#q{backing_queue_state = BQS1}, case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1af60de8..885452ce 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -228,8 +228,9 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - {confirm, _MsgSeqNos, _QPid} -> 5; - _ -> 0 + {confirm, _MsgSeqNos, _QPid} -> 5; + {confirm_all, _MsgSeqNo, _QPid} -> 5; + _ -> 0 end. prioritise_info(Msg, _State) -> @@ -556,6 +557,10 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), record_confirms(MXs, State#ch{unconfirmed = UC1}). +confirm_all(MsgSeqNo, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take_prim(MsgSeqNo, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}). + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index edaa7198..c6c8676f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -28,7 +28,7 @@ -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). --export([confirm_to_sender/2]). +-export([confirm_to_sender/2, confirm_all/2]). -export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1, filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). @@ -427,6 +427,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> confirm_to_sender(Pid, MsgSeqNos) -> gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). +confirm_all(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm_all, MsgSeqNo, self()}). + %% @doc Halts the emulator returning the given status code to the os. %% On Windows this function will block indefinitely so as to give the io %% subsystem time to flush stdout completely. |