diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-15 17:13:14 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-15 17:13:14 +0100 |
commit | 3e610ca047c7fe745b0b4f15f27fb0e4e185274e (patch) | |
tree | 00d1db0e6b6d6d0723f6c93ada3db1833a9a1e5d | |
parent | 5d878a6666046c4ae92efa65d86fd8859c295161 (diff) | |
parent | 1bbe4398e5c3ead6c0fc585129040fc724e58701 (diff) | |
download | rabbitmq-server-3e610ca047c7fe745b0b4f15f27fb0e4e185274e.tar.gz |
merge default into bug25056
-rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 110 |
2 files changed, 71 insertions, 57 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 864e100a..cdacc19e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -405,17 +405,17 @@ handle_exception(Reason, State = #ch{protocol = Protocol, writer_pid = WriterPid, reader_pid = ReaderPid, conn_pid = ConnPid}) -> - {CloseChannel, CloseMethod} = - rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [ConnPid, Channel, Reason]), %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), - case CloseChannel of - Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), - {noreply, State1}; - _ -> ReaderPid ! {channel_exit, Channel, Reason}, - {stop, normal, State1} + case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of + {Channel, CloseMethod} -> + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [ConnPid, Channel, Reason]), + ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + {0, _} -> + ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} end. precondition_failed(Format) -> precondition_failed(Format, []). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ee3bf29a..cacca151 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -355,9 +355,9 @@ switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. terminate(Explanation, State) when ?IS_RUNNING(State) -> - {normal, send_exception(State, 0, - rabbit_misc:amqp_error( - connection_forced, Explanation, [], none))}; + {normal, handle_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; terminate(_Explanation, State) -> {force, State}. @@ -411,8 +411,6 @@ handle_dependent_exit(ChPid, Reason, State) -> {_Channel, controlled} -> maybe_close(control_throttle(State)); {Channel, uncontrolled} -> - log(error, "AMQP connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), maybe_close(handle_exception(control_throttle(State), Channel, Reason)) end. @@ -470,19 +468,46 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. -handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> +handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> + log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", + [self(), closed, Channel, Reason]), State; -handle_exception(State, Channel, Reason) -> - send_exception(State, Channel, Reason). - -send_exception(State = #v1{connection = #connection{protocol = Protocol}}, - Channel, Reason) -> +handle_exception(State = #v1{connection = #connection{protocol = Protocol}, + connection_state = CS}, + Channel, Reason) + when ?IS_RUNNING(State) orelse CS =:= closing -> + log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", + [self(), CS, Channel, Reason]), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), terminate_channels(), State1 = close_connection(State), ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), - State1. + State1; +handle_exception(State, Channel, Reason) -> + %% We don't trust the client at this point - force them to wait + %% for a bit so they can't DOS us with repeated failed logins etc. + timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({handshake_error, State#v1.connection_state, Channel, Reason}). + +frame_error(Error, Type, Channel, Payload, State) -> + {Str, Bin} = payload_snippet(Payload), + handle_exception(State, Channel, + rabbit_misc:amqp_error(frame_error, + "type ~p, ~s octets = ~p: ~p", + [Type, Str, Bin, Error], none)). + +unexpected_frame(Type, Channel, Payload, State) -> + {Str, Bin} = payload_snippet(Payload), + handle_exception(State, Channel, + rabbit_misc:amqp_error(unexpected_frame, + "type ~p, ~s octets = ~p", + [Type, Str, Bin], none)). + +payload_snippet(Payload) when size(Payload) =< 16 -> + {"all", Payload}; +payload_snippet(<<Snippet:16/binary, _/binary>>) -> + {"first 16", Snippet}. %%-------------------------------------------------------------------------- @@ -501,7 +526,7 @@ create_channel(Channel, State) -> MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), - ok. + {ChPid, AState}. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of @@ -532,34 +557,32 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of - error -> throw({unknown_frame, 0, Type, Payload}); + error -> frame_error(unknown_frame, Type, 0, Payload, State); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); - Other -> throw({unexpected_frame_on_channel0, Other}) + _Other -> unexpected_frame(Type, 0, Payload, State) end; handle_frame(Type, Channel, Payload, - State = #v1{connection = #connection{protocol = Protocol}}) -> + State = #v1{connection = #connection{protocol = Protocol}}) + when ?IS_RUNNING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of - error -> throw({unknown_frame, Channel, Type, Payload}); - heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State) - end. + error -> frame_error(unknown_frame, Type, Channel, Payload, State); + heartbeat -> unexpected_frame(Type, Channel, Payload, State); + Frame -> process_frame(Frame, Channel, State) + end; +handle_frame(Type, Channel, Payload, State) -> + unexpected_frame(Type, Channel, Payload, State). process_frame(Frame, Channel, State) -> - case get({channel, Channel}) of - {ChPid, AState} -> - case process_channel_frame(Frame, ChPid, AState) of - {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {error, Reason} -> handle_exception(State, Channel, Reason) - end; - undefined when ?IS_RUNNING(State) -> - ok = create_channel(Channel, State), - process_frame(Frame, Channel, State); - undefined -> - throw({channel_frame_while_starting, - Channel, State#v1.connection_state, Frame}) + {ChPid, AState} = case get({channel, Channel}) of + undefined -> create_channel(Channel, State); + Other -> Other + end, + case process_channel_frame(Frame, ChPid, AState) of + {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {error, Reason} -> handle_exception(State, Channel, Reason) end. process_channel_frame(Frame, ChPid, AState) -> @@ -600,8 +623,9 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, <<Payload:PayloadSize/binary, ?FRAME_END>> -> switch_callback(handle_frame(Type, Channel, Payload, State), frame_header, 7); - _ -> - throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) + <<Payload:PayloadSize/binary, EndMarker>> -> + frame_error({invalid_frame_end_marker, EndMarker}, + Type, Channel, Payload, State) end; %% The two rules pertaining to version negotiation: @@ -672,24 +696,14 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - HandleException = - fun(R) -> - case ?IS_RUNNING(State) of - true -> send_exception(State, 0, R); - %% We don't trust the client at this point - force - %% them to wait for a bit so they can't DOS us with - %% repeated failed logins etc. - false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, R}) - end - end, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:#amqp_error{method = none} = Reason -> - HandleException(Reason#amqp_error{method = MethodName}); + handle_exception(State, 0, Reason#amqp_error{method = MethodName}); Type:Reason -> - HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) + Stack = erlang:get_stacktrace(), + handle_exception(State, 0, {Type, Reason, MethodName, Stack}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, |