summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-07 12:10:46 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-07 12:10:46 +0000
commite85848a7fffba26d3859a54d81a8b84ee9533f63 (patch)
tree5972761f642a206516faf70e9e254f3fb8c7def4
parent1d15ffde6cf646d05861d6cd71d4c4519f36c798 (diff)
parent716d91ed4d1935c966126a7636195c2ab57770eb (diff)
downloadrabbitmq-server-e85848a7fffba26d3859a54d81a8b84ee9533f63.tar.gz
Merge default
-rw-r--r--src/delegate.erl12
-rw-r--r--src/gen_server2.erl92
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl62
-rw-r--r--src/rabbit_channel.erl91
-rw-r--r--src/rabbit_control_main.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl3
7 files changed, 124 insertions, 140 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 9222c34c..96b8ba31 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -62,6 +62,13 @@ invoke(Pid, Fun) when is_pid(Pid) ->
erlang:raise(Class, Reason, StackTrace)
end;
+invoke([], _Fun) -> %% optimisation
+ {[], []};
+invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
+ case safe_invoke(Pid, Fun) of
+ {ok, _, Result} -> {[{Pid, Result}], []};
+ {error, _, Error} -> {[], [{Pid, Error}]}
+ end;
invoke(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
%% The use of multi_call is only safe because the timeout is
@@ -90,6 +97,11 @@ invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
invoke_no_result([Pid], Fun);
+invoke_no_result([], _Fun) -> %% optimisation
+ ok;
+invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
+ safe_invoke(Pid, Fun), %% must not die
+ ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 78bbbe06..dc55948b 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -196,8 +196,7 @@
%% State record
-record(gs2_state, {parent, name, state, mod, time,
- timeout_state, queue, debug, prioritise_call,
- prioritise_cast, prioritise_info}).
+ timeout_state, queue, debug, prioritisers}).
-ifdef(use_specs).
@@ -638,17 +637,17 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
in({'$gen_cast', Msg} = Input,
- GS2State = #gs2_state { prioritise_cast = PC }) ->
- in(Input, PC(Msg, GS2State), GS2State);
+ GS2State = #gs2_state { prioritisers = {_, F, _} }) ->
+ in(Input, F(Msg, GS2State), GS2State);
in({'$gen_call', From, Msg} = Input,
- GS2State = #gs2_state { prioritise_call = PC }) ->
- in(Input, PC(Msg, From, GS2State), GS2State);
+ GS2State = #gs2_state { prioritisers = {F, _, _} }) ->
+ in(Input, F(Msg, From, GS2State), GS2State);
in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
in(Input, infinity, GS2State);
in({system, _From, _Req} = Input, GS2State) ->
in(Input, infinity, GS2State);
-in(Input, GS2State = #gs2_state { prioritise_info = PI }) ->
- in(Input, PI(Input, GS2State), GS2State).
+in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) ->
+ in(Input, F(Input, GS2State), GS2State).
in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
@@ -864,13 +863,19 @@ dispatch(Info, Mod, State) ->
common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
reply(From, Reply),
[];
-common_reply(Name, From, Reply, NState, Debug) ->
- reply(Name, From, Reply, NState, Debug).
+common_reply(Name, {To, Tag} = From, Reply, NState, Debug) ->
+ reply(From, Reply),
+ sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}).
+
+common_noreply(_Name, _NState, [] = _Debug) ->
+ [];
+common_noreply(Name, NState, Debug) ->
+ sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}).
-common_debug([] = _Debug, _Func, _Info, _Event) ->
+common_become(_Name, _Mod, _NState, [] = _Debug) ->
[];
-common_debug(Debug, Func, Info, Event) ->
- sys:handle_debug(Debug, Func, Info, Event).
+common_become(Name, Mod, NState, Debug) ->
+ sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}).
handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
state = State,
@@ -887,23 +892,11 @@ handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
loop(GS2State #gs2_state { state = NState,
time = Time1,
debug = Debug1});
- {noreply, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state {state = NState,
- time = infinity,
- debug = Debug1});
- {noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state {state = NState,
- time = Time1,
- debug = Debug1});
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Msg,
GS2State #gs2_state { state = NState })),
- reply(Name, From, Reply, NState, Debug),
+ common_reply(Name, From, Reply, NState, Debug),
exit(R);
Other ->
handle_common_reply(Other, Msg, GS2State)
@@ -916,28 +909,24 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
debug = Debug}) ->
case Reply of
{noreply, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state { state = NState,
- time = infinity,
- debug = Debug1 });
+ Debug1 = common_noreply(Name, NState, Debug),
+ loop(GS2State #gs2_state {state = NState,
+ time = infinity,
+ debug = Debug1});
{noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state { state = NState,
- time = Time1,
- debug = Debug1 });
+ Debug1 = common_noreply(Name, NState, Debug),
+ loop(GS2State #gs2_state {state = NState,
+ time = Time1,
+ debug = Debug1});
{become, Mod, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {become, Mod, NState}),
+ Debug1 = common_become(Name, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
time = infinity,
debug = Debug1 }));
{become, Mod, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {become, Mod, NState}),
+ Debug1 = common_become(Name, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
@@ -957,12 +946,6 @@ handle_common_termination(Reply, Msg, GS2State) ->
terminate({bad_return_value, Reply}, Msg, GS2State)
end.
-reply(Name, {To, Tag}, Reply, State, Debug) ->
- reply({To, Tag}, Reply),
- sys:handle_debug(
- Debug, fun print_event/3, Name, {out, Reply, To, State}).
-
-
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
@@ -1165,16 +1148,13 @@ whereis_name(Name) ->
end.
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
- PrioriCall = function_exported_or_default(
- Mod, 'prioritise_call', 3,
- fun (_Msg, _From, _State) -> 0 end),
- PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
- fun (_Msg, _State) -> 0 end),
- PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
- fun (_Msg, _State) -> 0 end),
- GS2State #gs2_state { prioritise_call = PrioriCall,
- prioritise_cast = PrioriCast,
- prioritise_info = PrioriInfo }.
+ PCall = function_exported_or_default(Mod, 'prioritise_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }.
function_exported_or_default(Mod, Fun, Arity, Default) ->
case erlang:function_exported(Mod, Fun, Arity) of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b5da0cb..5ce0c623 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -682,6 +682,8 @@ deliver(Qs, Delivery, _Flow) ->
R -> {routed, [QPid || {QPid, ok} <- R]}
end.
+qpids([]) -> {[], []}; %% optimisation
+qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt
qpids(Qs) ->
{MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids},
{MPidAcc, SPidAcc}) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 854ba640..64d55684 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -122,27 +122,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
- State = #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = undefined,
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = pmon:new(),
- dlx = undefined,
- dlx_routing_key = undefined,
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- delayed_stop = undefined,
- queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty(),
- status = running},
- {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
+ {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -151,28 +131,29 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
+ State = init_state(Q),
+ State1 = State#q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ senders = Senders,
+ msg_id_to_channel = MTC},
+ State2 = process_args(State1),
+ lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries).
+
+init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = Senders,
+ senders = pmon:new(),
publish_seqno = 1,
unconfirmed = dtree:empty(),
- delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = MTC,
+ msg_id_to_channel = gb_trees:empty(),
status = running},
- State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
- lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State1, Deliveries).
+ rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -520,11 +501,14 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
- State) ->
- %% fake an 'eventual' confirm from BQ; noop if not needed
+discard(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message{id = MsgId}}, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- confirm_messages([MsgId], State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> confirm_messages([MsgId], State)
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1af60de8..2686d76d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -432,15 +432,13 @@ check_resource_access(User, Resource, Perm) ->
undefined -> [];
Other -> Other
end,
- CacheTail =
- case lists:member(V, Cache) of
- true -> lists:delete(V, Cache);
- false -> ok = rabbit_access_control:check_resource_access(
- User, Resource, Perm),
- lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
- end,
- put(permission_cache, [V | CacheTail]),
- ok.
+ case lists:member(V, Cache) of
+ true -> ok;
+ false -> ok = rabbit_access_control:check_resource_access(
+ User, Resource, Perm),
+ CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
+ put(permission_cache, [V | CacheTail])
+ end.
clear_permission_cache() ->
erase(permission_cache),
@@ -802,14 +800,12 @@ handle_method(#'basic.recover_async'{requeue = true},
limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_misc:with_exit_handler(
- OkFun, fun () ->
- rabbit_amqqueue:requeue(
- QPid, MsgIds, self())
- end)
- end, ok, UAMQL),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_misc:with_exit_handler(
+ OkFun,
+ fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ end, UAMQL),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
@@ -1217,10 +1213,10 @@ reject(DeliveryTag, Requeue, Multiple,
end}.
reject(Requeue, Acked, Limiter) ->
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, Acked),
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
@@ -1249,19 +1245,25 @@ record_sent(ConsumerTag, AckRequired,
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc],
+ case PrefixAcc of
+ [] -> QTail;
+ _ -> queue:join(
+ queue:from_list(lists:reverse(PrefixAcc)),
+ QTail)
+ end};
Multiple ->
collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
- collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
+ collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
@@ -1269,17 +1271,16 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
end.
ack(Acked, State = #ch{queue_names = QNames}) ->
- Incs = fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- case dict:find(QPid, QNames) of
- {ok, QName} -> Count = length(MsgIds),
- [{queue_stats, QName, Count} | L];
- error -> L
- end
- end, [], Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- ?INCR_STATS(Incs, ack, State).
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ ?INCR_STATS(case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count}];
+ error -> []
+ end, ack, State)
+ end, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked).
new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}.
@@ -1291,15 +1292,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-fold_per_queue(_F, Acc, []) ->
- Acc;
-fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId], Acc);
-fold_per_queue(F, Acc, UAL) ->
+foreach_per_queue(_F, []) ->
+ ok;
+foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId]);
+foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
end, gb_trees:empty(), UAL),
- rabbit_misc:gb_trees_fold(F, Acc, T).
+ rabbit_misc:gb_trees_foreach(F, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1322,13 +1323,19 @@ notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
+ ({_, _, _}, Acc) -> Acc + 1
end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end
end.
+deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
+ msg_seq_no = undefined,
+ mandatory = false},
+ []}, State) -> %% optimisation
+ ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
+ State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 7f43c228..9f48877c 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -170,7 +170,7 @@ start() ->
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
usage();
- {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) ->
+ {error, {Problem, Reason}} when is_atom(Problem), is_binary(Reason) ->
%% We handle this common case specially to avoid ~p since
%% that has i18n issues
print_error("~s: ~s", [Problem, Reason]),
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index e3f254e4..f2ab67cd 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -157,8 +157,7 @@ syncer(Ref, Log, MPid, SPids) ->
end] of
[] -> Log("all slaves already synced", []);
SPids1 -> MPid ! {ready, self()},
- Log("~p to sync", [[rabbit_misc:pid_to_string(SPid) ||
- SPid <- SPids1]]),
+ Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
syncer_loop(Ref, MPid, SPids1)
end.