summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-07-15 17:13:14 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-07-15 17:13:14 +0100
commit3e610ca047c7fe745b0b4f15f27fb0e4e185274e (patch)
tree00d1db0e6b6d6d0723f6c93ada3db1833a9a1e5d
parent5d878a6666046c4ae92efa65d86fd8859c295161 (diff)
parent1bbe4398e5c3ead6c0fc585129040fc724e58701 (diff)
downloadrabbitmq-server-3e610ca047c7fe745b0b4f15f27fb0e4e185274e.tar.gz
merge default into bug25056
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_reader.erl110
2 files changed, 71 insertions, 57 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 864e100a..cdacc19e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -405,17 +405,17 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
writer_pid = WriterPid,
reader_pid = ReaderPid,
conn_pid = ConnPid}) ->
- {CloseChannel, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [ConnPid, Channel, Reason]),
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
- case CloseChannel of
- Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
- {noreply, State1};
- _ -> ReaderPid ! {channel_exit, Channel, Reason},
- {stop, normal, State1}
+ case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
+ {Channel, CloseMethod} ->
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [ConnPid, Channel, Reason]),
+ ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ {0, _} ->
+ ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
end.
precondition_failed(Format) -> precondition_failed(Format, []).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ee3bf29a..cacca151 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -355,9 +355,9 @@ switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
- {normal, send_exception(State, 0,
- rabbit_misc:amqp_error(
- connection_forced, Explanation, [], none))};
+ {normal, handle_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
@@ -411,8 +411,6 @@ handle_dependent_exit(ChPid, Reason, State) ->
{_Channel, controlled} ->
maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
- log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
maybe_close(handle_exception(control_throttle(State),
Channel, Reason))
end.
@@ -470,19 +468,46 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
+handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), closed, Channel, Reason]),
State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS},
+ Channel, Reason)
+ when ?IS_RUNNING(State) orelse CS =:= closing ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), CS, Channel, Reason]),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
terminate_channels(),
State1 = close_connection(State),
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
- State1.
+ State1;
+handle_exception(State, Channel, Reason) ->
+ %% We don't trust the client at this point - force them to wait
+ %% for a bit so they can't DOS us with repeated failed logins etc.
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+
+frame_error(Error, Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(frame_error,
+ "type ~p, ~s octets = ~p: ~p",
+ [Type, Str, Bin, Error], none)).
+
+unexpected_frame(Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(unexpected_frame,
+ "type ~p, ~s octets = ~p",
+ [Type, Str, Bin], none)).
+
+payload_snippet(Payload) when size(Payload) =< 16 ->
+ {"all", Payload};
+payload_snippet(<<Snippet:16/binary, _/binary>>) ->
+ {"first 16", Snippet}.
%%--------------------------------------------------------------------------
@@ -501,7 +526,7 @@ create_channel(Channel, State) ->
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
- ok.
+ {ChPid, AState}.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
@@ -532,34 +557,32 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, 0, Type, Payload});
+ error -> frame_error(unknown_frame, Type, 0, Payload, State);
heartbeat -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
- Other -> throw({unexpected_frame_on_channel0, Other})
+ _Other -> unexpected_frame(Type, 0, Payload, State)
end;
handle_frame(Type, Channel, Payload,
- State = #v1{connection = #connection{protocol = Protocol}}) ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_RUNNING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, Channel, Type, Payload});
- heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
- end.
+ error -> frame_error(unknown_frame, Type, Channel, Payload, State);
+ heartbeat -> unexpected_frame(Type, Channel, Payload, State);
+ Frame -> process_frame(Frame, Channel, State)
+ end;
+handle_frame(Type, Channel, Payload, State) ->
+ unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end;
- undefined when ?IS_RUNNING(State) ->
- ok = create_channel(Channel, State),
- process_frame(Frame, Channel, State);
- undefined ->
- throw({channel_frame_while_starting,
- Channel, State#v1.connection_state, Frame})
+ {ChPid, AState} = case get({channel, Channel}) of
+ undefined -> create_channel(Channel, State);
+ Other -> Other
+ end,
+ case process_channel_frame(Frame, ChPid, AState) of
+ {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {error, Reason} -> handle_exception(State, Channel, Reason)
end.
process_channel_frame(Frame, ChPid, AState) ->
@@ -600,8 +623,9 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker,
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
switch_callback(handle_frame(Type, Channel, Payload, State),
frame_header, 7);
- _ ->
- throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
+ <<Payload:PayloadSize/binary, EndMarker>> ->
+ frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -672,24 +696,14 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- HandleException =
- fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
- end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:#amqp_error{method = none} = Reason ->
- HandleException(Reason#amqp_error{method = MethodName});
+ handle_exception(State, 0, Reason#amqp_error{method = MethodName});
Type:Reason ->
- HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
+ Stack = erlang:get_stacktrace(),
+ handle_exception(State, 0, {Type, Reason, MethodName, Stack})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,