summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarek Majkowski <majek@lshift.net>2009-10-02 15:04:47 +0100
committerMarek Majkowski <majek@lshift.net>2009-10-02 15:04:47 +0100
commit1c81db1791240f72a9f4c3143878651ff6bc2d72 (patch)
tree24344f7f19c3ae37307450d396d01666bf2ae61d
parenta20152bb0988bf185af9159c353977e493170488 (diff)
parentfafff323132651b9a23611f00ed75e839f45494e (diff)
downloadrabbitmq-server-1c81db1791240f72a9f4c3143878651ff6bc2d72.tar.gz
Merged bug 21294 into default.
-rw-r--r--.hgignore1
-rw-r--r--Makefile53
-rw-r--r--docs/rabbitmq-activate-plugins.1.pod2
-rw-r--r--docs/rabbitmq-deactivate-plugins.1.pod37
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl21
-rw-r--r--include/rabbit_framing_spec.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec1
-rw-r--r--packaging/common/rabbitmq-asroot-script-wrapper2
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rw-r--r--packaging/debs/Debian/debian/rules2
-rw-r--r--packaging/macports/net/rabbitmq-server/Portfile28
-rw-r--r--packaging/windows/Makefile1
-rwxr-xr-xscripts/rabbitmq-deactivate-plugins37
-rw-r--r--scripts/rabbitmq-deactivate-plugins.bat35
-rwxr-xr-xscripts/rabbitmq-server5
-rwxr-xr-xscripts/rabbitmqctl5
-rwxr-xr-xscripts/rabbitmqctl.bat6
-rw-r--r--src/priority_queue.erl4
-rw-r--r--src/rabbit.erl33
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_basic.erl5
-rw-r--r--src/rabbit_channel.erl30
-rw-r--r--src/rabbit_control.erl41
-rw-r--r--src/rabbit_dialyzer.erl91
-rw-r--r--src/rabbit_exchange.erl8
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_load.erl17
-rw-r--r--src/rabbit_misc.erl40
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_networking.erl40
-rw-r--r--src/rabbit_plugin_activator.erl74
-rw-r--r--src/rabbit_reader.erl169
-rw-r--r--src/tcp_acceptor.erl27
35 files changed, 579 insertions, 259 deletions
diff --git a/.hgignore b/.hgignore
index d77d4e5d..ccd0b09f 100644
--- a/.hgignore
+++ b/.hgignore
@@ -11,6 +11,7 @@ syntax: regexp
^include/rabbit_framing\.hrl$
^src/rabbit_framing\.erl$
^rabbit\.plt$
+^basic.plt$
^ebin/rabbit\.(app|rel|boot|script)$
^plugins/
^priv/plugins/
diff --git a/Makefile b/Makefile
index a94406a3..ad0316fc 100644
--- a/Makefile
+++ b/Makefile
@@ -1,29 +1,31 @@
-ifndef TMPDIR
-TMPDIR := /tmp
-endif
-RABBITMQ_NODENAME=rabbit
-RABBITMQ_SERVER_START_ARGS=
-RABBITMQ_MNESIA_DIR=$(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
-RABBITMQ_LOG_BASE=$(TMPDIR)
+TMPDIR ?= /tmp
+
+RABBITMQ_NODENAME ?= rabbit
+RABBITMQ_SERVER_START_ARGS ?=
+RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
+RABBITMQ_LOG_BASE ?= $(TMPDIR)
SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
PYTHON=python
+BASIC_PLT=basic.plt
+RABBIT_PLT=rabbit.plt
+
ifndef USE_SPECS
# our type specs rely on features / bug fixes in dialyzer that are
-# only available in R13B upwards (R13B is eshell 5.7.1)
+# only available in R13B01 upwards (R13B01 is eshell 5.7.2)
#
# NB: the test assumes that version number will only contain single digits
-USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi)
+USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.1" ]; then echo "true"; else echo "false"; fi)
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
@@ -39,6 +41,8 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
+ERL_EBIN=erl -noinput -pa $(EBIN_DIR)
+
all: $(TARGETS)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
@@ -57,14 +61,32 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p
$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
$(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
-dialyze: $(BEAM_TARGETS)
- dialyzer -c $?
+dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
+ $(ERL_EBIN) -eval \
+ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))."
+
+# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
+create-plt: $(RABBIT_PLT)
+
+$(RABBIT_PLT): $(BEAM_TARGETS) $(BASIC_PLT)
+ cp $(BASIC_PLT) $@
+ $(ERL_EBIN) -eval \
+ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:add_to_plt(\"$@\", \"$(BEAM_TARGETS)\"))."
+
+$(BASIC_PLT): $(BEAM_TARGETS)
+ if [ -f $@ ]; then \
+ touch $@; \
+ else \
+ $(ERL_EBIN) -eval \
+ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:create_basic_plt(\"$@\"))."; \
+ fi
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
+ rm -f $(RABBIT_PLT)
cleandb:
rm -rf $(RABBITMQ_MNESIA_DIR)/*
@@ -79,13 +101,14 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\
run: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
- RABBITMQ_NODE_ONLY=true \
- RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -s rabbit" \
+ RABBITMQ_ALLOW_INPUT=true \
+ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
./scripts/rabbitmq-server
run-node: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
+ RABBITMQ_ALLOW_INPUT=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
./scripts/rabbitmq-server
@@ -170,7 +193,7 @@ install: all docs_all install_dirs
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
- for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \
+ for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \
cp scripts/$$script $(TARGET_DIR)/sbin; \
[ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
done
diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod
index 58ffea79..42f0c4d2 100644
--- a/docs/rabbitmq-activate-plugins.1.pod
+++ b/docs/rabbitmq-activate-plugins.1.pod
@@ -26,7 +26,7 @@ execute:
=head1 SEE ALSO
L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>,
-L<rabbitmqctl(1)>
+L<rabbitmqctl(1)>, L<rabbitmq-deactivate-plugins(1)>
=head1 AUTHOR
diff --git a/docs/rabbitmq-deactivate-plugins.1.pod b/docs/rabbitmq-deactivate-plugins.1.pod
new file mode 100644
index 00000000..eb4fbb90
--- /dev/null
+++ b/docs/rabbitmq-deactivate-plugins.1.pod
@@ -0,0 +1,37 @@
+=head1 NAME
+
+rabbitmq-deactivate-plugins - command line tool for deactivating plugins
+in a RabbitMQ broker
+
+=head1 SYNOPSIS
+
+rabbitmq-deactivate-plugins
+
+=head1 DESCRIPTION
+
+RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+
+rabbitmq-deactivate-plugins is a command line tool for deactivating
+plugins installed into the broker.
+
+=head1 EXAMPLES
+
+To deactivate all of the installed plugins in the current RabbitMQ install,
+execute:
+
+ rabbitmq-deactivate-plugins
+
+=head1 SEE ALSO
+
+L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>,
+L<rabbitmqctl(1)>, L<rabbitmq-activate-plugins(1)>
+
+=head1 AUTHOR
+
+The RabbitMQ Team <info@rabbitmq.com>
+
+=head1 REFERENCES
+
+RabbitMQ Web Site: L<http://www.rabbitmq.com>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 6fc6e464..dd907d1a 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -17,7 +17,6 @@
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
{ssl_listeners, []},
{ssl_options, []},
- {extra_startup_steps, []},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d1a2f3bd..5703d0d6 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -67,6 +67,8 @@
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
+-record(amqp_error, {name, explanation, method = none}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -101,15 +103,15 @@
read :: regexp()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
- durable :: bool(),
- auto_delete :: bool(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
arguments :: amqp_table(),
pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
- durable :: bool(),
- auto_delete :: bool(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
@@ -139,14 +141,14 @@
persistent_key :: maybe(pkey())}).
-type(message() :: basic_message()).
-type(delivery() ::
- #delivery{mandatory :: bool(),
- immediate :: bool(),
+ #delivery{mandatory :: boolean(),
+ immediate :: boolean(),
txn :: maybe(txn()),
sender :: pid(),
message :: message()}).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
--type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}).
+-type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
-type(listener() ::
#listener{node :: erlang_node(),
protocol :: atom(),
@@ -154,7 +156,10 @@
port :: non_neg_integer()}).
-type(not_found() :: {'error', 'not_found'}).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-
+-type(amqp_error() ::
+ #amqp_error{name :: atom(),
+ explanation :: string(),
+ method :: atom()}).
-endif.
%%----------------------------------------------------------------------------
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index f45fa6ca..a78c2301 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -50,8 +50,6 @@
%% TODO: make this more precise
-type(amqp_method_name() :: atom()).
-type(channel_number() :: non_neg_integer()).
-%% TODO: make this more precise
--type(amqp_error() :: {bool(), non_neg_integer(), binary()}).
-type(resource_name() :: binary()).
-type(routing_key() :: binary()).
-type(username() :: binary()).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 7f442831..30cfb99f 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -56,6 +56,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
+install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper
index 0dd1c0fb..9ef59ad7 100644
--- a/packaging/common/rabbitmq-asroot-script-wrapper
+++ b/packaging/common/rabbitmq-asroot-script-wrapper
@@ -33,7 +33,7 @@
# Escape spaces and quotes, because shell is revolting.
for arg in "$@" ; do
# Escape quotes in parameters, so that they're passed through cleanly.
- arg=$(sed -e 's/"/\\"/' <<-END
+ arg=$(sed -e 's/"/\\"/g' <<-END
$arg
END
)
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 94d72f16..0c4bd0a8 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -33,7 +33,7 @@
# Escape spaces and quotes, because shell is revolting.
for arg in "$@" ; do
# Escape quotes in parameters, so that they're passed through cleanly.
- arg=$(sed -e 's/"/\\"/' <<-END
+ arg=$(sed -e 's/"/\\"/g' <<-END
$arg
END
)
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 365eea6e..5e357955 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -17,6 +17,6 @@ install/rabbitmq-server::
for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
- for script in rabbitmq-activate-plugins; do \
+ for script in rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \
install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile
index 1826d5c4..6b51fb2f 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/net/rabbitmq-server/Portfile
@@ -34,6 +34,7 @@ set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia
set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server
set sbindir ${destroot}${prefix}/lib/rabbitmq/bin
set wrappersbin ${destroot}${prefix}/sbin
+set realsbin ${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version}/sbin
use_configure no
@@ -61,23 +62,23 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
- ${sbindir}/rabbitmq-env
+ ${realsbin}/rabbitmq-env
reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
reinplace -E "s:(LOG_BASE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
reinplace -E "s:(MNESIA_BASE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
reinplace -E "s:(PIDS_FILE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
${wrappersbin}/rabbitmq-multi
@@ -94,6 +95,7 @@ post-destroot {
${wrappersbin}/rabbitmq-activate-plugins
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
+ file copy ${wrappersbin}/rabbitmq-activate-plugins ${wrappersbin}/rabbitmq-deactivate-plugins
}
pre-install {
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index 387becb3..f17fe777 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -14,6 +14,7 @@ dist:
mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin
+ mv $(SOURCE_DIR)/scripts/rabbitmq-deactivate-plugins.bat $(SOURCE_DIR)/sbin
rm -rf $(SOURCE_DIR)/scripts
rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile
rm -f $(SOURCE_DIR)/README
diff --git a/scripts/rabbitmq-deactivate-plugins b/scripts/rabbitmq-deactivate-plugins
new file mode 100755
index 00000000..771c4734
--- /dev/null
+++ b/scripts/rabbitmq-deactivate-plugins
@@ -0,0 +1,37 @@
+#!/bin/sh
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+. `dirname $0`/rabbitmq-env
+
+RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin
+
+rm -f ${RABBITMQ_EBIN}/rabbit.rel ${RABBITMQ_EBIN}/rabbit.script ${RABBITMQ_EBIN}/rabbit.boot
diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat
new file mode 100644
index 00000000..190fdef7
--- /dev/null
+++ b/scripts/rabbitmq-deactivate-plugins.bat
@@ -0,0 +1,35 @@
+@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+
+del /f %RABBITMQ_EBIN_DIR%\rabbit.rel %RABBITMQ_EBIN_DIR%\rabbit.script %RABBITMQ_EBIN_DIR%\rabbit.boot
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index dd00a443..67768c0e 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -75,15 +75,16 @@ else
fi
RABBITMQ_START_RABBIT=
-[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit'
+[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
-if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then
+if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ] && [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit"
RABBITMQ_EBIN_PATH=""
else
RABBITMQ_BOOT_FILE=start_sasl
RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}"
+ [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="${RABBITMQ_START_RABBIT} -s rabbit"
fi
RABBITMQ_CONFIG_ARG=
[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 9c45e73d..a332afc6 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,8 +30,11 @@
## Contributor(s): ______________________________________.
##
+NODENAME=rabbit
+
. `dirname $0`/rabbitmq-env
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
exec erl \
@@ -41,4 +44,6 @@ exec erl \
${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
-s rabbit_control \
+ -nodename $RABBITMQ_NODENAME \
-extra "$@"
+
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 5111724f..8a4e5445 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -30,6 +30,10 @@ REM
REM Contributor(s): ______________________________________.
REM
+if "%RABBITMQ_NODENAME%"=="" (
+ set RABBITMQ_NODENAME=rabbit
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -42,4 +46,4 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -nodename %RABBITMQ_NODENAME% -extra %*
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index c74b39a9..74b41a91 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -67,8 +67,8 @@
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
--spec(is_queue/1 :: (any()) -> bool()).
--spec(is_empty/1 :: (pqueue()) -> bool()).
+-spec(is_queue/1 :: (any()) -> boolean()).
+-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ef1e0049..18fd1b17 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -33,7 +33,7 @@
-behaviour(application).
--export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
-export([start/2, stop/1]).
@@ -57,6 +57,7 @@
-type(log_location() :: 'tty' | 'undefined' | string()).
-type(file_suffix() :: binary()).
+-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
@@ -71,11 +72,14 @@
%%----------------------------------------------------------------------------
+prepare() ->
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_mnesia:ensure_mnesia_dir().
+
start() ->
try
- ok = ensure_working_log_handlers(),
- ok = rabbit_mnesia:ensure_mnesia_dir(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = prepare(),
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
@@ -215,6 +219,16 @@ log_location(Type) ->
_ -> undefined
end.
+app_location() ->
+ {ok, Application} = application:get_application(),
+ filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+
+home_dir() ->
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> Home;
+ Other -> Other
+ end.
+
%---------------------------------------------------------------------------
print_banner() ->
@@ -237,10 +251,13 @@ print_banner() ->
[Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- Settings = [{"node", node()},
- {"log", log_location(kernel)},
- {"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()}],
+ Settings = [{"node", node()},
+ {"app descriptor", app_location()},
+ {"home dir", home_dir()},
+ {"cookie hash", rabbit_misc:cookie_hash()},
+ {"log", log_location(kernel)},
+ {"sasl log", log_location(sasl)},
+ {"database dir", rabbit_mnesia:dir()}],
DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 309c9a0e..7a2fbcb8 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -50,7 +50,7 @@
-ifdef(use_specs).
-type(mfa_tuple() :: {atom(), atom(), list()}).
--spec(start/1 :: (bool() | 'auto') -> 'ok').
+-spec(start/1 :: (boolean() | 'auto') -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f05f7880..1a5e82d7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -63,7 +63,7 @@
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
+-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
@@ -83,8 +83,8 @@
{'error', 'in_use'} |
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), delivery()) -> bool()).
--spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
+-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
+-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
@@ -92,16 +92,16 @@
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
--spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
+-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
+ (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
+-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4033aaaf..bec2cd08 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -45,13 +45,14 @@
-type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())).
-spec(publish/1 :: (delivery()) -> publish_result()).
--spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
+-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
+ delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> message()).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
--spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(),
+-spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(),
maybe(txn()), properties_input(), binary()) ->
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1285064f..c20cb16c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -60,8 +60,8 @@
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
+-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-endif.
@@ -125,11 +125,11 @@ handle_cast({method, Method, Content}, State) ->
stop ->
{stop, normal, State#ch{state = terminating}}
catch
- exit:{amqp, Error, Explanation, none} ->
+ exit:Reason = #amqp_error{} ->
ok = rollback_and_notify(State),
- Reason = {amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ MethodName = rabbit_misc:method_record_type(Method),
+ State#ch.reader_pid ! {channel_exit, State#ch.channel,
+ Reason#amqp_error{method = MethodName}},
{stop, normal, State#ch{state = terminating}};
exit:normal ->
{stop, normal, State};
@@ -260,12 +260,6 @@ expand_routing_key_shortcut(<<>>, <<>>,
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
-die_precondition_failed(Fmt, Params) ->
- %% FIXME: 406 should be replaced with precondition_failed when we
- %% move to AMQP spec >=8.1
- rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>},
- Fmt, Params).
-
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -610,8 +604,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
- die_precondition_failed(
- "~s in use", [rabbit_misc:rs(ExchangeName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
@@ -685,11 +679,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
- die_precondition_failed(
- "~s in use", [rabbit_misc:rs(QueueName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
- die_precondition_failed(
- "~s not empty", [rabbit_misc:rs(QueueName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
#'queue.delete_ok'{
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 69e91803..a53ac289 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -52,10 +52,12 @@
%%----------------------------------------------------------------------------
start() ->
+ {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename),
+ NodeName = list_to_atom(NodeNameStr),
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:localnode(rabbit)}),
+ node = rabbit_misc:localnode(NodeName)}),
Inform = case Quiet of
true -> fun(_Format, _Args1) -> ok end;
false -> fun(Format, Args1) ->
@@ -80,13 +82,38 @@ start() ->
{error, Reason} ->
error("~p", [Reason]),
halt(2);
+ {badrpc, Reason} ->
+ error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics(Node),
+ halt(2);
Other ->
error("~p", [Other]),
halt(2)
end.
-error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
+
+error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+
+print_badrpc_diagnostics(Node) ->
+ fmt_stderr("diagnostics:", []),
+ NodeHost = rabbit_misc:nodehost(Node),
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ fmt_stderr("- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]);
+ {ok, NamePorts} ->
+ fmt_stderr("- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]])
+ end,
+ fmt_stderr("- current node: ~w", [node()]),
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]);
+ Other -> fmt_stderr("- no current node home dir: ~p", [Other])
+ end,
+ fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
+ ok.
parse_args(["-n", NodeS | Args], Params) ->
Node = case lists:member($@, NodeS) of
@@ -197,9 +224,11 @@ action(cluster, Node, ClusterNodeSs, Inform) ->
action(status, Node, [], Inform) ->
Inform("Status of node ~p", [Node]),
- Res = call(Node, {rabbit, status, []}),
- io:format("~p~n", [Res]),
- ok;
+ case call(Node, {rabbit, status, []}) of
+ {badrpc, _} = Res -> Res;
+ Res -> io:format("~p~n", [Res]),
+ ok
+ end;
action(rotate_logs, Node, [], Inform) ->
Inform("Reopening logs for node ~p", [Node]),
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
new file mode 100644
index 00000000..23e6fc44
--- /dev/null
+++ b/src/rabbit_dialyzer.erl
@@ -0,0 +1,91 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_dialyzer).
+-include("rabbit.hrl").
+
+-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(create_basic_plt/1 :: (string()) -> 'ok').
+-spec(add_to_plt/2 :: (string(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (string(), string()) -> 'ok').
+-spec(halt_with_code/1 :: (atom()) -> no_return()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+create_basic_plt(BasicPltPath) ->
+ OptsRecord = dialyzer_options:build(
+ [{analysis_type, plt_build},
+ {output_plt, BasicPltPath},
+ {files_rec, otp_apps_dependencies_paths()}]),
+ dialyzer_cl:start(OptsRecord),
+ ok.
+
+add_to_plt(PltPath, FilesString) ->
+ {ok, Files} = regexp:split(FilesString, " "),
+ DialyzerWarnings = dialyzer:run([{analysis_type, plt_add},
+ {init_plt, PltPath},
+ {output_plt, PltPath},
+ {files, Files}]),
+ print_warnings(DialyzerWarnings),
+ ok.
+
+dialyze_files(PltPath, ModifiedFiles) ->
+ {ok, Files} = regexp:split(ModifiedFiles, " "),
+ DialyzerWarnings = dialyzer:run([{init_plt, PltPath},
+ {files, Files}]),
+ case DialyzerWarnings of
+ [] -> io:format("~nOk~n"),
+ ok;
+ _ -> io:format("~nFAILED with the following warnings:~n"),
+ print_warnings(DialyzerWarnings),
+ fail
+ end.
+
+print_warnings(Warnings) ->
+ [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings],
+ io:format("~n"),
+ ok.
+
+otp_apps_dependencies_paths() ->
+ [code:lib_dir(App, ebin) ||
+ App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]].
+
+halt_with_code(ok) ->
+ halt();
+halt_with_code(fail) ->
+ halt(1).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 8fb9eae3..33dea8c7 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -61,7 +61,7 @@
'exchange_not_found' |
'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
+-spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
@@ -83,9 +83,9 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(topic_matches/2 :: (binary(), binary()) -> bool()).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
--spec(delete/2 :: (exchange_name(), bool()) ->
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
[{exchange_name(), routing_key(), amqp_table()}]).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9f3dcbd0..087a9f64 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
index 7bf85347..6ef638cb 100644
--- a/src/rabbit_load.erl
+++ b/src/rabbit_load.erl
@@ -41,7 +41,7 @@
-ifdef(use_specs).
-type(erlang_node() :: atom()).
--type(load() :: {{non_neg_integer(), float()}, erlang_node()}).
+-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}).
-spec(local_load/0 :: () -> load()).
-spec(remote_loads/0 :: () -> [load()]).
-spec(pick/0 :: () -> erlang_node()).
@@ -52,8 +52,11 @@
local_load() ->
LoadAvg = case whereis(cpu_sup) of
- undefined -> 0.0;
- _Other -> cpu_sup:avg1()
+ undefined -> unknown;
+ _ -> case cpu_sup:avg1() of
+ L when is_integer(L) -> L;
+ {error, timeout} -> unknown
+ end
end,
{{statistics(run_queue), LoadAvg}, node()}.
@@ -65,8 +68,12 @@ remote_loads() ->
pick() ->
RemoteLoads = remote_loads(),
{{RunQ, LoadAvg}, Node} = local_load(),
- %% add bias towards current node
- AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR,
+ %% add bias towards current node; we rely on Erlang's term order
+ %% of SomeFloat < local_unknown < unknown.
+ AdjustedLoadAvg = case LoadAvg of
+ unknown -> local_unknown;
+ _ -> LoadAvg * ?FUDGE_FACTOR
+ end,
Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads],
{_, SelectedNode} = lists:min(Loads),
SelectedNode.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 95a274e3..b20e9a86 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -35,7 +35,8 @@
-include_lib("kernel/include/file.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
--export([die/1, frame_error/2, protocol_error/3, protocol_error/4]).
+-export([die/1, frame_error/2, amqp_error/4,
+ protocol_error/3, protocol_error/4]).
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
@@ -46,7 +47,7 @@
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
--export([localnode/1, tcp_name/3]).
+-export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -74,10 +75,9 @@
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
-spec(die/1 :: (atom()) -> no_return()).
-spec(frame_error/2 :: (atom(), binary()) -> no_return()).
--spec(protocol_error/3 ::
- (atom() | amqp_error(), string(), [any()]) -> no_return()).
--spec(protocol_error/4 ::
- (atom() | amqp_error(), string(), [any()], atom()) -> no_return()).
+-spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()).
+-spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()).
+-spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()).
-spec(not_found/1 :: (r(atom())) -> no_return()).
-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
-spec(get_config/2 :: (atom(), A) -> A).
@@ -106,6 +106,8 @@
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> erlang_node()).
+-spec(nodehost/1 :: (erlang_node()) -> string()).
+-spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
@@ -144,15 +146,17 @@ die(Error) ->
protocol_error(Error, "~w", [Error]).
frame_error(MethodName, BinaryFields) ->
- protocol_error(frame_error, "cannot decode ~w",
- [BinaryFields], MethodName).
+ protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).
-protocol_error(Error, Explanation, Params) ->
- protocol_error(Error, Explanation, Params, none).
+amqp_error(Name, ExplanationFormat, Params, Method) ->
+ Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)),
+ #amqp_error{name = Name, explanation = Explanation, method = Method}.
-protocol_error(Error, Explanation, Params, Method) ->
- CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)),
- exit({amqp, Error, CompleteExplanation, Method}).
+protocol_error(Name, ExplanationFormat, Params) ->
+ protocol_error(Name, ExplanationFormat, Params, none).
+
+protocol_error(Name, ExplanationFormat, Params, Method) ->
+ exit(amqp_error(Name, ExplanationFormat, Params, Method)).
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
@@ -305,11 +309,15 @@ ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
localnode(Name) ->
+ list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])).
+
+nodehost(Node) ->
%% This is horrible, but there doesn't seem to be a way to split a
%% nodename into its constituent parts.
- list_to_atom(lists:append(atom_to_list(Name),
- lists:dropwhile(fun (E) -> E =/= $@ end,
- atom_to_list(node())))).
+ tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))).
+
+cookie_hash() ->
+ ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))).
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 37e20335..c4d5aac6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -50,7 +50,7 @@
-spec(dir/0 :: () -> string()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
--spec(is_db_empty/0 :: () -> bool()).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(cluster/1 :: ([erlang_node()]) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index eed21a01..1bc17a32 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -39,8 +39,8 @@
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
--export([tcp_listener_started/2, ssl_connection_upgrade/2,
- tcp_listener_stopped/2, start_client/1]).
+-export([tcp_listener_started/2, tcp_listener_stopped/2,
+ start_client/1, start_ssl_client/2]).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
@@ -101,7 +101,7 @@ check_tcp_listener_address(NamePrefix, Host, Port) ->
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
[Port]),
- throw({error, invalid_port, Port})
+ throw({error, {invalid_port, Port}})
end,
Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
{IPAddress, Name}.
@@ -112,7 +112,7 @@ start_tcp_listener(Host, Port) ->
start_ssl_listener(Host, Port, SslOpts) ->
start_listener(Host, Port, "SSL Listener",
- {?MODULE, ssl_connection_upgrade, [SslOpts]}).
+ {?MODULE, start_ssl_client, [SslOpts]}).
start_listener(Host, Port, Label, OnConnect) ->
{IPAddress, Name} =
@@ -166,20 +166,28 @@ start_client(Sock) ->
Child ! {go, Sock},
Child.
-ssl_connection_upgrade(SslOpts, Sock) ->
- {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock),
- PeerIp = inet_parse:ntoa(PeerAddress),
-
- case ssl:ssl_accept(Sock, SslOpts) of
- {ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection from ~s:~p to SSL~n",
- [PeerIp, PeerPort]),
- RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
- start_client(RabbitSslSock);
+start_ssl_client(SslOpts, Sock) ->
+ case rabbit_net:peername(Sock) of
+ {ok, {PeerAddress, PeerPort}} ->
+ PeerIp = inet_parse:ntoa(PeerAddress),
+ case ssl:ssl_accept(Sock, SslOpts) of
+ {ok, SslSock} ->
+ rabbit_log:info("upgraded TCP connection "
+ "from ~s:~p to SSL~n",
+ [PeerIp, PeerPort]),
+ RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
+ start_client(RabbitSslSock);
+ {error, Reason} ->
+ gen_tcp:close(Sock),
+ rabbit_log:error("failed to upgrade TCP connection "
+ "from ~s:~p to SSL: ~n~p~n",
+ [PeerIp, PeerPort, Reason]),
+ {error, Reason}
+ end;
{error, Reason} ->
gen_tcp:close(Sock),
- rabbit_log:error("failed to upgrade TCP connection from ~s:~p "
- "to SSL: ~n~p~n", [PeerIp, PeerPort, Reason]),
+ rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n",
+ [Reason]),
{error, Reason}
end.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 0206f73e..f28c4a6e 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -49,6 +49,8 @@ start() ->
UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir),
RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin),
+ RootName = RabbitEBin ++ "/rabbit",
+
%% Unpack any .ez plugins
unpack_ez_plugins(PluginDir, UnpackedPluginDir),
@@ -60,10 +62,8 @@ start() ->
%% Build the entire set of dependencies - this will load the
%% applications along the way
AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {unknown_app, {App, Err}} ->
- io:format("ERROR: Failed to load application " ++
- "~s: ~p~n", [App, Err]),
- halt(1);
+ {failed_to_load_app, App, Err} ->
+ error("failed to load application ~s: ~p", [App, Err]);
AppList ->
AppList
end,
@@ -77,11 +77,11 @@ start() ->
AppVersions},
%% Write it out to ebin/rabbit.rel
- file:write_file(RabbitEBin ++ "/rabbit.rel",
- io_lib:format("~p.~n", [RDesc])),
+ file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
%% Compile the script
- case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of
+ ScriptFile = RootName ++ ".script",
+ case systools:make_script(RootName, [local, silent]) of
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
@@ -98,9 +98,19 @@ start() ->
end,
ok;
{error, Module, Error} ->
- io:format("Boot file generation failed: ~s~n",
- [Module:format_error(Error)]),
- halt(1)
+ error("generation of boot script file ~s failed: ~w",
+ [ScriptFile, Module:format_error(Error)])
+ end,
+
+ case post_process_script(ScriptFile) of
+ ok -> ok;
+ {error, Reason} ->
+ error("post processing of boot script file ~s failed: ~w",
+ [ScriptFile, Reason])
+ end,
+ case systools:script2boot(RootName) of
+ ok -> ok;
+ error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
halt(),
ok.
@@ -122,10 +132,10 @@ determine_version(App) ->
assert_dir(Dir) ->
case filelib:is_dir(Dir) of
true -> ok;
- false ->
- ok = filelib:ensure_dir(Dir),
- ok = file:make_dir(Dir)
+ false -> ok = filelib:ensure_dir(Dir),
+ ok = file:make_dir(Dir)
end.
+
delete_dir(Dir) ->
case filelib:is_dir(Dir) of
true ->
@@ -143,6 +153,7 @@ delete_dir(Dir) ->
false ->
ok
end.
+
is_symlink(Name) ->
case file:read_link(Name) of
{ok, _} -> true;
@@ -185,14 +196,43 @@ expand_dependencies(Current, [Next|Rest]) ->
expand_dependencies(Current, Rest);
false ->
case application:load(Next) of
- ok ->
+ ok ->
ok;
- {error, {already_loaded, _}} ->
+ {error, {already_loaded, _}} ->
ok;
- X ->
- throw({unknown_app, {Next, X}})
+ {error, Reason} ->
+ throw({failed_to_load_app, Next, Reason})
end,
{ok, Required} = application:get_key(Next, applications),
Unique = [A || A <- Required, not(sets:is_element(A, Current))],
expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
end.
+
+post_process_script(ScriptFile) ->
+ case file:consult(ScriptFile) of
+ {ok, [{script, Name, Entries}]} ->
+ NewEntries = process_entries(Entries),
+ case file:open(ScriptFile, [write]) of
+ {ok, Fd} ->
+ io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
+ [date(), time(), {script, Name, NewEntries}]),
+ file:close(Fd),
+ ok;
+ {error, OReason} ->
+ {error, {failed_to_open_script_file_for_writing, OReason}}
+ end;
+ {error, Reason} ->
+ {error, {failed_to_load_script, Reason}}
+ end.
+
+process_entries([]) ->
+ [];
+process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
+ Rest]) ->
+ [Entry, {apply,{rabbit,prepare,[]}} | Rest];
+process_entries([Entry|Rest]) ->
+ [Entry | process_entries(Rest)].
+
+error(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ halt(1).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 761794f1..e21485b5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -49,7 +49,6 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
--define(CHANNEL_CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
%---------------------------------------------------------------------------
@@ -94,23 +93,19 @@
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing, *running*
-%% terminate_channel timeout -> remove 'closing' mark, *running*
+%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
%% receive frame -> ignore, *closing*
-%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
%% channel exit with hard error
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing
+%% -> log error, mark channel as closing
%% if last channel to exit then send connection.close_ok,
%% start terminate_connection timer, *closed*
%% else *closing*
@@ -123,7 +118,6 @@
%% *closed*
%% receive frame -> ignore, *closed*
%% terminate_connection timeout -> *terminate*
-%% terminate_channel timeout -> remove 'closing' mark, *closed*
%% handshake_timeout -> ignore, *closed*
%% heartbeat timeout -> *throw*
%% channel exit -> log error, *closed*
@@ -269,12 +263,10 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
if State#v1.connection_state =:= running ->
- send_exception(
- State, 0,
- {amqp, connection_forced,
- io_lib:format(
- "broker forced connection closure with reason '~w'",
- [Reason]), none});
+ send_exception(State, 0,
+ rabbit_misc:amqp_error(connection_forced,
+ "broker forced connection closure with reason '~w'",
+ [Reason], none));
true -> ok
end,
%% this is what we are expected to do according to
@@ -292,8 +284,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
- {terminate_channel, Channel, Ref1} ->
- mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -341,32 +331,14 @@ close_connection(State = #v1{connection = #connection{
State#v1{connection_state = closed}.
close_channel(Channel, State) ->
- Ref = make_ref(),
- TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT,
- self(),
- {terminate_channel, Channel, Ref}),
- put({closing_channel, Channel}, {Ref, TRef}),
- State.
-
-terminate_channel(Channel, Ref, State) ->
- case get({closing_channel, Channel}) of
- undefined -> ok; %% got close_ok in the meantime
- {Ref, _} -> erase({closing_channel, Channel}),
- ok;
- {_Ref, _} -> ok %% got close_ok, and have new closing channel
- end,
+ put({channel, Channel}, closing),
State.
handle_channel_exit(Channel, Reason, State) ->
- %% We remove the channel from the inbound map only. That allows
- %% the channel to be re-opened, but also means the remaining
- %% cleanup, including possibly closing the connection, is deferred
- %% until we get the (normal) exit signal.
- erase({channel, Channel}),
handle_exception(State, Channel, Reason).
handle_dependent_exit(Pid, normal, State) ->
- channel_cleanup(Pid),
+ erase({chpid, Pid}),
maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
@@ -376,17 +348,10 @@ handle_dependent_exit(Pid, Reason, State) ->
channel_cleanup(Pid) ->
case get({chpid, Pid}) of
- undefined ->
- case get({closing_chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} ->
- erase({closing_chpid, Pid}),
- Channel
- end;
- {channel, Channel} ->
- erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+ undefined -> undefined;
+ {channel, Channel} -> erase({channel, Channel}),
+ erase({chpid, Pid}),
+ Channel
end.
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
@@ -467,13 +432,27 @@ handle_frame(Type, Channel, Payload, State) ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
- ok = check_for_close(Channel, ChPid, AnalyzedFrame),
+ case AnalyzedFrame of
+ {method, 'channel.close', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
State;
+ closing ->
+ %% According to the spec, after sending a
+ %% channel.close we must ignore all frames except
+ %% channel.close_ok.
+ case AnalyzedFrame of
+ {method, 'channel.close_ok', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
+ State;
undefined ->
case State#v1.connection_state of
- running -> send_to_new_channel(
- Channel, AnalyzedFrame, State),
+ running -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
State;
Other -> throw({channel_frame_while_starting,
Channel, Other, AnalyzedFrame})
@@ -567,17 +546,17 @@ handle_method0(MethodName, FieldsBin, State) ->
MethodName, FieldsBin),
State)
catch exit:Reason ->
- CompleteReason =
- case Reason of
- {amqp, Error, Explanation, none} ->
- {amqp, Error, Explanation, MethodName};
- OtherReason -> OtherReason
- end,
+ CompleteReason = case Reason of
+ #amqp_error{method = none} ->
+ Reason#amqp_error{method = MethodName};
+ OtherReason -> OtherReason
+ end,
case State#v1.connection_state of
running -> send_exception(State, 0, CompleteReason);
Other -> throw({channel0_error, Other, CompleteReason})
end
end.
+
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response},
State = #v1{connection_state = starting,
@@ -716,38 +695,17 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame, State) ->
- case get({closing_channel, Channel}) of
- undefined ->
- #v1{sock = Sock,
- connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
- {_, TRef} ->
- %% According to the spec, after sending a channel.close we
- %% must ignore all frames except channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erlang:cancel_timer(TRef),
- erase({closing_channel, Channel}),
- ok;
- _Other -> ok
- end
- end.
-
-check_for_close(Channel, ChPid, {method, 'channel.close', _}) ->
- channel_cleanup(ChPid),
- put({closing_chpid, ChPid}, {channel, Channel}),
- ok;
-check_for_close(_Channel, _ChPid, _Frame) ->
- ok.
+ #v1{sock = Sock, connection = #connection{
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
+ ChPid = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
+ put({channel, Channel}, {chpid, ChPid}),
+ put({chpid, ChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
@@ -793,18 +751,27 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) ->
- ExplBin = list_to_binary(Expl),
- CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
- SafeTextBin = if size(CompleteTextBin) > 255 ->
- <<CompleteTextBin:252/binary, "...">>;
- true ->
- CompleteTextBin
- end,
- {ShouldClose, Code, SafeTextBin, Method};
-lookup_amqp_exception({amqp, ErrorName, Expl, Method}) ->
- Details = rabbit_framing:lookup_amqp_exception(ErrorName),
- lookup_amqp_exception({amqp, Details, Expl, Method});
+%% FIXME: this clause can go when we move to AMQP spec >=8.1
+lookup_amqp_exception(#amqp_error{name = precondition_failed,
+ explanation = Expl,
+ method = Method}) ->
+ ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
+ {false, 406, ExplBin, Method};
+lookup_amqp_exception(#amqp_error{name = Name,
+ explanation = Expl,
+ method = Method}) ->
+ {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ ExplBin = amqp_exception_explanation(Text, Expl),
+ {ShouldClose, Code, ExplBin, Method};
lookup_amqp_exception(Other) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}.
+ {ShouldClose, Code, Text} =
+ rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text, none}.
+
+amqp_exception_explanation(Text, Expl) ->
+ ExplBin = list_to_binary(Expl),
+ CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
+ if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
+ true -> CompleteTextBin
+ end.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index aa8b8ad5..bc742561 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -67,15 +67,20 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- %% report
- {ok, {Address, Port}} = inet:sockname(LSock),
- {ok, {PeerAddress, PeerPort}} = inet:peername(Sock),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [inet_parse:ntoa(Address), Port,
- inet_parse:ntoa(PeerAddress), PeerPort]),
-
- %% handle
- apply(M, F, A ++ [Sock]),
+ try
+ %% report
+ {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
+ error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
+ [inet_parse:ntoa(Address), Port,
+ inet_parse:ntoa(PeerAddress), PeerPort]),
+ %% handle
+ apply(M, F, A ++ [Sock])
+ catch {inet_error, Reason} ->
+ gen_tcp:close(Sock),
+ error_logger:error_msg("unable to accept TCP connection: ~p~n",
+ [Reason])
+ end,
%% accept more
case prim_inet:async_accept(LSock, -1) of
@@ -95,3 +100,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+%%--------------------------------------------------------------------
+
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).