diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-06-07 15:27:59 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-06-07 15:27:59 +0100 |
commit | ded733191bfdc77f2438f68f04c3f342773d97f6 (patch) | |
tree | dbe379dd95542ed323f6228b4ad5a0a6a8904148 | |
parent | 335fc2cc70c6131d4f270b0c39c906a7c81efcfe (diff) | |
parent | 87c44c6cf1d08d5e8e276b10dc2224ed41d87869 (diff) | |
download | rabbitmq-server-ded733191bfdc77f2438f68f04c3f342773d97f6.tar.gz |
Merged in default
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 6 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 6 | ||||
-rw-r--r-- | src/delegate.erl | 10 | ||||
-rw-r--r-- | src/gen_server2.erl | 2 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 115 | ||||
-rw-r--r-- | src/rabbit_control.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 13 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_registry.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 2 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 4 | ||||
-rw-r--r-- | src/rabbit_net.erl | 2 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 2 | ||||
-rw-r--r-- | src/rabbit_reader_queue_collector.erl | 4 | ||||
-rw-r--r-- | src/rabbit_router.erl | 11 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 148 | ||||
-rw-r--r-- | src/supervisor2.erl | 8 |
25 files changed, 302 insertions, 103 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 1b536dfa..55cd126e 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 9864f1eb..38057beb 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/delegate.erl b/src/delegate.erl index 98353453..8af28127 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -45,8 +45,8 @@ -ifdef(use_specs). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). --spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). -spec(process_count/0 :: () -> non_neg_integer()). @@ -73,7 +73,7 @@ invoke(Pid, Fun) when is_pid(Pid) -> invoke(Pids, Fun) when is_list(Pids) -> lists:foldl( - fun({Status, Result, Pid}, {Good, Bad}) -> + fun ({Status, Result, Pid}, {Good, Bad}) -> case Status of ok -> {[{Pid, Result}|Good], Bad}; error -> {Good, [{Pid, Result}|Bad]} @@ -136,10 +136,10 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) -> %% block forever. [gen_server2:cast( local_server(Node), - {thunk, fun() -> + {thunk, fun () -> Self ! {result, DelegateFun( - Node, fun() -> safe_invoke(Pids, Fun) end)} + Node, fun () -> safe_invoke(Pids, Fun) end)} end}) || {Node, Pids} <- NodePids], [receive {result, Result} -> Result end || _ <- NodePids]. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 5b899cdb..547f0a42 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -639,7 +639,7 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Caller = self(), Receiver = spawn( - fun() -> + fun () -> %% Middleman process. Should be unsensitive to regular %% exit signals. The sychronization is needed in case %% the receiver would exit before the caller started diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 7e96d9a3..53c713e6 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -47,7 +47,7 @@ -type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). -endif. @@ -67,9 +67,9 @@ stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). register(Pid, HighMemMFA) -> - ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}, - infinity). + gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- @@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA}, false -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, ok, State#alarms{alertees = NewAlertees}}; + {ok, State#alarms.vm_memory_high_watermark, + State#alarms{alertees = NewAlertees}}; handle_call(_Request, State) -> {ok, not_understood, State}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 36dc1b90..76488255 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -392,15 +392,16 @@ safe_delegate_call_ok(H, F, Pids) -> end. delegate_call(Pid, Msg, Timeout) -> - delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end). + delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). + delegate:invoke(Pid, + fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). delegate_pcast(Pid, Pri, Msg) -> delegate:invoke_no_result(Pid, - fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + fun (P) -> gen_server2:pcast(P, Pri, Msg) end). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2dba00ad..432d6290 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e90cddb9..fcec3352 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,6 +39,8 @@ -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([flow_timeout/2]). + -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -46,9 +48,12 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid}). + consumer_mapping, blocking, queue_collector_pid, flow}). + +-record(flow, {server, client, pending}). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds -define(INFO_KEYS, [pid, @@ -66,6 +71,8 @@ -ifdef(use_specs). +-type(ref() :: any()). + -spec(start_link/6 :: (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). @@ -75,6 +82,7 @@ -spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -113,6 +121,9 @@ conserve_memory(Pid, Conserve) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +flow_timeout(Pid, Ref) -> + gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). + list() -> pg_local:get_members(rabbit_channels). @@ -154,7 +165,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), - queue_collector_pid = CollectorPid}, + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -181,11 +194,9 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> - ok = rollback_and_notify(State), MethodName = rabbit_misc:method_record_type(Method), - State#ch.reader_pid ! {channel_exit, State#ch.channel, - Reason#amqp_error{method = MethodName}}, - {stop, normal, State#ch{state = terminating}}; + {stop, normal, terminating(Reason#amqp_error{method = MethodName}, + State)}; exit:normal -> {stop, normal, State}; _:Reason -> @@ -209,11 +220,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, Conserve}, State) -> - ok = clear_permission_cache(), - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - noreply(State). +handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> + noreply(State); +handle_cast({conserve_memory, false}, State = #ch{state = starting}) -> + ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}), + noreply(State#ch{state = running}); +handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> + flow_control(not Conserve, State); +handle_cast({conserve_memory, _Conserve}, State) -> + noreply(State); + +handle_cast({flow_timeout, Ref}, + State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> + {stop, normal, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; +handle_cast({flow_timeout, _Ref}, State) -> + {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -254,6 +279,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. +terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> + ok = rollback_and_notify(State), + Reader ! {channel_exit, Channel, Reason}, + State#ch{state = terminating}. + return_queue_declare_ok(State, NoWait, Q) -> NewState = State#ch{most_recently_declared_queue = (Q#amqqueue.name)#resource.name}, @@ -369,8 +399,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of + true -> {noreply, State}; + false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} + end; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -387,13 +419,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{exchange = ExchangeNameBin, +handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> + rabbit_misc:protocol_error( + command_invalid, + "basic.publish received after channel.flow_ok{active=false}", []); +handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + mandatory = Mandatory, + immediate = Immediate}, + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -824,7 +860,6 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; - handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -842,11 +877,25 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = _}, _, State) -> - %% TODO: We may want to correlate this to channel.flow messages we - %% have sent, and complain if we get an unsolicited - %% channel.flow_ok, or the client refuses our flow request. - {noreply, State}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Active, client = Flow, + pending = {_Ref, TRef}} = F}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Flow, client = Flow, + pending = {_Ref, TRef}}}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, issue_flow(Flow, State)}; +handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> + rabbit_misc:protocol_error( + command_invalid, "unsolicited channel.flow_ok", []); +handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> + rabbit_misc:protocol_error( + command_invalid, + "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -854,6 +903,22 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) + when Flow =:= not Active -> + ok = clear_permission_cache(), + noreply(issue_flow(Active, State)); +flow_control(Active, State = #ch{flow = F}) -> + noreply(State#ch{flow = F#flow{server = Active}}). + +issue_flow(Active, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = Active}), + Ref = make_ref(), + {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, + [self(), Ref]), + State#ch{flow = #flow{server = Active, client = not Active, + pending = {Ref, TRef}}}. + binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index d1834b3b..323d4d2f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -59,8 +59,8 @@ start() -> parse_args(FullCommand, #params{quiet = false, node = rabbit_misc:makenode(NodeStr)}), Inform = case Quiet of - true -> fun(_Format, _Args1) -> ok end; - false -> fun(Format, Args1) -> + true -> fun (_Format, _Args1) -> ok end; + false -> fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) end end, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index b3b9e1b4..d237134f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -82,8 +82,9 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). +-spec(delete_queue_bindings/1 :: (queue_name()) -> fun (() -> none())). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> + fun (() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -99,12 +100,12 @@ recover() -> Exs = rabbit_misc:table_fold( - fun(Exchange, Acc) -> + fun (Exchange, Acc) -> ok = mnesia:write(rabbit_exchange, Exchange, write), [Exchange | Acc] end, [], rabbit_durable_exchange), Bs = rabbit_misc:table_fold( - fun(Route = #route{binding = B}, Acc) -> + fun (Route = #route{binding = B}, Acc) -> {_, ReverseRoute} = route_with_reverse(Route), ok = mnesia:write(rabbit_route, Route, write), @@ -351,7 +352,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, Exchange}) of [] -> {error, not_found}; [X] -> Fun(X) end @@ -359,7 +360,7 @@ call_with_exchange(Exchange, Fun) -> call_with_exchange_and_queue(Exchange, Queue, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case {mnesia:read({rabbit_exchange, Exchange}), + fun () -> case {mnesia:read({rabbit_exchange, Exchange}), mnesia:read({rabbit_queue, Queue})} of {[X], [Q]} -> Fun(X, Q); {[ ], [_]} -> {error, exchange_not_found}; diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index a8c071e6..699250f7 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 9b71e0e1..c3fb2588 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 311654ab..62c862a5 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 285dab1a..0991bf0d 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl index 175d15ad..33ea0e92 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_exchange_type_registry.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 8a3dceea..e42c4518 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9a911ab1..35739dcb 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -242,12 +242,12 @@ report_cover([Root]) when is_atom(Root) -> report_cover(Root) -> Dir = filename:join(Root, "cover"), ok = filelib:ensure_dir(filename:join(Dir,"junk")), - lists:foreach(fun(F) -> file:delete(F) end, + lists:foreach(fun (F) -> file:delete(F) end, filelib:wildcard(filename:join(Dir, "*.html"))), {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), {CT, NCT} = lists:foldl( - fun(M,{CovTot, NotCovTot}) -> + fun (M,{CovTot, NotCovTot}) -> {ok, {M, {Cov, NotCov}}} = cover:analyze(M, module), ok = report_coverage_percentage(SummaryFile, Cov, NotCov, M), @@ -367,7 +367,7 @@ upmap(F, L) -> Parent = self(), Ref = make_ref(), [receive {Ref, Result} -> Result end - || _ <- [spawn(fun() -> Parent ! {Ref, F(X)} end) || X <- L]]. + || _ <- [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L]]. map_in_order(F, L) -> lists:reverse( diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 55a6761d..a0b7aa4e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -346,7 +346,7 @@ table_has_copy_type(TabDef, DiscType) -> create_local_table_copies(Type) -> lists:foreach( - fun({Tab, TabDef}) -> + fun ({Tab, TabDef}) -> HasDiscCopies = table_has_copy_type(TabDef, disc_copies), HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), LocalTab = proplists:get_bool(local_content, TabDef), diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 336f74bf..5db1d77a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -111,7 +111,7 @@ action(start_all, [NodeCount], RpcTimeout) -> action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( - fun({Node, Pid}) -> + fun ({Node, Pid}) -> RabbitRunning = case is_rabbit_running(Node, RpcTimeout) of false -> not_running; @@ -123,7 +123,7 @@ action(status, [], RpcTimeout) -> action(stop_all, [], RpcTimeout) -> io:format("Stopping all nodes...~n", []), - call_all_nodes(fun({Node, Pid}) -> + call_all_nodes(fun ({Node, Pid}) -> io:format("Stopping node ~p~n", [Node]), rpc:call(Node, rabbit, stop_and_halt, []), case kill_wait(Pid, RpcTimeout, false) of diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 406977b4..975954fc 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -66,7 +66,7 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> Pid = self(), Ref = make_ref(), - spawn(fun() -> Pid ! {inet_async, Sock, Ref, + spawn(fun () -> Pid ! {inet_async, Sock, Ref, ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} end), diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 3cd42e47..8d3c2dc0 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -236,7 +236,7 @@ log_work(CreateWorkUnit, MessageList, snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( - fun(M = {publish, Message, QK = {_QName, PKey}}) -> + fun (M = {publish, Message, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of [_] -> {tied, QK}; [] -> ets:insert(Messages, {PKey, Message}), diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl index 841549e9..8d4e8fdb 100644 --- a/src/rabbit_reader_queue_collector.erl +++ b/src/rabbit_reader_queue_collector.erl @@ -82,8 +82,8 @@ handle_call({register_exclusive_queue, Q}, _From, handle_call(delete_all, _From, State = #state{exclusive_queues = ExclusiveQueues}) -> [rabbit_misc:with_exit_handler( - fun() -> ok end, - fun() -> + fun () -> ok end, + fun () -> erlang:demonitor(MonitorRef), rabbit_amqqueue:delete(Q, false, false) end) diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 03979d6c..5cd15a94 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -57,14 +57,17 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. delegate:invoke_no_result( - QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; deliver(QPids, Delivery) -> {Success, _} = delegate:invoke(QPids, - fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), + fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) + end), + {Routed, Handled} = + lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}). @@ -88,7 +91,7 @@ match_routing_key(Name, RoutingKey) -> lookup_qpids(Queues) -> sets:fold( - fun(Key, Acc) -> + fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fa0ce2db..ecc2613d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -58,6 +59,7 @@ all_tests() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), + passed = test_memory_pressure(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -822,7 +824,7 @@ test_hooks() -> {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), %% Invoking Pids - Remote = fun() -> + Remote = fun () -> receive {rabbitmq_hook,[remote_test,test,[],Target]} -> Target ! invoked @@ -839,11 +841,137 @@ test_hooks() -> end, passed. +test_memory_pressure_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + ok = case Method of + #'channel.flow'{} -> ok; + #'basic.qos_ok'{} -> ok; + #'channel.open_ok'{} -> ok + end, + Pid ! Method, + test_memory_pressure_receiver(Pid); + sync -> + Pid ! sync, + test_memory_pressure_receiver(Pid) + end. + +test_memory_pressure_receive_flow(Active) -> + receive #'channel.flow'{active = Active} -> ok + after 1000 -> throw(failed_to_receive_channel_flow) + end, + receive #'channel.flow'{} -> + throw(pipelining_sync_commands_detected) + after 0 -> + ok + end. + +test_memory_pressure_sync(Ch, Writer) -> + ok = rabbit_channel:do(Ch, #'basic.qos'{}), + Writer ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'basic.qos_ok'{} -> ok + after 1000 -> throw(failed_to_receive_basic_qos_ok) + end. + +test_memory_pressure_spawn() -> + Me = self(), + Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch, #'channel.open'{}), + MRef = erlang:monitor(process, Ch), + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + {Writer, Ch, MRef}. + +expect_normal_channel_termination(MRef, Ch) -> + receive {'DOWN', MRef, process, Ch, normal} -> ok + after 1000 -> throw(channel_failed_to_exit) + end. + +test_memory_pressure() -> + {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), + [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || + Conserve <- [false, false, true, false, true, true, false]], + ok = test_memory_pressure_sync(Ch0, Writer0), + receive {'DOWN', MRef0, process, Ch0, Info0} -> + throw({channel_died_early, Info0}) + after 0 -> ok + end, + + %% we should have just 1 active=false waiting for us + ok = test_memory_pressure_receive_flow(false), + + %% if we reply with flow_ok, we should immediately get an + %% active=true back + ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_receive_flow(true), + + %% if we publish at this point, the channel should die + Content = #content{class_id = element(1, rabbit_framing:method_id( + 'basic.publish')), + properties = none, + properties_bin = <<>>, + payload_fragments_rev = []}, + ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), + expect_normal_channel_termination(MRef0, Ch0), + + {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch1, true), + ok = test_memory_pressure_receive_flow(false), + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_sync(Ch1, Writer1), + ok = rabbit_channel:conserve_memory(Ch1, false), + ok = test_memory_pressure_receive_flow(true), + %% send back the wrong flow_ok. Channel should die. + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + expect_normal_channel_termination(MRef1, Ch1), + + {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), + %% just out of the blue, send a flow_ok. Life should end. + ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), + expect_normal_channel_termination(MRef2, Ch2), + + {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch3, true), + receive {'DOWN', MRef3, process, Ch3, _} -> + ok + after 12000 -> + throw(channel_failed_to_exit) + end, + + alarm_handler:set_alarm({vm_memory_high_watermark, []}), + Me = self(), + Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch4, #'channel.open'{}), + MRef4 = erlang:monitor(process, Ch4), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) + after 0 -> ok + end, + alarm_handler:clear_alarm(vm_memory_high_watermark), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + rabbit_channel:shutdown(Ch4), + expect_normal_channel_termination(MRef4, Ch4), + + passed. + test_delegates_async(SecondaryNode) -> Self = self(), - Sender = fun(Pid) -> Pid ! {invoked, Self} end, + Sender = fun (Pid) -> Pid ! {invoked, Self} end, - Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end), + Responder = make_responder(fun ({invoked, Pid}) -> Pid ! response end), ok = delegate:invoke_no_result(spawn(Responder), Sender), ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender), @@ -858,7 +986,7 @@ test_delegates_async(SecondaryNode) -> make_responder(FMsg) -> make_responder(FMsg, timeout). make_responder(FMsg, Throw) -> - fun() -> + fun () -> receive Msg -> FMsg(Msg) after 1000 -> throw(Throw) end @@ -887,22 +1015,22 @@ must_exit(Fun) -> end. test_delegates_sync(SecondaryNode) -> - Sender = fun(Pid) -> gen_server:call(Pid, invoked) end, - BadSender = fun(_Pid) -> exit(exception) end, + Sender = fun (Pid) -> gen_server:call(Pid, invoked) end, + BadSender = fun (_Pid) -> exit(exception) end, - Responder = make_responder(fun({'$gen_call', From, invoked}) -> + Responder = make_responder(fun ({'$gen_call', From, invoked}) -> gen_server:reply(From, response) end), - BadResponder = make_responder(fun({'$gen_call', From, invoked}) -> + BadResponder = make_responder(fun ({'$gen_call', From, invoked}) -> gen_server:reply(From, response) end, bad_responder_died), response = delegate:invoke(spawn(Responder), Sender), response = delegate:invoke(spawn(SecondaryNode, Responder), Sender), - must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end), - must_exit(fun() -> + must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end), + must_exit(fun () -> delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), LocalGoodPids = spawn_responders(node(), Responder, 2), diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 55753512..0b1d7265 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -301,13 +301,13 @@ handle_call({terminate_child, Name}, _From, State) -> handle_call(which_children, _From, State) when ?is_simple(State) -> [#child{child_type = CT, modules = Mods}] = State#state.children, - Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end, + Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end, ?DICT:to_list(State#state.dynamics)), {reply, Reply, State}; handle_call(which_children, _From, State) -> Resp = - lists:map(fun(#child{pid = Pid, name = Name, + lists:map(fun (#child{pid = Pid, name = Name, child_type = ChildType, modules = Mods}) -> {Name, Pid, ChildType, Mods} end, @@ -415,7 +415,7 @@ update_childspec1([], Children, KeepOld) -> lists:reverse(Children ++ KeepOld). update_chsp(OldCh, Children) -> - case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name -> + case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name -> Ch#child{pid = OldCh#child.pid}; (Ch) -> Ch @@ -828,7 +828,7 @@ validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}). validMods(dynamic) -> true; validMods(Mods) when is_list(Mods) -> - lists:foreach(fun(Mod) -> + lists:foreach(fun (Mod) -> if is_atom(Mod) -> ok; true -> throw({invalid_module, Mod}) |