summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-02 13:55:46 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-02 13:55:46 +0100
commiteb65a2e9e4e31379067859f5c544354d4b002905 (patch)
tree3627078dbf802e4db46bff509fe90dcb019800dc
parent46fd22bc2a1eb42cd26353b0bea2862c2b5ec5ea (diff)
parent78c5c9153de57e8859fc342456c7d7b71924b425 (diff)
downloadrabbitmq-server-eb65a2e9e4e31379067859f5c544354d4b002905.tar.gz
Merging default into bug24455
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_binary_generator.erl19
-rw-r--r--src/rabbit_channel.erl32
-rw-r--r--src/rabbit_limiter.erl1
-rw-r--r--src/rabbit_mirror_queue_slave.erl21
-rw-r--r--src/rabbit_misc.erl24
-rw-r--r--src/rabbit_variable_queue.erl33
7 files changed, 87 insertions, 80 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e3a2ca90..fe1ddba0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -454,33 +454,20 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
- {CMs, MTC1} = lists:foldl(
- fun(MsgId, {CMs, MTC0}) ->
- case dict:find(MsgId, MTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(MsgId, MTC0)};
- _ ->
- {CMs, MTC0}
- end
- end, {gb_trees:empty(), MTC}, MsgIds),
- gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ {CMs, MTC1} =
+ lists:foldl(
+ fun(MsgId, {CMs, MTC0}) ->
+ case dict:find(MsgId, MTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(MsgId, MTC0)};
+ _ ->
+ {CMs, MTC0}
+ end
+ end, {gb_trees:empty(), MTC}, MsgIds),
+ rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-gb_trees_foreach(_, none) ->
- ok;
-gb_trees_foreach(Fun, {Key, Val, It}) ->
- Fun(Key, Val),
- gb_trees_foreach(Fun, gb_trees:next(It));
-gb_trees_foreach(Fun, Tree) ->
- gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))).
-
-gb_trees_cons(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
- none -> gb_trees:insert(Key, [Value], Tree)
- end.
-
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
never;
should_confirm_message(#delivery{sender = ChPid,
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 68511a32..494f3203 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -122,10 +122,9 @@ build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
build_heartbeat_frame() ->
create_frame(?FRAME_HEARTBEAT, 0, <<>>).
-create_frame(TypeInt, ChannelInt, PayloadBin) when is_binary(PayloadBin) ->
- [<<TypeInt:8, ChannelInt:16, (size(PayloadBin)):32>>, PayloadBin, <<?FRAME_END>>];
create_frame(TypeInt, ChannelInt, Payload) ->
- create_frame(TypeInt, ChannelInt, list_to_binary(Payload)).
+ [<<TypeInt:8, ChannelInt:16, (iolist_size(Payload)):32>>, Payload,
+ ?FRAME_END].
%% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S,
%% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x,
@@ -194,13 +193,13 @@ generate_array(Array) when is_list(Array) ->
short_string_to_binary(String) when is_binary(String) ->
Len = size(String),
- if Len < 256 -> [<<(size(String)):8>>, String];
+ if Len < 256 -> [<<Len:8>>, String];
true -> exit(content_properties_shortstr_overflow)
end;
short_string_to_binary(String) ->
- StringLength = length(String),
- if StringLength < 256 -> [<<StringLength:8>>, String];
- true -> exit(content_properties_shortstr_overflow)
+ Len = length(String),
+ if Len < 256 -> [<<Len:8>>, String];
+ true -> exit(content_properties_shortstr_overflow)
end.
long_string_to_binary(String) when is_binary(String) ->
@@ -240,11 +239,11 @@ encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, Flags
encode_property(shortstr, String) ->
Len = size(String),
- if Len < 256 -> <<Len:8/unsigned, String:Len/binary>>;
+ if Len < 256 -> <<Len:8, String:Len/binary>>;
true -> exit(content_properties_shortstr_overflow)
end;
encode_property(longstr, String) ->
- Len = size(String), <<Len:32/unsigned, String:Len/binary>>;
+ Len = size(String), <<Len:32, String:Len/binary>>;
encode_property(octet, Int) ->
<<Int:8/unsigned>>;
encode_property(shortint, Int) ->
@@ -261,7 +260,7 @@ encode_property(table, Table) ->
check_empty_content_body_frame_size() ->
%% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is
%% defined correctly.
- ComputedSize = size(list_to_binary(create_frame(?FRAME_BODY, 0, <<>>))),
+ ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)),
if ComputedSize == ?EMPTY_CONTENT_BODY_FRAME_SIZE ->
ok;
true ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9eb77c32..3c61447a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -374,15 +374,20 @@ noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
noreply(Mask, NewState, Timeout) ->
{noreply, next_state(Mask, NewState), Timeout}.
+-define(MASKED_CALL(Fun, Mask, State),
+ case lists:member(Fun, Mask) of
+ true -> State;
+ false -> Fun(State)
+ end).
+
next_state(Mask, State) ->
- lists:foldl(fun (ensure_stats_timer, State1) -> ensure_stats_timer(State1);
- (send_confirms, State1) -> send_confirms(State1)
- end, State, [ensure_stats_timer, send_confirms] -- Mask).
+ State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
+ State2 = ?MASKED_CALL(send_confirms, Mask, State1),
+ State2.
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, ChPid, emit_stats)}.
+ StatsTimer, self(), emit_stats)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -1318,16 +1323,11 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) ->
State#ch{state = closing}}.
fold_per_queue(F, Acc0, UAQ) ->
- D = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag, {QPid, MsgId}}, D) ->
- %% dict:append would avoid the lists:reverse in
- %% handle_message({recover, true}, ...). However, it
- %% is significantly slower when going beyond a few
- %% thousand elements.
- rabbit_misc:dict_cons(QPid, MsgId, D)
- end, dict:new(), UAQ),
- dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
- Acc0, D).
+ T = rabbit_misc:queue_fold(
+ fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ end, gb_trees:empty(), UAQ),
+ rabbit_misc:gb_trees_fold(F, Acc0, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1415,6 +1415,8 @@ send_nacks(MXs, State = #ch{tx_status = none}) ->
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx_status = failed}).
+send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+ State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
{MsgSeqNos, State1} =
lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 24468a01..8a08d4b6 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -251,6 +251,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
+ 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case
L ->
%% We randomly vary the position of queues in the list,
%% thus ensuring that each queue has an equal chance of
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 43962491..f423760a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -390,9 +390,9 @@ needs_confirming(_Delivery, _State) ->
immediately.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
- {MS1, CMs} =
+ {CMs, MS1} =
lists:foldl(
- fun (MsgId, {MSN, CMsN} = Acc) ->
+ fun (MsgId, {CMsN, MSN} = Acc) ->
%% We will never see 'discarded' here
case dict:find(MsgId, MSN) of
error ->
@@ -402,12 +402,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{ok, {published, ChPid}} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN};
+ {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
- {dict:erase(MsgId, MSN),
- gb_trees_cons(ChPid, MsgSeqNo, CMsN)};
+ {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
+ dict:erase(MsgId, MSN)};
{ok, {confirmed, _ChPid}} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
@@ -416,17 +416,10 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% channel. Nothing to do here.
Acc
end
- end, {MS, gb_trees:empty()}, MsgIds),
- [ok = rabbit_channel:confirm(ChPid, MsgSeqNos)
- || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)],
+ end, {gb_trees:empty(), MS}, MsgIds),
+ rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
State #state { msg_id_status = MS1 }.
-gb_trees_cons(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
- none -> gb_trees:insert(Key, [Value], Tree)
- end.
-
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index f2dc97fd..b1cf45e7 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -44,7 +44,8 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
+-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
@@ -171,6 +172,10 @@
-> boolean()).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
+-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
+-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
+-spec(gb_trees_foreach/2 ::
+ (fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
@@ -669,6 +674,23 @@ dict_cons(Key, Value, Dict) ->
orddict_cons(Key, Value, Dict) ->
orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
+
+gb_trees_fold(Fun, Acc, Tree) ->
+ gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).
+
+gb_trees_fold1(_Fun, Acc, none) ->
+ Acc;
+gb_trees_fold1(Fun, Acc, {Key, Val, It}) ->
+ gb_trees_fold1(Fun, Fun(Key, Val, Acc), gb_trees:next(It)).
+
+gb_trees_foreach(Fun, Tree) ->
+ gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree).
+
%% Separate flags and options from arguments.
%% get_options([{flag, "-q"}, {option, "-p", "/"}],
%% ["set_permissions","-p","/","guest",
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a9b78be9..347749e7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -322,7 +322,7 @@
q3 :: queue(),
q4 :: queue(),
next_seq_id :: seq_id(),
- pending_ack :: dict(),
+ pending_ack :: gb_tree(),
ram_ack_index :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
@@ -534,7 +534,11 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
unconfirmed = UC1 }))}.
drain_confirmed(State = #vqstate { confirmed = C }) ->
- {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+ case gb_sets:is_empty(C) of
+ true -> {[], State}; %% common case
+ false -> {gb_sets:to_list(C), State #vqstate {
+ confirmed = gb_sets:new() }}
+ end.
dropwhile(Pred, State) ->
case queue_out(State) of
@@ -727,7 +731,7 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , dict:size(PA)},
+ {pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
@@ -864,7 +868,7 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case dict:is_key(SeqId, PA) of
+ false -> case gb_trees:is_defined(SeqId, PA) of
false -> {?QUEUE:in_r(
m(#msg_status {
seq_id = SeqId,
@@ -943,7 +947,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = ?QUEUE:new(),
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
- pending_ack = dict:new(),
+ pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
@@ -1201,15 +1205,14 @@ record_pending_ack(#msg_status { seq_id = SeqId,
true -> {m(trim_msg_status(MsgStatus)), RAI};
false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
- PA1 = dict:store(SeqId, AckEntry, PA),
- State #vqstate { pending_ack = PA1,
+ State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- {dict:fetch(SeqId, PA),
- State #vqstate { pending_ack = dict:erase(SeqId, PA),
+ {gb_trees:get(SeqId, PA),
+ State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
@@ -1217,10 +1220,10 @@ purge_pending_ack(KeepPersistent,
index_state = IndexState,
msg_store_clients = MSCState }) ->
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- dict:fold(fun (_SeqId, MsgStatus, Acc) ->
- accumulate_ack(MsgStatus, Acc)
- end, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = dict:new(),
+ rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
+ accumulate_ack(MsgStatus, Acc)
+ end, accumulate_ack_init(), PA),
+ State1 = State #vqstate { pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1503,10 +1506,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
false ->
{SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- dict:fetch(SeqId, PA),
+ gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA),
+ PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
limit_ram_acks(Quota - 1,
State1 #vqstate { pending_ack = PA1,
ram_ack_index = RAI1 })