summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-26 18:40:30 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-26 18:40:30 +0100
commit5c9938c68d93cb29577cf5e3f245cbfb5d6291a7 (patch)
treed955bcbcfaede21502d5517368debfc6b930d2a5
parent16b397876264ad4e1973833b794596f89316231d (diff)
downloadrabbitmq-server-5c9938c68d93cb29577cf5e3f245cbfb5d6291a7.tar.gz
Sorted out exception handling correctly, and added many more tests
-rw-r--r--src/rabbit_channel.erl73
-rw-r--r--src/rabbit_tests.erl86
2 files changed, 110 insertions, 49 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a4ce5384..0cdf7f2d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -183,26 +183,9 @@ handle_call(_Request, _From, State) ->
noreply(State).
handle_cast({method, Method, Content}, State) ->
- try handle_method(Method, Content, State) of
- {reply, Reply, NewState} ->
- ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- noreply(NewState);
- {noreply, NewState} ->
- noreply(NewState);
- stop ->
- {stop, normal, State#ch{state = terminating}}
- catch
- exit:Reason = #amqp_error{} ->
- ok = rollback_and_notify(State),
- MethodName = rabbit_misc:method_record_type(Method),
- State#ch.reader_pid ! {channel_exit, State#ch.channel,
- Reason#amqp_error{method = MethodName}},
- {stop, normal, State#ch{state = terminating}};
- exit:normal ->
- {stop, normal, State};
- _:Reason ->
- {stop, {Reason, erlang:get_stacktrace()}, State}
- end;
+ handle_exiting_function(
+ fun (State1) -> handle_method(Method, Content, State1) end, Method,
+ State);
handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State)};
@@ -224,13 +207,18 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
handle_cast({conserve_memory, Conserve}, State) ->
flow_control(not Conserve, State);
-handle_cast({flow_timeout, Ref}, #ch{flow = #flow{client = ClientFlow,
- pending = {Ref, _TRef}}}) ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}", [not ClientFlow]);
-handle_cast({flow_timeout, _Ref}, State) ->
- noreply(State).
+handle_cast({flow_timeout, Ref}, State) ->
+ handle_exiting_function(
+ fun (#ch{flow = #flow{client = ClientFlow,
+ pending = {Ref1, _TRef}}})
+ when Ref =:= Ref1 ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not ClientFlow]);
+ (State1) ->
+ {noreply, State1}
+ end, none, State).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -369,6 +357,31 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+handle_exiting_function(Fun, Method, State) ->
+ try Fun(State) of
+ {reply, Reply, NewState} ->
+ ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
+ noreply(NewState);
+ {noreply, NewState} ->
+ noreply(NewState);
+ stop ->
+ {stop, normal, State#ch{state = terminating}}
+ catch
+ exit:Reason = #amqp_error{} ->
+ ok = rollback_and_notify(State),
+ MethodName = case Method of
+ none -> none;
+ _ -> rabbit_misc:method_record_type(Method)
+ end,
+ State#ch.reader_pid ! {channel_exit, State#ch.channel,
+ Reason#amqp_error{method = MethodName}},
+ {stop, normal, State#ch{state = terminating}};
+ exit:normal ->
+ {stop, normal, State};
+ _:Reason ->
+ {stop, {Reason, erlang:get_stacktrace()}, State}
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -850,11 +863,11 @@ handle_method(#'channel.flow'{active = false}, _,
end;
handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Active, client = Flow,
- pending = {_Ref, TRef}}})
+ State = #ch{flow = F = #flow{server = Active, client = Flow,
+ pending = {_Ref, TRef}}})
when Flow =:= not Active ->
{ok, cancel} = timer:cancel(TRef),
- {noreply, State#ch{flow = #flow{client = Active, pending = none}}};
+ {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
handle_method(#'channel.flow_ok'{active = Active}, _,
State = #ch{flow = #flow{server = Flow, client = Flow,
pending = {_Ref, TRef}}})
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a871154c..357e9949 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -863,7 +863,12 @@ test_memory_pressure_receiver(Pid) ->
receive
shutdown ->
ok;
- {send_command, #'channel.flow'{} = Method} ->
+ {send_command, Method} ->
+ ok = case Method of
+ #'channel.flow'{} -> ok;
+ #'basic.qos_ok'{} -> ok;
+ #'channel.open_ok'{} -> ok
+ end,
Pid ! Method,
test_memory_pressure_receiver(Pid);
sync ->
@@ -881,46 +886,89 @@ test_memory_pressure_receive_flow(Active) ->
ok
end.
-test_memory_pressure_sync(WPid) ->
- WPid ! sync,
- receive sync -> ok after 1000 -> throw(timeout) end.
+test_memory_pressure_sync(Ch, Writer) ->
+ ok = rabbit_channel:do(Ch, #'basic.qos'{}),
+ Writer ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'basic.qos_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_basic_qos_ok)
+ end.
-test_memory_pressure() ->
+test_memory_pressure_spawn() ->
Me = self(),
Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
- ok = rabbit_channel:conserve_memory(Ch, false),
- ok = rabbit_channel:conserve_memory(Ch, false),
- ok = rabbit_channel:conserve_memory(Ch, true),
- ok = rabbit_channel:conserve_memory(Ch, false),
- ok = rabbit_channel:conserve_memory(Ch, true),
- ok = rabbit_channel:conserve_memory(Ch, true),
- ok = rabbit_channel:conserve_memory(Ch, false),
MRef = erlang:monitor(process, Ch),
- receive {'DOWN', MRef, process, Ch, Info} ->
- throw({channel_died_early, Info})
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ {Writer, Ch, MRef}.
+
+test_memory_pressure() ->
+ {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch0, false),
+ ok = rabbit_channel:conserve_memory(Ch0, false),
+ ok = rabbit_channel:conserve_memory(Ch0, true),
+ ok = rabbit_channel:conserve_memory(Ch0, false),
+ ok = rabbit_channel:conserve_memory(Ch0, true),
+ ok = rabbit_channel:conserve_memory(Ch0, true),
+ ok = rabbit_channel:conserve_memory(Ch0, false),
+ ok = test_memory_pressure_sync(Ch0, Writer0),
+ receive {'DOWN', MRef0, process, Ch0, Info0} ->
+ throw({channel_died_early, Info0})
after 0 ->
ok
end,
- ok = test_memory_pressure_sync(Writer),
%% we should have just 1 active=false waiting for us
ok = test_memory_pressure_receive_flow(false),
%% if we reply with flow_ok, we should immediately get an
%% active=true back
- ok = rabbit_channel:do(Ch, #'channel.flow_ok'{active = false}),
- ok = test_memory_pressure_sync(Writer),
+ ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
ok = test_memory_pressure_receive_flow(true),
%% if we publish at this point, the channel should die
- ok = rabbit_channel:do(Ch, #'basic.publish'{}, #content{}),
- receive {'DOWN', MRef, process, Ch, normal} ->
+ ok = rabbit_channel:do(Ch0, #'basic.publish'{}, #content{}),
+ receive {'DOWN', MRef0, process, Ch0, normal} ->
+ ok
+ after 1000 ->
+ throw(channel_failed_to_exit)
+ end,
+
+ {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch1, true),
+ ok = test_memory_pressure_receive_flow(false),
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_sync(Ch1, Writer1),
+ ok = rabbit_channel:conserve_memory(Ch1, false),
+ ok = test_memory_pressure_receive_flow(true),
+ %% send back the wrong flow_ok. Channel should die.
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ receive {'DOWN', MRef1, process, Ch1, normal} ->
+ ok
+ after 1000 ->
+ throw(channel_failed_to_exit)
+ end,
+
+ {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
+ %% just out of the blue, send a flow_ok. Life should end.
+ ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
+ receive {'DOWN', MRef2, process, Ch2, normal} ->
ok
after 1000 ->
throw(channel_failed_to_exit)
end,
+
+ {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch3, true),
+ receive {'DOWN', MRef3, process, Ch3, _} ->
+ ok
+ after 12000 ->
+ throw(channel_failed_to_exit)
+ end,
+
passed.
make_responder(FMsg) ->