author     Steve Powell <steve@rabbitmq.com>  2011-03-11 14:45:46 +0000
committer  Steve Powell <steve@rabbitmq.com>  2011-03-11 14:45:46 +0000
commit     0bc04b510e618ace4381336e6056bb81943d59e2
tree       a720a26a8dc931395361e35b0a32b2afd7f2c480
parent     59151f5f16866343a50bea76ac7da9a63008a2e0
parent     36f2b7b90445999485e57f5b7b3fec334a72c96a

Merged bug22005 into default.
-rw-r--r--  Makefile                                    |    4
-rw-r--r--  ebin/rabbit_app.in                          |    1
-rw-r--r--  include/rabbit.hrl                          |    2
-rw-r--r--  include/rabbit_msg_store.hrl                |    3
-rw-r--r--  include/rabbit_msg_store_index.hrl          |    8
-rw-r--r--  packaging/RPMS/Fedora/Makefile              |    9
-rw-r--r--  packaging/common/rabbitmq-server.init       |    4
-rw-r--r--  packaging/debs/Debian/Makefile              |    4
-rw-r--r--  packaging/windows-exe/lib/EnvVarUpdate.nsh  |  327
-rw-r--r--  packaging/windows-exe/rabbitmq_nsi.in       |   10
-rw-r--r--  src/file_handle_cache.erl                   |  125
-rw-r--r--  src/gen_server2.erl                         |    4
-rw-r--r--  src/gm.erl                                  |   24
-rw-r--r--  src/gm_tests.erl                            |   14
-rw-r--r--  src/rabbit.erl                              |    2
-rw-r--r--  src/rabbit_alarm.erl                        |  111
-rw-r--r--  src/rabbit_amqqueue.erl                     |   26
-rw-r--r--  src/rabbit_amqqueue_process.erl             |  166
-rw-r--r--  src/rabbit_auth_backend_internal.erl        |    4
-rw-r--r--  src/rabbit_auth_mechanism_amqplain.erl      |    2
-rw-r--r--  src/rabbit_backing_queue.erl                |    2
-rw-r--r--  src/rabbit_basic.erl                        |   36
-rw-r--r--  src/rabbit_binary_generator.erl             |   13
-rw-r--r--  src/rabbit_binding.erl                      |    2
-rw-r--r--  src/rabbit_channel.erl                      |  194
-rw-r--r--  src/rabbit_channel_sup.erl                  |   12
-rw-r--r--  src/rabbit_client_sup.erl                   |    4
-rw-r--r--  src/rabbit_control.erl                      |   46
-rw-r--r--  src/rabbit_direct.erl                       |   22
-rw-r--r--  src/rabbit_event.erl                        |    2
-rw-r--r--  src/rabbit_exchange.erl                     |    8
-rw-r--r--  src/rabbit_exchange_type_topic.erl          |   34
-rw-r--r--  src/rabbit_memory_monitor.erl               |   10
-rw-r--r--  src/rabbit_misc.erl                         |   24
-rw-r--r--  src/rabbit_mnesia.erl                       |   22
-rw-r--r--  src/rabbit_msg_file.erl                     |   79
-rw-r--r--  src/rabbit_msg_store.erl                    |  419
-rw-r--r--  src/rabbit_msg_store_ets_index.erl          |    2
-rw-r--r--  src/rabbit_node_monitor.erl                 |   13
-rw-r--r--  src/rabbit_prelaunch.erl                    |   14
-rw-r--r--  src/rabbit_queue_index.erl                  |  128
-rw-r--r--  src/rabbit_reader.erl                       |   42
-rw-r--r--  src/rabbit_router.erl                       |    6
-rw-r--r--  src/rabbit_ssl.erl                          |    6
-rw-r--r--  src/rabbit_tests.erl                        |  569
-rw-r--r--  src/rabbit_types.erl                        |   99
-rw-r--r--  src/rabbit_upgrade.erl                      |    6
-rw-r--r--  src/rabbit_variable_queue.erl               |  253
-rw-r--r--  src/rabbit_vhost.erl                        |   18
-rw-r--r--  src/rabbit_writer.erl                       |   10
-rw-r--r--  src/vm_memory_monitor.erl                   |    4
51 files changed, 1402 insertions(+), 1547 deletions(-)
diff --git a/Makefile b/Makefile
index 00c7809d..cdb86aad 100644
--- a/Makefile
+++ b/Makefile
@@ -177,11 +177,11 @@ stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
set-memory-alarm: all
- echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \
+ echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \
$(ERL_CALL)
clear-memory-alarm: all
- echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \
+ echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index f837684c..014c18b0 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -20,6 +20,7 @@
{vm_memory_high_watermark, 0.4},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ {frame_max, 131072},
{persister_max_wrap_entries, 500},
{persister_hibernate_after, 10000},
{msg_store_file_size_limit, 16777216},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 4d75b546..9f483c30 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,7 @@
-record(listener, {node, protocol, host, ip_address, port}).
--record(basic_message, {exchange_name, routing_keys = [], content, guid,
+-record(basic_message, {exchange_name, routing_keys = [], content, id,
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index 9d704f65..e9150a97 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -22,5 +22,4 @@
-endif.
--record(msg_location,
- {guid, ref_count, file, offset, total_size}).
+-record(msg_location, {msg_id, ref_count, file, offset, total_size}).
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl
index 289f8f60..2ae5b000 100644
--- a/include/rabbit_msg_store_index.hrl
+++ b/include/rabbit_msg_store_index.hrl
@@ -29,13 +29,13 @@
-spec(new/1 :: (dir()) -> index_state()).
-spec(recover/1 :: (dir()) -> rabbit_types:ok_or_error2(index_state(), any())).
-spec(lookup/2 ::
- (rabbit_guid:guid(), index_state()) -> ('not_found' | keyvalue())).
+ (rabbit_types:msg_id(), index_state()) -> ('not_found' | keyvalue())).
-spec(insert/2 :: (keyvalue(), index_state()) -> 'ok').
-spec(update/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(update_fields/3 :: (rabbit_guid:guid(), ({fieldpos(), fieldvalue()} |
- [{fieldpos(), fieldvalue()}]),
+-spec(update_fields/3 :: (rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} |
+ [{fieldpos(), fieldvalue()}]),
index_state()) -> 'ok').
--spec(delete/2 :: (rabbit_guid:guid(), index_state()) -> 'ok').
+-spec(delete/2 :: (rabbit_types:msg_id(), index_state()) -> 'ok').
-spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok').
-spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok').
-spec(terminate/1 :: (index_state()) -> any()).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 287945fe..c67d8fd6 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -12,7 +12,7 @@ ifndef RPM_OS
RPM_OS=fedora
endif
-ifeq "x$(RPM_OS)" "xsuse"
+ifeq "$(RPM_OS)" "suse"
REQUIRES=/sbin/chkconfig /sbin/service
OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
else
@@ -33,6 +33,11 @@ prepare:
sed -i \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
SOURCES/rabbitmq-server.init
+ifeq "$(RPM_OS)" "fedora"
+# Fedora says that only vital services should have Default-Start
+ sed -i -e '/^# Default-Start:/d;/^# Default-Stop:/d' \
+ SOURCES/rabbitmq-server.init
+endif
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
@@ -40,5 +45,5 @@ prepare:
server: prepare
rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES)
-clean:
+clean:
rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index 916dee6f..f3bdc3d2 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -10,8 +10,8 @@
# Provides: rabbitmq-server
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
-# Default-Start:
-# Default-Stop:
+# Default-Start: 3 4 5
+# Default-Stop: 0 1 2 6
# Description: RabbitMQ broker
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index d937fbb2..31979a8e 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -22,8 +22,12 @@ package: clean
tar -zxvf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
+# Debian and descendants differ from most other distros in that
+# runlevel 2 should start network services.
sed -i \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
+ -e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \
+ -e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \
$(UNPACKED_DIR)/debian/rabbitmq-server.init
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
diff --git a/packaging/windows-exe/lib/EnvVarUpdate.nsh b/packaging/windows-exe/lib/EnvVarUpdate.nsh
deleted file mode 100644
index 839d6a02..00000000
--- a/packaging/windows-exe/lib/EnvVarUpdate.nsh
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * EnvVarUpdate.nsh
- * : Environmental Variables: append, prepend, and remove entries
- *
- * WARNING: If you use StrFunc.nsh header then include it before this file
- * with all required definitions. This is to avoid conflicts
- *
- * Usage:
- * ${EnvVarUpdate} "ResultVar" "EnvVarName" "Action" "RegLoc" "PathString"
- *
- * Credits:
- * Version 1.0
- * * Cal Turney (turnec2)
- * * Amir Szekely (KiCHiK) and e-circ for developing the forerunners of this
- * function: AddToPath, un.RemoveFromPath, AddToEnvVar, un.RemoveFromEnvVar,
- * WriteEnvStr, and un.DeleteEnvStr
- * * Diego Pedroso (deguix) for StrTok
- * * Kevin English (kenglish_hi) for StrContains
- * * Hendri Adriaens (Smile2Me), Diego Pedroso (deguix), and Dan Fuhry
- * (dandaman32) for StrReplace
- *
- * Version 1.1 (compatibility with StrFunc.nsh)
- * * techtonik
- *
- * http://nsis.sourceforge.net/Environmental_Variables:_append%2C_prepend%2C_and_remove_entries
- *
- */
-
-
-!ifndef ENVVARUPDATE_FUNCTION
-!define ENVVARUPDATE_FUNCTION
-!verbose push
-!verbose 3
-!include "LogicLib.nsh"
-!include "WinMessages.NSH"
-!include "StrFunc.nsh"
-
-; ---- Fix for conflict if StrFunc.nsh is already includes in main file -----------------------
-!macro _IncludeStrFunction StrFuncName
- !ifndef ${StrFuncName}_INCLUDED
- ${${StrFuncName}}
- !endif
- !ifndef Un${StrFuncName}_INCLUDED
- ${Un${StrFuncName}}
- !endif
- !define un.${StrFuncName} "${Un${StrFuncName}}"
-!macroend
-
-!insertmacro _IncludeStrFunction StrTok
-!insertmacro _IncludeStrFunction StrStr
-!insertmacro _IncludeStrFunction StrRep
-
-; ---------------------------------- Macro Definitions ----------------------------------------
-!macro _EnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
- Push "${EnvVarName}"
- Push "${Action}"
- Push "${RegLoc}"
- Push "${PathString}"
- Call EnvVarUpdate
- Pop "${ResultVar}"
-!macroend
-!define EnvVarUpdate '!insertmacro "_EnvVarUpdateConstructor"'
-
-!macro _unEnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
- Push "${EnvVarName}"
- Push "${Action}"
- Push "${RegLoc}"
- Push "${PathString}"
- Call un.EnvVarUpdate
- Pop "${ResultVar}"
-!macroend
-!define un.EnvVarUpdate '!insertmacro "_unEnvVarUpdateConstructor"'
-; ---------------------------------- Macro Definitions end-------------------------------------
-
-;----------------------------------- EnvVarUpdate start----------------------------------------
-!define hklm_all_users 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
-!define hkcu_current_user 'HKCU "Environment"'
-
-!macro EnvVarUpdate UN
-
-Function ${UN}EnvVarUpdate
-
- Push $0
- Exch 4
- Exch $1
- Exch 3
- Exch $2
- Exch 2
- Exch $3
- Exch
- Exch $4
- Push $5
- Push $6
- Push $7
- Push $8
- Push $9
- Push $R0
-
- /* After this point:
- -------------------------
- $0 = ResultVar (returned)
- $1 = EnvVarName (input)
- $2 = Action (input)
- $3 = RegLoc (input)
- $4 = PathString (input)
- $5 = Orig EnvVar (read from registry)
- $6 = Len of $0 (temp)
- $7 = tempstr1 (temp)
- $8 = Entry counter (temp)
- $9 = tempstr2 (temp)
- $R0 = tempChar (temp) */
-
- ; Step 1: Read contents of EnvVarName from RegLoc
- ;
- ; Check for empty EnvVarName
- ${If} $1 == ""
- SetErrors
- DetailPrint "ERROR: EnvVarName is blank"
- Goto EnvVarUpdate_Restore_Vars
- ${EndIf}
-
- ; Check for valid Action
- ${If} $2 != "A"
- ${AndIf} $2 != "P"
- ${AndIf} $2 != "R"
- SetErrors
- DetailPrint "ERROR: Invalid Action - must be A, P, or R"
- Goto EnvVarUpdate_Restore_Vars
- ${EndIf}
-
- ${If} $3 == HKLM
- ReadRegStr $5 ${hklm_all_users} $1 ; Get EnvVarName from all users into $5
- ${ElseIf} $3 == HKCU
- ReadRegStr $5 ${hkcu_current_user} $1 ; Read EnvVarName from current user into $5
- ${Else}
- SetErrors
- DetailPrint 'ERROR: Action is [$3] but must be "HKLM" or HKCU"'
- Goto EnvVarUpdate_Restore_Vars
- ${EndIf}
-
- ; Check for empty PathString
- ${If} $4 == ""
- SetErrors
- DetailPrint "ERROR: PathString is blank"
- Goto EnvVarUpdate_Restore_Vars
- ${EndIf}
-
- ; Make sure we've got some work to do
- ${If} $5 == ""
- ${AndIf} $2 == "R"
- SetErrors
- DetailPrint "$1 is empty - Nothing to remove"
- Goto EnvVarUpdate_Restore_Vars
- ${EndIf}
-
- ; Step 2: Scrub EnvVar
- ;
- StrCpy $0 $5 ; Copy the contents to $0
- ; Remove spaces around semicolons (NOTE: spaces before the 1st entry or
- ; after the last one are not removed here but instead in Step 3)
- ${If} $0 != "" ; If EnvVar is not empty ...
- ${Do}
- ${${UN}StrStr} $7 $0 " ;"
- ${If} $7 == ""
- ${ExitDo}
- ${EndIf}
- ${${UN}StrRep} $0 $0 " ;" ";" ; Remove '<space>;'
- ${Loop}
- ${Do}
- ${${UN}StrStr} $7 $0 "; "
- ${If} $7 == ""
- ${ExitDo}
- ${EndIf}
- ${${UN}StrRep} $0 $0 "; " ";" ; Remove ';<space>'
- ${Loop}
- ${Do}
- ${${UN}StrStr} $7 $0 ";;"
- ${If} $7 == ""
- ${ExitDo}
- ${EndIf}
- ${${UN}StrRep} $0 $0 ";;" ";"
- ${Loop}
-
- ; Remove a leading or trailing semicolon from EnvVar
- StrCpy $7 $0 1 0
- ${If} $7 == ";"
- StrCpy $0 $0 "" 1 ; Change ';<EnvVar>' to '<EnvVar>'
- ${EndIf}
- StrLen $6 $0
- IntOp $6 $6 - 1
- StrCpy $7 $0 1 $6
- ${If} $7 == ";"
- StrCpy $0 $0 $6 ; Change ';<EnvVar>' to '<EnvVar>'
- ${EndIf}
- ; DetailPrint "Scrubbed $1: [$0]" ; Uncomment to debug
- ${EndIf}
-
- /* Step 3. Remove all instances of the target path/string (even if "A" or "P")
- $6 = bool flag (1 = found and removed PathString)
- $7 = a string (e.g. path) delimited by semicolon(s)
- $8 = entry counter starting at 0
- $9 = copy of $0
- $R0 = tempChar */
-
- ${If} $5 != "" ; If EnvVar is not empty ...
- StrCpy $9 $0
- StrCpy $0 ""
- StrCpy $8 0
- StrCpy $6 0
-
- ${Do}
- ${${UN}StrTok} $7 $9 ";" $8 "0" ; $7 = next entry, $8 = entry counter
-
- ${If} $7 == "" ; If we've run out of entries,
- ${ExitDo} ; were done
- ${EndIf} ;
-
- ; Remove leading and trailing spaces from this entry (critical step for Action=Remove)
- ${Do}
- StrCpy $R0 $7 1
- ${If} $R0 != " "
- ${ExitDo}
- ${EndIf}
- StrCpy $7 $7 "" 1 ; Remove leading space
- ${Loop}
- ${Do}
- StrCpy $R0 $7 1 -1
- ${If} $R0 != " "
- ${ExitDo}
- ${EndIf}
- StrCpy $7 $7 -1 ; Remove trailing space
- ${Loop}
- ${If} $7 == $4 ; If string matches, remove it by not appending it
- StrCpy $6 1 ; Set 'found' flag
- ${ElseIf} $7 != $4 ; If string does NOT match
- ${AndIf} $0 == "" ; and the 1st string being added to $0,
- StrCpy $0 $7 ; copy it to $0 without a prepended semicolon
- ${ElseIf} $7 != $4 ; If string does NOT match
- ${AndIf} $0 != "" ; and this is NOT the 1st string to be added to $0,
- StrCpy $0 $0;$7 ; append path to $0 with a prepended semicolon
- ${EndIf} ;
-
- IntOp $8 $8 + 1 ; Bump counter
- ${Loop} ; Check for duplicates until we run out of paths
- ${EndIf}
-
- ; Step 4: Perform the requested Action
- ;
- ${If} $2 != "R" ; If Append or Prepend
- ${If} $6 == 1 ; And if we found the target
- DetailPrint "Target is already present in $1. It will be removed and"
- ${EndIf}
- ${If} $0 == "" ; If EnvVar is (now) empty
- StrCpy $0 $4 ; just copy PathString to EnvVar
- ${If} $6 == 0 ; If found flag is either 0
- ${OrIf} $6 == "" ; or blank (if EnvVarName is empty)
- DetailPrint "$1 was empty and has been updated with the target"
- ${EndIf}
- ${ElseIf} $2 == "A" ; If Append (and EnvVar is not empty),
- StrCpy $0 $0;$4 ; append PathString
- ${If} $6 == 1
- DetailPrint "appended to $1"
- ${Else}
- DetailPrint "Target was appended to $1"
- ${EndIf}
- ${Else} ; If Prepend (and EnvVar is not empty),
- StrCpy $0 $4;$0 ; prepend PathString
- ${If} $6 == 1
- DetailPrint "prepended to $1"
- ${Else}
- DetailPrint "Target was prepended to $1"
- ${EndIf}
- ${EndIf}
- ${Else} ; If Action = Remove
- ${If} $6 == 1 ; and we found the target
- DetailPrint "Target was found and removed from $1"
- ${Else}
- DetailPrint "Target was NOT found in $1 (nothing to remove)"
- ${EndIf}
- ${If} $0 == ""
- DetailPrint "$1 is now empty"
- ${EndIf}
- ${EndIf}
-
- ; Step 5: Update the registry at RegLoc with the updated EnvVar and announce the change
- ;
- ClearErrors
- ${If} $3 == HKLM
- WriteRegExpandStr ${hklm_all_users} $1 $0 ; Write it in all users section
- ${ElseIf} $3 == HKCU
- WriteRegExpandStr ${hkcu_current_user} $1 $0 ; Write it to current user section
- ${EndIf}
-
- IfErrors 0 +4
- MessageBox MB_OK|MB_ICONEXCLAMATION "Could not write updated $1 to $3"
- DetailPrint "Could not write updated $1 to $3"
- Goto EnvVarUpdate_Restore_Vars
-
- ; "Export" our change
- SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
-
- EnvVarUpdate_Restore_Vars:
- ;
- ; Restore the user's variables and return ResultVar
- Pop $R0
- Pop $9
- Pop $8
- Pop $7
- Pop $6
- Pop $5
- Pop $4
- Pop $3
- Pop $2
- Pop $1
- Push $0 ; Push my $0 (ResultVar)
- Exch
- Pop $0 ; Restore his $0
-
-FunctionEnd
-
-!macroend ; EnvVarUpdate UN
-!insertmacro EnvVarUpdate ""
-!insertmacro EnvVarUpdate "un."
-;----------------------------------- EnvVarUpdate end----------------------------------------
-
-!verbose pop
-!endif
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 6d79ffd4..1ed4064e 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -4,7 +4,6 @@
!include WinMessages.nsh
!include FileFunc.nsh
!include WordFunc.nsh
-!include lib\EnvVarUpdate.nsh
!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
!define uninstall "Software\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ"
@@ -77,9 +76,6 @@ Section "RabbitMQ Server (required)" Rabbit
File /r "rabbitmq_server-%%VERSION%%"
File "rabbitmq.ico"
- ; Add to PATH
- ${EnvVarUpdate} $0 "PATH" "A" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
-
; Write the installation path into the registry
WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR"
@@ -126,6 +122,9 @@ Section "Start Menu" RabbitStartMenu
CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico"
CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico"
+ SetOutPath "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Command Prompt (sbin dir).lnk" "$WINDIR\system32\cmd.exe" "" "$WINDIR\system32\cmd.exe"
+ SetOutPath $INSTDIR
SectionEnd
;--------------------------------
@@ -157,9 +156,6 @@ Section "Uninstall"
ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" stop'
ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" remove'
- ; Remove from PATH
- ${un.EnvVarUpdate} $0 "PATH" "R" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
-
; Remove files and uninstaller
RMDir /r "$INSTDIR\rabbitmq_server-%%VERSION%%"
Delete "$INSTDIR\rabbitmq.ico"
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f41815d0..b26bb988 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -156,13 +156,6 @@
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
-%% Googling around suggests that Windows has a limit somewhere around
-%% 16M, eg
-%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
-%% however, it turns out that's only available through the win32
-%% API. Via the C Runtime, we have just 512:
-%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx
--define(FILE_HANDLES_LIMIT_WINDOWS, 512).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
@@ -242,7 +235,7 @@
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
- val_or_error([char()] | binary()) | 'eof').
+ val_or_error([char()] | binary()) | 'eof').
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
-spec(sync/1 :: (ref()) -> ok_or_error()).
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
@@ -252,7 +245,7 @@
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
- val_or_error(non_neg_integer())).
+ val_or_error(non_neg_integer())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
@@ -867,34 +860,35 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State1)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients })
- when Limit =/= infinity andalso Count >= Limit ->
- ok = track_client(Pid, Clients),
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
- {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
clients = Clients }) ->
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
ok = track_client(Pid, Clients),
- case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
- true ->
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- {noreply, reduce(State #fhc_state {
- obtain_pending = pending_in(Item, Pending) })};
- false ->
- {noreply, run_pending_item(Item, State)}
- end;
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Enqueue = fun () ->
+ true = ets:update_element(Clients, Pid,
+ {#cstate.blocked, true}),
+ State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) }
+ end,
+ {noreply,
+ case obtain_limit_reached(State) of
+ true -> Enqueue();
+ false -> case needs_reduce(State #fhc_state {
+ obtain_count = Count + 1 }) of
+ true -> reduce(Enqueue());
+ false -> adjust_alarm(
+ State, run_pending_item(Item, State))
+ end
+ end};
handle_call({set_limit, Limit}, _From, State) ->
- {reply, ok, maybe_reduce(
- process_pending(State #fhc_state {
- limit = Limit,
- obtain_limit = obtain_limit(Limit) }))};
+ {reply, ok, adjust_alarm(
+ State, maybe_reduce(
+ process_pending(
+ State #fhc_state {
+ limit = Limit,
+ obtain_limit = obtain_limit(Limit) })))};
handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
{reply, Limit, State};
@@ -923,9 +917,9 @@ handle_cast({close, Pid, EldestUnusedSince},
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
- {noreply, process_pending(
+ {noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 }))};
+ State #fhc_state { elders = Elders1 })))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -947,13 +941,15 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
- {noreply, process_pending(
- State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = filter_pending(FilterFun, OpenPending),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) })}.
+ {noreply, adjust_alarm(
+ State,
+ process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(FilterFun, OpenPending),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
+ elders = dict:erase(Pid, Elders) }))}.
terminate(_Reason, State = #fhc_state { clients = Clients }) ->
ets:delete(Clients),
@@ -1013,6 +1009,18 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
OLimit -> OLimit
end.
+obtain_limit_reached(#fhc_state { obtain_limit = Limit,
+ obtain_count = Count}) ->
+ Limit =/= infinity andalso Count >= Limit.
+
+adjust_alarm(OldState, NewState) ->
+ case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
+ {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
+ {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
+ _ -> ok
+ end,
+ NewState.
+
requested({_Kind, _Pid, Requested, _From}) ->
Requested.
@@ -1117,7 +1125,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
case CStates of
[] -> ok;
_ -> case (Sum / ClientCount) -
- (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
AverageAge when AverageAge > 0 ->
notify_age(CStates, AverageAge);
_ ->
@@ -1170,29 +1178,20 @@ track_client(Pid, Clients) ->
false -> ok
end.
-%% For all unices, assume ulimit exists. Further googling suggests
-%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
-%% is file handles
+
+%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS
+%% environment variable, on Linux set `ulimit -n`.
ulimit() ->
- case os:type() of
- {win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS;
- {unix, _OsName} ->
- %% Under Linux, Solaris and FreeBSD, ulimit is a shell
- %% builtin, not a command. In OS X and AIX it's a command.
- %% Fortunately, os:cmd invokes the cmd in a shell env, so
- %% we're safe in all cases.
- case os:cmd("ulimit -n") of
- "unlimited" ->
- infinity;
- String = [C|_] when $0 =< C andalso C =< $9 ->
- list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String));
- _ ->
- %% probably a variant of
- %% "/bin/sh: line 1: ulimit: command not found\n"
- unknown
+ case proplists:get_value(max_fds, erlang:system_info(check_io)) of
+ MaxFds when is_integer(MaxFds) andalso MaxFds > 1 ->
+ case os:type() of
+ {win32, _OsName} ->
+ %% On Windows max_fds is twice the number of open files:
+ %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466
+ MaxFds div 2;
+ _Any ->
+ %% For other operating systems trust Erlang.
+ MaxFds
end;
_ ->
unknown
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 94296f97..43e0a8f5 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -453,8 +453,8 @@ unregister_name({global,Name}) ->
_ = global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
Pid;
-% Under R12 let's just ignore it, as we have a single term as Name.
-% On R13 it will never get here, as we get tuple with 'local/global' atom.
+%% Under R12 let's just ignore it, as we have a single term as Name.
+%% On R13 it will never get here, as we get tuple with 'local/global' atom.
unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
diff --git a/src/gm.erl b/src/gm.erl
index 70633a08..fd8d9b77 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -852,9 +852,9 @@ alive_view_members({_Ver, View}) ->
all_known_members({_Ver, View}) ->
?DICT:fold(
- fun (Member, #view_member { aliases = Aliases }, Acc) ->
- ?SETS:to_list(Aliases) ++ [Member | Acc]
- end, [], View).
+ fun (Member, #view_member { aliases = Aliases }, Acc) ->
+ ?SETS:to_list(Aliases) ++ [Member | Acc]
+ end, [], View).
group_to_view(#gm_group { members = Members, version = Ver }) ->
Alive = lists:filter(fun is_member_alive/1, Members),
@@ -1037,15 +1037,15 @@ maybe_erase_aliases(State = #state { self = Self,
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
- fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
- #member { last_pub = LP, last_ack = LA } =
- find_member_or_blank(Id, MembersState),
- case can_erase_view_member(Self, Id, LA, LP) of
- true -> {[Id | ErasableAcc],
- erase_member(Id, MembersStateAcc)};
- false -> Acc
- end
- end, {[], MembersState}, Aliases),
+ fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
+ #member { last_pub = LP, last_ack = LA } =
+ find_member_or_blank(Id, MembersState),
+ case can_erase_view_member(Self, Id, LA, LP) of
+ true -> {[Id | ErasableAcc],
+ erase_member(Id, MembersStateAcc)};
+ false -> Acc
+ end
+ end, {[], MembersState}, Aliases),
State1 = State #state { members_state = MembersState1 },
case Erasable of
[] -> {ok, State1};
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 65e9cff0..ca0ffd64 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -117,13 +117,13 @@ test_broadcast(Fun) ->
with_two_members(test_broadcast_fun(Fun)).
test_broadcast_fun(Fun) ->
- fun (Pid, Pid2) ->
- ok = Fun(Pid, magic_message),
- passed = receive_or_throw({msg, Pid, Pid, magic_message},
- timeout_waiting_for_msg),
- passed = receive_or_throw({msg, Pid2, Pid, magic_message},
- timeout_waiting_for_msg)
- end.
+ fun (Pid, Pid2) ->
+ ok = Fun(Pid, magic_message),
+ passed = receive_or_throw({msg, Pid, Pid, magic_message},
+ timeout_waiting_for_msg),
+ passed = receive_or_throw({msg, Pid2, Pid, magic_message},
+ timeout_waiting_for_msg)
+ end.
with_two_members(Fun) ->
ok = gm:create_tables(),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 6eb59c3e..c9a929ae 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -375,7 +375,7 @@ config_files() ->
error -> []
end.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
print_banner() ->
{ok, Product} = application:get_key(id),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 37e40981..d38ecb91 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -18,12 +18,14 @@
-behaviour(gen_event).
--export([start/0, stop/0, register/2]).
+-export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--record(alarms, {alertees, vm_memory_high_watermark = false}).
+-export([remote_conserve_memory/2]). %% Internal use only
+
+-record(alarms, {alertees, alarmed_nodes}).
%%----------------------------------------------------------------------------
@@ -33,6 +35,8 @@
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(on_node_up/1 :: (node()) -> 'ok').
+-spec(on_node_down/1 :: (node()) -> 'ok').
-endif.
@@ -56,39 +60,57 @@ register(Pid, HighMemMFA) ->
{register, Pid, HighMemMFA},
infinity).
+on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}).
+
+on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
+
+%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
+%% permit notifying a remote node.
+remote_conserve_memory(Pid, true) ->
+ gen_event:notify({alarm_handler, node(Pid)},
+ {set_alarm, {{vm_memory_high_watermark, node()}, []}});
+remote_conserve_memory(Pid, false) ->
+ gen_event:notify({alarm_handler, node(Pid)},
+ {clear_alarm, {vm_memory_high_watermark, node()}}).
+
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #alarms{alertees = dict:new()}}.
+ {ok, #alarms{alertees = dict:new(),
+ alarmed_nodes = sets:new()}}.
-handle_call({register, Pid, {M, F, A} = HighMemMFA},
- State = #alarms{alertees = Alertess}) ->
- _MRef = erlang:monitor(process, Pid),
- ok = case State#alarms.vm_memory_high_watermark of
- true -> apply(M, F, A ++ [Pid, true]);
- false -> ok
- end,
- NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, State#alarms.vm_memory_high_watermark,
- State#alarms{alertees = NewAlertees}};
+handle_call({register, Pid, HighMemMFA}, State) ->
+ {ok, 0 < sets:size(State#alarms.alarmed_nodes),
+ internal_register(Pid, HighMemMFA, State)};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) ->
- ok = alert(true, State#alarms.alertees),
- {ok, State#alarms{vm_memory_high_watermark = true}};
+handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
+ {ok, maybe_alert(fun sets:add_element/2, Node, State)};
-handle_event({clear_alarm, vm_memory_high_watermark}, State) ->
- ok = alert(false, State#alarms.alertees),
- {ok, State#alarms{vm_memory_high_watermark = false}};
+handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
+ {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+
+handle_event({node_up, Node}, State) ->
+ %% Must do this via notify and not call to avoid possible deadlock.
+ ok = gen_event:notify(
+ {alarm_handler, Node},
+ {register, self(), {?MODULE, remote_conserve_memory, []}}),
+ {ok, State};
+
+handle_event({node_down, Node}, State) ->
+ {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+
+handle_event({register, Pid, HighMemMFA}, State) ->
+ {ok, internal_register(Pid, HighMemMFA, State)};
handle_event(_Event, State) ->
{ok, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #alarms{alertees = Alertess}) ->
- {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
+ State = #alarms{alertees = Alertees}) ->
+ {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}};
handle_info(_Info, State) ->
{ok, State}.
@@ -100,10 +122,45 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-alert(_Alert, undefined) ->
- ok;
-alert(Alert, Alertees) ->
- dict:fold(fun (Pid, {M, F, A}, Acc) ->
- ok = erlang:apply(M, F, A ++ [Pid, Alert]),
- Acc
+
+maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
+ alertees = Alertees}) ->
+ AN1 = SetFun(Node, AN),
+ BeforeSz = sets:size(AN),
+ AfterSz = sets:size(AN1),
+ %% If we have changed our alarm state, inform the remotes.
+ IsLocal = Node =:= node(),
+ if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees);
+ IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
+ true -> ok
+ end,
+ %% If the overall alarm state has changed, inform the locals.
+ case {BeforeSz, AfterSz} of
+ {0, 1} -> ok = alert_local(true, Alertees);
+ {1, 0} -> ok = alert_local(false, Alertees);
+ {_, _} -> ok
+ end,
+ State#alarms{alarmed_nodes = AN1}.
+
+alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2).
+
+alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
+
+alert(Alert, Alertees, NodeComparator) ->
+ Node = node(),
+ dict:fold(fun (Pid, {M, F, A}, ok) ->
+ case NodeComparator(Node, node(Pid)) of
+ true -> apply(M, F, A ++ [Pid, Alert]);
+ false -> ok
+ end
end, ok, Alertees).
+
+internal_register(Pid, {M, F, A} = HighMemMFA,
+ State = #alarms{alertees = Alertees}) ->
+ _MRef = erlang:monitor(process, Pid),
+ case sets:is_element(node(), State#alarms.alarmed_nodes) of
+ true -> ok = apply(M, F, A ++ [Pid, true]);
+ false -> ok
+ end,
+ NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
+ State#alarms{alertees = NewAlertees}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 46b78c39..3aa20821 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -52,7 +52,7 @@
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
- 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
@@ -100,13 +100,13 @@
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
- (rabbit_types:amqqueue(), 'false', 'false')
+ (rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
- (rabbit_types:amqqueue(), 'true' , 'false')
+ (rabbit_types:amqqueue(), 'true' , 'false')
-> qlen() | rabbit_types:error('in_use');
- (rabbit_types:amqqueue(), 'false', 'true' )
+ (rabbit_types:amqqueue(), 'false', 'true' )
-> qlen() | rabbit_types:error('not_empty');
- (rabbit_types:amqqueue(), 'true' , 'true' )
+ (rabbit_types:amqqueue(), 'true' , 'true' )
-> qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
@@ -122,10 +122,10 @@
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
- {'ok', non_neg_integer(), qmsg()} | 'empty').
+ {'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
- rabbit_types:ctag(), boolean(), any())
+ (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
+ rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -141,9 +141,9 @@
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
-spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -214,8 +214,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
fun (Tx) -> B(Tx), Q end;
- [_] -> %% Q exists on stopped node
- rabbit_misc:const(not_found)
+ %% Q exists on stopped node
+ [_] -> rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -288,7 +288,7 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
- RequiredArgs) ->
+ RequiredArgs) ->
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[<<"x-expires">>]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 44053593..54c92dc7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
-% Queue's state
+%% Queue's state
-record(q, {q,
exclusive_consumer,
has_had_consumers,
@@ -46,7 +46,7 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
- guid_to_channel,
+ msg_id_to_channel,
ttl,
ttl_timer_ref
}).
@@ -112,7 +112,7 @@ init(Q) ->
expiry_timer_ref = undefined,
ttl = undefined,
stats_timer = rabbit_event:init_stats_timer(),
- guid_to_channel = dict:new()}, hibernate,
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -283,17 +283,16 @@ lookup_ch(ChPid) ->
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
- undefined ->
- MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = sets:new(),
- is_limit_active = false,
- txn = none,
- unsent_message_count = 0},
- put(Key, C),
- C;
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ C = #cr{consumer_count = 0,
+ ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = sets:new(),
+ is_limit_active = false,
+ txn = none,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
C = #cr{} -> C
end.
@@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-all_ch_record() ->
- [C || {{ch, _}, C} <- get()].
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
- BlockedOld = is_ch_blocked(OldCR),
- BlockedNew = is_ch_blocked(NewCR),
- if BlockedOld andalso not(BlockedNew) -> unblock;
- BlockedNew andalso not(BlockedOld) -> block;
- true -> ok
+ case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
+ {true, false} -> unblock;
+ {false, true} -> block;
+ {_, _} -> ok
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
@@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
- block ->
- {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ block -> {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
State2 = State1#q{
active_consumers = NewActiveConsumers,
@@ -396,30 +392,28 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end.
-deliver_from_queue_pred(IsEmpty, _State) ->
- not IsEmpty.
+deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
- {CMs, GTC1} =
- lists:foldl(
- fun(Guid, {CMs, GTC0}) ->
- case dict:find(Guid, GTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(Guid, GTC0)};
- _ ->
- {CMs, GTC0}
- end
- end, {gb_trees:empty(), GTC}, Guids),
+confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+ {CMs, MTC1} = lists:foldl(
+ fun(MsgId, {CMs, MTC0}) ->
+ case dict:find(MsgId, MTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(MsgId, MTC0)};
+ _ ->
+ {CMs, MTC0}
+ end
+ end, {gb_trees:empty(), MTC}, MsgIds),
gb_trees:map(fun(ChPid, MsgSeqNos) ->
rabbit_channel:confirm(ChPid, MsgSeqNos)
end, CMs),
- State#q{guid_to_channel = GTC1}.
+ State#q{msg_id_to_channel = MTC1}.
gb_trees_cons(Key, Value, Tree) ->
case gb_trees:lookup(Key, Tree) of
@@ -428,19 +422,18 @@ gb_trees_cons(Key, Value, Tree) ->
end.
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- {no_confirm, State};
+ {never, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
- guid = Guid}},
- State =
- #q{guid_to_channel = GTC,
- q = #amqqueue{durable = true}}) ->
- {confirm,
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
+ id = MsgId}},
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ {eventually,
+ State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}};
record_confirm_message(_Delivery, State) ->
- {no_confirm, State}.
+ {immediately, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -456,11 +449,9 @@ attempt_delivery(#delivery{txn = none,
message = Message,
msg_seq_no = MsgSeqNo},
{NeedsConfirming, State = #q{backing_queue = BQ}}) ->
- %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
- case {NeedsConfirming, MsgSeqNo} of
- {_, undefined} -> ok;
- {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
- {confirm, _} -> ok
+ case NeedsConfirming of
+ immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ _ -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -472,7 +463,7 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= confirm)},
+ needs_confirming = (NeedsConfirming =:= eventually)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
@@ -480,31 +471,28 @@ attempt_delivery(#delivery{txn = none,
{Delivered, State1} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
{Delivered, NeedsConfirming, State1};
-attempt_delivery(#delivery{txn = Txn,
+attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- {NeedsConfirming,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}}) ->
+ {NeedsConfirming, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- {true,
- NeedsConfirming,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
+ {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
{true, _, State1} ->
- {true, State1};
+ State1;
{false, NeedsConfirming, State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}} ->
#delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
needs_confirming =
- (NeedsConfirming =:= confirm)},
+ (NeedsConfirming =:= eventually)},
BQS),
- {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
+ ensure_ttl_timer(State1#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
@@ -618,9 +606,9 @@ backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {Guids, BQS1} = Fun(BQS),
+ {MsgIds, BQS1} = Fun(BQS),
run_message_queue(
- confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
+ confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
@@ -661,9 +649,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
Now = now_micros(),
BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) ->
- Now > Expiry
- end, BQS),
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -722,10 +709,10 @@ i(Item, _) ->
consumers(#q{active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers}) ->
rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
emit_stats(State) ->
emit_stats(State, []).
@@ -747,7 +734,7 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
{channel, ChPid},
{queue, self()}]).
-%---------------------------------------------------------------------------
+%%----------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
case Msg of
@@ -767,8 +754,8 @@ prioritise_cast(Msg, _State) ->
maybe_expire -> 8;
drop_expired -> 8;
emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
@@ -814,8 +801,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery},
- _From, State) ->
+handle_call({deliver_immediately, Delivery}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -836,8 +822,7 @@ handle_call({deliver_immediately, Delivery},
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
gen_server2:reply(From, true),
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_call({commit, Txn, ChPid}, From, State) ->
case lookup_ch(ChPid) of
@@ -906,15 +891,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
case is_ch_blocked(C) of
true -> State1#q{
blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
+ add_consumer(ChPid, Consumer,
+ State1#q.blocked_consumers)};
false -> run_message_queue(
State1#q{
active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
+ add_consumer(ChPid, Consumer,
+ State1#q.active_consumers)})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),
@@ -1001,8 +984,7 @@ handle_cast(sync_timeout, State) ->
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index a564480b..3d005845 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -52,8 +52,8 @@
-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok').
-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]).
-spec(lookup_user/1 :: (rabbit_types:username())
- -> rabbit_types:ok(rabbit_types:internal_user())
- | rabbit_types:error('not_found')).
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index 2168495d..b8682a46 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -54,5 +54,5 @@ handle_response(Response, _State) ->
_ ->
{protocol_error,
"AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable]}
+ [LoginTable]}
end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6a21e10f..03c1fdd1 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,7 +62,7 @@ behaviour_info(callbacks) ->
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 guid per Ack, in the same order as Acks.
+ %% about. Must return 1 msg_id per Ack, in the same order as Acks.
{ack, 2},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 57aad808..3cf73e80 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -44,7 +44,7 @@
-spec(message/3 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
rabbit_types:decoded_content()) ->
- rabbit_types:ok_or_error2(rabbit_types:message(), any())).
+ rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -107,21 +107,21 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
false -> DecodedContent;
{value, Found} -> Headers0 = lists:delete(Found, Headers),
rabbit_binary_generator:clear_encoded_content(
- DecodedContent#content{
- properties = Props#'P_basic'{
- headers = Headers0}})
+ DecodedContent#content{
+ properties = Props#'P_basic'{
+ headers = Headers0}})
end.
message(ExchangeName, RoutingKey,
#content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
- exchange_name = ExchangeName,
- content = strip_header(DecodedContent, ?DELETED_HEADER),
- guid = rabbit_guid:guid(),
- is_persistent = is_message_persistent(DecodedContent),
- routing_keys = [RoutingKey |
- header_routes(Props#'P_basic'.headers)]}}
+ exchange_name = ExchangeName,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ id = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ routing_keys = [RoutingKey |
+ header_routes(Props#'P_basic'.headers)]}}
catch
{error, _Reason} = Error -> Error
end.
@@ -175,15 +175,15 @@ is_message_persistent(#content{properties = #'P_basic'{
Other -> throw({error, {delivery_mode_unknown, Other}})
end.
-% Extract CC routes from headers
+%% Extract CC routes from headers
header_routes(undefined) ->
[];
header_routes(HeadersTable) ->
lists:append(
- [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
- {array, Routes} -> [Route || {longstr, Route} <- Routes];
- undefined -> [];
- {Type, _Val} -> throw({error, {unacceptable_type_in_header,
- Type,
- binary_to_list(HeaderKey)}})
- end || HeaderKey <- ?ROUTING_HEADERS]).
+ [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ undefined -> [];
+ {Type, _Val} -> throw({error, {unacceptable_type_in_header,
+ Type,
+ binary_to_list(HeaderKey)}})
+ end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index dc81ace6..68511a32 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -18,12 +18,13 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
-% - 1 byte of frame type
-% - 2 bytes of channel number
-% - 4 bytes of frame payload length
-% - 1 byte of payload trailer FRAME_END byte
-% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
+%% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
+%% - 1 byte of frame type
+%% - 2 bytes of channel number
+%% - 4 bytes of frame payload length
+%% - 1 byte of payload trailer FRAME_END byte
+%% See definition of check_empty_content_body_frame_size/0,
+%% an assertion called at startup.
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
-export([build_simple_method_frame/3,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 96a22dca..7ddb7814 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -70,7 +70,7 @@
rabbit_types:infos()).
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()]).
+ -> [rabbit_types:infos()]).
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
-spec(remove_for_destination/1 ::
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e92421fc..da103284 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,9 +33,9 @@
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm,
- confirmed, capabilities}).
+ consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
+ unconfirmed_qm, confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -68,9 +68,9 @@
-type(channel_number() :: non_neg_integer()).
-spec(start_link/9 ::
- (channel_number(), pid(), pid(), rabbit_types:protocol(),
- rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+ (channel_number(), pid(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
@@ -176,6 +176,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
+ consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -247,6 +248,11 @@ handle_cast(ready_for_close, State = #ch{state = closing,
handle_cast(terminate, State) ->
{stop, normal, State};
+handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
+ State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Msg),
+ noreply(monitor_consumer(ConsumerTag, State));
+
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
@@ -288,23 +294,15 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
- erase_queue_stats(QPid),
- State3 = (case Reason of
- normal -> fun record_confirms/2;
- _ -> fun send_nacks/2
- end)(MXs, State2),
- noreply(queue_blocked(QPid, State3)).
+handle_info({'DOWN', MRef, process, QPid, Reason},
+ State = #ch{consumer_monitors = ConsumerMonitors}) ->
+ noreply(
+ case dict:find(MRef, ConsumerMonitors) of
+ error ->
+ handle_publishing_queue_down(QPid, Reason, State);
+ {ok, ConsumerTag} ->
+ handle_consuming_queue_down(MRef, ConsumerTag, State)
+ end).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -513,23 +511,24 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
+process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
{MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
+ fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
- State);
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack, State);
none -> Acc
end
end, {[], UMQ, UQM}, MsgSeqNos),
{MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack,
+ State) ->
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
@@ -544,8 +543,10 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
UQM
end,
Qs1 = gb_sets:del_element(QPid, Qs),
- case gb_sets:is_empty(Qs1) of
- true ->
+    %% If QPid somehow died, initiating a nack, clear the message from
+    %% our internal data structures. Also, clean up empty entries.
+ case (Nack orelse gb_sets:is_empty(Qs1)) of
+ true ->
{[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
false ->
{MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
@@ -688,9 +689,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid,
- limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -707,18 +708,24 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
- rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterPid,
- ActualConsumerTag, ExclusiveConsume,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag}))
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(), LimiterPid,
+ ActualConsumerTag, ExclusiveConsume,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
end) of
- ok ->
- {noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
- {error, exclusive_consume_unavailable} ->
+ {ok, Q} ->
+ State1 = State#ch{consumer_mapping =
+ dict:store(ActualConsumerTag,
+ {Q, undefined},
+ ConsumerMapping)},
+ {noreply,
+ case NoWait of
+ true -> monitor_consumer(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -731,26 +738,31 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, QueueName} ->
- NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- %% In order to ensure that no more messages
- %% are sent to the consumer after the
- %% cancel_ok has been sent, we get the
- %% queue process to send the cancel_ok on
- %% our behalf. If we were sending the
- %% cancel_ok ourselves it might overtake a
- %% message sent previously by the queue.
+ {ok, {Q, MRef}} ->
+ ConsumerMonitors1 =
+ case MRef of
+ undefined -> ConsumerMonitors;
+ _ -> true = erlang:demonitor(MRef),
+ dict:erase(MRef, ConsumerMonitors)
+ end,
+ NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
+ ConsumerMapping),
+ consumer_monitors = ConsumerMonitors1},
+ %% In order to ensure that no more messages are sent to
+ %% the consumer after the cancel_ok has been sent, we get
+ %% the queue process to send the cancel_ok on our
+ %% behalf. If we were sending the cancel_ok ourselves it
+ %% might overtake a message sent previously by the queue.
+ case rabbit_misc:with_exit_handler(
+ fun () -> {error, not_found} end,
+ fun () ->
rabbit_amqqueue:basic_cancel(
Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
@@ -1080,6 +1092,53 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} ->
+ {#amqqueue{pid = QPid} = Q, undefined} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
+ MRef = erlang:monitor(process, QPid),
+ State#ch{consumer_mapping =
+ dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
+ consumer_monitors =
+ dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ _ ->
+ State
+ end.
+
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
+ end,
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+    %% the set one by one, which would be inefficient
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
+ {Nack, SendFun} = case Reason of
+ normal -> {false, fun record_confirms/2};
+ _ -> {true, fun send_nacks/2}
+ end,
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
+ erase_queue_stats(QPid),
+ State3 = SendFun(MXs, State2),
+ queue_blocked(QPid, State3).
+
+handle_consuming_queue_down(MRef, ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ writer_pid = WriterPid}) ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
+ Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ State#ch{consumer_mapping = ConsumerMapping1,
+ consumer_monitors = ConsumerMonitors1}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1250,16 +1309,9 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
consumer_queues(Consumers) ->
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end].
+ lists:usort([QPid ||
+ {_Key, {#amqqueue{pid = QPid}, _MRef}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
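
[Editor's note: the new consumer_monitors field above pairs each queue monitor reference with its consumer tag, so a single 'DOWN' handler can tell consuming-queue deaths apart from publishing-queue deaths. A minimal sketch of that dispatch pattern follows; the names are illustrative and this is not the channel code itself.]

-module(consumer_monitor_sketch).
-export([monitor_consumer/3, handle_down/2]).

%% Monitor the queue process and remember MRef -> ConsumerTag.
monitor_consumer(ConsumerTag, QPid, Monitors) ->
    MRef = erlang:monitor(process, QPid),
    {MRef, dict:store(MRef, ConsumerTag, Monitors)}.

%% On 'DOWN', a known MRef means a consuming queue died; anything else
%% is handled as a publishing queue going away.
handle_down(MRef, Monitors) ->
    case dict:find(MRef, Monitors) of
        {ok, ConsumerTag} -> {consuming_queue_down, ConsumerTag,
                              dict:erase(MRef, Monitors)};
        error             -> publishing_queue_down
    end.

A real caller would fold the result back into its state, much as handle_publishing_queue_down/3 and handle_consuming_queue_down/3 do above.
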
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 9cc407bc..8175ad80 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -68,12 +68,12 @@ start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost,
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
- SupPid,
- {channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid, Protocol,
- User, VHost, Capabilities, Collector,
- start_limiter_fun(SupPid)]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid, Protocol,
+ User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dbdc6cd4..15e92542 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -29,9 +29,9 @@
-ifdef(use_specs).
-spec(start_link/1 :: (mfa()) ->
- rabbit_types:ok_pid_or_error()).
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
- rabbit_types:ok_pid_or_error()).
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 746bb66e..8364ecd8 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -103,24 +103,22 @@ print_badrpc_diagnostics(Node) ->
diagnostics(Node) ->
{_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- [
- {"diagnostics:", []},
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]};
- {ok, NamePorts} ->
- {"- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
- end,
- {"- current node: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
- Other -> {"- no current node home dir: ~p", [Other]}
- end,
- {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}
- ].
+ [{"diagnostics:", []},
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end,
+ {"- current node: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
+ Other -> {"- no current node home dir: ~p", [Other]}
+ end,
+ {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}].
stop() ->
ok.
@@ -152,13 +150,13 @@ action(force_reset, Node, [], _Opts, Inform) ->
action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
action(status, Node, [], _Opts, Inform) ->
@@ -320,10 +318,8 @@ wait_for_application0(Node, Attempts) ->
wait_for_application(Node, Attempts).
default_if_empty(List, Default) when is_list(List) ->
- if List == [] ->
- Default;
- true ->
- [list_to_atom(X) || X <- List]
+ if List == [] -> Default;
+ true -> [list_to_atom(X) || X <- List]
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -414,7 +410,7 @@ prettify_typed_amqp_value(Type, Value) ->
_ -> Value
end.
-% the slower shutdown on windows required to flush stdout
+%% the slower shutdown on Windows is required to flush stdout
quit(Status) ->
case os:type() of
{unix, _} ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 586563f6..a2693c69 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -26,8 +26,8 @@
-spec(boot/0 :: () -> 'ok').
-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
- {'ok', {rabbit_types:user(),
- rabbit_framing:amqp_table()}}).
+ {'ok', {rabbit_types:user(),
+ rabbit_framing:amqp_table()}}).
-spec(start_channel/7 ::
(rabbit_channel:channel_number(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
@@ -40,12 +40,12 @@
boot() ->
{ok, _} =
supervisor2:start_child(
- rabbit_sup,
- {rabbit_direct_client_sup,
- {rabbit_client_sup, start_link,
- [{local, rabbit_direct_client_sup},
- {rabbit_channel_sup, start_link, []}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, []}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
%%----------------------------------------------------------------------------
@@ -73,7 +73,7 @@ start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities,
Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
- rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, Protocol, User, VHost,
- Capabilities, Collector}]),
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, Protocol, User, VHost,
+ Capabilities, Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 40651d36..9ed532db 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -101,7 +101,7 @@ ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
{ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [Fun, []]),
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
ensure_stats_timer(State, _Fun) ->
State.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 92259195..a463e570 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -62,7 +62,7 @@
-> rabbit_types:infos()).
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()]).
+ -> [rabbit_types:infos()]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
-spec(delete/2 ::
@@ -266,9 +266,9 @@ process_route(#resource{kind = queue} = QName,
call_with_exchange(XName, Fun, PrePostCommitFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end
+ [] -> {error, not_found};
+ [X] -> Fun(X)
+ end
end, PrePostCommitFun).
delete(XName, IfUnused) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 2363d05e..f12661d4 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -42,8 +42,8 @@ description() ->
route(#exchange{name = X},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
lists:append([begin
- Words = split_topic_key(RKey),
- mnesia:async_dirty(fun trie_match/2, [X, Words])
+ Words = split_topic_key(RKey),
+ mnesia:async_dirty(fun trie_match/2, [X, Words])
end || RKey <- Routes]).
validate(_X) -> ok.
@@ -51,9 +51,9 @@ create(_Tx, _X) -> ok.
recover(_Exchange, Bs) ->
rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
- end).
+ fun () ->
+ lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
+ end).
delete(true, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
@@ -166,9 +166,9 @@ trie_child(X, Node, Word) ->
trie_bindings(X, Node) ->
MatchHead = #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = '$1'}},
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
trie_add_edge(X, FromNode, ToNode, W) ->
@@ -194,9 +194,9 @@ trie_remove_binding(X, Node, D) ->
trie_binding_op(X, Node, D, Op) ->
ok = Op(rabbit_topic_trie_binding,
#topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = D}},
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = D}},
write).
trie_has_any_children(X, Node) ->
@@ -209,10 +209,10 @@ trie_has_any_children(X, Node) ->
trie_has_any_bindings(X, Node) ->
has_any(rabbit_topic_trie_binding,
#topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
@@ -223,8 +223,8 @@ trie_remove_all_edges(X) ->
trie_remove_all_bindings(X) ->
remove_all(rabbit_topic_trie_binding,
#topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X, _ = '_'},
- _ = '_'}).
+ trie_binding = #trie_binding{exchange_name = X, _ = '_'},
+ _ = '_'}).
has_any(Table, MatchHead) ->
Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read),
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 2f8c940b..996b0a98 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -111,11 +111,11 @@ stop() ->
init([]) ->
MemoryLimit = trunc(?MEMORY_LIMIT_SCALING *
- (try
- vm_memory_monitor:get_memory_limit()
- catch
- exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
- end)),
+ (try
+ vm_memory_monitor:get_memory_limit()
+ catch
+ exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
+ end)),
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index abc27c5f..e79a58a1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -105,7 +105,7 @@
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
(rabbit_framing:amqp_table(), binary())
- -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
+ -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -469,11 +469,11 @@ map_in_order(F, L) ->
table_fold(F, Acc0, TableName) ->
lists:foldl(
fun (E, Acc) -> execute_mnesia_transaction(
- fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> Acc;
- _ -> F(E, Acc)
- end
- end)
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> Acc;
+ _ -> F(E, Acc)
+ end
+ end)
end, Acc0, dirty_read_all(TableName)).
dirty_read_all(TableName) ->
@@ -755,12 +755,12 @@ unlink_and_capture_exit(Pid) ->
after 0 -> ok
end.
-% Separate flags and options from arguments.
-% get_options([{flag, "-q"}, {option, "-p", "/"}],
-% ["set_permissions","-p","/","guest",
-% "-q",".*",".*",".*"])
-% == {["set_permissions","guest",".*",".*",".*"],
-% [{"-q",true},{"-p","/"}]}
+%% Separate flags and options from arguments.
+%% get_options([{flag, "-q"}, {option, "-p", "/"}],
+%% ["set_permissions","-p","/","guest",
+%% "-q",".*",".*",".*"])
+%% == {["set_permissions","guest",".*",".*",".*"],
+%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
{AsOut, Value} = case Def of
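
[Editor's note: the get_options/2 comment above documents the flag/option split by example. Below is a simplified, self-contained version of the same idea; the module name is assumed, and the real function returns options in a different order and handles more cases.]

-module(get_options_sketch).
-export([get_options/2]).

%% Flags become {Flag, Bool}; options consume the argument that follows
%% them or fall back to their default; everything else is left as the
%% plain argument list.
get_options(Defs, Args) ->
    lists:foldl(fun parse_def/2, {Args, []}, Defs).

parse_def({flag, Flag}, {Args, Opts}) ->
    {lists:delete(Flag, Args), [{Flag, lists:member(Flag, Args)} | Opts]};
parse_def({option, Opt, Default}, {Args, Opts}) ->
    case take_value(Opt, Args) of
        {Val, Rest} -> {Rest, [{Opt, Val} | Opts]};
        none        -> {Args, [{Opt, Default} | Opts]}
    end.

take_value(Opt, [Opt, Val | Rest]) -> {Val, Rest};
take_value(Opt, [Other | Rest])    -> case take_value(Opt, Rest) of
                                          {Val, Rest1} -> {Val, [Other | Rest1]};
                                          none         -> none
                                      end;
take_value(_Opt, [])               -> none.
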
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fc95b77b..66436920 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -129,10 +129,10 @@ empty_ram_only_tables() ->
Node = node(),
lists:foreach(
fun (TabName) ->
- case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
- true -> {atomic, ok} = mnesia:clear_table(TabName);
- false -> ok
- end
+ case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
+ true -> {atomic, ok} = mnesia:clear_table(TabName);
+ false -> ok
+ end
end, table_names()),
ok.
@@ -519,13 +519,13 @@ create_local_table_copies(Type) ->
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
-%% unused code - commented out to keep dialyzer happy
-%% Type =:= disc_only ->
-%% if
-%% HasDiscCopies or HasDiscOnlyCopies ->
-%% disc_only_copies;
-%% true -> ram_copies
-%% end;
+%%% unused code - commented out to keep dialyzer happy
+%%% Type =:= disc_only ->
+%%% if
+%%% HasDiscCopies or HasDiscOnlyCopies ->
+%%% disc_only_copies;
+%%% true -> ram_copies
+%%% end;
Type =:= ram ->
ram_copies
end,
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 55e6ac47..b7de27d4 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -27,8 +27,8 @@
-define(WRITE_OK_SIZE_BITS, 8).
-define(WRITE_OK_MARKER, 255).
-define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)).
--define(GUID_SIZE_BYTES, 16).
--define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)).
+-define(MSG_ID_SIZE_BYTES, 16).
+-define(MSG_ID_SIZE_BITS, (8 * ?MSG_ID_SIZE_BYTES)).
-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB
%%----------------------------------------------------------------------------
@@ -39,43 +39,45 @@
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
-type(file_size() :: non_neg_integer()).
+-type(message_accumulator(A) ::
+ fun (({rabbit_types:msg_id(), msg_size(), position(), binary()}, A) ->
+ A)).
--spec(append/3 :: (io_device(), rabbit_guid:guid(), msg()) ->
+-spec(append/3 :: (io_device(), rabbit_types:msg_id(), msg()) ->
rabbit_types:ok_or_error2(msg_size(), any())).
-spec(read/2 :: (io_device(), msg_size()) ->
- rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()},
+ rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
any())).
--spec(scan/4 :: (io_device(), file_size(),
- fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A),
- A) -> {'ok', A, position()}).
+-spec(scan/4 :: (io_device(), file_size(), message_accumulator(A), A) ->
+ {'ok', A, position()}).
-endif.
%%----------------------------------------------------------------------------
-append(FileHdl, Guid, MsgBody)
- when is_binary(Guid) andalso size(Guid) =:= ?GUID_SIZE_BYTES ->
+append(FileHdl, MsgId, MsgBody)
+ when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
MsgBodyBin = term_to_binary(MsgBody),
MsgBodyBinSize = size(MsgBodyBin),
- Size = MsgBodyBinSize + ?GUID_SIZE_BYTES,
+ Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
case file_handle_cache:append(FileHdl,
<<Size:?INTEGER_SIZE_BITS,
- Guid:?GUID_SIZE_BYTES/binary,
- MsgBodyBin:MsgBodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
+ MsgId:?MSG_ID_SIZE_BYTES/binary,
+ MsgBodyBin:MsgBodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
read(FileHdl, TotalSize) ->
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
- BodyBinSize = Size - ?GUID_SIZE_BYTES,
+ BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- Guid:?GUID_SIZE_BYTES/binary,
- MsgBodyBin:BodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
- {ok, {Guid, binary_to_term(MsgBodyBin)}};
+ MsgId:?MSG_ID_SIZE_BYTES/binary,
+ MsgBodyBin:BodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
+ {ok, {MsgId, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
@@ -97,26 +99,27 @@ scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) ->
end.
scanner(<<>>, Offset, _Fun, Acc) ->
- {<<>>, Acc, Offset};
+ {<<>>, Acc, Offset};
scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) ->
- {<<>>, Acc, Offset}; %% Nothing to do other than stop.
-scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+ {<<>>, Acc, Offset}; %% Nothing to do other than stop.
+scanner(<<Size:?INTEGER_SIZE_BITS, MsgIdAndMsg:Size/binary,
WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- case WriteMarker of
- ?WRITE_OK_MARKER ->
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
- %% which we read the Guid as a number, and then convert it
- %% back to a binary in order to work around bugs in
- %% Erlang's GC.
- <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
- <<GuidAndMsg:Size/binary>>,
- <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scanner(Rest, Offset + TotalSize, Fun,
- Fun({Guid, TotalSize, Offset, Msg}, Acc));
- _ ->
- scanner(Rest, Offset + TotalSize, Fun, Acc)
- end;
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the MsgId as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<MsgIdNum:?MSG_ID_SIZE_BITS, Msg/binary>> =
+ <<MsgIdAndMsg:Size/binary>>,
+ <<MsgId:?MSG_ID_SIZE_BYTES/binary>> =
+ <<MsgIdNum:?MSG_ID_SIZE_BITS>>,
+ scanner(Rest, Offset + TotalSize, Fun,
+ Fun({MsgId, TotalSize, Offset, Msg}, Acc));
+ _ ->
+ scanner(Rest, Offset + TotalSize, Fun, Acc)
+ end;
scanner(Data, Offset, _Fun, Acc) ->
- {Data, Acc, Offset}.
+ {Data, Acc, Offset}.
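
[Editor's note: the append/read/scanner changes above all manipulate the same on-disk entry layout: a size word covering the 16-byte msg id plus the body, then the id, the body, and a single write-ok marker byte (255). The sketch below reconstructs that layout in isolation; the module name is made up, and a 64-bit size word is assumed in place of ?INTEGER_SIZE_BITS. The size word and the marker byte are what ?FILE_PACKING_ADJUSTMENT accounts for.]

-module(msg_entry_sketch).
-export([pack/2, unpack/1]).

%% Pack a message as: Size (16-byte id + body), MsgId, body, marker.
pack(MsgId, MsgBody) when byte_size(MsgId) =:= 16 ->
    BodyBin = term_to_binary(MsgBody),
    Size    = byte_size(BodyBin) + 16,
    <<Size:64, MsgId/binary, BodyBin/binary, 255:8>>.

%% Reverse the packing; the trailing 255 is the write-ok marker that
%% scanner/4 uses to skip partially written entries.
unpack(<<Size:64, MsgId:16/binary, Rest/binary>>) ->
    BodySize = Size - 16,
    <<BodyBin:BodySize/binary, 255:8>> = Rest,
    {MsgId, binary_to_term(BodyBin)}.
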
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9e65e442..4f5d2411 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -74,8 +74,8 @@
%% to callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- cref_to_guids %% client ref to synced messages mapping
- }).
+ cref_to_msg_ids %% client ref to synced messages mapping
+ }).
-record(client_msstate,
{ server,
@@ -89,7 +89,7 @@
file_summary_ets,
dedup_cache_ets,
cur_file_cache_ets
- }).
+ }).
-record(file_summary,
{file, valid_total_size, left, right, file_size, locked, readers}).
@@ -132,30 +132,30 @@
file_summary_ets :: ets:tid(),
dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
--type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
- A}).
--type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
+-type(msg_ref_delta_gen(A) ::
+ fun ((A) -> 'finished' |
+ {rabbit_types:msg_id(), non_neg_integer(), A})).
+-type(maybe_msg_id_fun() :: 'undefined' | fun ((gb_set()) -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
-type(deletion_thunk() :: fun (() -> boolean())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
- startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
+ {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(),
+-spec(client_init/4 :: (server(), client_ref(), maybe_msg_id_fun(),
maybe_close_fds_fun()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
--spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok').
--spec(read/2 :: (rabbit_guid:guid(), client_msstate()) ->
- {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
--spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()).
--spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
--spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
--spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) ->
- 'ok').
+-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
+ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
+-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
+-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
+-spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
+-spec(sync/3 ::
+ ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
-spec(sync/1 :: (server()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
@@ -177,8 +177,8 @@
%% The components:
%%
-%% Index: this is a mapping from Guid to #msg_location{}:
-%% {Guid, RefCount, File, Offset, TotalSize}
+%% Index: this is a mapping from MsgId to #msg_location{}:
+%% {MsgId, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which maps File to #file_summary{}:
%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
@@ -279,7 +279,7 @@
%% alternating full files and files with only one tiny message in
%% them).
%%
-%% Messages are reference-counted. When a message with the same guid
+%% Messages are reference-counted. When a message with the same msg id
%% is written several times we only store it once, and only remove it
%% from the store when it has been removed the same number of times.
%%
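
[Editor's note: the comment above states the key invariant of the store: writing an already-known msg id only bumps a reference count, and only the final remove actually deletes. A toy version of that bookkeeping is sketched below; the names are assumed, and the real store keeps the count in the pluggable index rather than a dict.]

-module(refcount_sketch).
-export([write/2, remove/2]).

%% Repeated writes of the same msg id only increment its counter.
write(MsgId, Counts) ->
    dict:update_counter(MsgId, 1, Counts).

%% Only the remove that brings the count to zero really deletes.
remove(MsgId, Counts) ->
    case dict:fetch(MsgId, Counts) of
        1 -> {deleted, dict:erase(MsgId, Counts)};
        N -> {kept, dict:store(MsgId, N - 1, Counts)}
    end.
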
@@ -422,29 +422,29 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(Guid, Msg,
+write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
- ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- ok = server_cast(CState, {write, CRef, Guid}).
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId}).
-read(Guid,
+read(MsgId,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, Guid) of
+ case fetch_and_increment_cache(DedupCacheEts, MsgId) of
not_found ->
%% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, Guid) of
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
Defer = fun() ->
- {server_call(CState, {read, Guid}), CState}
+ {server_call(CState, {read, MsgId}), CState}
end,
- case index_lookup_positive_ref_count(Guid, CState) of
+ case index_lookup_positive_ref_count(MsgId, CState) of
not_found -> Defer();
MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- [{Guid, Msg, _CacheRefCount}] ->
+ [{MsgId, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
%% refcount, so can't insert into dedup cache
{{ok, Msg}, CState}
@@ -453,13 +453,13 @@ read(Guid,
{{ok, Msg}, CState}
end.
-contains(Guid, CState) -> server_call(CState, {contains, Guid}).
+contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
-remove(Guids, CState = #client_msstate { client_ref = CRef }) ->
- server_cast(CState, {remove, CRef, Guids}).
+remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+ server_cast(CState, {remove, CRef, MsgIds}).
release([], _CState) -> ok;
-release(Guids, CState) -> server_cast(CState, {release, Guids}).
-sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
+release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}).
+sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
sync(Server) ->
gen_server2:cast(Server, sync).
@@ -477,11 +477,11 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
-client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer,
+client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(Guid, CState);
+ read(MsgId, CState);
[#file_summary { locked = Locked, right = Right }] ->
client_read2(Locked, Right, MsgLocation, Defer, CState)
end.
@@ -503,7 +503,7 @@ client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
%% the safest and simplest thing to do.
Defer();
client_read2(false, _Right,
- MsgLocation = #msg_location { guid = Guid, file = File },
+ MsgLocation = #msg_location { msg_id = MsgId, file = File },
Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
%% It's entirely possible that everything we're doing from here on
@@ -512,9 +512,9 @@ client_read2(false, _Right,
safe_ets_update_counter(
FileSummaryEts, File, {#file_summary.readers, +1},
fun (_) -> client_read3(MsgLocation, Defer, CState) end,
- fun () -> read(Guid, CState) end).
+ fun () -> read(MsgId, CState) end).
-client_read3(#msg_location { guid = Guid, file = File }, Defer,
+client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -539,7 +539,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% too).
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
- read(Guid, CState);
+ read(MsgId, CState);
[#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
@@ -549,8 +549,8 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% GC ends, we +1 readers, msg_store ets:deletes (and
%% unlocks the dest)
try Release(),
- Defer()
- catch error:badarg -> read(Guid, CState)
+ Defer()
+ catch error:badarg -> read(MsgId, CState)
end;
[#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
@@ -563,7 +563,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% us doing the lookup and the +1 on the readers. (Same as
%% badarg scenario above, but we don't have a missing file
%% - we just have the /wrong/ file).
- case index_lookup(Guid, CState) of
+ case index_lookup(MsgId, CState) of
#msg_location { file = File } = MsgLocation ->
%% Still the same file.
{ok, CState1} = close_all_indicated(CState),
@@ -589,9 +589,9 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
end
end.
-clear_client(CRef, State = #msstate { cref_to_guids = CTG,
+clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
- State #msstate { cref_to_guids = dict:erase(CRef, CTG),
+ State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
dying_clients = sets:del_element(CRef, DyingClients) }.
@@ -666,8 +666,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_guids = dict:new()
- },
+ cref_to_msg_ids = dict:new()
+ },
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
@@ -698,7 +698,7 @@ prioritise_call(Msg, _From, _State) ->
case Msg of
successfully_recovered_state -> 7;
{new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _Guid} -> 2;
+ {read, _MsgId} -> 2;
_ -> 0
end.
@@ -733,12 +733,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
-handle_call({read, Guid}, From, State) ->
- State1 = read_message(Guid, From, State),
+handle_call({read, MsgId}, From, State) ->
+ State1 = read_message(MsgId, From, State),
noreply(State1);
-handle_call({contains, Guid}, From, State) ->
- State1 = contains_message(Guid, From, State),
+handle_call({contains, MsgId}, From, State) ->
+ State1 = contains_message(MsgId, From, State),
noreply(State1).
handle_cast({client_dying, CRef},
@@ -751,53 +751,53 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, Guid},
+handle_cast({write, CRef, MsgId},
State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
- true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
- [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
+ true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
+ [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
noreply(
- case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
{write, State1} ->
- write_message(CRef, Guid, Msg, State1);
+ write_message(CRef, MsgId, Msg, State1);
{ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
State1;
{ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
State1;
{confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, Guid, State1);
+ record_pending_confirm(CRef, MsgId, State1);
{confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
end, CRef, State1)
end);
-handle_cast({remove, CRef, Guids}, State) ->
+handle_cast({remove, CRef, MsgIds}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
- State, Guids),
- noreply(maybe_compact(
- client_confirm(CRef, gb_sets:from_list(Guids), removed, State1)));
+ fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
+ State, MsgIds),
+ noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
+ removed, State1)));
-handle_cast({release, Guids}, State =
+handle_cast({release, MsgIds}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
lists:foreach(
- fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids),
+ fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
noreply(State);
-handle_cast({sync, Guids, K},
+handle_cast({sync, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
on_sync = Syncs }) ->
{ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- case lists:any(fun (Guid) ->
+ case lists:any(fun (MsgId) ->
#msg_location { file = File, offset = Offset } =
- index_lookup(Guid, State),
+ index_lookup(MsgId, State),
File =:= CurFile andalso Offset >= SyncOffset
- end, Guids) of
+ end, MsgIds) of
false -> K(),
noreply(State);
true -> noreply(State #msstate { on_sync = [K | Syncs] })
@@ -879,16 +879,16 @@ reply(Reply, State) ->
{State1, Timeout} = next_state(State),
{reply, Reply, State1, Timeout}.
-next_state(State = #msstate { sync_timer_ref = undefined,
- on_sync = Syncs,
- cref_to_guids = CTG }) ->
- case {Syncs, dict:size(CTG)} of
+next_state(State = #msstate { sync_timer_ref = undefined,
+ on_sync = Syncs,
+ cref_to_msg_ids = CTM }) ->
+ case {Syncs, dict:size(CTM)} of
{[], 0} -> {State, hibernate};
_ -> {start_sync_timer(State), 0}
end;
-next_state(State = #msstate { on_sync = Syncs,
- cref_to_guids = CTG }) ->
- case {Syncs, dict:size(CTG)} of
+next_state(State = #msstate { on_sync = Syncs,
+ cref_to_msg_ids = CTM }) ->
+ case {Syncs, dict:size(CTM)} of
{[], 0} -> {stop_sync_timer(State), hibernate};
_ -> {State, 0}
end.
@@ -905,66 +905,66 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
internal_sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs,
- cref_to_guids = CTG }) ->
+ cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
- CGs = dict:fold(fun (CRef, Guids, NS) ->
- case gb_sets:is_empty(Guids) of
+ CGs = dict:fold(fun (CRef, MsgIds, NS) ->
+ case gb_sets:is_empty(MsgIds) of
true -> NS;
- false -> [{CRef, Guids} | NS]
+ false -> [{CRef, MsgIds} | NS]
end
- end, [], CTG),
+ end, [], CTM),
case {Syncs, CGs} of
{[], []} -> ok;
_ -> file_handle_cache:sync(CurHdl)
end,
[K() || K <- lists:reverse(Syncs)],
- [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
- State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+ [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs],
+ State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }.
-write_action({true, not_found}, _Guid, State) ->
+write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
-write_action({true, #msg_location { file = File }}, _Guid, State) ->
+write_action({true, #msg_location { file = File }}, _MsgId, State) ->
{ignore, File, State};
-write_action({false, not_found}, _Guid, State) ->
+write_action({false, not_found}, _MsgId, State) ->
{write, State};
write_action({Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }},
- Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
{false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
+ ok = index_delete(MsgId, State),
{write, State};
{false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
+ %% The msg for MsgId is older than the client death
%% message, but as it is being GC'd currently we'll have
%% to write a new copy, which will then be younger, so
%% ignore this write.
{ignore, File, State};
{_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
+ ok = index_update_ref_count(MsgId, 1, State),
State1 = adjust_valid_total_size(File, TotalSize, State),
{confirm, File, State1}
end;
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
- Guid, State) ->
- ok = index_update_ref_count(Guid, RefCount + 1, State),
+ MsgId, State) ->
+ ok = index_update_ref_count(MsgId, RefCount + 1, State),
%% We already know about it, just update counter. Only update
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, Guid, Msg, State) ->
- write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
+write_message(CRef, MsgId, Msg, State) ->
+ write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
-write_message(Guid, Msg,
+write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
sum_file_size = SumFileSize,
file_summary_ets = FileSummaryEts }) ->
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(
- #msg_location { guid = Guid, ref_count = 1, file = CurFile,
+ #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize }, State),
[#file_summary { right = undefined, locked = false }] =
ets:lookup(FileSummaryEts, CurFile),
@@ -976,21 +976,21 @@ write_message(Guid, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(Guid, From,
+read_message(MsgId, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
- case index_lookup_positive_ref_count(Guid, State) of
+ case index_lookup_positive_ref_count(MsgId, State) of
not_found ->
gen_server2:reply(From, not_found),
State;
MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, Guid) of
+ case fetch_and_increment_cache(DedupCacheEts, MsgId) of
not_found -> read_message1(From, MsgLocation, State);
Msg -> gen_server2:reply(From, {ok, Msg}),
State
end
end.
-read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
+read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
file = File, offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -1000,7 +1000,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
case File =:= CurFile of
true -> {Msg, State1} =
%% can return [] if msg in file existed on startup
- case ets:lookup(CurFileCacheEts, Guid) of
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
{ok, RawOffSet} =
file_handle_cache:current_raw_offset(CurHdl),
@@ -1009,9 +1009,9 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
false -> ok
end,
read_from_disk(MsgLoc, State, DedupCacheEts);
- [{Guid, Msg1, _CacheRefCount}] ->
+ [{MsgId, Msg1, _CacheRefCount}] ->
ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, Guid, Msg1),
+ DedupCacheEts, RefCount, MsgId, Msg1),
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1019,7 +1019,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
false -> [#file_summary { locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
- true -> add_to_pending_gc_completion({read, Guid, From},
+ true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
false -> {Msg, State1} =
read_from_disk(MsgLoc, State, DedupCacheEts),
@@ -1028,47 +1028,47 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
end
end.
-read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
+read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
file = File, offset = Offset,
total_size = TotalSize },
State, DedupCacheEts) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {Guid, Msg}} =
+ {ok, {MsgId, Msg}} =
case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {Guid, _}} = Obj ->
+ {ok, {MsgId, _}} = Obj ->
Obj;
Rest ->
{error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
- {guid, Guid},
+ {msg_id, MsgId},
{read, Rest},
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
-contains_message(Guid, From,
+contains_message(MsgId, From,
State = #msstate { pending_gc_completion = Pending }) ->
- case index_lookup_positive_ref_count(Guid, State) of
+ case index_lookup_positive_ref_count(MsgId, State) of
not_found ->
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
case orddict:is_key(File, Pending) of
true -> add_to_pending_gc_completion(
- {contains, Guid, From}, File, State);
+ {contains, MsgId, From}, File, State);
false -> gen_server2:reply(From, true),
State
end
end.
-remove_message(Guid, CRef,
+remove_message(MsgId, CRef,
State = #msstate { file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts }) ->
- case should_mask_action(CRef, Guid, State) of
+ case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
{false_if_increment, #msg_location { ref_count = 0 }} ->
@@ -1081,24 +1081,25 @@ remove_message(Guid, CRef,
total_size = TotalSize }} when RefCount > 0 ->
%% only update field, otherwise bad interaction with
%% concurrent GC
- Dec =
- fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ Dec = fun () ->
+ index_update_ref_count(MsgId, RefCount - 1, State)
+ end,
case RefCount of
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId),
case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
- {remove, Guid, CRef}, File, State);
+ {remove, MsgId, CRef}, File, State);
[#file_summary {}] ->
ok = Dec(),
delete_file_if_empty(
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
+ _ -> ok = decrement_cache(DedupCacheEts, MsgId),
ok = Dec(),
State
end
@@ -1119,12 +1120,12 @@ run_pending(Files, State) ->
lists:reverse(orddict:fetch(File, Pending)))
end, State, Files).
-run_pending_action({read, Guid, From}, State) ->
- read_message(Guid, From, State);
-run_pending_action({contains, Guid, From}, State) ->
- contains_message(Guid, From, State);
-run_pending_action({remove, Guid, CRef}, State) ->
- remove_message(Guid, CRef, State).
+run_pending_action({read, MsgId, From}, State) ->
+ read_message(MsgId, From, State);
+run_pending_action({contains, MsgId, From}, State) ->
+ contains_message(MsgId, From, State);
+run_pending_action({remove, MsgId, CRef}, State) ->
+ remove_message(MsgId, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1146,44 +1147,46 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients,
- cref_to_guids = CTG }) ->
+update_pending_confirms(Fun, CRef,
+ State = #msstate { clients = Clients,
+ cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
{undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTG1 = Fun(MsgOnDiskFun, CTG),
- State #msstate { cref_to_guids = CTG1 }
+ {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
-record_pending_confirm(CRef, Guid, State) ->
+record_pending_confirm(CRef, MsgId, State) ->
update_pending_confirms(
- fun (_MsgOnDiskFun, CTG) ->
- dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end,
- gb_sets:singleton(Guid), CTG)
+ fun (_MsgOnDiskFun, CTM) ->
+ dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
+ gb_sets:singleton(MsgId), CTM)
end, CRef, State).
-client_confirm(CRef, Guids, ActionTaken, State) ->
+client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(Guids, ActionTaken),
- case dict:find(CRef, CTG) of
- {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids),
- case gb_sets:is_empty(Guids1) of
- true -> dict:erase(CRef, CTG);
- false -> dict:store(CRef, Guids1, CTG)
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(MsgIds, ActionTaken),
+ case dict:find(CRef, CTM) of
+ {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ case gb_sets:is_empty(MsgIds1) of
+ true -> dict:erase(CRef, CTM);
+ false -> dict:store(CRef, MsgIds1, CTM)
end;
- error -> CTG
+ error -> CTM
end
end, CRef, State).
-%% Detect whether the Guid is older or younger than the client's death
+%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
%% rewrite the msg - rewriting it would make it younger than the death
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
-should_mask_action(CRef, Guid,
+should_mask_action(CRef, MsgId,
State = #msstate { dying_clients = DyingClients }) ->
- case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of
+ case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
{false, Location} ->
{false, Location};
{true, not_found} ->
@@ -1256,7 +1259,7 @@ safe_file_delete(File, Dir, FileHandlesEts) ->
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
client_ref = Ref } =
- CState) ->
+ CState) ->
Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
{ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
true = ets:delete(FileHandlesEts, Key),
@@ -1320,43 +1323,43 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg)
+maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, Guid, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) ->
+ update_msg_cache(DedupCacheEts, MsgId, Msg);
+maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
ok.
-update_msg_cache(CacheEts, Guid, Msg) ->
- case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
+update_msg_cache(CacheEts, MsgId, Msg) ->
+ case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
false -> safe_ets_update_counter_ok(
- CacheEts, Guid, {3, +1},
- fun () -> update_msg_cache(CacheEts, Guid, Msg) end)
+ CacheEts, MsgId, {3, +1},
+ fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, Guid) ->
- true = ets:delete(DedupCacheEts, Guid),
+remove_cache_entry(DedupCacheEts, MsgId) ->
+ true = ets:delete(DedupCacheEts, MsgId),
ok.
-fetch_and_increment_cache(DedupCacheEts, Guid) ->
- case ets:lookup(DedupCacheEts, Guid) of
+fetch_and_increment_cache(DedupCacheEts, MsgId) ->
+ case ets:lookup(DedupCacheEts, MsgId) of
[] ->
not_found;
- [{_Guid, Msg, _RefCount}] ->
+ [{_MsgId, Msg, _RefCount}] ->
safe_ets_update_counter_ok(
- DedupCacheEts, Guid, {3, +1},
+ DedupCacheEts, MsgId, {3, +1},
%% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, Guid, Msg) end),
+ fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end),
Msg
end.
-decrement_cache(DedupCacheEts, Guid) ->
+decrement_cache(DedupCacheEts, MsgId) ->
true = safe_ets_update_counter(
- DedupCacheEts, Guid, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, Guid);
+ DedupCacheEts, MsgId, {3, -1},
+ fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
(_N) -> true
end,
- %% Guid is not in there because although it's been
+ %% MsgId is not in there because although it's been
%% delivered, it's never actually been read (think:
%% persistent message held in RAM)
fun () -> true end),
@@ -1465,7 +1468,7 @@ recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
{ok, Tid} -> file:delete(Path),
- {true, Tid};
+ {true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1473,19 +1476,19 @@ count_msg_refs(Gen, Seed, State) ->
case Gen(Seed) of
finished ->
ok;
- {_Guid, 0, Next} ->
+ {_MsgId, 0, Next} ->
count_msg_refs(Gen, Next, State);
- {Guid, Delta, Next} ->
- ok = case index_lookup(Guid, State) of
+ {MsgId, Delta, Next} ->
+ ok = case index_lookup(MsgId, State) of
not_found ->
- index_insert(#msg_location { guid = Guid,
+ index_insert(#msg_location { msg_id = MsgId,
file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
NewRefCount = RefCount + Delta,
case NewRefCount of
- 0 -> index_delete(Guid, State);
+ 0 -> index_delete(MsgId, State);
_ -> index_update(StoreEntry #msg_location {
ref_count = NewRefCount },
State)
@@ -1530,7 +1533,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
form_filename(Dir, FileName)),
- fun scan_fun/2, []),
+ fun scan_fun/2, []),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1539,8 +1542,8 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) ->
- [{Guid, TotalSize, Offset} | Acc].
+scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
+ [{MsgId, TotalSize, Offset} | Acc].
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
@@ -1619,8 +1622,8 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
- fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case index_lookup(Guid, State) of
+ fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case index_lookup(MsgId, State) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
@@ -1638,7 +1641,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
%% file size.
[] -> {undefined, case ValidMessages of
[] -> 0;
- _ -> {_Guid, TotalSize, Offset} =
+ _ -> {_MsgId, TotalSize, Offset} =
lists:last(ValidMessages),
Offset + TotalSize
end};
@@ -1693,8 +1696,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
pending_gc_completion = Pending,
file_summary_ets = FileSummaryEts,
file_size_limit = FileSizeLimit })
- when (SumFileSize > 2 * FileSizeLimit andalso
- (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
+ when SumFileSize > 2 * FileSizeLimit andalso
+ (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
case ets:first(FileSummaryEts) of
@@ -1757,10 +1760,10 @@ delete_file_if_empty(File, State = #msstate {
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- 0 -> %% don't delete the file_summary_ets entry for File here
- %% because we could have readers which need to be able to
- %% decrement the readers count.
- true = ets:update_element(FileSummaryEts, File,
+ %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ 0 -> true = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:delete(GCPid, File),
Pending1 = orddict_store(File, [], Pending),
@@ -1813,17 +1816,17 @@ combine_files(Source, Destination,
dir = Dir,
msg_store = Server }) ->
[#file_summary {
- readers = 0,
- left = Destination,
- valid_total_size = SourceValid,
- file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
+ readers = 0,
+ left = Destination,
+ valid_total_size = SourceValid,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Source),
[#file_summary {
- readers = 0,
- right = Source,
- valid_total_size = DestinationValid,
- file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ readers = 0,
+ right = Source,
+ valid_total_size = DestinationValid,
+ file_size = DestinationFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Destination),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -1903,8 +1906,8 @@ load_and_vacuum_message_file(File, #gc_state { dir = Dir,
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
- fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
- case Index:lookup(Guid, IndexState) of
+ fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
+ case Index:lookup(MsgId, IndexState) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
ok = Index:delete_object(Entry, IndexState),
@@ -1929,13 +1932,13 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
end,
case
lists:foldl(
- fun (#msg_location { guid = Guid, offset = Offset,
+ fun (#msg_location { msg_id = MsgId, offset = Offset,
total_size = TotalSize },
{CurOffset, Block = {BlockStart, BlockEnd}}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
- ok = Index:update_fields(Guid,
+ ok = Index:update_fields(MsgId,
[{#msg_location.file, Destination},
{#msg_location.offset, CurOffset}],
IndexState),
@@ -2001,12 +2004,12 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
?HANDLE_CACHE_BUFFER_SIZE}]),
{ok, _Acc, _IgnoreSize} =
rabbit_msg_file:scan(
- RefOld, filelib:file_size(FileOld),
- fun({Guid, _Size, _Offset, BinMsg}, ok) ->
- {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
- {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
- ok
- end, ok),
+ RefOld, filelib:file_size(FileOld),
+ fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
+ {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
+ ok
+ end, ok),
file_handle_cache:close(RefOld),
file_handle_cache:close(RefNew),
ok.
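
[Editor's note, not part of the patch: the dedup-cache hunks above rely on a "safe" ETS counter pattern: attempt the atomic ets:update_counter/3 and fall back to a supplied fun when the key has been deleted concurrently. A minimal sketch of that pattern, with illustrative names (the real helper lives elsewhere in rabbit_msg_store.erl):

safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailFun) ->
    try
        %% atomic increment/decrement; raises badarg if Key is absent
        SuccessFun(ets:update_counter(Tab, Key, UpdateOp))
    catch error:badarg ->
            %% the entry vanished under us; let the caller recover
            FailFun()
    end.

For instance, decrement_cache/2 above passes {3, -1} as UpdateOp, deletes the entry once the count drops to zero, and uses a no-op FailFun because the message may never actually have been read.]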
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index 077400d6..d6dc5568 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -31,7 +31,7 @@
new(Dir) ->
file:delete(filename:join(Dir, ?FILENAME)),
- Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.guid}]),
+ Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
#state { table = Tid, dir = Dir }.
recover(Dir) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 817abaa2..1f30a2fc 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -69,6 +69,7 @@ handle_call(_Request, _From, State) ->
handle_cast({rabbit_running_on, Node}, State) ->
rabbit_log:info("node ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
+ ok = rabbit_alarm:on_node_up(Node),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -76,7 +77,7 @@ handle_cast(_Msg, State) ->
handle_info({nodedown, Node}, State) ->
rabbit_log:info("node ~p down~n", [Node]),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, State};
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
ok = handle_dead_rabbit(Node),
@@ -92,10 +93,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-%% TODO: This may turn out to be a performance hog when there are
-%% lots of nodes. We really only need to execute this code on
-%% *one* node, rather than all of them.
+%% TODO: This may turn out to be a performance hog when there are lots
+%% of nodes. We really only need to execute some of these statements
+%% on *one* node, rather than all of them.
handle_dead_rabbit(Node) ->
ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node).
-
+ ok = rabbit_amqqueue:on_node_down(Node),
+ ok = rabbit_alarm:on_node_down(Node).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d9d92788..7bb8c0ea 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -250,13 +250,13 @@ duplicate_node_check(NodeStr) ->
case net_adm:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
- true -> io:format("node with name ~p "
- "already running on ~p~n",
- [NodeName, NodeHost]),
- [io:format(Fmt ++ "~n", Args) ||
- {Fmt, Args} <- rabbit_control:diagnostics(Node)],
- terminate(?ERROR_CODE);
- false -> ok
+ true -> io:format("node with name ~p "
+ "already running on ~p~n",
+ [NodeName, NodeHost]),
+ [io:format(Fmt ++ "~n", Args) ||
+ {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ terminate(?ERROR_CODE);
+ false -> ok
end;
{error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
[EpmdReason])
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76b1136f..8227e4cd 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -86,7 +86,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}),
+%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -138,19 +138,19 @@
-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
-define(NO_EXPIRY, 0).
--define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
--define(GUID_BITS, (?GUID_BYTES * 8)).
+-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
+-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2).
+-define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + ?EXPIRY_BYTES + 2).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUBLISH_RECORD_LENGTH_BYTES +
- (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
+ (?PUBLISH_RECORD_LENGTH_BYTES +
+ (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
%% ---- misc ----
--define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent}
+-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
@@ -159,7 +159,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_guids }).
+ max_journal_entries, on_sync, unsynced_msg_ids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -177,7 +177,7 @@
path :: file:filename(),
journal_entries :: array(),
unacked :: non_neg_integer()
- })).
+ })).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
@@ -187,21 +187,21 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_guids :: [rabbit_guid:guid()]
- }).
--type(startup_fun_state() ::
- {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
- A}).
+ unsynced_msg_ids :: [rabbit_types:msg_id()]
+ }).
+-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
+-type(walker(A) :: fun ((A) -> 'finished' |
+ {rabbit_types:msg_id(), non_neg_integer(), A})).
-type(shutdown_terms() :: [any()]).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
- {'undefined' | non_neg_integer(), qistate()}).
+ contains_predicate(), on_sync_fun()) ->
+ {'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/5 :: (rabbit_guid:guid(), seq_id(),
+-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(), boolean(), qistate())
-> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
@@ -209,14 +209,13 @@
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(),
+ {[{rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(),
boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
- {non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 :: ([rabbit_amqqueue:name()]) ->
- {[[any()]], startup_fun_state()}).
+ {non_neg_integer(), non_neg_integer(), qistate()}).
+-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -259,22 +258,22 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_guids = UnsyncedGuids })
- when is_binary(Guid) ->
- ?GUID_BYTES = size(Guid),
+publish(MsgId, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ when is_binary(MsgId) ->
+ ?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} = get_journal_handle(
State #qistate {
- unsynced_guids = [Guid | UnsyncedGuids] }),
+ unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)]),
+ create_pub_record_body(MsgId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -284,8 +283,8 @@ ack(SeqIds, State) ->
%% This is only called when there are outstanding confirms and the
%% queue is idle.
-sync(State = #qistate { unsynced_guids = Guids }) ->
- sync_if([] =/= Guids, State).
+sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
+ sync_if([] =/= MsgIds, State).
sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
@@ -388,7 +387,7 @@ blank_state(QueueName) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_guids = [] }.
+ unsynced_msg_ids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -470,8 +469,9 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) ->
- recover_message(ContainsCheckFun(Guid), CleanShutdown,
+ fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
+ Segment1) ->
+ recover_message(ContainsCheckFun(MsgId), CleanShutdown,
Del, RelSeq, Segment1)
end,
Segment #segment { unacked = UnackedCount + UnackedCountDelta },
@@ -515,17 +515,17 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
ok = gatherer:stop(Gatherer),
ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
- {value, {Guid, Count}} ->
- {Guid, Count, {next, Gatherer}}
+ {value, {MsgId, Count}} ->
+ {MsgId, Count, {next, Gatherer}}
end.
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack},
ok) ->
- gatherer:in(Gatherer, {Guid, 1});
+ gatherer:in(Gatherer, {MsgId, 1});
(_RelSeq, _Value, Acc) ->
Acc
end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
@@ -537,24 +537,24 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(Guid, #message_properties{expiry = Expiry}) ->
- [Guid, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties{expiry = Expiry}) ->
+ [MsgId, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
read_pub_record_body(Hdl) ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of
+ case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of
{ok, Bin} ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
- <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
- <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
+ <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
+ <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
Exp = case Expiry of
?NO_EXPIRY -> undefined;
X -> X
end,
- {Guid, #message_properties{expiry = Exp}};
+ {MsgId, #message_properties{expiry = Exp}};
Error ->
Error
end.
@@ -666,8 +666,8 @@ recover_journal(State) ->
journal_minus_segment(JEntries, SegEntries),
Segment #segment { journal_entries = JEntries1,
unacked = (UnackedCountInJournal +
- UnackedCountInSeg -
- UnackedCountDuplicates) }
+ UnackedCountInSeg -
+ UnackedCountDuplicates) }
end, Segments),
State1 #qistate { segments = Segments1 }.
@@ -681,8 +681,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case read_pub_record_body(Hdl) of
- {Guid, MsgProps} ->
- Publish = {Guid, MsgProps,
+ {MsgId, MsgProps} ->
+ Publish = {MsgId, MsgProps,
case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
@@ -716,9 +716,9 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
- State #qistate { unsynced_guids = [] }.
+ State #qistate { unsynced_msg_ids = [] }.
%%----------------------------------------------------------------------------
%% segment manipulation
@@ -796,19 +796,19 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, MsgProps, IsPersistent} ->
+ {MsgId, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)])
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(MsgId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
ok;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
+ RelSeq:?REL_SEQ_BITS>>,
file_handle_cache:append(
Hdl, case {Del, Ack} of
{del, ack} -> [Binary, Binary];
@@ -821,10 +821,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -853,14 +853,14 @@ load_segment(KeepAcked, #segment { path = Path }) ->
load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- {Guid, MsgProps} = read_pub_record_body(Hdl),
- Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ {MsgId, MsgProps} = read_pub_record_body(Hdl),
+ Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
+ RelSeq:?REL_SEQ_BITS>>} ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
@@ -1002,17 +1002,17 @@ add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
Rest/binary>>) ->
{<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
- Guid:?GUID_BYTES/binary, Rest/binary>>) ->
- {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId,
expiry_to_binary(undefined)], Rest};
add_queue_ttl_journal(_) ->
stop.
add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
Rest/binary>>) ->
{[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+ RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest};
add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, Rest>>) ->
{<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
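>

[Editor's note, not part of the patch: create_pub_record_body/2 and read_pub_record_body/1 above serialise a publish record as the 16-byte message id (an md5 digest) followed by an 8-byte expiry, with 0 standing for "no expiry". A hedged sketch of that layout, using illustrative function names:

encode_pub_record_body(MsgId, undefined) when byte_size(MsgId) =:= 16 ->
    <<MsgId/binary, 0:64>>;                    %% ?NO_EXPIRY
encode_pub_record_body(MsgId, Expiry) when byte_size(MsgId) =:= 16 ->
    <<MsgId/binary, Expiry:64>>.

decode_pub_record_body(<<MsgId:16/binary, 0:64>>)      -> {MsgId, undefined};
decode_pub_record_body(<<MsgId:16/binary, Expiry:64>>) -> {MsgId, Expiry}.

The real reader additionally copies the 16 id bytes through an integer to work around binary fragmentation, as noted in the hunk.]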
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b172db56..5afe5560 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,9 +35,8 @@
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
--define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
-%---------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
@@ -62,7 +61,7 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
-%%----------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -165,7 +164,8 @@ server_properties(Protocol) ->
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true}];
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}];
server_capabilities(_) ->
[].
@@ -592,14 +592,14 @@ handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
HandleException =
fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, R);
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state, R})
+ end
end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -641,14 +641,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
connection = Connection,
sock = Sock,
start_heartbeat_fun = SHF}) ->
- if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ ServerFrameMax = server_frame_max(),
+ if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w < ~w min size",
[FrameMax, ?FRAME_MIN_SIZE]);
- (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ?FRAME_MAX]);
+ [FrameMax, ServerFrameMax]);
true ->
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
@@ -706,6 +707,12 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
+%% Compute frame_max for this instance. Could simply use 0, but breaks
+%% QPid Java client.
+server_frame_max() ->
+ {ok, FrameMax} = application:get_env(rabbit, frame_max),
+ FrameMax.
+
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -734,8 +741,7 @@ auth_mechanisms(Sock) ->
auth_mechanisms_binary(Sock) ->
list_to_binary(
- string:join(
- [atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
+ string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
State = #v1{auth_mechanism = AuthMechanism,
@@ -757,7 +763,7 @@ auth_phase(Response,
State#v1{auth_state = AuthState1};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
- frame_max = ?FRAME_MAX,
+ frame_max = server_frame_max(),
heartbeat = 0},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
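
[Editor's note, not part of the patch: with the ?FRAME_MAX macro removed, the server-side limit is now read from the rabbit application environment by server_frame_max/0, so it can be overridden per broker. A hedged example of doing so in an Erlang-terms config file (e.g. rabbitmq.config), reusing the 131072 value the macro used to hard-code:

[{rabbit, [{frame_max, 131072}]}].

A value of 0 is treated as "no limit" by the guards above, which, per the removed comment, still breaks the QPid Java client's negotiation.]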
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 53e707f4..f6a1c92f 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -59,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
{routed, QPids};
deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
+ immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
@@ -67,7 +67,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Mandatory, Immediate, {Routed, Handled}).
@@ -91,7 +91,7 @@ match_routing_key(SrcName, [RoutingKey]) ->
mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]);
match_routing_key(SrcName, [_|_] = RoutingKeys) ->
Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
- RKey <- RoutingKeys]]),
+ RKey <- RoutingKeys]]),
MatchHead = #route{binding = #binding{source = SrcName,
destination = '$1',
key = '$2',
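
[Editor's note, not part of the patch: for multiple routing keys the reindented code above folds the keys into a single mnesia match-spec guard via list_to_tuple(['orelse' | ...]). For keys <<"a">> and <<"b">>, for instance, the Condition applied to the '$2' (binding key) field is:

    {'orelse', {'=:=', '$2', <<"a">>}, {'=:=', '$2', <<"b">>}}

which mnesia:dirty_select/2 then evaluates against each #route's binding key.]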
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e831ee51..1953b6b8 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -87,8 +87,8 @@ cert_info(F, Cert) ->
find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
- <- lists:flatten(RDNs),
- T == Type] of
+ <- lists:flatten(RDNs),
+ T == Type] of
[{printableString, S}] -> S;
[] -> not_found
end.
@@ -166,7 +166,7 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
true -> S
end;
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
- Min1, Min2, S1, S2, $Z]}) ->
+ Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
format_asn1_value(V) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 0c6250df..9547cae5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -57,6 +57,7 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
+ passed = test_confirms(),
passed = maybe_run_cluster_dependent_tests(),
passed = test_configurable_server_properties(),
passed.
@@ -425,35 +426,35 @@ test_content_properties() ->
[{<<"one">>, signedint, 1},
{<<"two">>, signedint, 2}]}]}],
<<
- % property-flags
- 16#8000:16,
+ %% property-flags
+ 16#8000:16,
- % property-list:
+ %% property-list:
- % table
- 117:32, % table length in bytes
+ %% table
+ 117:32, % table length in bytes
- 11,"a signedint", % name
- "I",12345678:32, % type and value
+ 11,"a signedint", % name
+ "I",12345678:32, % type and value
- 9,"a longstr",
- "S",10:32,"yes please",
+ 9,"a longstr",
+ "S",10:32,"yes please",
- 9,"a decimal",
- "D",123,12345678:32,
+ 9,"a decimal",
+ "D",123,12345678:32,
- 11,"a timestamp",
- "T", 123456789012345:64,
+ 11,"a timestamp",
+ "T", 123456789012345:64,
- 14,"a nested table",
- "F",
- 18:32,
+ 14,"a nested table",
+ "F",
+ 18:32,
- 3,"one",
- "I",1:32,
+ 3,"one",
+ "I",1:32,
- 3,"two",
- "I",2:32 >>),
+ 3,"two",
+ "I",2:32 >>),
case catch rabbit_binary_parser:parse_properties([bit, bit, bit, bit], <<16#A0,0,1>>) of
{'EXIT', content_properties_binary_overflow} -> passed;
V -> exit({got_success_but_expected_failure, V})
@@ -480,28 +481,28 @@ test_field_values() ->
]}],
<<
- % property-flags
- 16#8000:16,
- % table length in bytes
- 228:32,
-
- 7,"longstr", "S", 21:32, "Here is a long string", % = 34
- 9,"signedint", "I", 12345:32/signed, % + 15 = 49
- 7,"decimal", "D", 3, 123456:32, % + 14 = 63
- 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
- 5,"table", "F", 31:32, % length of table % + 11 = 93
- 3,"one", "I", 54321:32, % + 9 = 102
- 3,"two", "S", 13:32, "A long string",% + 22 = 124
- 4,"byte", "b", 255:8, % + 7 = 131
- 4,"long", "l", 1234567890:64, % + 14 = 145
- 5,"short", "s", 655:16, % + 9 = 154
- 4,"bool", "t", 1, % + 7 = 161
- 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
- 4,"void", "V", % + 6 = 194
- 5,"array", "A", 23:32, % + 11 = 205
- "I", 54321:32, % + 5 = 210
- "S", 13:32, "A long string" % + 18 = 228
- >>),
+ %% property-flags
+ 16#8000:16,
+ %% table length in bytes
+ 228:32,
+
+ 7,"longstr", "S", 21:32, "Here is a long string", % = 34
+ 9,"signedint", "I", 12345:32/signed, % + 15 = 49
+ 7,"decimal", "D", 3, 123456:32, % + 14 = 63
+ 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
+ 5,"table", "F", 31:32, % length of table % + 11 = 93
+ 3,"one", "I", 54321:32, % + 9 = 102
+ 3,"two", "S", 13:32, "A long string", % + 22 = 124
+ 4,"byte", "b", 255:8, % + 7 = 131
+ 4,"long", "l", 1234567890:64, % + 14 = 145
+ 5,"short", "s", 655:16, % + 9 = 154
+ 4,"bool", "t", 1, % + 7 = 161
+ 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
+ 4,"void", "V", % + 6 = 194
+ 5,"array", "A", 23:32, % + 11 = 205
+ "I", 54321:32, % + 5 = 210
+ "S", 13:32, "A long string" % + 18 = 228
+ >>),
passed.
%% Test that content frames don't exceed frame-max
@@ -598,65 +599,65 @@ test_topic_matching() ->
%% add some bindings
Bindings = lists:map(
- fun ({Key, Q}) ->
- #binding{source = XName,
- key = list_to_binary(Key),
- destination = #resource{virtual_host = <<"/">>,
- kind = queue,
- name = list_to_binary(Q)}}
- end, [{"a.b.c", "t1"},
- {"a.*.c", "t2"},
- {"a.#.b", "t3"},
- {"a.b.b.c", "t4"},
- {"#", "t5"},
- {"#.#", "t6"},
- {"#.b", "t7"},
- {"*.*", "t8"},
- {"a.*", "t9"},
- {"*.b.c", "t10"},
- {"a.#", "t11"},
- {"a.#.#", "t12"},
- {"b.b.c", "t13"},
- {"a.b.b", "t14"},
- {"a.b", "t15"},
- {"b.c", "t16"},
- {"", "t17"},
- {"*.*.*", "t18"},
- {"vodka.martini", "t19"},
- {"a.b.c", "t20"},
- {"*.#", "t21"},
- {"#.*.#", "t22"},
- {"*.#.#", "t23"},
- {"#.#.#", "t24"},
- {"*", "t25"},
- {"#.b.#", "t26"}]),
+ fun ({Key, Q}) ->
+ #binding{source = XName,
+ key = list_to_binary(Key),
+ destination = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}}
+ end, [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"},
+ {"*.#", "t21"},
+ {"#.*.#", "t22"},
+ {"*.#.#", "t23"},
+ {"#.#.#", "t24"},
+ {"*", "t25"},
+ {"#.b.#", "t26"}]),
lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
%% test some matches
- test_topic_expect_match(X,
- [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
- "t18", "t20", "t21", "t22", "t23", "t24",
- "t26"]},
- {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
- "t12", "t15", "t21", "t22", "t23", "t24",
- "t26"]},
- {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
- "t18", "t21", "t22", "t23", "t24", "t26"]},
- {"", ["t5", "t6", "t17", "t24"]},
- {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24",
- "t26"]},
- {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23",
- "t24"]},
- {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
- "t24"]},
- {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
- "t24"]},
- {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22",
- "t23", "t24", "t26"]},
- {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
- {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
- "t25"]}]),
+ test_topic_expect_match(
+ X, [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
+ "t18", "t20", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
+ "t12", "t15", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
+ "t18", "t21", "t22", "t23", "t24", "t26"]},
+ {"", ["t5", "t6", "t17", "t24"]},
+ {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23",
+ "t24", "t26"]},
+ {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22",
+ "t23", "t24"]},
+ {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
+ "t24"]},
+ {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
+ "t24"]},
+ {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21",
+ "t22", "t23", "t24", "t26"]},
+ {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
+ {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25"]}]),
%% remove some bindings
RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
@@ -669,21 +670,21 @@ test_topic_matching() ->
%% test some matches
test_topic_expect_match(X,
- [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
- "t23", "t24", "t26"]},
- {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
- "t22", "t23", "t24", "t26"]},
- {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
- "t23", "t24", "t26"]},
- {"", ["t6", "t17", "t24"]},
- {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
- {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
- {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
- {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
- {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
- "t24", "t26"]},
- {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
+ "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
+ "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
+ "t23", "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
+ "t24", "t26"]},
+ {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
%% remove the entire exchange
exchange_op_callback(X, delete, [RemainingBindings]),
@@ -693,23 +694,23 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, ExtraArgs) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
+ fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
test_topic_expect_match(X, List) ->
lists:foreach(
- fun ({Key, Expected}) ->
- BinKey = list_to_binary(Key),
- Res = rabbit_exchange_type_topic:route(
- X, #delivery{message = #basic_message{routing_keys =
+ fun ({Key, Expected}) ->
+ BinKey = list_to_binary(Key),
+ Res = rabbit_exchange_type_topic:route(
+ X, #delivery{message = #basic_message{routing_keys =
[BinKey]}}),
- ExpectedRes = lists:map(
- fun (Q) -> #resource{virtual_host = <<"/">>,
- kind = queue,
- name = list_to_binary(Q)}
- end, Expected),
- true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
- end, List).
+ ExpectedRes = lists:map(
+ fun (Q) -> #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}
+ end, Expected),
+ true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
+ end, List).
test_app_management() ->
%% starting, stopping, status
@@ -818,7 +819,7 @@ test_log_management_during_startup() ->
ok = delete_log_handlers([sasl_report_tty_h]),
ok = case catch control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotation_tty_no_handlers_test});
+ log_rotation_tty_no_handlers_test});
{error, {cannot_log_to_tty, _, _}} -> ok
end,
@@ -843,8 +844,8 @@ test_log_management_during_startup() ->
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotation_no_write_permission_dir_test});
- {error, {cannot_log_to_file, _, _}} -> ok
+ log_rotation_no_write_permission_dir_test});
+ {error, {cannot_log_to_file, _, _}} -> ok
end,
%% start application with logging to a subdirectory which
@@ -854,9 +855,9 @@ test_log_management_during_startup() ->
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotatation_parent_dirs_test});
+ log_rotatation_parent_dirs_test});
{error, {cannot_log_to_file, _,
- {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
+ {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
end,
ok = set_permissions(TmpDir, 8#00700),
ok = set_permissions(TmpLog, 8#00600),
@@ -876,22 +877,22 @@ test_log_management_during_startup() ->
passed.
test_option_parser() ->
- % command and arguments should just pass through
+ %% command and arguments should just pass through
ok = check_get_options({["mock_command", "arg1", "arg2"], []},
[], ["mock_command", "arg1", "arg2"]),
- % get flags
+ %% get flags
ok = check_get_options(
{["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
[{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
- % get options
+ %% get options
ok = check_get_options(
{["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
[{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
["mock_command", "-foo", "bar"]),
- % shuffled and interleaved arguments and options
+ %% shuffled and interleaved arguments and options
ok = check_get_options(
{["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
[{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
@@ -1143,7 +1144,7 @@ test_server_status() ->
[_|_] = rabbit_binding:list_for_source(
rabbit_misc:r(<<"/">>, exchange, <<"">>)),
[_] = rabbit_binding:list_for_destination(
- rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
[_] = rabbit_binding:list_for_source_and_destination(
rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
@@ -1225,6 +1226,82 @@ test_statistics_receive_event1(Ch, Matcher) ->
after 1000 -> throw(failed_to_receive_event)
end.
+test_confirms_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_confirms_receiver(Pid)
+ end.
+
+test_confirms() ->
+ {_Writer, Ch} = test_spawn(fun test_confirms_receiver/1),
+ DeclareBindDurableQueue =
+ fun() ->
+ rabbit_channel:do(Ch, #'queue.declare'{durable = true}),
+ receive #'queue.declare_ok'{queue = Q0} ->
+ rabbit_channel:do(Ch, #'queue.bind'{
+ queue = Q0,
+ exchange = <<"amq.direct">>,
+ routing_key = "magic" }),
+ receive #'queue.bind_ok'{} ->
+ Q0
+ after 1000 ->
+ throw(failed_to_bind_queue)
+ end
+ after 1000 ->
+ throw(failed_to_declare_queue)
+ end
+ end,
+ %% Declare and bind two queues
+ QName1 = DeclareBindDurableQueue(),
+ QName2 = DeclareBindDurableQueue(),
+ %% Get the first one's pid (we'll crash it later)
+ {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
+ QPid1 = Q1#amqqueue.pid,
+ %% Enable confirms
+ rabbit_channel:do(Ch, #'confirm.select'{}),
+ receive #'confirm.select_ok'{} ->
+ ok
+ after 1000 ->
+ throw(failed_to_enable_confirms)
+ end,
+ %% Publish a message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
+ routing_key = "magic"
+ },
+ rabbit_basic:build_content(
+ #'P_basic'{delivery_mode = 2}, <<"">>)),
+ %% Crash the queue
+ QPid1 ! boom,
+ %% Wait for a nack
+ receive
+ #'basic.nack'{} ->
+ ok;
+ #'basic.ack'{} ->
+ throw(received_ack_instead_of_nack)
+ after 2000 ->
+ throw(did_not_receive_nack)
+ end,
+ receive
+ #'basic.ack'{} ->
+ throw(received_ack_when_none_expected)
+ after 1000 ->
+ ok
+ end,
+ %% Cleanup
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
+ receive #'queue.delete_ok'{} ->
+ ok
+ after 1000 ->
+ throw(failed_to_cleanup_queue)
+ end,
+ unlink(Ch),
+ ok = rabbit_channel:shutdown(Ch),
+
+ passed.
+
test_statistics() ->
application:set_env(rabbit, collect_statistics, fine),
@@ -1305,9 +1382,9 @@ test_delegates_async(SecondaryNode) ->
make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
fun () ->
- receive Msg -> FMsg(Msg)
- after 1000 -> throw(Throw)
- end
+ receive Msg -> FMsg(Msg)
+ after 1000 -> throw(Throw)
+ end
end.
spawn_responders(Node, Responder, Count) ->
@@ -1318,10 +1395,10 @@ await_response(0) ->
await_response(Count) ->
receive
response -> ok,
- await_response(Count - 1)
+ await_response(Count - 1)
after 1000 ->
- io:format("Async reply not received~n"),
- throw(timeout)
+ io:format("Async reply not received~n"),
+ throw(timeout)
end.
must_exit(Fun) ->
@@ -1337,7 +1414,7 @@ test_delegates_sync(SecondaryNode) ->
BadSender = fun (_Pid) -> exit(exception) end,
Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
- gen_server:reply(From, response)
+ gen_server:reply(From, response)
end),
BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
@@ -1349,7 +1426,7 @@ test_delegates_sync(SecondaryNode) ->
must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
must_exit(fun () ->
- delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
@@ -1405,7 +1482,7 @@ test_queue_cleanup(_SecondaryNode) ->
rabbit_channel:do(Ch, #'queue.declare'{ passive = true,
queue = ?CLEANUP_QUEUE_NAME }),
receive
- #'channel.close'{reply_code = 404} ->
+ #'channel.close'{reply_code = ?NOT_FOUND} ->
ok
after 2000 ->
throw(failed_to_receive_channel_exit)
@@ -1438,7 +1515,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
throw(failed_to_create_and_kill_queue)
end.
-%---------------------------------------------------------------------
+%%---------------------------------------------------------------------
control_action(Command, Args) ->
control_action(Command, node(), Args, default_options()).
@@ -1602,50 +1679,50 @@ restart_msg_store_empty() ->
ok = rabbit_variable_queue:start_msg_store(
undefined, {fun (ok) -> finished end, ok}).
-guid_bin(X) ->
+msg_id_bin(X) ->
erlang:md5(term_to_binary(X)).
msg_store_client_init(MsgStore, Ref) ->
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
-msg_store_contains(Atom, Guids, MSCState) ->
+msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
- fun (Guid, Atom1) when Atom1 =:= Atom ->
- rabbit_msg_store:contains(Guid, MSCState) end,
- Atom, Guids).
+ fun (MsgId, Atom1) when Atom1 =:= Atom ->
+ rabbit_msg_store:contains(MsgId, MSCState) end,
+ Atom, MsgIds).
-msg_store_sync(Guids, MSCState) ->
+msg_store_sync(MsgIds, MSCState) ->
Ref = make_ref(),
Self = self(),
- ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end,
+ ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end,
MSCState),
receive
{sync, Ref} -> ok
after
10000 ->
- io:format("Sync from msg_store missing for guids ~p~n", [Guids]),
+ io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]),
throw(timeout)
end.
-msg_store_read(Guids, MSCState) ->
- lists:foldl(fun (Guid, MSCStateM) ->
- {{ok, Guid}, MSCStateN} = rabbit_msg_store:read(
- Guid, MSCStateM),
+msg_store_read(MsgIds, MSCState) ->
+ lists:foldl(fun (MsgId, MSCStateM) ->
+ {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
+ MsgId, MSCStateM),
MSCStateN
- end, MSCState, Guids).
+ end, MSCState, MsgIds).
-msg_store_write(Guids, MSCState) ->
- ok = lists:foldl(
- fun (Guid, ok) -> rabbit_msg_store:write(Guid, Guid, MSCState) end,
- ok, Guids).
+msg_store_write(MsgIds, MSCState) ->
+ ok = lists:foldl(fun (MsgId, ok) ->
+ rabbit_msg_store:write(MsgId, MsgId, MSCState)
+ end, ok, MsgIds).
-msg_store_remove(Guids, MSCState) ->
- rabbit_msg_store:remove(Guids, MSCState).
+msg_store_remove(MsgIds, MSCState) ->
+ rabbit_msg_store:remove(MsgIds, MSCState).
-msg_store_remove(MsgStore, Ref, Guids) ->
+msg_store_remove(MsgStore, Ref, MsgIds) ->
with_msg_store_client(MsgStore, Ref,
fun (MSCStateM) ->
- ok = msg_store_remove(Guids, MSCStateM),
+ ok = msg_store_remove(MsgIds, MSCStateM),
MSCStateM
end).
@@ -1655,140 +1732,140 @@ with_msg_store_client(MsgStore, Ref, Fun) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
- lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
+ lists:foldl(fun (MsgId, MSCState) -> Fun(MsgId, MSCState) end,
msg_store_client_init(MsgStore, Ref), L)).
test_msg_store() ->
restart_msg_store_empty(),
Self = self(),
- Guids = [guid_bin(M) || M <- lists:seq(1,100)],
- {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
+ MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
+ {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
Ref = rabbit_guid:guid(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs we're about to publish
- false = msg_store_contains(false, Guids, MSCState),
+ false = msg_store_contains(false, MsgIds, MSCState),
%% publish the first half
- ok = msg_store_write(Guids1stHalf, MSCState),
+ ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
- ok = msg_store_sync(Guids1stHalf, MSCState),
+ ok = msg_store_sync(MsgIds1stHalf, MSCState),
%% publish the second half
- ok = msg_store_write(Guids2ndHalf, MSCState),
+ ok = msg_store_write(MsgIds2ndHalf, MSCState),
%% sync on the first half again - the msg_store will be dirty, but
%% we won't need the fsync
- ok = msg_store_sync(Guids1stHalf, MSCState),
+ ok = msg_store_sync(MsgIds1stHalf, MSCState),
%% check they're all in there
- true = msg_store_contains(true, Guids, MSCState),
+ true = msg_store_contains(true, MsgIds, MSCState),
%% publish the latter half twice so we hit the caching and ref count code
- ok = msg_store_write(Guids2ndHalf, MSCState),
+ ok = msg_store_write(MsgIds2ndHalf, MSCState),
%% check they're still all in there
- true = msg_store_contains(true, Guids, MSCState),
+ true = msg_store_contains(true, MsgIds, MSCState),
%% sync on the 2nd half, but do lots of individual syncs to try
%% and cause coalescing to happen
ok = lists:foldl(
- fun (Guid, ok) -> rabbit_msg_store:sync(
- [Guid], fun () -> Self ! {sync, Guid} end,
- MSCState)
- end, ok, Guids2ndHalf),
+ fun (MsgId, ok) -> rabbit_msg_store:sync(
+ [MsgId], fun () -> Self ! {sync, MsgId} end,
+ MSCState)
+ end, ok, MsgIds2ndHalf),
lists:foldl(
- fun(Guid, ok) ->
+ fun(MsgId, ok) ->
receive
- {sync, Guid} -> ok
+ {sync, MsgId} -> ok
after
10000 ->
- io:format("Sync from msg_store missing (guid: ~p)~n",
- [Guid]),
+ io:format("Sync from msg_store missing (msg_id: ~p)~n",
+ [MsgId]),
throw(timeout)
end
- end, ok, Guids2ndHalf),
+ end, ok, MsgIds2ndHalf),
%% it's very likely we're not dirty here, so the 1st half sync
%% should hit a different code path
- ok = msg_store_sync(Guids1stHalf, MSCState),
+ ok = msg_store_sync(MsgIds1stHalf, MSCState),
%% read them all
- MSCState1 = msg_store_read(Guids, MSCState),
+ MSCState1 = msg_store_read(MsgIds, MSCState),
%% read them all again - this will hit the cache, not disk
- MSCState2 = msg_store_read(Guids, MSCState1),
+ MSCState2 = msg_store_read(MsgIds, MSCState1),
%% remove them all
- ok = rabbit_msg_store:remove(Guids, MSCState2),
+ ok = rabbit_msg_store:remove(MsgIds, MSCState2),
%% check first half doesn't exist
- false = msg_store_contains(false, Guids1stHalf, MSCState2),
+ false = msg_store_contains(false, MsgIds1stHalf, MSCState2),
%% check second half does exist
- true = msg_store_contains(true, Guids2ndHalf, MSCState2),
+ true = msg_store_contains(true, MsgIds2ndHalf, MSCState2),
%% read the second half again
- MSCState3 = msg_store_read(Guids2ndHalf, MSCState2),
+ MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
%% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(Guids2ndHalf, MSCState3),
+ ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3),
%% read the second half again, just for fun (aka code coverage)
- MSCState4 = msg_store_read(Guids2ndHalf, MSCState3),
+ MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
ok = rabbit_msg_store:client_terminate(MSCState4),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
[], {fun ([]) -> finished;
- ([Guid|GuidsTail])
- when length(GuidsTail) rem 2 == 0 ->
- {Guid, 1, GuidsTail};
- ([Guid|GuidsTail]) ->
- {Guid, 0, GuidsTail}
- end, Guids2ndHalf}),
+ ([MsgId|MsgIdsTail])
+ when length(MsgIdsTail) rem 2 == 0 ->
+ {MsgId, 1, MsgIdsTail};
+ ([MsgId|MsgIdsTail]) ->
+ {MsgId, 0, MsgIdsTail}
+ end, MsgIds2ndHalf}),
MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we have the right msgs left
lists:foldl(
- fun (Guid, Bool) ->
- not(Bool = rabbit_msg_store:contains(Guid, MSCState5))
- end, false, Guids2ndHalf),
+ fun (MsgId, Bool) ->
+ not(Bool = rabbit_msg_store:contains(MsgId, MSCState5))
+ end, false, MsgIds2ndHalf),
ok = rabbit_msg_store:client_terminate(MSCState5),
%% restart empty
restart_msg_store_empty(),
MSCState6 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs
- false = msg_store_contains(false, Guids, MSCState6),
+ false = msg_store_contains(false, MsgIds, MSCState6),
%% publish the first half again
- ok = msg_store_write(Guids1stHalf, MSCState6),
+ ok = msg_store_write(MsgIds1stHalf, MSCState6),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState6)),
+ msg_store_read(MsgIds1stHalf, MSCState6)),
MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
- ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7),
+ ok = rabbit_msg_store:remove(MsgIds1stHalf, MSCState7),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% restart empty
- restart_msg_store_empty(), %% now safe to reuse guids
+ restart_msg_store_empty(), %% now safe to reuse msg_ids
%% push a lot of msgs in... at least 100 files worth
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
PayloadSizeBits = 65536,
BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
- GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)],
+ MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)],
Payload = << 0:PayloadSizeBits >>,
ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
fun (MSCStateM) ->
- [ok = rabbit_msg_store:write(Guid, Payload, MSCStateM) ||
- Guid <- GuidsBig],
+ [ok = rabbit_msg_store:write(MsgId, Payload, MSCStateM) ||
+ MsgId <- MsgIdsBig],
MSCStateM
end),
%% now read them to ensure we hit the fast client-side reading
ok = foreach_with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
- fun (Guid, MSCStateM) ->
+ fun (MsgId, MSCStateM) ->
{{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
- Guid, MSCStateM),
+ MsgId, MSCStateM),
MSCStateN
- end, GuidsBig),
+ end, MsgIdsBig),
%% .., then 3s by 1...
ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
- [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
+ [msg_id_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
%% .., then remove 3s by 2, from the young end first. This hits
%% GC (under 50% good data left, but no empty files. Must GC).
ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
- [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
+ [msg_id_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
%% .., then remove 3s by 3, from the young end first. This hits
%% GC...
ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
- [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
+ [msg_id_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
%% ensure empty
ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
fun (MSCStateM) ->
- false = msg_store_contains(false, GuidsBig, MSCStateM),
+ false = msg_store_contains(false, MsgIdsBig, MSCStateM),
MSCStateM
end),
%% restart empty
@@ -1808,8 +1885,8 @@ init_test_queue() ->
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
- fun (Guid) ->
- rabbit_msg_store:contains(Guid, PersistentClient)
+ fun (MsgId) ->
+ rabbit_msg_store:contains(MsgId, PersistentClient)
end,
fun nop/1),
ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
@@ -1840,25 +1917,25 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
false -> ?TRANSIENT_MSG_STORE
end,
MSCState = msg_store_client_init(MsgStore, Ref),
- {A, B = [{_SeqId, LastGuidWritten} | _]} =
+ {A, B = [{_SeqId, LastMsgIdWritten} | _]} =
lists:foldl(
- fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
- Guid = rabbit_guid:guid(),
+ fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
+ MsgId = rabbit_guid:guid(),
QiM = rabbit_queue_index:publish(
- Guid, SeqId, #message_properties{}, Persistent, QiN),
- ok = rabbit_msg_store:write(Guid, Guid, MSCState),
- {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]}
+ MsgId, SeqId, #message_properties{}, Persistent, QiN),
+ ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
+ {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
end, {Qi, []}, SeqIds),
%% do this just to force all of the publishes through to the msg_store:
- true = rabbit_msg_store:contains(LastGuidWritten, MSCState),
+ true = rabbit_msg_store:contains(LastMsgIdWritten, MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
{A, B}.
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
verify_read_with_published(Delivered, Persistent,
- [{Guid, SeqId, _Props, Persistent, Delivered}|Read],
- [{SeqId, Guid}|Published]) ->
+ [{MsgId, SeqId, _Props, Persistent, Delivered}|Read],
+ [{SeqId, MsgId}|Published]) ->
verify_read_with_published(Delivered, Persistent, Read, Published);
verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
ko.
@@ -1866,10 +1943,10 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
- Guid = rabbit_guid:guid(),
+ MsgId = rabbit_guid:guid(),
Props = #message_properties{expiry=12345},
- Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
- {[{Guid, 1, Props, _, _}], Qi2} =
+ Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
+ {[{MsgId, 1, Props, _, _}], Qi2} =
rabbit_queue_index:read(1, 2, Qi1),
Qi2
end),
@@ -1891,19 +1968,19 @@ test_queue_index() ->
with_empty_test_queue(
fun (Qi0) ->
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
- {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
+ {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2),
{ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
- lists:reverse(SeqIdsGuidsA)),
+ lists:reverse(SeqIdsMsgIdsA)),
%% should get length back as 0, as all the msgs were transient
{0, Qi6} = restart_test_queue(Qi4),
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
- {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
+ {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
{ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
ok = verify_read_with_published(false, true, ReadB,
- lists:reverse(SeqIdsGuidsB)),
+ lists:reverse(SeqIdsMsgIdsB)),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
{LenB, Qi12} = restart_test_queue(Qi10),
@@ -1911,7 +1988,7 @@ test_queue_index() ->
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
ok = verify_read_with_published(true, true, ReadC,
- lists:reverse(SeqIdsGuidsB)),
+ lists:reverse(SeqIdsMsgIdsB)),
Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
Qi17 = rabbit_queue_index:flush(Qi16),
%% Everything will have gone now because #pubs == #acks
@@ -1927,12 +2004,12 @@ test_queue_index() ->
%% a) partial pub+del+ack, then move to new segment
with_empty_test_queue(
fun (Qi0) ->
- {Qi1, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC,
+ {Qi1, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC,
false, Qi0),
Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1),
Qi3 = rabbit_queue_index:ack(SeqIdsC, Qi2),
Qi4 = rabbit_queue_index:flush(Qi3),
- {Qi5, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize],
+ {Qi5, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize],
false, Qi4),
Qi5
end),
@@ -1940,10 +2017,10 @@ test_queue_index() ->
%% b) partial pub+del, then move to new segment, then ack all in old segment
with_empty_test_queue(
fun (Qi0) ->
- {Qi1, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC,
+ {Qi1, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC,
false, Qi0),
Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1),
- {Qi3, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize],
+ {Qi3, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize],
false, Qi2),
Qi4 = rabbit_queue_index:ack(SeqIdsC, Qi3),
rabbit_queue_index:flush(Qi4)
@@ -1952,8 +2029,8 @@ test_queue_index() ->
%% c) just fill up several segments of all pubs, then +dels, then +acks
with_empty_test_queue(
fun (Qi0) ->
- {Qi1, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD,
- false, Qi0),
+ {Qi1, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD,
+ false, Qi0),
Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1),
Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2),
rabbit_queue_index:flush(Qi3)
@@ -1986,12 +2063,12 @@ test_queue_index() ->
%% exercise journal_minus_segment, not segment_plus_journal.
with_empty_test_queue(
fun (Qi0) ->
- {Qi1, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7],
+ {Qi1, _SeqIdsMsgIdsE} = queue_index_publish([0,1,2,4,5,7],
true, Qi0),
Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
Qi3 = rabbit_queue_index:ack([0], Qi2),
{5, Qi4} = restart_test_queue(Qi3),
- {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4),
+ {Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4),
Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
{5, Qi8} = restart_test_queue(Qi7),
@@ -2195,7 +2272,7 @@ check_variable_queue_status(VQ0, Props) ->
variable_queue_wait_for_shuffling_end(VQ) ->
case rabbit_variable_queue:needs_idle_timeout(VQ) of
true -> variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:idle_timeout(VQ));
+ rabbit_variable_queue:idle_timeout(VQ));
false -> VQ
end.
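
The renamed test helpers above exercise the queue index round-trip: publish a message id at a sequence number, then read the entry back together with its properties. A minimal sketch of that round-trip, mirroring test_queue_index_props/0 and assuming an empty index such as with_empty_test_queue/1 provides and the #message_properties record from rabbit.hrl in scope:

    %% illustrative sketch only, not part of the patch
    queue_index_roundtrip(Qi0) ->
        MsgId = rabbit_guid:guid(),
        Props = #message_properties { expiry = 12345 },
        %% publish MsgId at SeqId 1, marked persistent
        Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
        %% read back SeqIds in [1, 2) and check the entry matches
        {[{MsgId, 1, Props, _IsPersistent, _IsDelivered}], Qi2} =
            rabbit_queue_index:read(1, 2, Qi1),
        Qi2.
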
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index ab2300c0..1f0f8bbe 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -21,7 +21,7 @@
-ifdef(use_specs).
-export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0,
- message/0, basic_message/0,
+ message/0, msg_id/0, basic_message/0,
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, message_properties/0,
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
@@ -42,39 +42,40 @@
%% TODO: make this more precise by tying specific class_ids to
%% specific properties
-type(undecoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: 'none',
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]} |
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: rabbit_framing:amqp_property_record(),
- properties_bin :: 'none',
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: 'none',
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]} |
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: 'none',
+ payload_fragments_rev :: [binary()]}).
-type(unencoded_content() :: undecoded_content()).
-type(decoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: rabbit_framing:amqp_property_record(),
- properties_bin :: maybe(binary()),
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: maybe(binary()),
+ payload_fragments_rev :: [binary()]}).
-type(encoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: maybe(rabbit_framing:amqp_property_record()),
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: maybe(rabbit_framing:amqp_property_record()),
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]}).
-type(content() :: undecoded_content() | decoded_content()).
+-type(msg_id() :: rabbit_guid:guid()).
-type(basic_message() ::
- #basic_message{exchange_name :: rabbit_exchange:name(),
- routing_keys :: [rabbit_router:routing_key()],
- content :: content(),
- guid :: rabbit_guid:guid(),
- is_persistent :: boolean()}).
+ #basic_message{exchange_name :: rabbit_exchange:name(),
+ routing_keys :: [rabbit_router:routing_key()],
+ content :: content(),
+ id :: msg_id(),
+ is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
- #delivery{mandatory :: boolean(),
- immediate :: boolean(),
- txn :: maybe(txn()),
- sender :: pid(),
- message :: message()}).
+ #delivery{mandatory :: boolean(),
+ immediate :: boolean(),
+ txn :: maybe(txn()),
+ sender :: pid(),
+ message :: message()}).
-type(message_properties() ::
#message_properties{expiry :: pos_integer() | 'undefined',
needs_confirming :: boolean()}).
@@ -89,9 +90,9 @@
-type(infos() :: [info()]).
-type(amqp_error() ::
- #amqp_error{name :: rabbit_framing:amqp_exception(),
- explanation :: string(),
- method :: rabbit_framing:amqp_method_name()}).
+ #amqp_error{name :: rabbit_framing:amqp_exception(),
+ explanation :: string(),
+ method :: rabbit_framing:amqp_method_name()}).
-type(r(Kind) ::
r2(vhost(), Kind)).
@@ -103,34 +104,34 @@
name :: Name}).
-type(listener() ::
- #listener{node :: node(),
- protocol :: atom(),
- host :: rabbit_networking:hostname(),
- port :: rabbit_networking:ip_port()}).
+ #listener{node :: node(),
+ protocol :: atom(),
+ host :: rabbit_networking:hostname(),
+ port :: rabbit_networking:ip_port()}).
-type(binding_source() :: rabbit_exchange:name()).
-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()).
-type(binding() ::
- #binding{source :: rabbit_exchange:name(),
- destination :: binding_destination(),
- key :: rabbit_binding:key(),
- args :: rabbit_framing:amqp_table()}).
+ #binding{source :: rabbit_exchange:name(),
+ destination :: binding_destination(),
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
- #amqqueue{name :: rabbit_amqqueue:name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- exclusive_owner :: rabbit_types:maybe(pid()),
- arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ #amqqueue{name :: rabbit_amqqueue:name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: rabbit_types:maybe(pid()),
+ arguments :: rabbit_framing:amqp_table(),
+ pid :: rabbit_types:maybe(pid())}).
-type(exchange() ::
- #exchange{name :: rabbit_exchange:name(),
- type :: rabbit_exchange:type(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: rabbit_framing:amqp_table()}).
+ #exchange{name :: rabbit_exchange:name(),
+ type :: rabbit_exchange:type(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ arguments :: rabbit_framing:amqp_table()}).
-type(connection() :: pid()).
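
The newly exported msg_id() type is an alias for rabbit_guid:guid(), and the #basic_message field it describes is now called id rather than guid. A hypothetical accessor (the function is illustrative, not part of the module) showing how a caller would spec against the new type:

    -include("rabbit.hrl").

    -spec msg_id_of(rabbit_types:basic_message()) -> rabbit_types:msg_id().
    msg_id_of(#basic_message { id = MsgId }) -> MsgId.
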
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 89acc10c..ebda5d03 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -106,9 +106,9 @@ upgrades_to_apply(Heads, G) ->
%% everything we've already applied. Subtract that from all
%% vertices: that's what we have to apply.
Unsorted = sets:to_list(
- sets:subtract(
- sets:from_list(digraph:vertices(G)),
- sets:from_list(digraph_utils:reaching(Heads, G)))),
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
[element(2, digraph:vertex(G, StepName)) ||
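
The re-indented expression computes "all known upgrade steps minus everything reachable backwards from the already-applied heads". A self-contained sketch of the same set calculation over a toy dependency graph (the step names are invented):

    unapplied_steps_demo() ->
        G = digraph:new([acyclic]),
        [digraph:add_vertex(G, V) || V <- [a, b, c, d]],
        digraph:add_edge(G, a, b),          %% b depends on a
        digraph:add_edge(G, b, c),          %% c depends on b
        digraph:add_edge(G, b, d),          %% d depends on b
        Heads = [c],                        %% c (and its ancestors) already applied
        Unapplied = sets:to_list(
                      sets:subtract(
                        sets:from_list(digraph:vertices(G)),
                        sets:from_list(digraph_utils:reaching(Heads, G)))),
        digraph:delete(G),
        Unapplied.                          %% [d] -- only d still needs applying
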
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 58a28d32..be6691e9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -156,7 +156,7 @@
%% segments.
%%
%% Pending acks are recorded in memory either as the tuple {SeqId,
-%% Guid, MsgProps} (tuple-form) or as the message itself (message-
+%% MsgId, MsgProps} (tuple-form) or as the message itself (message-
%% form). Acks for persistent messages are always stored in the tuple-
%% form. Acks for transient messages are also stored in tuple-form if
%% the message has been sent to disk as part of the memory reduction
@@ -261,20 +261,20 @@
-record(msg_status,
{ seq_id,
- guid,
+ msg_id,
msg,
is_persistent,
is_delivered,
msg_on_disk,
index_on_disk,
msg_props
- }).
+ }).
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
end_seq_id %% end_seq_id is exclusive
- }).
+ }).
-record(tx, { pending_messages, pending_acks }).
@@ -400,10 +400,10 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids, ActionTaken) ->
- msgs_written_to_disk(Self, Guids, ActionTaken)
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Self, MsgIds, ActionTaken)
end,
- fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
+ fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
@@ -432,8 +432,8 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
rabbit_queue_index:recover(
QueueName, Terms1,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- fun (Guid) ->
- rabbit_msg_store:contains(Guid, PersistentClient)
+ fun (MsgId) ->
+ rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
init(true, IndexState, DeltaCount, Terms1,
@@ -509,17 +509,16 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, #basic_message { guid = Guid },
- #message_properties {
- needs_confirming = NeedsConfirming },
+publish_delivered(false, #basic_message { id = MsgId },
+ #message_properties { needs_confirming = NeedsConfirming },
State = #vqstate { len = 0 }) ->
case NeedsConfirming of
- true -> blind_confirm(self(), gb_sets:singleton(Guid));
+ true -> blind_confirm(self(), gb_sets:singleton(MsgId));
false -> ok
end,
{undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
- guid = Guid },
+ id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
State = #vqstate { len = 0,
@@ -535,7 +534,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
@@ -545,7 +544,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
- State1.
+ a(State1).
dropwhile1(Pred, State) ->
internal_queue_out(
@@ -586,12 +585,12 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
end.
read_msg(MsgStatus = #msg_status { msg = undefined,
- guid = Guid,
+ msg_id = MsgId,
is_persistent = IsPersistent },
State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, MsgId),
{MsgStatus #msg_status { msg = Msg },
State #vqstate { ram_msg_count = RamMsgCount + 1,
msg_store_clients = MSCState1 }};
@@ -600,7 +599,7 @@ read_msg(MsgStatus, State) ->
internal_fetch(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
- guid = Guid,
+ msg_id = MsgId,
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -619,7 +618,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 2. Remove from msg_store and queue index, if necessary
Rem = fun () ->
- ok = msg_store_remove(MSCState, IsPersistent, [Guid])
+ ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
@@ -632,12 +631,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
@@ -678,7 +677,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable,
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
ok = case IsDurable of
- true -> msg_store_remove(MSCState, true, persistent_guids(Pubs));
+ true -> msg_store_remove(MSCState, true,
+ persistent_msg_ids(Pubs));
false -> ok
end,
{lists:append(AckTags), a(State)}.
@@ -689,13 +689,13 @@ tx_commit(Txn, Fun, MsgPropsFun,
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
AckTags1 = lists:append(AckTags),
- PersistentGuids = persistent_guids(Pubs),
- HasPersistentPubs = PersistentGuids =/= [],
+ PersistentMsgIds = persistent_msg_ids(Pubs),
+ HasPersistentPubs = PersistentMsgIds =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
true -> ok = msg_store_sync(
- MSCState, true, PersistentGuids,
- msg_store_callback(PersistentGuids, Pubs, AckTags1,
+ MSCState, true, PersistentMsgIds,
+ msg_store_callback(PersistentMsgIds, Pubs, AckTags1,
Fun, MsgPropsFun)),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
@@ -713,10 +713,10 @@ requeue(AckTags, MsgPropsFun, State) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
+ ({IsPersistent, MsgId, MsgProps}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, MsgId),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
{_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps),
true, true, State2),
@@ -777,8 +777,8 @@ ram_duration(State = #vqstate {
RamAckCount = gb_trees:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
- AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
+ case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -905,12 +905,12 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a guid to the unconfirmed set
+%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
MsgProps) ->
- #msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
+ #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
@@ -937,30 +937,30 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
MsgStore, Ref, MsgOnDiskFun,
msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
-msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
+msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
-msg_store_read(MSCState, IsPersistent, Guid) ->
+msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
-msg_store_remove(MSCState, IsPersistent, Guids) ->
+msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end).
+ fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_release(MSCState, IsPersistent, Guids) ->
+msg_store_release(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end).
+ fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end).
-msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
+msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
@@ -994,21 +994,21 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
-persistent_guids(Pubs) ->
- [Guid || {#basic_message { guid = Guid,
- is_persistent = true }, _MsgProps} <- Pubs].
+persistent_msg_ids(Pubs) ->
+ [MsgId || {#basic_message { id = MsgId,
+ is_persistent = true }, _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
false -> {[m(#msg_status { msg = undefined,
- guid = Guid,
+ msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -1114,7 +1114,7 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
+msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, fun (StateN) -> {[], tx_commit_post_msg_store(
@@ -1124,14 +1124,14 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
fun () -> remove_persistent_messages(
- PersistentGuids)
+ PersistentMsgIds)
end, F)
end)
end.
-remove_persistent_messages(Guids) ->
+remove_persistent_messages(MsgIds) ->
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
- ok = rabbit_msg_store:remove(Guids, PersistentClient),
+ ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
@@ -1149,7 +1149,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
case dict:fetch(AckTag, PA) of
#msg_status {} ->
false;
- {IsPersistent, _Guid, _MsgProps} ->
+ {IsPersistent, _MsgId, _MsgProps} ->
IsPersistent
end];
false -> []
@@ -1215,38 +1215,38 @@ purge_betas_and_deltas(LensByStore,
end.
remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
- {GuidsByStore, Delivers, Acks} =
+ {MsgIdsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
- ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
- {sum_guids_by_store_to_len(LensByStore, GuidsByStore),
+ ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
+ msg_store_remove(MSCState, IsPersistent, MsgIds)
+ end, ok, MsgIdsByStore),
+ {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { guid = Guid, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {GuidsByStore, Delivers, Acks}) ->
+ {MsgIdsByStore, Delivers, Acks}) ->
{case MsgOnDisk of
- true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore);
- false -> GuidsByStore
+ true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
-sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
+sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
orddict:fold(
- fun (IsPersistent, Guids, LensByStore1) ->
- orddict:update_counter(IsPersistent, length(Guids), LensByStore1)
- end, LensByStore, GuidsByStore).
+ fun (IsPersistent, MsgIds, LensByStore1) ->
+ orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
+ end, LensByStore, MsgIdsByStore).
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
+publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
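
Several of these hunks share one accumulation idiom: message ids are grouped into an orddict keyed by their persistence flag, so each message store is touched once per batch rather than once per message. A small sketch of that idiom with invented ids, assuming rabbit_misc:orddict_cons/3 conses a value onto the list stored under a key, as its uses above suggest:

    group_by_store_demo() ->
        Entries = [{true, <<"p1">>}, {false, <<"t1">>}, {true, <<"p2">>}],
        MsgIdsByStore =
            lists:foldl(fun ({IsPersistent, MsgId}, Dict) ->
                                rabbit_misc:orddict_cons(IsPersistent, MsgId, Dict)
                        end, orddict:new(), Entries),
        %% one action per store, e.g. one msg_store_remove call per flag
        orddict:fold(fun (IsPersistent, MsgIds, ok) ->
                             io:format("persistent=~p -> ~p~n",
                                       [IsPersistent, MsgIds]),
                             ok
                     end, ok, MsgIdsByStore).
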
@@ -1266,7 +1266,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
@@ -1278,14 +1278,14 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg, guid = Guid,
+ msg = Msg, msg_id = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1),
+ ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
MsgStatus #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
@@ -1295,7 +1295,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid,
+ msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -1303,7 +1303,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:publish(
- Guid, SeqId, MsgProps, IsPersistent, IndexState),
+ MsgId, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1322,7 +1322,7 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
record_pending_ack(#msg_status { seq_id = SeqId,
- guid = Guid,
+ msg_id = MsgId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
msg_props = MsgProps } = MsgStatus,
@@ -1331,8 +1331,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true -> {{IsPersistent, Guid, MsgProps}, RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, MsgId, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1343,28 +1343,28 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore} =
+ {PersistentSeqIds, MsgIdsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
- true -> case orddict:find(false, GuidsByStore) of
- error -> State1;
- {ok, Guids} -> ok = msg_store_remove(MSCState, false,
- Guids),
+ true -> case orddict:find(false, MsgIdsByStore) of
+ error -> State1;
+ {ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
+ MsgIds),
State1
end;
false -> IndexState1 =
rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = msg_store_remove(MSCState, IsPersistent, Guids)
- || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore},
+ {{PersistentSeqIds, MsgIdsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1380,10 +1380,10 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
gb_trees:delete_any(SeqId, RAI)})}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
- || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
- PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
- orddict:new(), GuidsByStore)),
+ [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
+ PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
+ orddict:new(), MsgIdsByStore)),
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) }.
@@ -1393,12 +1393,12 @@ accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
- {PersistentSeqIdsAcc, GuidsByStore};
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore};
+accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
+ {PersistentSeqIdsAcc, MsgIdsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1417,12 +1417,12 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
- msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
- unconfirmed = gb_sets:difference(UC, GuidSet) }.
+ State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
+ msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
+ unconfirmed = gb_sets:difference(UC, MsgIdSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1439,37 +1439,37 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(GuidSet, State) ->
- {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
+msgs_confirmed(MsgIdSet, State) ->
+ {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}.
-blind_confirm(QPid, GuidSet) ->
+blind_confirm(QPid, MsgIdSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
+ QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end).
-msgs_written_to_disk(QPid, GuidSet, removed) ->
- blind_confirm(QPid, GuidSet);
-msgs_written_to_disk(QPid, GuidSet, written) ->
+msgs_written_to_disk(QPid, MsgIdSet, removed) ->
+ blind_confirm(QPid, MsgIdSet);
+msgs_written_to_disk(QPid, MsgIdSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ Written = gb_sets:intersection(UC, MsgIdSet),
+ msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD),
State #vqstate {
msgs_on_disk =
- gb_sets:union(
- MOD, gb_sets:intersection(UC, GuidSet)) })
+ gb_sets:union(MOD, Written) })
end).
-msg_indices_written_to_disk(QPid, GuidSet) ->
+msg_indices_written_to_disk(QPid, MsgIdSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ Written = gb_sets:intersection(UC, MsgIdSet),
+ msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD),
State #vqstate {
msg_indices_on_disk =
- gb_sets:union(
- MIOD, gb_sets:intersection(UC, GuidSet)) })
+ gb_sets:union(MIOD, Written) })
end).
%%----------------------------------------------------------------------------
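
The substantive change in the hunk above is that the set of ids newly recorded as on disk is first intersected with the unconfirmed set, and only ids already present in the complementary on-disk set are reported as confirmed. A gb_sets sketch of one such update, mimicking msgs_written_to_disk/3 with invented ids:

    confirm_demo() ->
        UC       = gb_sets:from_list([m1, m2, m3]),  %% awaiting confirm
        MIOD     = gb_sets:from_list([m1]),          %% index already on disk
        MOD      = gb_sets:empty(),                  %% message bodies on disk
        MsgIdSet = gb_sets:from_list([m1, m2]),      %% bodies just written
        Written     = gb_sets:intersection(UC, MsgIdSet),
        Confirmable = gb_sets:intersection(MsgIdSet, MIOD),
        {gb_sets:to_list(Confirmable),               %% [m1] can be confirmed now
         gb_sets:union(MOD, Written)}.               %% m1, m2 recorded as on disk
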
@@ -1547,17 +1547,16 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
true ->
{Quota, State};
false ->
- {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
+ {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
MsgStatus = #msg_status {
- guid = Guid, %% ASSERTION
+ msg_id = MsgId, %% ASSERTION
is_persistent = false, %% ASSERTION
msg_props = MsgProps } = dict:fetch(SeqId, PA),
{_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA),
limit_ram_acks(Quota - 1,
- State1 #vqstate {
- pending_ack =
- dict:store(SeqId, {false, Guid, MsgProps}, PA),
- ram_ack_index = RAI1 })
+ State1 #vqstate { pending_ack = PA1,
+ ram_ack_index = RAI1 })
end.
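
limit_ram_acks/2 above evicts pending acks from RAM starting with the highest sequence id; gb_trees:take_largest/1 does the heavy lifting. A tiny illustration with made-up seq ids and msg ids:

    %% sketch: peel off the largest-SeqId entry, leaving the rest intact
    ram_ack_index_demo() ->
        RAI0 = gb_trees:from_orddict([{1, <<"m1">>}, {7, <<"m7">>}, {9, <<"m9">>}]),
        {9, <<"m9">>, RAI1} = gb_trees:take_largest(RAI0),
        gb_trees:to_list(RAI1).   %% [{1,<<"m1">>}, {7,<<"m7">>}]
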
@@ -1817,12 +1816,12 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
multiple_routing_keys() ->
transform_storage(
- fun ({basic_message, ExchangeName, Routing_Key, Content,
- Guid, Persistent}) ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- Guid, Persistent}};
- (_) -> {error, corrupt_message}
- end),
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ MsgId, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ MsgId, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
ok.
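
The multiple_routing_keys/0 transform above rewrites each stored message by wrapping its single routing key in a list. Sketched in isolation, with a made-up stored term:

    %% sketch of the per-message rewrite performed during the upgrade
    to_multiple_routing_keys({basic_message, XName, RoutingKey, Content,
                              MsgId, Persistent}) ->
        {ok, {basic_message, XName, [RoutingKey], Content, MsgId, Persistent}};
    to_multiple_routing_keys(_Other) ->
        {error, corrupt_message}.

    %% e.g. to_multiple_routing_keys({basic_message, X, <<"k">>, C, Id, true})
    %%      ==> {ok, {basic_message, X, [<<"k">>], C, Id, true}}
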
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index efebef06..24c130ed 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -48,15 +48,15 @@ add(VHostPath) ->
ok;
(ok, false) ->
[rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
ok
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index eba86a55..ac3434d2 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -28,7 +28,7 @@
-define(HIBERNATE_AFTER, 5000).
-%%----------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
-ifdef(use_specs).
@@ -69,7 +69,7 @@
-endif.
-%%----------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
{ok,
@@ -133,7 +133,7 @@ handle_message({inet_reply, _, Status}, _State) ->
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
send_command(W, MethodRecord) ->
W ! {send_command, MethodRecord},
@@ -157,13 +157,13 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
call(Pid, Msg) ->
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 44e1e4b5..dcc6aff5 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -175,10 +175,10 @@ internal_update(State = #state { memory_limit = MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({vm_memory_high_watermark, []});
+ alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm(vm_memory_high_watermark);
+ alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
_ ->
ok
end,
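
With this change the memory alarm is keyed per node, matching the Makefile targets earlier in this commit, so alarms raised by different cluster nodes no longer collide in alarm_handler. A minimal sketch of raising and clearing the node-scoped alarm:

    %% sketch: the alarm id is now {vm_memory_high_watermark, node()}
    set_local_memory_alarm() ->
        alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}).

    clear_local_memory_alarm() ->
        alarm_handler:clear_alarm({vm_memory_high_watermark, node()}).
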