summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-11-18 11:56:07 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-11-18 11:56:07 +0000
commitcaa541096fe258c2969a8a9791bdc80e283773d2 (patch)
tree08932b60669520cfeb864e5763fc4035aae19b1a
parentaf1f6be9950a71dd5682dc604e87f9db881fffea (diff)
parent7e279bdbe449573b067ba3246d08b4eb53656003 (diff)
downloadrabbitmq-server-caa541096fe258c2969a8a9791bdc80e283773d2.tar.gz
Merging bug23505 into default
-rw-r--r--src/rabbit_mnesia.erl22
-rw-r--r--src/rabbit_reader.erl34
-rw-r--r--src/rabbit_upgrade.erl22
3 files changed, 34 insertions, 44 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 9d172269..da81f884 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -361,16 +361,15 @@ init_db(ClusterNodes, Force) ->
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
{ok, Nodes} ->
case Force of
- false ->
- FailedClusterNodes = ProperClusterNodes -- Nodes,
- case FailedClusterNodes of
- [] -> ok;
- _ ->
- throw({error, {failed_to_cluster_with,
- FailedClusterNodes,
- "Mnesia could not connect to some nodes."}})
- end;
- _ -> ok
+ false -> FailedClusterNodes = ProperClusterNodes -- Nodes,
+ case FailedClusterNodes of
+ [] -> ok;
+ _ -> throw({error, {failed_to_cluster_with,
+ FailedClusterNodes,
+ "Mnesia could not connect "
+ "to some nodes."}})
+ end;
+ true -> ok
end,
case {Nodes, mnesia:system_info(use_dir),
mnesia:system_info(db_nodes)} of
@@ -411,8 +410,7 @@ init_db(ClusterNodes, Force) ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
%% are members of a different cluster
- throw({error, {unable_to_join_cluster,
- ClusterNodes, Reason}})
+ throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
end.
schema_ok_or_move() ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 44d2ec7c..71115a73 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -318,13 +318,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
done.
mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
- %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
- {State1, Callback1, Length1} =
- handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}),
- mainloop(Deb, switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{recv_ref = none}));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
State;
@@ -564,7 +561,6 @@ handle_frame(Type, Channel, Payload,
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
- %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{ch_fr_pid, ChFrPid} ->
ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
@@ -628,18 +624,18 @@ analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1};
+ ensure_stats_timer(
+ switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
+handle_input({frame_payload, Type, Channel, PayloadSize},
+ PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- %%?LOGDEBUG("Frame completed: ~p/~p/~p~n", [Type, Channel, Payload]),
- NewState = handle_frame(Type, Channel, Payload, State),
- {NewState, frame_header, 7};
+ handle_frame(Type, Channel, Payload,
+ switch_callback(State, frame_header, 7));
_ ->
- throw({bad_payload, PayloadAndMarker})
+ throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;
%% The two rules pertaining to version negotiation:
@@ -690,11 +686,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
mechanisms = <<"PLAIN AMQPLAIN">>,
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT,
- protocol = Protocol},
- connection_state = starting},
- frame_header, 7}.
+ switch_callback(State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7).
refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 0071a08a..9522227e 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -66,8 +66,8 @@ maybe_upgrade() ->
read_version() ->
case rabbit_misc:read_term_file(schema_filename()) of
- {ok, [Heads]} -> {ok, Heads};
- {error, E} -> {error, E}
+ {ok, [Heads]} -> {ok, Heads};
+ {error, _} = Err -> Err
end.
write_version() ->
@@ -111,8 +111,8 @@ upgrades_to_apply(Heads, G) ->
sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
- [element(2, digraph:vertex(G, StepName))
- || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+ [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
heads(G) ->
lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
@@ -141,16 +141,12 @@ apply_upgrade({M, F}) ->
%% -------------------------------------------------------------------
-schema_filename() ->
- filename:join(dir(), ?VERSION_FILENAME).
+dir() -> rabbit_mnesia:dir().
-lock_filename() ->
- filename:join(dir(), ?LOCK_FILENAME).
+schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
+
+lock_filename() -> filename:join(dir(), ?LOCK_FILENAME).
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
-info(Msg, Args) ->
- error_logger:info_msg(Msg, Args).
-
-dir() ->
- rabbit_mnesia:dir().
+info(Msg, Args) -> error_logger:info_msg(Msg, Args).