diff options
author | Vlad Ionescu <vlad@lshift.net> | 2009-09-22 14:55:12 +0100 |
---|---|---|
committer | Vlad Ionescu <vlad@lshift.net> | 2009-09-22 14:55:12 +0100 |
commit | d27bbfdfb756204ebaa4babb44a83d621127cc4a (patch) | |
tree | 0b48a1c83c8d5f2612dd030f08b4b08fe96609a3 | |
parent | efb3b7a43cf2c044bc20233bee305f4f40c914d5 (diff) | |
parent | af9bb2a7bc33b126aa38792138d5dd922978e0aa (diff) | |
download | rabbitmq-server-d27bbfdfb756204ebaa4babb44a83d621127cc4a.tar.gz |
merging from default
-rw-r--r-- | .hgignore | 11 | ||||
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.pod | 2 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | packaging/common/rabbitmq-script-wrapper | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 5 | ||||
-rw-r--r-- | src/rabbit_control.erl | 31 | ||||
-rw-r--r-- | src/rabbit_load.erl | 17 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 35 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 2 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 118 |
11 files changed, 99 insertions, 133 deletions
@@ -8,13 +8,10 @@ erl_crash.dump syntax: regexp ^cover/ ^dist/ -^include/rabbit_framing.hrl$ -^src/rabbit_framing.erl$ -^rabbit.plt$ -^ebin/rabbit.app$ -^ebin/rabbit.rel$ -^ebin/rabbit.boot$ -^ebin/rabbit.script$ +^include/rabbit_framing\.hrl$ +^src/rabbit_framing\.erl$ +^rabbit\.plt$ +^ebin/rabbit\.(app|rel|boot|script)$ ^plugins/ ^priv/plugins/ @@ -65,15 +65,12 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ -$(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) - erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' - dialyze: $(BEAM_TARGETS) dialyzer -c $? clean: rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -90,13 +87,14 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ - RABBITMQ_NODE_ONLY=true \ + RABBITMQ_ALLOW_INPUT=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -s rabbit" \ ./scripts/rabbitmq-server run-node: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ + RABBITMQ_ALLOW_INPUT=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ ./scripts/rabbitmq-server diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 6d4aadeb..c43ed2ea 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -287,7 +287,7 @@ separated by tab characters. List queue information by virtual host. Each line printed describes an connection, with the requested I<connectioninfoitem> values separated by tab characters. If no I<connectioninfoitem>s are specified then -I<user>, I<peer_address> and I<peer_port> are assumed. +I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. =back diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 6fc6e464..dd907d1a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -17,7 +17,6 @@ {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, {ssl_listeners, []}, {ssl_options, []}, - {extra_startup_steps, []}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index f1a9b1ff..94d72f16 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh ## The contents of this file are subject to the Mozilla Public License ## Version 1.1 (the "License"); you may not use this file except in ## compliance with the License. You may obtain a copy of the License at diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 547220b4..e5317bb1 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -73,10 +73,11 @@ else fi RABBITMQ_START_RABBIT= -[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' +[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' +[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="${RABBITMQ_START_RABBIT} -s rabbit" RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" -if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then +if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ] && [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit" RABBITMQ_EBIN_PATH="" else diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index cf20520e..69e91803 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -164,7 +164,7 @@ exchange name, routing key, queue name and arguments, in that order. <ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address and peer_port. +user, peer_address, peer_port and state. "), halt(1). @@ -270,8 +270,9 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, peer_port])), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, + peer_port, state])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -314,7 +315,7 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || + lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || X <- InfoItemKeys]) end, Results), ok; @@ -325,18 +326,20 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Items, Key) -> - {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), - case Info of - {_, #resource{name = Name}} -> +format_info_item(Key, Items) -> + case proplists:get_value(Key, Items) of + #resource{name = Name} -> escape(Name); - _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> + Value when Key =:= address; Key =:= peer_address andalso + is_tuple(Value) -> inet_parse:ntoa(Value); - _ when is_pid(Value) -> + Value when is_pid(Value) -> atom_to_list(node(Value)); - _ when is_binary(Value) -> + Value when is_binary(Value) -> escape(Value); - _ -> + Value when is_atom(Value) -> + escape(atom_to_list(Value)); + Value -> io_lib:format("~w", [Value]) end. @@ -362,7 +365,9 @@ rpc_call(Node, Mod, Fun, Args) -> %% form part of UTF-8 strings. escape(Bin) when binary(Bin) -> - escape_char(lists:reverse(binary_to_list(Bin)), []). + escape(binary_to_list(Bin)); +escape(L) when is_list(L) -> + escape_char(lists:reverse(L), []). escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl index 7bf85347..6ef638cb 100644 --- a/src/rabbit_load.erl +++ b/src/rabbit_load.erl @@ -41,7 +41,7 @@ -ifdef(use_specs). -type(erlang_node() :: atom()). --type(load() :: {{non_neg_integer(), float()}, erlang_node()}). +-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}). -spec(local_load/0 :: () -> load()). -spec(remote_loads/0 :: () -> [load()]). -spec(pick/0 :: () -> erlang_node()). @@ -52,8 +52,11 @@ local_load() -> LoadAvg = case whereis(cpu_sup) of - undefined -> 0.0; - _Other -> cpu_sup:avg1() + undefined -> unknown; + _ -> case cpu_sup:avg1() of + L when is_integer(L) -> L; + {error, timeout} -> unknown + end end, {{statistics(run_queue), LoadAvg}, node()}. @@ -65,8 +68,12 @@ remote_loads() -> pick() -> RemoteLoads = remote_loads(), {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node - AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR, + %% add bias towards current node; we rely on Erlang's term order + %% of SomeFloat < local_unknown < unknown. + AdjustedLoadAvg = case LoadAvg of + unknown -> local_unknown; + _ -> LoadAvg * ?FUDGE_FACTOR + end, Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], {_, SelectedNode} = lists:min(Loads), SelectedNode. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index d9197535..b1cc4d02 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -114,12 +114,13 @@ action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( fun({Node, Pid}) -> - Status = rpc:call(Node, rabbit, status, [], RpcTimeout), + RabbitRunning = + case is_rabbit_running(Node, RpcTimeout) of + false -> not_running; + true -> running + end, io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, case parse_status(Status) of - false -> not_running; - true -> running - end]) + [Node, Pid, RabbitRunning]) end); action(stop_all, [], RpcTimeout) -> @@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) -> wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> false; wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case parse_status(rpc:call(Node, rabbit, status, [])) of + case is_rabbit_running(Node, RpcTimeout) of true -> true; false -> receive {'EXIT', Port, PosixCode} -> @@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> run_cmd(FullPath) -> erlang:open_port({spawn, FullPath}, [nouse_stdio]). -parse_status({badrpc, _}) -> - false; - -parse_status(Status) -> - case lists:keysearch(running_applications, 1, Status) of - {value, {running_applications, Apps}} -> - lists:keymember(rabbit, 1, Apps); - _ -> - false +is_rabbit_running(Node, RpcTimeout) -> + case rpc:call(Node, rabbit, status, [], RpcTimeout) of + {badrpc, _} -> false; + Status -> case proplists:get_value(running_applications, Status) of + undefined -> false; + Apps -> lists:keymember(rabbit, 1, Apps) + end end. with_os(Handlers) -> {OsFamily, _} = os:type(), - case lists:keysearch(OsFamily, 1, Handlers) of - {value, {_, Handler}} -> Handler(); - false -> throw({unsupported_os, OsFamily}) + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() end. script_filename() -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 71278bfb..0206f73e 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -68,7 +68,7 @@ start() -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 69dbc008..beb53761 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -49,7 +49,6 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). --define(CHANNEL_CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). %--------------------------------------------------------------------------- @@ -94,23 +93,19 @@ %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing, *running* -%% terminate_channel timeout -> remove 'closing' mark, *running* +%% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* %% receive frame -> ignore, *closing* -%% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* %% channel exit with hard error %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing +%% -> log error, mark channel as closing %% if last channel to exit then send connection.close_ok, %% start terminate_connection timer, *closed* %% else *closing* @@ -123,7 +118,6 @@ %% *closed* %% receive frame -> ignore, *closed* %% terminate_connection timeout -> *terminate* -%% terminate_channel timeout -> remove 'closing' mark, *closed* %% handshake_timeout -> ignore, *closed* %% heartbeat timeout -> *throw* %% channel exit -> log error, *closed* @@ -292,8 +286,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); - {terminate_channel, Channel, Ref1} -> - mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State)); terminate_connection -> State; handshake_timeout -> @@ -341,32 +333,14 @@ close_connection(State = #v1{connection = #connection{ State#v1{connection_state = closed}. close_channel(Channel, State) -> - Ref = make_ref(), - TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT, - self(), - {terminate_channel, Channel, Ref}), - put({closing_channel, Channel}, {Ref, TRef}), - State. - -terminate_channel(Channel, Ref, State) -> - case get({closing_channel, Channel}) of - undefined -> ok; %% got close_ok in the meantime - {Ref, _} -> erase({closing_channel, Channel}), - ok; - {_Ref, _} -> ok %% got close_ok, and have new closing channel - end, + put({channel, Channel}, closing), State. handle_channel_exit(Channel, Reason, State) -> - %% We remove the channel from the inbound map only. That allows - %% the channel to be re-opened, but also means the remaining - %% cleanup, including possibly closing the connection, is deferred - %% until we get the (normal) exit signal. - erase({channel, Channel}), handle_exception(State, Channel, Reason). handle_dependent_exit(Pid, normal, State) -> - channel_cleanup(Pid), + erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of @@ -376,17 +350,10 @@ handle_dependent_exit(Pid, Reason, State) -> channel_cleanup(Pid) -> case get({chpid, Pid}) of - undefined -> - case get({closing_chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> - erase({closing_chpid, Pid}), - Channel - end; - {channel, Channel} -> - erase({channel, Channel}), - erase({chpid, Pid}), - Channel + undefined -> undefined; + {channel, Channel} -> erase({channel, Channel}), + erase({chpid, Pid}), + Channel end. all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. @@ -451,7 +418,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) State; handle_frame(Type, 0, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; trace -> State; {method, MethodName, FieldsBin} -> @@ -460,20 +427,34 @@ handle_frame(Type, 0, Payload, State) -> end; handle_frame(Type, Channel, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> - ok = check_for_close(Channel, ChPid, AnalyzedFrame), + case AnalyzedFrame of + {method, 'channel.close', _} -> + erase({channel, Channel}); + _ -> ok + end, ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), State; + closing -> + %% According to the spec, after sending a + %% channel.close we must ignore all frames except + %% channel.close_ok. + case AnalyzedFrame of + {method, 'channel.close_ok', _} -> + erase({channel, Channel}); + _ -> ok + end, + State; undefined -> case State#v1.connection_state of - running -> send_to_new_channel( - Channel, AnalyzedFrame, State), + running -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), State; Other -> throw({channel_frame_while_starting, Channel, Other, AnalyzedFrame}) @@ -703,7 +684,7 @@ i(channels, #v1{}) -> i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> - none; + ''; i(vhost, #v1{connection = #connection{vhost = VHost}}) -> VHost; i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> @@ -716,38 +697,17 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, State) -> - case get({closing_channel, Channel}) of - undefined -> - #v1{sock = Sock, - connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); - {_, TRef} -> - %% According to the spec, after sending a channel.close we - %% must ignore all frames except channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erlang:cancel_timer(TRef), - erase({closing_channel, Channel}), - ok; - _Other -> ok - end - end. - -check_for_close(Channel, ChPid, {method, 'channel.close', _}) -> - channel_cleanup(ChPid), - put({closing_chpid, ChPid}, {channel, Channel}), - ok; -check_for_close(_Channel, _ChPid, _Frame) -> - ok. + #v1{sock = Sock, connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + ChPid = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", |