diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-02-14 10:40:16 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-02-14 10:40:16 +0000 |
commit | 272b89ad55086537eda6cd2fe722c87093e14156 (patch) | |
tree | 7a422fb39375dddf27ecfc58ed2e853f13c3e865 | |
parent | 1da16a83e5383c37bbfcdda83767209d22e0e663 (diff) | |
parent | cd012caed4385b393ad612aabc12107419e13e15 (diff) | |
download | rabbitmq-server-272b89ad55086537eda6cd2fe722c87093e14156.tar.gz |
merge bug23765 into default (current sync interval in queue_process and msg_store causes poor confirms performance)
-rw-r--r-- | .hgignore | 3 | ||||
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 9 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 12 | ||||
-rw-r--r-- | packaging/windows-exe/Makefile | 16 | ||||
-rw-r--r-- | packaging/windows-exe/lib/EnvVarUpdate.nsh | 327 | ||||
-rw-r--r-- | packaging/windows-exe/rabbitmq.ico | bin | 0 -> 4286 bytes | |||
-rw-r--r-- | packaging/windows-exe/rabbitmq_nsi.in | 241 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 1 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 1 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 1 | ||||
-rw-r--r-- | src/delegate.erl | 18 | ||||
-rw-r--r-- | src/delegate_sup.erl | 26 | ||||
-rw-r--r-- | src/rabbit.erl | 19 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 43 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 33 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 16 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 53 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 23 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 70 |
22 files changed, 840 insertions, 93 deletions
@@ -25,6 +25,9 @@ syntax: regexp ^packaging/macports/macports$ ^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$ ^packaging/windows/rabbitmq-server-windows-.*\.zip$ +^packaging/windows-exe/rabbitmq_server-.*$ +^packaging/windows-exe/rabbitmq-.*\.nsi$ +^packaging/windows-exe/rabbitmq-server-.*\.exe$ ^docs/.*\.[15]\.gz$ ^docs/.*\.man\.xml$ @@ -41,14 +41,12 @@ RABBIT_PLT=rabbit.plt ifndef USE_SPECS # our type specs rely on features and bug fixes in dialyzer that are -# only available in R14A upwards (R13B04 is erts 5.7.5) -# -# NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.5" ]; then echo "true"; else echo "false"; fi) +# only available in R14A upwards (R14A is erts 5.8) +USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8]), halt().') endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(shell [ $(USE_SPECS) = "true" ] && echo "-Duse_specs") +ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs) VERSION=0.0.0 TARBALL_NAME=rabbitmq-server-$(VERSION) diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index cc7221d6..f837684c 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -34,4 +34,11 @@ {collect_statistics, none}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, - {delegate_count, 16}]}]}. + {delegate_count, 16}, + {tcp_listen_options, [binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 128}, + {nodelay, true}, + {exit_on_close, false}]} + ]}]}. diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index b37f7ab1..47316864 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -124,6 +124,12 @@ done rm -rf %{buildroot} %changelog +* Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1 +- New Upstream Release + +* Tue Feb 1 2011 simon@rabbitmq.com 2.3.0-1 +- New Upstream Release + * Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index a60e691d..12165dc0 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (2.3.1-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Thu, 03 Feb 2011 12:43:56 +0000 + +rabbitmq-server (2.3.0-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Tue, 01 Feb 2011 12:52:16 +0000 + rabbitmq-server (2.2.0-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile new file mode 100644 index 00000000..59803f9c --- /dev/null +++ b/packaging/windows-exe/Makefile @@ -0,0 +1,16 @@ +VERSION=0.0.0 +ZIP=../windows/rabbitmq-server-windows-$(VERSION) + +dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION) + makensis rabbitmq-$(VERSION).nsi + +rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in + sed \ + -e 's|%%VERSION%%|$(VERSION)|' \ + $< > $@ + +rabbitmq_server-$(VERSION): + unzip $(ZIP) + +clean: + rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe diff --git a/packaging/windows-exe/lib/EnvVarUpdate.nsh b/packaging/windows-exe/lib/EnvVarUpdate.nsh new file mode 100644 index 00000000..839d6a02 --- /dev/null +++ b/packaging/windows-exe/lib/EnvVarUpdate.nsh @@ -0,0 +1,327 @@ +/**
+ * EnvVarUpdate.nsh
+ * : Environmental Variables: append, prepend, and remove entries
+ *
+ * WARNING: If you use StrFunc.nsh header then include it before this file
+ * with all required definitions. This is to avoid conflicts
+ *
+ * Usage:
+ * ${EnvVarUpdate} "ResultVar" "EnvVarName" "Action" "RegLoc" "PathString"
+ *
+ * Credits:
+ * Version 1.0
+ * * Cal Turney (turnec2)
+ * * Amir Szekely (KiCHiK) and e-circ for developing the forerunners of this
+ * function: AddToPath, un.RemoveFromPath, AddToEnvVar, un.RemoveFromEnvVar,
+ * WriteEnvStr, and un.DeleteEnvStr
+ * * Diego Pedroso (deguix) for StrTok
+ * * Kevin English (kenglish_hi) for StrContains
+ * * Hendri Adriaens (Smile2Me), Diego Pedroso (deguix), and Dan Fuhry
+ * (dandaman32) for StrReplace
+ *
+ * Version 1.1 (compatibility with StrFunc.nsh)
+ * * techtonik
+ *
+ * http://nsis.sourceforge.net/Environmental_Variables:_append%2C_prepend%2C_and_remove_entries
+ *
+ */
+
+
+!ifndef ENVVARUPDATE_FUNCTION
+!define ENVVARUPDATE_FUNCTION
+!verbose push
+!verbose 3
+!include "LogicLib.nsh"
+!include "WinMessages.NSH"
+!include "StrFunc.nsh"
+
+; ---- Fix for conflict if StrFunc.nsh is already includes in main file -----------------------
+!macro _IncludeStrFunction StrFuncName
+ !ifndef ${StrFuncName}_INCLUDED
+ ${${StrFuncName}}
+ !endif
+ !ifndef Un${StrFuncName}_INCLUDED
+ ${Un${StrFuncName}}
+ !endif
+ !define un.${StrFuncName} "${Un${StrFuncName}}"
+!macroend
+
+!insertmacro _IncludeStrFunction StrTok
+!insertmacro _IncludeStrFunction StrStr
+!insertmacro _IncludeStrFunction StrRep
+
+; ---------------------------------- Macro Definitions ----------------------------------------
+!macro _EnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define EnvVarUpdate '!insertmacro "_EnvVarUpdateConstructor"'
+
+!macro _unEnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call un.EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define un.EnvVarUpdate '!insertmacro "_unEnvVarUpdateConstructor"'
+; ---------------------------------- Macro Definitions end-------------------------------------
+
+;----------------------------------- EnvVarUpdate start----------------------------------------
+!define hklm_all_users 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+!define hkcu_current_user 'HKCU "Environment"'
+
+!macro EnvVarUpdate UN
+
+Function ${UN}EnvVarUpdate
+
+ Push $0
+ Exch 4
+ Exch $1
+ Exch 3
+ Exch $2
+ Exch 2
+ Exch $3
+ Exch
+ Exch $4
+ Push $5
+ Push $6
+ Push $7
+ Push $8
+ Push $9
+ Push $R0
+
+ /* After this point:
+ -------------------------
+ $0 = ResultVar (returned)
+ $1 = EnvVarName (input)
+ $2 = Action (input)
+ $3 = RegLoc (input)
+ $4 = PathString (input)
+ $5 = Orig EnvVar (read from registry)
+ $6 = Len of $0 (temp)
+ $7 = tempstr1 (temp)
+ $8 = Entry counter (temp)
+ $9 = tempstr2 (temp)
+ $R0 = tempChar (temp) */
+
+ ; Step 1: Read contents of EnvVarName from RegLoc
+ ;
+ ; Check for empty EnvVarName
+ ${If} $1 == ""
+ SetErrors
+ DetailPrint "ERROR: EnvVarName is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for valid Action
+ ${If} $2 != "A"
+ ${AndIf} $2 != "P"
+ ${AndIf} $2 != "R"
+ SetErrors
+ DetailPrint "ERROR: Invalid Action - must be A, P, or R"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ${If} $3 == HKLM
+ ReadRegStr $5 ${hklm_all_users} $1 ; Get EnvVarName from all users into $5
+ ${ElseIf} $3 == HKCU
+ ReadRegStr $5 ${hkcu_current_user} $1 ; Read EnvVarName from current user into $5
+ ${Else}
+ SetErrors
+ DetailPrint 'ERROR: Action is [$3] but must be "HKLM" or HKCU"'
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for empty PathString
+ ${If} $4 == ""
+ SetErrors
+ DetailPrint "ERROR: PathString is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Make sure we've got some work to do
+ ${If} $5 == ""
+ ${AndIf} $2 == "R"
+ SetErrors
+ DetailPrint "$1 is empty - Nothing to remove"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Step 2: Scrub EnvVar
+ ;
+ StrCpy $0 $5 ; Copy the contents to $0
+ ; Remove spaces around semicolons (NOTE: spaces before the 1st entry or
+ ; after the last one are not removed here but instead in Step 3)
+ ${If} $0 != "" ; If EnvVar is not empty ...
+ ${Do}
+ ${${UN}StrStr} $7 $0 " ;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 " ;" ";" ; Remove '<space>;'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 "; "
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 "; " ";" ; Remove ';<space>'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 ";;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 ";;" ";"
+ ${Loop}
+
+ ; Remove a leading or trailing semicolon from EnvVar
+ StrCpy $7 $0 1 0
+ ${If} $7 == ";"
+ StrCpy $0 $0 "" 1 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ StrLen $6 $0
+ IntOp $6 $6 - 1
+ StrCpy $7 $0 1 $6
+ ${If} $7 == ";"
+ StrCpy $0 $0 $6 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ ; DetailPrint "Scrubbed $1: [$0]" ; Uncomment to debug
+ ${EndIf}
+
+ /* Step 3. Remove all instances of the target path/string (even if "A" or "P")
+ $6 = bool flag (1 = found and removed PathString)
+ $7 = a string (e.g. path) delimited by semicolon(s)
+ $8 = entry counter starting at 0
+ $9 = copy of $0
+ $R0 = tempChar */
+
+ ${If} $5 != "" ; If EnvVar is not empty ...
+ StrCpy $9 $0
+ StrCpy $0 ""
+ StrCpy $8 0
+ StrCpy $6 0
+
+ ${Do}
+ ${${UN}StrTok} $7 $9 ";" $8 "0" ; $7 = next entry, $8 = entry counter
+
+ ${If} $7 == "" ; If we've run out of entries,
+ ${ExitDo} ; were done
+ ${EndIf} ;
+
+ ; Remove leading and trailing spaces from this entry (critical step for Action=Remove)
+ ${Do}
+ StrCpy $R0 $7 1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 "" 1 ; Remove leading space
+ ${Loop}
+ ${Do}
+ StrCpy $R0 $7 1 -1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 -1 ; Remove trailing space
+ ${Loop}
+ ${If} $7 == $4 ; If string matches, remove it by not appending it
+ StrCpy $6 1 ; Set 'found' flag
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 == "" ; and the 1st string being added to $0,
+ StrCpy $0 $7 ; copy it to $0 without a prepended semicolon
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 != "" ; and this is NOT the 1st string to be added to $0,
+ StrCpy $0 $0;$7 ; append path to $0 with a prepended semicolon
+ ${EndIf} ;
+
+ IntOp $8 $8 + 1 ; Bump counter
+ ${Loop} ; Check for duplicates until we run out of paths
+ ${EndIf}
+
+ ; Step 4: Perform the requested Action
+ ;
+ ${If} $2 != "R" ; If Append or Prepend
+ ${If} $6 == 1 ; And if we found the target
+ DetailPrint "Target is already present in $1. It will be removed and"
+ ${EndIf}
+ ${If} $0 == "" ; If EnvVar is (now) empty
+ StrCpy $0 $4 ; just copy PathString to EnvVar
+ ${If} $6 == 0 ; If found flag is either 0
+ ${OrIf} $6 == "" ; or blank (if EnvVarName is empty)
+ DetailPrint "$1 was empty and has been updated with the target"
+ ${EndIf}
+ ${ElseIf} $2 == "A" ; If Append (and EnvVar is not empty),
+ StrCpy $0 $0;$4 ; append PathString
+ ${If} $6 == 1
+ DetailPrint "appended to $1"
+ ${Else}
+ DetailPrint "Target was appended to $1"
+ ${EndIf}
+ ${Else} ; If Prepend (and EnvVar is not empty),
+ StrCpy $0 $4;$0 ; prepend PathString
+ ${If} $6 == 1
+ DetailPrint "prepended to $1"
+ ${Else}
+ DetailPrint "Target was prepended to $1"
+ ${EndIf}
+ ${EndIf}
+ ${Else} ; If Action = Remove
+ ${If} $6 == 1 ; and we found the target
+ DetailPrint "Target was found and removed from $1"
+ ${Else}
+ DetailPrint "Target was NOT found in $1 (nothing to remove)"
+ ${EndIf}
+ ${If} $0 == ""
+ DetailPrint "$1 is now empty"
+ ${EndIf}
+ ${EndIf}
+
+ ; Step 5: Update the registry at RegLoc with the updated EnvVar and announce the change
+ ;
+ ClearErrors
+ ${If} $3 == HKLM
+ WriteRegExpandStr ${hklm_all_users} $1 $0 ; Write it in all users section
+ ${ElseIf} $3 == HKCU
+ WriteRegExpandStr ${hkcu_current_user} $1 $0 ; Write it to current user section
+ ${EndIf}
+
+ IfErrors 0 +4
+ MessageBox MB_OK|MB_ICONEXCLAMATION "Could not write updated $1 to $3"
+ DetailPrint "Could not write updated $1 to $3"
+ Goto EnvVarUpdate_Restore_Vars
+
+ ; "Export" our change
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+ EnvVarUpdate_Restore_Vars:
+ ;
+ ; Restore the user's variables and return ResultVar
+ Pop $R0
+ Pop $9
+ Pop $8
+ Pop $7
+ Pop $6
+ Pop $5
+ Pop $4
+ Pop $3
+ Pop $2
+ Pop $1
+ Push $0 ; Push my $0 (ResultVar)
+ Exch
+ Pop $0 ; Restore his $0
+
+FunctionEnd
+
+!macroend ; EnvVarUpdate UN
+!insertmacro EnvVarUpdate ""
+!insertmacro EnvVarUpdate "un."
+;----------------------------------- EnvVarUpdate end----------------------------------------
+
+!verbose pop
+!endif
diff --git a/packaging/windows-exe/rabbitmq.ico b/packaging/windows-exe/rabbitmq.ico Binary files differnew file mode 100644 index 00000000..5e169a79 --- /dev/null +++ b/packaging/windows-exe/rabbitmq.ico diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in new file mode 100644 index 00000000..6d79ffd4 --- /dev/null +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -0,0 +1,241 @@ +; Use the "Modern" UI +!include MUI2.nsh +!include LogicLib.nsh +!include WinMessages.nsh +!include FileFunc.nsh +!include WordFunc.nsh +!include lib\EnvVarUpdate.nsh + +!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"' +!define uninstall "Software\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ" + +;-------------------------------- + +; The name of the installer +Name "RabbitMQ Server %%VERSION%%" + +; The file to write +OutFile "rabbitmq-server-%%VERSION%%.exe" + +; Icons +!define MUI_ICON "rabbitmq.ico" + +; The default installation directory +InstallDir "$PROGRAMFILES\RabbitMQ Server" + +; Registry key to check for directory (so if you install again, it will +; overwrite the old one automatically) +InstallDirRegKey HKLM "Software\VMware, Inc.\RabbitMQ Server" "Install_Dir" + +; Request application privileges for Windows Vista +RequestExecutionLevel admin + +SetCompressor /solid lzma + +VIProductVersion "%%VERSION%%.0" +VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductVersion" "%%VERSION%%" +VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" +;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" +VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc" +;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? +VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved." +VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" +VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%" + +;-------------------------------- + +; Pages + + +; !insertmacro MUI_PAGE_LICENSE "..\..\LICENSE-MPL-RabbitMQ" + !insertmacro MUI_PAGE_COMPONENTS + !insertmacro MUI_PAGE_DIRECTORY + !insertmacro MUI_PAGE_INSTFILES + !insertmacro MUI_PAGE_FINISH + + !insertmacro MUI_UNPAGE_CONFIRM + !insertmacro MUI_UNPAGE_INSTFILES + !define MUI_FINISHPAGE_TEXT "RabbitMQ Server %%VERSION%% has been uninstalled from your computer.$\n$\nPlease note that the log and database directories located at $APPDATA\RabbitMQ have not been removed. You can remove them manually if desired." + !insertmacro MUI_UNPAGE_FINISH + +;-------------------------------- +;Languages + + !insertmacro MUI_LANGUAGE "English" + +;-------------------------------- + +; The stuff to install +Section "RabbitMQ Server (required)" Rabbit + + SectionIn RO + + ; Set output path to the installation directory. + SetOutPath $INSTDIR + + ; Put files there + File /r "rabbitmq_server-%%VERSION%%" + File "rabbitmq.ico" + + ; Add to PATH + ${EnvVarUpdate} $0 "PATH" "A" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" + + ; Write the installation path into the registry + WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR" + + ; Write the uninstall keys for Windows + WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server" + WriteRegStr HKLM ${uninstall} "UninstallString" "$INSTDIR\uninstall.exe" + WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\uninstall.exe,0" + WriteRegStr HKLM ${uninstall} "Publisher" "VMware, Inc." + WriteRegStr HKLM ${uninstall} "DisplayVersion" "%%VERSION%%" + WriteRegDWORD HKLM ${uninstall} "NoModify" 1 + WriteRegDWORD HKLM ${uninstall} "NoRepair" 1 + + ${GetSize} "$INSTDIR" "/S=0K" $0 $1 $2 + IntFmt $0 "0x%08X" $0 + WriteRegDWORD HKLM "${uninstall}" "EstimatedSize" "$0" + + WriteUninstaller "uninstall.exe" +SectionEnd + +;-------------------------------- + +Section "RabbitMQ Service" RabbitService + ExpandEnvStrings $0 %COMSPEC% + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install' + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start' + CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie" +SectionEnd + +;-------------------------------- + +Section "Start Menu" RabbitStartMenu + ; In case the service is not installed, or the service installation fails, + ; make sure these exist or Explorer will get confused. + CreateDirectory "$APPDATA\RabbitMQ\log" + CreateDirectory "$APPDATA\RabbitMQ\db" + + CreateDirectory "$SMPROGRAMS\RabbitMQ Server" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0 + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico" + +SectionEnd + +;-------------------------------- + +; Section descriptions + +LangString DESC_Rabbit ${LANG_ENGLISH} "The RabbitMQ Server." +LangString DESC_RabbitService ${LANG_ENGLISH} "Set up RabbitMQ as a Windows Service." +LangString DESC_RabbitStartMenu ${LANG_ENGLISH} "Add some useful links to the start menu." + +!insertmacro MUI_FUNCTION_DESCRIPTION_BEGIN + !insertmacro MUI_DESCRIPTION_TEXT ${Rabbit} $(DESC_Rabbit) + !insertmacro MUI_DESCRIPTION_TEXT ${RabbitService} $(DESC_RabbitService) + !insertmacro MUI_DESCRIPTION_TEXT ${RabbitStartMenu} $(DESC_RabbitStartMenu) +!insertmacro MUI_FUNCTION_DESCRIPTION_END + +;-------------------------------- + +; Uninstaller + +Section "Uninstall" + + ; Remove registry keys + DeleteRegKey HKLM ${uninstall} + DeleteRegKey HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" + + ; TODO these will fail if the service is not installed - do we care? + ExpandEnvStrings $0 %COMSPEC% + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" stop' + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" remove' + + ; Remove from PATH + ${un.EnvVarUpdate} $0 "PATH" "R" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" + + ; Remove files and uninstaller + RMDir /r "$INSTDIR\rabbitmq_server-%%VERSION%%" + Delete "$INSTDIR\rabbitmq.ico" + Delete "$INSTDIR\uninstall.exe" + + ; Remove start menu items + RMDir /r "$SMPROGRAMS\RabbitMQ Server" + + DeleteRegValue ${env_hklm} ERLANG_HOME + SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000 + +SectionEnd + +;-------------------------------- + +; Functions + +Function .onInit + Call findErlang + + ReadRegStr $0 HKLM ${uninstall} "UninstallString" + ${If} $0 != "" + MessageBox MB_OKCANCEL|MB_ICONEXCLAMATION "RabbitMQ is already installed. $\n$\nClick 'OK' to remove the previous version or 'Cancel' to cancel this installation." IDCANCEL norun + + ;Run the uninstaller + ClearErrors + ExecWait $INSTDIR\uninstall.exe + + norun: + Abort + ${EndIf} +FunctionEnd + +Function findErlang + + StrCpy $0 0 + StrCpy $2 "not-found" + ${Do} + EnumRegKey $1 HKLM Software\Ericsson\Erlang $0 + ${If} $1 = "" + ${Break} + ${EndIf} + ${If} $1 <> "ErlSrv" + StrCpy $2 $1 + ${EndIf} + + IntOp $0 $0 + 1 + ${Loop} + + ${If} $2 = "not-found" + MessageBox MB_YESNO|MB_ICONEXCLAMATION "Erlang could not be detected.$\nYou must install Erlang before installing RabbitMQ. Would you like the installer to open a browser window to the Erlang download site?" IDNO abort + ExecShell "open" "http://www.erlang.org/download.html" + abort: + Abort + ${Else} + ${VersionCompare} $2 "5.6.3" $0 + ${VersionCompare} $2 "5.8.1" $1 + + ${If} $0 = 2 + MessageBox MB_OK|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is too old. Please install a more recent version." + Abort + ${ElseIf} $1 = 2 + MessageBox MB_YESNO|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is comparatively old.$\nFor best results, please install a newer version.$\nDo you wish to continue?" IDYES no_abort + Abort + no_abort: + ${EndIf} + + ReadRegStr $0 HKLM "Software\Ericsson\Erlang\$2" "" + + ; See http://nsis.sourceforge.net/Setting_Environment_Variables + WriteRegExpandStr ${env_hklm} ERLANG_HOME $0 + SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000 + + ; On Windows XP changing the permanent environment does not change *our* + ; environment, so do that as well. + System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0' + ${EndIf} + +FunctionEnd
\ No newline at end of file diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 5c390a51..2f80eb96 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -16,7 +16,6 @@ ## SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ --kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 0cfa5ea8..2ca9f2b3 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -142,7 +142,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( +W w ^
+A30 ^
+P 1048576 ^
--kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 43520b55..bc452fea 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -207,7 +207,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/src/delegate.erl b/src/delegate.erl index 46bd8245..17046201 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]). +-export([start_link/1, invoke_no_result/2, invoke/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,8 +36,6 @@ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). --spec(delegate_count/1 :: ([node()]) -> non_neg_integer()). - -endif. %%---------------------------------------------------------------------------- @@ -111,22 +109,14 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], orddict:new()}, Pids). -delegate_count([RemoteNode | _]) -> - {ok, Count} = case application:get_env(rabbit, delegate_count) of - undefined -> rpc:call(RemoteNode, application, get_env, - [rabbit, delegate_count]); - Result -> Result - end, - Count. - delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). delegate(RemoteNodes) -> case get(delegate) of - undefined -> Name = - delegate_name(erlang:phash2( - self(), delegate_count(RemoteNodes))), + undefined -> Name = delegate_name( + erlang:phash2(self(), + delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; Name -> Name diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index e0ffa7c8..fc693c7d 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/1, count/1]). -export([init/1]). @@ -28,20 +28,32 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}). +-spec(count/1 :: ([node()]) -> integer()). -endif. %%---------------------------------------------------------------------------- -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_link(Count) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]). + +count([]) -> + 1; +count([Node | Nodes]) -> + try + length(supervisor:which_children({?SERVER, Node})) + catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> + count(Nodes); + exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown; + R =:= nodedown -> + count(Nodes) + end. %%---------------------------------------------------------------------------- -init(_Args) -> - DCount = delegate:delegate_count([node()]), +init([Count]) -> {ok, {{one_for_one, 10, 10}, [{Num, {delegate, start_link, [Num]}, transient, 16#ffffffff, worker, [delegate]} || - Num <- lists:seq(0, DCount - 1)]}}. + Num <- lists:seq(0, Count - 1)]}}. diff --git a/src/rabbit.erl b/src/rabbit.erl index c6661d39..1beed5c1 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -27,7 +27,7 @@ %%--------------------------------------------------------------------------- %% Boot steps. --export([maybe_insert_default_data/0]). +-export([maybe_insert_default_data/0, boot_delegate/0]). -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, @@ -101,8 +101,7 @@ -rabbit_boot_step({delegate_sup, [{description, "cluster delegate"}, - {mfa, {rabbit_sup, start_child, - [delegate_sup]}}, + {mfa, {rabbit, boot_delegate, []}}, {requires, kernel_ready}, {enables, core_initialized}]}). @@ -153,6 +152,11 @@ [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({notify_cluster, + [{description, "notify cluster nodes"}, + {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {requires, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). @@ -179,6 +183,9 @@ {running_nodes, [node()]}]). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). +-spec(maybe_insert_default_data/0 :: () -> 'ok'). +-spec(boot_delegate/0 :: () -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -225,11 +232,11 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, SupPid} = rabbit_sup:start_link(), + true = register(rabbit, self()), print_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), - {ok, SupPid}; Error -> Error @@ -448,6 +455,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, end end. +boot_delegate() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + rabbit_sup:start_child(delegate_sup, [Count]). + maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of true -> insert_default_data(); diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a6da551d..6e5aae27 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:const(not_found) end; [ExistingQ = #amqqueue{pid = QPid}] -> - case is_process_alive(QPid) of + case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), fun (Tx) -> TailFun(Tx), ExistingQ end @@ -356,7 +356,8 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> + delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). @@ -421,7 +422,7 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - delegate_cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 28430cb2..75f65c2d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -617,6 +617,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. +backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> {Guids, BQS1} = Fun(BQS), run_message_queue( @@ -786,20 +790,20 @@ handle_call({init, Recover}, From, handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined} = State, - gen_server2:reply(From, not_found), - case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) - end, - BQS = BQ:init(QName, IsDurable, Recover), - %% Rely on terminate to delete the queue. - {stop, normal, State#q{backing_queue_state = BQS}} + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + false -> #q{backing_queue = BQ, backing_queue_state = undefined, + q = #amqqueue{name = QName, durable = IsDurable}} = State, + gen_server2:reply(From, not_found), + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> @@ -996,10 +1000,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); -handle_cast(sync_timeout, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS), - sync_timer_ref = undefined}); +handle_cast(sync_timeout, State) -> + noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -1133,9 +1135,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{backing_queue = BQ}) -> - noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); +handle_info(timeout, State) -> + noreply(backing_queue_idle_timeout(State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3a4fb024..abc27c5f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). +-export([is_process_alive/1]). %%---------------------------------------------------------------------------- @@ -194,6 +195,7 @@ -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). -endif. @@ -240,11 +242,20 @@ assert_args_equivalence1(Orig, New, Name, Key) -> {Same, Same} -> ok; {Orig1, New1} -> protocol_error( precondition_failed, - "inequivalent arg '~s' for ~s: " - "required ~w, received ~w", - [Key, rabbit_misc:rs(Name), New1, Orig1]) + "inequivalent arg '~s' for ~s: " + "received ~s but current is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) end. +val(undefined) -> + "none"; +val({Type, Value}) -> + Fmt = case is_binary(Value) of + true -> "the value '~s' of type '~s'"; + false -> "the value '~w' of type '~s'" + end, + lists:flatten(io_lib:format(Fmt, [Value, Type])). + dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; @@ -341,8 +352,11 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch exit:{R, _} when R =:= noproc; R =:= nodedown; - R =:= normal; R =:= shutdown -> + catch + exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> + Handler(); + exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> Handler() end. @@ -849,3 +863,12 @@ ntoab(IP) -> 0 -> Str; _ -> "[" ++ Str ++ "]" end. + +is_process_alive(Pid) when node(Pid) =:= node() -> + erlang:is_process_alive(Pid); +is_process_alive(Pid) -> + case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of + true -> true; + _ -> false + end. + diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 283d25c7..36f61628 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -32,16 +32,6 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). --define(RABBIT_TCP_OPTS, [ - binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - {backlog, 128}, % use the maximum listen(2) backlog value - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false} - ]). - -define(SSL_TIMEOUT, 5). %% seconds -define(FIRST_TEST_BIND_PORT, 10000). @@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS], + [IPAddress, Port, [Family | tcp_opts()], {?MODULE, tcp_listener_started, [Protocol]}, {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, @@ -315,6 +305,10 @@ hostname() -> cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). +tcp_opts() -> + {ok, Opts} = application:get_env(rabbit, tcp_listen_options), + Opts. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index e4bc1cdc..817abaa2 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,14 +22,41 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([notify_cluster/0, rabbit_running_on/1]). -define(SERVER, ?MODULE). +-define(RABBIT_UP_RPC_TIMEOUT, 2000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(rabbit_running_on/1 :: (node()) -> 'ok'). +-spec(notify_cluster/0 :: () -> 'ok'). + +-endif. %%-------------------------------------------------------------------- start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +rabbit_running_on(Node) -> + gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). + +notify_cluster() -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this rabbit + case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, + [Node], ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + %% register other active rabbits with this rabbit + [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ], + ok. + %%-------------------------------------------------------------------- init([]) -> @@ -39,19 +66,20 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({rabbit_running_on, Node}, State) -> + rabbit_log:info("node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({nodeup, Node}, State) -> - rabbit_log:info("node ~p up", [Node]), - {noreply, State}; handle_info({nodedown, Node}, State) -> - rabbit_log:info("node ~p down", [Node]), - %% TODO: This may turn out to be a performance hog when there are - %% lots of nodes. We really only need to execute this code on - %% *one* node, rather than all of them. - ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node), + rabbit_log:info("node ~p down~n", [Node]), + ok = handle_dead_rabbit(Node), + {noreply, State}; +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> + rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), + ok = handle_dead_rabbit(Node), {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- +%% TODO: This may turn out to be a performance hog when there are +%% lots of nodes. We really only need to execute this code on +%% *one* node, rather than all of them. +handle_dead_rabbit(Node) -> + ok = rabbit_networking:on_node_down(Node), + ok = rabbit_amqqueue:on_node_down(Node). + diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 475c415e..1781469a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -72,7 +72,13 @@ %% pre-init: %% receive protocol header -> send connection.start, *starting* %% starting: -%% receive connection.start_ok -> send connection.tune, *tuning* +%% receive connection.start_ok -> *securing* +%% securing: +%% check authentication credentials +%% if authentication success -> send connection.tune, *tuning* +%% if more challenge needed -> send connection.secure, +%% receive connection.secure_ok *securing* +%% otherwise send close, *exit* %% tuning: %% receive connection.tune_ok -> start heartbeats, *opening* %% opening: @@ -351,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({handshake_timeout, State#v1.callback}) end; timeout -> - throw({timeout, State#v1.connection_state}); + case State#v1.connection_state of + closed -> mainloop(Deb, State); + S -> throw({timeout, S}) + end; {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -916,10 +925,14 @@ socket_info(Get, Select) -> end. ssl_info(F, Sock) -> + %% The first ok form is R14 + %% The second is R13 - the extra term is exportability (by inspection, + %% the docs are wrong) case rabbit_net:ssl_info(Sock) of - nossl -> ''; - {error, _} -> ''; - {ok, Info} -> F(Info) + nossl -> ''; + {error, _} -> ''; + {ok, {P, {K, C, H}}} -> F({P, {K, C, H}}); + {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}}) end. cert_info(F, Sock) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49b09508..58c369b5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -26,6 +26,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + passed = test_queue_cleanup(SecondaryNode), + passed = test_declare_on_dead_queue(SecondaryNode), %% we now run the tests remotely, so that code coverage on the %% local node picks up more of the delegate Node = node(), Self = self(), Remote = spawn(SecondaryNode, - fun () -> A = test_delegates_async(Node), - B = test_delegates_sync(Node), - Self ! {self(), {A, B}} + fun () -> Rs = [ test_delegates_async(Node), + test_delegates_sync(Node), + test_queue_cleanup(Node), + test_declare_on_dead_queue(Node) ], + Self ! {self(), Rs} end), receive {Remote, Result} -> - Result = {passed, passed} - after 2000 -> + Result = lists:duplicate(length(Result), passed) + after 30000 -> throw(timeout) end, @@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) -> passed. +test_queue_cleanup_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_queue_cleanup_receiver(Pid) + end. + + +test_queue_cleanup(_SecondaryNode) -> + {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), + receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> + ok + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + rabbit:stop(), + rabbit:start(), + rabbit_channel:do(Ch, #'queue.declare'{ passive = true, + queue = ?CLEANUP_QUEUE_NAME }), + receive + {channel_exit, 1, {amqp_error, not_found, _, _}} -> + ok + after 2000 -> + throw(failed_to_receive_channel_exit) + end, + passed. + +test_declare_on_dead_queue(SecondaryNode) -> + QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME), + Self = self(), + Pid = spawn(SecondaryNode, + fun () -> + {new, #amqqueue{name = QueueName, pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], + none), + exit(QPid, kill), + Self ! {self(), killed, QPid} + end), + receive + {Pid, killed, QPid} -> + {existing, #amqqueue{name = QueueName, + pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], none), + false = rabbit_misc:is_process_alive(QPid), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], + none), + true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + {ok, 0} = rabbit_amqqueue:delete(Q, false, false), + passed + after 2000 -> + throw(failed_to_create_and_kill_queue) + end. + %--------------------------------------------------------------------- control_action(Command, Args) -> |