summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-07-15 16:46:23 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-07-15 16:46:23 +0100
commit1bbe4398e5c3ead6c0fc585129040fc724e58701 (patch)
tree00d1db0e6b6d6d0723f6c93ada3db1833a9a1e5d
parent3206dd9d996afa4e728e80beba6bd926b309bf21 (diff)
downloadrabbitmq-server-1bbe4398e5c3ead6c0fc585129040fc724e58701.tar.gz
more consistent error handling in reader
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_reader.erl215
2 files changed, 128 insertions, 105 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 864e100a..cdacc19e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -405,17 +405,17 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
writer_pid = WriterPid,
reader_pid = ReaderPid,
conn_pid = ConnPid}) ->
- {CloseChannel, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [ConnPid, Channel, Reason]),
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
- case CloseChannel of
- Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
- {noreply, State1};
- _ -> ReaderPid ! {channel_exit, Channel, Reason},
- {stop, normal, State1}
+ case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
+ {Channel, CloseMethod} ->
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [ConnPid, Channel, Reason]),
+ ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ {0, _} ->
+ ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
end.
precondition_failed(Format) -> precondition_failed(Format, []).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 86255790..cacca151 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+%%--------------------------------------------------------------------------
+
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -353,9 +355,9 @@ switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
- {normal, send_exception(State, 0,
- rabbit_misc:amqp_error(
- connection_forced, Explanation, [], none))};
+ {normal, handle_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
@@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) ->
update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
+%%--------------------------------------------------------------------------
+%% error handling / termination
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -406,24 +411,10 @@ handle_dependent_exit(ChPid, Reason, State) ->
{_Channel, controlled} ->
maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
- log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
maybe_close(handle_exception(control_throttle(State),
Channel, Reason))
end.
-channel_cleanup(ChPid) ->
- case get({ch_pid, ChPid}) of
- undefined -> undefined;
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- Channel
- end.
-
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
-
terminate_channels() ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
@@ -477,6 +468,80 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), closed, Channel, Reason]),
+ State;
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS},
+ Channel, Reason)
+ when ?IS_RUNNING(State) orelse CS =:= closing ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), CS, Channel, Reason]),
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ terminate_channels(),
+ State1 = close_connection(State),
+ ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
+ State1;
+handle_exception(State, Channel, Reason) ->
+ %% We don't trust the client at this point - force them to wait
+ %% for a bit so they can't DOS us with repeated failed logins etc.
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+
+frame_error(Error, Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(frame_error,
+ "type ~p, ~s octets = ~p: ~p",
+ [Type, Str, Bin, Error], none)).
+
+unexpected_frame(Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(unexpected_frame,
+ "type ~p, ~s octets = ~p",
+ [Type, Str, Bin], none)).
+
+payload_snippet(Payload) when size(Payload) =< 16 ->
+ {"all", Payload};
+payload_snippet(<<Snippet:16/binary, _/binary>>) ->
+ {"first 16", Snippet}.
+
+%%--------------------------------------------------------------------------
+
+create_channel(Channel, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ChPid, AState}.
+
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+%%--------------------------------------------------------------------------
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -492,34 +557,43 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, 0, Type, Payload});
+ error -> frame_error(unknown_frame, Type, 0, Payload, State);
heartbeat -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
- Other -> throw({unexpected_frame_on_channel0, Other})
+ _Other -> unexpected_frame(Type, 0, Payload, State)
end;
handle_frame(Type, Channel, Payload,
- State = #v1{connection = #connection{protocol = Protocol}}) ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_RUNNING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, Channel, Type, Payload});
- heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
- end.
+ error -> frame_error(unknown_frame, Type, Channel, Payload, State);
+ heartbeat -> unexpected_frame(Type, Channel, Payload, State);
+ Frame -> process_frame(Frame, Channel, State)
+ end;
+handle_frame(Type, Channel, Payload, State) ->
+ unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end;
- undefined when ?IS_RUNNING(State) ->
- ok = create_channel(Channel, State),
- process_frame(Frame, Channel, State);
- undefined ->
- throw({channel_frame_while_starting,
- Channel, State#v1.connection_state, Frame})
+ {ChPid, AState} = case get({channel, Channel}) of
+ undefined -> create_channel(Channel, State);
+ Other -> Other
+ end,
+ case process_channel_frame(Frame, ChPid, AState) of
+ {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {error, Reason} -> handle_exception(State, Channel, Reason)
+ end.
+
+process_channel_frame(Frame, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> {ok, NewAState};
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -536,6 +610,8 @@ post_process_frame({method, MethodName, _}, _ChPid,
post_process_frame(_Frame, _ChPid, State) ->
control_throttle(State).
+%%--------------------------------------------------------------------------
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -547,8 +623,9 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker,
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
switch_callback(handle_frame(Type, Channel, Payload, State),
frame_header, 7);
- _ ->
- throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
+ <<Payload:PayloadSize/binary, EndMarker>> ->
+ frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -619,24 +696,14 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- HandleException =
- fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
- end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:#amqp_error{method = none} = Reason ->
- HandleException(Reason#amqp_error{method = MethodName});
+ handle_exception(State, 0, Reason#amqp_error{method = MethodName});
Type:Reason ->
- HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
+ Stack = erlang:get_stacktrace(),
+ handle_exception(State, 0, {Type, Reason, MethodName, Stack})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
@@ -897,50 +964,6 @@ cert_info(F, Sock) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
-%%--------------------------------------------------------------------------
-
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- ok.
-
-process_channel_frame(Frame, ChPid, AState) ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
- end.
-
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
- State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
- ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
- State1.
-
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).