summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-06-07 15:27:59 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-06-07 15:27:59 +0100
commitded733191bfdc77f2438f68f04c3f342773d97f6 (patch)
treedbe379dd95542ed323f6228b4ad5a0a6a8904148
parent335fc2cc70c6131d4f270b0c39c906a7c81efcfe (diff)
parent87c44c6cf1d08d5e8e276b10dc2224ed41d87869 (diff)
downloadrabbitmq-server-ded733191bfdc77f2438f68f04c3f342773d97f6.tar.gz
Merged in default
-rw-r--r--include/rabbit_backing_queue_spec.hrl6
-rw-r--r--include/rabbit_exchange_type_spec.hrl6
-rw-r--r--src/delegate.erl10
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit_alarm.erl11
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_channel.erl115
-rw-r--r--src/rabbit_control.erl4
-rw-r--r--src/rabbit_exchange.erl13
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl6
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl6
-rw-r--r--src/rabbit_exchange_type_registry.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_multi.erl4
-rw-r--r--src/rabbit_net.erl2
-rw-r--r--src/rabbit_persister.erl2
-rw-r--r--src/rabbit_reader_queue_collector.erl4
-rw-r--r--src/rabbit_router.erl11
-rw-r--r--src/rabbit_tests.erl148
-rw-r--r--src/supervisor2.erl8
25 files changed, 302 insertions, 103 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1b536dfa..55cd126e 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 9864f1eb..38057beb 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/delegate.erl b/src/delegate.erl
index 98353453..8af28127 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -45,8 +45,8 @@
-ifdef(use_specs).
-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
--spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
-spec(process_count/0 :: () -> non_neg_integer()).
@@ -73,7 +73,7 @@ invoke(Pid, Fun) when is_pid(Pid) ->
invoke(Pids, Fun) when is_list(Pids) ->
lists:foldl(
- fun({Status, Result, Pid}, {Good, Bad}) ->
+ fun ({Status, Result, Pid}, {Good, Bad}) ->
case Status of
ok -> {[{Pid, Result}|Good], Bad};
error -> {Good, [{Pid, Result}|Bad]}
@@ -136,10 +136,10 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
%% block forever.
[gen_server2:cast(
local_server(Node),
- {thunk, fun() ->
+ {thunk, fun () ->
Self ! {result,
DelegateFun(
- Node, fun() -> safe_invoke(Pids, Fun) end)}
+ Node, fun () -> safe_invoke(Pids, Fun) end)}
end}) || {Node, Pids} <- NodePids],
[receive {result, Result} -> Result end || _ <- NodePids].
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 5b899cdb..547f0a42 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -639,7 +639,7 @@ do_multi_call(Nodes, Name, Req, Timeout) ->
Caller = self(),
Receiver =
spawn(
- fun() ->
+ fun () ->
%% Middleman process. Should be unsensitive to regular
%% exit signals. The sychronization is needed in case
%% the receiver would exit before the caller started
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 7e96d9a3..53c713e6 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -47,7 +47,7 @@
-type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
-endif.
@@ -67,9 +67,9 @@ stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
register(Pid, HighMemMFA) ->
- ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
- infinity).
+ gen_event:call(alarm_handler, ?MODULE,
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
@@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA},
false -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, ok, State#alarms{alertees = NewAlertees}};
+ {ok, State#alarms.vm_memory_high_watermark,
+ State#alarms{alertees = NewAlertees}};
handle_call(_Request, State) ->
{ok, not_understood, State}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 36dc1b90..76488255 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -392,15 +392,16 @@ safe_delegate_call_ok(H, F, Pids) ->
end.
delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end).
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
+ delegate:invoke(Pid,
+ fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+ delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
delegate_pcast(Pid, Pri, Msg) ->
delegate:invoke_no_result(Pid,
- fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+ fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2dba00ad..432d6290 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e90cddb9..fcec3352 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -39,6 +39,8 @@
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([flow_timeout/2]).
+
-export([init/1, terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
@@ -46,9 +48,12 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid}).
+ consumer_mapping, blocking, queue_collector_pid, flow}).
+
+-record(flow, {server, client, pending}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
-define(INFO_KEYS,
[pid,
@@ -66,6 +71,8 @@
-ifdef(use_specs).
+-type(ref() :: any()).
+
-spec(start_link/6 ::
(channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
@@ -75,6 +82,7 @@
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -113,6 +121,9 @@ conserve_memory(Pid, Conserve) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+flow_timeout(Pid, Ref) ->
+ gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -154,7 +165,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
- queue_collector_pid = CollectorPid},
+ queue_collector_pid = CollectorPid,
+ flow = #flow{server = true, client = true,
+ pending = none}},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -181,11 +194,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, normal, State#ch{state = terminating}}
catch
exit:Reason = #amqp_error{} ->
- ok = rollback_and_notify(State),
MethodName = rabbit_misc:method_record_type(Method),
- State#ch.reader_pid ! {channel_exit, State#ch.channel,
- Reason#amqp_error{method = MethodName}},
- {stop, normal, State#ch{state = terminating}};
+ {stop, normal, terminating(Reason#amqp_error{method = MethodName},
+ State)};
exit:normal ->
{stop, normal, State};
_:Reason ->
@@ -209,11 +220,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, Conserve}, State) ->
- ok = clear_permission_cache(),
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- noreply(State).
+handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
+ noreply(State);
+handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
+ noreply(State#ch{state = running});
+handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
+ flow_control(not Conserve, State);
+handle_cast({conserve_memory, _Conserve}, State) ->
+ noreply(State);
+
+handle_cast({flow_timeout, Ref},
+ State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
+ {stop, normal, terminating(
+ rabbit_misc:amqp_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not Flow], none), State)};
+handle_cast({flow_timeout, _Ref}, State) ->
+ {noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -254,6 +279,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
+terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
+ ok = rollback_and_notify(State),
+ Reader ! {channel_exit, Channel, Reason},
+ State#ch{state = terminating}.
+
return_queue_declare_ok(State, NoWait, Q) ->
NewState = State#ch{most_recently_declared_queue =
(Q#amqqueue.name)#resource.name},
@@ -369,8 +399,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
+ true -> {noreply, State};
+ false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
+ end;
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -387,13 +419,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{exchange = ExchangeNameBin,
+handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "basic.publish received after channel.flow_ok{active=false}", []);
+handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ mandatory = Mandatory,
+ immediate = Immediate},
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -824,7 +860,6 @@ handle_method(#'channel.flow'{active = true}, _,
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
-
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -842,11 +877,25 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = _}, _, State) ->
- %% TODO: We may want to correlate this to channel.flow messages we
- %% have sent, and complain if we get an unsolicited
- %% channel.flow_ok, or the client refuses our flow request.
- {noreply, State};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Active, client = Flow,
+ pending = {_Ref, TRef}} = F})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Flow, client = Flow,
+ pending = {_Ref, TRef}}})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, issue_flow(Flow, State)};
+handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unsolicited channel.flow_ok", []);
+handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -854,6 +903,22 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
+ when Flow =:= not Active ->
+ ok = clear_permission_cache(),
+ noreply(issue_flow(Active, State));
+flow_control(Active, State = #ch{flow = F}) ->
+ noreply(State#ch{flow = F#flow{server = Active}}).
+
+issue_flow(Active, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = Active}),
+ Ref = make_ref(),
+ {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
+ [self(), Ref]),
+ State#ch{flow = #flow{server = Active, client = not Active,
+ pending = {Ref, TRef}}}.
+
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index d1834b3b..323d4d2f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -59,8 +59,8 @@ start() ->
parse_args(FullCommand, #params{quiet = false,
node = rabbit_misc:makenode(NodeStr)}),
Inform = case Quiet of
- true -> fun(_Format, _Args1) -> ok end;
- false -> fun(Format, Args1) ->
+ true -> fun (_Format, _Args1) -> ok end;
+ false -> fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
end
end,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index b3b9e1b4..d237134f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -82,8 +82,9 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
--spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
+-spec(delete_queue_bindings/1 :: (queue_name()) -> fun (() -> none())).
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) ->
+ fun (() -> none())).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -99,12 +100,12 @@
recover() ->
Exs = rabbit_misc:table_fold(
- fun(Exchange, Acc) ->
+ fun (Exchange, Acc) ->
ok = mnesia:write(rabbit_exchange, Exchange, write),
[Exchange | Acc]
end, [], rabbit_durable_exchange),
Bs = rabbit_misc:table_fold(
- fun(Route = #route{binding = B}, Acc) ->
+ fun (Route = #route{binding = B}, Acc) ->
{_, ReverseRoute} = route_with_reverse(Route),
ok = mnesia:write(rabbit_route,
Route, write),
@@ -351,7 +352,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
+ fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
@@ -359,7 +360,7 @@ call_with_exchange(Exchange, Fun) ->
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
+ fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
mnesia:read({rabbit_queue, Queue})} of
{[X], [Q]} -> Fun(X, Q);
{[ ], [_]} -> {error, exchange_not_found};
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index a8c071e6..699250f7 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 9b71e0e1..c3fb2588 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 311654ab..62c862a5 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 285dab1a..0991bf0d 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
index 175d15ad..33ea0e92 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_exchange_type_registry.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 8a3dceea..e42c4518 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9a911ab1..35739dcb 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -242,12 +242,12 @@ report_cover([Root]) when is_atom(Root) ->
report_cover(Root) ->
Dir = filename:join(Root, "cover"),
ok = filelib:ensure_dir(filename:join(Dir,"junk")),
- lists:foreach(fun(F) -> file:delete(F) end,
+ lists:foreach(fun (F) -> file:delete(F) end,
filelib:wildcard(filename:join(Dir, "*.html"))),
{ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
{CT, NCT} =
lists:foldl(
- fun(M,{CovTot, NotCovTot}) ->
+ fun (M,{CovTot, NotCovTot}) ->
{ok, {M, {Cov, NotCov}}} = cover:analyze(M, module),
ok = report_coverage_percentage(SummaryFile,
Cov, NotCov, M),
@@ -367,7 +367,7 @@ upmap(F, L) ->
Parent = self(),
Ref = make_ref(),
[receive {Ref, Result} -> Result end
- || _ <- [spawn(fun() -> Parent ! {Ref, F(X)} end) || X <- L]].
+ || _ <- [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L]].
map_in_order(F, L) ->
lists:reverse(
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 55a6761d..a0b7aa4e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -346,7 +346,7 @@ table_has_copy_type(TabDef, DiscType) ->
create_local_table_copies(Type) ->
lists:foreach(
- fun({Tab, TabDef}) ->
+ fun ({Tab, TabDef}) ->
HasDiscCopies = table_has_copy_type(TabDef, disc_copies),
HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies),
LocalTab = proplists:get_bool(local_content, TabDef),
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 336f74bf..5db1d77a 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -111,7 +111,7 @@ action(start_all, [NodeCount], RpcTimeout) ->
action(status, [], RpcTimeout) ->
io:format("Status of all running nodes...~n", []),
call_all_nodes(
- fun({Node, Pid}) ->
+ fun ({Node, Pid}) ->
RabbitRunning =
case is_rabbit_running(Node, RpcTimeout) of
false -> not_running;
@@ -123,7 +123,7 @@ action(status, [], RpcTimeout) ->
action(stop_all, [], RpcTimeout) ->
io:format("Stopping all nodes...~n", []),
- call_all_nodes(fun({Node, Pid}) ->
+ call_all_nodes(fun ({Node, Pid}) ->
io:format("Stopping node ~p~n", [Node]),
rpc:call(Node, rabbit, stop_and_halt, []),
case kill_wait(Pid, RpcTimeout, false) of
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 406977b4..975954fc 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -66,7 +66,7 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
Pid = self(),
Ref = make_ref(),
- spawn(fun() -> Pid ! {inet_async, Sock, Ref,
+ spawn(fun () -> Pid ! {inet_async, Sock, Ref,
ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
end),
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 3cd42e47..8d3c2dc0 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -236,7 +236,7 @@ log_work(CreateWorkUnit, MessageList,
snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
- fun(M = {publish, Message, QK = {_QName, PKey}}) ->
+ fun (M = {publish, Message, QK = {_QName, PKey}}) ->
case ets:lookup(Messages, PKey) of
[_] -> {tied, QK};
[] -> ets:insert(Messages, {PKey, Message}),
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl
index 841549e9..8d4e8fdb 100644
--- a/src/rabbit_reader_queue_collector.erl
+++ b/src/rabbit_reader_queue_collector.erl
@@ -82,8 +82,8 @@ handle_call({register_exclusive_queue, Q}, _From,
handle_call(delete_all, _From,
State = #state{exclusive_queues = ExclusiveQueues}) ->
[rabbit_misc:with_exit_handler(
- fun() -> ok end,
- fun() ->
+ fun () -> ok end,
+ fun () ->
erlang:demonitor(MonitorRef),
rabbit_amqqueue:delete(Q, false, false)
end)
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 03979d6c..5cd15a94 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -57,14 +57,17 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
delegate:invoke_no_result(
- QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
+ end),
+ {Routed, Handled} =
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}).
@@ -88,7 +91,7 @@ match_routing_key(Name, RoutingKey) ->
lookup_qpids(Queues) ->
sets:fold(
- fun(Key, Acc) ->
+ fun (Key, Acc) ->
case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fa0ce2db..ecc2613d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -58,6 +59,7 @@ all_tests() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
+ passed = test_memory_pressure(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -822,7 +824,7 @@ test_hooks() ->
{[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
%% Invoking Pids
- Remote = fun() ->
+ Remote = fun () ->
receive
{rabbitmq_hook,[remote_test,test,[],Target]} ->
Target ! invoked
@@ -839,11 +841,137 @@ test_hooks() ->
end,
passed.
+test_memory_pressure_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ ok = case Method of
+ #'channel.flow'{} -> ok;
+ #'basic.qos_ok'{} -> ok;
+ #'channel.open_ok'{} -> ok
+ end,
+ Pid ! Method,
+ test_memory_pressure_receiver(Pid);
+ sync ->
+ Pid ! sync,
+ test_memory_pressure_receiver(Pid)
+ end.
+
+test_memory_pressure_receive_flow(Active) ->
+ receive #'channel.flow'{active = Active} -> ok
+ after 1000 -> throw(failed_to_receive_channel_flow)
+ end,
+ receive #'channel.flow'{} ->
+ throw(pipelining_sync_commands_detected)
+ after 0 ->
+ ok
+ end.
+
+test_memory_pressure_sync(Ch, Writer) ->
+ ok = rabbit_channel:do(Ch, #'basic.qos'{}),
+ Writer ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'basic.qos_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_basic_qos_ok)
+ end.
+
+test_memory_pressure_spawn() ->
+ Me = self(),
+ Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch, #'channel.open'{}),
+ MRef = erlang:monitor(process, Ch),
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ {Writer, Ch, MRef}.
+
+expect_normal_channel_termination(MRef, Ch) ->
+ receive {'DOWN', MRef, process, Ch, normal} -> ok
+ after 1000 -> throw(channel_failed_to_exit)
+ end.
+
+test_memory_pressure() ->
+ {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
+ [ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
+ Conserve <- [false, false, true, false, true, true, false]],
+ ok = test_memory_pressure_sync(Ch0, Writer0),
+ receive {'DOWN', MRef0, process, Ch0, Info0} ->
+ throw({channel_died_early, Info0})
+ after 0 -> ok
+ end,
+
+ %% we should have just 1 active=false waiting for us
+ ok = test_memory_pressure_receive_flow(false),
+
+ %% if we reply with flow_ok, we should immediately get an
+ %% active=true back
+ ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_receive_flow(true),
+
+ %% if we publish at this point, the channel should die
+ Content = #content{class_id = element(1, rabbit_framing:method_id(
+ 'basic.publish')),
+ properties = none,
+ properties_bin = <<>>,
+ payload_fragments_rev = []},
+ ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
+ expect_normal_channel_termination(MRef0, Ch0),
+
+ {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch1, true),
+ ok = test_memory_pressure_receive_flow(false),
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_sync(Ch1, Writer1),
+ ok = rabbit_channel:conserve_memory(Ch1, false),
+ ok = test_memory_pressure_receive_flow(true),
+ %% send back the wrong flow_ok. Channel should die.
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ expect_normal_channel_termination(MRef1, Ch1),
+
+ {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
+ %% just out of the blue, send a flow_ok. Life should end.
+ ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
+ expect_normal_channel_termination(MRef2, Ch2),
+
+ {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch3, true),
+ receive {'DOWN', MRef3, process, Ch3, _} ->
+ ok
+ after 12000 ->
+ throw(channel_failed_to_exit)
+ end,
+
+ alarm_handler:set_alarm({vm_memory_high_watermark, []}),
+ Me = self(),
+ Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch4, #'channel.open'{}),
+ MRef4 = erlang:monitor(process, Ch4),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
+ after 0 -> ok
+ end,
+ alarm_handler:clear_alarm(vm_memory_high_watermark),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ rabbit_channel:shutdown(Ch4),
+ expect_normal_channel_termination(MRef4, Ch4),
+
+ passed.
+
test_delegates_async(SecondaryNode) ->
Self = self(),
- Sender = fun(Pid) -> Pid ! {invoked, Self} end,
+ Sender = fun (Pid) -> Pid ! {invoked, Self} end,
- Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end),
+ Responder = make_responder(fun ({invoked, Pid}) -> Pid ! response end),
ok = delegate:invoke_no_result(spawn(Responder), Sender),
ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
@@ -858,7 +986,7 @@ test_delegates_async(SecondaryNode) ->
make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
- fun() ->
+ fun () ->
receive Msg -> FMsg(Msg)
after 1000 -> throw(Throw)
end
@@ -887,22 +1015,22 @@ must_exit(Fun) ->
end.
test_delegates_sync(SecondaryNode) ->
- Sender = fun(Pid) -> gen_server:call(Pid, invoked) end,
- BadSender = fun(_Pid) -> exit(exception) end,
+ Sender = fun (Pid) -> gen_server:call(Pid, invoked) end,
+ BadSender = fun (_Pid) -> exit(exception) end,
- Responder = make_responder(fun({'$gen_call', From, invoked}) ->
+ Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
gen_server:reply(From, response)
end),
- BadResponder = make_responder(fun({'$gen_call', From, invoked}) ->
+ BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
gen_server:reply(From, response)
end, bad_responder_died),
response = delegate:invoke(spawn(Responder), Sender),
response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
- must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end),
- must_exit(fun() ->
+ must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
+ must_exit(fun () ->
delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 55753512..0b1d7265 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -301,13 +301,13 @@ handle_call({terminate_child, Name}, _From, State) ->
handle_call(which_children, _From, State) when ?is_simple(State) ->
[#child{child_type = CT, modules = Mods}] = State#state.children,
- Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end,
+ Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
?DICT:to_list(State#state.dynamics)),
{reply, Reply, State};
handle_call(which_children, _From, State) ->
Resp =
- lists:map(fun(#child{pid = Pid, name = Name,
+ lists:map(fun (#child{pid = Pid, name = Name,
child_type = ChildType, modules = Mods}) ->
{Name, Pid, ChildType, Mods}
end,
@@ -415,7 +415,7 @@ update_childspec1([], Children, KeepOld) ->
lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
- case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name ->
+ case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
Ch#child{pid = OldCh#child.pid};
(Ch) ->
Ch
@@ -828,7 +828,7 @@ validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}).
validMods(dynamic) -> true;
validMods(Mods) when is_list(Mods) ->
- lists:foreach(fun(Mod) ->
+ lists:foreach(fun (Mod) ->
if
is_atom(Mod) -> ok;
true -> throw({invalid_module, Mod})