diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-02 13:55:46 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-02 13:55:46 +0100 |
commit | eb65a2e9e4e31379067859f5c544354d4b002905 (patch) | |
tree | 3627078dbf802e4db46bff509fe90dcb019800dc | |
parent | 46fd22bc2a1eb42cd26353b0bea2862c2b5ec5ea (diff) | |
parent | 78c5c9153de57e8859fc342456c7d7b71924b425 (diff) | |
download | rabbitmq-server-eb65a2e9e4e31379067859f5c544354d4b002905.tar.gz |
Merging default into bug24455
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 19 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 32 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 1 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 21 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 24 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 33 |
7 files changed, 87 insertions, 80 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e3a2ca90..fe1ddba0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -454,33 +454,20 @@ deliver_from_queue_deliver(AckRequired, false, State) -> confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> - {CMs, MTC1} = lists:foldl( - fun(MsgId, {CMs, MTC0}) -> - case dict:find(MsgId, MTC0) of - {ok, {ChPid, MsgSeqNo}} -> - {gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(MsgId, MTC0)}; - _ -> - {CMs, MTC0} - end - end, {gb_trees:empty(), MTC}, MsgIds), - gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + {CMs, MTC1} = + lists:foldl( + fun(MsgId, {CMs, MTC0}) -> + case dict:find(MsgId, MTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs), + dict:erase(MsgId, MTC0)}; + _ -> + {CMs, MTC0} + end + end, {gb_trees:empty(), MTC}, MsgIds), + rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), State#q{msg_id_to_channel = MTC1}. -gb_trees_foreach(_, none) -> - ok; -gb_trees_foreach(Fun, {Key, Val, It}) -> - Fun(Key, Val), - gb_trees_foreach(Fun, gb_trees:next(It)); -gb_trees_foreach(Fun, Tree) -> - gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))). - -gb_trees_cons(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); - none -> gb_trees:insert(Key, [Value], Tree) - end. - should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> never; should_confirm_message(#delivery{sender = ChPid, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 68511a32..494f3203 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -122,10 +122,9 @@ build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, build_heartbeat_frame() -> create_frame(?FRAME_HEARTBEAT, 0, <<>>). -create_frame(TypeInt, ChannelInt, PayloadBin) when is_binary(PayloadBin) -> - [<<TypeInt:8, ChannelInt:16, (size(PayloadBin)):32>>, PayloadBin, <<?FRAME_END>>]; create_frame(TypeInt, ChannelInt, Payload) -> - create_frame(TypeInt, ChannelInt, list_to_binary(Payload)). + [<<TypeInt:8, ChannelInt:16, (iolist_size(Payload)):32>>, Payload, + ?FRAME_END]. %% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S, %% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x, @@ -194,13 +193,13 @@ generate_array(Array) when is_list(Array) -> short_string_to_binary(String) when is_binary(String) -> Len = size(String), - if Len < 256 -> [<<(size(String)):8>>, String]; + if Len < 256 -> [<<Len:8>>, String]; true -> exit(content_properties_shortstr_overflow) end; short_string_to_binary(String) -> - StringLength = length(String), - if StringLength < 256 -> [<<StringLength:8>>, String]; - true -> exit(content_properties_shortstr_overflow) + Len = length(String), + if Len < 256 -> [<<Len:8>>, String]; + true -> exit(content_properties_shortstr_overflow) end. long_string_to_binary(String) when is_binary(String) -> @@ -240,11 +239,11 @@ encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, Flags encode_property(shortstr, String) -> Len = size(String), - if Len < 256 -> <<Len:8/unsigned, String:Len/binary>>; + if Len < 256 -> <<Len:8, String:Len/binary>>; true -> exit(content_properties_shortstr_overflow) end; encode_property(longstr, String) -> - Len = size(String), <<Len:32/unsigned, String:Len/binary>>; + Len = size(String), <<Len:32, String:Len/binary>>; encode_property(octet, Int) -> <<Int:8/unsigned>>; encode_property(shortint, Int) -> @@ -261,7 +260,7 @@ encode_property(table, Table) -> check_empty_content_body_frame_size() -> %% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is %% defined correctly. - ComputedSize = size(list_to_binary(create_frame(?FRAME_BODY, 0, <<>>))), + ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)), if ComputedSize == ?EMPTY_CONTENT_BODY_FRAME_SIZE -> ok; true -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9eb77c32..3c61447a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -374,15 +374,20 @@ noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate). noreply(Mask, NewState, Timeout) -> {noreply, next_state(Mask, NewState), Timeout}. +-define(MASKED_CALL(Fun, Mask, State), + case lists:member(Fun, Mask) of + true -> State; + false -> Fun(State) + end). + next_state(Mask, State) -> - lists:foldl(fun (ensure_stats_timer, State1) -> ensure_stats_timer(State1); - (send_confirms, State1) -> send_confirms(State1) - end, State, [ensure_stats_timer, send_confirms] -- Mask). + State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State), + State2 = ?MASKED_CALL(send_confirms, Mask, State1), + State2. ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> - ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, ChPid, emit_stats)}. + StatsTimer, self(), emit_stats)}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1318,16 +1323,11 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) -> State#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> - D = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, {QPid, MsgId}}, D) -> - %% dict:append would avoid the lists:reverse in - %% handle_message({recover, true}, ...). However, it - %% is significantly slower when going beyond a few - %% thousand elements. - rabbit_misc:dict_cons(QPid, MsgId, D) - end, dict:new(), UAQ), - dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, - Acc0, D). + T = rabbit_misc:queue_fold( + fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons(QPid, MsgId, T) + end, gb_trees:empty(), UAQ), + rabbit_misc:gb_trees_fold(F, Acc0, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1415,6 +1415,8 @@ send_nacks(MXs, State = #ch{tx_status = none}) -> send_nacks(_, State) -> maybe_complete_tx(State#ch{tx_status = failed}). +send_confirms(State = #ch{tx_status = none, confirmed = []}) -> + State; send_confirms(State = #ch{tx_status = none, confirmed = C}) -> {MsgSeqNos, State1} = lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 24468a01..8a08d4b6 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -251,6 +251,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> end, {[], Queues}, Queues), case length(QList) of 0 -> ok; + 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case L -> %% We randomly vary the position of queues in the list, %% thus ensuring that each queue has an equal chance of diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 43962491..f423760a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -390,9 +390,9 @@ needs_confirming(_Delivery, _State) -> immediately. confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> - {MS1, CMs} = + {CMs, MS1} = lists:foldl( - fun (MsgId, {MSN, CMsN} = Acc) -> + fun (MsgId, {CMsN, MSN} = Acc) -> %% We will never see 'discarded' here case dict:find(MsgId, MSN) of error -> @@ -402,12 +402,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {ok, {published, ChPid}} -> %% Still not seen it from the channel, just %% record that it's been confirmed. - {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN}; + {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)}; {ok, {published, ChPid, MsgSeqNo}} -> %% Seen from both GM and Channel. Can now %% confirm. - {dict:erase(MsgId, MSN), - gb_trees_cons(ChPid, MsgSeqNo, CMsN)}; + {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN), + dict:erase(MsgId, MSN)}; {ok, {confirmed, _ChPid}} -> %% It's already been confirmed. This is %% probably it's been both sync'd to disk @@ -416,17 +416,10 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> %% channel. Nothing to do here. Acc end - end, {MS, gb_trees:empty()}, MsgIds), - [ok = rabbit_channel:confirm(ChPid, MsgSeqNos) - || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)], + end, {gb_trees:empty(), MS}, MsgIds), + rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), State #state { msg_id_status = MS1 }. -gb_trees_cons(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); - none -> gb_trees:insert(Key, [Value], Tree) - end. - handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index f2dc97fd..b1cf45e7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -44,7 +44,8 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). +-export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). @@ -171,6 +172,10 @@ -> boolean()). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). +-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). +-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). +-spec(gb_trees_foreach/2 :: + (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). @@ -669,6 +674,23 @@ dict_cons(Key, Value, Dict) -> orddict_cons(Key, Value, Dict) -> orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. + +gb_trees_fold(Fun, Acc, Tree) -> + gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). + +gb_trees_fold1(_Fun, Acc, none) -> + Acc; +gb_trees_fold1(Fun, Acc, {Key, Val, It}) -> + gb_trees_fold1(Fun, Fun(Key, Val, Acc), gb_trees:next(It)). + +gb_trees_foreach(Fun, Tree) -> + gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree). + %% Separate flags and options from arguments. %% get_options([{flag, "-q"}, {option, "-p", "/"}], %% ["set_permissions","-p","/","guest", diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a9b78be9..347749e7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -322,7 +322,7 @@ q3 :: queue(), q4 :: queue(), next_seq_id :: seq_id(), - pending_ack :: dict(), + pending_ack :: gb_tree(), ram_ack_index :: gb_tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, @@ -534,7 +534,11 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, unconfirmed = UC1 }))}. drain_confirmed(State = #vqstate { confirmed = C }) -> - {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. + case gb_sets:is_empty(C) of + true -> {[], State}; %% common case + false -> {gb_sets:to_list(C), State #vqstate { + confirmed = gb_sets:new() }} + end. dropwhile(Pred, State) -> case queue_out(State) of @@ -727,7 +731,7 @@ status(#vqstate { {q3 , ?QUEUE:len(Q3)}, {q4 , ?QUEUE:len(Q4)}, {len , Len}, - {pending_acks , dict:size(PA)}, + {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, @@ -864,7 +868,7 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> case dict:is_key(SeqId, PA) of + false -> case gb_trees:is_defined(SeqId, PA) of false -> {?QUEUE:in_r( m(#msg_status { seq_id = SeqId, @@ -943,7 +947,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q3 = ?QUEUE:new(), q4 = ?QUEUE:new(), next_seq_id = NextSeqId, - pending_ack = dict:new(), + pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, @@ -1201,15 +1205,14 @@ record_pending_ack(#msg_status { seq_id = SeqId, true -> {m(trim_msg_status(MsgStatus)), RAI}; false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, - PA1 = dict:store(SeqId, AckEntry, PA), - State #vqstate { pending_ack = PA1, + State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA), ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - {dict:fetch(SeqId, PA), - State #vqstate { pending_ack = dict:erase(SeqId, PA), + {gb_trees:get(SeqId, PA), + State #vqstate { pending_ack = gb_trees:delete(SeqId, PA), ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. purge_pending_ack(KeepPersistent, @@ -1217,10 +1220,10 @@ purge_pending_ack(KeepPersistent, index_state = IndexState, msg_store_clients = MSCState }) -> {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = - dict:fold(fun (_SeqId, MsgStatus, Acc) -> - accumulate_ack(MsgStatus, Acc) - end, accumulate_ack_init(), PA), - State1 = State #vqstate { pending_ack = dict:new(), + rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) -> + accumulate_ack(MsgStatus, Acc) + end, accumulate_ack_init(), PA), + State1 = State #vqstate { pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of @@ -1503,10 +1506,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, false -> {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = - dict:fetch(SeqId, PA), + gb_trees:get(SeqId, PA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA), + PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }) |