diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-30 00:13:09 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-30 00:13:09 +0100 |
commit | f37f5dead8b50bf0fece5c9aaf53de7cd7a4a549 (patch) | |
tree | 55722a965432902282e05b8fcd0c2ff226a48a10 | |
parent | 6ac26dbda773395157b55b957fda15c49e61a4e8 (diff) | |
parent | fe8547dbc3bd88e3bceb30abfed1041af3aab293 (diff) | |
download | rabbitmq-server-f37f5dead8b50bf0fece5c9aaf53de7cd7a4a549.tar.gz |
merge branch heads
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | Makefile | 36 | ||||
-rw-r--r-- | docs/rabbitmq-activate-plugins.1.pod | 2 | ||||
-rw-r--r-- | docs/rabbitmq-deactivate-plugins.1.pod | 37 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | include/rabbit.hrl | 7 | ||||
-rw-r--r-- | include/rabbit_framing_spec.hrl | 2 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 1 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rules | 2 | ||||
-rw-r--r-- | packaging/macports/net/rabbitmq-server/Portfile | 1 | ||||
-rw-r--r-- | packaging/windows/Makefile | 1 | ||||
-rwxr-xr-x | scripts/rabbitmq-deactivate-plugins | 37 | ||||
-rw-r--r-- | scripts/rabbitmq-deactivate-plugins.bat | 35 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 5 | ||||
-rw-r--r-- | src/rabbit.erl | 21 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 8 | ||||
-rw-r--r-- | src/rabbit_control.erl | 37 | ||||
-rw-r--r-- | src/rabbit_dialyzer.erl | 91 | ||||
-rw-r--r-- | src/rabbit_load.erl | 17 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 40 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 10 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 148 |
22 files changed, 392 insertions, 148 deletions
@@ -11,6 +11,7 @@ syntax: regexp ^include/rabbit_framing\.hrl$ ^src/rabbit_framing\.erl$ ^rabbit\.plt$ +^basic.plt$ ^ebin/rabbit\.(app|rel|boot|script)$ ^plugins/ ^priv/plugins/ @@ -11,13 +11,16 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) PYTHON=python +BASIC_PLT=basic.plt +RABBIT_PLT=rabbit.plt + ifndef USE_SPECS # our type specs rely on features / bug fixes in dialyzer that are # only available in R13B upwards (R13B is eshell 5.7.1) @@ -39,6 +42,8 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e +ERL_EBIN=erl -noinput -pa $(EBIN_DIR) + all: $(TARGETS) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app @@ -57,14 +62,32 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ -dialyze: $(BEAM_TARGETS) - dialyzer -c $? +dialyze: $(BEAM_TARGETS) $(BASIC_PLT) + $(ERL_EBIN) -eval \ + "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))." + +# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target +create-plt: $(RABBIT_PLT) + +$(RABBIT_PLT): $(BEAM_TARGETS) $(BASIC_PLT) + cp $(BASIC_PLT) $@ + $(ERL_EBIN) -eval \ + "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:add_to_plt(\"$@\", \"$(BEAM_TARGETS)\"))." + +$(BASIC_PLT): $(BEAM_TARGETS) + if [ -f $@ ]; then \ + touch $@; \ + else \ + $(ERL_EBIN) -eval \ + "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:create_basic_plt(\"$@\"))."; \ + fi clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz + rm -f $(RABBIT_PLT) cleandb: rm -rf $(RABBITMQ_MNESIA_DIR)/* @@ -79,13 +102,14 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ - RABBITMQ_NODE_ONLY=true \ - RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -s rabbit" \ + RABBITMQ_ALLOW_INPUT=true \ + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ ./scripts/rabbitmq-server run-node: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ + RABBITMQ_ALLOW_INPUT=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ ./scripts/rabbitmq-server @@ -170,7 +194,7 @@ install: all docs_all install_dirs cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* - for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \ + for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \ cp scripts/$$script $(TARGET_DIR)/sbin; \ [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ done diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod index 58ffea79..42f0c4d2 100644 --- a/docs/rabbitmq-activate-plugins.1.pod +++ b/docs/rabbitmq-activate-plugins.1.pod @@ -26,7 +26,7 @@ execute: =head1 SEE ALSO L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>, -L<rabbitmqctl(1)> +L<rabbitmqctl(1)>, L<rabbitmq-deactivate-plugins(1)> =head1 AUTHOR diff --git a/docs/rabbitmq-deactivate-plugins.1.pod b/docs/rabbitmq-deactivate-plugins.1.pod new file mode 100644 index 00000000..eb4fbb90 --- /dev/null +++ b/docs/rabbitmq-deactivate-plugins.1.pod @@ -0,0 +1,37 @@ +=head1 NAME + +rabbitmq-deactivate-plugins - command line tool for deactivating plugins +in a RabbitMQ broker + +=head1 SYNOPSIS + +rabbitmq-deactivate-plugins + +=head1 DESCRIPTION + +RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + +rabbitmq-deactivate-plugins is a command line tool for deactivating +plugins installed into the broker. + +=head1 EXAMPLES + +To deactivate all of the installed plugins in the current RabbitMQ install, +execute: + + rabbitmq-deactivate-plugins + +=head1 SEE ALSO + +L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>, +L<rabbitmqctl(1)>, L<rabbitmq-activate-plugins(1)> + +=head1 AUTHOR + +The RabbitMQ Team <info@rabbitmq.com> + +=head1 REFERENCES + +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 6fc6e464..dd907d1a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -17,7 +17,6 @@ {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, {ssl_listeners, []}, {ssl_options, []}, - {extra_startup_steps, []}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d1a2f3bd..c95ce738 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -67,6 +67,8 @@ -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). +-record(amqp_error, {name, explanation, method = none}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -154,7 +156,10 @@ port :: non_neg_integer()}). -type(not_found() :: {'error', 'not_found'}). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). - +-type(amqp_error() :: + #amqp_error{name :: atom(), + explanation :: string(), + method :: atom()}). -endif. %%---------------------------------------------------------------------------- diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index f45fa6ca..a78c2301 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -50,8 +50,6 @@ %% TODO: make this more precise -type(amqp_method_name() :: atom()). -type(channel_number() :: non_neg_integer()). -%% TODO: make this more precise --type(amqp_error() :: {bool(), non_neg_integer(), binary()}). -type(resource_name() :: binary()). -type(routing_key() :: binary()). -type(username() :: binary()). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 7f442831..30cfb99f 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -56,6 +56,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins +install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 365eea6e..5e357955 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -17,6 +17,6 @@ install/rabbitmq-server:: for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done - for script in rabbitmq-activate-plugins; do \ + for script in rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \ install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index 1826d5c4..cf1a3a03 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -94,6 +94,7 @@ post-destroot { ${wrappersbin}/rabbitmq-activate-plugins file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl + file copy ${wrappersbin}/rabbitmq-activate-plugins ${wrappersbin}/rabbitmq-deactivate-plugins } pre-install { diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 387becb3..f17fe777 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -14,6 +14,7 @@ dist: mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/rabbitmq-deactivate-plugins.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile rm -f $(SOURCE_DIR)/README diff --git a/scripts/rabbitmq-deactivate-plugins b/scripts/rabbitmq-deactivate-plugins new file mode 100755 index 00000000..771c4734 --- /dev/null +++ b/scripts/rabbitmq-deactivate-plugins @@ -0,0 +1,37 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +. `dirname $0`/rabbitmq-env + +RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin + +rm -f ${RABBITMQ_EBIN}/rabbit.rel ${RABBITMQ_EBIN}/rabbit.script ${RABBITMQ_EBIN}/rabbit.boot diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat new file mode 100644 index 00000000..190fdef7 --- /dev/null +++ b/scripts/rabbitmq-deactivate-plugins.bat @@ -0,0 +1,35 @@ +@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+
+del /f %RABBITMQ_EBIN_DIR%\rabbit.rel %RABBITMQ_EBIN_DIR%\rabbit.script %RABBITMQ_EBIN_DIR%\rabbit.boot
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 547220b4..e5317bb1 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -73,10 +73,11 @@ else fi RABBITMQ_START_RABBIT= -[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' +[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' +[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="${RABBITMQ_START_RABBIT} -s rabbit" RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" -if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then +if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ] && [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit" RABBITMQ_EBIN_PATH="" else diff --git a/src/rabbit.erl b/src/rabbit.erl index ef1e0049..b098fd5a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -215,6 +215,16 @@ log_location(Type) -> _ -> undefined end. +app_location() -> + {ok, Application} = application:get_application(), + filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). + +home_dir() -> + case init:get_argument(home) of + {ok, [[Home]]} -> Home; + Other -> Other + end. + %--------------------------------------------------------------------------- print_banner() -> @@ -237,10 +247,13 @@ print_banner() -> [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), - Settings = [{"node", node()}, - {"log", log_location(kernel)}, - {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}], + Settings = [{"node", node()}, + {"app descriptor", app_location()}, + {"home dir", home_dir()}, + {"cookie hash", rabbit_misc:cookie_hash()}, + {"log", log_location(kernel)}, + {"sasl log", log_location(sasl)}, + {"database dir", rabbit_mnesia:dir()}], DescrLen = lists:max([length(K) || {K, _V} <- Settings]), Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1285064f..a1fa1066 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -125,11 +125,11 @@ handle_cast({method, Method, Content}, State) -> stop -> {stop, normal, State#ch{state = terminating}} catch - exit:{amqp, Error, Explanation, none} -> + exit:Reason = #amqp_error{} -> ok = rollback_and_notify(State), - Reason = {amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + MethodName = rabbit_misc:method_record_type(Method), + State#ch.reader_pid ! {channel_exit, State#ch.channel, + Reason#amqp_error{method = MethodName}}, {stop, normal, State#ch{state = terminating}}; exit:normal -> {stop, normal, State}; diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 69e91803..a15c7fd8 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -80,13 +80,38 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, Reason} -> + error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics(Node), + halt(2); Other -> error("~p", [Other]), halt(2) end. -error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Node) -> + fmt_stderr("diagnostics:", []), + NodeHost = rabbit_misc:nodehost(Node), + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + fmt_stderr("- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]); + {ok, NamePorts} -> + fmt_stderr("- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]) + end, + fmt_stderr("- current node: ~w", [node()]), + case init:get_argument(home) of + {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]); + Other -> fmt_stderr("- no current node home dir: ~p", [Other]) + end, + fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), + ok. parse_args(["-n", NodeS | Args], Params) -> Node = case lists:member($@, NodeS) of @@ -197,9 +222,11 @@ action(cluster, Node, ClusterNodeSs, Inform) -> action(status, Node, [], Inform) -> Inform("Status of node ~p", [Node]), - Res = call(Node, {rabbit, status, []}), - io:format("~p~n", [Res]), - ok; + case call(Node, {rabbit, status, []}) of + {badrpc, _} = Res -> Res; + Res -> io:format("~p~n", [Res]), + ok + end; action(rotate_logs, Node, [], Inform) -> Inform("Reopening logs for node ~p", [Node]), diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl new file mode 100644 index 00000000..23e6fc44 --- /dev/null +++ b/src/rabbit_dialyzer.erl @@ -0,0 +1,91 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_dialyzer). +-include("rabbit.hrl"). + +-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(create_basic_plt/1 :: (string()) -> 'ok'). +-spec(add_to_plt/2 :: (string(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(halt_with_code/1 :: (atom()) -> no_return()). + +-endif. + +%%---------------------------------------------------------------------------- + +create_basic_plt(BasicPltPath) -> + OptsRecord = dialyzer_options:build( + [{analysis_type, plt_build}, + {output_plt, BasicPltPath}, + {files_rec, otp_apps_dependencies_paths()}]), + dialyzer_cl:start(OptsRecord), + ok. + +add_to_plt(PltPath, FilesString) -> + {ok, Files} = regexp:split(FilesString, " "), + DialyzerWarnings = dialyzer:run([{analysis_type, plt_add}, + {init_plt, PltPath}, + {output_plt, PltPath}, + {files, Files}]), + print_warnings(DialyzerWarnings), + ok. + +dialyze_files(PltPath, ModifiedFiles) -> + {ok, Files} = regexp:split(ModifiedFiles, " "), + DialyzerWarnings = dialyzer:run([{init_plt, PltPath}, + {files, Files}]), + case DialyzerWarnings of + [] -> io:format("~nOk~n"), + ok; + _ -> io:format("~nFAILED with the following warnings:~n"), + print_warnings(DialyzerWarnings), + fail + end. + +print_warnings(Warnings) -> + [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings], + io:format("~n"), + ok. + +otp_apps_dependencies_paths() -> + [code:lib_dir(App, ebin) || + App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]]. + +halt_with_code(ok) -> + halt(); +halt_with_code(fail) -> + halt(1). diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl index 7bf85347..6ef638cb 100644 --- a/src/rabbit_load.erl +++ b/src/rabbit_load.erl @@ -41,7 +41,7 @@ -ifdef(use_specs). -type(erlang_node() :: atom()). --type(load() :: {{non_neg_integer(), float()}, erlang_node()}). +-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}). -spec(local_load/0 :: () -> load()). -spec(remote_loads/0 :: () -> [load()]). -spec(pick/0 :: () -> erlang_node()). @@ -52,8 +52,11 @@ local_load() -> LoadAvg = case whereis(cpu_sup) of - undefined -> 0.0; - _Other -> cpu_sup:avg1() + undefined -> unknown; + _ -> case cpu_sup:avg1() of + L when is_integer(L) -> L; + {error, timeout} -> unknown + end end, {{statistics(run_queue), LoadAvg}, node()}. @@ -65,8 +68,12 @@ remote_loads() -> pick() -> RemoteLoads = remote_loads(), {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node - AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR, + %% add bias towards current node; we rely on Erlang's term order + %% of SomeFloat < local_unknown < unknown. + AdjustedLoadAvg = case LoadAvg of + unknown -> local_unknown; + _ -> LoadAvg * ?FUDGE_FACTOR + end, Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], {_, SelectedNode} = lists:min(Loads), SelectedNode. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 95a274e3..b20e9a86 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -35,7 +35,8 @@ -include_lib("kernel/include/file.hrl"). -export([method_record_type/1, polite_pause/0, polite_pause/1]). --export([die/1, frame_error/2, protocol_error/3, protocol_error/4]). +-export([die/1, frame_error/2, amqp_error/4, + protocol_error/3, protocol_error/4]). -export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). @@ -46,7 +47,7 @@ -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). --export([localnode/1, tcp_name/3]). +-export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). @@ -74,10 +75,9 @@ -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). -spec(die/1 :: (atom()) -> no_return()). -spec(frame_error/2 :: (atom(), binary()) -> no_return()). --spec(protocol_error/3 :: - (atom() | amqp_error(), string(), [any()]) -> no_return()). --spec(protocol_error/4 :: - (atom() | amqp_error(), string(), [any()], atom()) -> no_return()). +-spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()). +-spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()). +-spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()). -spec(not_found/1 :: (r(atom())) -> no_return()). -spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). -spec(get_config/2 :: (atom(), A) -> A). @@ -106,6 +106,8 @@ -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). +-spec(nodehost/1 :: (erlang_node()) -> string()). +-spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). @@ -144,15 +146,17 @@ die(Error) -> protocol_error(Error, "~w", [Error]). frame_error(MethodName, BinaryFields) -> - protocol_error(frame_error, "cannot decode ~w", - [BinaryFields], MethodName). + protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName). -protocol_error(Error, Explanation, Params) -> - protocol_error(Error, Explanation, Params, none). +amqp_error(Name, ExplanationFormat, Params, Method) -> + Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)), + #amqp_error{name = Name, explanation = Explanation, method = Method}. -protocol_error(Error, Explanation, Params, Method) -> - CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)), - exit({amqp, Error, CompleteExplanation, Method}). +protocol_error(Name, ExplanationFormat, Params) -> + protocol_error(Name, ExplanationFormat, Params, none). + +protocol_error(Name, ExplanationFormat, Params, Method) -> + exit(amqp_error(Name, ExplanationFormat, Params, Method)). not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). @@ -305,11 +309,15 @@ ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). localnode(Name) -> + list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])). + +nodehost(Node) -> %% This is horrible, but there doesn't seem to be a way to split a %% nodename into its constituent parts. - list_to_atom(lists:append(atom_to_list(Name), - lists:dropwhile(fun (E) -> E =/= $@ end, - atom_to_list(node())))). + tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))). + +cookie_hash() -> + ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))). tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index eed21a01..4bbdb65e 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -39,8 +39,8 @@ %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, ssl_connection_upgrade/2, - tcp_listener_stopped/2, start_client/1]). +-export([tcp_listener_started/2, tcp_listener_stopped/2, + start_client/1, start_ssl_client/2]). -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). @@ -101,7 +101,7 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), - throw({error, invalid_port, Port}) + throw({error, {invalid_port, Port}}) end, Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), {IPAddress, Name}. @@ -112,7 +112,7 @@ start_tcp_listener(Host, Port) -> start_ssl_listener(Host, Port, SslOpts) -> start_listener(Host, Port, "SSL Listener", - {?MODULE, ssl_connection_upgrade, [SslOpts]}). + {?MODULE, start_ssl_client, [SslOpts]}). start_listener(Host, Port, Label, OnConnect) -> {IPAddress, Name} = @@ -166,7 +166,7 @@ start_client(Sock) -> Child ! {go, Sock}, Child. -ssl_connection_upgrade(SslOpts, Sock) -> +start_ssl_client(SslOpts, Sock) -> {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock), PeerIp = inet_parse:ntoa(PeerAddress), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 761794f1..5cc98992 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -49,7 +49,6 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). --define(CHANNEL_CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). %--------------------------------------------------------------------------- @@ -94,23 +93,19 @@ %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing, *running* -%% terminate_channel timeout -> remove 'closing' mark, *running* +%% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* %% receive frame -> ignore, *closing* -%% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* %% channel exit with hard error %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing +%% -> log error, mark channel as closing %% if last channel to exit then send connection.close_ok, %% start terminate_connection timer, *closed* %% else *closing* @@ -123,7 +118,6 @@ %% *closed* %% receive frame -> ignore, *closed* %% terminate_connection timeout -> *terminate* -%% terminate_channel timeout -> remove 'closing' mark, *closed* %% handshake_timeout -> ignore, *closed* %% heartbeat timeout -> *throw* %% channel exit -> log error, *closed* @@ -269,12 +263,10 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> if State#v1.connection_state =:= running -> - send_exception( - State, 0, - {amqp, connection_forced, - io_lib:format( - "broker forced connection closure with reason '~w'", - [Reason]), none}); + send_exception(State, 0, + rabbit_misc:amqp_error(connection_forced, + "broker forced connection closure with reason '~w'", + [Reason], none)); true -> ok end, %% this is what we are expected to do according to @@ -292,8 +284,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); - {terminate_channel, Channel, Ref1} -> - mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State)); terminate_connection -> State; handshake_timeout -> @@ -341,32 +331,14 @@ close_connection(State = #v1{connection = #connection{ State#v1{connection_state = closed}. close_channel(Channel, State) -> - Ref = make_ref(), - TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT, - self(), - {terminate_channel, Channel, Ref}), - put({closing_channel, Channel}, {Ref, TRef}), - State. - -terminate_channel(Channel, Ref, State) -> - case get({closing_channel, Channel}) of - undefined -> ok; %% got close_ok in the meantime - {Ref, _} -> erase({closing_channel, Channel}), - ok; - {_Ref, _} -> ok %% got close_ok, and have new closing channel - end, + put({channel, Channel}, closing), State. handle_channel_exit(Channel, Reason, State) -> - %% We remove the channel from the inbound map only. That allows - %% the channel to be re-opened, but also means the remaining - %% cleanup, including possibly closing the connection, is deferred - %% until we get the (normal) exit signal. - erase({channel, Channel}), handle_exception(State, Channel, Reason). handle_dependent_exit(Pid, normal, State) -> - channel_cleanup(Pid), + erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of @@ -376,17 +348,10 @@ handle_dependent_exit(Pid, Reason, State) -> channel_cleanup(Pid) -> case get({chpid, Pid}) of - undefined -> - case get({closing_chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> - erase({closing_chpid, Pid}), - Channel - end; - {channel, Channel} -> - erase({channel, Channel}), - erase({chpid, Pid}), - Channel + undefined -> undefined; + {channel, Channel} -> erase({channel, Channel}), + erase({chpid, Pid}), + Channel end. all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. @@ -467,13 +432,27 @@ handle_frame(Type, Channel, Payload, State) -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> - ok = check_for_close(Channel, ChPid, AnalyzedFrame), + case AnalyzedFrame of + {method, 'channel.close', _} -> + erase({channel, Channel}); + _ -> ok + end, ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), State; + closing -> + %% According to the spec, after sending a + %% channel.close we must ignore all frames except + %% channel.close_ok. + case AnalyzedFrame of + {method, 'channel.close_ok', _} -> + erase({channel, Channel}); + _ -> ok + end, + State; undefined -> case State#v1.connection_state of - running -> send_to_new_channel( - Channel, AnalyzedFrame, State), + running -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), State; Other -> throw({channel_frame_while_starting, Channel, Other, AnalyzedFrame}) @@ -567,17 +546,17 @@ handle_method0(MethodName, FieldsBin, State) -> MethodName, FieldsBin), State) catch exit:Reason -> - CompleteReason = - case Reason of - {amqp, Error, Explanation, none} -> - {amqp, Error, Explanation, MethodName}; - OtherReason -> OtherReason - end, + CompleteReason = case Reason of + #amqp_error{method = none} -> + Reason#amqp_error{method = MethodName}; + OtherReason -> OtherReason + end, case State#v1.connection_state of running -> send_exception(State, 0, CompleteReason); Other -> throw({channel0_error, Other, CompleteReason}) end end. + handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response}, State = #v1{connection_state = starting, @@ -716,38 +695,17 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, State) -> - case get({closing_channel, Channel}) of - undefined -> - #v1{sock = Sock, - connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); - {_, TRef} -> - %% According to the spec, after sending a channel.close we - %% must ignore all frames except channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erlang:cancel_timer(TRef), - erase({closing_channel, Channel}), - ok; - _Other -> ok - end - end. - -check_for_close(Channel, ChPid, {method, 'channel.close', _}) -> - channel_cleanup(ChPid), - put({closing_chpid, ChPid}, {channel, Channel}), - ok; -check_for_close(_Channel, _ChPid, _Frame) -> - ok. + #v1{sock = Sock, connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + ChPid = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", @@ -793,18 +751,18 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) -> +lookup_amqp_exception( + #amqp_error{name = Name, explanation = Expl, method = Method}) -> + {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), ExplBin = list_to_binary(Expl), CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, SafeTextBin = if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; - true -> - CompleteTextBin + true -> CompleteTextBin end, {ShouldClose, Code, SafeTextBin, Method}; -lookup_amqp_exception({amqp, ErrorName, Expl, Method}) -> - Details = rabbit_framing:lookup_amqp_exception(ErrorName), - lookup_amqp_exception({amqp, Details, Expl, Method}); lookup_amqp_exception(Other) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}. + {ShouldClose, Code, Text} = + rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text, none}. |