diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 08:54:07 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 08:54:07 +0100 |
commit | 501d0b0ea52e18224d7e0003604893ec1826a540 (patch) | |
tree | 009579ebf471a91828014500266c538b9480470a | |
parent | 36acaf540745ac666fa3b5b95785a05f8dd3df44 (diff) | |
parent | 0b7c17f7241cca7558d79f2db20c57a0f338db4b (diff) | |
download | rabbitmq-server-501d0b0ea52e18224d7e0003604893ec1826a540.tar.gz |
merge bug24298 into default
65 files changed, 1450 insertions, 963 deletions
@@ -121,7 +121,7 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) - dialyzer --plt $(BASIC_PLT) --no_native \ + dialyzer --plt $(BASIC_PLT) --no_native --fullpath \ -Wrace_conditions $(BEAM_TARGETS) # rabbit.plt is used by rabbitmq-erlang-client's dialyze make target @@ -173,6 +173,12 @@ run-node: all RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ ./scripts/rabbitmq-server +run-background-node: all + $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ + RABBITMQ_NODE_ONLY=true \ + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ + ./scripts/rabbitmq-server + run-tests: all OUT=$$(echo "rabbit_tests:all_tests()." | $(ERL_CALL)) ; \ echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null @@ -180,14 +186,15 @@ run-tests: all run-qc: all $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS)) -start-background-node: - $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ - RABBITMQ_NODE_ONLY=true \ - RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ - ./scripts/rabbitmq-server; sleep 1 +start-background-node: all + -rm -f $(RABBITMQ_MNESIA_DIR).pid + mkdir -p $(RABBITMQ_MNESIA_DIR) + setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & + sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) + ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) @@ -371,6 +371,8 @@ def genErl(spec): classIds.add(m.klass.index) print prettyType("amqp_class_id()", ["%i" % ci for ci in classIds]) + print prettyType("amqp_class_name()", + ["%s" % c.erlangName() for c in spec.allClasses()]) print "-endif. % use_specs" print """ @@ -378,6 +380,7 @@ def genErl(spec): -ifdef(use_specs). -spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()). +-spec(lookup_class_name/1 :: (amqp_class_id()) -> amqp_class_name()). -spec(method_id/1 :: (amqp_method_name()) -> amqp_method()). -spec(method_has_content/1 :: (amqp_method_name()) -> boolean()). -spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()). diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index a0a74178..4db1d5c4 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -2,7 +2,10 @@ <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version='1.0'> -<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" /> +<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" + doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" + indent="yes" +/> <!-- Don't copy examples through in place --> <xsl:template match="*[@role='example-prefix']"/> @@ -27,7 +30,7 @@ <varlistentry> <term><command><xsl:copy-of select="text()"/></command></term> <listitem> - <xsl:copy-of select="following-sibling::para[@role='example']"/> + <xsl:copy-of select="following-sibling::para[@role='example' and preceding-sibling::screen[1] = current()]"/> </listitem> </varlistentry> </xsl:for-each> diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 4bfcf6ca..88aa2e78 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -6,7 +6,7 @@ <xsl:param name="original"/> -<xsl:output method="xml" doctype-public="bug in xslt processor requires fake doctype" doctype-system="otherwise css isn't included" /> +<xsl:output method="xml" /> <xsl:template match="*"/> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ba87c836..1f8cf28e 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -163,20 +163,28 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>wait</command></cmdsynopsis></term> + <term><cmdsynopsis><command>wait</command> <arg choice="req"><replaceable>pid_file</replaceable></arg></cmdsynopsis></term> <listitem> <para> Wait for the RabbitMQ application to start. </para> <para> This command will wait for the RabbitMQ application to - start at the node. As long as the Erlang node is up but - the RabbitMQ application is down it will wait - indefinitely. If the node itself goes down, or takes - more than five seconds to come up, it will fail. + start at the node. It will wait for the pid file to + be created, then for a process with a pid specified in the + pid file to start, and then for the RabbitMQ application + to start in that process. It will fail if the process + terminates without starting the RabbitMQ application. + </para> + <para> + A suitable pid file is created by + the <command>rabbitmq-server</command> script. By + default this is located in the Mnesia directory. Modify + the <command>RABBITMQ_PID_FILE</command> environment + variable to change the location. </para> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl wait</screen> + <screen role="example">rabbitmqctl wait /var/run/rabbitmq/pid</screen> <para role="example"> This command will return when the RabbitMQ node has started up. @@ -243,29 +251,28 @@ Instruct the RabbitMQ node to rotate the log files. </para> <para> - The RabbitMQ broker will attempt to append the current contents - of the log file to the file with name composed of the original - name and the suffix. - It will create a new file if such a file does not already exist. - When no <option>suffix</option> is specified, the empty log file is - simply created at the original location; no rotation takes place. - </para> - <para> - When an error occurs while appending the contents of the old log - file, the operation behaves in the same way as if no <option>suffix</option> was - specified. + The RabbitMQ broker appends the contents of its log + files to files with names composed of the original name + and the suffix, and then resumes logging to freshly + created files at the original location. I.e. effectively + the current log contents are moved to the end of the + suffixed files. </para> <para> - This command might be helpful when you are e.g. writing your - own logrotate script and you do not want to restart the RabbitMQ - node. + When the target files do not exist they are created. + target files do not already exist. When + no <option>suffix</option> is specified, the empty log + files are simply created at the original location; no + rotation takes place. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl rotate_logs .1</screen> <para role="example"> - This command instructs the RabbitMQ node to append the current content - of the log files to the files with names consisting of the original logs' - names and ".1" suffix, e.g. rabbit.log.1. Finally, the old log files are reopened. + This command instructs the RabbitMQ node to append the contents + of the log files to files with names consisting of the original logs' + names and ".1" suffix, e.g. rabbit@mymachine.log.1 and + rabbit@mymachine-sasl.log.1. Finally, logging resumes to + fresh files at the old locations. </para> </listitem> </varlistentry> @@ -548,13 +555,15 @@ <varlistentry> <term><cmdsynopsis><command>list_users</command></cmdsynopsis></term> <listitem> - <para>Lists users</para> + <para> + Lists users. Each result row will contain the user name + followed by a list of the tags set for that user. + </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl list_users</screen> <para role="example"> This command instructs the RabbitMQ broker to list all - users. Each result row will contain the user name and - the administrator status of the user, in that order. + users. </para> </listitem> </varlistentry> diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index ee102f5e..20fe4234 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -26,6 +26,7 @@ fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(duration() :: ('undefined' | 'infinity' | number())). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -55,8 +56,8 @@ -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: - (('undefined' | 'infinity' | number()), state()) -> state()). --spec(ram_duration/1 :: (state()) -> {number(), state()}). + (duration(), state()) -> state()). +-spec(ram_duration/1 :: (state()) -> {duration(), state()}). -spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle'). -spec(timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 11f5f01c..0c5aa96a 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -120,6 +120,12 @@ done rm -rf %{buildroot} %changelog +* Fri Sep 9 2011 tim@rabbitmq.com 2.6.1-1 +- New Upstream Release + +* Fri Aug 26 2011 tim@rabbitmq.com 2.6.0-1 +- New Upstream Release + * Mon Jun 27 2011 simon@rabbitmq.com 2.5.1-1 - New Upstream Release diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index d8a7a94d..15fd5d5b 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -24,6 +24,7 @@ DESC=rabbitmq-server USER=rabbitmq ROTATE_SUFFIX= INIT_LOG_DIR=/var/log/rabbitmq +PID_FILE=/var/run/rabbitmq/pid LOCK_FILE= # This is filled in when building packages @@ -33,16 +34,31 @@ test -x $CONTROL || exit 0 RETVAL=0 set -e +ensure_pid_dir () { + PID_DIR=`dirname ${PID_FILE}` + if [ ! -d ${PID_DIR} ] ; then + mkdir -p ${PID_DIR} + chown -R ${USER}:${USER} ${PID_DIR} + chmod 755 ${PID_DIR} + fi +} + +remove_pid () { + rm -f ${PID_FILE} + rmdir `dirname ${PID_FILE}` || : +} + start_rabbitmq () { status_rabbitmq quiet if [ $RETVAL = 0 ] ; then echo RabbitMQ is currently running else RETVAL=0 + ensure_pid_dir set +e - setsid sh -c "$DAEMON > ${INIT_LOG_DIR}/startup_log \ - 2> ${INIT_LOG_DIR}/startup_err" & - $CONTROL wait >/dev/null 2>&1 + setsid sh -c "RABBITMQ_PID_FILE=$PID_FILE $DAEMON > \ + ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err" & + $CONTROL wait $PID_FILE >/dev/null 2>&1 RETVAL=$? set -e case "$RETVAL" in @@ -53,6 +69,7 @@ start_rabbitmq () { fi ;; *) + remove_pid echo FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\} RETVAL=1 ;; @@ -68,6 +85,7 @@ stop_rabbitmq () { RETVAL=$? set -e if [ $RETVAL = 0 ] ; then + remove_pid if [ -n "$LOCK_FILE" ] ; then rm -f $LOCK_FILE fi diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index d58c48ed..e6776eff 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -29,6 +29,7 @@ ## OCF_RESKEY_log_base ## OCF_RESKEY_mnesia_base ## OCF_RESKEY_server_start_args +## OCF_RESKEY_pid_file ####################################################################### # Initialization: @@ -42,10 +43,12 @@ OCF_RESKEY_server_default="/usr/sbin/rabbitmq-server" OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl" OCF_RESKEY_nodename_default="rabbit@localhost" OCF_RESKEY_log_base_default="/var/log/rabbitmq" +OCF_RESKEY_pid_file_default="/var/run/rabbitmq/pid" : ${OCF_RESKEY_server=${OCF_RESKEY_server_default}} : ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}} : ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}} : ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} +: ${OCF_RESKEY_pid_file=${OCF_RESKEY_pid_file_default}} meta_data() { cat <<END @@ -133,6 +136,14 @@ Additional arguments provided to the server on startup <content type="string" default="" /> </parameter> +<parameter name="pid_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the file in which the pid will be stored +</longdesc> +<shortdesc lang="en">Pid file path</shortdesc> +<content type="string" default="${OCF_RESKEY_pid_file_default}" /> +</parameter> + </parameters> <actions> @@ -164,9 +175,25 @@ RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args +RABBITMQ_PID_FILE=$OCF_RESKEY_pid_file [ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME" [ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME +ensure_pid_dir () { + PID_DIR=`dirname ${RABBITMQ_PID_FILE}` + if [ ! -d ${PID_DIR} ] ; then + mkdir -p ${PID_DIR} + chown -R rabbitmq:rabbitmq ${PID_DIR} + chmod 755 ${PID_DIR} + fi + return $OCF_SUCCESS +} + +remove_pid () { + rm -f ${RABBITMQ_PID_FILE} + rmdir `dirname ${RABBITMQ_PID_FILE}` || : +} + export_vars() { [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT @@ -174,6 +201,7 @@ export_vars() { [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS + [ ! -z $RABBITMQ_PID_FILE ] && ensure_pid_dir && export RABBITMQ_PID_FILE } rabbit_validate_partial() { @@ -214,13 +242,13 @@ rabbit_status() { } rabbit_wait() { - rabbitmqctl_action "wait" + rabbitmqctl_action "wait" $1 } rabbitmqctl_action() { local rc local action - action=$1 + action=$@ $RABBITMQ_CTL $NODENAME_ARG $action > /dev/null 2> /dev/null rc=$? case "$rc" in @@ -252,9 +280,10 @@ rabbit_start() { # Wait for the server to come up. # Let the CRM/LRM time us out if required - rabbit_wait + rabbit_wait $RABBITMQ_PID_FILE rc=$? if [ "$rc" != $OCF_SUCCESS ]; then + remove_pid ocf_log info "rabbitmq-server start failed: $rc" exit $OCF_ERR_GENERIC fi @@ -285,6 +314,7 @@ rabbit_stop() { rabbit_status rc=$? if [ "$rc" = $OCF_NOT_RUNNING ]; then + remove_pid stop_wait=0 break elif [ "$rc" != $OCF_SUCCESS ]; then diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 9063a6ed..8f526544 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (2.6.1-1) natty; urgency=low + + * New Upstream Release + + -- Tim <tim@rabbitmq.com> Fri, 09 Sep 2011 14:38:45 +0100 + +rabbitmq-server (2.6.0-1) natty; urgency=low + + * New Upstream Release + + -- Tim <tim@rabbitmq.com> Fri, 26 Aug 2011 16:29:40 +0100 + rabbitmq-server (2.5.1-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in index c4aeeebe..baf081fc 100644 --- a/packaging/debs/Debian/debian/postrm.in +++ b/packaging/debs/Debian/debian/postrm.in @@ -32,9 +32,6 @@ case "$1" in if [ -d /var/log/rabbitmq ]; then rm -r /var/log/rabbitmq fi - if [ -d /var/run/rabbitmq ]; then - rm -r /var/run/rabbitmq - fi if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 2f80eb96..deca5b30 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -47,6 +47,7 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand @@ -58,15 +59,13 @@ fi [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" -[ "x" = "x$RABBITMQ_BACKUP_EXTENSION" ] && RABBITMQ_BACKUP_EXTENSION=${BACKUP_EXTENSION} -[ "x" = "x$RABBITMQ_BACKUP_EXTENSION" ] && RABBITMQ_BACKUP_EXTENSION=".1" - -[ -f "${RABBITMQ_LOGS}" ] && cat "${RABBITMQ_LOGS}" >> "${RABBITMQ_LOGS}${RABBITMQ_BACKUP_EXTENSION}" -[ -f "${RABBITMQ_SASL_LOGS}" ] && cat "${RABBITMQ_SASL_LOGS}" >> "${RABBITMQ_SASL_LOGS}${RABBITMQ_BACKUP_EXTENSION}" RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' +mkdir -p $(dirname ${RABBITMQ_PID_FILE}) +echo $$ > ${RABBITMQ_PID_FILE} + RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ @@ -107,8 +106,9 @@ exec erl \ ${RABBITMQ_SERVER_ERL_ARGS} \ ${RABBITMQ_LISTEN_ARG} \ -sasl errlog_type error \ - -kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ - -sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ + -sasl sasl_error_logger false \ + -rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ + -rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ -os_mon start_cpu_sup true \ -os_mon start_disksup false \ -os_mon start_memsup false \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5e2097db..56bed435 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -70,18 +70,9 @@ if "!RABBITMQ_LOG_BASE!"=="" ( rem We save the previous logs in their respective backup
rem Log management (rotation, filtering based of size...) is left as an exercice for the user.
-set BACKUP_EXTENSION=.1
-
set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
-if exist "!LOGS!" (
- type "!LOGS!" >> "!LOGS!!BACKUP_EXTENSION!"
-)
-if exist "!SASL_LOGS!" (
- type "!SASL_LOGS!" >> "!SASL_LOGS!!BACKUP_EXTENSION!"
-)
-
rem End of log management
@@ -135,16 +126,16 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -boot "!RABBITMQ_BOOT_FILE!" ^
!RABBITMQ_CONFIG_ARG! ^
-sname !RABBITMQ_NODENAME! ^
--s rabbit ^
+W w ^
+A30 ^
+P 1048576 ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
--kernel error_logger {file,\""!LOGS:\=/!"\"} ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--sasl sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-sasl sasl_error_logger false ^
+-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
+-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index b2aa4f58..26c6ea65 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -103,18 +103,9 @@ if "!RABBITMQ_LOG_BASE!"=="" ( rem We save the previous logs in their respective backup
rem Log management (rotation, filtering based on size...) is left as an exercise for the user.
-set BACKUP_EXTENSION=.1
-
set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
-if exist "!LOGS!" (
- type "!LOGS!" >> "!LOGS!!BACKUP_EXTENSION!"
-)
-if exist "!SASL_LOGS!" (
- type "!SASL_LOGS!" >> "!SASL_LOGS!!BACKUP_EXTENSION!"
-)
-
rem End of log management
@@ -201,15 +192,16 @@ set ERLANG_SERVICE_ARGUMENTS= ^ !RABBITMQ_EBIN_PATH! ^
-boot "!RABBITMQ_BOOT_FILE!" ^
!RABBITMQ_CONFIG_ARG! ^
--s rabbit ^
+W w ^
+A30 ^
++P 1048576 ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
--kernel error_logger {file,\""!LOGS:\=/!"\"} ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--sasl sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-sasl sasl_error_logger false ^
+-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
+-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/src/delegate.erl b/src/delegate.erl index 17046201..edb4eba4 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -28,13 +28,13 @@ -ifdef(use_specs). -spec(start_link/1 :: - (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). --spec(invoke_no_result/2 :: - (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). + (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). -spec(invoke/2 :: ( pid(), fun ((pid()) -> A)) -> A; ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). +-spec(invoke_no_result/2 :: + (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). -endif. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index fc693c7d..4c131a6c 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -28,7 +28,7 @@ -ifdef(use_specs). --spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (integer()) -> rabbit_types:ok_pid_or_error()). -spec(count/1 :: ([node()]) -> integer()). -endif. diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3c2111dc..6c3f1b5f 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -120,37 +120,39 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and transfer. obtain/0 blocks until -%% a file descriptor is available, at which point the requesting -%% process is considered to 'own' one more descriptor. transfer/1 -%% transfers ownership of a file descriptor between processes. It is -%% non-blocking. Obtain is used to obtain permission to accept file -%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1 -%% macro. File handles can use the entire limit, but will be evicted -%% by obtain calls up to the point at which no more obtain calls can -%% be satisfied by the obtains limit. Thus there will always be some -%% capacity available for file handles. Processes that use obtain are -%% never asked to return them, and they are not managed in any way by -%% the server. It is simply a mechanism to ensure that processes that -%% need file descriptors such as sockets can do so in such a way that -%% the overall number of open file descriptors is managed. +%% The server also supports obtain, release and transfer. obtain/0 +%% blocks until a file descriptor is available, at which point the +%% requesting process is considered to 'own' one more +%% descriptor. release/0 is the inverse operation and releases a +%% previously obtained descriptor. transfer/1 transfers ownership of a +%% file descriptor between processes. It is non-blocking. Obtain is +%% used to obtain permission to accept file descriptors. Obtain has a +%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use +%% the entire limit, but will be evicted by obtain calls up to the +%% point at which no more obtain calls can be satisfied by the obtains +%% limit. Thus there will always be some capacity available for file +%% handles. Processes that use obtain are never asked to return them, +%% and they are not managed in any way by the server. It is simply a +%% mechanism to ensure that processes that need file descriptors such +%% as sockets can do so in such a way that the overall number of open +%% file descriptors is managed. %% %% The callers of register_callback/3, obtain/0, and the argument of %% transfer/1 are monitored, reducing the count of handles in use %% appropriately when the processes terminate. --behaviour(gen_server). +-behaviour(gen_server2). -export([register_callback/3]). -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, - info/1]). +-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, + info/0, info/1]). -export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -159,7 +161,8 @@ -define(FILE_HANDLES_CHECK_INTERVAL, 2000). -define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). --define(CLIENT_ETS_TABLE, ?MODULE). +-define(CLIENT_ETS_TABLE, file_handle_cache_client). +-define(ELDERS_ETS_TABLE, file_handle_cache_elders). %%---------------------------------------------------------------------------- @@ -228,7 +231,7 @@ -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: - (string(), [any()], + (file:filename(), [any()], [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). @@ -243,17 +246,18 @@ -spec(flush/1 :: (ref()) -> ok_or_error()). -spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> val_or_error(non_neg_integer())). --spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). +-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(obtain/0 :: () -> 'ok'). +-spec(release/0 :: () -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). --spec(info_keys/0 :: () -> [atom()]). --spec(info/0 :: () -> [{atom(), any()}]). --spec(info/1 :: ([atom()]) -> [{atom(), any()}]). --spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/0 :: () -> rabbit_types:infos()). +-spec(info/1 :: ([atom()]) -> rabbit_types:infos()). +-spec(ulimit/0 :: () -> 'unknown' | non_neg_integer()). -endif. @@ -265,11 +269,11 @@ %%---------------------------------------------------------------------------- start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). register_callback(M, F, A) when is_atom(M) andalso is_atom(F) andalso is_list(A) -> - gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}). + gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}). open(Path, Mode, Options) -> Path1 = filename:absname(Path), @@ -317,7 +321,7 @@ read(Ref, Count) -> fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case file:read(Hdl, Count) of + case prim_file:read(Hdl, Count) of {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), {Obj, [Handle #handle { offset = Offset1 }]}; @@ -337,7 +341,7 @@ append(Ref, Data) -> write_buffer_size_limit = 0, at_eof = true } = Handle1} -> Offset1 = Offset + iolist_size(Data), - {file:write(Hdl, Data), + {prim_file:write(Hdl, Data), [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; {{ok, _Offset}, #handle { write_buffer = WriteBuffer, write_buffer_size = Size, @@ -364,7 +368,7 @@ sync(Ref) -> ok; ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> - case file:sync(Hdl) of + case prim_file:sync(Hdl) of ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end @@ -381,7 +385,7 @@ truncate(Ref) -> with_flushed_handles( [Ref], fun ([Handle1 = #handle { hdl = Hdl }]) -> - case file:truncate(Hdl) of + case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end @@ -408,7 +412,7 @@ copy(Src, Dest, Count) -> fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] ) -> - case file:copy(SHdl, DHdl, Count) of + case prim_file:copy(SHdl, DHdl, Count) of {ok, Count1} = Result1 -> {Result1, [SHandle #handle { offset = SOffset + Count1 }, @@ -428,7 +432,7 @@ delete(Ref) -> Handle = #handle { path = Path } -> case hard_close(Handle #handle { is_dirty = false, write_buffer = [] }) of - ok -> file:delete(Path); + ok -> prim_file:delete(Path); {Error, Handle1} -> put_handle(Ref, Handle1), Error end @@ -443,7 +447,7 @@ clear(Ref) -> case maybe_seek(bof, Handle #handle { write_buffer = [], write_buffer_size = 0 }) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> - case file:truncate(Hdl) of + case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end; @@ -470,21 +474,28 @@ set_maximum_since_use(MaximumAge) -> end. obtain() -> - gen_server:call(?SERVER, {obtain, self()}, infinity). + %% If the FHC isn't running, obtains succeed immediately. + case whereis(?SERVER) of + undefined -> ok; + _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity) + end. + +release() -> + gen_server2:cast(?SERVER, {release, self()}). transfer(Pid) -> - gen_server:cast(?SERVER, {transfer, self(), Pid}). + gen_server2:cast(?SERVER, {transfer, self(), Pid}). set_limit(Limit) -> - gen_server:call(?SERVER, {set_limit, Limit}, infinity). + gen_server2:call(?SERVER, {set_limit, Limit}, infinity). get_limit() -> - gen_server:call(?SERVER, get_limit, infinity). + gen_server2:call(?SERVER, get_limit, infinity). info_keys() -> ?INFO_KEYS. info() -> info(?INFO_KEYS). -info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity). +info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -538,8 +549,8 @@ get_or_reopen(RefNewOrReopens) -> {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; {OpenHdls, ClosedHdls} -> Oldest = oldest(get_age_tree(), fun () -> now() end), - case gen_server:call(?SERVER, {open, self(), length(ClosedHdls), - Oldest}, infinity) of + case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls), + Oldest}, infinity) of ok -> case reopen(ClosedHdls) of {ok, RefHdls} -> sort_handles(RefNewOrReopens, @@ -566,10 +577,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, offset = Offset, last_used_at = undefined }} | RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - case file:open(Path, case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end) of + case prim_file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = @@ -582,7 +593,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, Error -> %% NB: none of the handles in ToOpen are in the age tree Oldest = oldest(Tree, fun () -> undefined end), - [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], + [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], put_age_tree(Tree), Error end. @@ -631,7 +642,7 @@ age_tree_delete(Then) -> fun (Tree) -> Tree1 = gb_trees:delete_any(Then, Tree), Oldest = oldest(Tree1, fun () -> undefined end), - gen_server:cast(?SERVER, {close, self(), Oldest}), + gen_server2:cast(?SERVER, {close, self(), Oldest}), Tree1 end). @@ -641,7 +652,7 @@ age_tree_change() -> case gb_trees:is_empty(Tree) of true -> Tree; false -> {Oldest, _Ref} = gb_trees:smallest(Tree), - gen_server:cast(?SERVER, {update, self(), Oldest}) + gen_server2:cast(?SERVER, {update, self(), Oldest}) end, Tree end). @@ -693,10 +704,10 @@ soft_close(Handle) -> is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of - true -> file:sync(Hdl); + true -> prim_file:sync(Hdl); false -> ok end, - ok = file:close(Hdl), + ok = prim_file:close(Hdl), age_tree_delete(Then), {ok, Handle1 #handle { hdl = closed, is_dirty = false, @@ -731,7 +742,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, at_eof = AtEoF }) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), case (case NeedsSeek of - true -> file:position(Hdl, NewOffset); + true -> prim_file:position(Hdl, NewOffset); false -> {ok, Offset} end) of {ok, Offset1} = Result -> @@ -768,7 +779,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, at_eof = true }) -> - case file:write(Hdl, lists:reverse(WriteBuffer)) of + case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, is_dirty = true, @@ -784,7 +795,7 @@ i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; i(Item, _) -> throw({bad_argument, Item}). %%---------------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server2 callbacks %%---------------------------------------------------------------------------- init([]) -> @@ -794,7 +805,6 @@ init([]) -> Watermark; _ -> case ulimit() of - infinity -> infinity; unknown -> ?FILE_HANDLES_LIMIT_OTHER; Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS]) end @@ -803,7 +813,8 @@ init([]) -> error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), - {ok, #fhc_state { elders = dict:new(), + Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]), + {ok, #fhc_state { elders = Elders, limit = Limit, open_count = 0, open_pending = pending_new(), @@ -813,34 +824,39 @@ init([]) -> clients = Clients, timer_ref = undefined }}. +prioritise_cast(Msg, _State) -> + case Msg of + {release, _} -> 5; + _ -> 0 + end. + handle_call({open, Pid, Requested, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, clients = Clients }) when EldestUnusedSince =/= undefined -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + true = ets:insert(Elders, {Pid, EldestUnusedSince}), Item = #pending { kind = open, pid = Pid, requested = Requested, from = From }, ok = track_client(Pid, Clients), - State1 = State #fhc_state { elders = Elders1 }, - case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of + case needs_reduce(State #fhc_state { open_count = Count + Requested }) of true -> case ets:lookup(Clients, Pid) of [#cstate { opened = 0 }] -> true = ets:update_element( Clients, Pid, {#cstate.blocked, true}), {noreply, - reduce(State1 #fhc_state { + reduce(State #fhc_state { open_pending = pending_in(Item, Pending) })}; [#cstate { opened = Opened }] -> true = ets:update_element( Clients, Pid, {#cstate.pending_closes, Opened}), - {reply, close, State1} + {reply, close, State} end; - false -> {noreply, run_pending_item(Item, State1)} + false -> {noreply, run_pending_item(Item, State)} end; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, @@ -888,21 +904,24 @@ handle_cast({register_callback, Pid, MFA}, handle_cast({update, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders }) when EldestUnusedSince =/= undefined -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + true = ets:insert(Elders, {Pid, EldestUnusedSince}), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, State #fhc_state { elders = Elders1 }}; + {noreply, State}; + +handle_cast({release, Pid}, State) -> + {noreply, adjust_alarm(State, process_pending( + update_counts(obtain, Pid, -1, State)))}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> - Elders1 = case EldestUnusedSince of - undefined -> dict:erase(Pid, Elders); - _ -> dict:store(Pid, EldestUnusedSince, Elders) - end, + true = case EldestUnusedSince of + undefined -> ets:delete(Elders, Pid); + _ -> ets:insert(Elders, {Pid, EldestUnusedSince}) + end, ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), {noreply, adjust_alarm(State, process_pending( - update_counts(open, Pid, -1, - State #fhc_state { elders = Elders1 })))}; + update_counts(open, Pid, -1, State)))}; handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), @@ -923,6 +942,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, [#cstate { opened = Opened, obtained = Obtained }] = ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), + true = ets:delete(Elders, Pid), FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, {noreply, adjust_alarm( State, @@ -931,11 +951,12 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, open_count = OpenCount - Opened, open_pending = filter_pending(FilterFun, OpenPending), obtain_count = ObtainCount - Obtained, - obtain_pending = filter_pending(FilterFun, ObtainPending), - elders = dict:erase(Pid, Elders) }))}. + obtain_pending = filter_pending(FilterFun, ObtainPending) }))}. -terminate(_Reason, State = #fhc_state { clients = Clients }) -> +terminate(_Reason, State = #fhc_state { clients = Clients, + elders = Elders }) -> ets:delete(Clients), + ets:delete(Elders), State. code_change(_OldVsn, State, _Extra) -> @@ -1047,7 +1068,7 @@ run_pending_item(#pending { kind = Kind, requested = Requested, from = From }, State = #fhc_state { clients = Clients }) -> - gen_server:reply(From, ok), + gen_server2:reply(From, ok), true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), update_counts(Kind, Pid, Requested, State). @@ -1091,7 +1112,7 @@ reduce(State = #fhc_state { open_pending = OpenPending, timer_ref = TRef }) -> Now = now(), {CStates, Sum, ClientCount} = - dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) -> [#cstate { pending_closes = PendingCloses, opened = Opened, blocked = Blocked } = CState] = diff --git a/src/gatherer.erl b/src/gatherer.erl index aa43e9a9..fe976b50 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -27,7 +27,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(stop/1 :: (pid()) -> 'ok'). -spec(fork/1 :: (pid()) -> 'ok'). -spec(finish/1 :: (pid()) -> 'ok'). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 35258139..ab6c4e64 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -616,11 +616,11 @@ in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> process_msg({system, From, Req}, GS2State = #gs2_state { parent = Parent, debug = Debug }) -> + %% gen_server puts Hib on the end as the 7th arg, but that version + %% of the fun seems not to be documented so leaving out for now. sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State); process_msg({'EXIT', Parent, Reason} = Msg, GS2State = #gs2_state { parent = Parent }) -> - %% gen_server puts Hib on the end as the 7th arg, but that version - %% of the fun seems not to be documented so leaving out for now. terminate(Reason, Msg, GS2State); process_msg(Msg, GS2State = #gs2_state { debug = [] }) -> handle_msg(Msg, GS2State); @@ -422,9 +422,9 @@ -type(group_name() :: any()). --spec(create_tables/0 :: () -> 'ok'). +-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}). -spec(start_link/3 :: (group_name(), atom(), any()) -> - {'ok', pid()} | {'error', any()}). + rabbit_types:ok_pid_or_error()). -spec(leave/1 :: (pid()) -> 'ok'). -spec(broadcast/2 :: (pid(), any()) -> 'ok'). -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). diff --git a/src/rabbit.erl b/src/rabbit.erl index 8cae7fde..47bc4433 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -18,7 +18,8 @@ -behaviour(application). --export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0, +-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, + is_running/0 , is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -196,6 +197,8 @@ {os, {atom(), atom()}} | {erlang_version, string()} | {memory, any()}]). +-spec(is_running/0 :: () -> boolean()). +-spec(is_running/1 :: (node()) -> boolean()). -spec(environment/0 :: () -> [{atom() | term()}]). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). @@ -203,6 +206,14 @@ -spec(boot_delegate/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). +-spec(start/2 :: ('normal',[]) -> + {'error', + {'erlang_version_too_old', + {'found',[any()]}, + {'required',[any(),...]}}} | + {'ok',pid()}). +-spec(stop/1 :: (_) -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -221,23 +232,33 @@ start() -> end. stop() -> + rabbit_log:info("Stopping Rabbit~n"), ok = rabbit_misc:stop_applications(application_load_order()). stop_and_halt() -> try stop() after + rabbit_misc:local_info_msg("Halting Erlang VM~n", []), init:stop() end, ok. status() -> [{pid, list_to_integer(os:getpid())}, - {running_applications, application:which_applications()}, + {running_applications, application:which_applications(infinity)}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, {memory, erlang:memory()}]. +is_running() -> is_running(node()). + +is_running(Node) -> + case rpc:call(Node, application, which_applications, [infinity]) of + {badrpc, _} -> false; + Apps -> proplists:is_defined(rabbit, Apps) + end. + environment() -> lists:keysort( 1, [P || P = {K, _} <- application:get_all_env(rabbit), @@ -245,6 +266,7 @@ environment() -> rotate_logs(BinarySuffix) -> Suffix = binary_to_list(BinarySuffix), + rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]), log_rotation_result(rotate_logs(log_location(kernel), Suffix, rabbit_error_logger_file_h), @@ -442,20 +464,20 @@ insert_default_data() -> ensure_working_log_handlers() -> Handlers = gen_event:which_handlers(error_logger), - ok = ensure_working_log_handler(error_logger_file_h, + ok = ensure_working_log_handler(error_logger_tty_h, rabbit_error_logger_file_h, error_logger_tty_h, log_location(kernel), Handlers), - ok = ensure_working_log_handler(sasl_report_file_h, + ok = ensure_working_log_handler(sasl_report_tty_h, rabbit_sasl_report_file_h, sasl_report_tty_h, log_location(sasl), Handlers), ok. -ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, +ensure_working_log_handler(OldHandler, NewHandler, TTYHandler, LogLocation, Handlers) -> case LogLocation of undefined -> ok; @@ -465,10 +487,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, throw({error, {cannot_log_to_tty, TTYHandler, not_installed}}) end; - _ -> case lists:member(NewFHandler, Handlers) of + _ -> case lists:member(NewHandler, Handlers) of true -> ok; false -> case rotate_logs(LogLocation, "", - OldFHandler, NewFHandler) of + OldHandler, NewHandler) of ok -> ok; {error, Reason} -> throw({error, {cannot_log_to_file, @@ -478,10 +500,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, end. log_location(Type) -> - case application:get_env(Type, case Type of - kernel -> error_logger; - sasl -> sasl_error_logger - end) of + case application:get_env(rabbit, case Type of + kernel -> error_logger; + sasl -> sasl_error_logger + end) of {ok, {file, File}} -> File; {ok, false} -> undefined; {ok, tty} -> tty; diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index c0ae18c0..ca28d686 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -32,6 +32,9 @@ -spec(check_user_pass_login/2 :: (rabbit_types:username(), rabbit_types:password()) -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). +-spec(check_user_login/2 :: + (rabbit_types:username(), [{atom(), any()}]) + -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 88ff26cc..b3e92b69 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -49,7 +49,7 @@ -type(name() :: rabbit_types:r('queue')). -type(qlen() :: rabbit_types:ok(non_neg_integer())). --type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A)). +-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())). -type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}). -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: @@ -64,6 +64,9 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> {'new' | 'existing', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). +-spec(internal_declare/2 :: + (rabbit_types:amqqueue(), boolean()) + -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found')). @@ -132,9 +135,6 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). --spec(internal_declare/2 :: - (rabbit_types:amqqueue(), boolean()) - -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -147,6 +147,7 @@ -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). +-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -320,7 +321,7 @@ check_declare_arguments(QueueName, Args) -> ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, - "invalid arg '~s' for ~s: ~w", + "invalid arg '~s' for ~s: ~255p", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- [{<<"x-expires">>, fun check_integer_argument/2}, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e5038efe..e3a2ca90 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -29,12 +29,12 @@ -export([start_link/1, info_keys/0]). +-export([init_with_backing_queue_state/7]). + -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2, format_message_queue/2]). --export([init_with_backing_queue_state/7]). - %% Queue's state -record(q, {q, exclusive_consumer, @@ -42,7 +42,6 @@ backing_queue, backing_queue_state, active_consumers, - blocked_consumers, expires, sync_timer_ref, rate_timer_ref, @@ -56,14 +55,30 @@ -record(consumer, {tag, ack_required}). %% These are held in our process dictionary --record(cr, {consumer_count, - ch_pid, - limiter, +-record(cr, {ch_pid, monitor_ref, acktags, + consumer_count, + blocked_consumers, + limiter, is_limit_active, unsent_message_count}). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: + (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(init_with_backing_queue_state/7 :: + (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], + [rabbit_types:delivery()], dict()) -> #q{}). + +-endif. + +%%---------------------------------------------------------------------------- + -define(STATISTICS_KEYS, [pid, exclusive_consumer_pid, @@ -109,7 +124,6 @@ init(Q) -> backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), - blocked_consumers = queue:new(), expires = undefined, sync_timer_ref = undefined, rate_timer_ref = undefined, @@ -135,7 +149,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, backing_queue = BQ, backing_queue_state = BQS, active_consumers = queue:new(), - blocked_consumers = queue:new(), expires = undefined, sync_timer_ref = undefined, rate_timer_ref = RateTRef, @@ -321,10 +334,11 @@ ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumer_count = 0, - ch_pid = ChPid, + C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, acktags = sets:new(), + consumer_count = 0, + blocked_consumers = queue:new(), is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -333,18 +347,18 @@ ch_record(ChPid) -> C = #cr{} -> C end. -store_ch_record(C = #cr{ch_pid = ChPid}) -> - put({ch, ChPid}, C). - -maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, - acktags = ChAckTags, - unsent_message_count = UnsentMessageCount}) -> +update_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + unsent_message_count = UnsentMessageCount}) -> case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of - {0, 0, 0} -> ok = erase_ch_record(C), - false; - _ -> store_ch_record(C), - true - end. + {0, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) + end, + C. + +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C), + ok. erase_ch_record(#cr{ch_pid = ChPid, limiter = Limiter, @@ -354,8 +368,21 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. +update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> + ok = rabbit_limiter:register(Limiter, self()), + update_ch_record(C#cr{consumer_count = 1}); +update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> + ok = rabbit_limiter:unregister(Limiter, self()), + update_ch_record(C#cr{consumer_count = 0, + limiter = rabbit_limiter:make_token()}); +update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> + update_ch_record(C#cr{consumer_count = Count + Delta}). + all_ch_record() -> [C || {{ch, _}, C} <- get()]. +block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> + update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). + is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. @@ -367,67 +394,56 @@ ch_record_state_transition(OldCR, NewCR) -> end. deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - case queue:out(ActiveConsumers) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, - ActiveConsumersTail} -> - C = #cr{limiter = Limiter, - unsent_message_count = Count, - acktags = ChAckTags} = ch_record(ChPid), - IsMsgReady = PredFun(FunAcc, State), - case (IsMsgReady andalso - rabbit_limiter:can_send(Limiter, self(), AckRequired)) of - true -> - {{Message, IsDelivered, AckTag}, FunAcc1, State1} = - DeliverFun(AckRequired, FunAcc, State), - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = - case AckRequired of - true -> sets:add_element(AckTag, ChAckTags); - false -> ChAckTags - end, - NewC = C#cr{unsent_message_count = Count + 1, - acktags = ChAckTags1}, - true = maybe_store_ch_record(NewC), - {NewActiveConsumers, NewBlockedConsumers} = - case ch_record_state_transition(C, NewC) of - ok -> {queue:in(QEntry, ActiveConsumersTail), - BlockedConsumers}; - block -> {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} - end, - State2 = State1#q{ - active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); - %% if IsMsgReady then we've hit the limiter - false when IsMsgReady -> - true = maybe_store_ch_record(C#cr{is_limit_active = true}), - {NewActiveConsumers, NewBlockedConsumers} = - move_consumers(ChPid, - ActiveConsumers, - BlockedConsumers), - deliver_msgs_to_consumers( - Funs, FunAcc, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}); - false -> - %% no message was ready, so we don't need to block anyone - {FunAcc, State} - end; - {empty, _} -> - {FunAcc, State} + State = #q{active_consumers = ActiveConsumers}) -> + case PredFun(FunAcc, State) of + false -> {FunAcc, State}; + true -> case queue:out(ActiveConsumers) of + {empty, _} -> + {FunAcc, State}; + {{value, QEntry}, Tail} -> + {FunAcc1, State1} = + deliver_msg_to_consumer( + DeliverFun, QEntry, + FunAcc, State#q{active_consumers = Tail}), + deliver_msgs_to_consumers(Funs, FunAcc1, State1) + end end. +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) -> + C = ch_record(ChPid), + case is_ch_blocked(C) of + true -> block_consumer(C, E), + {FunAcc, State}; + false -> case rabbit_limiter:can_send(C#cr.limiter, self(), + Consumer#consumer.ack_required) of + false -> block_consumer(C#cr{is_limit_active = true}, E), + {FunAcc, State}; + true -> AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C, FunAcc, + State#q{active_consumers = AC1}) + end + end. + +deliver_msg_to_consumer(DeliverFun, + #consumer{tag = ConsumerTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + FunAcc, State = #q{q = #amqqueue{name = QName}}) -> + {{Message, IsDelivered, AckTag}, FunAcc1, State1} = + DeliverFun(AckRequired, FunAcc, State), + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags + end, + update_ch_record(C#cr{acktags = ChAckTags1, + unsent_message_count = Count + 1}), + {FunAcc1, State1}. + deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, State) -> @@ -545,11 +561,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), case Delivered of true -> State2; - false -> BQS1 = - BQ:publish(Message, - (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, - ChPid, BQS), + false -> Props = (message_properties(State)) #message_properties{ + needs_confirming = needs_confirming(Confirm)}, + BQS1 = BQ:publish(Message, Props, ChPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -566,44 +580,34 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. -add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). - remove_consumer(ChPid, ConsumerTag, Queue) -> - queue:filter(fun ({CP, #consumer{tag = CT}}) -> - (CP /= ChPid) or (CT /= ConsumerTag) + queue:filter(fun ({CP, #consumer{tag = CTag}}) -> + (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). remove_consumers(ChPid, Queue) -> - {Kept, Removed} = split_by_channel(ChPid, Queue), - [emit_consumer_deleted(Ch, CTag) || - {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], - Kept. - -move_consumers(ChPid, From, To) -> - {Kept, Removed} = split_by_channel(ChPid, From), - {Kept, queue:join(To, Removed)}. - -split_by_channel(ChPid, Queue) -> - {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue)), - {queue:from_list(Kept), queue:from_list(Removed)}. + queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> + emit_consumer_deleted(ChPid, CTag), + false; + (_) -> + true + end, Queue). possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of not_found -> State; C -> - NewC = Update(C), - maybe_store_ch_record(NewC), - case ch_record_state_transition(C, NewC) of - ok -> State; - unblock -> {NewBlockedConsumers, NewActiveConsumers} = - move_consumers(ChPid, - State#q.blocked_consumers, - State#q.active_consumers), - run_message_queue( - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + C1 = Update(C), + case ch_record_state_transition(C, C1) of + ok -> update_ch_record(C1), + State; + unblock -> #cr{blocked_consumers = Consumers} = C1, + update_ch_record( + C1#cr{blocked_consumers = queue:new()}), + AC1 = queue:join(State#q.active_consumers, + Consumers), + run_message_queue(State#q{active_consumers = AC1}) end end. @@ -615,7 +619,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - C = #cr{ch_pid = ChPid, acktags = ChAckTags} -> + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + blocked_consumers = Blocked} -> + _ = remove_consumers(ChPid, Blocked), %% for stats emission ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -623,9 +630,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), - blocked_consumers = remove_consumers( - ChPid, State#q.blocked_consumers)}, + ChPid, State#q.active_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -633,11 +638,6 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> end end. -cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> - none; -cancel_holder(_ChPid, _ConsumerTag, Holder) -> - Holder. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -648,8 +648,15 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso - queue:is_empty(State#q.blocked_consumers). +consumer_count() -> consumer_count(fun (_) -> false end). + +active_consumer_count() -> consumer_count(fun is_ch_blocked/1). + +consumer_count(Exclude) -> + lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(), + not Exclude(C)]). + +is_unused(_State) -> consumer_count() == 0. maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -663,8 +670,15 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). -subtract_acks(A, B) when is_list(B) -> - lists:foldl(fun sets:del_element/2, A, B). +subtract_acks(ChPid, AckTags, State, Fun) -> + case lookup_ch(ChPid) of + not_found -> + State; + C = #cr{acktags = ChAckTags} -> + update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2, + ChAckTags, AckTags)}), + Fun(State) + end. discard_delivery(#delivery{sender = ChPid, message = Message}, @@ -768,8 +782,8 @@ i(messages_unacknowledged, _) -> i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); -i(consumers, State) -> - queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); +i(consumers, _) -> + consumer_count(); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -785,13 +799,15 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). -consumers(#q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> +consumers(#q{active_consumers = ActiveConsumers}) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, + consumers(ActiveConsumers, []), all_ch_record()). + +consumers(Consumers, Acc) -> rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)). + fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) -> + [{ChPid, CTag, AckRequired} | Acc1] + end, Acc, Consumers). emit_stats(State) -> emit_stats(State, []). @@ -931,10 +947,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, State3 = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - true = maybe_store_ch_record( - C#cr{acktags = - sets:add_element(AckTag, - ChAckTags)}), + ChAckTags1 = sets:add_element(AckTag, ChAckTags), + update_ch_record(C#cr{acktags = ChAckTags1}), State2; false -> State2 end, @@ -950,33 +964,24 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + C = ch_record(ChPid), + C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record( - C#cr{consumer_count = ConsumerCount +1, - limiter = Limiter}), - ok = case ConsumerCount of - 0 -> rabbit_limiter:register(Limiter, self()); - _ -> ok - end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> ExistingHolder end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), + E = {ChPid, Consumer}, State2 = - case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer(ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_message_queue( - State1#q{ - active_consumers = - add_consumer(ChPid, Consumer, - State1#q.active_consumers)}) + case is_ch_blocked(C1) of + true -> block_consumer(C1, E), + State1; + false -> update_ch_record(C1), + AC1 = queue:in(E, State1#q.active_consumers), + run_message_queue(State1#q{active_consumers = AC1}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck), @@ -985,42 +990,32 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> + ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of not_found -> - ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, - limiter = Limiter} -> - C1 = C#cr{consumer_count = ConsumerCount -1}, - maybe_store_ch_record( - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(Limiter, self()), - C1#cr{limiter = rabbit_limiter:make_token()}; - _ -> C1 - end), + C = #cr{blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag), - ok = maybe_send_reply(ChPid, OkMsg), - NewState = - State#q{exclusive_consumer = cancel_holder(ChPid, - ConsumerTag, - Holder), - active_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.active_consumers), - blocked_consumers = remove_consumer( + Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), + State1 = State#q{ + exclusive_consumer = case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end, + active_consumers = remove_consumer( ChPid, ConsumerTag, - State#q.blocked_consumers)}, - case should_auto_delete(NewState) of - false -> reply(ok, ensure_expiry_timer(NewState)); - true -> {stop, normal, ok, NewState} + State#q.active_consumers)}, + case should_auto_delete(State1) of + false -> reply(ok, ensure_expiry_timer(State1)); + true -> {stop, normal, ok, State1} end end; handle_call(stat, _From, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS, - active_consumers = ActiveConsumers} = + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(ensure_expiry_timer(State)), - reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1); + reply({ok, BQ:len(BQS), active_consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -1042,14 +1037,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end. + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1) -> requeue_and_run(AckTags, State1) end)). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -1058,33 +1048,26 @@ handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); -handle_cast({ack, AckTags, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - maybe_store_ch_record(C#cr{acktags = subtract_acks( - ChAckTags, AckTags)}), - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - noreply(State#q{backing_queue_state = BQS1}) - end; - -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(case Requeue of - true -> requeue_and_run(AckTags, State); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State#q{backing_queue_state = BQS1} - end) - end; +handle_cast({ack, AckTags, ChPid}, State) -> + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end)); + +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case Requeue of + true -> requeue_and_run(AckTags, State1); + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end + end)); handle_cast(delete_immediately, State) -> {stop, normal, State}; diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 2c28adce..7b3ebcf2 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -26,6 +26,20 @@ -define(SERVER, ?MODULE). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_child/2 :: + (node(), [any()]) -> rabbit_types:ok(pid() | undefined) | + rabbit_types:ok({pid(), any()}) | + rabbit_types:error(any())). + +-endif. + +%%---------------------------------------------------------------------------- + start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 6a018bd1..086a90b4 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -110,17 +110,13 @@ internal_check_user_login(Username, Fun) -> Refused end. -check_vhost_access(#user{username = Username}, VHost) -> - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHost}}) of - [] -> false; - [_R] -> true - end - end). +check_vhost_access(#user{username = Username}, VHostPath) -> + case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> false; + [_R] -> true + end. check_resource_access(#user{username = Username}, #resource{virtual_host = VHostPath, name = Name}, @@ -150,6 +146,7 @@ permission_index(read) -> #permission.read. %% Manipulation of the user database add_user(Username, Password) -> + rabbit_log:info("Creating user '~s'~n", [Username]), R = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_user, Username}) of @@ -165,10 +162,10 @@ add_user(Username, Password) -> mnesia:abort({user_already_exists, Username}) end end), - rabbit_log:info("Created user ~p~n", [Username]), R. delete_user(Username) -> + rabbit_log:info("Deleting user '~s'~n", [Username]), R = rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user( Username, @@ -185,13 +182,14 @@ delete_user(Username) -> write)], ok end)), - rabbit_log:info("Deleted user ~p~n", [Username]), R. change_password(Username, Password) -> + rabbit_log:info("Changing password for '~s'~n", [Username]), change_password_hash(Username, hash_password(Password)). clear_password(Username) -> + rabbit_log:info("Clearing password for '~s'~n", [Username]), change_password_hash(Username, <<"">>). change_password_hash(Username, PasswordHash) -> @@ -199,7 +197,6 @@ change_password_hash(Username, PasswordHash) -> User#internal_user{ password_hash = PasswordHash } end), - rabbit_log:info("Changed password for user ~p~n", [Username]), R. hash_password(Cleartext) -> @@ -221,11 +218,10 @@ salted_md5(Salt, Cleartext) -> erlang:md5(Salted). set_tags(Username, Tags) -> + rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]), R = update_user(Username, fun(User) -> User#internal_user{tags = Tags} end), - rabbit_log:info("Set user tags for user ~p to ~p~n", - [Username, Tags]), R. update_user(Username, Fun) -> @@ -255,6 +251,8 @@ validate_regexp(RegexpBin) -> end. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n", + [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]), lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9cc406e7..b266d366 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,8 +18,8 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/3, message/4, properties/1, delivery/4]). --export([publish/4, publish/6]). +-export([publish/4, publish/6, publish/1, + message/3, message/4, properties/1, delivery/4]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -35,6 +35,12 @@ -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). -type(body_input() :: (binary() | [binary()])). +-spec(publish/4 :: + (exchange_input(), rabbit_router:routing_key(), properties_input(), + body_input()) -> publish_result()). +-spec(publish/6 :: + (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), + properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). -spec(delivery/4 :: @@ -49,12 +55,6 @@ rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). --spec(publish/4 :: - (exchange_input(), rabbit_router:routing_key(), properties_input(), - body_input()) -> publish_result()). --spec(publish/6 :: - (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), - properties_input(), body_input()) -> publish_result()). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> @@ -64,13 +64,34 @@ %%---------------------------------------------------------------------------- +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(Exchange, RoutingKeyBin, Properties, Body) -> + publish(Exchange, RoutingKeyBin, false, false, Properties, Body). + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> + publish(X, delivery(Mandatory, Immediate, + message(XName, RKey, properties(Props), Body), + undefined)); +publish(XName, RKey, Mandatory, Immediate, Props, Body) -> + publish(delivery(Mandatory, Immediate, + message(XName, RKey, properties(Props), Body), + undefined)). + publish(Delivery = #delivery{ - message = #basic_message{exchange_name = ExchangeName}}) -> - case rabbit_exchange:lookup(ExchangeName) of + message = #basic_message{exchange_name = XName}}) -> + case rabbit_exchange:lookup(XName) of {ok, X} -> publish(X, Delivery); - Other -> Other + Err -> Err end. +publish(X, Delivery) -> + {RoutingRes, DeliveredQPids} = + rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), + {ok, RoutingRes, DeliveredQPids}. + delivery(Mandatory, Immediate, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. @@ -113,11 +134,10 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} headers = Headers0}}) end. -message(ExchangeName, RoutingKey, - #content{properties = Props} = DecodedContent) -> +message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> try {ok, #basic_message{ - exchange_name = ExchangeName, + exchange_name = XName, content = strip_header(DecodedContent, ?DELETED_HEADER), id = rabbit_guid:guid(), is_persistent = is_message_persistent(DecodedContent), @@ -127,10 +147,10 @@ message(ExchangeName, RoutingKey, {error, _Reason} = Error -> Error end. -message(ExchangeName, RoutingKey, RawProperties, Body) -> +message(XName, RoutingKey, RawProperties, Body) -> Properties = properties(RawProperties), Content = build_content(Properties, Body), - {ok, Msg} = message(ExchangeName, RoutingKey, Content), + {ok, Msg} = message(XName, RoutingKey, Content), Msg. properties(P = #'P_basic'{}) -> @@ -152,28 +172,6 @@ indexof([], _Element, _N) -> 0; indexof([Element | _Rest], Element, N) -> N; indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). -%% Convenience function, for avoiding round-trips in calls across the -%% erlang distributed network. -publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, Properties, Body). - -%% Convenience function, for avoiding round-trips in calls across the -%% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> - publish(X, delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)); -publish(XName, RKey, Mandatory, Immediate, Props, Body) -> - case rabbit_exchange:lookup(XName) of - {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body); - Err -> Err - end. - -publish(X, Delivery) -> - {RoutingRes, DeliveredQPids} = - rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), - {ok, RoutingRes, DeliveredQPids}. - is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> case Mode of diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 205d5bba..e625a427 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -40,7 +40,7 @@ 'source_and_destination_not_found')). -type(bind_ok_or_error() :: 'ok' | bind_errors() | rabbit_types:error('binding_not_found')). --type(bind_res() :: bind_ok_or_error() | rabbit_misc:const(bind_ok_or_error())). +-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())). -type(inner_fun() :: fun((rabbit_types:exchange(), rabbit_types:exchange() | rabbit_types:amqqueue()) -> @@ -108,21 +108,34 @@ recover(XNames, QNames) -> SelectSet = fun (#resource{kind = exchange}) -> XNameSet; (#resource{kind = queue}) -> QNameSet end, - [recover_semi_durable_route(R, SelectSet(Dst)) || + {ok, Gatherer} = gatherer:start_link(), + [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) || R = #route{binding = #binding{destination = Dst}} <- rabbit_misc:dirty_read_all(rabbit_semi_durable_route)], + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), ok. -recover_semi_durable_route(R = #route{binding = B}, ToRecover) -> +recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) -> #binding{source = Src, destination = Dst} = B, - {ok, X} = rabbit_exchange:lookup(Src), + case sets:is_element(Dst, ToRecover) of + true -> {ok, X} = rabbit_exchange:lookup(Src), + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> + recover_semi_durable_route_txn(R, X), + gatherer:finish(Gatherer) + end); + false -> ok + end. + +recover_semi_durable_route_txn(R = #route{binding = B}, X) -> rabbit_misc:execute_mnesia_transaction( fun () -> - Rs = mnesia:match_object(rabbit_semi_durable_route, R, read), - case Rs =/= [] andalso sets:is_element(Dst, ToRecover) of - false -> no_recover; - true -> ok = sync_transient_route(R, fun mnesia:write/3), - rabbit_exchange:serial(X) + case mnesia:match_object(rabbit_semi_durable_route, R, read) of + [] -> no_recover; + _ -> ok = sync_transient_route(R, fun mnesia:write/3), + rabbit_exchange:serial(X) end end, fun (no_recover, _) -> ok; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dfe84644..d2f55277 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,8 +35,8 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, - user, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, consumer_monitors, queue_collector_pid, + user, virtual_host, most_recently_declared_queue, queue_monitors, + consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -189,9 +189,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_monitors = dict:new(), consumer_mapping = dict:new(), - blocking = dict:new(), - consumer_monitors = dict:new(), + blocking = sets:new(), + queue_consumers = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, @@ -275,7 +276,7 @@ handle_cast(terminate, State) -> handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(monitor_consumer(ConsumerTag, State)); + noreply(consumer_monitor(ConsumerTag, State)); handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -299,13 +300,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State), - maybe_incr_redeliver_stats(Redelivered, QPid, State), + State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), + State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State1#ch{next_tag = DeliveryTag + 1}); + noreply(State3#ch{next_tag = DeliveryTag + 1}); handle_cast(force_event_refresh, State) -> @@ -323,15 +324,13 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> noreply([ensure_stats_timer], State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); -handle_info({'DOWN', MRef, process, QPid, Reason}, - State = #ch{consumer_monitors = ConsumerMonitors}) -> - noreply( - case dict:find(MRef, ConsumerMonitors) of - error -> - handle_publishing_queue_down(QPid, Reason, State); - {ok, ConsumerTag} -> - handle_consuming_queue_down(MRef, ConsumerTag, State) - end); +handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> + State1 = handle_publishing_queue_down(QPid, Reason, State), + State2 = queue_blocked(QPid, State1), + State3 = handle_consuming_queue_down(QPid, State2), + erase_queue_stats(QPid), + noreply(State3#ch{queue_monitors = + dict:erase(QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -516,17 +515,16 @@ check_name(_Kind, NameBin) -> NameBin. queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case dict:find(QPid, Blocking) of - error -> State; - {ok, MRef} -> true = erlang:demonitor(MRef), - Blocking1 = dict:erase(QPid, Blocking), - ok = case dict:size(Blocking1) of - 0 -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - _ -> ok - end, - State#ch{blocking = Blocking1} + case sets:is_element(QPid, Blocking) of + false -> State; + true -> Blocking1 = sets:del_element(QPid, Blocking), + ok = case sets:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok + end, + demonitor_queue(QPid, State#ch{blocking = Blocking1}) end. record_confirm(undefined, _, State) -> @@ -545,38 +543,41 @@ confirm(MsgSeqNos, QPid, State) -> {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), record_confirms(MXs, State1). -process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}) -> - {MXs, UMQ1, UQM1} = - lists:foldl( - fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); - none -> Acc - end - end, {[], UMQ, UQM}, MsgSeqNos), - {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) -> - UQM1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> gb_trees:delete(QPid, UQM); - false -> gb_trees:update(QPid, MsgSeqNos1, UQM) - end; - none -> - UQM - end, +process_confirms(MsgSeqNos, QPid, Nack, State) -> + lists:foldl( + fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, + Acc, Nack); + none -> Acc + end + end, {[], State}, MsgSeqNos). + +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, + {MXs, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}}, + Nack) -> + State1 = case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> UQM1 = gb_trees:delete(QPid, UQM), + demonitor_queue( + QPid, State#ch{unconfirmed_qm = UQM1}); + false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), + State#ch{unconfirmed_qm = UQM1} + end; + none -> + State + end, Qs1 = gb_sets:del_element(QPid, Qs), %% If QPid somehow died initiating a nack, clear the message from %% internal data-structures. Also, cleanup empty entries. case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> - {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; - false -> - {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} + true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), + {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; + false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), + {MXs, State1#ch{unconfirmed_mq = UMQ1}} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -693,11 +694,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State), - maybe_incr_redeliver_stats(Redelivered, QPid, State), + State2 = maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State1), + State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -707,7 +708,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + {noreply, State3#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -746,12 +747,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of {ok, Q} -> State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - {Q, undefined}, + dict:store(ActualConsumerTag, Q, ConsumerMapping)}, {noreply, case NoWait of - true -> monitor_consumer(ActualConsumerTag, State1); + true -> consumer_monitor(ActualConsumerTag, State1); false -> State1 end}; {{error, exclusive_consume_unavailable}, _Q} -> @@ -768,22 +768,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, _, State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors}) -> + queue_consumers = QCons}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, {Q, MRef}} -> - ConsumerMonitors1 = - case MRef of - undefined -> ConsumerMonitors; - _ -> true = erlang:demonitor(MRef), - dict:erase(MRef, ConsumerMonitors) + {ok, Q = #amqqueue{pid = QPid}} -> + ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), + QCons1 = + case dict:find(QPid, QCons) of + error -> QCons; + {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), + case gb_sets:is_empty(CTags1) of + true -> dict:erase(QPid, QCons); + false -> dict:store(QPid, CTags1, QCons) + end end, - NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, - ConsumerMapping), - consumer_monitors = ConsumerMonitors1}, + NewState = demonitor_queue( + Q, State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}), %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -1108,10 +1112,12 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || - QPid <- QPids], + QPids -> State2 = lists:foldl(fun monitor_queue/2, + State1#ch{blocking = + sets:from_list(QPids)}, + QPids), ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State1#ch{blocking = dict:from_list(Queues)}} + {noreply, State2} end; handle_method(_MethodRecord, _Content, _State) -> @@ -1120,23 +1126,51 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - capabilities = Capabilities}) -> +consumer_monitor(ConsumerTag, + State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QCons, + capabilities = Capabilities}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> - {#amqqueue{pid = QPid} = Q, undefined} = - dict:fetch(ConsumerTag, ConsumerMapping), - MRef = erlang:monitor(process, QPid), - State#ch{consumer_mapping = - dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), - consumer_monitors = - dict:store(MRef, ConsumerTag, ConsumerMonitors)}; + #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), + QCons1 = dict:update(QPid, + fun (CTags) -> + gb_sets:insert(ConsumerTag, CTags) + end, + gb_sets:singleton(ConsumerTag), + QCons), + monitor_queue(QPid, State#ch{queue_consumers = QCons1}); _ -> State end. +monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> + case (not dict:is_key(QPid, QMons) andalso + queue_monitor_needed(QPid, State)) of + true -> MRef = erlang:monitor(process, QPid), + State#ch{queue_monitors = dict:store(QPid, MRef, QMons)}; + false -> State + end. + +demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> + case (dict:is_key(QPid, QMons) andalso + not queue_monitor_needed(QPid, State)) of + true -> true = erlang:demonitor(dict:fetch(QPid, QMons)), + State#ch{queue_monitors = dict:erase(QPid, QMons)}; + false -> State + end. + +queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, + queue_consumers = QCons, + blocking = Blocking, + unconfirmed_qm = UQM}) -> + StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + ConsumerMonitored = dict:is_key(QPid, QCons), + QueueBlocked = sets:is_element(QPid, Blocking), + ConfirmMonitored = gb_trees:is_defined(QPid, UQM), + StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1157,21 +1191,25 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> {true, fun send_nacks/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - erase_queue_stats(QPid), - State3 = SendFun(MXs, State2), - queue_blocked(QPid, State3). - -handle_consuming_queue_down(MRef, ConsumerTag, - State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - writer_pid = WriterPid}) -> - ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), - ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors), - Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, - nowait = true}, - ok = rabbit_writer:send_command(WriterPid, Cancel), + SendFun(MXs, State2). + +handle_consuming_queue_down(QPid, + State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QCons, + writer_pid = WriterPid}) -> + ConsumerTags = case dict:find(QPid, QCons) of + error -> gb_sets:new(); + {ok, CTags} -> CTags + end, + ConsumerMapping1 = + gb_sets:fold(fun (CTag, CMap) -> + Cancel = #'basic.cancel'{consumer_tag = CTag, + nowait = true}, + ok = rabbit_writer:send_command(WriterPid, Cancel), + dict:erase(CTag, CMap) + end, ConsumerMapping, ConsumerTags), State#ch{consumer_mapping = ConsumerMapping1, - consumer_monitors = ConsumerMonitors1}. + queue_consumers = dict:erase(QPid, QCons)}. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, @@ -1271,9 +1309,8 @@ ack(Acked, State) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), [{QPid, length(MsgIds)} | L] end, [], Acked), - maybe_incr_stats(QIncs, ack, State), ok = notify_limiter(State#ch.limiter, Acked), - State. + maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), uncommitted_ack_q = queue:new()}. @@ -1307,8 +1344,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> consumer_queues(Consumers) -> lists:usort([QPid || - {_Key, {#amqqueue{pid = QPid}, _MRef}} - <- dict:to_list(Consumers)]). + {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for @@ -1334,38 +1370,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ XName, MsgSeqNo, Message, State), maybe_incr_stats([{XName, 1} | [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State1), - State1. + QPid <- DeliveredQPids]], publish, State1). process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State), - record_confirm(MsgSeqNo, XName, State); + record_confirm(MsgSeqNo, XName, + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State)); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{XName, 1}], return_not_delivered, State), - record_confirm(MsgSeqNo, XName, State); + record_confirm(MsgSeqNo, XName, + maybe_incr_stats([{XName, 1}], return_not_delivered, State)); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + #ch{unconfirmed_mq = UMQ} = State, UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), SingletonSet = gb_sets:singleton(MsgSeqNo), - UQM1 = lists:foldl( - fun (QPid, UQM2) -> - maybe_monitor(QPid), - case gb_trees:lookup(QPid, UQM2) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - gb_trees:update(QPid, MsgSeqNos1, UQM2); - none -> - gb_trees:insert(QPid, SingletonSet, UQM2) - end - end, UQM, QPids), - State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. + lists:foldl( + fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> + case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), + UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), + State0#ch{unconfirmed_qm = UQM1}; + none -> + UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), + monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1}) + end + end, State#ch{unconfirmed_mq = UMQ1}, QPids). lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1385,11 +1420,13 @@ send_nacks(_, State) -> maybe_complete_tx(State#ch{tx_status = failed}). send_confirms(State = #ch{tx_status = none, confirmed = C}) -> - C1 = lists:append(C), - MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), - MsgSeqNo - end || {MsgSeqNo, ExchangeName} <- C1 ], - send_confirms(MsgSeqNos, State #ch{confirmed = []}); + {MsgSeqNos, State1} = + lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> + {[MsgSeqNo | MSNs], + maybe_incr_stats([{ExchangeName, 1}], confirm, + State0)} + end, {[], State}, lists:append(C)), + send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1469,30 +1506,26 @@ i(Item, _) -> maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_stats([{QPid, 1}], redeliver, State); -maybe_incr_redeliver_stats(_, _, _) -> - ok. +maybe_incr_redeliver_stats(_, _, State) -> + State. -maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> +maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of - fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; - _ -> ok + fine -> lists:foldl(fun ({QX, Inc}, State0) -> + incr_stats(QX, Inc, Measure, State0) + end, State, QXIncs); + _ -> State end. -incr_stats({QPid, _} = QX, Inc, Measure) -> - maybe_monitor(QPid), - update_measures(queue_exchange_stats, QX, Inc, Measure); -incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> - maybe_monitor(QPid), - update_measures(queue_stats, QPid, Inc, Measure); -incr_stats(X, Inc, Measure) -> - update_measures(exchange_stats, X, Inc, Measure). - -maybe_monitor(QPid) -> - case get({monitoring, QPid}) of - undefined -> erlang:monitor(process, QPid), - put({monitoring, QPid}, true); - _ -> ok - end. +incr_stats({QPid, _} = QX, Inc, Measure, State) -> + update_measures(queue_exchange_stats, QX, Inc, Measure), + monitor_queue(QPid, State); +incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> + update_measures(queue_stats, QPid, Inc, Measure), + monitor_queue(QPid, State); +incr_stats(X, Inc, Measure, State) -> + update_measures(exchange_stats, X, Inc, Measure), + State. update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of @@ -1528,7 +1561,6 @@ emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> end. erase_queue_stats(QPid) -> - erase({monitoring, QPid}), erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index 15e92542..dfb400e3 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -28,8 +28,7 @@ -ifdef(use_specs). --spec(start_link/1 :: (mfa()) -> - rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()). -spec(start_link/2 :: ({'local', atom()}, mfa()) -> rabbit_types:ok_pid_or_error()). diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl index 07036ce8..a0953eab 100644 --- a/src/rabbit_command_assembler.erl +++ b/src/rabbit_command_assembler.erl @@ -22,8 +22,12 @@ %%---------------------------------------------------------------------------- +%%---------------------------------------------------------------------------- + -ifdef(use_specs). +-export_type([frame/0]). + -type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY | ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY | ?FRAME_TRACE | ?FRAME_HEARTBEAT). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e8afed0c..1163ae9d 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -17,10 +17,9 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, diagnostics/1, log_action/3]). +-export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). --define(WAIT_FOR_VM_ATTEMPTS, 5). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -51,7 +50,6 @@ -> 'ok'). -spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). --spec(log_action/3 :: (node(), string(), [term()]) -> ok). -endif. @@ -74,7 +72,6 @@ start() -> Command = list_to_atom(Command0), Quiet = proplists:get_bool(?QUIET_OPT, Opts1), Node = proplists:get_value(?NODE_OPT, Opts1), - rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> @@ -193,9 +190,9 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); -action(wait, Node, [], _Opts, Inform) -> +action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), - wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS); + wait_for_application(Node, PidFile, Inform); action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), @@ -356,23 +353,63 @@ action(report, Node, _Args, _Opts, Inform) -> %%---------------------------------------------------------------------------- -wait_for_application(Node, Attempts) -> - case rpc_call(Node, application, which_applications, [infinity]) of - {badrpc, _} = E -> case Attempts of - 0 -> E; - _ -> wait_for_application0(Node, Attempts - 1) - end; - Apps -> case proplists:is_defined(rabbit, Apps) of - %% We've seen the node up; if it goes down - %% die immediately. - true -> ok; - false -> wait_for_application0(Node, 0) - end +wait_for_application(Node, PidFile, Inform) -> + Pid = wait_and_read_pid_file(PidFile), + Inform("pid is ~s", [Pid]), + wait_for_application(Node, Pid). + +wait_for_application(Node, Pid) -> + case process_up(Pid) of + true -> case rabbit:is_running(Node) of + true -> ok; + false -> timer:sleep(1000), + wait_for_application(Node, Pid) + end; + false -> {error, process_not_running} end. -wait_for_application0(Node, Attempts) -> - timer:sleep(1000), - wait_for_application(Node, Attempts). +wait_and_read_pid_file(PidFile) -> + case file:read_file(PidFile) of + {ok, Bin} -> string:strip(binary_to_list(Bin), right, $\n); + {error, enoent} -> timer:sleep(500), + wait_and_read_pid_file(PidFile); + {error, _} = E -> exit({error, {could_not_read_pid, E}}) + end. + +% Test using some OS clunkiness since we shouldn't trust +% rpc:call(os, getpid, []) at this point +process_up(Pid) -> + with_os([{unix, fun () -> + system("ps -p " ++ Pid + ++ " >/dev/null 2>&1") =:= 0 + end}, + {win32, fun () -> + Res = os:cmd("tasklist /nh /fi \"pid eq " ++ + Pid ++ "\" 2>&1"), + case re:run(Res, "erl\\.exe", [{capture, none}]) of + match -> true; + _ -> false + end + end}]). + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() + end. + +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. + +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + +%%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> if List == [] -> Default; @@ -476,22 +513,3 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. - -log_action(Node, Command, Args) -> - rabbit_misc:with_local_io( - fun () -> - error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n", - [Node, Command, - format_args(mask_args(Command, Args))]) - end). - -%% Mask passwords and other sensitive info before logging. -mask_args("add_user", [Name, _Password | Args]) -> - [Name, "****" | Args]; -mask_args("change_password", [Name, _Password | Args]) -> - [Name, "****" | Args]; -mask_args(_, Args) -> - Args. - -format_args(Args) -> - string:join([io_lib:format("~p", [A]) || A <- Args], " "). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 68afaf5d..6f9a4650 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -72,7 +72,7 @@ list() -> %%---------------------------------------------------------------------------- connect(Username, VHost, Protocol, Pid, Infos) -> - case lists:keymember(rabbit, 1, application:which_applications()) of + case rabbit:is_running() of true -> case rabbit_access_control:check_user_login(Username, []) of {ok, User} -> diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 93aad9e3..6e29ace7 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -27,6 +27,16 @@ -export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(boot/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + boot() -> {ok, DefaultVHost} = application:get_env(default_vhost), ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]). diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 7e9ebc4f..7b6e07c1 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -26,11 +26,16 @@ %% with the result of closing the old handler when swapping handlers. %% The first init/1 additionally allows for simple log rotation %% when the suffix is not the empty string. +%% The original init/2 also opened the file in 'write' mode, thus +%% overwriting old logs. To remedy this, init/2 from +%% lib/stdlib/src/error_logger_file_h.erl from R14B3 was copied as +%% init_file/2 and changed so that it opens the file in 'append' mode. %% Used only when swapping handlers in log rotation init({{File, Suffix}, []}) -> - case rabbit_misc:append_file(File, Suffix) of - ok -> ok; + case rabbit_file:append_file(File, Suffix) of + ok -> file:delete(File), + ok; {error, Error} -> rabbit_log:error("Failed to append contents of " "log file '~s' to '~s':~n~p~n", @@ -45,12 +50,31 @@ init({{File, _}, error}) -> %% log rotation init({File, []}) -> init(File); -init({File, _Type} = FileInfo) -> - rabbit_misc:ensure_parent_dirs_exist(File), - error_logger_file_h:init(FileInfo); +%% Used only when taking over from the tty handler +init({{File, []}, _}) -> + init(File); +init({File, {error_logger, Buf}}) -> + rabbit_file:ensure_parent_dirs_exist(File), + init_file(File, {error_logger, Buf}); init(File) -> - rabbit_misc:ensure_parent_dirs_exist(File), - error_logger_file_h:init(File). + rabbit_file:ensure_parent_dirs_exist(File), + init_file(File, []). + +init_file(File, {error_logger, Buf}) -> + case init_file(File, error_logger) of + {ok, {Fd, File, PrevHandler}} -> + [handle_event(Event, {Fd, File, PrevHandler}) || + {_, Event} <- lists:reverse(Buf)], + {ok, {Fd, File, PrevHandler}}; + Error -> + Error + end; +init_file(File, PrevHandler) -> + process_flag(trap_exit, true), + case file:open(File, [append]) of + {ok,Fd} -> {ok, {Fd, File, PrevHandler}}; + Error -> Error + end. handle_event(Event, State) -> error_logger_file_h:handle_event(Event, State). diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl new file mode 100644 index 00000000..5cb8e7b6 --- /dev/null +++ b/src/rabbit_file.erl @@ -0,0 +1,282 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_file). + +-include_lib("kernel/include/file.hrl"). + +-export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]). +-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). +-export([append_file/2, ensure_parent_dirs_exist/1]). +-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]). +-export([lock_file/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(ok_or_error() :: rabbit_types:ok_or_error(any())). + +-spec(is_file/1 :: ((file:filename())) -> boolean()). +-spec(is_dir/1 :: ((file:filename())) -> boolean()). +-spec(file_size/1 :: ((file:filename())) -> non_neg_integer()). +-spec(ensure_dir/1 :: ((file:filename())) -> ok_or_error()). +-spec(wildcard/2 :: (string(), file:filename()) -> [file:filename()]). +-spec(list_dir/1 :: (file:filename()) -> rabbit_types:ok_or_error2( + [file:filename()], any())). +-spec(read_term_file/1 :: + (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()). +-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()). +-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). +-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). +-spec(rename/2 :: + (file:filename(), file:filename()) -> ok_or_error()). +-spec(delete/1 :: ([file:filename()]) -> ok_or_error()). +-spec(recursive_delete/1 :: + ([file:filename()]) + -> rabbit_types:ok_or_error({file:filename(), any()})). +-spec(recursive_copy/2 :: + (file:filename(), file:filename()) + -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})). +-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). + +-endif. + +%%---------------------------------------------------------------------------- + +is_file(File) -> + case read_file_info(File) of + {ok, #file_info{type=regular}} -> true; + {ok, #file_info{type=directory}} -> true; + _ -> false + end. + +is_dir(Dir) -> is_dir_internal(read_file_info(Dir)). + +is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)). + +is_dir_internal({ok, #file_info{type=directory}}) -> true; +is_dir_internal(_) -> false. + +file_size(File) -> + case read_file_info(File) of + {ok, #file_info{size=Size}} -> Size; + _ -> 0 + end. + +ensure_dir(File) -> with_fhc_handle(fun () -> ensure_dir_internal(File) end). + +ensure_dir_internal("/") -> + ok; +ensure_dir_internal(File) -> + Dir = filename:dirname(File), + case is_dir_no_handle(Dir) of + true -> ok; + false -> ensure_dir_internal(Dir), + prim_file:make_dir(Dir) + end. + +wildcard(Pattern, Dir) -> + {ok, Files} = list_dir(Dir), + {ok, RE} = re:compile(Pattern, [anchored]), + [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])]. + +list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end). + +read_file_info(File) -> + with_fhc_handle(fun () -> prim_file:read_file_info(File) end). + +with_fhc_handle(Fun) -> + ok = file_handle_cache:obtain(), + try Fun() + after ok = file_handle_cache:release() + end. + +read_term_file(File) -> + try + {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end), + {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), + TokenGroups = group_tokens(Tokens), + {ok, [begin + {ok, Term} = erl_parse:parse_term(Tokens1), + Term + end || Tokens1 <- TokenGroups]} + catch + error:{badmatch, Error} -> Error + end. + +group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)]. + +group_tokens([], []) -> []; +group_tokens(Cur, []) -> [Cur]; +group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)]; +group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts). + +write_term_file(File, Terms) -> + write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + +write_file(Path, Data) -> write_file(Path, Data, []). + +%% write_file/3 and make_binary/1 are both based on corresponding +%% functions in the kernel/file.erl module of the Erlang R14B02 +%% release, which is licensed under the EPL. That implementation of +%% write_file/3 does not do an fsync prior to closing the file, hence +%% the existence of this version. APIs are otherwise identical. +write_file(Path, Data, Modes) -> + Modes1 = [binary, write | (Modes -- [binary, write])], + case make_binary(Data) of + Bin when is_binary(Bin) -> + with_fhc_handle( + fun () -> case prim_file:open(Path, Modes1) of + {ok, Hdl} -> try prim_file:write(Hdl, Bin) of + ok -> prim_file:sync(Hdl); + {error, _} = E -> E + after + prim_file:close(Hdl) + end; + {error, _} = E -> E + end + end); + {error, _} = E -> E + end. + +make_binary(Bin) when is_binary(Bin) -> + Bin; +make_binary(List) -> + try + iolist_to_binary(List) + catch error:Reason -> + {error, Reason} + end. + + +append_file(File, Suffix) -> + case read_file_info(File) of + {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); + {error, enoent} -> append_file(File, 0, Suffix); + Error -> Error + end. + +append_file(_, _, "") -> + ok; +append_file(File, 0, Suffix) -> + with_fhc_handle(fun () -> + case prim_file:open([File, Suffix], [append]) of + {ok, Fd} -> prim_file:close(Fd); + Error -> Error + end + end); +append_file(File, _, Suffix) -> + case with_fhc_handle(fun () -> prim_file:read_file(File) end) of + {ok, Data} -> write_file([File, Suffix], Data, [append]); + Error -> Error + end. + +ensure_parent_dirs_exist(Filename) -> + case ensure_dir(Filename) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_parent_dirs, Filename, Reason}}) + end. + +rename(Old, New) -> with_fhc_handle(fun () -> prim_file:rename(Old, New) end). + +delete(File) -> with_fhc_handle(fun () -> prim_file:delete(File) end). + +recursive_delete(Files) -> + with_fhc_handle( + fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files) + end). + +recursive_delete1(Path) -> + case is_dir_no_handle(Path) and not(is_symlink_no_handle(Path)) of + false -> case prim_file:delete(Path) of + ok -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Path, Err}} + end; + true -> case prim_file:list_dir(Path) of + {ok, FileNames} -> + case lists:foldl( + fun (FileName, ok) -> + recursive_delete1( + filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames) of + ok -> + case prim_file:del_dir(Path) of + ok -> ok; + {error, Err} -> {error, {Path, Err}} + end; + {error, _Err} = Error -> + Error + end; + {error, Err} -> + {error, {Path, Err}} + end + end. + +is_symlink_no_handle(File) -> + case prim_file:read_link(File) of + {ok, _} -> true; + _ -> false + end. + +recursive_copy(Src, Dest) -> + %% Note that this uses the 'file' module and, hence, shouldn't be + %% run on many processes at once. + case is_dir(Src) of + false -> case file:copy(Src, Dest) of + {ok, _Bytes} -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Src, Dest, Err}} + end; + true -> case file:list_dir(Src) of + {ok, FileNames} -> + case file:make_dir(Dest) of + ok -> + lists:foldl( + fun (FileName, ok) -> + recursive_copy( + filename:join(Src, FileName), + filename:join(Dest, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames); + {error, Err} -> + {error, {Src, Dest, Err}} + end; + {error, Err} -> + {error, {Src, Dest, Err}} + end + end. + +%% TODO: When we stop supporting Erlang prior to R14, this should be +%% replaced with file:open [write, exclusive] +lock_file(Path) -> + case is_file(Path) of + true -> {error, eexist}; + false -> with_fhc_handle( + fun () -> {ok, Lock} = prim_file:open(Path, [write]), + ok = prim_file:close(Lock) + end) + end. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 234bc55b..cf3fea1a 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -52,13 +52,13 @@ start_link() -> update_disk_serial() -> Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), - Serial = case rabbit_misc:read_term_file(Filename) of + Serial = case rabbit_file:read_term_file(Filename) of {ok, [Num]} -> Num; {error, enoent} -> 0; {error, Reason} -> throw({error, {cannot_read_serial_file, Filename, Reason}}) end, - case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + case rabbit_file:write_term_file(Filename, [Serial + 1]) of ok -> ok; {error, Reason1} -> throw({error, {cannot_write_serial_file, Filename, Reason1}}) diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 8207d6bc..558e0957 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -42,6 +42,8 @@ -spec(error/1 :: (string()) -> 'ok'). -spec(error/2 :: (string(), [any()]) -> 'ok'). +-spec(message/4 :: (_,_,_,_) -> 'ok'). + -endif. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ad5fd28f..5fc6341f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -59,6 +59,10 @@ known_senders :: set() }). +-type(ack() :: non_neg_integer()). +-type(state() :: master_state()). +-include("rabbit_backing_queue_spec.hrl"). + -spec(promote_backing_queue_state/6 :: (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index cf8e9484..baebc52b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -22,6 +22,26 @@ -include("rabbit.hrl"). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(remove_from_queue/2 :: + (rabbit_amqqueue:name(), [pid()]) + -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). +-spec(on_node_up/0 :: () -> 'ok'). +-spec(drop_mirror/2 :: + (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). +-spec(add_mirror/2 :: + (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). +-spec(add_mirror/3 :: + (rabbit_types:vhost(), binary(), atom()) + -> rabbit_types:ok_or_error(any())). + +-endif. + +%%---------------------------------------------------------------------------- + %% If the dead pids include the queue pid (i.e. the master has died) %% then only remove that if we are about to be promoted. Otherwise we %% can have the situation where a slave updates the mnesia record for diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3c453981..43962491 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -45,8 +45,19 @@ -behaviour(gm). -include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + -include("gm_specs.hrl"). +-ifdef(use_specs). +%% Shut dialyzer up +-spec(promote_me/2 :: (_, _) -> no_return()). +-endif. + +%%---------------------------------------------------------------------------- + + -define(CREATION_EVENT_KEYS, [pid, name, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ae28722a..f2dc97fd 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -18,8 +18,6 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --include_lib("kernel/include/file.hrl"). - -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, protocol_error/3, protocol_error/4, protocol_error/1]). @@ -40,19 +38,16 @@ -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). --export([append_file/2, ensure_parent_dirs_exist/1]). --export([format_stderr/2, with_local_io/1]). +-export([format_stderr/2, with_local_io/1, local_info_msg/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]). +-export([dict_cons/3, orddict_cons/3]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). --export([lock_file/1]). -export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). @@ -158,15 +153,9 @@ -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()). --spec(read_term_file/1 :: - (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())). --spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()). --spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()). --spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). --spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(with_local_io/1 :: (fun (() -> A)) -> A). +-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). @@ -180,12 +169,6 @@ -spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). --spec(recursive_delete/1 :: - ([file:filename()]) - -> rabbit_types:ok_or_error({file:filename(), any()})). --spec(recursive_copy/2 :: - (file:filename(), file:filename()) - -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(get_options/2 :: ([optdef()], [string()]) @@ -199,7 +182,6 @@ {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). --spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). -spec(const_ok/0 :: () -> 'ok'). -spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). @@ -525,74 +507,6 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]), dirty_dump_log1(LH, disk_log:chunk(LH, K)). - -read_term_file(File) -> file:consult(File). - -write_term_file(File, Terms) -> - write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || - Term <- Terms])). - -write_file(Path, Data) -> - write_file(Path, Data, []). - -%% write_file/3 and make_binary/1 are both based on corresponding -%% functions in the kernel/file.erl module of the Erlang R14B02 -%% release, which is licensed under the EPL. That implementation of -%% write_file/3 does not do an fsync prior to closing the file, hence -%% the existence of this version. APIs are otherwise identical. -write_file(Path, Data, Modes) -> - Modes1 = [binary, write | (Modes -- [binary, write])], - case make_binary(Data) of - Bin when is_binary(Bin) -> - case file:open(Path, Modes1) of - {ok, Hdl} -> try file:write(Hdl, Bin) of - ok -> file:sync(Hdl); - {error, _} = E -> E - after - file:close(Hdl) - end; - {error, _} = E -> E - end; - {error, _} = E -> E - end. - -make_binary(Bin) when is_binary(Bin) -> - Bin; -make_binary(List) -> - try - iolist_to_binary(List) - catch error:Reason -> - {error, Reason} - end. - - -append_file(File, Suffix) -> - case file:read_file_info(File) of - {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); - {error, enoent} -> append_file(File, 0, Suffix); - Error -> Error - end. - -append_file(_, _, "") -> - ok; -append_file(File, 0, Suffix) -> - case file:open([File, Suffix], [append]) of - {ok, Fd} -> file:close(Fd); - Error -> Error - end; -append_file(File, _, Suffix) -> - case file:read_file(File) of - {ok, Data} -> write_file([File, Suffix], Data, [append]); - Error -> Error - end. - -ensure_parent_dirs_exist(Filename) -> - case filelib:ensure_dir(Filename) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_create_parent_dirs, Filename, Reason}}) - end. - format_stderr(Fmt, Args) -> case os:type() of {unix, _} -> @@ -619,6 +533,12 @@ with_local_io(Fun) -> group_leader(GL, self()) end. +%% Log an info message on the local node using the standard logger. +%% Use this if rabbit isn't running and the call didn't originate on +%% the local node (e.g. rabbitmqctl calls). +local_info_msg(Format, Args) -> + with_local_io(fun () -> error_logger:info_msg(Format, Args) end). + manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> Iterate(fun (App, Acc) -> case Do(App) of @@ -743,67 +663,6 @@ version_compare(A, B) -> dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A). -recursive_delete(Files) -> - lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); - (_Path, {error, _Err} = Error) -> Error - end, ok, Files). - -recursive_delete1(Path) -> - case filelib:is_dir(Path) of - false -> case file:delete(Path) of - ok -> ok; - {error, enoent} -> ok; %% Path doesn't exist anyway - {error, Err} -> {error, {Path, Err}} - end; - true -> case file:list_dir(Path) of - {ok, FileNames} -> - case lists:foldl( - fun (FileName, ok) -> - recursive_delete1( - filename:join(Path, FileName)); - (_FileName, Error) -> - Error - end, ok, FileNames) of - ok -> - case file:del_dir(Path) of - ok -> ok; - {error, Err} -> {error, {Path, Err}} - end; - {error, _Err} = Error -> - Error - end; - {error, Err} -> - {error, {Path, Err}} - end - end. - -recursive_copy(Src, Dest) -> - case filelib:is_dir(Src) of - false -> case file:copy(Src, Dest) of - {ok, _Bytes} -> ok; - {error, enoent} -> ok; %% Path doesn't exist anyway - {error, Err} -> {error, {Src, Dest, Err}} - end; - true -> case file:list_dir(Src) of - {ok, FileNames} -> - case file:make_dir(Dest) of - ok -> - lists:foldl( - fun (FileName, ok) -> - recursive_copy( - filename:join(Src, FileName), - filename:join(Dest, FileName)); - (_FileName, Error) -> - Error - end, ok, FileNames); - {error, Err} -> - {error, {Src, Dest, Err}} - end; - {error, Err} -> - {error, {Src, Dest, Err}} - end - end. - dict_cons(Key, Value, Dict) -> dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). @@ -893,15 +752,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> {error, Reason} end. -%% TODO: When we stop supporting Erlang prior to R14, this should be -%% replaced with file:open [write, exclusive] -lock_file(Path) -> - case filelib:is_file(Path) of - true -> {error, eexist}; - false -> {ok, Lock} = file:open(Path, [write]), - ok = file:close(Lock) - end. - const_ok() -> ok. const(X) -> fun () -> X end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c63c67f4..c8c18843 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -70,6 +70,8 @@ -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(table_names/0 :: () -> [atom()]). + -endif. %%---------------------------------------------------------------------------- @@ -119,11 +121,17 @@ force_cluster(ClusterNodes) -> %% node. If Force is false, only connections to online nodes are %% allowed. cluster(ClusterNodes, Force) -> + rabbit_misc:local_info_msg("Clustering with ~p~s~n", + [ClusterNodes, if Force -> " forcefully"; + true -> "" + end]), ensure_mnesia_not_running(), ensure_mnesia_dir(), - case not Force andalso is_only_disc_node(node(), false) andalso - not should_be_disc_node(ClusterNodes) of + case not Force andalso is_clustered() andalso + is_only_disc_node(node(), false) andalso + not should_be_disc_node(ClusterNodes) + of true -> log_both("last running disc node leaving cluster"); _ -> ok end, @@ -430,7 +438,7 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + case rabbit_file:write_term_file(FileName, [ClusterNodes]) of ok -> ok; {error, Reason} -> throw({error, {cannot_create_cluster_nodes_config, @@ -439,7 +447,7 @@ create_cluster_nodes_config(ClusterNodes) -> read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case rabbit_misc:read_term_file(FileName) of + case rabbit_file:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), @@ -467,12 +475,12 @@ record_running_nodes() -> Nodes = running_clustered_nodes() -- [node()], %% Don't check the result: we're shutting down anyway and this is %% a best-effort-basis. - rabbit_misc:write_term_file(FileName, [Nodes]), + rabbit_file:write_term_file(FileName, [Nodes]), ok. read_previously_running_nodes() -> FileName = running_nodes_filename(), - case rabbit_misc:read_term_file(FileName) of + case rabbit_file:read_term_file(FileName) of {ok, [Nodes]} -> Nodes; {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, @@ -634,7 +642,7 @@ move_db() -> copy_db(Destination) -> ok = ensure_mnesia_not_running(), - rabbit_misc:recursive_copy(dir(), Destination). + rabbit_file:recursive_copy(dir(), Destination). create_tables() -> create_tables(disc). @@ -714,8 +722,13 @@ wait_for_tables(TableNames) -> end. reset(Force) -> + rabbit_misc:local_info_msg("Resetting Rabbit~s~n", [if Force -> " forcefully"; + true -> "" + end]), ensure_mnesia_not_running(), - case not Force andalso is_only_disc_node(node(), false) of + case not Force andalso is_clustered() andalso + is_only_disc_node(node(), false) + of true -> log_both("no other disc nodes running"); false -> ok end, @@ -739,7 +752,7 @@ reset(Force) -> end, ok = delete_cluster_nodes_config(), %% remove persisted messages and any other garbage we find - ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), + ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; @@ -772,19 +785,13 @@ wait_for(Condition) -> on_node_up(Node) -> case is_only_disc_node(Node, true) of - true -> rabbit_misc:with_local_io( - fun () -> rabbit_log:info("cluster contains disc " - "nodes again~n") - end); + true -> rabbit_log:info("cluster contains disc nodes again~n"); false -> ok end. on_node_down(Node) -> case is_only_disc_node(Node, true) of - true -> rabbit_misc:with_local_io( - fun () -> rabbit_log:info("only running disc node " - "went down~n") - end); + true -> rabbit_log:info("only running disc node went down~n"); false -> ok end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 17d5f64b..fc3cbebd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -146,6 +146,8 @@ -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). +-spec(close_all_indicated/1 :: + (client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). @@ -587,7 +589,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> AttemptFileSummaryRecovery = case ClientRefs of - undefined -> ok = rabbit_misc:recursive_delete([Dir]), + undefined -> ok = rabbit_file:recursive_delete([Dir]), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), false; _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), @@ -1338,11 +1340,11 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> end. store_recovery_terms(Terms, Dir) -> - rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). + rabbit_file:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). read_recovery_terms(Dir) -> Path = filename:join(Dir, ?CLEAN_FILENAME), - case rabbit_misc:read_term_file(Path) of + case rabbit_file:read_term_file(Path) of {ok, Terms} -> case file:delete(Path) of ok -> {true, Terms}; {error, Error} -> {false, Error} @@ -1899,7 +1901,7 @@ transform_dir(BaseDir, Store, TransformFun) -> end. transform_msg_file(FileOld, FileNew, TransformFun) -> - ok = rabbit_misc:ensure_parent_dirs_exist(FileNew), + ok = rabbit_file:ensure_parent_dirs_exist(FileNew), {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], [{write_buffer, diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index b2abcba6..2c0912df 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -78,6 +78,33 @@ -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/2 :: (atom(), listener_config()) -> [{inet:ip_address(), ip_port(), family(), atom()}]). +-spec(ensure_ssl/0 :: () -> rabbit_types:infos()). +-spec(ssl_transform_fun/1 :: + (rabbit_types:infos()) + -> fun ((rabbit_net:socket()) + -> rabbit_types:ok_or_error(#ssl_socket{}))). + +-spec(boot/0 :: () -> 'ok'). +-spec(start_client/1 :: + (port() | #ssl_socket{ssl::{'sslsocket',_,_}}) -> + atom() | pid() | port() | {atom(),atom()}). +-spec(start_ssl_client/2 :: + (_,port() | #ssl_socket{ssl::{'sslsocket',_,_}}) -> + atom() | pid() | port() | {atom(),atom()}). +-spec(tcp_listener_started/3 :: + (_, + string() | + {byte(),byte(),byte(),byte()} | + {char(),char(),char(),char(),char(),char(),char(),char()}, + _) -> + 'ok'). +-spec(tcp_listener_stopped/3 :: + (_, + string() | + {byte(),byte(),byte(),byte()} | + {char(),char(),char(),char(),char(),char(),char(),char()}, + _) -> + 'ok'). -endif. @@ -293,6 +320,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). close_connection(Pid, Explanation) -> + rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]), case lists:member(Pid, connections()) of true -> rabbit_reader:shutdown(Pid, Explanation); false -> throw({error, {not_a_connection_pid, Pid}}) diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index cb4f826d..8aa24ab5 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -31,6 +31,7 @@ -ifdef(use_specs). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(rabbit_running_on/1 :: (node()) -> 'ok'). -spec(notify_cluster/0 :: () -> 'ok'). diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 92829e49..cd0c322b 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -29,6 +29,9 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). +%% Shut dialyzer up +-spec(terminate/1 :: (string()) -> no_return()). +-spec(terminate/2 :: (string(), [any()]) -> no_return()). -endif. @@ -67,7 +70,7 @@ start() -> AppVersions}, %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel - rabbit_misc:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), + rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% We exclude mochiweb due to its optional use of fdsrv. XRefExclude = [mochiweb], @@ -136,38 +139,10 @@ determine_version(App) -> {App, Vsn}. delete_recursively(Fn) -> - case filelib:is_dir(Fn) and not(is_symlink(Fn)) of - true -> - case file:list_dir(Fn) of - {ok, Files} -> - case lists:foldl(fun ( Fn1, ok) -> delete_recursively( - Fn ++ "/" ++ Fn1); - (_Fn1, Err) -> Err - end, ok, Files) of - ok -> case file:del_dir(Fn) of - ok -> ok; - {error, E} -> {error, - {cannot_delete, Fn, E}} - end; - Err -> Err - end; - {error, E} -> - {error, {cannot_list_files, Fn, E}} - end; - false -> - case filelib:is_file(Fn) of - true -> case file:delete(Fn) of - ok -> ok; - {error, E} -> {error, {cannot_delete, Fn, E}} - end; - false -> ok - end - end. - -is_symlink(Name) -> - case file:read_link(Name) of - {ok, _} -> true; - _ -> false + case rabbit_file:recursive_delete([Fn]) of + ok -> ok; + {error, {Path, E}} -> {error, {cannot_delete, Path, E}}; + Error -> Error end. unpack_ez_plugins(SrcDir, DestDir) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 636913b5..f1751e95 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -229,7 +229,7 @@ init(Name, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), - false = filelib:is_file(Dir), %% is_file == is file or dir + false = rabbit_file:is_file(Dir), %% is_file == is file or dir State #qistate { on_sync = OnSyncFun }. shutdown_terms(Name) -> @@ -256,7 +256,7 @@ terminate(Terms, State) -> delete_and_terminate(State) -> {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), - ok = rabbit_misc:recursive_delete([Dir]), + ok = rabbit_file:recursive_delete([Dir]), State1. publish(MsgId, SeqId, MsgProps, IsPersistent, @@ -359,16 +359,16 @@ recover(DurableQueues) -> {[dict:fetch(QueueDirName, DurableDict) | DurableAcc], TermsAcc1}; false -> - ok = rabbit_misc:recursive_delete([QueueDirPath]), + ok = rabbit_file:recursive_delete([QueueDirPath]), {DurableAcc, TermsAcc} end end, {[], []}, QueueDirNames), {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. all_queue_directory_names(Dir) -> - case file:list_dir(Dir) of + case rabbit_file:list_dir(Dir) of {ok, Entries} -> [ Entry || Entry <- Entries, - filelib:is_dir( + rabbit_file:is_dir( filename:join(Dir, Entry)) ]; {error, enoent} -> [] end. @@ -392,18 +392,18 @@ blank_state(QueueName) -> clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). detect_clean_shutdown(Dir) -> - case file:delete(clean_file_name(Dir)) of + case rabbit_file:delete(clean_file_name(Dir)) of ok -> true; {error, enoent} -> false end. read_shutdown_terms(Dir) -> - rabbit_misc:read_term_file(clean_file_name(Dir)). + rabbit_file:read_term_file(clean_file_name(Dir)). store_clean_shutdown(Terms, Dir) -> CleanFileName = clean_file_name(Dir), - ok = filelib:ensure_dir(CleanFileName), - rabbit_misc:write_term_file(CleanFileName, Terms). + ok = rabbit_file:ensure_dir(CleanFileName), + rabbit_file:write_term_file(CleanFileName, Terms). init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -603,8 +603,8 @@ flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> - case filelib:is_file(Path) of - true -> ok = file:delete(Path); + case rabbit_file:is_file(Path) of + true -> ok = rabbit_file:delete(Path); false -> ok end, SegmentsN; @@ -630,7 +630,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - ok = filelib:ensure_dir(Path), + ok = rabbit_file:ensure_dir(Path), {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -735,7 +735,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end, SegName)), Set) end, sets:from_list(segment_nums(Segments)), - filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). + rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)))). segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of @@ -836,7 +836,7 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> - case filelib:is_file(Path) of + case rabbit_file:is_file(Path) of false -> {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), @@ -1040,12 +1040,12 @@ foreach_queue_index(Funs) -> transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), [ok = transform_file(filename:join(Dir, Seg), SegmentFun) - || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], + || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)], ok = gatherer:finish(Gatherer). transform_file(Path, Fun) -> PathTmp = Path ++ ".upgrade", - case filelib:file_size(Path) of + case rabbit_file:file_size(Path) of 0 -> ok; Size -> {ok, PathTmpHdl} = file_handle_cache:open(PathTmp, ?WRITE_MODE, @@ -1059,7 +1059,7 @@ transform_file(Path, Fun) -> ok = drive_transform_fun(Fun, PathTmpHdl, Content), ok = file_handle_cache:close(PathTmpHdl), - ok = file:rename(PathTmp, Path) + ok = rabbit_file:rename(PathTmp, Path) end. drive_transform_fun(Fun, Hdl, Contents) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7eec2a2e..b4871cef 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -85,6 +85,15 @@ rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). +-spec(mainloop/2 :: (_,#v1{}) -> any()). +-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). +-spec(system_continue/3 :: (_,_,#v1{}) -> any()). +-spec(system_terminate/4 :: (_,_,_,_) -> none()). + +-spec(process_channel_frame/5 :: + (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(), + tuple()) -> tuple()). + -endif. %%-------------------------------------------------------------------------- @@ -493,20 +502,7 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame, self(), Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - channel_cleanup(ChPid), - State; - {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking - andalso - Protocol:method_has_content(MethodName)) of - true -> State#v1{connection_state = blocked}; - false -> State - end; - _ -> - State - end; + post_process_frame(AnalyzedFrame, ChPid, State); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -518,6 +514,23 @@ handle_frame(Type, Channel, Payload, end end. +post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> + channel_cleanup(ChPid), + State; +post_process_frame({method, MethodName, _}, _ChPid, + State = #v1{connection = #connection{ + protocol = Protocol}}) -> + case Protocol:method_has_content(MethodName) of + true -> erlang:bump_reductions(2000), + case State#v1.connection_state of + blocking -> State#v1{connection_state = blocked}; + _ -> State + end; + false -> State + end; +post_process_frame(_Frame, _ChPid, State) -> + State. + handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index 0491244b..cda3ccbe 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -24,6 +24,16 @@ -include("rabbit.hrl"). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(Name, {_M, _F, _A} = Fun) -> supervisor:start_link({local, Name}, ?MODULE, [Fun]). diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 6f3c5c75..963294d9 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -26,12 +26,17 @@ %% with the result of closing the old handler when swapping handlers. %% The first init/1 additionally allows for simple log rotation %% when the suffix is not the empty string. +%% The original init/1 also opened the file in 'write' mode, thus +%% overwriting old logs. To remedy this, init/1 from +%% lib/sasl/src/sasl_report_file_h.erl from R14B3 was copied as +%% init_file/1 and changed so that it opens the file in 'append' mode. %% Used only when swapping handlers and performing %% log rotation init({{File, Suffix}, []}) -> - case rabbit_misc:append_file(File, Suffix) of - ok -> ok; + case rabbit_file:append_file(File, Suffix) of + ok -> file:delete(File), + ok; {error, Error} -> rabbit_log:error("Failed to append contents of " "sasl log file '~s' to '~s':~n~p~n", @@ -47,11 +52,18 @@ init({{File, _}, error}) -> init({File, []}) -> init(File); init({File, _Type} = FileInfo) -> - rabbit_misc:ensure_parent_dirs_exist(File), - sasl_report_file_h:init(FileInfo); + rabbit_file:ensure_parent_dirs_exist(File), + init_file(FileInfo); init(File) -> - rabbit_misc:ensure_parent_dirs_exist(File), - sasl_report_file_h:init({File, sasl_error_logger_type()}). + rabbit_file:ensure_parent_dirs_exist(File), + init_file({File, sasl_error_logger_type()}). + +init_file({File, Type}) -> + process_flag(trap_exit, true), + case file:open(File, [append]) of + {ok,Fd} -> {ok, {Fd, File, Type}}; + Error -> Error + end. handle_event(Event, State) -> sasl_report_file_h:handle_event(Event, State). diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 508b127e..802ea5e2 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -27,6 +27,21 @@ -define(SERVER, ?MODULE). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_child/1 :: (atom()) -> 'ok'). +-spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok'). +-spec(start_restartable_child/1 :: (atom()) -> 'ok'). +-spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok'). +-spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())). + +-endif. + +%%---------------------------------------------------------------------------- + start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index cd5d9be0..44c13376 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -757,13 +757,23 @@ test_topic_expect_match(X, List) -> end, List). test_app_management() -> - %% starting, stopping, status + control_action(wait, [rabbit_mnesia:dir() ++ ".pid"]), + %% Starting, stopping and diagnostics. Note that we don't try + %% 'report' when the rabbit app is stopped and that we enable + %% tracing for the duration of this function. + ok = control_action(trace_on, []), ok = control_action(stop_app, []), ok = control_action(stop_app, []), ok = control_action(status, []), + ok = control_action(cluster_status, []), + ok = control_action(environment, []), ok = control_action(start_app, []), ok = control_action(start_app, []), ok = control_action(status, []), + ok = control_action(report, []), + ok = control_action(cluster_status, []), + ok = control_action(environment, []), + ok = control_action(trace_off, []), passed. test_log_management() -> @@ -795,23 +805,11 @@ test_log_management() -> ok = control_action(rotate_logs, []), ok = test_logs_working(MainLog, SaslLog), - %% log rotation on empty file + %% log rotation on empty files (the main log will have a ctl action logged) ok = clean_logs([MainLog, SaslLog], Suffix), ok = control_action(rotate_logs, []), ok = control_action(rotate_logs, [Suffix]), - [true, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]), - - %% original main log file is not writable - ok = make_files_non_writable([MainLog]), - {error, {cannot_rotate_main_logs, _}} = control_action(rotate_logs, []), - ok = clean_logs([MainLog], Suffix), - ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}]), - - %% original sasl log file is not writable - ok = make_files_non_writable([SaslLog]), - {error, {cannot_rotate_sasl_logs, _}} = control_action(rotate_logs, []), - ok = clean_logs([SaslLog], Suffix), - ok = add_log_handlers([{rabbit_sasl_report_file_h, SaslLog}]), + [false, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]), %% logs with suffix are not writable ok = control_action(rotate_logs, [Suffix]), @@ -819,27 +817,28 @@ test_log_management() -> ok = control_action(rotate_logs, [Suffix]), ok = test_logs_working(MainLog, SaslLog), - %% original log files are not writable + %% rotate when original log files are not writable ok = make_files_non_writable([MainLog, SaslLog]), - {error, {{cannot_rotate_main_logs, _}, - {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), + ok = control_action(rotate_logs, []), - %% logging directed to tty (handlers were removed in last test) + %% logging directed to tty (first, remove handlers) + ok = delete_log_handlers([rabbit_sasl_report_file_h, + rabbit_error_logger_file_h]), ok = clean_logs([MainLog, SaslLog], Suffix), - ok = application:set_env(sasl, sasl_error_logger, tty), - ok = application:set_env(kernel, error_logger, tty), + ok = application:set_env(rabbit, sasl_error_logger, tty), + ok = application:set_env(rabbit, error_logger, tty), ok = control_action(rotate_logs, []), [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]), %% rotate logs when logging is turned off - ok = application:set_env(sasl, sasl_error_logger, false), - ok = application:set_env(kernel, error_logger, silent), + ok = application:set_env(rabbit, sasl_error_logger, false), + ok = application:set_env(rabbit, error_logger, silent), ok = control_action(rotate_logs, []), [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]), %% cleanup - ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}), - ok = application:set_env(kernel, error_logger, {file, MainLog}), + ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}), + ok = application:set_env(rabbit, error_logger, {file, MainLog}), ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}, {rabbit_sasl_report_file_h, SaslLog}]), passed. @@ -850,8 +849,8 @@ test_log_management_during_startup() -> %% start application with simple tty logging ok = control_action(stop_app, []), - ok = application:set_env(kernel, error_logger, tty), - ok = application:set_env(sasl, sasl_error_logger, tty), + ok = application:set_env(rabbit, error_logger, tty), + ok = application:set_env(rabbit, sasl_error_logger, tty), ok = add_log_handlers([{error_logger_tty_h, []}, {sasl_report_tty_h, []}]), ok = control_action(start_app, []), @@ -868,13 +867,12 @@ test_log_management_during_startup() -> end, %% fix sasl logging - ok = application:set_env(sasl, sasl_error_logger, - {file, SaslLog}), + ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}), %% start application with logging to non-existing directory TmpLog = "/tmp/rabbit-tests/test.log", delete_file(TmpLog), - ok = application:set_env(kernel, error_logger, {file, TmpLog}), + ok = application:set_env(rabbit, error_logger, {file, TmpLog}), ok = delete_log_handlers([rabbit_error_logger_file_h]), ok = add_log_handlers([{error_logger_file_h, MainLog}]), @@ -895,7 +893,7 @@ test_log_management_during_startup() -> %% start application with logging to a subdirectory which %% parent directory has no write permissions TmpTestDir = "/tmp/rabbit-tests/no-permission/test/log", - ok = application:set_env(kernel, error_logger, {file, TmpTestDir}), + ok = application:set_env(rabbit, error_logger, {file, TmpTestDir}), ok = add_log_handlers([{error_logger_file_h, MainLog}]), ok = case control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, @@ -910,7 +908,7 @@ test_log_management_during_startup() -> %% start application with standard error_logger_file_h %% handler not installed - ok = application:set_env(kernel, error_logger, {file, MainLog}), + ok = application:set_env(rabbit, error_logger, {file, MainLog}), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -1146,6 +1144,7 @@ test_user_management() -> ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = control_action(add_user, ["foo", "bar"]), + ok = control_action(clear_password, ["foo"]), ok = control_action(change_password, ["foo", "baz"]), TestTags = fun (Tags) -> @@ -1754,7 +1753,11 @@ test_file_handle_cache() -> [filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]], Content = <<"foo">>, CopyFun = fun (Src, Dst) -> - ok = rabbit_misc:write_file(Src, Content), + {ok, Hdl} = prim_file:open(Src, [binary, write]), + ok = prim_file:write(Hdl, Content), + ok = prim_file:sync(Hdl), + prim_file:close(Hdl), + {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), Size = size(Content), diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index f9632324..58079ccf 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -67,9 +67,11 @@ tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, %%---------------------------------------------------------------------------- start(VHost) -> + rabbit_log:info("Enabling tracing for vhost '~s'~n", [VHost]), update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end). stop(VHost) -> + rabbit_log:info("Disabling tracing for vhost '~s'~n", [VHost]), update_config(fun (VHosts) -> VHosts -- [VHost] end). update_config(Fun) -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9739f6b7..717d94a8 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -115,7 +115,7 @@ ensure_backup_removed() -> end. remove_backup() -> - ok = rabbit_misc:recursive_delete([backup_dir()]), + ok = rabbit_file:recursive_delete([backup_dir()]), info("upgrades: Mnesia backup removed~n", []). maybe_upgrade_mnesia() -> @@ -228,13 +228,7 @@ secondary_upgrade(AllNodes) -> ok. nodes_running(Nodes) -> - [N || N <- Nodes, node_running(N)]. - -node_running(Node) -> - case rpc:call(Node, application, which_applications, []) of - {badrpc, _} -> false; - Apps -> lists:keysearch(rabbit, 1, Apps) =/= false - end. + [N || N <- Nodes, rabbit:is_running(N)]. %% ------------------------------------------------------------------- @@ -255,7 +249,7 @@ maybe_upgrade_local() -> %% ------------------------------------------------------------------- apply_upgrades(Scope, Upgrades, Fun) -> - ok = rabbit_misc:lock_file(lock_filename()), + ok = rabbit_file:lock_file(lock_filename()), info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), Fun(), diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 400abc10..f6bcbb7f 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -49,12 +49,12 @@ %% ------------------------------------------------------------------- -recorded() -> case rabbit_misc:read_term_file(schema_filename()) of +recorded() -> case rabbit_file:read_term_file(schema_filename()) of {ok, [V]} -> {ok, V}; {error, _} = Err -> Err end. -record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]). +record(V) -> ok = rabbit_file:write_term_file(schema_filename(), [V]). recorded_for_scope(Scope) -> case recorded() of diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 08d6c99a..38bb76b0 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -44,6 +44,7 @@ -define(INFO_KEYS, [name, tracing]). add(VHostPath) -> + rabbit_log:info("Adding vhost '~s'~n", [VHostPath]), R = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_vhost, VHostPath}) of @@ -69,7 +70,6 @@ add(VHostPath) -> {<<"amq.rabbitmq.trace">>, topic}]], ok end), - rabbit_log:info("Added vhost ~p~n", [VHostPath]), R. delete(VHostPath) -> @@ -78,6 +78,7 @@ delete(VHostPath) -> %% process, which in turn results in further mnesia actions and %% eventually the termination of that process. Exchange deletion causes %% notifications which must be sent outside the TX + rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || Q <- rabbit_amqqueue:list(VHostPath)], [ok = rabbit_exchange:delete(Name, false) || @@ -86,7 +87,6 @@ delete(VHostPath) -> with(VHostPath, fun () -> ok = internal_delete(VHostPath) end)), - rabbit_log:info("Deleted vhost ~p~n", [VHostPath]), R. internal_delete(VHostPath) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index ac3434d2..091b50e4 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -67,6 +67,9 @@ non_neg_integer(), rabbit_types:protocol()) -> 'ok'). +-spec(mainloop/2 :: (_,_) -> 'done'). +-spec(mainloop1/2 :: (_,_) -> any()). + -endif. %%--------------------------------------------------------------------------- diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index bf0eacd1..4c835598 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -22,6 +22,14 @@ -export([init/1]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). +-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). +-endif. + +%%---------------------------------------------------------------------------- + start_link(Name, Callback) -> supervisor:start_link({local,Name}, ?MODULE, Callback). diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index cd646969..ad2a0d02 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -25,6 +25,14 @@ -record(state, {sock, on_startup, on_shutdown, label}). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). +-spec(start_link/8 :: + (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(), + atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()). +-endif. + %%-------------------------------------------------------------------- start_link(IPAddress, Port, SocketOpts, diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 58c2f30c..5bff5c27 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -22,6 +22,21 @@ -export([init/1]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/7 :: + (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), + mfa(), string()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/8 :: + (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), + mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, Label) -> start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, diff --git a/src/test_sup.erl b/src/test_sup.erl index 84c4121c..5feb146f 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -21,6 +21,18 @@ -export([test_supervisor_delayed_restart/0, init/1, start_child/0]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(test_supervisor_delayed_restart/0 :: () -> 'passed'). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + test_supervisor_delayed_restart() -> passed = with_sup(simple_one_for_one_terminate, fun (SupPid) -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index fb2fa267..a54bf996 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -57,15 +57,15 @@ -ifdef(use_specs). --spec(start_link/1 :: (float()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_vm_limit/0 :: () -> non_neg_integer()). --spec(get_memory_limit/0 :: () -> non_neg_integer()). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). -spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok'). +-spec(get_memory_limit/0 :: () -> non_neg_integer()). -endif. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index e4f260cc..456ff39f 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -41,6 +41,7 @@ -spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). -spec(submit_async/1 :: (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(idle/1 :: (any()) -> 'ok'). -endif. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index 28c1adc6..d37c3a0f 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -26,8 +26,8 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). --spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok_pid_or_error()). -endif. |