diff options
author | Michael Bridgen <mikeb@lshift.net> | 2009-12-18 15:36:58 +0000 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2009-12-18 15:36:58 +0000 |
commit | cd35f8a216237ba189413e4e8b62076b5df4f4f0 (patch) | |
tree | 28a5906c4f5c68d165ef45c30e89f30aa57e2e56 | |
parent | 1f6a5e822a915df9e8cb6952fbe45263f81ab430 (diff) | |
parent | 976004c8a6fb2863a9861858da59fde72b7fbe00 (diff) | |
download | rabbitmq-server-cd35f8a216237ba189413e4e8b62076b5df4f4f0.tar.gz |
Merge no-pluggable-exchange branch into v1.7-series branch.
41 files changed, 980 insertions, 758 deletions
@@ -15,7 +15,20 @@ TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) +ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python +else +ifeq ($(shell python2.6 -c 'import simplejson' 2>/dev/null && echo yes),yes) +PYTHON=python2.6 +else +ifeq ($(shell python2.5 -c 'import simplejson' 2>/dev/null && echo yes),yes) +PYTHON=python2.5 +else +# Hmm. Missing simplejson? +PYTHON=python +endif +endif +endif BASIC_PLT=basic.plt RABBIT_PLT=rabbit.plt @@ -46,7 +59,7 @@ ERL_EBIN=erl -noinput -pa $(EBIN_DIR) all: $(TARGETS) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app - escript generate_app $(EBIN_DIR) < $< > $@ + escript generate_app $(EBIN_DIR) $@ < $< $(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< @@ -228,12 +228,12 @@ def genErl(spec): print " rabbit_binary_generator:encode_properties(%s, %s);" % \ (fieldTypeList(c.fields), fieldTempList(c.fields)) - def massageConstantClass(cls): + def messageConstantClass(cls): # We do this because 0.8 uses "soft error" and 8.1 uses "soft-error". return erlangConstantName(cls) def genLookupException(c,v,cls): - mCls = massageConstantClass(cls) + mCls = messageConstantClass(cls) if mCls == 'SOFT_ERROR': genLookupException1(c,'false') elif mCls == 'HARD_ERROR': genLookupException1(c, 'true') elif mCls == '': pass @@ -244,6 +244,11 @@ def genErl(spec): print 'lookup_amqp_exception(%s) -> {%s, ?%s, <<"%s">>};' % \ (n.lower(), hardErrorBoolStr, n, n) + def genAmqpException(c,v,cls): + n = erlangConstantName(c) + print 'amqp_exception(?%s) -> %s;' % \ + (n, n.lower()) + methods = spec.allMethods() print """-module(rabbit_framing). @@ -260,6 +265,7 @@ def genErl(spec): -export([encode_method_fields/1]). -export([encode_properties/1]). -export([lookup_amqp_exception/1]). +-export([amqp_exception/1]). bitvalue(true) -> 1; bitvalue(false) -> 0; @@ -296,8 +302,10 @@ bitvalue(undefined) -> 0. for (c,v,cls) in spec.constants: genLookupException(c,v,cls) print "lookup_amqp_exception(Code) ->" print " rabbit_log:warning(\"Unknown AMQP error code '~p'~n\", [Code])," - print " {true, ?INTERNAL_ERROR, <<\"INTERNAL_ERROR\">>}." + print " {true, ?INTERNAL_ERROR, <<\"INTERNAL_ERROR\">>}." + for(c,v,cls) in spec.constants: genAmqpException(c,v,cls) + print "amqp_exception(_Code) -> undefined." def genHrl(spec): def erlType(domain): diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index c43ed2ea..5255be28 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used queue arguments -=item node +=item pid -node on which the process associated with the queue resides +id of the Erlang process associated with the queue =item messages_ready @@ -279,7 +279,7 @@ exchange arguments =item list_bindings [-p I<vhostpath>] List bindings by virtual host. Each line printed describes a binding, -with the exchange name, routing key, queue name and arguments, +with the exchange name, queue name, routing key and arguments, separated by tab characters. =item list_connections [I<connectioninfoitem> ...] @@ -297,7 +297,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. =item node -node on which the process associated with the connection resides +id of the Erlang process associated with the connection =item address @@ -340,6 +340,11 @@ connection timeout maximum frame size (bytes) +=item client_properties + +informational properties transmitted by the client during connection +establishment + =item recv_oct octets received diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index dd907d1a..3616fcbf 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -17,8 +17,8 @@ {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, {ssl_listeners, []}, {ssl_options, []}, + {vm_memory_high_watermark, 0.4}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {memory_alarms, auto}]}]}. + {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}. diff --git a/generate_app b/generate_app index 62301292..576b485e 100644 --- a/generate_app +++ b/generate_app @@ -1,10 +1,12 @@ #!/usr/bin/env escript %% -*- erlang -*- -main([BeamDir]) -> +main([BeamDir, TargetFile]) -> Modules = [list_to_atom(filename:basename(F, ".beam")) || F <- filelib:wildcard("*.beam", BeamDir)], {ok, {application, Application, Properties}} = io:read(''), NewProperties = lists:keyreplace(modules, 1, Properties, {modules, Modules}), - io:format("~p.", [{application, Application, NewProperties}]). + file:write_file( + TargetFile, + io_lib:format("~p.~n", [{application, Application, NewProperties}])). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5703d0d6..4b157cbc 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -36,7 +36,7 @@ -record(vhost, {virtual_host, dummy}). --record(connection, {user, timeout_sec, frame_max, vhost}). +-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}). -record(content, {class_id, diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 3a5cc2b0..62fb1dfb 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -75,9 +75,8 @@ echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm %pre if [ $1 -gt 1 ]; then - #Upgrade - stop and remove previous instance of rabbitmq-server init.d script + # Upgrade - stop previous instance of rabbitmq-server init.d script /sbin/service rabbitmq-server stop - /sbin/chkconfig --del rabbitmq-server fi # create rabbitmq group diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper index 9ef59ad7..ee5947b6 100644 --- a/packaging/common/rabbitmq-asroot-script-wrapper +++ b/packaging/common/rabbitmq-asroot-script-wrapper @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh ## The contents of this file are subject to the Mozilla Public License ## Version 1.1 (the "License"); you may not use this file except in ## compliance with the License. You may obtain a copy of the License at @@ -30,24 +30,16 @@ ## Contributor(s): ______________________________________. ## -# Escape spaces and quotes, because shell is revolting. -for arg in "$@" ; do - # Escape quotes in parameters, so that they're passed through cleanly. - arg=$(sed -e 's/"/\\"/g' <<-END - $arg - END - ) - CMDLINE="${CMDLINE} \"${arg}\"" -done - cd /var/lib/rabbitmq SCRIPT=`basename $0` if [ `id -u` = 0 ] ; then - /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE} + /usr/lib/rabbitmq/bin/${SCRIPT} "$@" else - echo -e "\nOnly root should run ${SCRIPT}\n" + echo + echo "Only root should run ${SCRIPT}" + echo exit 1 fi diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 0c4bd0a8..dfb714f1 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -46,9 +46,13 @@ SCRIPT=`basename $0` if [ `id -u` = 0 ] ; then su rabbitmq -s /bin/sh -c "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" +elif [ `id -u` = `id -u rabbitmq` ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} "$@" else /usr/lib/rabbitmq/bin/${SCRIPT} - echo -e "\nOnly root should run ${SCRIPT}\n" + echo + echo "Only root or rabbitmq should run ${SCRIPT}" + echo exit 1 fi diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index dc305975..39d23983 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -66,8 +66,6 @@ stop_rabbitmq () { $DAEMON stop_all > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err RETVAL=$? if [ $RETVAL = 0 ] ; then - # Try to stop epmd if run by the rabbitmq user - pkill -u rabbitmq epmd || : [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE else echo FAILED - check ${INIT_LOG_DIR}/shutdown_log, _err diff --git a/packaging/debs/Debian/debian/postrm b/packaging/debs/Debian/debian/postrm.in index a999d95b..bfcf1f53 100644 --- a/packaging/debs/Debian/debian/postrm +++ b/packaging/debs/Debian/debian/postrm.in @@ -34,7 +34,15 @@ case "$1" in if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi + # Remove traces of plugins + rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins + for ext in rel script boot ; do + rm -f @RABBIT_LIB@/ebin/rabbit.$ext + done if getent passwd rabbitmq >/dev/null; then + # Stop epmd if run by the rabbitmq user + pkill -u rabbitmq epmd || : + deluser rabbitmq fi if getent group rabbitmq >/dev/null; then diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 5e357955..3799c438 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -20,3 +20,4 @@ install/rabbitmq-server:: for script in rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \ install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done + sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index 6b51fb2f..739f99d0 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -3,7 +3,7 @@ PortSystem 1.0 name rabbitmq-server -version 1.6.0 +version 1.7.0 revision 0 categories net maintainers tonyg@rabbitmq.com @@ -19,13 +19,28 @@ homepage http://www.rabbitmq.com/ master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/ checksums \ - md5 af3b0d868d58e5aefb4f0837b82ca010 \ - sha1 1834c670d076fa9878223aacaa35a5a6528f1d86 \ - rmd160 d6c9de4e1fb48c6ceb1cb5d717ca2afb5e3266fe + md5 4505ca0fd8718439bd6f5e2af2379e56 \ + sha1 84fb86d403057bb808c1b51deee0c1fca3bf7bef \ + rmd160 092f90946825cc3eb277019805e24db637a559f4 -depends_build port:erlang port:py25-simplejson +depends_build port:erlang depends_run port:erlang +platform darwin 7 { + depends_build-append port:py25-simplejson + build.args PYTHON=${prefix}/bin/python2.5 +} +platform darwin 8 { + depends_build-append port:py25-simplejson + build.args PYTHON=${prefix}/bin/python2.5 +} +platform darwin 9 { + depends_build-append port:py25-simplejson + build.args PYTHON=${prefix}/bin/python2.5 +} +# no need for simplejson on Snow Leopard or higher + + set serveruser rabbitmq set servergroup rabbitmq set serverhome ${prefix}/var/lib/rabbitmq @@ -40,8 +55,6 @@ use_configure no use_parallel_build yes -build.args PYTHON=${prefix}/bin/python2.5 - destroot.destdir \ TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \ SBIN_DIR=${sbindir} \ diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper index 0d7118c4..80cb7bd5 100644 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper @@ -5,9 +5,11 @@ SCRIPT=`basename $0` if [ `id -u` = 0 ] ; then sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} "$@" +elif [ `id -u` = `id -u rabbitmq` ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} "$@" else /usr/lib/rabbitmq/bin/${SCRIPT} - echo -e "\nOnly root should run ${SCRIPT}\n" + echo -e "\nOnly root or rabbitmq should run ${SCRIPT}\n" exit 1 fi diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat index 3540bf2d..e7aa7095 100644 --- a/scripts/rabbitmq-activate-plugins.bat +++ b/scripts/rabbitmq-activate-plugins.bat @@ -30,6 +30,8 @@ REM REM Contributor(s): ______________________________________.
REM
+setlocal
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -42,8 +44,17 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-set RABBITMQ_PLUGINS_DIR="%~dp0..\plugins"
-set RABBITMQ_PLUGINS_EXPAND_DIR="%~dp0..\priv\plugins"
-set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+set RABBITMQ_PLUGINS_DIR=%~dp0..\plugins
+set RABBITMQ_PLUGINS_EXPAND_DIR=%~dp0..\priv\plugins
+set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+
+"%ERLANG_HOME%\bin\erl.exe" ^
+-pa "%RABBITMQ_EBIN_DIR%" ^
+-noinput -hidden ^
+-s rabbit_plugin_activator ^
+-rabbit plugins_dir \""%RABBITMQ_PLUGINS_DIR:\=/%"\" ^
+-rabbit plugins_expand_dir \""%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%"\" ^
+-rabbit rabbit_ebin \""%RABBITMQ_EBIN_DIR:\=/%"\" ^
+-extra %*
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -s rabbit_plugin_activator -rabbit plugins_dir \"%RABBITMQ_PLUGINS_DIR:\=/%\" -rabbit plugins_expand_dir \"%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%\" -rabbit rabbit_ebin \"%RABBITMQ_EBIN_DIR:\=/%\" -extra %*
+endlocal
diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat index 190fdef7..40155183 100644 --- a/scripts/rabbitmq-deactivate-plugins.bat +++ b/scripts/rabbitmq-deactivate-plugins.bat @@ -30,6 +30,10 @@ REM REM Contributor(s): ______________________________________.
REM
-set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+setlocal
-del /f %RABBITMQ_EBIN_DIR%\rabbit.rel %RABBITMQ_EBIN_DIR%\rabbit.script %RABBITMQ_EBIN_DIR%\rabbit.boot
+set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+
+del /f "%RABBITMQ_EBIN_DIR%"\rabbit.rel "%RABBITMQ_EBIN_DIR%"\rabbit.script "%RABBITMQ_EBIN_DIR%"\rabbit.boot
+
+endlocal
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 7db4cb70..1a7eb97e 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -36,23 +36,37 @@ SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= MULTI_START_ARGS= +CONFIG_FILE=/etc/rabbitmq/rabbitmq . `dirname $0`/rabbitmq-env +if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] +then + if [ "x" != "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + fi +else + if [ "x" = "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_PORT=${NODE_PORT} + fi +fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} [ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME} [ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE} [ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS} [ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS} +[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} export \ RABBITMQ_NODENAME \ RABBITMQ_NODE_IP_ADDRESS \ RABBITMQ_NODE_PORT \ RABBITMQ_SCRIPT_HOME \ - RABBITMQ_PIDS_FILE + RABBITMQ_PIDS_FILE \ + RABBITMQ_CONFIG_FILE + +RABBITMQ_CONFIG_ARG= +[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" # we need to turn off path expansion because some of the vars, notably # RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and @@ -65,6 +79,7 @@ exec erl \ -hidden \ ${RABBITMQ_MULTI_ERL_ARGS} \ -sname rabbitmq_multi$$ \ + ${RABBITMQ_CONFIG_ARG} \ -s rabbit_multi \ ${RABBITMQ_MULTI_START_ARGS} \ -extra "$@" diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index 8abf13f1..6dda13af 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -30,6 +30,8 @@ REM REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_BASE%"=="" (
set RABBITMQ_BASE=%APPDATA%\RabbitMQ
)
@@ -39,20 +41,32 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -61,5 +75,14 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_MULTI_ERL_ARGS% -sname rabbitmq_multi -s rabbit_multi %RABBITMQ_MULTI_START_ARGS% -extra %*
+"%ERLANG_HOME%\bin\erl.exe" ^
+-pa "%~dp0..\ebin" ^
+-noinput -hidden ^
+%RABBITMQ_MULTI_ERL_ARGS% ^
+-sname rabbitmq_multi ^
+%RABBITMQ_CONFIG_ARG% ^
+-s rabbit_multi ^
+%RABBITMQ_MULTI_START_ARGS% ^
+-extra %*
+endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 67768c0e..7f08cd9d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -44,9 +44,17 @@ SERVER_START_ARGS= . `dirname $0`/rabbitmq-env +if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] +then + if [ "x" != "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + fi +else + if [ "x" = "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_PORT=${NODE_PORT} + fi +fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} @@ -89,6 +97,9 @@ fi RABBITMQ_CONFIG_ARG= [ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" +RABBITMQ_LISTEN_ARG= +[ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]" + # we need to turn off path expansion because some of the vars, notably # RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and # there is no other way of preventing their expansion. @@ -102,16 +113,13 @@ exec erl \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ - -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \ + ${RABBITMQ_LISTEN_ARG} \ -sasl errlog_type error \ -kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ -sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ -os_mon start_cpu_sup true \ -os_mon start_disksup false \ -os_mon start_memsup false \ - -os_mon start_os_sup false \ - -os_mon memsup_system_only true \ - -os_mon system_memory_high_watermark 0.95 \ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ ${RABBITMQ_CLUSTER_CONFIG_OPTION} \ ${RABBITMQ_SERVER_START_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 40f47c4b..51102851 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -30,6 +30,8 @@ REM REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_BASE%"=="" (
set RABBITMQ_BASE=%APPDATA%\RabbitMQ
)
@@ -39,17 +41,19 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -73,17 +77,17 @@ rem Log management (rotation, filtering based of size...) is left as an exercice set BACKUP_EXTENSION=.1
-set LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log"
-set SASL_LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log"
+set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
+set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
-set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
-set SASL_LOGS_BAKCUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
+set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
+set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
-if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+if exist "%LOGS%" (
+ type "%LOGS%" >> "%LOGS_BACKUP%"
)
-if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
+if exist "%SASL_LOGS%" (
+ type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
)
rem End of log management
@@ -103,33 +107,41 @@ if "%RABBITMQ_MNESIA_DIR%"=="" ( set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
- set RABBITMQ_BOOT_FILE="%RABBITMQ_EBIN_ROOT%\rabbit"
+ set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
set RABBITMQ_EBIN_PATH=
) else (
set RABBITMQ_BOOT_FILE=start_sasl
set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
)
if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE="%RABBITMQ_BASE%\rabbitmq"
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
)
-
+
if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
) else (
- set RABBITMQ_CONFIG_ARG=""
+ set RABBITMQ_CONFIG_ARG=
+)
+
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+ )
)
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot %RABBITMQ_BOOT_FILE% %RABBITMQ_CONFIG_ARG% ^
+-boot "%RABBITMQ_BOOT_FILE%" ^
+%RABBITMQ_CONFIG_ARG% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
+A30 ^
-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
@@ -137,10 +149,9 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" ( -os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--os_mon start_os_sup false ^
--os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
%*
+
+endlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 29be1742..d960d29d 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -30,6 +30,8 @@ REM REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_SERVICENAME%"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
@@ -43,15 +45,17 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
- set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.5.5\erts-5.5.5\bin
+ set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.6.5\erts-5.6.5\bin
)
set CONSOLE_FLAG=
@@ -69,7 +73,7 @@ if not exist "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" ( echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
echo **********************************************
echo.
- echo %ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe not found!
+ echo "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" not found!
echo Please set ERLANG_SERVICE_MANAGER_PATH to the folder containing "erlsrv.exe".
echo.
exit /B 1
@@ -91,17 +95,17 @@ rem Log management (rotation, filtering based on size...) is left as an exercise set BACKUP_EXTENSION=.1
-set LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log"
-set SASL_LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log"
+set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
+set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
-set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
-set SASL_LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
+set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
+set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
-if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+if exist "%LOGS%" (
+ type "%LOGS%" >> "%LOGS_BACKUP%"
)
-if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BACKUP%
+if exist "%SASL_LOGS%" (
+ type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
)
rem End of log management
@@ -156,26 +160,49 @@ if errorlevel 1 ( echo %RABBITMQ_SERVICENAME% service is already present - only updating service parameters
)
-set RABBIT_EBIN=%~dp0..\ebin
+set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
+if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
+ echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
+ set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
+ set RABBITMQ_EBIN_PATH=
+) else (
+ set RABBITMQ_BOOT_FILE=start_sasl
+ set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+)
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+ )
+)
set ERLANG_SERVICE_ARGUMENTS= ^
--pa "%RABBIT_EBIN%" ^
--boot start_sasl ^
+%RABBITMQ_EBIN_PATH% ^
+-boot "%RABBITMQ_BOOT_FILE%" ^
+%RABBITMQ_CONFIG_ARG% ^
-s rabbit ^
+W w ^
+A30 ^
-kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\",%RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
+%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
-sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--os_mon start_os_sup false ^
--os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
@@ -202,3 +229,5 @@ goto END :END
+
+endlocal
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 8a4e5445..512e8587 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -30,6 +30,8 @@ REM REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_NODENAME%"=="" (
set RABBITMQ_NODENAME=rabbit
)
@@ -47,3 +49,5 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( )
"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -nodename %RABBITMQ_NODENAME% -extra %*
+
+endlocal
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index a2d9350c..53edf8de 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -180,6 +180,20 @@ -import(error_logger, [format/2]). %%%========================================================================= +%%% Specs. These exist only to shut up dialyzer's warnings +%%%========================================================================= + +-ifdef(use_specs). + +-spec(handle_common_termination/6 :: + (any(), any(), any(), atom(), any(), any()) -> no_return()). + +-spec(hibernate/7 :: + (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()). + +-endif. + +%%%========================================================================= %%% API %%%========================================================================= diff --git a/src/rabbit.erl b/src/rabbit.erl index 18fd1b17..c6dde385 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -140,9 +140,17 @@ start(normal, []) -> ok = rabbit_binary_generator: check_empty_content_body_frame_size(), - {ok, MemoryAlarms} = application:get_env(memory_alarms), - ok = rabbit_alarm:start(MemoryAlarms), - + ok = rabbit_alarm:start(), + + {ok, MemoryWatermark} = + application:get_env(vm_memory_high_watermark), + ok = case MemoryWatermark == 0 of + true -> + ok; + false -> + start_child(vm_memory_monitor, [MemoryWatermark]) + end, + ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), @@ -202,6 +210,10 @@ start(normal, []) -> stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), + ok = case rabbit_mnesia:is_clustered() of + true -> rabbit_amqqueue:on_node_down(node()); + false -> rabbit_mnesia:empty_ram_only_tables() + end, ok. %--------------------------------------------------------------------------- @@ -264,8 +276,11 @@ print_banner() -> io:nl(). start_child(Mod) -> + start_child(Mod, []). + +start_child(Mod, Args) -> {ok,_} = supervisor:start_child(rabbit_sup, - {Mod, {Mod, start_link, []}, + {Mod, {Mod, start_link, Args}, transient, 100, worker, [Mod]}), ok. diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 7a2fbcb8..9a639ed4 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -33,24 +33,19 @@ -behaviour(gen_event). --export([start/1, stop/0, register/2]). +-export([start/0, stop/0, register/2]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --define(MEMSUP_CHECK_INTERVAL, 1000). - -%% OSes on which we know memory alarms to be trustworthy --define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]). - --record(alarms, {alertees, system_memory_high_watermark = false}). +-record(alarms, {alertees, vm_memory_high_watermark = false}). %%---------------------------------------------------------------------------- -ifdef(use_specs). -type(mfa_tuple() :: {atom(), atom(), list()}). --spec(start/1 :: (boolean() | 'auto') -> 'ok'). +-spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). @@ -58,20 +53,8 @@ %%---------------------------------------------------------------------------- -start(MemoryAlarms) -> - EnableAlarms = case MemoryAlarms of - true -> true; - false -> false; - auto -> lists:member(os:type(), ?SUPPORTED_OS) - end, - ok = alarm_handler:add_alarm_handler(?MODULE, [EnableAlarms]), - case whereis(memsup) of - undefined -> if EnableAlarms -> ok = start_memsup(), - ok = adjust_memsup_interval(); - true -> ok - end; - _ -> ok = adjust_memsup_interval() - end. +start() -> + ok = alarm_handler:add_alarm_handler(?MODULE, []). stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). @@ -83,43 +66,33 @@ register(Pid, HighMemMFA) -> %%---------------------------------------------------------------------------- -init([MemoryAlarms]) -> - {ok, #alarms{alertees = case MemoryAlarms of - true -> dict:new(); - false -> undefined - end}}. +init([]) -> + {ok, #alarms{alertees = dict:new()}}. -handle_call({register, _Pid, _HighMemMFA}, - State = #alarms{alertees = undefined}) -> - {ok, ok, State}; -handle_call({register, Pid, HighMemMFA}, +handle_call({register, Pid, {M, F, A} = HighMemMFA}, State = #alarms{alertees = Alertess}) -> _MRef = erlang:monitor(process, Pid), - case State#alarms.system_memory_high_watermark of - true -> {M, F, A} = HighMemMFA, - ok = erlang:apply(M, F, A ++ [Pid, true]); - false -> ok - end, + ok = case State#alarms.vm_memory_high_watermark of + true -> apply(M, F, A ++ [Pid, true]); + false -> ok + end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), {ok, ok, State#alarms{alertees = NewAlertees}}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> +handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) -> ok = alert(true, State#alarms.alertees), - {ok, State#alarms{system_memory_high_watermark = true}}; + {ok, State#alarms{vm_memory_high_watermark = true}}; -handle_event({clear_alarm, system_memory_high_watermark}, State) -> +handle_event({clear_alarm, vm_memory_high_watermark}, State) -> ok = alert(false, State#alarms.alertees), - {ok, State#alarms{system_memory_high_watermark = false}}; + {ok, State#alarms{vm_memory_high_watermark = false}}; handle_event(_Event, State) -> {ok, State}. -handle_info({'DOWN', _MRef, process, _Pid, _Reason}, - State = #alarms{alertees = undefined}) -> - {ok, State}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #alarms{alertees = Alertess}) -> {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; @@ -134,57 +107,6 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- - -start_memsup() -> - {Mod, Args} = - case os:type() of - %% memsup doesn't take account of buffers or cache when - %% considering "free" memory - therefore on Linux we can - %% get memory alarms very easily without any pressure - %% existing on memory at all. Therefore we need to use - %% our own simple memory monitor. - %% - {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]}; - {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]}; - - %% Start memsup programmatically rather than via the - %% rabbitmq-server script. This is not quite the right - %% thing to do as os_mon checks to see if memsup is - %% available before starting it, but as memsup is - %% available everywhere (even on VXWorks) it should be - %% ok. - %% - %% One benefit of the programmatic startup is that we - %% can add our alarm_handler before memsup is running, - %% thus ensuring that we notice memory alarms that go - %% off on startup. - %% - _ -> {memsup, []} - end, - %% This is based on os_mon:childspec(memsup, true) - {ok, _} = supervisor:start_child( - os_mon_sup, - {memsup, {Mod, start_link, Args}, - permanent, 2000, worker, [Mod]}), - ok. - -adjust_memsup_interval() -> - %% The default memsup check interval is 1 minute, which is way too - %% long - rabbit can gobble up all memory in a matter of seconds. - %% Unfortunately the memory_check_interval configuration parameter - %% and memsup:set_check_interval/1 function only provide a - %% granularity of minutes. So we have to peel off one layer of the - %% API to get to the underlying layer which operates at the - %% granularity of milliseconds. - %% - %% Note that the new setting will only take effect after the first - %% check has completed, i.e. after one minute. So if rabbit eats - %% all the memory within the first minute after startup then we - %% are out of luck. - ok = os_mon:call(memsup, - {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, - infinity). - alert(_Alert, undefined) -> ok; alert(Alert, Alertees) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2e8509..80b7a92c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -281,13 +281,13 @@ possibly_unblock(State, ChPid, Update) -> store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; - unblock -> {NewBlockedeConsumers, NewActiveConsumers} = + unblock -> {NewBlockedConsumers, NewActiveConsumers} = move_consumers(ChPid, State#q.blocked_consumers, State#q.active_consumers), run_poke_burst( State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedeConsumers}) + blocked_consumers = NewBlockedConsumers}) end end. @@ -297,7 +297,7 @@ should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of - not_found -> noreply(State); + not_found -> {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, unacked_messages = UAM} -> erlang:demonitor(MonitorRef), @@ -321,8 +321,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> blocked_consumers = remove_consumers( ChPid, State#q.blocked_consumers)}), case should_auto_delete(NewState) of - false -> noreply(NewState); - true -> {stop, normal, NewState} + false -> {ok, NewState}; + true -> {stop, NewState} end end. @@ -576,10 +576,16 @@ handle_call({commit, Txn}, From, State) -> erase_tx(Txn), noreply(NewState); -handle_call({notify_down, ChPid}, From, State) -> - %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), - handle_ch_down(ChPid, State); +handle_call({notify_down, ChPid}, _From, State) -> + %% we want to do this synchronously, so that auto_deleted queues + %% are no longer visible by the time we send a response to the + %% client. The queue is ultimately deleted in terminate/2; if we + %% return stop with a reply, terminate/2 will be called by + %% gen_server2 *before* the reply is sent. + case handle_ch_down(ChPid, State) of + {ok, NewState} -> reply(ok, NewState); + {stop, NewState} -> {stop, normal, ok, NewState} + end; handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, @@ -813,7 +819,10 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, NewState = State#q{owner = none}, {stop, normal, NewState}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> - handle_ch_down(DownPid, State); + case handle_ch_down(DownPid, State) of + {ok, NewState} -> noreply(NewState); + {stop, NewState} -> {stop, normal, NewState} + end; handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 6cfa9e6d..01ac4f02 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -132,52 +132,66 @@ create_frame(TypeInt, ChannelInt, Payload) -> %% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x, %% and V. -table_field_to_binary({FName, longstr, Value}) -> - [short_string_to_binary(FName), "S", long_string_to_binary(Value)]; +table_field_to_binary({FName, Type, Value}) -> + [short_string_to_binary(FName) | field_value_to_binary(Type, Value)]. -table_field_to_binary({FName, signedint, Value}) -> - [short_string_to_binary(FName), "I", <<Value:32/signed>>]; +field_value_to_binary(longstr, Value) -> + ["S", long_string_to_binary(Value)]; -table_field_to_binary({FName, decimal, {Before, After}}) -> - [short_string_to_binary(FName), "D", Before, <<After:32>>]; +field_value_to_binary(signedint, Value) -> + ["I", <<Value:32/signed>>]; -table_field_to_binary({FName, timestamp, Value}) -> - [short_string_to_binary(FName), "T", <<Value:64>>]; +field_value_to_binary(decimal, {Before, After}) -> + ["D", Before, <<After:32>>]; -table_field_to_binary({FName, table, Value}) -> - [short_string_to_binary(FName), "F", table_to_binary(Value)]; +field_value_to_binary(timestamp, Value) -> + ["T", <<Value:64>>]; -table_field_to_binary({FName, byte, Value}) -> - [short_string_to_binary(FName), "b", <<Value:8/unsigned>>]; +field_value_to_binary(table, Value) -> + ["F", table_to_binary(Value)]; -table_field_to_binary({FName, double, Value}) -> - [short_string_to_binary(FName), "d", <<Value:64/float>>]; +field_value_to_binary(array, Value) -> + ["A", array_to_binary(Value)]; -table_field_to_binary({FName, float, Value}) -> - [short_string_to_binary(FName), "f", <<Value:32/float>>]; +field_value_to_binary(byte, Value) -> + ["b", <<Value:8/unsigned>>]; -table_field_to_binary({FName, long, Value}) -> - [short_string_to_binary(FName), "l", <<Value:64/signed>>]; +field_value_to_binary(double, Value) -> + ["d", <<Value:64/float>>]; -table_field_to_binary({FName, short, Value}) -> - [short_string_to_binary(FName), "s", <<Value:16/signed>>]; +field_value_to_binary(float, Value) -> + ["f", <<Value:32/float>>]; -table_field_to_binary({FName, bool, Value}) -> - [short_string_to_binary(FName), "t", if Value -> 1; true -> 0 end]; +field_value_to_binary(long, Value) -> + ["l", <<Value:64/signed>>]; -table_field_to_binary({FName, binary, Value}) -> - [short_string_to_binary(FName), "x", long_string_to_binary(Value)]; +field_value_to_binary(short, Value) -> + ["s", <<Value:16/signed>>]; -table_field_to_binary({FName, void, _Value}) -> - [short_string_to_binary(FName), "V"]. +field_value_to_binary(bool, Value) -> + ["t", if Value -> 1; true -> 0 end]; + +field_value_to_binary(binary, Value) -> + ["x", long_string_to_binary(Value)]; + +field_value_to_binary(void, _Value) -> + ["V"]. table_to_binary(Table) when is_list(Table) -> BinTable = generate_table(Table), [<<(size(BinTable)):32>>, BinTable]. +array_to_binary(Array) when is_list(Array) -> + BinArray = generate_array(Array), + [<<(size(BinArray)):32>>, BinArray]. + generate_table(Table) when is_list(Table) -> list_to_binary(lists:map(fun table_field_to_binary/1, Table)). +generate_array(Array) when is_list(Array) -> + list_to_binary(lists:map( + fun ({Type, Value}) -> field_value_to_binary(Type, Value) end, + Array)). short_string_to_binary(String) when is_binary(String) and (size(String) < 256) -> [<<(size(String)):8>>, String]; @@ -186,13 +200,11 @@ short_string_to_binary(String) -> true = (StringLength < 256), % assertion [<<StringLength:8>>, String]. - long_string_to_binary(String) when is_binary(String) -> [<<(size(String)):32>>, String]; long_string_to_binary(String) -> [<<(length(String)):32>>, String]. - encode_properties([], []) -> <<0, 0>>; encode_properties(TypeList, ValueList) -> @@ -238,32 +250,7 @@ encode_property(longlongint, Int) -> encode_property(timestamp, Int) -> <<Int:64/unsigned>>; encode_property(table, Table) -> - encode_table(Table). - - -encode_table(Table) -> - TableBin = list_to_binary(lists:map(fun encode_table_entry/1, Table)), - Len = size(TableBin), - <<Len:32/unsigned, TableBin:Len/binary>>. - - -encode_table_entry({Name, longstr, Value}) -> - NLen = size(Name), - VLen = size(Value), - <<NLen:8/unsigned, Name:NLen/binary, "S", VLen:32/unsigned, Value:VLen/binary>>; -encode_table_entry({Name, signedint, Value}) -> - NLen = size(Name), - <<NLen:8/unsigned, Name:NLen/binary, "I", Value:32/signed>>; -encode_table_entry({Name, decimal, {Before, After}}) -> - NLen = size(Name), - <<NLen:8/unsigned, Name:NLen/binary, "D", Before:8/unsigned, After:32/unsigned>>; -encode_table_entry({Name, timestamp, Value}) -> - NLen = size(Name), - <<NLen:8/unsigned, Name:NLen/binary, "T", Value:64/unsigned>>; -encode_table_entry({Name, table, Value}) -> - NLen = size(Name), - TableBin = encode_table(Value), - <<NLen:8/unsigned, Name:NLen/binary, "F", TableBin/binary>>. + table_to_binary(Table). check_empty_content_body_frame_size() -> %% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 4ef382aa..506e87ec 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -56,45 +56,57 @@ parse_table(<<>>) -> []; +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, ValueAndRest/binary>>) -> + {Type, Value, Rest} = parse_field_value(ValueAndRest), + [{NameString, Type, Value} | parse_table(Rest)]. -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> - [{NameString, longstr, ValueString} | parse_table(Rest)]; +parse_array(<<>>) -> + []; +parse_array(<<ValueAndRest/binary>>) -> + {Type, Value, Rest} = parse_field_value(ValueAndRest), + [{Type, Value} | parse_array(Rest)]. + +parse_field_value(<<"S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> + {longstr, ValueString, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "I", Value:32/signed, Rest/binary>>) -> - [{NameString, signedint, Value} | parse_table(Rest)]; +parse_field_value(<<"I", Value:32/signed, Rest/binary>>) -> + {signedint, Value, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> - [{NameString, decimal, {Before, After}} | parse_table(Rest)]; +parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> + {decimal, {Before, After}, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "T", Value:64/unsigned, Rest/binary>>) -> - [{NameString, timestamp, Value} | parse_table(Rest)]; +parse_field_value(<<"T", Value:64/unsigned, Rest/binary>>) -> + {timestamp, Value, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) -> - [{NameString, table, parse_table(Table)} | parse_table(Rest)]; +parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) -> + {table, parse_table(Table), Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "b", Value:8/unsigned, Rest/binary>>) -> - [{NameString, byte, Value} | parse_table(Rest)]; +parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, Rest/binary>>) -> + {array, parse_array(Array), Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "d", Value:64/float, Rest/binary>>) -> - [{NameString, double, Value} | parse_table(Rest)]; +parse_field_value(<<"b", Value:8/unsigned, Rest/binary>>) -> + {byte, Value, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "f", Value:32/float, Rest/binary>>) -> - [{NameString, float, Value} | parse_table(Rest)]; +parse_field_value(<<"d", Value:64/float, Rest/binary>>) -> + {double, Value, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "l", Value:64/signed, Rest/binary>>) -> - [{NameString, long, Value} | parse_table(Rest)]; +parse_field_value(<<"f", Value:32/float, Rest/binary>>) -> + {float, Value, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "s", Value:16/signed, Rest/binary>>) -> - [{NameString, short, Value} | parse_table(Rest)]; +parse_field_value(<<"l", Value:64/signed, Rest/binary>>) -> + {long, Value, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "t", Value:8/unsigned, Rest/binary>>) -> - [{NameString, bool, (Value /= 0)} | parse_table(Rest)]; +parse_field_value(<<"s", Value:16/signed, Rest/binary>>) -> + {short, Value, Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> - [{NameString, binary, ValueString} | parse_table(Rest)]; +parse_field_value(<<"t", Value:8/unsigned, Rest/binary>>) -> + {bool, (Value /= 0), Rest}; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "V", Rest/binary>>) -> - [{NameString, void, undefined} | parse_table(Rest)]. +parse_field_value(<<"x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> + {binary, ValueString, Rest}; + +parse_field_value(<<"V", Rest/binary>>) -> + {void, undefined, Rest}. parse_properties([], _PropBin) -> @@ -147,7 +159,6 @@ parse_property(bit, Rest) -> parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) -> {parse_table(Table), Rest}. - ensure_content_decoded(Content = #content{properties = Props}) when Props =/= 'none' -> Content; diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index a53ac289..ddd0c002 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -52,12 +52,11 @@ %%---------------------------------------------------------------------------- start() -> - {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename), - NodeName = list_to_atom(NodeNameStr), + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:localnode(NodeName)}), + node = rabbit_misc:makenode(NodeStr)}), Inform = case Quiet of true -> fun(_Format, _Args1) -> ok end; false -> fun(Format, Args1) -> @@ -97,12 +96,12 @@ error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). print_badrpc_diagnostics(Node) -> fmt_stderr("diagnostics:", []), - NodeHost = rabbit_misc:nodehost(Node), + {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), case net_adm:names(NodeHost) of {error, EpmdReason} -> fmt_stderr("- unable to connect to epmd on ~s: ~w", [NodeHost, EpmdReason]); - {ok, NamePorts} -> + {ok, NamePorts} -> fmt_stderr("- nodes and their ports on ~s: ~p", [NodeHost, [{list_to_atom(Name), Port} || {Name, Port} <- NamePorts]]) @@ -116,11 +115,7 @@ print_badrpc_diagnostics(Node) -> ok. parse_args(["-n", NodeS | Args], Params) -> - Node = case lists:member($@, NodeS) of - true -> list_to_atom(NodeS); - false -> rabbit_misc:localnode(list_to_atom(NodeS)) - end, - parse_args(Args, Params#params{node = Node}); + parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)}); parse_args(["-q" | Args], Params) -> parse_args(Args, Params#params{quiet = true}); parse_args([Command | Args], Params) -> @@ -178,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -186,12 +181,12 @@ messages, acks_uncommitted, consumers, transactions, memory]. The default is auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing -exchange name, routing key, queue name and arguments, in that order. +exchange name, queue name, routing key and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [node, address, port, +<ConnectionInfoItem> must be a member of the list [pid, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, -recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address, peer_port and state. +client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. +The default is to display user, peer_address, peer_port and state. "), halt(1). @@ -273,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) -> action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = list_replace(node, pid, - default_if_empty(RemainingArgs, [name, messages])), + ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); @@ -290,7 +284,7 @@ action(list_exchanges, Node, Args, Inform) -> action(list_bindings, Node, Args, Inform) -> Inform("Listing bindings", []), {VHostArg, _} = parse_vhost_flag_bin(Args), - InfoKeys = [exchange_name, routing_key, queue_name, args], + InfoKeys = [exchange_name, queue_name, routing_key, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], @@ -299,9 +293,7 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, - peer_port, state])), + ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -363,12 +355,15 @@ format_info_item(Key, Items) -> is_tuple(Value) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> - atom_to_list(node(Value)); + pid_to_string(Value); Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value -> + escape(atom_to_list(Value)); + Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] + when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); + Value -> io_lib:format("~w", [Value]) end. @@ -393,14 +388,14 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. -escape(Bin) when binary(Bin) -> +escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); escape(L) when is_list(L) -> escape_char(lists:reverse(L), []). escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); -escape_char([X | T], Acc) when X > 32, X /= 127 -> +escape_char([X | T], Acc) when X >= 32, X /= 127 -> escape_char(T, [X | Acc]); escape_char([X | T], Acc) -> escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), @@ -408,6 +403,20 @@ escape_char([X | T], Acc) -> escape_char([], Acc) -> Acc. -list_replace(Find, Replace, List) -> - [case X of Find -> Replace; _ -> X end || X <- List]. +prettify_amqp_table(Table) -> + [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table]. +prettify_typed_amqp_value(Type, Value) -> + case Type of + longstr -> escape(Value); + table -> prettify_amqp_table(Value); + array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; + _ -> Value + end. + +%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7) +pid_to_string(Pid) -> + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index b789fbd1..ea61a679 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -104,13 +104,8 @@ guid() -> %% generate a readable string representation of a guid. Note that any %% monotonicity of the guid is not preserved in the encoding. string_guid(Prefix) -> - %% we use the (undocumented) ssl_base64 module here because it is - %% present throughout OTP R11 and R12 whereas base64 only becomes - %% available in R11B-4. - %% - %% TODO: once debian stable and EPEL have moved from R11B-2 to - %% R11B-4 or later we should change this to use base64. - Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))). + Prefix ++ "-" ++ base64:encode_to_string( + erlang:md5(term_to_binary(guid()))). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl deleted file mode 100644 index b0d57cb2..00000000 --- a/src/rabbit_memsup.erl +++ /dev/null @@ -1,142 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_memsup). - --behaviour(gen_server). - --export([start_link/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([update/0]). - --record(state, {memory_fraction, - timeout, - timer, - mod, - mod_state, - alarmed - }). - --define(SERVER, memsup). %% must be the same as the standard memsup - --define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(update/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link(Args) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). - -update() -> - gen_server:cast(?SERVER, update). - -%%---------------------------------------------------------------------------- - -init([Mod]) -> - Fraction = os_mon:get_env(memsup, system_memory_high_watermark), - TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - InitState = Mod:init(), - State = #state { memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef, - mod = Mod, - mod_state = InitState, - alarmed = false }, - {ok, internal_update(State)}. - -start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), - TRef. - -%% Export the same API as the real memsup. Note that -%% get_sysmem_high_watermark gives an int in the range 0 - 100, while -%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. -handle_call(get_sysmem_high_watermark, _From, State) -> - {reply, trunc(100 * State#state.memory_fraction), State}; - -handle_call({set_sysmem_high_watermark, Float}, _From, State) -> - {reply, ok, State#state{memory_fraction = Float}}; - -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; - -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; - -handle_call(get_memory_data, _From, - State = #state { mod = Mod, mod_state = ModState }) -> - {reply, Mod:get_memory_data(ModState), State}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(update, State) -> - {noreply, internal_update(State)}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -internal_update(State = #state { memory_fraction = MemoryFraction, - alarmed = Alarmed, - mod = Mod, mod_state = ModState }) -> - ModState1 = Mod:update(ModState), - {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1), - NewAlarmed = MemUsed / MemTotal > MemoryFraction, - case {Alarmed, NewAlarmed} of - {false, true} -> - alarm_handler:set_alarm({system_memory_high_watermark, []}); - {true, false} -> - alarm_handler:clear_alarm(system_memory_high_watermark); - _ -> - ok - end, - State #state { mod_state = ModState1, alarmed = NewAlarmed }. diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl deleted file mode 100644 index 3de2d843..00000000 --- a/src/rabbit_memsup_darwin.erl +++ /dev/null @@ -1,88 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_memsup_darwin). - --export([init/0, update/1, get_memory_data/1]). - --record(state, {total_memory, - allocated_memory}). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), - allocated_memory :: ('undefined' | non_neg_integer()) - }). - --spec(init/0 :: () -> state()). --spec(update/1 :: (state()) -> state()). --spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), - ('undefined' | pid())}). - --endif. - -%%---------------------------------------------------------------------------- - -init() -> - #state{total_memory = undefined, - allocated_memory = undefined}. - -update(State) -> - File = os:cmd("/usr/bin/vm_stat"), - Lines = string:tokens(File, "\n"), - Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - [PageSize, Inactive, Active, Free, Wired] = - [dict:fetch(Key, Dict) || - Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', - 'Pages wired down']], - MemTotal = PageSize * (Inactive + Active + Free + Wired), - MemUsed = PageSize * (Active + Wired), - State#state{total_memory = MemTotal, allocated_memory = MemUsed}. - -get_memory_data(State) -> - {State#state.total_memory, State#state.allocated_memory, undefined}. - -%%---------------------------------------------------------------------------- - -%% A line looks like "Foo bar: 123456." -parse_line(Line) -> - [Name, RHS | _Rest] = string:tokens(Line, ":"), - case Name of - "Mach Virtual Memory Statistics" -> - ["(page", "size", "of", PageSize, "bytes)"] = - string:tokens(RHS, " "), - {page_size, list_to_integer(PageSize)}; - _ -> - [Value | _Rest1] = string:tokens(RHS, " ."), - {list_to_atom(Name), list_to_integer(Value)} - end. diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl deleted file mode 100644 index ca942d7c..00000000 --- a/src/rabbit_memsup_linux.erl +++ /dev/null @@ -1,101 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_memsup_linux). - --export([init/0, update/1, get_memory_data/1]). - --record(state, {total_memory, - allocated_memory}). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), - allocated_memory :: ('undefined' | non_neg_integer()) - }). - --spec(init/0 :: () -> state()). --spec(update/1 :: (state()) -> state()). --spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), - ('undefined' | pid())}). - --endif. - -%%---------------------------------------------------------------------------- - -init() -> - #state{total_memory = undefined, - allocated_memory = undefined}. - -update(State) -> - File = read_proc_file("/proc/meminfo"), - Lines = string:tokens(File, "\n"), - Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - [MemTotal, MemFree, Buffers, Cached] = - [dict:fetch(Key, Dict) || - Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']], - MemUsed = MemTotal - MemFree - Buffers - Cached, - State#state{total_memory = MemTotal, allocated_memory = MemUsed}. - -get_memory_data(State) -> - {State#state.total_memory, State#state.allocated_memory, undefined}. - -%%---------------------------------------------------------------------------- - --define(BUFFER_SIZE, 1024). - -%% file:read_file does not work on files in /proc as it seems to get -%% the size of the file first and then read that many bytes. But files -%% in /proc always have length 0, we just have to read until we get -%% eof. -read_proc_file(File) -> - {ok, IoDevice} = file:open(File, [read, raw]), - Res = read_proc_file(IoDevice, []), - file:close(IoDevice), - lists:flatten(lists:reverse(Res)). - -read_proc_file(IoDevice, Acc) -> - case file:read(IoDevice, ?BUFFER_SIZE) of - {ok, Res} -> read_proc_file(IoDevice, [Res | Acc]); - eof -> Acc - end. - -%% A line looks like "FooBar: 123456 kB" -parse_line(Line) -> - [Name, RHS | _Rest] = string:tokens(Line, ":"), - [Value | UnitsRest] = string:tokens(RHS, " "), - Value1 = case UnitsRest of - [] -> list_to_integer(Value); %% no units - ["kB"] -> list_to_integer(Value) * 1024 - end, - {list_to_atom(Name), Value1}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b20e9a86..21764fce 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -47,7 +47,7 @@ -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). --export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]). +-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). @@ -105,8 +105,8 @@ -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). --spec(localnode/1 :: (atom()) -> erlang_node()). --spec(nodehost/1 :: (erlang_node()) -> string()). +-spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()). +-spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}). -spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). @@ -308,16 +308,22 @@ execute_mnesia_transaction(TxFun) -> ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). -localnode(Name) -> - list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])). - -nodehost(Node) -> - %% This is horrible, but there doesn't seem to be a way to split a - %% nodename into its constituent parts. - tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))). +makenode({Prefix, Suffix}) -> + list_to_atom(lists:append([Prefix, "@", Suffix])); +makenode(NodeStr) -> + makenode(nodeparts(NodeStr)). + +nodeparts(Node) when is_atom(Node) -> + nodeparts(atom_to_list(Node)); +nodeparts(NodeStr) -> + case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of + {Prefix, []} -> {_, Suffix} = nodeparts(node()), + {Prefix, Suffix}; + {Prefix, Suffix} -> {Prefix, tl(Suffix)} + end. cookie_hash() -> - ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))). + base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c4d5aac6..749038db 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -32,7 +32,8 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, reset/0, force_reset/0]). + cluster/1, reset/0, force_reset/0, is_clustered/0, + empty_ram_only_tables/0]). -export([table_names/0]). @@ -54,6 +55,8 @@ -spec(cluster/1 :: ([erlang_node()]) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). +-spec(is_clustered/0 :: () -> boolean()). +-spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -endif. @@ -98,6 +101,21 @@ cluster(ClusterNodes) -> reset() -> reset(false). force_reset() -> reset(true). +is_clustered() -> + RunningNodes = mnesia:system_info(running_db_nodes), + [node()] /= RunningNodes andalso [] /= RunningNodes. + +empty_ram_only_tables() -> + Node = node(), + lists:foreach( + fun (TabName) -> + case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of + true -> {atomic, ok} = mnesia:clear_table(TabName); + false -> ok + end + end, table_names()), + ok. + %%-------------------------------------------------------------------- table_definitions() -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index b1cc4d02..dc642df4 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -99,11 +99,16 @@ Available commands: action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), - N = list_to_integer(NodeCount), - {NodePids, Running} = start_nodes(N, N, [], true, - getenv("RABBITMQ_NODENAME"), - getenv("RABBITMQ_NODE_PORT"), - RpcTimeout), + application:load(rabbit), + NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")), + {NodePids, Running} = + case list_to_integer(NodeCount) of + 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), + RpcTimeout), + {[NodePid], Started}; + N -> start_nodes(N, N, [], true, NodeName, + get_node_tcp_listener(), RpcTimeout) + end, write_pids_file(NodePids), case Running of true -> ok; @@ -156,26 +161,29 @@ action(rotate_logs, [Suffix], RpcTimeout) -> %% Running is a boolean exhibiting success at some moment start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; -start_nodes(N, Total, PNodePid, Running, - NodeNameBase, NodePortBase, RpcTimeout) -> +start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) -> + {NodePre, NodeSuff} = NodeNameBase, NodeNumber = Total - N, - NodeName = if NodeNumber == 0 -> - %% For compatibility with running a single node - NodeNameBase; - true -> - NodeNameBase ++ "_" ++ integer_to_list(NodeNumber) + NodePre1 = case NodeNumber of + %% For compatibility with running a single node + 0 -> NodePre; + _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber) end, - {NodePid, Started} = start_node(NodeName, - list_to_integer(NodePortBase) + NodeNumber, - RpcTimeout), + Node = rabbit_misc:makenode({NodePre1, NodeSuff}), + os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), + case Listener of + {NodeIpAddress, NodePortBase} -> + NodePort = NodePortBase + NodeNumber, + os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), + os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress); + undefined -> + ok + end, + {NodePid, Started} = start_node(Node, RpcTimeout), start_nodes(N - 1, Total, [NodePid | PNodePid], - Started and Running, - NodeNameBase, NodePortBase, RpcTimeout). + Started and Running, NodeNameBase, Listener, RpcTimeout). -start_node(NodeName, NodePort, RpcTimeout) -> - os:putenv("RABBITMQ_NODENAME", NodeName), - os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), - Node = rabbit_misc:localnode(list_to_atom(NodeName)), +start_node(Node, RpcTimeout) -> io:format("Starting node ~s...~n", [Node]), case rpc:call(Node, os, getpid, []) of {badrpc, _} -> @@ -291,7 +299,7 @@ kill_wait(Pid, TimeLeft, Forceful) -> io:format(".", []), is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful). -% Test using some OS clunkiness since we shouldn't trust +% Test using some OS clunkiness since we shouldn't trust % rpc:call(os, getpid, []) at this point is_dead(Pid) -> PidS = integer_to_list(Pid), @@ -319,3 +327,21 @@ getenv(Var) -> false -> throw({missing_env_var, Var}); Value -> Value end. + +get_node_tcp_listener() -> + try + {getenv("RABBITMQ_NODE_IP_ADDRESS"), + list_to_integer(getenv("RABBITMQ_NODE_PORT"))} + catch _ -> + case application:get_env(rabbit, tcp_listeners) of + {ok, [{_IpAddy, _Port} = Listener]} -> + Listener; + {ok, []} -> + undefined; + {ok, Other} -> + throw({cannot_start_multiple_nodes, multiple_tcp_listeners, + Other}); + undefined -> + throw({missing_configuration, tcp_listeners}) + end + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1bc17a32..3a0f9240 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -53,6 +53,9 @@ %% {delay_send, true}, {exit_on_close, false} ]). + +-define(SSL_TIMEOUT, 5). %% seconds + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -160,36 +163,31 @@ node_listeners(Node) -> on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). -start_client(Sock) -> +start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock}, + Child ! {go, Sock, SockTransform}, Child. +start_client(Sock) -> + start_client(Sock, fun (S) -> {ok, S} end). + start_ssl_client(SslOpts, Sock) -> - case rabbit_net:peername(Sock) of - {ok, {PeerAddress, PeerPort}} -> - PeerIp = inet_parse:ntoa(PeerAddress), - case ssl:ssl_accept(Sock, SslOpts) of - {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection " - "from ~s:~p to SSL~n", - [PeerIp, PeerPort]), - RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, - start_client(RabbitSslSock); - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection " - "from ~s:~p to SSL: ~n~p~n", - [PeerIp, PeerPort, Reason]), - {error, Reason} - end; - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", - [Reason]), - {error, Reason} - end. + start_client( + Sock, + fun (Sock1) -> + case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection ~p to SSL~n", + [self()]), + {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}}; + {error, Reason} -> + {error, {ssl_upgrade_error, Reason}}; + {'EXIT', Reason} -> + {error, {ssl_upgrade_failure, Reason}} + + end + end). connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index f28c4a6e..9f787920 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -39,6 +39,17 @@ -define(BaseApps, [rabbit]). %%---------------------------------------------------------------------------- +%% Specs +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- start() -> %% Ensure Rabbit is loaded so we can access it's environment @@ -62,8 +73,8 @@ start() -> %% Build the entire set of dependencies - this will load the %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {failed_to_load_app, App, Err} -> - error("failed to load application ~s: ~p", [App, Err]); + {failed_to_load_app, App, Err} -> + error("failed to load application ~s:~n~p", [App, Err]); AppList -> AppList end, @@ -71,8 +82,8 @@ start() -> {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor - RDesc = {release, - {"rabbit", RabbitVersion}, + RDesc = {release, + {"rabbit", RabbitVersion}, {erts, erlang:system_info(version)}, AppVersions}, @@ -82,15 +93,15 @@ start() -> %% Compile the script ScriptFile = RootName ++ ".script", case systools:make_script(RootName, [local, silent]) of - {ok, Module, Warnings} -> + {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent %% hiding real issues. WarningStr = Module:format_warning( - [W || W <- Warnings, - case W of - {warning, {source_not_found, _}} -> false; - _ -> true + [W || W <- Warnings, + case W of + {warning, {source_not_found, _}} -> false; + _ -> true end]), case length(WarningStr) of 0 -> ok; @@ -98,14 +109,14 @@ start() -> end, ok; {error, Module, Error} -> - error("generation of boot script file ~s failed: ~w", + error("generation of boot script file ~s failed:~n~s", [ScriptFile, Module:format_error(Error)]) end, case post_process_script(ScriptFile) of ok -> ok; {error, Reason} -> - error("post processing of boot script file ~s failed: ~w", + error("post processing of boot script file ~s failed:~n~w", [ScriptFile, Reason]) end, case systools:script2boot(RootName) of @@ -125,8 +136,8 @@ get_env(Key, Default) -> end. determine_version(App) -> - application:load(App), - {ok, Vsn} = application:get_key(App, vsn), + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), {App, Vsn}. assert_dir(Dir) -> @@ -225,7 +236,7 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entries([]) -> +process_entries([]) -> []; process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | Rest]) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e21485b5..e78d889d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,7 +58,7 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max]). + state, channels, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -142,7 +142,8 @@ start_link() -> init(Parent) -> Deb = sys:debug_options([]), receive - {go, Sock} -> start_connection(Parent, Deb, Sock) + {go, Sock, SockTransform} -> + start_connection(Parent, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -192,34 +193,35 @@ teardown_profiling(Value) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). -peername(Sock) -> - try - {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), - AddressS = inet_parse:ntoa(Address), - {AddressS, Port} - catch - Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Ex]), - rabbit_log:info("closing TCP connection ~p", [self()]), - exit(normal) +socket_op(Sock, Fun) -> + case Fun(Sock) of + {ok, Res} -> Res; + {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Reason]), + rabbit_log:info("closing TCP connection ~p~n", + [self()]), + exit(normal) end. -start_connection(Parent, Deb, ClientSock) -> +start_connection(Parent, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddressS, PeerPort} = peername(ClientSock), + {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), + PeerAddressS = inet_parse:ntoa(PeerAddress), + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), + ClientSock = socket_op(Sock, SockTransform), + erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), + handshake_timeout), ProfilingValue = setup_profiling(), try - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), - handshake_timeout), mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, connection = #connection{ user = none, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, - vhost = none}, + vhost = none, + client_properties = none}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init}, @@ -558,7 +560,8 @@ handle_method0(MethodName, FieldsBin, State) -> end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, - response = Response}, + response = Response, + client_properties = ClientProperties}, State = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> @@ -570,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, frame_max = 131072, heartbeat = 0}), State#v1{connection_state = tuning, - connection = Connection#connection{user = User}}; + connection = Connection#connection{ + user = User, + client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, frame_max = FrameMax, heartbeat = ClientHeartbeat}, @@ -689,6 +694,9 @@ i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> Timeout; i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> FrameMax; +i(client_properties, #v1{connection = #connection{ + client_properties = ClientProperties}}) -> + ClientProperties; i(Item, #v1{}) -> throw({bad_argument, Item}). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5c5c55f1..ba048184 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -193,6 +193,7 @@ test_unfold() -> test_parsing() -> passed = test_content_properties(), + passed = test_field_values(), passed. test_content_properties() -> @@ -218,9 +219,12 @@ test_content_properties() -> [{<<"one">>, signedint, 1}, {<<"two">>, signedint, 2}]}]}], << - 16#8000:16, % flags - % properties: + % property-flags + 16#8000:16, + % property-list: + + % table 117:32, % table length in bytes 11,"a signedint", % name @@ -249,6 +253,51 @@ test_content_properties() -> V -> exit({got_success_but_expected_failure, V}) end. +test_field_values() -> + %% FIXME this does not test inexact numbers (double and float) yet, + %% because they won't pass the equality assertions + test_content_prop_roundtrip( + [{table, [{<<"longstr">>, longstr, <<"Here is a long string">>}, + {<<"signedint">>, signedint, 12345}, + {<<"decimal">>, decimal, {3, 123456}}, + {<<"timestamp">>, timestamp, 109876543209876}, + {<<"table">>, table, [{<<"one">>, signedint, 54321}, + {<<"two">>, longstr, <<"A long string">>}]}, + {<<"byte">>, byte, 255}, + {<<"long">>, long, 1234567890}, + {<<"short">>, short, 655}, + {<<"bool">>, bool, true}, + {<<"binary">>, binary, <<"a binary string">>}, + {<<"void">>, void, undefined}, + {<<"array">>, array, [{signedint, 54321}, + {longstr, <<"A long string">>}]} + + ]}], + << + % property-flags + 16#8000:16, + % table length in bytes + 228:32, + + 7,"longstr", "S", 21:32, "Here is a long string", % = 34 + 9,"signedint", "I", 12345:32/signed, % + 15 = 49 + 7,"decimal", "D", 3, 123456:32, % + 14 = 63 + 9,"timestamp", "T", 109876543209876:64, % + 19 = 82 + 5,"table", "F", 31:32, % length of table % + 11 = 93 + 3,"one", "I", 54321:32, % + 9 = 102 + 3,"two", "S", 13:32, "A long string",% + 22 = 124 + 4,"byte", "b", 255:8, % + 7 = 131 + 4,"long", "l", 1234567890:64, % + 14 = 145 + 5,"short", "s", 655:16, % + 9 = 154 + 4,"bool", "t", 1, % + 7 = 161 + 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 + 4,"void", "V", % + 6 = 194 + 5,"array", "A", 23:32, % + 11 = 205 + "I", 54321:32, % + 5 = 210 + "S", 13:32, "A long string" % + 18 = 228 + >>), + passed. + test_topic_match(P, R) -> test_topic_match(P, R, true). @@ -495,7 +544,7 @@ test_cluster_management() -> ok = control_action(cluster, ["invalid1@invalid", "invalid2@invalid"]), - SecondaryNode = rabbit_misc:localnode(hare), + SecondaryNode = rabbit_misc:makenode("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = test_cluster_management2(SecondaryNode); pang -> io:format("Skipping clustering tests with node ~p~n", diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl new file mode 100644 index 00000000..8be28f52 --- /dev/null +++ b/src/vm_memory_monitor.erl @@ -0,0 +1,325 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% In practice Erlang shouldn't be allowed to grow to more than a half +%% of available memory. The pessimistic scenario is when the Erlang VM +%% has a single process that's consuming all memory. In such a case, +%% during garbage collection, Erlang tries to allocate a huge chunk of +%% continuous memory, which can result in a crash or heavy swapping. +%% +%% This module tries to warn Rabbit before such situations occur, so +%% that it has a higher chance to avoid running out of memory. +%% +%% This code depends on Erlang os_mon application. + +-module(vm_memory_monitor). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0, get_total_memory/0, + get_check_interval/0, set_check_interval/1, + get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]). + + +-define(SERVER, ?MODULE). +-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). + +%% For an unknown OS, we assume that we have 1GB of memory. It'll be +%% wrong. Scale by vm_memory_high_watermark in configuration to get a +%% sensible value. +-define(MEMORY_SIZE_FOR_UNKNOWN_OS, 1073741824). + +-record(state, {total_memory, + memory_limit, + timeout, + timer, + alarmed + }). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (float()) -> + ('ignore' | {'error', any()} | {'ok', pid()})). +-spec(update/0 :: () -> 'ok'). +-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_check_interval/0 :: () -> non_neg_integer()). +-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). +-spec(get_vm_memory_high_watermark/0 :: () -> float()). +-spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok'). + +-endif. + + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +update() -> + gen_server:cast(?SERVER, update). + +get_total_memory() -> + get_total_memory(os:type()). + +get_check_interval() -> + gen_server:call(?MODULE, get_check_interval). + +set_check_interval(Fraction) -> + gen_server:call(?MODULE, {set_check_interval, Fraction}). + +get_vm_memory_high_watermark() -> + gen_server:call(?MODULE, get_vm_memory_high_watermark). + +set_vm_memory_high_watermark(Fraction) -> + gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +init([MemFraction]) -> + TotalMemory = + case get_total_memory() of + unknown -> + error_logger:warning_msg( + "Unknown total memory size for your OS ~p. " + "Assuming memory size is ~pMB.~n", + [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]), + ?MEMORY_SIZE_FOR_UNKNOWN_OS; + M -> M + end, + MemLimit = get_mem_limit(MemFraction, TotalMemory), + error_logger:info_msg("Memory limit set to ~pMB.~n", + [trunc(MemLimit/1048576)]), + TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), + State = #state { total_memory = TotalMemory, + memory_limit = MemLimit, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + alarmed = false}, + {ok, internal_update(State)}. + +handle_call(get_vm_memory_high_watermark, _From, State) -> + {reply, State#state.memory_limit / State#state.total_memory, State}; + +handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) -> + MemLimit = get_mem_limit(MemFraction, State#state.total_memory), + error_logger:info_msg("Memory alarm changed to ~p, ~p bytes.~n", + [MemFraction, MemLimit]), + {reply, ok, State#state{memory_limit = MemLimit}}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% Server Internals +%%---------------------------------------------------------------------------- + +internal_update(State = #state { memory_limit = MemLimit, + alarmed = Alarmed}) -> + MemUsed = erlang:memory(total), + NewAlarmed = MemUsed > MemLimit, + case {Alarmed, NewAlarmed} of + {false, true} -> + emit_update_info(set, MemUsed, MemLimit), + alarm_handler:set_alarm({vm_memory_high_watermark, []}); + {true, false} -> + emit_update_info(clear, MemUsed, MemLimit), + alarm_handler:clear_alarm(vm_memory_high_watermark); + _ -> + ok + end, + State #state {alarmed = NewAlarmed}. + +emit_update_info(State, MemUsed, MemLimit) -> + error_logger:info_msg( + "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n", + [State, MemUsed, MemLimit]). + +start_timer(Timeout) -> + {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + TRef. + +%% On a 32-bit machine, if you're using more than 2 gigs of RAM you're +%% in big trouble anyway. +get_vm_limit() -> + case erlang:system_info(wordsize) of + 4 -> 4294967296; %% 4 GB for 32 bits 2^32 + 8 -> 281474976710656 %% 256 TB for 64 bits 2^48 + %%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details + end. + +get_mem_limit(MemFraction, TotalMemory) -> + lists:min([trunc(TotalMemory * MemFraction), get_vm_limit()]). + +%%---------------------------------------------------------------------------- +%% Internal Helpers +%%---------------------------------------------------------------------------- +cmd(Command) -> + Exec = hd(string:tokens(Command, " ")), + case os:find_executable(Exec) of + false -> throw({command_not_found, Exec}); + _ -> os:cmd(Command) + end. + +%% get_total_memory(OS) -> Total +%% Windows and Freebsd code based on: memsup:get_memory_usage/1 +%% Original code was part of OTP and released under "Erlang Public License". + +get_total_memory({unix,darwin}) -> + File = cmd("/usr/bin/vm_stat"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line_mach/1, Lines)), + [PageSize, Inactive, Active, Free, Wired] = + [dict:fetch(Key, Dict) || + Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', + 'Pages wired down']], + PageSize * (Inactive + Active + Free + Wired); + +get_total_memory({unix,freebsd}) -> + PageSize = freebsd_sysctl("vm.stats.vm.v_page_size"), + PageCount = freebsd_sysctl("vm.stats.vm.v_page_count"), + PageCount * PageSize; + +get_total_memory({win32,_OSname}) -> + [Result|_] = os_mon_sysinfo:get_mem_info(), + {ok, [_MemLoad, TotPhys, _AvailPhys, + _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} = + io_lib:fread("~d~d~d~d~d~d~d", Result), + TotPhys; + +get_total_memory({unix, linux}) -> + File = read_proc_file("/proc/meminfo"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line_linux/1, Lines)), + dict:fetch('MemTotal', Dict); + +get_total_memory({unix, sunos}) -> + File = cmd("/usr/sbin/prtconf"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)), + dict:fetch('Memory size', Dict); + +get_total_memory(_OsType) -> + unknown. + +%% A line looks like "Foo bar: 123456." +parse_line_mach(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + case Name of + "Mach Virtual Memory Statistics" -> + ["(page", "size", "of", PageSize, "bytes)"] = + string:tokens(RHS, " "), + {page_size, list_to_integer(PageSize)}; + _ -> + [Value | _Rest1] = string:tokens(RHS, " ."), + {list_to_atom(Name), list_to_integer(Value)} + end. + +%% A line looks like "FooBar: 123456 kB" +parse_line_linux(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + [Value | UnitsRest] = string:tokens(RHS, " "), + Value1 = case UnitsRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. + +%% A line looks like "Memory size: 1024 Megabytes" +parse_line_sunos(Line) -> + case string:tokens(Line, ":") of + [Name, RHS | _Rest] -> + [Value1 | UnitsRest] = string:tokens(RHS, " "), + Value2 = case UnitsRest of + ["Gigabytes"] -> + list_to_integer(Value1) * 1024 * 1024 * 1024; + ["Megabytes"] -> + list_to_integer(Value1) * 1024 * 1024; + ["Kilobytes"] -> + list_to_integer(Value1) * 1024; + _ -> + Value1 ++ UnitsRest %% no known units + end, + {list_to_atom(Name), Value2}; + [Name] -> {list_to_atom(Name), none} + end. + +freebsd_sysctl(Def) -> + list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n"). + +%% file:read_file does not work on files in /proc as it seems to get +%% the size of the file first and then read that many bytes. But files +%% in /proc always have length 0, we just have to read until we get +%% eof. +read_proc_file(File) -> + {ok, IoDevice} = file:open(File, [read, raw]), + Res = read_proc_file(IoDevice, []), + file:close(IoDevice), + lists:flatten(lists:reverse(Res)). + +-define(BUFFER_SIZE, 1024). +read_proc_file(IoDevice, Acc) -> + case file:read(IoDevice, ?BUFFER_SIZE) of + {ok, Res} -> read_proc_file(IoDevice, [Res | Acc]); + eof -> Acc + end. |