diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-02-18 17:29:33 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-02-18 17:29:33 +0000 |
commit | 9d5d8bada28a281edd9c2b11241b091aca4357d0 (patch) | |
tree | 49d03b4c1a44046c43136efc546c31781a844242 | |
parent | b15c279b847aaebe2e43b0af31cd8f69e00e6ab4 (diff) | |
parent | a25d3cb4189ce9b832aeb8397e743f0627fdbe21 (diff) | |
download | rabbitmq-server-9d5d8bada28a281edd9c2b11241b091aca4357d0.tar.gz |
Merge bug23553 (Present managed socket count in mgmt plugin)
37 files changed, 1043 insertions, 255 deletions
@@ -25,6 +25,9 @@ syntax: regexp ^packaging/macports/macports$ ^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$ ^packaging/windows/rabbitmq-server-windows-.*\.zip$ +^packaging/windows-exe/rabbitmq_server-.*$ +^packaging/windows-exe/rabbitmq-.*\.nsi$ +^packaging/windows-exe/rabbitmq-server-.*\.exe$ ^docs/.*\.[15]\.gz$ ^docs/.*\.man\.xml$ @@ -41,14 +41,12 @@ RABBIT_PLT=rabbit.plt ifndef USE_SPECS # our type specs rely on features and bug fixes in dialyzer that are -# only available in R14A upwards (R13B04 is erts 5.7.5) -# -# NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.5" ]; then echo "true"; else echo "false"; fi) +# only available in R14A upwards (R14A is erts 5.8) +USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8]), halt().') endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(shell [ $(USE_SPECS) = "true" ] && echo "-Duse_specs") +ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs) VERSION=0.0.0 TARBALL_NAME=rabbitmq-server-$(VERSION) diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq-env.conf.5.xml index 31de7164..4c7340c2 100644 --- a/docs/rabbitmq.conf.5.xml +++ b/docs/rabbitmq-env.conf.5.xml @@ -9,20 +9,20 @@ </refentryinfo> <refmeta> - <refentrytitle>rabbitmq.conf</refentrytitle> + <refentrytitle>rabbitmq-env.conf</refentrytitle> <manvolnum>5</manvolnum> <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> </refmeta> <refnamediv> - <refname>rabbitmq.conf</refname> + <refname>rabbitmq-env.conf</refname> <refpurpose>default settings for RabbitMQ AMQP server</refpurpose> </refnamediv> <refsect1> <title>Description</title> <para> -<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the +<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> contains variable settings that override the defaults built in to the RabbitMQ startup scripts. </para> <para> @@ -33,7 +33,7 @@ operator), including line comments starting with "#". </para> <para> In order of preference, the startup scripts get their values from the -environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the +environment, from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> and finally from the built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar> setting, </para> @@ -48,26 +48,26 @@ empty string, then <envar>NODENAME</envar> </para> <para> -from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent +from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> is checked. If it is also absent or set equal to the empty string then the default value from the startup script is used. </para> <para> -The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the +The variable names in /etc/rabbitmq/rabbitmq-env.conf are always equal to the environment variable names, with the <envar>RABBITMQ_</envar> prefix removed: <envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the -<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc. +<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file, etc. </para> <para role="example-prefix">For example:</para> <screen role="example-multiline"> -# I am a complete /etc/rabbitmq/rabbitmq.conf file. +# I am a complete /etc/rabbitmq/rabbitmq-env.conf file. # Comment lines start with a hash character. # This is a /bin/sh script file - use ordinary envt var syntax NODENAME=hare </screen> <para role="example"> This is an example of a complete - <filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang + <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file that overrides the default Erlang node name from "rabbit" to "hare". </para> diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml index 6586890a..5f5c6c2f 100644 --- a/docs/rabbitmq-multi.1.xml +++ b/docs/rabbitmq-multi.1.xml @@ -92,7 +92,7 @@ Rotate log files for all local and running RabbitMQ nodes. <refsect1> <title>See also</title> <para> - <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> </para> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index f161a291..a0458c93 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -124,7 +124,7 @@ Defaults to 5672. <refsect1> <title>See also</title> <para> - <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> </para> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index cc7221d6..f837684c 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -34,4 +34,11 @@ {collect_statistics, none}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, - {delegate_count, 16}]}]}. + {delegate_count, 16}, + {tcp_listen_options, [binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 128}, + {nodelay, true}, + {exit_on_close, false}]} + ]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 15f5d7c5..24d0f961 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -28,7 +28,7 @@ -record(vhost, {virtual_host, dummy}). -record(connection, {protocol, user, timeout_sec, frame_max, vhost, - client_properties}). + client_properties, capabilities}). -record(content, {class_id, diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 47316864..5d573bde 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -92,6 +92,9 @@ fi %post /sbin/chkconfig --add %{name} +if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then + mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf +fi %preun if [ $1 = 0 ]; then diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 134f16ee..b11340ef 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -35,6 +35,10 @@ chown -R rabbitmq:rabbitmq /var/log/rabbitmq case "$1" in configure) + if [ -f /etc/rabbitmq/rabbitmq.conf ] && \ + [ ! -f /etc/rabbitmq/rabbitmq-env.conf ]; then + mv /etc/rabbitmq/rabbitmq.conf /etc/rabbitmq/rabbitmq-env.conf + fi ;; abort-upgrade|abort-remove|abort-deconfigure) diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index f8417b83..862a0d1a 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -81,7 +81,7 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome} xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} - reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ + reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \ ${realsbin}/rabbitmq-env foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ @@ -102,10 +102,10 @@ post-destroot { file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/ - file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/ - file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/ - file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/ + file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/ + file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/ + file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/ + file copy ${mansrc}/man5/rabbitmq-env.conf.5.gz ${mandest}/man5/ } pre-install { diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile new file mode 100644 index 00000000..59803f9c --- /dev/null +++ b/packaging/windows-exe/Makefile @@ -0,0 +1,16 @@ +VERSION=0.0.0 +ZIP=../windows/rabbitmq-server-windows-$(VERSION) + +dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION) + makensis rabbitmq-$(VERSION).nsi + +rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in + sed \ + -e 's|%%VERSION%%|$(VERSION)|' \ + $< > $@ + +rabbitmq_server-$(VERSION): + unzip $(ZIP) + +clean: + rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe diff --git a/packaging/windows-exe/lib/EnvVarUpdate.nsh b/packaging/windows-exe/lib/EnvVarUpdate.nsh new file mode 100644 index 00000000..839d6a02 --- /dev/null +++ b/packaging/windows-exe/lib/EnvVarUpdate.nsh @@ -0,0 +1,327 @@ +/**
+ * EnvVarUpdate.nsh
+ * : Environmental Variables: append, prepend, and remove entries
+ *
+ * WARNING: If you use StrFunc.nsh header then include it before this file
+ * with all required definitions. This is to avoid conflicts
+ *
+ * Usage:
+ * ${EnvVarUpdate} "ResultVar" "EnvVarName" "Action" "RegLoc" "PathString"
+ *
+ * Credits:
+ * Version 1.0
+ * * Cal Turney (turnec2)
+ * * Amir Szekely (KiCHiK) and e-circ for developing the forerunners of this
+ * function: AddToPath, un.RemoveFromPath, AddToEnvVar, un.RemoveFromEnvVar,
+ * WriteEnvStr, and un.DeleteEnvStr
+ * * Diego Pedroso (deguix) for StrTok
+ * * Kevin English (kenglish_hi) for StrContains
+ * * Hendri Adriaens (Smile2Me), Diego Pedroso (deguix), and Dan Fuhry
+ * (dandaman32) for StrReplace
+ *
+ * Version 1.1 (compatibility with StrFunc.nsh)
+ * * techtonik
+ *
+ * http://nsis.sourceforge.net/Environmental_Variables:_append%2C_prepend%2C_and_remove_entries
+ *
+ */
+
+
+!ifndef ENVVARUPDATE_FUNCTION
+!define ENVVARUPDATE_FUNCTION
+!verbose push
+!verbose 3
+!include "LogicLib.nsh"
+!include "WinMessages.NSH"
+!include "StrFunc.nsh"
+
+; ---- Fix for conflict if StrFunc.nsh is already includes in main file -----------------------
+!macro _IncludeStrFunction StrFuncName
+ !ifndef ${StrFuncName}_INCLUDED
+ ${${StrFuncName}}
+ !endif
+ !ifndef Un${StrFuncName}_INCLUDED
+ ${Un${StrFuncName}}
+ !endif
+ !define un.${StrFuncName} "${Un${StrFuncName}}"
+!macroend
+
+!insertmacro _IncludeStrFunction StrTok
+!insertmacro _IncludeStrFunction StrStr
+!insertmacro _IncludeStrFunction StrRep
+
+; ---------------------------------- Macro Definitions ----------------------------------------
+!macro _EnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define EnvVarUpdate '!insertmacro "_EnvVarUpdateConstructor"'
+
+!macro _unEnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call un.EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define un.EnvVarUpdate '!insertmacro "_unEnvVarUpdateConstructor"'
+; ---------------------------------- Macro Definitions end-------------------------------------
+
+;----------------------------------- EnvVarUpdate start----------------------------------------
+!define hklm_all_users 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+!define hkcu_current_user 'HKCU "Environment"'
+
+!macro EnvVarUpdate UN
+
+Function ${UN}EnvVarUpdate
+
+ Push $0
+ Exch 4
+ Exch $1
+ Exch 3
+ Exch $2
+ Exch 2
+ Exch $3
+ Exch
+ Exch $4
+ Push $5
+ Push $6
+ Push $7
+ Push $8
+ Push $9
+ Push $R0
+
+ /* After this point:
+ -------------------------
+ $0 = ResultVar (returned)
+ $1 = EnvVarName (input)
+ $2 = Action (input)
+ $3 = RegLoc (input)
+ $4 = PathString (input)
+ $5 = Orig EnvVar (read from registry)
+ $6 = Len of $0 (temp)
+ $7 = tempstr1 (temp)
+ $8 = Entry counter (temp)
+ $9 = tempstr2 (temp)
+ $R0 = tempChar (temp) */
+
+ ; Step 1: Read contents of EnvVarName from RegLoc
+ ;
+ ; Check for empty EnvVarName
+ ${If} $1 == ""
+ SetErrors
+ DetailPrint "ERROR: EnvVarName is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for valid Action
+ ${If} $2 != "A"
+ ${AndIf} $2 != "P"
+ ${AndIf} $2 != "R"
+ SetErrors
+ DetailPrint "ERROR: Invalid Action - must be A, P, or R"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ${If} $3 == HKLM
+ ReadRegStr $5 ${hklm_all_users} $1 ; Get EnvVarName from all users into $5
+ ${ElseIf} $3 == HKCU
+ ReadRegStr $5 ${hkcu_current_user} $1 ; Read EnvVarName from current user into $5
+ ${Else}
+ SetErrors
+ DetailPrint 'ERROR: Action is [$3] but must be "HKLM" or HKCU"'
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for empty PathString
+ ${If} $4 == ""
+ SetErrors
+ DetailPrint "ERROR: PathString is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Make sure we've got some work to do
+ ${If} $5 == ""
+ ${AndIf} $2 == "R"
+ SetErrors
+ DetailPrint "$1 is empty - Nothing to remove"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Step 2: Scrub EnvVar
+ ;
+ StrCpy $0 $5 ; Copy the contents to $0
+ ; Remove spaces around semicolons (NOTE: spaces before the 1st entry or
+ ; after the last one are not removed here but instead in Step 3)
+ ${If} $0 != "" ; If EnvVar is not empty ...
+ ${Do}
+ ${${UN}StrStr} $7 $0 " ;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 " ;" ";" ; Remove '<space>;'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 "; "
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 "; " ";" ; Remove ';<space>'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 ";;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 ";;" ";"
+ ${Loop}
+
+ ; Remove a leading or trailing semicolon from EnvVar
+ StrCpy $7 $0 1 0
+ ${If} $7 == ";"
+ StrCpy $0 $0 "" 1 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ StrLen $6 $0
+ IntOp $6 $6 - 1
+ StrCpy $7 $0 1 $6
+ ${If} $7 == ";"
+ StrCpy $0 $0 $6 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ ; DetailPrint "Scrubbed $1: [$0]" ; Uncomment to debug
+ ${EndIf}
+
+ /* Step 3. Remove all instances of the target path/string (even if "A" or "P")
+ $6 = bool flag (1 = found and removed PathString)
+ $7 = a string (e.g. path) delimited by semicolon(s)
+ $8 = entry counter starting at 0
+ $9 = copy of $0
+ $R0 = tempChar */
+
+ ${If} $5 != "" ; If EnvVar is not empty ...
+ StrCpy $9 $0
+ StrCpy $0 ""
+ StrCpy $8 0
+ StrCpy $6 0
+
+ ${Do}
+ ${${UN}StrTok} $7 $9 ";" $8 "0" ; $7 = next entry, $8 = entry counter
+
+ ${If} $7 == "" ; If we've run out of entries,
+ ${ExitDo} ; were done
+ ${EndIf} ;
+
+ ; Remove leading and trailing spaces from this entry (critical step for Action=Remove)
+ ${Do}
+ StrCpy $R0 $7 1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 "" 1 ; Remove leading space
+ ${Loop}
+ ${Do}
+ StrCpy $R0 $7 1 -1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 -1 ; Remove trailing space
+ ${Loop}
+ ${If} $7 == $4 ; If string matches, remove it by not appending it
+ StrCpy $6 1 ; Set 'found' flag
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 == "" ; and the 1st string being added to $0,
+ StrCpy $0 $7 ; copy it to $0 without a prepended semicolon
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 != "" ; and this is NOT the 1st string to be added to $0,
+ StrCpy $0 $0;$7 ; append path to $0 with a prepended semicolon
+ ${EndIf} ;
+
+ IntOp $8 $8 + 1 ; Bump counter
+ ${Loop} ; Check for duplicates until we run out of paths
+ ${EndIf}
+
+ ; Step 4: Perform the requested Action
+ ;
+ ${If} $2 != "R" ; If Append or Prepend
+ ${If} $6 == 1 ; And if we found the target
+ DetailPrint "Target is already present in $1. It will be removed and"
+ ${EndIf}
+ ${If} $0 == "" ; If EnvVar is (now) empty
+ StrCpy $0 $4 ; just copy PathString to EnvVar
+ ${If} $6 == 0 ; If found flag is either 0
+ ${OrIf} $6 == "" ; or blank (if EnvVarName is empty)
+ DetailPrint "$1 was empty and has been updated with the target"
+ ${EndIf}
+ ${ElseIf} $2 == "A" ; If Append (and EnvVar is not empty),
+ StrCpy $0 $0;$4 ; append PathString
+ ${If} $6 == 1
+ DetailPrint "appended to $1"
+ ${Else}
+ DetailPrint "Target was appended to $1"
+ ${EndIf}
+ ${Else} ; If Prepend (and EnvVar is not empty),
+ StrCpy $0 $4;$0 ; prepend PathString
+ ${If} $6 == 1
+ DetailPrint "prepended to $1"
+ ${Else}
+ DetailPrint "Target was prepended to $1"
+ ${EndIf}
+ ${EndIf}
+ ${Else} ; If Action = Remove
+ ${If} $6 == 1 ; and we found the target
+ DetailPrint "Target was found and removed from $1"
+ ${Else}
+ DetailPrint "Target was NOT found in $1 (nothing to remove)"
+ ${EndIf}
+ ${If} $0 == ""
+ DetailPrint "$1 is now empty"
+ ${EndIf}
+ ${EndIf}
+
+ ; Step 5: Update the registry at RegLoc with the updated EnvVar and announce the change
+ ;
+ ClearErrors
+ ${If} $3 == HKLM
+ WriteRegExpandStr ${hklm_all_users} $1 $0 ; Write it in all users section
+ ${ElseIf} $3 == HKCU
+ WriteRegExpandStr ${hkcu_current_user} $1 $0 ; Write it to current user section
+ ${EndIf}
+
+ IfErrors 0 +4
+ MessageBox MB_OK|MB_ICONEXCLAMATION "Could not write updated $1 to $3"
+ DetailPrint "Could not write updated $1 to $3"
+ Goto EnvVarUpdate_Restore_Vars
+
+ ; "Export" our change
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+ EnvVarUpdate_Restore_Vars:
+ ;
+ ; Restore the user's variables and return ResultVar
+ Pop $R0
+ Pop $9
+ Pop $8
+ Pop $7
+ Pop $6
+ Pop $5
+ Pop $4
+ Pop $3
+ Pop $2
+ Pop $1
+ Push $0 ; Push my $0 (ResultVar)
+ Exch
+ Pop $0 ; Restore his $0
+
+FunctionEnd
+
+!macroend ; EnvVarUpdate UN
+!insertmacro EnvVarUpdate ""
+!insertmacro EnvVarUpdate "un."
+;----------------------------------- EnvVarUpdate end----------------------------------------
+
+!verbose pop
+!endif
diff --git a/packaging/windows-exe/rabbitmq.ico b/packaging/windows-exe/rabbitmq.ico Binary files differnew file mode 100644 index 00000000..5e169a79 --- /dev/null +++ b/packaging/windows-exe/rabbitmq.ico diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in new file mode 100644 index 00000000..6d79ffd4 --- /dev/null +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -0,0 +1,241 @@ +; Use the "Modern" UI +!include MUI2.nsh +!include LogicLib.nsh +!include WinMessages.nsh +!include FileFunc.nsh +!include WordFunc.nsh +!include lib\EnvVarUpdate.nsh + +!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"' +!define uninstall "Software\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ" + +;-------------------------------- + +; The name of the installer +Name "RabbitMQ Server %%VERSION%%" + +; The file to write +OutFile "rabbitmq-server-%%VERSION%%.exe" + +; Icons +!define MUI_ICON "rabbitmq.ico" + +; The default installation directory +InstallDir "$PROGRAMFILES\RabbitMQ Server" + +; Registry key to check for directory (so if you install again, it will +; overwrite the old one automatically) +InstallDirRegKey HKLM "Software\VMware, Inc.\RabbitMQ Server" "Install_Dir" + +; Request application privileges for Windows Vista +RequestExecutionLevel admin + +SetCompressor /solid lzma + +VIProductVersion "%%VERSION%%.0" +VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductVersion" "%%VERSION%%" +VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" +;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" +VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc" +;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? +VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved." +VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" +VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%" + +;-------------------------------- + +; Pages + + +; !insertmacro MUI_PAGE_LICENSE "..\..\LICENSE-MPL-RabbitMQ" + !insertmacro MUI_PAGE_COMPONENTS + !insertmacro MUI_PAGE_DIRECTORY + !insertmacro MUI_PAGE_INSTFILES + !insertmacro MUI_PAGE_FINISH + + !insertmacro MUI_UNPAGE_CONFIRM + !insertmacro MUI_UNPAGE_INSTFILES + !define MUI_FINISHPAGE_TEXT "RabbitMQ Server %%VERSION%% has been uninstalled from your computer.$\n$\nPlease note that the log and database directories located at $APPDATA\RabbitMQ have not been removed. You can remove them manually if desired." + !insertmacro MUI_UNPAGE_FINISH + +;-------------------------------- +;Languages + + !insertmacro MUI_LANGUAGE "English" + +;-------------------------------- + +; The stuff to install +Section "RabbitMQ Server (required)" Rabbit + + SectionIn RO + + ; Set output path to the installation directory. + SetOutPath $INSTDIR + + ; Put files there + File /r "rabbitmq_server-%%VERSION%%" + File "rabbitmq.ico" + + ; Add to PATH + ${EnvVarUpdate} $0 "PATH" "A" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" + + ; Write the installation path into the registry + WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR" + + ; Write the uninstall keys for Windows + WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server" + WriteRegStr HKLM ${uninstall} "UninstallString" "$INSTDIR\uninstall.exe" + WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\uninstall.exe,0" + WriteRegStr HKLM ${uninstall} "Publisher" "VMware, Inc." + WriteRegStr HKLM ${uninstall} "DisplayVersion" "%%VERSION%%" + WriteRegDWORD HKLM ${uninstall} "NoModify" 1 + WriteRegDWORD HKLM ${uninstall} "NoRepair" 1 + + ${GetSize} "$INSTDIR" "/S=0K" $0 $1 $2 + IntFmt $0 "0x%08X" $0 + WriteRegDWORD HKLM "${uninstall}" "EstimatedSize" "$0" + + WriteUninstaller "uninstall.exe" +SectionEnd + +;-------------------------------- + +Section "RabbitMQ Service" RabbitService + ExpandEnvStrings $0 %COMSPEC% + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install' + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start' + CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie" +SectionEnd + +;-------------------------------- + +Section "Start Menu" RabbitStartMenu + ; In case the service is not installed, or the service installation fails, + ; make sure these exist or Explorer will get confused. + CreateDirectory "$APPDATA\RabbitMQ\log" + CreateDirectory "$APPDATA\RabbitMQ\db" + + CreateDirectory "$SMPROGRAMS\RabbitMQ Server" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0 + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico" + +SectionEnd + +;-------------------------------- + +; Section descriptions + +LangString DESC_Rabbit ${LANG_ENGLISH} "The RabbitMQ Server." +LangString DESC_RabbitService ${LANG_ENGLISH} "Set up RabbitMQ as a Windows Service." +LangString DESC_RabbitStartMenu ${LANG_ENGLISH} "Add some useful links to the start menu." + +!insertmacro MUI_FUNCTION_DESCRIPTION_BEGIN + !insertmacro MUI_DESCRIPTION_TEXT ${Rabbit} $(DESC_Rabbit) + !insertmacro MUI_DESCRIPTION_TEXT ${RabbitService} $(DESC_RabbitService) + !insertmacro MUI_DESCRIPTION_TEXT ${RabbitStartMenu} $(DESC_RabbitStartMenu) +!insertmacro MUI_FUNCTION_DESCRIPTION_END + +;-------------------------------- + +; Uninstaller + +Section "Uninstall" + + ; Remove registry keys + DeleteRegKey HKLM ${uninstall} + DeleteRegKey HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" + + ; TODO these will fail if the service is not installed - do we care? + ExpandEnvStrings $0 %COMSPEC% + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" stop' + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" remove' + + ; Remove from PATH + ${un.EnvVarUpdate} $0 "PATH" "R" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" + + ; Remove files and uninstaller + RMDir /r "$INSTDIR\rabbitmq_server-%%VERSION%%" + Delete "$INSTDIR\rabbitmq.ico" + Delete "$INSTDIR\uninstall.exe" + + ; Remove start menu items + RMDir /r "$SMPROGRAMS\RabbitMQ Server" + + DeleteRegValue ${env_hklm} ERLANG_HOME + SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000 + +SectionEnd + +;-------------------------------- + +; Functions + +Function .onInit + Call findErlang + + ReadRegStr $0 HKLM ${uninstall} "UninstallString" + ${If} $0 != "" + MessageBox MB_OKCANCEL|MB_ICONEXCLAMATION "RabbitMQ is already installed. $\n$\nClick 'OK' to remove the previous version or 'Cancel' to cancel this installation." IDCANCEL norun + + ;Run the uninstaller + ClearErrors + ExecWait $INSTDIR\uninstall.exe + + norun: + Abort + ${EndIf} +FunctionEnd + +Function findErlang + + StrCpy $0 0 + StrCpy $2 "not-found" + ${Do} + EnumRegKey $1 HKLM Software\Ericsson\Erlang $0 + ${If} $1 = "" + ${Break} + ${EndIf} + ${If} $1 <> "ErlSrv" + StrCpy $2 $1 + ${EndIf} + + IntOp $0 $0 + 1 + ${Loop} + + ${If} $2 = "not-found" + MessageBox MB_YESNO|MB_ICONEXCLAMATION "Erlang could not be detected.$\nYou must install Erlang before installing RabbitMQ. Would you like the installer to open a browser window to the Erlang download site?" IDNO abort + ExecShell "open" "http://www.erlang.org/download.html" + abort: + Abort + ${Else} + ${VersionCompare} $2 "5.6.3" $0 + ${VersionCompare} $2 "5.8.1" $1 + + ${If} $0 = 2 + MessageBox MB_OK|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is too old. Please install a more recent version." + Abort + ${ElseIf} $1 = 2 + MessageBox MB_YESNO|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is comparatively old.$\nFor best results, please install a newer version.$\nDo you wish to continue?" IDYES no_abort + Abort + no_abort: + ${EndIf} + + ReadRegStr $0 HKLM "Software\Ericsson\Erlang\$2" "" + + ; See http://nsis.sourceforge.net/Setting_Environment_Variables + WriteRegExpandStr ${env_hklm} ERLANG_HOME $0 + SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000 + + ; On Windows XP changing the permanent environment does not change *our* + ; environment, so do that as well. + System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0' + ${EndIf} + +FunctionEnd
\ No newline at end of file diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index df4b24d8..3e173949 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -37,4 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.." NODENAME=rabbit@${HOSTNAME%%.*} # Load configuration from the rabbitmq.conf file -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +if [ -f /etc/rabbitmq/rabbitmq.conf ]; then + echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- " + echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf" +fi +[ -f /etc/rabbitmq/rabbitmq-env.conf ] && . /etc/rabbitmq/rabbitmq-env.conf diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 5c390a51..2f80eb96 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -16,7 +16,6 @@ ## SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ --kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 0cfa5ea8..2ca9f2b3 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -142,7 +142,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( +W w ^
+A30 ^
+P 1048576 ^
--kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 43520b55..bc452fea 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -207,7 +207,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/src/delegate.erl b/src/delegate.erl index 46bd8245..17046201 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]). +-export([start_link/1, invoke_no_result/2, invoke/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,8 +36,6 @@ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). --spec(delegate_count/1 :: ([node()]) -> non_neg_integer()). - -endif. %%---------------------------------------------------------------------------- @@ -111,22 +109,14 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], orddict:new()}, Pids). -delegate_count([RemoteNode | _]) -> - {ok, Count} = case application:get_env(rabbit, delegate_count) of - undefined -> rpc:call(RemoteNode, application, get_env, - [rabbit, delegate_count]); - Result -> Result - end, - Count. - delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). delegate(RemoteNodes) -> case get(delegate) of - undefined -> Name = - delegate_name(erlang:phash2( - self(), delegate_count(RemoteNodes))), + undefined -> Name = delegate_name( + erlang:phash2(self(), + delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; Name -> Name diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index e0ffa7c8..fc693c7d 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/1, count/1]). -export([init/1]). @@ -28,20 +28,32 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}). +-spec(count/1 :: ([node()]) -> integer()). -endif. %%---------------------------------------------------------------------------- -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_link(Count) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]). + +count([]) -> + 1; +count([Node | Nodes]) -> + try + length(supervisor:which_children({?SERVER, Node})) + catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> + count(Nodes); + exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown; + R =:= nodedown -> + count(Nodes) + end. %%---------------------------------------------------------------------------- -init(_Args) -> - DCount = delegate:delegate_count([node()]), +init([Count]) -> {ok, {{one_for_one, 10, 10}, [{Num, {delegate, start_link, [Num]}, transient, 16#ffffffff, worker, [delegate]} || - Num <- lists:seq(0, DCount - 1)]}}. + Num <- lists:seq(0, Count - 1)]}}. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index a637dddd..94296f97 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -58,6 +58,15 @@ %% hibernate the process immediately, as it would if backoff wasn't %% being used. Instead it'll wait for the current timeout as described %% above. +%% +%% 7) The callback module can return from any of the handle_* +%% functions, a {become, Module, State} triple, or a {become, Module, +%% State, Timeout} quadruple. This allows the gen_server to +%% dynamically change the callback module. The State is the new state +%% which will be passed into any of the callback functions in the new +%% module. Note there is no form also encompassing a reply, thus if +%% you wish to reply in handle_call/3 and change the callback module, +%% you need to use gen_server2:reply/2 to issue the reply manually. %% All modifications are (C) 2009-2011 VMware, Inc. @@ -880,6 +889,22 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, loop(GS2State #gs2_state { state = NState, time = Time1, debug = Debug1 }); + {become, Mod, NState} -> + Debug1 = common_debug(Debug, fun print_event/3, Name, + {become, Mod, NState}), + loop(find_prioritisers( + GS2State #gs2_state { mod = Mod, + state = NState, + time = infinity, + debug = Debug1 })); + {become, Mod, NState, Time1} -> + Debug1 = common_debug(Debug, fun print_event/3, Name, + {become, Mod, NState}), + loop(find_prioritisers( + GS2State #gs2_state { mod = Mod, + state = NState, + time = Time1, + debug = Debug1 })); _ -> handle_common_termination(Reply, Msg, GS2State) end. diff --git a/src/pg_local.erl b/src/pg_local.erl index fd515747..c9c3a3a7 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -83,7 +83,7 @@ get_members(Name) -> sync() -> ensure_started(), - gen_server:call(?MODULE, sync). + gen_server:call(?MODULE, sync, infinity). %%% %%% Callback functions from gen_server diff --git a/src/rabbit.erl b/src/rabbit.erl index c6661d39..1beed5c1 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -27,7 +27,7 @@ %%--------------------------------------------------------------------------- %% Boot steps. --export([maybe_insert_default_data/0]). +-export([maybe_insert_default_data/0, boot_delegate/0]). -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, @@ -101,8 +101,7 @@ -rabbit_boot_step({delegate_sup, [{description, "cluster delegate"}, - {mfa, {rabbit_sup, start_child, - [delegate_sup]}}, + {mfa, {rabbit, boot_delegate, []}}, {requires, kernel_ready}, {enables, core_initialized}]}). @@ -153,6 +152,11 @@ [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({notify_cluster, + [{description, "notify cluster nodes"}, + {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {requires, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). @@ -179,6 +183,9 @@ {running_nodes, [node()]}]). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). +-spec(maybe_insert_default_data/0 :: () -> 'ok'). +-spec(boot_delegate/0 :: () -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -225,11 +232,11 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, SupPid} = rabbit_sup:start_link(), + true = register(rabbit, self()), print_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), - {ok, SupPid}; Error -> Error @@ -448,6 +455,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, end end. +boot_delegate() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + rabbit_sup:start_child(delegate_sup, [Count]). + maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of true -> insert_default_data(); diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a6da551d..46b78c39 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -197,7 +197,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> arguments = Args, exclusive_owner = Owner, pid = none}), - case gen_server2:call(Q#amqqueue.pid, {init, false}) of + case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 end. @@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:const(not_found) end; [ExistingQ = #amqqueue{pid = QPid}] -> - case is_process_alive(QPid) of + case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), fun (Tx) -> TailFun(Tx), ExistingQ end @@ -300,29 +300,19 @@ check_declare_arguments(QueueName, Args) -> "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- - [{<<"x-expires">>, fun check_expires_argument/1}, - {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], + [{<<"x-expires">>, fun check_integer_argument/1}, + {<<"x-message-ttl">>, fun check_integer_argument/1}]], ok. -check_expires_argument(Val) -> - check_integer_argument(Val, - expires_not_of_acceptable_type, - expires_zero_or_less). - -check_message_ttl_argument(Val) -> - check_integer_argument(Val, - ttl_not_of_acceptable_type, - ttl_zero_or_less). - -check_integer_argument(undefined, _, _) -> +check_integer_argument(undefined) -> ok; -check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 -> +check_integer_argument({Type, Val}) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; - false -> {error, {InvalidTypeError, Type, Val}} + false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, _Val}, _, ZeroOrLessError) -> - {error, ZeroOrLessError}. +check_integer_argument({_Type, Val}) -> + {error, {value_zero_or_less, Val}}. list(VHostPath) -> mnesia:dirty_match_object( @@ -334,10 +324,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, info, infinity). + delegate_call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_call(QPid, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -347,7 +337,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, consumers, infinity). + delegate_call(QPid, consumers). consumers_all(VHostPath) -> lists:append( @@ -356,7 +346,8 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> + delegate_call(QPid, stat). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). @@ -365,9 +356,9 @@ delete_immediately(#amqqueue{ pid = QPid }) -> gen_server2:cast(QPid, delete_immediately). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate_call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). deliver(QPid, Delivery = #delivery{immediate = true}) -> gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); @@ -379,7 +370,7 @@ deliver(QPid, Delivery) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). + delegate_call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). @@ -408,20 +399,18 @@ limit_all(QPids, ChPid, LimiterPid) -> end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate_call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - infinity). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - delegate_cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). @@ -509,8 +498,8 @@ safe_delegate_call_ok(F, Pids) -> {_, Bad} -> {error, Bad} end. -delegate_call(Pid, Msg, Timeout) -> - delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). +delegate_call(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end). delegate_cast(Pid, Msg) -> delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c7e28fe..e794b4aa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -21,7 +21,7 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(SYNC_INTERVAL, 5). %% milliseconds +-define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(BASE_MESSAGE_PROPERTIES, @@ -122,6 +122,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> terminate(_Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> + rabbit_event:notify( + queue_deleted, [{pid, self()}]), BQS1 = BQ:delete_and_terminate(BQS), %% don't care if the internal delete %% doesn't return 'ok'. @@ -186,7 +188,6 @@ terminate_shutdown(Fun, State) -> end, BQS, all_ch_record()), [emit_consumer_deleted(Ch, CTag) || {Ch, CTag, _} <- consumers(State1)], - rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -657,13 +658,13 @@ message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). +calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - Now = now_millis(), + Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry @@ -684,7 +685,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. -now_millis() -> timer:now_diff(now(), {0,0,0}). +now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -790,20 +791,20 @@ handle_call({init, Recover}, From, handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined} = State, - gen_server2:reply(From, not_found), - case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) - end, - BQS = BQ:init(QName, IsDurable, Recover), - %% Rely on terminate to delete the queue. - {stop, normal, State#q{backing_queue_state = BQS}} + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + false -> #q{backing_queue = BQ, backing_queue_state = undefined, + q = #amqqueue{name = QName, durable = IsDurable}} = State, + gen_server2:reply(From, not_found), + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a82e5eff..73c031de 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/8, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1]). @@ -34,7 +34,8 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed}). + confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, + confirmed, capabilities}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -66,9 +67,9 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/7 :: +-spec(start_link/8 :: (channel_number(), pid(), pid(), rabbit_types:user(), - rabbit_types:vhost(), pid(), + rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -94,10 +95,11 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, - StartLimiterFun) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User, - VHost, CollectorPid, StartLimiterFun], []). +start_link(Channel, ReaderPid, WriterPid, User, VHost, Capabilities, + CollectorPid, StartLimiterFun) -> + gen_server2:start_link(?MODULE, + [Channel, ReaderPid, WriterPid, User, VHost, + Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -106,7 +108,7 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). flush(Pid) -> - gen_server2:call(Pid, flush). + gen_server2:call(Pid, flush, infinity). shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -148,7 +150,7 @@ emit_stats(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, +init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), @@ -173,8 +175,10 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty(), - confirmed = []}, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + confirmed = [], + capabilities = Capabilities}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -278,19 +282,22 @@ handle_info(timeout, State) -> noreply(State); handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{unconfirmed = UC}) -> - %% TODO: this does a complete scan and partial rebuild of the - %% tree, which is quite efficient. To do better we'd need to - %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MXs, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}, State), + State = #ch{unconfirmed_qm = UQM}) -> + MsgSeqNos = case gb_trees:lookup(QPid, UQM) of + {value, MsgSet} -> gb_sets:to_list(MsgSet); + none -> [] + end, + %% We remove the MsgSeqNos from UQM before calling + %% process_confirms to prevent each MsgSeqNo being removed from + %% the set one by one which which would be inefficient + State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, + {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), erase_queue_stats(QPid), - State1 = case Reason of - normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); - _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) - end, - noreply(queue_blocked(QPid, State1)). + State3 = (case Reason of + normal -> fun record_confirms/2; + _ -> fun send_nacks/2 + end)(MXs, State2), + noreply(queue_blocked(QPid, State3)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -476,13 +483,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc, _State) -> - Acc; -remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> - remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), - State). - record_confirm(undefined, _, State) -> State; record_confirm(MsgSeqNo, XName, State) -> @@ -495,25 +495,43 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = +confirm(MsgSeqNos, QPid, State) -> + {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + record_confirms(MXs, State1). + +process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}) -> + {MXs, UMQ1, UQM1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UC0} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; - {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) + fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, + State); + none -> Acc end - end, {[], UC}, MsgSeqNos), - record_confirms(MXs, State#ch{unconfirmed = UC1}). + end, {[], UMQ, UQM}, MsgSeqNos), + {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> - Qs1 = sets:del_element(QPid, Qs), +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), - case sets:size(Qs1) of - 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} + UQM1 = case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> gb_trees:delete(QPid, UQM); + false -> gb_trees:update(QPid, MsgSeqNos1, UQM) + end; + none -> + UQM + end, + Qs1 = gb_sets:del_element(QPid, Qs), + case gb_sets:is_empty(Qs1) of + true -> + {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; + false -> + {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1250,10 +1268,21 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed = UC} = State, - [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), - State#ch{unconfirmed = UC1}. + #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), + SingletonSet = gb_sets:singleton(MsgSeqNo), + UQM1 = lists:foldl( + fun (QPid, UQM2) -> + maybe_monitor(QPid), + case gb_trees:lookup(QPid, UQM2) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), + gb_trees:update(QPid, MsgSeqNos1, UQM2); + none -> + gb_trees:insert(QPid, SingletonSet, UQM2) + end + end, UQM, QPids), + State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1289,11 +1318,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UC) of + CutOff = case gb_trees:is_empty(UMQ) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1320,8 +1349,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> - gb_trees:size(UC); +i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> + gb_trees:size(UMQ); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d21cfdb7..90058194 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,9 +33,10 @@ -type(start_link_args() :: {'tcp', rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()} | + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid()} | {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), pid()}). + rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -44,7 +45,7 @@ %%---------------------------------------------------------------------------- start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, - Collector}) -> + Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( @@ -56,19 +57,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, User, VHost, + [Channel, ReaderPid, WriterPid, User, VHost, Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) -> +start_link({direct, Channel, ClientChannelPid, User, VHost, Capabilities, + Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, - User, VHost, Collector, start_limiter_fun(SupPid)]}, + [Channel, ClientChannelPid, ClientChannelPid, User, + VHost, Capabilities, Collector, + start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 80483097..3a18950f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -44,22 +44,18 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), - FullCommand = init:get_plain_arguments(), - case FullCommand of - [] -> usage(); - _ -> ok - end, {[Command0 | Args], Opts} = - rabbit_misc:get_options( - [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, - {option, ?VHOST_OPT, "/"}], - FullCommand), - Opts1 = lists:map(fun({K, V}) -> - case K of - ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; - _ -> {K, V} - end - end, Opts), + case rabbit_misc:get_options([{flag, ?QUIET_OPT}, + {option, ?NODE_OPT, NodeStr}, + {option, ?VHOST_OPT, "/"}], + init:get_plain_arguments()) of + {[], _Opts} -> usage(); + CmdArgsAndOpts -> CmdArgsAndOpts + end, + Opts1 = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; + _ -> {K, V} + end || {K, V} <- Opts], Command = list_to_atom(Command0), Quiet = proplists:get_bool(?QUIET_OPT, Opts1), Node = proplists:get_value(?NODE_OPT, Opts1), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 3b8c9fba..5c89bf49 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/3, start_channel/5]). +-export([boot/0, connect/4, start_channel/6]). -include("rabbit.hrl"). @@ -25,12 +25,13 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(connect/3 :: (binary(), binary(), binary()) -> +-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()) -> - {'ok', pid()}). +-spec(start_channel/6 :: + (rabbit_channel:channel_number(), pid(), rabbit_types:user(), + rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> + {'ok', pid()}). -endif. @@ -49,13 +50,14 @@ boot() -> %%---------------------------------------------------------------------------- -connect(Username, Password, VHost) -> +connect(Username, Password, VHost, Protocol) -> case lists:keymember(rabbit, 1, application:which_applications()) of true -> try rabbit_access_control:user_pass_login(Username, Password) of #user{} = User -> try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> {ok, {User, rabbit_reader:server_properties()}} + ok -> {ok, {User, + rabbit_reader:server_properties(Protocol)}} catch exit:#amqp_error{name = access_refused} -> {error, access_refused} @@ -67,9 +69,10 @@ connect(Username, Password, VHost) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, User, VHost, Collector) -> +start_channel(Number, ClientChannelPid, User, VHost, Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, User, VHost, Collector}]), + [{direct, Number, ClientChannelPid, User, VHost, Capabilities, + Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 86ea7282..1b72dd76 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) -> limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}). + gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 7d916797..abc27c5f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). +-export([is_process_alive/1]). %%---------------------------------------------------------------------------- @@ -194,6 +195,7 @@ -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). -endif. @@ -350,8 +352,11 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch exit:{R, _} when R =:= noproc; R =:= nodedown; - R =:= normal; R =:= shutdown -> + catch + exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> + Handler(); + exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> Handler() end. @@ -858,3 +863,12 @@ ntoab(IP) -> 0 -> Str; _ -> "[" ++ Str ++ "]" end. + +is_process_alive(Pid) when node(Pid) =:= node() -> + erlang:is_process_alive(Pid); +is_process_alive(Pid) -> + case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of + true -> true; + _ -> false + end. + diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e9c356e1..7f3cf35f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,7 +33,7 @@ -include("rabbit_msg_store.hrl"). --define(SYNC_INTERVAL, 5). %% milliseconds +-define(SYNC_INTERVAL, 25). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 283d25c7..36f61628 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -32,16 +32,6 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). --define(RABBIT_TCP_OPTS, [ - binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - {backlog, 128}, % use the maximum listen(2) backlog value - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false} - ]). - -define(SSL_TIMEOUT, 5). %% seconds -define(FIRST_TEST_BIND_PORT, 10000). @@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS], + [IPAddress, Port, [Family | tcp_opts()], {?MODULE, tcp_listener_started, [Protocol]}, {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, @@ -315,6 +305,10 @@ hostname() -> cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). +tcp_opts() -> + {ok, Opts} = application:get_env(rabbit, tcp_listen_options), + Opts. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index e4bc1cdc..817abaa2 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,14 +22,41 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([notify_cluster/0, rabbit_running_on/1]). -define(SERVER, ?MODULE). +-define(RABBIT_UP_RPC_TIMEOUT, 2000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(rabbit_running_on/1 :: (node()) -> 'ok'). +-spec(notify_cluster/0 :: () -> 'ok'). + +-endif. %%-------------------------------------------------------------------- start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +rabbit_running_on(Node) -> + gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). + +notify_cluster() -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this rabbit + case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, + [Node], ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + %% register other active rabbits with this rabbit + [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ], + ok. + %%-------------------------------------------------------------------- init([]) -> @@ -39,19 +66,20 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({rabbit_running_on, Node}, State) -> + rabbit_log:info("node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({nodeup, Node}, State) -> - rabbit_log:info("node ~p up", [Node]), - {noreply, State}; handle_info({nodedown, Node}, State) -> - rabbit_log:info("node ~p down", [Node]), - %% TODO: This may turn out to be a performance hog when there are - %% lots of nodes. We really only need to execute this code on - %% *one* node, rather than all of them. - ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node), + rabbit_log:info("node ~p down~n", [Node]), + ok = handle_dead_rabbit(Node), + {noreply, State}; +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> + rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), + ok = handle_dead_rabbit(Node), {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- +%% TODO: This may turn out to be a performance hog when there are +%% lots of nodes. We really only need to execute this code on +%% *one* node, rather than all of them. +handle_dead_rabbit(Node) -> + ok = rabbit_networking:on_node_down(Node), + ok = rabbit_amqqueue:on_node_down(Node). + diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b5d82ac2..e9ff97f9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -24,7 +24,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/2, server_properties/0]). +-export([conserve_memory/2, server_properties/1]). -export([process_channel_frame/5]). %% used by erlang-client @@ -160,7 +160,8 @@ -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). --spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). +-spec(server_properties/1 :: (rabbit_types:protocol()) -> + rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy -spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) @@ -219,7 +220,7 @@ conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. -server_properties() -> +server_properties(Protocol) -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), @@ -230,22 +231,30 @@ server_properties() -> %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms %% from the config and merge them with the generated built-in properties NormalizedConfigServerProps = - [case X of - {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), - longstr, - list_to_binary(Value)}; - {BinKey, Type, Value} -> {BinKey, Type, Value} - end || X <- RawConfigServerProps ++ - [{product, Product}, - {version, Version}, - {platform, "Erlang/OTP"}, - {copyright, ?COPYRIGHT_MESSAGE}, - {information, ?INFORMATION_MESSAGE}]], + [{<<"capabilities">>, table, server_capabilities(Protocol)} | + [case X of + {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), + longstr, + list_to_binary(Value)}; + {BinKey, Type, Value} -> {BinKey, Type, Value} + end || X <- RawConfigServerProps ++ + [{product, Product}, + {version, Version}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]]], %% Filter duplicated properties in favor of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). +server_capabilities(rabbit_framing_amqp_0_9_1) -> + [{<<"publisher_confirms">>, bool, true}, + {<<"exchange_exchange_bindings">>, bool, true}, + {<<"basic.nack">>, bool, true}]; +server_capabilities(_) -> + []. + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> @@ -357,7 +366,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({handshake_timeout, State#v1.callback}) end; timeout -> - throw({timeout, State#v1.connection_state}); + case State#v1.connection_state of + closed -> mainloop(Deb, State); + S -> throw({timeout, S}) + end; {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -652,7 +664,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, - server_properties = server_properties(), + server_properties = server_properties(Protocol), mechanisms = auth_mechanisms_binary(), locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), @@ -706,12 +718,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, connection = Connection, sock = Sock}) -> AuthMechanism = auth_mechanism_to_module(Mechanism), + Capabilities = + case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of + {table, Capabilities1} -> Capabilities1; + _ -> [] + end, State = State0#v1{auth_mechanism = AuthMechanism, auth_state = AuthMechanism:init(Sock), connection_state = securing, connection = Connection#connection{ - client_properties = ClientProperties}}, + client_properties = ClientProperties, + capabilities = Capabilities}}, auth_phase(Response, State); handle_method0(#'connection.secure_ok'{response = Response}, @@ -922,10 +940,14 @@ socket_info(Get, Select) -> end. ssl_info(F, Sock) -> + %% The first ok form is R14 + %% The second is R13 - the extra term is exportability (by inspection, + %% the docs are wrong) case rabbit_net:ssl_info(Sock) of - nossl -> ''; - {error, _} -> ''; - {ok, Info} -> F(Info) + nossl -> ''; + {error, _} -> ''; + {ok, {P, {K, C, H}}} -> F({P, {K, C, H}}); + {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}}) end. cert_info(F, Sock) -> @@ -940,14 +962,15 @@ cert_info(F, Sock) -> send_to_new_channel(Channel, AnalyzedFrame, State) -> #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost}} = State, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, - VHost, Collector}), + VHost, Capabilities, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 795413aa..9821ae7b 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -48,7 +48,7 @@ start_link() -> %%--------------------------------------------------------------------------- register(Class, TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}). + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49b09508..2015170a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -26,6 +26,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + passed = test_queue_cleanup(SecondaryNode), + passed = test_declare_on_dead_queue(SecondaryNode), %% we now run the tests remotely, so that code coverage on the %% local node picks up more of the delegate Node = node(), Self = self(), Remote = spawn(SecondaryNode, - fun () -> A = test_delegates_async(Node), - B = test_delegates_sync(Node), - Self ! {self(), {A, B}} + fun () -> Rs = [ test_delegates_async(Node), + test_delegates_sync(Node), + test_queue_cleanup(Node), + test_declare_on_dead_queue(Node) ], + Self ! {self(), Rs} end), receive {Remote, Result} -> - Result = {passed, passed} - after 2000 -> + Result = lists:duplicate(length(Result), passed) + after 30000 -> throw(timeout) end, @@ -1015,7 +1020,7 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - user(<<"user">>), <<"/">>, self(), + user(<<"user">>), <<"/">>, [], self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- @@ -1074,8 +1079,8 @@ test_server_status() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, - user(<<"guest">>), <<"/">>, self(), + {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, user(<<"guest">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok @@ -1228,7 +1233,7 @@ must_exit(Fun) -> end. test_delegates_sync(SecondaryNode) -> - Sender = fun (Pid) -> gen_server:call(Pid, invoked) end, + Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end, BadSender = fun (_Pid) -> exit(exception) end, Responder = make_responder(fun ({'$gen_call', From, invoked}) -> @@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) -> passed. +test_queue_cleanup_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_queue_cleanup_receiver(Pid) + end. + + +test_queue_cleanup(_SecondaryNode) -> + {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), + receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> + ok + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + rabbit:stop(), + rabbit:start(), + rabbit_channel:do(Ch, #'queue.declare'{ passive = true, + queue = ?CLEANUP_QUEUE_NAME }), + receive + {channel_exit, 1, {amqp_error, not_found, _, _}} -> + ok + after 2000 -> + throw(failed_to_receive_channel_exit) + end, + passed. + +test_declare_on_dead_queue(SecondaryNode) -> + QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME), + Self = self(), + Pid = spawn(SecondaryNode, + fun () -> + {new, #amqqueue{name = QueueName, pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], + none), + exit(QPid, kill), + Self ! {self(), killed, QPid} + end), + receive + {Pid, killed, QPid} -> + {existing, #amqqueue{name = QueueName, + pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], none), + false = rabbit_misc:is_process_alive(QPid), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], + none), + true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + {ok, 0} = rabbit_amqqueue:delete(Q, false, false), + passed + after 2000 -> + throw(failed_to_create_and_kill_queue) + end. + %--------------------------------------------------------------------- control_action(Command, Args) -> @@ -2141,9 +2201,11 @@ test_configurable_server_properties() -> BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>, <<"copyright">>, <<"information">>], + Protocol = rabbit_framing_amqp_0_9_1, + %% Verify that the built-in properties are initially present - ActualPropNames = [Key || - {Key, longstr, _} <- rabbit_reader:server_properties()], + ActualPropNames = [Key || {Key, longstr, _} <- + rabbit_reader:server_properties(Protocol)], true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end, BuiltInPropNames), @@ -2154,9 +2216,10 @@ test_configurable_server_properties() -> ConsProp = fun (X) -> application:set_env(rabbit, server_properties, [X | ServerProperties]) end, - IsPropPresent = fun (X) -> lists:member(X, - rabbit_reader:server_properties()) - end, + IsPropPresent = + fun (X) -> + lists:member(X, rabbit_reader:server_properties(Protocol)) + end, %% Add a wholly new property of the simplified {KeyAtom, StringValue} form NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"}, @@ -2179,7 +2242,7 @@ test_configurable_server_properties() -> {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)), list_to_binary(NewVerVal)}, ConsProp(NewVersion), - ClobberedServerProps = rabbit_reader:server_properties(), + ClobberedServerProps = rabbit_reader:server_properties(Protocol), %% Is the clobbering insert present? true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}), %% Is the clobbering insert the only thing with the clobbering key? |