summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-02-18 17:29:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-02-18 17:29:33 +0000
commit9d5d8bada28a281edd9c2b11241b091aca4357d0 (patch)
tree49d03b4c1a44046c43136efc546c31781a844242
parentb15c279b847aaebe2e43b0af31cd8f69e00e6ab4 (diff)
parenta25d3cb4189ce9b832aeb8397e743f0627fdbe21 (diff)
downloadrabbitmq-server-9d5d8bada28a281edd9c2b11241b091aca4357d0.tar.gz
Merge bug23553 (Present managed socket count in mgmt plugin)
-rw-r--r--.hgignore3
-rw-r--r--Makefile8
-rw-r--r--docs/rabbitmq-env.conf.5.xml (renamed from docs/rabbitmq.conf.5.xml)18
-rw-r--r--docs/rabbitmq-multi.1.xml2
-rw-r--r--docs/rabbitmq-server.1.xml2
-rw-r--r--ebin/rabbit_app.in9
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/postinst4
-rw-r--r--packaging/macports/Portfile.in10
-rw-r--r--packaging/windows-exe/Makefile16
-rw-r--r--packaging/windows-exe/lib/EnvVarUpdate.nsh327
-rw-r--r--packaging/windows-exe/rabbitmq.icobin0 -> 4286 bytes
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in241
-rwxr-xr-xscripts/rabbitmq-env6
-rwxr-xr-xscripts/rabbitmq-server1
-rw-r--r--scripts/rabbitmq-server.bat1
-rw-r--r--scripts/rabbitmq-service.bat1
-rw-r--r--src/delegate.erl18
-rw-r--r--src/delegate_sup.erl26
-rw-r--r--src/gen_server2.erl25
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/rabbit.erl19
-rw-r--r--src/rabbit_amqqueue.erl57
-rw-r--r--src/rabbit_amqqueue_process.erl39
-rw-r--r--src/rabbit_channel.erl135
-rw-r--r--src/rabbit_channel_sup.erl17
-rw-r--r--src/rabbit_control.erl26
-rw-r--r--src/rabbit_direct.erl21
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_misc.erl18
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_networking.erl16
-rw-r--r--src/rabbit_node_monitor.erl53
-rw-r--r--src/rabbit_reader.erl73
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_tests.erl93
37 files changed, 1043 insertions, 255 deletions
diff --git a/.hgignore b/.hgignore
index 03b60914..912b4a56 100644
--- a/.hgignore
+++ b/.hgignore
@@ -25,6 +25,9 @@ syntax: regexp
^packaging/macports/macports$
^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$
^packaging/windows/rabbitmq-server-windows-.*\.zip$
+^packaging/windows-exe/rabbitmq_server-.*$
+^packaging/windows-exe/rabbitmq-.*\.nsi$
+^packaging/windows-exe/rabbitmq-server-.*\.exe$
^docs/.*\.[15]\.gz$
^docs/.*\.man\.xml$
diff --git a/Makefile b/Makefile
index 0693035f..51b998f4 100644
--- a/Makefile
+++ b/Makefile
@@ -41,14 +41,12 @@ RABBIT_PLT=rabbit.plt
ifndef USE_SPECS
# our type specs rely on features and bug fixes in dialyzer that are
-# only available in R14A upwards (R13B04 is erts 5.7.5)
-#
-# NB: the test assumes that version number will only contain single digits
-USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.5" ]; then echo "true"; else echo "false"; fi)
+# only available in R14A upwards (R14A is erts 5.8)
+USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8]), halt().')
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
-ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(shell [ $(USE_SPECS) = "true" ] && echo "-Duse_specs")
+ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs)
VERSION=0.0.0
TARBALL_NAME=rabbitmq-server-$(VERSION)
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq-env.conf.5.xml
index 31de7164..4c7340c2 100644
--- a/docs/rabbitmq.conf.5.xml
+++ b/docs/rabbitmq-env.conf.5.xml
@@ -9,20 +9,20 @@
</refentryinfo>
<refmeta>
- <refentrytitle>rabbitmq.conf</refentrytitle>
+ <refentrytitle>rabbitmq-env.conf</refentrytitle>
<manvolnum>5</manvolnum>
<refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
</refmeta>
<refnamediv>
- <refname>rabbitmq.conf</refname>
+ <refname>rabbitmq-env.conf</refname>
<refpurpose>default settings for RabbitMQ AMQP server</refpurpose>
</refnamediv>
<refsect1>
<title>Description</title>
<para>
-<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the
+<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> contains variable settings that override the
defaults built in to the RabbitMQ startup scripts.
</para>
<para>
@@ -33,7 +33,7 @@ operator), including line comments starting with "#".
</para>
<para>
In order of preference, the startup scripts get their values from the
-environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the
+environment, from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> and finally from the
built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar>
setting,
</para>
@@ -48,26 +48,26 @@ empty string, then
<envar>NODENAME</envar>
</para>
<para>
-from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent
+from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> is checked. If it is also absent
or set equal to the empty string then the default value from the
startup script is used.
</para>
<para>
-The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the
+The variable names in /etc/rabbitmq/rabbitmq-env.conf are always equal to the
environment variable names, with the <envar>RABBITMQ_</envar> prefix removed:
<envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the
-<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc.
+<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file, etc.
</para>
<para role="example-prefix">For example:</para>
<screen role="example-multiline">
-# I am a complete /etc/rabbitmq/rabbitmq.conf file.
+# I am a complete /etc/rabbitmq/rabbitmq-env.conf file.
# Comment lines start with a hash character.
# This is a /bin/sh script file - use ordinary envt var syntax
NODENAME=hare
</screen>
<para role="example">
This is an example of a complete
- <filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang
+ <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file that overrides the default Erlang
node name from "rabbit" to "hare".
</para>
diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml
index 6586890a..5f5c6c2f 100644
--- a/docs/rabbitmq-multi.1.xml
+++ b/docs/rabbitmq-multi.1.xml
@@ -92,7 +92,7 @@ Rotate log files for all local and running RabbitMQ nodes.
<refsect1>
<title>See also</title>
<para>
- <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
</para>
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index f161a291..a0458c93 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -124,7 +124,7 @@ Defaults to 5672.
<refsect1>
<title>See also</title>
<para>
- <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
</para>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index cc7221d6..f837684c 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -34,4 +34,11 @@
{collect_statistics, none},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
- {delegate_count, 16}]}]}.
+ {delegate_count, 16},
+ {tcp_listen_options, [binary,
+ {packet, raw},
+ {reuseaddr, true},
+ {backlog, 128},
+ {nodelay, true},
+ {exit_on_close, false}]}
+ ]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 15f5d7c5..24d0f961 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -28,7 +28,7 @@
-record(vhost, {virtual_host, dummy}).
-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties}).
+ client_properties, capabilities}).
-record(content,
{class_id,
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 47316864..5d573bde 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -92,6 +92,9 @@ fi
%post
/sbin/chkconfig --add %{name}
+if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then
+ mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf
+fi
%preun
if [ $1 = 0 ]; then
diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst
index 134f16ee..b11340ef 100644
--- a/packaging/debs/Debian/debian/postinst
+++ b/packaging/debs/Debian/debian/postinst
@@ -35,6 +35,10 @@ chown -R rabbitmq:rabbitmq /var/log/rabbitmq
case "$1" in
configure)
+ if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
+ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ]; then
+ mv /etc/rabbitmq/rabbitmq.conf /etc/rabbitmq/rabbitmq-env.conf
+ fi
;;
abort-upgrade|abort-remove|abort-deconfigure)
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index f8417b83..862a0d1a 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -81,7 +81,7 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome}
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
- reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
+ reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \
${realsbin}/rabbitmq-env
foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} {
reinplace -E "s:^($var)=/:\\1=${prefix}/:" \
@@ -102,10 +102,10 @@ post-destroot {
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
- file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/
- file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/
- file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/
- file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/
+ file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man5/rabbitmq-env.conf.5.gz ${mandest}/man5/
}
pre-install {
diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile
new file mode 100644
index 00000000..59803f9c
--- /dev/null
+++ b/packaging/windows-exe/Makefile
@@ -0,0 +1,16 @@
+VERSION=0.0.0
+ZIP=../windows/rabbitmq-server-windows-$(VERSION)
+
+dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION)
+ makensis rabbitmq-$(VERSION).nsi
+
+rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in
+ sed \
+ -e 's|%%VERSION%%|$(VERSION)|' \
+ $< > $@
+
+rabbitmq_server-$(VERSION):
+ unzip $(ZIP)
+
+clean:
+ rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe
diff --git a/packaging/windows-exe/lib/EnvVarUpdate.nsh b/packaging/windows-exe/lib/EnvVarUpdate.nsh
new file mode 100644
index 00000000..839d6a02
--- /dev/null
+++ b/packaging/windows-exe/lib/EnvVarUpdate.nsh
@@ -0,0 +1,327 @@
+/**
+ * EnvVarUpdate.nsh
+ * : Environmental Variables: append, prepend, and remove entries
+ *
+ * WARNING: If you use StrFunc.nsh header then include it before this file
+ * with all required definitions. This is to avoid conflicts
+ *
+ * Usage:
+ * ${EnvVarUpdate} "ResultVar" "EnvVarName" "Action" "RegLoc" "PathString"
+ *
+ * Credits:
+ * Version 1.0
+ * * Cal Turney (turnec2)
+ * * Amir Szekely (KiCHiK) and e-circ for developing the forerunners of this
+ * function: AddToPath, un.RemoveFromPath, AddToEnvVar, un.RemoveFromEnvVar,
+ * WriteEnvStr, and un.DeleteEnvStr
+ * * Diego Pedroso (deguix) for StrTok
+ * * Kevin English (kenglish_hi) for StrContains
+ * * Hendri Adriaens (Smile2Me), Diego Pedroso (deguix), and Dan Fuhry
+ * (dandaman32) for StrReplace
+ *
+ * Version 1.1 (compatibility with StrFunc.nsh)
+ * * techtonik
+ *
+ * http://nsis.sourceforge.net/Environmental_Variables:_append%2C_prepend%2C_and_remove_entries
+ *
+ */
+
+
+!ifndef ENVVARUPDATE_FUNCTION
+!define ENVVARUPDATE_FUNCTION
+!verbose push
+!verbose 3
+!include "LogicLib.nsh"
+!include "WinMessages.NSH"
+!include "StrFunc.nsh"
+
+; ---- Fix for conflict if StrFunc.nsh is already includes in main file -----------------------
+!macro _IncludeStrFunction StrFuncName
+ !ifndef ${StrFuncName}_INCLUDED
+ ${${StrFuncName}}
+ !endif
+ !ifndef Un${StrFuncName}_INCLUDED
+ ${Un${StrFuncName}}
+ !endif
+ !define un.${StrFuncName} "${Un${StrFuncName}}"
+!macroend
+
+!insertmacro _IncludeStrFunction StrTok
+!insertmacro _IncludeStrFunction StrStr
+!insertmacro _IncludeStrFunction StrRep
+
+; ---------------------------------- Macro Definitions ----------------------------------------
+!macro _EnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define EnvVarUpdate '!insertmacro "_EnvVarUpdateConstructor"'
+
+!macro _unEnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call un.EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define un.EnvVarUpdate '!insertmacro "_unEnvVarUpdateConstructor"'
+; ---------------------------------- Macro Definitions end-------------------------------------
+
+;----------------------------------- EnvVarUpdate start----------------------------------------
+!define hklm_all_users 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+!define hkcu_current_user 'HKCU "Environment"'
+
+!macro EnvVarUpdate UN
+
+Function ${UN}EnvVarUpdate
+
+ Push $0
+ Exch 4
+ Exch $1
+ Exch 3
+ Exch $2
+ Exch 2
+ Exch $3
+ Exch
+ Exch $4
+ Push $5
+ Push $6
+ Push $7
+ Push $8
+ Push $9
+ Push $R0
+
+ /* After this point:
+ -------------------------
+ $0 = ResultVar (returned)
+ $1 = EnvVarName (input)
+ $2 = Action (input)
+ $3 = RegLoc (input)
+ $4 = PathString (input)
+ $5 = Orig EnvVar (read from registry)
+ $6 = Len of $0 (temp)
+ $7 = tempstr1 (temp)
+ $8 = Entry counter (temp)
+ $9 = tempstr2 (temp)
+ $R0 = tempChar (temp) */
+
+ ; Step 1: Read contents of EnvVarName from RegLoc
+ ;
+ ; Check for empty EnvVarName
+ ${If} $1 == ""
+ SetErrors
+ DetailPrint "ERROR: EnvVarName is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for valid Action
+ ${If} $2 != "A"
+ ${AndIf} $2 != "P"
+ ${AndIf} $2 != "R"
+ SetErrors
+ DetailPrint "ERROR: Invalid Action - must be A, P, or R"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ${If} $3 == HKLM
+ ReadRegStr $5 ${hklm_all_users} $1 ; Get EnvVarName from all users into $5
+ ${ElseIf} $3 == HKCU
+ ReadRegStr $5 ${hkcu_current_user} $1 ; Read EnvVarName from current user into $5
+ ${Else}
+ SetErrors
+ DetailPrint 'ERROR: Action is [$3] but must be "HKLM" or HKCU"'
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for empty PathString
+ ${If} $4 == ""
+ SetErrors
+ DetailPrint "ERROR: PathString is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Make sure we've got some work to do
+ ${If} $5 == ""
+ ${AndIf} $2 == "R"
+ SetErrors
+ DetailPrint "$1 is empty - Nothing to remove"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Step 2: Scrub EnvVar
+ ;
+ StrCpy $0 $5 ; Copy the contents to $0
+ ; Remove spaces around semicolons (NOTE: spaces before the 1st entry or
+ ; after the last one are not removed here but instead in Step 3)
+ ${If} $0 != "" ; If EnvVar is not empty ...
+ ${Do}
+ ${${UN}StrStr} $7 $0 " ;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 " ;" ";" ; Remove '<space>;'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 "; "
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 "; " ";" ; Remove ';<space>'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 ";;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 ";;" ";"
+ ${Loop}
+
+ ; Remove a leading or trailing semicolon from EnvVar
+ StrCpy $7 $0 1 0
+ ${If} $7 == ";"
+ StrCpy $0 $0 "" 1 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ StrLen $6 $0
+ IntOp $6 $6 - 1
+ StrCpy $7 $0 1 $6
+ ${If} $7 == ";"
+ StrCpy $0 $0 $6 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ ; DetailPrint "Scrubbed $1: [$0]" ; Uncomment to debug
+ ${EndIf}
+
+ /* Step 3. Remove all instances of the target path/string (even if "A" or "P")
+ $6 = bool flag (1 = found and removed PathString)
+ $7 = a string (e.g. path) delimited by semicolon(s)
+ $8 = entry counter starting at 0
+ $9 = copy of $0
+ $R0 = tempChar */
+
+ ${If} $5 != "" ; If EnvVar is not empty ...
+ StrCpy $9 $0
+ StrCpy $0 ""
+ StrCpy $8 0
+ StrCpy $6 0
+
+ ${Do}
+ ${${UN}StrTok} $7 $9 ";" $8 "0" ; $7 = next entry, $8 = entry counter
+
+ ${If} $7 == "" ; If we've run out of entries,
+ ${ExitDo} ; were done
+ ${EndIf} ;
+
+ ; Remove leading and trailing spaces from this entry (critical step for Action=Remove)
+ ${Do}
+ StrCpy $R0 $7 1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 "" 1 ; Remove leading space
+ ${Loop}
+ ${Do}
+ StrCpy $R0 $7 1 -1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 -1 ; Remove trailing space
+ ${Loop}
+ ${If} $7 == $4 ; If string matches, remove it by not appending it
+ StrCpy $6 1 ; Set 'found' flag
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 == "" ; and the 1st string being added to $0,
+ StrCpy $0 $7 ; copy it to $0 without a prepended semicolon
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 != "" ; and this is NOT the 1st string to be added to $0,
+ StrCpy $0 $0;$7 ; append path to $0 with a prepended semicolon
+ ${EndIf} ;
+
+ IntOp $8 $8 + 1 ; Bump counter
+ ${Loop} ; Check for duplicates until we run out of paths
+ ${EndIf}
+
+ ; Step 4: Perform the requested Action
+ ;
+ ${If} $2 != "R" ; If Append or Prepend
+ ${If} $6 == 1 ; And if we found the target
+ DetailPrint "Target is already present in $1. It will be removed and"
+ ${EndIf}
+ ${If} $0 == "" ; If EnvVar is (now) empty
+ StrCpy $0 $4 ; just copy PathString to EnvVar
+ ${If} $6 == 0 ; If found flag is either 0
+ ${OrIf} $6 == "" ; or blank (if EnvVarName is empty)
+ DetailPrint "$1 was empty and has been updated with the target"
+ ${EndIf}
+ ${ElseIf} $2 == "A" ; If Append (and EnvVar is not empty),
+ StrCpy $0 $0;$4 ; append PathString
+ ${If} $6 == 1
+ DetailPrint "appended to $1"
+ ${Else}
+ DetailPrint "Target was appended to $1"
+ ${EndIf}
+ ${Else} ; If Prepend (and EnvVar is not empty),
+ StrCpy $0 $4;$0 ; prepend PathString
+ ${If} $6 == 1
+ DetailPrint "prepended to $1"
+ ${Else}
+ DetailPrint "Target was prepended to $1"
+ ${EndIf}
+ ${EndIf}
+ ${Else} ; If Action = Remove
+ ${If} $6 == 1 ; and we found the target
+ DetailPrint "Target was found and removed from $1"
+ ${Else}
+ DetailPrint "Target was NOT found in $1 (nothing to remove)"
+ ${EndIf}
+ ${If} $0 == ""
+ DetailPrint "$1 is now empty"
+ ${EndIf}
+ ${EndIf}
+
+ ; Step 5: Update the registry at RegLoc with the updated EnvVar and announce the change
+ ;
+ ClearErrors
+ ${If} $3 == HKLM
+ WriteRegExpandStr ${hklm_all_users} $1 $0 ; Write it in all users section
+ ${ElseIf} $3 == HKCU
+ WriteRegExpandStr ${hkcu_current_user} $1 $0 ; Write it to current user section
+ ${EndIf}
+
+ IfErrors 0 +4
+ MessageBox MB_OK|MB_ICONEXCLAMATION "Could not write updated $1 to $3"
+ DetailPrint "Could not write updated $1 to $3"
+ Goto EnvVarUpdate_Restore_Vars
+
+ ; "Export" our change
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+ EnvVarUpdate_Restore_Vars:
+ ;
+ ; Restore the user's variables and return ResultVar
+ Pop $R0
+ Pop $9
+ Pop $8
+ Pop $7
+ Pop $6
+ Pop $5
+ Pop $4
+ Pop $3
+ Pop $2
+ Pop $1
+ Push $0 ; Push my $0 (ResultVar)
+ Exch
+ Pop $0 ; Restore his $0
+
+FunctionEnd
+
+!macroend ; EnvVarUpdate UN
+!insertmacro EnvVarUpdate ""
+!insertmacro EnvVarUpdate "un."
+;----------------------------------- EnvVarUpdate end----------------------------------------
+
+!verbose pop
+!endif
diff --git a/packaging/windows-exe/rabbitmq.ico b/packaging/windows-exe/rabbitmq.ico
new file mode 100644
index 00000000..5e169a79
--- /dev/null
+++ b/packaging/windows-exe/rabbitmq.ico
Binary files differ
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
new file mode 100644
index 00000000..6d79ffd4
--- /dev/null
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -0,0 +1,241 @@
+; Use the "Modern" UI
+!include MUI2.nsh
+!include LogicLib.nsh
+!include WinMessages.nsh
+!include FileFunc.nsh
+!include WordFunc.nsh
+!include lib\EnvVarUpdate.nsh
+
+!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+!define uninstall "Software\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ"
+
+;--------------------------------
+
+; The name of the installer
+Name "RabbitMQ Server %%VERSION%%"
+
+; The file to write
+OutFile "rabbitmq-server-%%VERSION%%.exe"
+
+; Icons
+!define MUI_ICON "rabbitmq.ico"
+
+; The default installation directory
+InstallDir "$PROGRAMFILES\RabbitMQ Server"
+
+; Registry key to check for directory (so if you install again, it will
+; overwrite the old one automatically)
+InstallDirRegKey HKLM "Software\VMware, Inc.\RabbitMQ Server" "Install_Dir"
+
+; Request application privileges for Windows Vista
+RequestExecutionLevel admin
+
+SetCompressor /solid lzma
+
+VIProductVersion "%%VERSION%%.0"
+VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductVersion" "%%VERSION%%"
+VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
+;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
+VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc"
+;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
+VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
+
+;--------------------------------
+
+; Pages
+
+
+; !insertmacro MUI_PAGE_LICENSE "..\..\LICENSE-MPL-RabbitMQ"
+ !insertmacro MUI_PAGE_COMPONENTS
+ !insertmacro MUI_PAGE_DIRECTORY
+ !insertmacro MUI_PAGE_INSTFILES
+ !insertmacro MUI_PAGE_FINISH
+
+ !insertmacro MUI_UNPAGE_CONFIRM
+ !insertmacro MUI_UNPAGE_INSTFILES
+ !define MUI_FINISHPAGE_TEXT "RabbitMQ Server %%VERSION%% has been uninstalled from your computer.$\n$\nPlease note that the log and database directories located at $APPDATA\RabbitMQ have not been removed. You can remove them manually if desired."
+ !insertmacro MUI_UNPAGE_FINISH
+
+;--------------------------------
+;Languages
+
+ !insertmacro MUI_LANGUAGE "English"
+
+;--------------------------------
+
+; The stuff to install
+Section "RabbitMQ Server (required)" Rabbit
+
+ SectionIn RO
+
+ ; Set output path to the installation directory.
+ SetOutPath $INSTDIR
+
+ ; Put files there
+ File /r "rabbitmq_server-%%VERSION%%"
+ File "rabbitmq.ico"
+
+ ; Add to PATH
+ ${EnvVarUpdate} $0 "PATH" "A" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
+
+ ; Write the installation path into the registry
+ WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR"
+
+ ; Write the uninstall keys for Windows
+ WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server"
+ WriteRegStr HKLM ${uninstall} "UninstallString" "$INSTDIR\uninstall.exe"
+ WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\uninstall.exe,0"
+ WriteRegStr HKLM ${uninstall} "Publisher" "VMware, Inc."
+ WriteRegStr HKLM ${uninstall} "DisplayVersion" "%%VERSION%%"
+ WriteRegDWORD HKLM ${uninstall} "NoModify" 1
+ WriteRegDWORD HKLM ${uninstall} "NoRepair" 1
+
+ ${GetSize} "$INSTDIR" "/S=0K" $0 $1 $2
+ IntFmt $0 "0x%08X" $0
+ WriteRegDWORD HKLM "${uninstall}" "EstimatedSize" "$0"
+
+ WriteUninstaller "uninstall.exe"
+SectionEnd
+
+;--------------------------------
+
+Section "RabbitMQ Service" RabbitService
+ ExpandEnvStrings $0 %COMSPEC%
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install'
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start'
+ CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie"
+SectionEnd
+
+;--------------------------------
+
+Section "Start Menu" RabbitStartMenu
+ ; In case the service is not installed, or the service installation fails,
+ ; make sure these exist or Explorer will get confused.
+ CreateDirectory "$APPDATA\RabbitMQ\log"
+ CreateDirectory "$APPDATA\RabbitMQ\db"
+
+ CreateDirectory "$SMPROGRAMS\RabbitMQ Server"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico"
+
+SectionEnd
+
+;--------------------------------
+
+; Section descriptions
+
+LangString DESC_Rabbit ${LANG_ENGLISH} "The RabbitMQ Server."
+LangString DESC_RabbitService ${LANG_ENGLISH} "Set up RabbitMQ as a Windows Service."
+LangString DESC_RabbitStartMenu ${LANG_ENGLISH} "Add some useful links to the start menu."
+
+!insertmacro MUI_FUNCTION_DESCRIPTION_BEGIN
+ !insertmacro MUI_DESCRIPTION_TEXT ${Rabbit} $(DESC_Rabbit)
+ !insertmacro MUI_DESCRIPTION_TEXT ${RabbitService} $(DESC_RabbitService)
+ !insertmacro MUI_DESCRIPTION_TEXT ${RabbitStartMenu} $(DESC_RabbitStartMenu)
+!insertmacro MUI_FUNCTION_DESCRIPTION_END
+
+;--------------------------------
+
+; Uninstaller
+
+Section "Uninstall"
+
+ ; Remove registry keys
+ DeleteRegKey HKLM ${uninstall}
+ DeleteRegKey HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server"
+
+ ; TODO these will fail if the service is not installed - do we care?
+ ExpandEnvStrings $0 %COMSPEC%
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" stop'
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" remove'
+
+ ; Remove from PATH
+ ${un.EnvVarUpdate} $0 "PATH" "R" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
+
+ ; Remove files and uninstaller
+ RMDir /r "$INSTDIR\rabbitmq_server-%%VERSION%%"
+ Delete "$INSTDIR\rabbitmq.ico"
+ Delete "$INSTDIR\uninstall.exe"
+
+ ; Remove start menu items
+ RMDir /r "$SMPROGRAMS\RabbitMQ Server"
+
+ DeleteRegValue ${env_hklm} ERLANG_HOME
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+SectionEnd
+
+;--------------------------------
+
+; Functions
+
+Function .onInit
+ Call findErlang
+
+ ReadRegStr $0 HKLM ${uninstall} "UninstallString"
+ ${If} $0 != ""
+ MessageBox MB_OKCANCEL|MB_ICONEXCLAMATION "RabbitMQ is already installed. $\n$\nClick 'OK' to remove the previous version or 'Cancel' to cancel this installation." IDCANCEL norun
+
+ ;Run the uninstaller
+ ClearErrors
+ ExecWait $INSTDIR\uninstall.exe
+
+ norun:
+ Abort
+ ${EndIf}
+FunctionEnd
+
+Function findErlang
+
+ StrCpy $0 0
+ StrCpy $2 "not-found"
+ ${Do}
+ EnumRegKey $1 HKLM Software\Ericsson\Erlang $0
+ ${If} $1 = ""
+ ${Break}
+ ${EndIf}
+ ${If} $1 <> "ErlSrv"
+ StrCpy $2 $1
+ ${EndIf}
+
+ IntOp $0 $0 + 1
+ ${Loop}
+
+ ${If} $2 = "not-found"
+ MessageBox MB_YESNO|MB_ICONEXCLAMATION "Erlang could not be detected.$\nYou must install Erlang before installing RabbitMQ. Would you like the installer to open a browser window to the Erlang download site?" IDNO abort
+ ExecShell "open" "http://www.erlang.org/download.html"
+ abort:
+ Abort
+ ${Else}
+ ${VersionCompare} $2 "5.6.3" $0
+ ${VersionCompare} $2 "5.8.1" $1
+
+ ${If} $0 = 2
+ MessageBox MB_OK|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is too old. Please install a more recent version."
+ Abort
+ ${ElseIf} $1 = 2
+ MessageBox MB_YESNO|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is comparatively old.$\nFor best results, please install a newer version.$\nDo you wish to continue?" IDYES no_abort
+ Abort
+ no_abort:
+ ${EndIf}
+
+ ReadRegStr $0 HKLM "Software\Ericsson\Erlang\$2" ""
+
+ ; See http://nsis.sourceforge.net/Setting_Environment_Variables
+ WriteRegExpandStr ${env_hklm} ERLANG_HOME $0
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+ ; On Windows XP changing the permanent environment does not change *our*
+ ; environment, so do that as well.
+ System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0'
+ ${EndIf}
+
+FunctionEnd \ No newline at end of file
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index df4b24d8..3e173949 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -37,4 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
-[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+if [ -f /etc/rabbitmq/rabbitmq.conf ]; then
+ echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
+ echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf"
+fi
+[ -f /etc/rabbitmq/rabbitmq-env.conf ] && . /etc/rabbitmq/rabbitmq-env.conf
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 5c390a51..2f80eb96 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -16,7 +16,6 @@
##
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
--kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 0cfa5ea8..2ca9f2b3 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -142,7 +142,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+W w ^
+A30 ^
+P 1048576 ^
--kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 43520b55..bc452fea 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -207,7 +207,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/src/delegate.erl b/src/delegate.erl
index 46bd8245..17046201 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]).
+-export([start_link/1, invoke_no_result/2, invoke/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,8 +36,6 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-
-endif.
%%----------------------------------------------------------------------------
@@ -111,22 +109,14 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count([RemoteNode | _]) ->
- {ok, Count} = case application:get_env(rabbit, delegate_count) of
- undefined -> rpc:call(RemoteNode, application, get_env,
- [rabbit, delegate_count]);
- Result -> Result
- end,
- Count.
-
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
delegate(RemoteNodes) ->
case get(delegate) of
- undefined -> Name =
- delegate_name(erlang:phash2(
- self(), delegate_count(RemoteNodes))),
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(),
+ delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index e0ffa7c8..fc693c7d 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/1, count/1]).
-export([init/1]).
@@ -28,20 +28,32 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(count/1 :: ([node()]) -> integer()).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link(Count) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
+
+count([]) ->
+ 1;
+count([Node | Nodes]) ->
+ try
+ length(supervisor:which_children({?SERVER, Node}))
+ catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
+ count(Nodes);
+ exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
+ R =:= nodedown ->
+ count(Nodes)
+ end.
%%----------------------------------------------------------------------------
-init(_Args) ->
- DCount = delegate:delegate_count([node()]),
+init([Count]) ->
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, DCount - 1)]}}.
+ Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index a637dddd..94296f97 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -58,6 +58,15 @@
%% hibernate the process immediately, as it would if backoff wasn't
%% being used. Instead it'll wait for the current timeout as described
%% above.
+%%
+%% 7) The callback module can return from any of the handle_*
+%% functions, a {become, Module, State} triple, or a {become, Module,
+%% State, Timeout} quadruple. This allows the gen_server to
+%% dynamically change the callback module. The State is the new state
+%% which will be passed into any of the callback functions in the new
+%% module. Note there is no form also encompassing a reply, thus if
+%% you wish to reply in handle_call/3 and change the callback module,
+%% you need to use gen_server2:reply/2 to issue the reply manually.
%% All modifications are (C) 2009-2011 VMware, Inc.
@@ -880,6 +889,22 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
loop(GS2State #gs2_state { state = NState,
time = Time1,
debug = Debug1 });
+ {become, Mod, NState} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = infinity,
+ debug = Debug1 }));
+ {become, Mod, NState, Time1} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = Time1,
+ debug = Debug1 }));
_ ->
handle_common_termination(Reply, Msg, GS2State)
end.
diff --git a/src/pg_local.erl b/src/pg_local.erl
index fd515747..c9c3a3a7 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -83,7 +83,7 @@ get_members(Name) ->
sync() ->
ensure_started(),
- gen_server:call(?MODULE, sync).
+ gen_server:call(?MODULE, sync, infinity).
%%%
%%% Callback functions from gen_server
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c6661d39..1beed5c1 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -101,8 +101,7 @@
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
- {mfa, {rabbit_sup, start_child,
- [delegate_sup]}},
+ {mfa, {rabbit, boot_delegate, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).
@@ -153,6 +152,11 @@
[{mfa, {rabbit_networking, boot, []}},
{requires, log_relay}]}).
+-rabbit_boot_step({notify_cluster,
+ [{description, "notify cluster nodes"},
+ {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {requires, networking}]}).
+
%%---------------------------------------------------------------------------
-include("rabbit_framing.hrl").
@@ -179,6 +183,9 @@
{running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
+-spec(maybe_insert_default_data/0 :: () -> 'ok').
+-spec(boot_delegate/0 :: () -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -225,11 +232,11 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, SupPid} = rabbit_sup:start_link(),
+ true = register(rabbit, self()),
print_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
-
{ok, SupPid};
Error ->
Error
@@ -448,6 +455,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end
end.
+boot_delegate() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ rabbit_sup:start_child(delegate_sup, [Count]).
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a6da551d..46b78c39 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -197,7 +197,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
arguments = Args,
exclusive_owner = Owner,
pid = none}),
- case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
end.
@@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
- case is_process_alive(QPid) of
+ case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
fun (Tx) -> TailFun(Tx), ExistingQ end
@@ -300,29 +300,19 @@ check_declare_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_expires_argument/1},
- {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
+ [{<<"x-expires">>, fun check_integer_argument/1},
+ {<<"x-message-ttl">>, fun check_integer_argument/1}]],
ok.
-check_expires_argument(Val) ->
- check_integer_argument(Val,
- expires_not_of_acceptable_type,
- expires_zero_or_less).
-
-check_message_ttl_argument(Val) ->
- check_integer_argument(Val,
- ttl_not_of_acceptable_type,
- ttl_zero_or_less).
-
-check_integer_argument(undefined, _, _) ->
+check_integer_argument(undefined) ->
ok;
-check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 ->
+check_integer_argument({Type, Val}) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
- false -> {error, {InvalidTypeError, Type, Val}}
+ false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, _Val}, _, ZeroOrLessError) ->
- {error, ZeroOrLessError}.
+check_integer_argument({_Type, Val}) ->
+ {error, {value_zero_or_less, Val}}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -334,10 +324,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, info, infinity).
+ delegate_call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -347,7 +337,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, consumers, infinity).
+ delegate_call(QPid, consumers).
consumers_all(VHostPath) ->
lists:append(
@@ -356,7 +346,8 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) ->
+ delegate_call(QPid, stat).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
@@ -365,9 +356,9 @@ delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
deliver(QPid, Delivery = #delivery{immediate = true}) ->
gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
@@ -379,7 +370,7 @@ deliver(QPid, Delivery) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
@@ -408,20 +399,18 @@ limit_all(QPids, ChPid, LimiterPid) ->
end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
- infinity).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- delegate_cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
@@ -509,8 +498,8 @@ safe_delegate_call_ok(F, Pids) ->
{_, Bad} -> {error, Bad}
end.
-delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
+delegate_call(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
delegate_cast(Pid, Msg) ->
delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c7e28fe..e794b4aa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
@@ -122,6 +122,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+ rabbit_event:notify(
+ queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
@@ -186,7 +188,6 @@ terminate_shutdown(Fun, State) ->
end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag)
|| {Ch, CTag, _} <- consumers(State1)],
- rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -657,13 +658,13 @@ message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
+calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- Now = now_millis(),
+ Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) ->
Now > Expiry
@@ -684,7 +685,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-now_millis() -> timer:now_diff(now(), {0,0,0}).
+now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -790,20 +791,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff..73c031de 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/8, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -34,7 +34,8 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed}).
+ confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm,
+ confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,9 +67,9 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/7 ::
+-spec(start_link/8 ::
(channel_number(), pid(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(),
fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -94,10 +95,11 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
- VHost, CollectorPid, StartLimiterFun], []).
+start_link(Channel, ReaderPid, WriterPid, User, VHost, Capabilities,
+ CollectorPid, StartLimiterFun) ->
+ gen_server2:start_link(?MODULE,
+ [Channel, ReaderPid, WriterPid, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -106,7 +108,7 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
flush(Pid) ->
- gen_server2:call(Pid, flush).
+ gen_server2:call(Pid, flush, infinity).
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -148,7 +150,7 @@ emit_stats(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
+init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -173,8 +175,10 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty(),
- confirmed = []},
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ confirmed = [],
+ capabilities = Capabilities},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -278,19 +282,22 @@ handle_info(timeout, State) ->
noreply(State);
handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed = UC}) ->
- %% TODO: this does a complete scan and partial rebuild of the
- %% tree, which is quite efficient. To do better we'd need to
- %% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MXs, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}, State),
+ State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
+ end,
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+ %% the set one by one which which would be inefficient
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
erase_queue_stats(QPid),
- State1 = case Reason of
- normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
- _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
- end,
- noreply(queue_blocked(QPid, State1)).
+ State3 = (case Reason of
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
+ end)(MXs, State2),
+ noreply(queue_blocked(QPid, State3)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -476,13 +483,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
- Acc;
-remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
- remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
- State).
-
record_confirm(undefined, _, State) ->
State;
record_confirm(MsgSeqNo, XName, State) ->
@@ -495,25 +495,43 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} =
+confirm(MsgSeqNos, QPid, State) ->
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ record_confirms(MXs, State1).
+
+process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
+ {MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> Acc;
- {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
+ fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
+ State);
+ none -> Acc
end
- end, {[], UC}, MsgSeqNos),
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
+ end, {[], UMQ, UQM}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
- Qs1 = sets:del_element(QPid, Qs),
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
- case sets:size(Qs1) of
- 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
+ UQM1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> gb_trees:delete(QPid, UQM);
+ false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
+ end;
+ none ->
+ UQM
+ end,
+ Qs1 = gb_sets:del_element(QPid, Qs),
+ case gb_sets:is_empty(Qs1) of
+ true ->
+ {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
+ false ->
+ {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1250,10 +1268,21 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed = UC} = State,
- [maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
- State#ch{unconfirmed = UC1}.
+ #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
+ SingletonSet = gb_sets:singleton(MsgSeqNo),
+ UQM1 = lists:foldl(
+ fun (QPid, UQM2) ->
+ maybe_monitor(QPid),
+ case gb_trees:lookup(QPid, UQM2) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ gb_trees:update(QPid, MsgSeqNos1, UQM2);
+ none ->
+ gb_trees:insert(QPid, SingletonSet, UQM2)
+ end
+ end, UQM, QPids),
+ State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1289,11 +1318,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UC) of
+ CutOff = case gb_trees:is_empty(UMQ) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1320,8 +1349,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- gb_trees:size(UC);
+i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
+ gb_trees:size(UMQ);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d21cfdb7..90058194 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -33,9 +33,10 @@
-type(start_link_args() ::
{'tcp', rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()} |
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()} |
{'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid()}).
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -44,7 +45,7 @@
%%----------------------------------------------------------------------------
start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
- Collector}) ->
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
@@ -56,19 +57,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, User, VHost,
+ [Channel, ReaderPid, WriterPid, User, VHost, Capabilities,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+start_link({direct, Channel, ClientChannelPid, User, VHost, Capabilities,
+ Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid,
- User, VHost, Collector, start_limiter_fun(SupPid)]},
+ [Channel, ClientChannelPid, ClientChannelPid, User,
+ VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 80483097..3a18950f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -44,22 +44,18 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
- FullCommand = init:get_plain_arguments(),
- case FullCommand of
- [] -> usage();
- _ -> ok
- end,
{[Command0 | Args], Opts} =
- rabbit_misc:get_options(
- [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
- {option, ?VHOST_OPT, "/"}],
- FullCommand),
- Opts1 = lists:map(fun({K, V}) ->
- case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
- _ -> {K, V}
- end
- end, Opts),
+ case rabbit_misc:get_options([{flag, ?QUIET_OPT},
+ {option, ?NODE_OPT, NodeStr},
+ {option, ?VHOST_OPT, "/"}],
+ init:get_plain_arguments()) of
+ {[], _Opts} -> usage();
+ CmdArgsAndOpts -> CmdArgsAndOpts
+ end,
+ Opts1 = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts],
Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 3b8c9fba..5c89bf49 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/3, start_channel/5]).
+-export([boot/0, connect/4, start_channel/6]).
-include("rabbit.hrl").
@@ -25,12 +25,13 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/3 :: (binary(), binary(), binary()) ->
+-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()) ->
- {'ok', pid()}).
+-spec(start_channel/6 ::
+ (rabbit_channel:channel_number(), pid(), rabbit_types:user(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) ->
+ {'ok', pid()}).
-endif.
@@ -49,13 +50,14 @@ boot() ->
%%----------------------------------------------------------------------------
-connect(Username, Password, VHost) ->
+connect(Username, Password, VHost, Protocol) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
try rabbit_access_control:user_pass_login(Username, Password) of
#user{} = User ->
try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> {ok, {User, rabbit_reader:server_properties()}}
+ ok -> {ok, {User,
+ rabbit_reader:server_properties(Protocol)}}
catch
exit:#amqp_error{name = access_refused} ->
{error, access_refused}
@@ -67,9 +69,10 @@ connect(Username, Password, VHost) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+start_channel(Number, ClientChannelPid, User, VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ [{direct, Number, ClientChannelPid, User, VHost, Capabilities,
+ Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 86ea7282..1b72dd76 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) ->
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 7d916797..abc27c5f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([lock_file/1]).
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([is_process_alive/1]).
%%----------------------------------------------------------------------------
@@ -194,6 +195,7 @@
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-endif.
@@ -350,8 +352,11 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
+ catch
+ exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
+ Handler();
+ exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
Handler()
end.
@@ -858,3 +863,12 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
+
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
+
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e9c356e1..7f3cf35f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,7 +33,7 @@
-include("rabbit_msg_store.hrl").
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 283d25c7..36f61628 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -32,16 +32,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(RABBIT_TCP_OPTS, [
- binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- {backlog, 128}, % use the maximum listen(2) backlog value
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}
- ]).
-
-define(SSL_TIMEOUT, 5). %% seconds
-define(FIRST_TEST_BIND_PORT, 10000).
@@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS],
+ [IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
@@ -315,6 +305,10 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+tcp_opts() ->
+ {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
+ Opts.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index e4bc1cdc..817abaa2 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -22,14 +22,41 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([notify_cluster/0, rabbit_running_on/1]).
-define(SERVER, ?MODULE).
+-define(RABBIT_UP_RPC_TIMEOUT, 2000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(rabbit_running_on/1 :: (node()) -> 'ok').
+-spec(notify_cluster/0 :: () -> 'ok').
+
+-endif.
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+rabbit_running_on(Node) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+
+notify_cluster() ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ %% notify other rabbits of this rabbit
+ case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
+ [Node], ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ %% register other active rabbits with this rabbit
+ [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
+ ok.
+
%%--------------------------------------------------------------------
init([]) ->
@@ -39,19 +66,20 @@ init([]) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast({rabbit_running_on, Node}, State) ->
+ rabbit_log:info("node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({nodeup, Node}, State) ->
- rabbit_log:info("node ~p up", [Node]),
- {noreply, State};
handle_info({nodedown, Node}, State) ->
- rabbit_log:info("node ~p down", [Node]),
- %% TODO: This may turn out to be a performance hog when there are
- %% lots of nodes. We really only need to execute this code on
- %% *one* node, rather than all of them.
- ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node),
+ rabbit_log:info("node ~p down~n", [Node]),
+ ok = handle_dead_rabbit(Node),
+ {noreply, State};
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
+ rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+ ok = handle_dead_rabbit(Node),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
+%% TODO: This may turn out to be a performance hog when there are
+%% lots of nodes. We really only need to execute this code on
+%% *one* node, rather than all of them.
+handle_dead_rabbit(Node) ->
+ ok = rabbit_networking:on_node_down(Node),
+ ok = rabbit_amqqueue:on_node_down(Node).
+
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b5d82ac2..e9ff97f9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -24,7 +24,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/0]).
+-export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
@@ -160,7 +160,8 @@
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
--spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+-spec(server_properties/1 :: (rabbit_types:protocol()) ->
+ rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
@@ -219,7 +220,7 @@ conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
-server_properties() ->
+server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -230,22 +231,30 @@ server_properties() ->
%% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
%% from the config and merge them with the generated built-in properties
NormalizedConfigServerProps =
- [case X of
- {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
- longstr,
- list_to_binary(Value)};
- {BinKey, Type, Value} -> {BinKey, Type, Value}
- end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]],
+ [{<<"capabilities">>, table, server_capabilities(Protocol)} |
+ [case X of
+ {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
+ longstr,
+ list_to_binary(Value)};
+ {BinKey, Type, Value} -> {BinKey, Type, Value}
+ end || X <- RawConfigServerProps ++
+ [{product, Product},
+ {version, Version},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favor of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
+server_capabilities(rabbit_framing_amqp_0_9_1) ->
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true}];
+server_capabilities(_) ->
+ [].
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
@@ -357,7 +366,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw({handshake_timeout, State#v1.callback})
end;
timeout ->
- throw({timeout, State#v1.connection_state});
+ case State#v1.connection_state of
+ closed -> mainloop(Deb, State);
+ S -> throw({timeout, S})
+ end;
{'$gen_call', From, {shutdown, Explanation}} ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -652,7 +664,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
- server_properties = server_properties(),
+ server_properties = server_properties(Protocol),
mechanisms = auth_mechanisms_binary(),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
@@ -706,12 +718,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
connection = Connection,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism),
+ Capabilities =
+ case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
+ {table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
State = State0#v1{auth_mechanism = AuthMechanism,
auth_state = AuthMechanism:init(Sock),
connection_state = securing,
connection =
Connection#connection{
- client_properties = ClientProperties}},
+ client_properties = ClientProperties,
+ capabilities = Capabilities}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -922,10 +940,14 @@ socket_info(Get, Select) ->
end.
ssl_info(F, Sock) ->
+ %% The first ok form is R14
+ %% The second is R13 - the extra term is exportability (by inspection,
+ %% the docs are wrong)
case rabbit_net:ssl_info(Sock) of
- nossl -> '';
- {error, _} -> '';
- {ok, Info} -> F(Info)
+ nossl -> '';
+ {error, _} -> '';
+ {ok, {P, {K, C, H}}} -> F({P, {K, C, H}});
+ {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
end.
cert_info(F, Sock) ->
@@ -940,14 +962,15 @@ cert_info(F, Sock) ->
send_to_new_channel(Channel, AnalyzedFrame, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost}} = State,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
- VHost, Collector}),
+ VHost, Capabilities, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 795413aa..9821ae7b 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -48,7 +48,7 @@ start_link() ->
%%---------------------------------------------------------------------------
register(Class, TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 49b09508..2015170a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -26,6 +26,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
@@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ passed = test_queue_cleanup(SecondaryNode),
+ passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate
Node = node(),
Self = self(),
Remote = spawn(SecondaryNode,
- fun () -> A = test_delegates_async(Node),
- B = test_delegates_sync(Node),
- Self ! {self(), {A, B}}
+ fun () -> Rs = [ test_delegates_async(Node),
+ test_delegates_sync(Node),
+ test_queue_cleanup(Node),
+ test_declare_on_dead_queue(Node) ],
+ Self ! {self(), Rs}
end),
receive
{Remote, Result} ->
- Result = {passed, passed}
- after 2000 ->
+ Result = lists:duplicate(length(Result), passed)
+ after 30000 ->
throw(timeout)
end,
@@ -1015,7 +1020,7 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- user(<<"user">>), <<"/">>, self(),
+ user(<<"user">>), <<"/">>, [], self(),
fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
@@ -1074,8 +1079,8 @@ test_server_status() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- user(<<"guest">>), <<"/">>, self(),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, user(<<"guest">>),
+ <<"/">>, [], self(),
fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
@@ -1228,7 +1233,7 @@ must_exit(Fun) ->
end.
test_delegates_sync(SecondaryNode) ->
- Sender = fun (Pid) -> gen_server:call(Pid, invoked) end,
+ Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end,
BadSender = fun (_Pid) -> exit(exception) end,
Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
@@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) ->
passed.
+test_queue_cleanup_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_queue_cleanup_receiver(Pid)
+ end.
+
+
+test_queue_cleanup(_SecondaryNode) ->
+ {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
+ receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
+ ok
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ rabbit:stop(),
+ rabbit:start(),
+ rabbit_channel:do(Ch, #'queue.declare'{ passive = true,
+ queue = ?CLEANUP_QUEUE_NAME }),
+ receive
+ {channel_exit, 1, {amqp_error, not_found, _, _}} ->
+ ok
+ after 2000 ->
+ throw(failed_to_receive_channel_exit)
+ end,
+ passed.
+
+test_declare_on_dead_queue(SecondaryNode) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
+ Self = self(),
+ Pid = spawn(SecondaryNode,
+ fun () ->
+ {new, #amqqueue{name = QueueName, pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ exit(QPid, kill),
+ Self ! {self(), killed, QPid}
+ end),
+ receive
+ {Pid, killed, QPid} ->
+ {existing, #amqqueue{name = QueueName,
+ pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ false = rabbit_misc:is_process_alive(QPid),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ passed
+ after 2000 ->
+ throw(failed_to_create_and_kill_queue)
+ end.
+
%---------------------------------------------------------------------
control_action(Command, Args) ->
@@ -2141,9 +2201,11 @@ test_configurable_server_properties() ->
BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
<<"copyright">>, <<"information">>],
+ Protocol = rabbit_framing_amqp_0_9_1,
+
%% Verify that the built-in properties are initially present
- ActualPropNames = [Key ||
- {Key, longstr, _} <- rabbit_reader:server_properties()],
+ ActualPropNames = [Key || {Key, longstr, _} <-
+ rabbit_reader:server_properties(Protocol)],
true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
BuiltInPropNames),
@@ -2154,9 +2216,10 @@ test_configurable_server_properties() ->
ConsProp = fun (X) -> application:set_env(rabbit,
server_properties,
[X | ServerProperties]) end,
- IsPropPresent = fun (X) -> lists:member(X,
- rabbit_reader:server_properties())
- end,
+ IsPropPresent =
+ fun (X) ->
+ lists:member(X, rabbit_reader:server_properties(Protocol))
+ end,
%% Add a wholly new property of the simplified {KeyAtom, StringValue} form
NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
@@ -2179,7 +2242,7 @@ test_configurable_server_properties() ->
{BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
list_to_binary(NewVerVal)},
ConsProp(NewVersion),
- ClobberedServerProps = rabbit_reader:server_properties(),
+ ClobberedServerProps = rabbit_reader:server_properties(Protocol),
%% Is the clobbering insert present?
true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
%% Is the clobbering insert the only thing with the clobbering key?