summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-11 18:48:23 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-11 18:48:23 +0100
commit0725324ad4c4bd5648feff9c9f704388b2c6d067 (patch)
treecfa98020569de6b5fd087e1ac1f14cb6ba4482c6
parent5757824244675fb7d0c71dcb464da8e8536c8013 (diff)
parentf5ce49652d4e5c143e356969e9b7bc9e665e617d (diff)
downloadrabbitmq-server-0725324ad4c4bd5648feff9c9f704388b2c6d067.tar.gz
merge from default
-rw-r--r--Makefile20
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/generic-unix/Makefile2
-rw-r--r--packaging/windows-exe/Makefile4
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in18
-rw-r--r--packaging/windows/Makefile4
-rwxr-xr-xquickcheck36
-rw-r--r--src/file_handle_cache.erl12
-rw-r--r--src/gen_server2.erl92
-rw-r--r--src/gm.erl23
-rw-r--r--src/priority_queue.erl36
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_amqqueue.erl25
-rw-r--r--src/rabbit_amqqueue_process.erl89
-rw-r--r--src/rabbit_backing_queue_qc.erl392
-rw-r--r--src/rabbit_channel.erl45
-rw-r--r--src/rabbit_control.erl23
-rw-r--r--src/rabbit_event.erl16
-rw-r--r--src/rabbit_limiter.erl27
-rw-r--r--src/rabbit_mirror_queue_slave.erl37
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl12
-rw-r--r--src/rabbit_misc.erl37
-rw-r--r--src/rabbit_mnesia.erl160
-rw-r--r--src/rabbit_msg_store.erl32
-rw-r--r--src/rabbit_node_monitor.erl7
-rw-r--r--src/rabbit_queue_index.erl16
-rw-r--r--src/rabbit_reader.erl18
-rw-r--r--src/rabbit_tests.erl83
-rw-r--r--src/rabbit_upgrade.erl18
-rw-r--r--src/supervisor2.erl29
32 files changed, 992 insertions, 346 deletions
diff --git a/Makefile b/Makefile
index d8ef058e..ee2700af 100644
--- a/Makefile
+++ b/Makefile
@@ -20,6 +20,8 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml
USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
+QC_MODULES := rabbit_backing_queue_qc
+QC_TRIALS ?= 100
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
PYTHON=python
@@ -45,8 +47,14 @@ ifndef USE_SPECS
USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8,4]), halt().')
endif
+ifndef USE_PROPER_QC
+# PropEr needs to be installed for property checking
+# http://proper.softlab.ntua.gr/
+USE_PROPER_QC:=$(shell erl -noshell -eval 'io:format({module, proper} =:= code:ensure_loaded(proper)), halt().')
+endif
+
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
-ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs)
+ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
VERSION=0.0.0
TARBALL_NAME=rabbitmq-server-$(VERSION)
@@ -69,6 +77,10 @@ define usage_dep
$(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl
endef
+define boolean_macro
+$(if $(filter true,$(1)),-D$(2))
+endef
+
ifneq "$(SBIN_DIR)" ""
ifneq "$(TARGET_DIR)" ""
SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
@@ -165,6 +177,9 @@ run-tests: all
OUT=$$(echo "rabbit_tests:all_tests()." | $(ERL_CALL)) ; \
echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
+run-qc: all
+ $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS))
+
start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
@@ -223,7 +238,7 @@ srcdist: distclean
chmod 0755 $(TARGET_SRC_DIR)/scripts/*
(cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME))
- (cd dist; zip -r $(TARBALL_NAME).zip $(TARBALL_NAME))
+ (cd dist; zip -q -r $(TARBALL_NAME).zip $(TARBALL_NAME))
rm -rf $(TARGET_SRC_DIR)
distclean: clean
@@ -314,3 +329,4 @@ ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
-include $(DEPS_FILE)
endif
+.PHONY: run-qc
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index ffc826eb..11f5f01c 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -120,6 +120,9 @@ done
rm -rf %{buildroot}
%changelog
+* Mon Jun 27 2011 simon@rabbitmq.com 2.5.1-1
+- New Upstream Release
+
* Thu Jun 9 2011 jerryk@vmware.com 2.5.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 31979a8e..38c81134 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -19,7 +19,7 @@ all:
package: clean
cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL)
- tar -zxvf $(DEBIAN_ORIG_TARBALL)
+ tar -zxf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
# Debian and descendants differ from most other distros in that
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 1cab4235..9063a6ed 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.5.1-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Mon, 27 Jun 2011 11:21:49 +0100
+
rabbitmq-server (2.5.0-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile
index c4e01f4a..b5c342aa 100644
--- a/packaging/generic-unix/Makefile
+++ b/packaging/generic-unix/Makefile
@@ -4,7 +4,7 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION)
dist:
- tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
+ tar -zxf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR) \
TARGET_DIR=`pwd`/$(TARGET_DIR) \
diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile
index 59803f9c..ab50e30b 100644
--- a/packaging/windows-exe/Makefile
+++ b/packaging/windows-exe/Makefile
@@ -2,7 +2,7 @@ VERSION=0.0.0
ZIP=../windows/rabbitmq-server-windows-$(VERSION)
dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION)
- makensis rabbitmq-$(VERSION).nsi
+ makensis -V2 rabbitmq-$(VERSION).nsi
rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in
sed \
@@ -10,7 +10,7 @@ rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in
$< > $@
rabbitmq_server-$(VERSION):
- unzip $(ZIP)
+ unzip -q $(ZIP)
clean:
rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 1ed4064e..27e4e1dc 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -113,17 +113,17 @@ Section "Start Menu" RabbitStartMenu
CreateDirectory "$APPDATA\RabbitMQ\db"
CreateDirectory "$SMPROGRAMS\RabbitMQ Server"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall RabbitMQ.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Plugins.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Logs.lnk" "$APPDATA\RabbitMQ\log"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Database Directory.lnk" "$APPDATA\RabbitMQ\db"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - (re)install.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - remove.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - start.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - stop.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico"
SetOutPath "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
- CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Command Prompt (sbin dir).lnk" "$WINDIR\system32\cmd.exe" "" "$WINDIR\system32\cmd.exe"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Command Prompt (sbin dir).lnk" "$WINDIR\system32\cmd.exe" "" "$WINDIR\system32\cmd.exe"
SetOutPath $INSTDIR
SectionEnd
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index dacfa620..a0be8d89 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -4,7 +4,7 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_ZIP=rabbitmq-server-windows-$(VERSION)
dist:
- tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
+ tar -zxf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR)
mkdir $(SOURCE_DIR)/sbin
@@ -24,7 +24,7 @@ dist:
elinks -dump -no-references -no-numbering rabbitmq-service.html \
> $(TARGET_DIR)/readme-service.txt
todos $(TARGET_DIR)/readme-service.txt
- zip -r $(TARGET_ZIP).zip $(TARGET_DIR)
+ zip -q -r $(TARGET_ZIP).zip $(TARGET_DIR)
rm -rf $(TARGET_DIR) rabbitmq-service.html
clean: clean_partial
diff --git a/quickcheck b/quickcheck
new file mode 100755
index 00000000..a36cf3ed
--- /dev/null
+++ b/quickcheck
@@ -0,0 +1,36 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -sname quickcheck
+-mode(compile).
+
+%% A helper to test quickcheck properties on a running broker
+%% NodeStr is a local broker node name
+%% ModStr is the module containing quickcheck properties
+%% The number of trials is optional
+main([NodeStr, ModStr | TrialsStr]) ->
+ {ok, Hostname} = inet:gethostname(),
+ Node = list_to_atom(NodeStr ++ "@" ++ Hostname),
+ Mod = list_to_atom(ModStr),
+ Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr),
+ case rpc:call(Node, code, ensure_loaded, [proper]) of
+ {module, proper} ->
+ case rpc:call(Node, proper, module, [Mod] ++ Trials) of
+ [] -> ok;
+ _ -> quit(1)
+ end;
+ {badrpc, Reason} ->
+ io:format("Could not contact node ~p: ~p.~n", [Node, Reason]),
+ quit(2);
+ {error,nofile} ->
+ io:format("Module PropEr was not found on node ~p~n", [Node]),
+ quit(2)
+ end;
+main([]) ->
+ io:format("This script requires a node name and a module.~n").
+
+quit(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status)
+ end.
+
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 61b08d49..235e14c0 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -925,10 +925,10 @@ handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
update_counts(obtain, ToPid, +1,
- update_counts(obtain, FromPid, -1, State)))};
+ update_counts(obtain, FromPid, -1, State)))}.
-handle_cast(check_counts, State) ->
- {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}.
+handle_info(check_counts, State) ->
+ {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #fhc_state { elders = Elders,
@@ -1133,9 +1133,9 @@ reduce(State = #fhc_state { open_pending = OpenPending,
end
end,
case TRef of
- undefined -> {ok, TRef1} = timer:apply_after(
- ?FILE_HANDLES_CHECK_INTERVAL,
- gen_server, cast, [?SERVER, check_counts]),
+ undefined -> TRef1 = erlang:send_after(
+ ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER,
+ check_counts),
State #fhc_state { timer_ref = TRef1 };
_ -> State
end.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 43e0a8f5..35258139 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -67,6 +67,11 @@
%% module. Note there is no form also encompassing a reply, thus if
%% you wish to reply in handle_call/3 and change the callback module,
%% you need to use gen_server2:reply/2 to issue the reply manually.
+%%
+%% 8) The callback module can optionally implement
+%% format_message_queue/2 which is the equivalent of format_status/2
+%% but where the second argument is specifically the priority_queue
+%% which contains the prioritised message_queue.
%% All modifications are (C) 2009-2011 VMware, Inc.
@@ -593,41 +598,35 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
CurrentTO1 = Base + Extra,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
-in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC,
- queue = Queue }) ->
- GS2State #gs2_state { queue = priority_queue:in(
- {'$gen_cast', Msg},
- PC(Msg, GS2State), Queue) };
-in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC,
- queue = Queue }) ->
- GS2State #gs2_state { queue = priority_queue:in(
- {'$gen_call', From, Msg},
- PC(Msg, From, GS2State), Queue) };
-in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) ->
- GS2State #gs2_state { queue = priority_queue:in(
- Input, PI(Input, GS2State), Queue) }.
-
-process_msg(Msg,
- GS2State = #gs2_state { parent = Parent,
- name = Name,
- debug = Debug }) ->
- case Msg of
- {system, From, Req} ->
- sys:handle_system_msg(
- Req, From, Parent, ?MODULE, Debug,
- GS2State);
- %% gen_server puts Hib on the end as the 7th arg, but that
- %% version of the function seems not to be documented so
- %% leaving out for now.
- {'EXIT', Parent, Reason} ->
- terminate(Reason, Msg, GS2State);
- _Msg when Debug =:= [] ->
- handle_msg(Msg, GS2State);
- _Msg ->
- Debug1 = sys:handle_debug(Debug, fun print_event/3,
- Name, {in, Msg}),
- handle_msg(Msg, GS2State #gs2_state { debug = Debug1 })
- end.
+in({'$gen_cast', Msg} = Input,
+ GS2State = #gs2_state { prioritise_cast = PC }) ->
+ in(Input, PC(Msg, GS2State), GS2State);
+in({'$gen_call', From, Msg} = Input,
+ GS2State = #gs2_state { prioritise_call = PC }) ->
+ in(Input, PC(Msg, From, GS2State), GS2State);
+in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
+ in(Input, infinity, GS2State);
+in({system, _From, _Req} = Input, GS2State) ->
+ in(Input, infinity, GS2State);
+in(Input, GS2State = #gs2_state { prioritise_info = PI }) ->
+ in(Input, PI(Input, GS2State), GS2State).
+
+in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
+ GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
+
+process_msg({system, From, Req},
+ GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
+process_msg({'EXIT', Parent, Reason} = Msg,
+ GS2State = #gs2_state { parent = Parent }) ->
+ %% gen_server puts Hib on the end as the 7th arg, but that version
+ %% of the fun seems not to be documented so leaving out for now.
+ terminate(Reason, Msg, GS2State);
+process_msg(Msg, GS2State = #gs2_state { debug = [] }) ->
+ handle_msg(Msg, GS2State);
+process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) ->
+ Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}),
+ handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }).
%%% ---------------------------------------------------
%%% Send/recive functions
@@ -1161,17 +1160,22 @@ format_status(Opt, StatusData) ->
end,
Header = lists:concat(["Status for generic server ", NameTag]),
Log = sys:get_debug(log, Debug, []),
- Specfic =
- case erlang:function_exported(Mod, format_status, 2) of
- true -> case catch Mod:format_status(Opt, [PDict, State]) of
- {'EXIT', _} -> [{data, [{"State", State}]}];
- Else -> Else
- end;
- _ -> [{data, [{"State", State}]}]
- end,
+ Specfic = callback(Mod, format_status, [Opt, [PDict, State]],
+ fun () -> [{data, [{"State", State}]}] end),
+ Messages = callback(Mod, format_message_queue, [Opt, Queue],
+ fun () -> priority_queue:to_list(Queue) end),
[{header, Header},
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
- {"Queued messages", priority_queue:to_list(Queue)}]} |
+ {"Queued messages", Messages}]} |
Specfic].
+
+callback(Mod, FunName, Args, DefaultThunk) ->
+ case erlang:function_exported(Mod, FunName, length(Args)) of
+ true -> case catch apply(Mod, FunName, Args) of
+ {'EXIT', _} -> DefaultThunk();
+ Success -> Success
+ end;
+ false -> DefaultThunk()
+ end.
diff --git a/src/gm.erl b/src/gm.erl
index 8b7dc70c..8b4d2776 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,11 +376,11 @@
confirmed_broadcast/2, group_members/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_cast/2, prioritise_info/2]).
+ code_change/3, prioritise_info/2]).
-export([behaviour_info/1]).
--export([table_definitions/0, flush/1]).
+-export([table_definitions/0]).
-define(GROUP_TABLE, gm_group).
-define(HIBERNATE_AFTER_MIN, 1000).
@@ -511,9 +511,6 @@ confirmed_broadcast(Server, Msg) ->
group_members(Server) ->
gen_server2:call(Server, group_members, infinity).
-flush(Server) ->
- gen_server2:cast(Server, flush).
-
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
@@ -629,12 +626,12 @@ handle_cast(join, State = #state { self = Self,
{Module:joined(Args, all_known_members(View)), State1});
handle_cast(leave, State) ->
- {stop, normal, State};
+ {stop, normal, State}.
-handle_cast(flush, State) ->
- noreply(
- flush_broadcast_buffer(State #state { broadcast_timer = undefined })).
+handle_info(flush, State) ->
+ noreply(
+ flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
handle_info({'DOWN', MRef, process, _Pid, _Reason},
State = #state { self = Self,
@@ -684,9 +681,7 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_cast(flush, _State) -> 1;
-prioritise_cast(_ , _State) -> 0.
-
+prioritise_info(flush, _State) -> 1;
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
prioritise_info(_ , _State) -> 0.
@@ -808,10 +803,10 @@ ensure_broadcast_timer(State = #state { broadcast_buffer = [],
State;
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = TRef }) ->
- timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State #state { broadcast_timer = undefined };
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
- {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]),
+ TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->
State.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 4a94b24b..4fc8b469 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -47,7 +47,10 @@
-ifdef(use_specs).
--type(priority() :: integer()).
+-export_type([q/0]).
+
+-type(q() :: pqueue()).
+-type(priority() :: integer() | 'infinity').
-type(squeue() :: {queue, [any()], [any()]}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
@@ -71,8 +74,9 @@ new() ->
is_queue({queue, R, F}) when is_list(R), is_list(F) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
- lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
- Queues);
+ lists:all(fun ({infinity, Q}) -> is_queue(Q);
+ ({P, Q}) -> is_integer(P) andalso is_queue(Q)
+ end, Queues);
is_queue(_) ->
false.
@@ -89,7 +93,8 @@ len({pqueue, Queues}) ->
to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
- [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].
+ [{maybe_negate_priority(P), V} || {P, Q} <- Queues,
+ {0, V} <- to_list(Q)].
in(Item, Q) ->
in(Item, 0, Q).
@@ -103,12 +108,20 @@ in(X, Priority, _Q = {queue, [], []}) ->
in(X, Priority, Q = {queue, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
in(X, Priority, {pqueue, Queues}) ->
- P = -Priority,
+ P = maybe_negate_priority(Priority),
{pqueue, case lists:keysearch(P, 1, Queues) of
{value, {_, Q}} ->
lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
+ false when P == infinity ->
+ [{P, {queue, [X], []}} | Queues];
false ->
- lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ case Queues of
+ [{infinity, InfQueue} | Queues1] ->
+ [{infinity, InfQueue} |
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues1])];
+ _ ->
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ end
end}.
out({queue, [], []} = Q) ->
@@ -141,7 +154,8 @@ join({queue, [], []}, B) ->
join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
{queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
join(A = {queue, _, _}, {pqueue, BPQ}) ->
- {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ {Pre, Post} =
+ lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
Post1 = case Post of
[] -> [ {0, A} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
@@ -149,7 +163,8 @@ join(A = {queue, _, _}, {pqueue, BPQ}) ->
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, B = {queue, _, _}) ->
- {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ {Pre, Post} =
+ lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
Post1 = case Post of
[] -> [ {0, B} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
@@ -165,7 +180,7 @@ merge(APQ, [], Acc) ->
lists:reverse(Acc, APQ);
merge([{P, A}|As], [{P, B}|Bs], Acc) ->
merge(As, Bs, [ {P, join(A, B)} | Acc ]);
-merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As, Bs, [ {PA, A} | Acc ]);
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
@@ -174,3 +189,6 @@ r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
+
+maybe_negate_priority(infinity) -> infinity;
+maybe_negate_priority(P) -> -P.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 6ef816c0..46f7d9d1 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -134,6 +134,18 @@
{requires, empty_db_check},
{enables, routing_ready}]}).
+-rabbit_boot_step({mirror_queue_slave_sup,
+ [{description, "mirror queue slave sup"},
+ {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {requires, recovery},
+ {enables, routing_ready}]}).
+
+-rabbit_boot_step({mirrored_queues,
+ [{description, "adding mirrors to queues"},
+ {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
+ {requires, mirror_queue_slave_sup},
+ {enables, routing_ready}]}).
+
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
{requires, core_initialized}]}).
@@ -175,7 +187,7 @@
-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(stop_and_halt/0 :: () -> 'ok').
+-spec(stop_and_halt/0 :: () -> no_return()).
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
-spec(status/0 ::
() -> [{pid, integer()} |
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2d5b696a..84d8cdcb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,9 +32,7 @@
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
- sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
- emit_stats/1]).
+ set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -100,7 +98,6 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
@@ -143,11 +140,8 @@
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(sync_timeout/1 :: (pid()) -> 'ok').
--spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(maybe_expire/1 :: (pid()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
@@ -231,7 +225,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
end).
store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(rabbit_durable_queue, Q, write),
+ ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write),
ok = mnesia:write(rabbit_queue, Q, write),
ok;
store_queue(Q = #amqqueue{durable = false}) ->
@@ -406,9 +400,6 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-emit_stats(#amqqueue{pid = QPid}) ->
- delegate_cast(QPid, emit_stats).
-
delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
@@ -487,24 +478,12 @@ internal_delete(QueueName) ->
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
-sync_timeout(QPid) ->
- gen_server2:cast(QPid, sync_timeout).
-
-update_ram_duration(QPid) ->
- gen_server2:cast(QPid, update_ram_duration).
-
set_ram_duration_target(QPid, Duration) ->
gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-maybe_expire(QPid) ->
- gen_server2:cast(QPid, maybe_expire).
-
-drop_expired(QPid) ->
- gen_server2:cast(QPid, drop_expired).
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ae677ab4..c9dc6c8f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -31,7 +31,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([init_with_backing_queue_state/7]).
@@ -249,8 +249,7 @@ backing_queue_module(#amqqueue{arguments = Args}) ->
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -258,14 +257,12 @@ ensure_sync_timer(State) ->
stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
State;
stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State#q{sync_timer_ref = undefined}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
+ TRef = erlang:send_after(
+ ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
@@ -277,13 +274,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State#q{expiry_timer_ref = undefined}.
%% We wish to expire only when there are no consumers *and* the expiry
@@ -295,18 +292,16 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
true ->
NewState = stop_expiry_timer(State),
- {ok, TRef} = timer:apply_after(
- Expires, rabbit_amqqueue, maybe_expire, [self()]),
+ TRef = erlang:send_after(Expires, self(), maybe_expire),
NewState#q{expiry_timer_ref = TRef};
false ->
State
end.
ensure_stats_timer(State = #q{stats_timer = StatsTimer,
- q = Q}) ->
+ q = #amqqueue{pid = QPid}}) ->
State#q{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
+ StatsTimer, QPid, emit_stats)}.
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -700,8 +695,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
when TTL =/= undefined ->
case BQ:is_empty(BQS) of
true -> State;
- false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired,
- [self()]),
+ false -> TRef = erlang:send_after(TTL, self(), drop_expired),
State#q{ttl_timer_ref = TRef}
end;
ensure_ttl_timer(State) ->
@@ -792,25 +786,27 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
_ -> 0
end.
-prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
- #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
-prioritise_info(_Msg, _State) -> 0.
+prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+ case Msg of
+ {'DOWN', _, process, DownPid, _} -> 8;
+ update_ram_duration -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ sync_timeout -> 6;
+ _ -> 0
+ end.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -1016,9 +1012,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast(sync_timeout, State) ->
- noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
-
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
noreply(deliver_or_enqueue(Delivery, State));
@@ -1089,15 +1082,6 @@ handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
-handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
-
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
@@ -1105,24 +1089,24 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
+ noreply(State).
-handle_cast(maybe_expire, State) ->
+handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
-handle_cast(drop_expired, State) ->
+handle_info(drop_expired, State) ->
noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
-handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
assert_invariant(State1),
- {noreply, State1, hibernate}.
+ {noreply, State1, hibernate};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -1139,6 +1123,18 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
+handle_info(update_ram_duration, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State#q{rate_timer_ref = just_measured,
+ backing_queue_state = BQS2});
+
+handle_info(sync_timeout, State) ->
+ noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
+
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
@@ -1159,10 +1155,11 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
BQS3 = BQ:handle_pre_hibernate(BQS2),
- rabbit_event:if_enabled(StatsTimer,
- fun () ->
- emit_stats(State, [{idle_since, now()}])
- end),
+ rabbit_event:if_enabled(
+ StatsTimer,
+ fun () -> emit_stats(State, [{idle_since, now()}]) end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
backing_queue_state = BQS3},
{hibernate, stop_rate_timer(State1)}.
+
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
new file mode 100644
index 00000000..22691ef9
--- /dev/null
+++ b/src/rabbit_backing_queue_qc.erl
@@ -0,0 +1,392 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_backing_queue_qc).
+-ifdef(use_proper_qc).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+-include_lib("proper/include/proper.hrl").
+
+-behaviour(proper_statem).
+
+-define(BQMOD, rabbit_variable_queue).
+-define(QUEUE_MAXLEN, 10000).
+-define(TIMEOUT_LIMIT, 100).
+
+-define(RECORD_INDEX(Key, Record),
+ proplists:get_value(Key, lists:zip(
+ record_info(fields, Record), lists:seq(2, record_info(size, Record))))).
+
+-export([initial_state/0, command/1, precondition/2, postcondition/3,
+ next_state/3]).
+
+-export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
+
+-record(state, {bqstate,
+ len, %% int
+ messages, %% queue of {msg_props, basic_msg}
+ acks, %% dict of acktag => {msg_props, basic_msg}
+ confirms}). %% set of msgid
+
+%% Initialise model
+
+initial_state() ->
+ #state{bqstate = qc_variable_queue_init(qc_test_queue()),
+ len = 0,
+ messages = queue:new(),
+ acks = orddict:new(),
+ confirms = gb_sets:new()}.
+
+%% Property
+
+prop_backing_queue_test() ->
+ ?FORALL(Cmds, commands(?MODULE, initial_state()),
+ backing_queue_test(Cmds)).
+
+backing_queue_test(Cmds) ->
+ {ok, FileSizeLimit} =
+ application:get_env(rabbit, msg_store_file_size_limit),
+ application:set_env(rabbit, msg_store_file_size_limit, 512,
+ infinity),
+ {ok, MaxJournal} =
+ application:get_env(rabbit, queue_index_max_journal_entries),
+ application:set_env(rabbit, queue_index_max_journal_entries, 128,
+ infinity),
+
+ {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds),
+
+ application:set_env(rabbit, msg_store_file_size_limit,
+ FileSizeLimit, infinity),
+ application:set_env(rabbit, queue_index_max_journal_entries,
+ MaxJournal, infinity),
+
+ ?BQMOD:delete_and_terminate(shutdown, BQ),
+ ?WHENFAIL(
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
+
+%% Commands
+
+%% Command frequencies are tuned so that queues are normally reasonably
+%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
+%% and purging cause extreme queue lengths, so these have lower probabilities.
+%% Fetches are sufficiently frequent so that commands that need acktags
+%% get decent coverage.
+
+command(S) ->
+ frequency([{10, qc_publish(S)},
+ {1, qc_publish_delivered(S)},
+ {1, qc_publish_multiple(S)}, %% very slow
+ {15, qc_fetch(S)}, %% needed for ack and requeue
+ {15, qc_ack(S)},
+ {15, qc_requeue(S)},
+ {3, qc_set_ram_duration_target(S)},
+ {1, qc_ram_duration(S)},
+ {1, qc_drain_confirmed(S)},
+ {1, qc_dropwhile(S)},
+ {1, qc_is_empty(S)},
+ {1, qc_timeout(S)},
+ {1, qc_purge(S)}]).
+
+qc_publish(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, publish,
+ [qc_message(),
+ #message_properties{needs_confirming = frequency([{1, true},
+ {20, false}]),
+ expiry = oneof([undefined | lists:seq(1, 10)])},
+ self(), BQ]}.
+
+qc_publish_multiple(#state{bqstate = BQ}) ->
+ {call, ?MODULE, publish_multiple,
+ [qc_message(), #message_properties{}, BQ,
+ resize(?QUEUE_MAXLEN, pos_integer())]}.
+
+qc_publish_delivered(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, publish_delivered,
+ [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
+
+qc_fetch(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fetch, [boolean(), BQ]}.
+
+qc_ack(#state{bqstate = BQ, acks = Acks}) ->
+ {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}.
+
+qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
+ {call, ?BQMOD, requeue,
+ [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+
+qc_set_ram_duration_target(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, set_ram_duration_target,
+ [oneof([0, 1, 2, resize(1000, pos_integer()), infinity]), BQ]}.
+
+qc_ram_duration(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, ram_duration, [BQ]}.
+
+qc_drain_confirmed(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drain_confirmed, [BQ]}.
+
+qc_dropwhile(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
+
+qc_is_empty(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, is_empty, [BQ]}.
+
+qc_timeout(#state{bqstate = BQ}) ->
+ {call, ?MODULE, timeout, [BQ, ?TIMEOUT_LIMIT]}.
+
+qc_purge(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, purge, [BQ]}.
+
+%% Preconditions
+
+precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
+ when Fun =:= ack; Fun =:= requeue ->
+ orddict:size(Acks) > 0;
+precondition(#state{messages = Messages},
+ {call, ?BQMOD, publish_delivered, _Arg}) ->
+ queue:is_empty(Messages);
+precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
+ true;
+precondition(_S, {call, ?MODULE, timeout, _Arg}) ->
+ true;
+precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
+ Len < ?QUEUE_MAXLEN.
+
+%% Model updates
+
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+ #state{len = Len, messages = Messages, confirms = Confirms} = S,
+ MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
+ NeedsConfirm =
+ {call, erlang, element,
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ S#state{bqstate = BQ,
+ len = Len + 1,
+ messages = queue:in({MsgProps, Msg}, Messages),
+ confirms = case eval(NeedsConfirm) of
+ true -> gb_sets:add(MsgId, Confirms);
+ _ -> Confirms
+ end};
+
+next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
+ #state{len = Len, messages = Messages} = S,
+ Messages1 = repeat(Messages, fun(Msgs) ->
+ queue:in({MsgProps, Msg}, Msgs)
+ end, Count),
+ S#state{bqstate = BQ,
+ len = Len + Count,
+ messages = Messages1};
+
+next_state(S, Res,
+ {call, ?BQMOD, publish_delivered,
+ [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
+ #state{confirms = Confirms, acks = Acks} = S,
+ AckTag = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
+ NeedsConfirm =
+ {call, erlang, element,
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ S#state{bqstate = BQ1,
+ confirms = case eval(NeedsConfirm) of
+ true -> gb_sets:add(MsgId, Confirms);
+ _ -> Confirms
+ end,
+ acks = case AckReq of
+ true -> orddict:append(AckTag, {MsgProps, Msg}, Acks);
+ false -> Acks
+ end
+ };
+
+next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
+ #state{len = Len, messages = Messages, acks = Acks} = S,
+ ResultInfo = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ AckTag = {call, erlang, element, [3, ResultInfo]},
+ S1 = S#state{bqstate = BQ1},
+ case queue:out(Messages) of
+ {empty, _M2} ->
+ S1;
+ {{value, MsgProp_Msg}, M2} ->
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)};
+ false ->
+ S2
+ end
+ end;
+
+next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
+ #state{acks = AcksState} = S,
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1,
+ acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+
+next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
+ #state{len = Len, messages = Messages, acks = AcksState} = S,
+ BQ1 = {call, erlang, element, [2, Res]},
+ RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) ||
+ Key <- AcksArg]),
+ S#state{bqstate = BQ1,
+ len = Len + length(RequeueMsgs),
+ messages = queue:join(Messages, queue:from_list(RequeueMsgs)),
+ acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+
+next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) ->
+ S#state{bqstate = BQ};
+
+next_state(S, Res, {call, ?BQMOD, ram_duration, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1};
+
+next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1};
+
+next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
+ #state{messages = Messages} = S,
+ Messages1 = drop_messages(Messages),
+ S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1};
+
+next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
+ S;
+
+next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
+ S#state{bqstate = BQ};
+
+next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1, len = 0, messages = queue:new()}.
+
+%% Postconditions
+
+postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
+ #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
+ case Res of
+ {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {_MsgProps, Msg} = queue:head(Messages),
+ MsgFetched =:= Msg andalso
+ not orddict:is_key(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ RemainingLen =:= Len - 1;
+ {empty, _BQ} ->
+ Len =:= 0
+ end;
+
+postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) ->
+ #state{acks = Acks, confirms = Confrms} = S,
+ not orddict:is_key(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms);
+
+postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) ->
+ {PurgeCount, _BQ} = Res,
+ Len =:= PurgeCount;
+
+postcondition(#state{len = Len},
+ {call, ?BQMOD, is_empty, _Args}, Res) ->
+ (Len =:= 0) =:= Res;
+
+postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
+ #state{confirms = Confirms} = S,
+ {ReportedConfirmed, _BQ} = Res,
+ lists:all(fun (M) ->
+ gb_sets:is_element(M, Confirms)
+ end, ReportedConfirmed);
+
+postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
+ ?BQMOD:len(BQ) =:= Len.
+
+%% Helpers
+
+repeat(Result, _Fun, 0) ->
+ Result;
+repeat(Result, Fun, Times) ->
+ repeat(Fun(Result), Fun, Times - 1).
+
+publish_multiple(Msg, MsgProps, BQ, Count) ->
+ repeat(BQ, fun(BQ1) ->
+ ?BQMOD:publish(Msg, MsgProps, self(), BQ1)
+ end, Count).
+
+timeout(BQ, 0) ->
+ BQ;
+timeout(BQ, AtMost) ->
+ case ?BQMOD:needs_timeout(BQ) of
+ false -> BQ;
+ _ -> timeout(?BQMOD:timeout(BQ), AtMost - 1)
+ end.
+
+qc_message_payload() ->
+ ?SIZED(Size, resize(Size * Size, binary())).
+
+qc_routing_key() ->
+ noshrink(binary(10)).
+
+qc_delivery_mode() ->
+ oneof([1, 2]).
+
+qc_message() ->
+ qc_message(qc_delivery_mode()).
+
+qc_message(DeliveryMode) ->
+ {call, rabbit_basic, message, [
+ qc_default_exchange(),
+ qc_routing_key(),
+ #'P_basic'{delivery_mode = DeliveryMode},
+ qc_message_payload()]}.
+
+qc_default_exchange() ->
+ {call, rabbit_misc, r, [<<>>, exchange, <<>>]}.
+
+qc_variable_queue_init(Q) ->
+ {call, ?BQMOD, init,
+ [Q, false, function(2, ok)]}.
+
+qc_test_q() ->
+ {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
+
+qc_test_queue() ->
+ qc_test_queue(boolean()).
+
+qc_test_queue(Durable) ->
+ #amqqueue{name = qc_test_q(),
+ durable = Durable,
+ auto_delete = false,
+ arguments = [],
+ pid = self()}.
+
+rand_choice([]) -> [];
+rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)].
+
+dropfun(Props) ->
+ Expiry = eval({call, erlang, element,
+ [?RECORD_INDEX(expiry, message_properties), Props]}),
+ Expiry =/= 1.
+
+drop_messages(Messages) ->
+ case queue:out(Messages) of
+ {empty, _} ->
+ Messages;
+ {{value, MsgProps_Msg}, M2} ->
+ MsgProps = {call, erlang, element, [1, MsgProps_Msg]},
+ case dropfun(MsgProps) of
+ true -> drop_messages(M2);
+ false -> Messages
+ end
+ end.
+
+-endif.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2829d397..35d77c45 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -23,11 +23,11 @@
-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([refresh_config_all/0, emit_stats/1, ready_for_close/1]).
+-export([refresh_config_all/0, ready_for_close/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
@@ -90,7 +90,6 @@
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(refresh_config_all/0 :: () -> 'ok').
--spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
-endif.
@@ -152,9 +151,6 @@ refresh_config_all() ->
fun (C) -> gen_server2:call(C, refresh_config) end, list()),
ok.
-emit_stats(Pid) ->
- gen_server2:cast(Pid, emit_stats).
-
ready_for_close(Pid) ->
gen_server2:cast(Pid, ready_for_close).
@@ -194,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
trace_state = rabbit_trace:init(VHost)},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
- fun() -> internal_emit_stats(State) end),
+ fun() -> emit_stats(State) end),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -207,11 +203,16 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- emit_stats -> 7;
{confirm, _MsgSeqNos, _QPid} -> 5;
_ -> 0
end.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ emit_stats -> 7;
+ _ -> 0
+ end.
+
handle_call(flush, _From, State) ->
reply(ok, State);
@@ -293,11 +294,6 @@ handle_cast({deliver, ConsumerTag, AckRequired,
rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
- internal_emit_stats(State),
- noreply([ensure_stats_timer],
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
@@ -305,6 +301,11 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
+handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
+ emit_stats(State),
+ noreply([ensure_stats_timer],
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
+
handle_info({'DOWN', MRef, process, QPid, Reason},
State = #ch{consumer_monitors = ConsumerMonitors}) ->
noreply(
@@ -320,11 +321,8 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- rabbit_event:if_enabled(StatsTimer,
- fun () ->
- internal_emit_stats(
- State, [{idle_since, now()}])
- end),
+ rabbit_event:if_enabled(
+ StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
{hibernate, State#ch{stats_timer = StatsTimer1}}.
@@ -342,6 +340,8 @@ terminate(Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> reply(Reply, [], NewState).
@@ -366,8 +366,7 @@ next_state(Mask, State) ->
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> emit_stats(ChPid) end)}.
+ StatsTimer, ChPid, emit_stats)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -1497,10 +1496,10 @@ update_measures(Type, QX, Inc, Measure) ->
put({Type, QX},
orddict:store(Measure, Cur + Inc, Measures)).
-internal_emit_stats(State) ->
- internal_emit_stats(State, []).
+emit_stats(State) ->
+ emit_stats(State, []).
-internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
+emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6eb1aaba..e8afed0c 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -17,7 +17,7 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, diagnostics/1]).
+-export([start/0, stop/0, action/5, diagnostics/1, log_action/3]).
-define(RPC_TIMEOUT, infinity).
-define(WAIT_FOR_VM_ATTEMPTS, 5).
@@ -51,6 +51,7 @@
-> 'ok').
-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
+-spec(log_action/3 :: (node(), string(), [term()]) -> ok).
-endif.
@@ -73,6 +74,7 @@ start() ->
Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
+ rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
@@ -474,3 +476,22 @@ quit(Status) ->
{unix, _} -> halt(Status);
{win32, _} -> init:stop(Status)
end.
+
+log_action(Node, Command, Args) ->
+ rabbit_misc:with_local_io(
+ fun () ->
+ error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n",
+ [Node, Command,
+ format_args(mask_args(Command, Args))])
+ end).
+
+%% Mask passwords and other sensitive info before logging.
+mask_args("add_user", [Name, _Password | Args]) ->
+ [Name, "****" | Args];
+mask_args("change_password", [Name, _Password | Args]) ->
+ [Name, "****" | Args];
+mask_args(_, Args) ->
+ Args.
+
+format_args(Args) ->
+ string:join([io_lib:format("~p", [A]) || A <- Args], " ").
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 468f9293..bb765566 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
+-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]).
-export([reset_stats_timer/1]).
-export([stats_level/1, if_enabled/2]).
-export([notify/2, notify_if/3]).
@@ -57,7 +57,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()).
-spec(stop_stats_timer/1 :: (state()) -> state()).
-spec(reset_stats_timer/1 :: (state()) -> state()).
-spec(stats_level/1 :: (state()) -> level()).
@@ -80,7 +80,7 @@ start_link() ->
%% if_enabled(internal_emit_stats) - so we immediately send something
%%
%% On wakeup:
-%% ensure_stats_timer(Timer, emit_stats)
+%% ensure_stats_timer(Timer, Pid, emit_stats)
%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
%%
%% emit_stats:
@@ -99,13 +99,13 @@ init_stats_timer() ->
{ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
#state{level = StatsLevel, interval = Interval, timer = undefined}.
-ensure_stats_timer(State = #state{level = none}, _Fun) ->
+ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) ->
State;
ensure_stats_timer(State = #state{interval = Interval,
- timer = undefined}, Fun) ->
- {ok, TRef} = timer:apply_after(Interval, erlang, apply, [Fun, []]),
+ timer = undefined}, Pid, Msg) ->
+ TRef = erlang:send_after(Interval, Pid, Msg),
State#state{timer = TRef};
-ensure_stats_timer(State, _Fun) ->
+ensure_stats_timer(State, _Pid, _Msg) ->
State.
stop_stats_timer(State = #state{level = none}) ->
@@ -113,7 +113,7 @@ stop_stats_timer(State = #state{level = none}) ->
stop_stats_timer(State = #state{timer = undefined}) ->
State;
stop_stats_timer(State = #state{timer = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State#state{timer = undefined}.
reset_stats_timer(State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 15193d2b..8ee90645 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -58,7 +58,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
blocked = false,
- queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
@@ -224,31 +224,30 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
blocked(#lim{blocked = Blocked}) -> Blocked.
remember_queue(QPid, State = #lim{queues = Queues}) ->
- case dict:is_key(QPid, Queues) of
+ case orddict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
- State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)};
true -> State
end.
forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
- case dict:find(QPid, Queues) of
- {ok, {MRef, _}} ->
- true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
- State#lim{queues = dict:erase(QPid, Queues)};
- error -> State
+ case orddict:find(QPid, Queues) of
+ {ok, {MRef, _}} -> true = erlang:demonitor(MRef),
+ ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ State#lim{queues = orddict:erase(QPid, Queues)};
+ error -> State
end.
limit_queue(QPid, State = #lim{queues = Queues}) ->
UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
- State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+ State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}.
notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
{QList, NewQueues} =
- dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
- (QPid, {MRef, true}, {L, D}) ->
- {[QPid | L], dict:store(QPid, {MRef, false}, D)}
- end, {[], Queues}, Queues),
+ orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ (QPid, {MRef, true}, {L, D}) ->
+ {[QPid | L], orddict:store(QPid, {MRef, false}, D)}
+ end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
L ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b38a8967..c918f388 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -37,7 +37,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -89,9 +89,8 @@ init([#amqqueue { name = QueueName } = Q]) ->
%% ASSERTION
[] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
MPids1 = MPids ++ [Self],
- mnesia:write(rabbit_queue,
- Q1 #amqqueue { slave_pids = MPids1 },
- write),
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
{ok, QPid}
end),
erlang:monitor(process, MPid),
@@ -187,9 +186,9 @@ handle_cast({set_ram_duration_target, Duration},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- noreply(State #state { backing_queue_state = BQS1 });
+ noreply(State #state { backing_queue_state = BQS1 }).
-handle_cast(update_ram_duration,
+handle_info(update_ram_duration,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
@@ -199,9 +198,9 @@ handle_cast(update_ram_duration,
noreply(State #state { rate_timer_ref = just_measured,
backing_queue_state = BQS2 });
-handle_cast(sync_timeout, State) ->
+handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(
- State #state { sync_timer_ref = undefined })).
+ State #state { sync_timer_ref = undefined }));
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
@@ -266,16 +265,21 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
{gm, _Msg} -> 5;
{post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ sync_timeout -> 6;
+ _ -> 0
+ end.
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -516,8 +520,7 @@ backing_queue_timeout(State = #state { backing_queue = BQ }) ->
run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
State #state { sync_timer_ref = TRef };
ensure_sync_timer(State) ->
State.
@@ -525,14 +528,12 @@ ensure_sync_timer(State) ->
stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
State;
stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State #state { sync_timer_ref = undefined }.
ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
+ TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
State #state { rate_timer_ref = TRef };
ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
State #state { rate_timer_ref = undefined };
@@ -544,7 +545,7 @@ stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
State #state { rate_timer_ref = undefined };
stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State #state { rate_timer_ref = undefined }.
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 879a6017..fc04ec79 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -16,18 +16,6 @@
-module(rabbit_mirror_queue_slave_sup).
--rabbit_boot_step({mirror_queue_slave_sup,
- [{description, "mirror queue slave sup"},
- {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
- {requires, recovery},
- {enables, routing_ready}]}).
-
--rabbit_boot_step({mirrored_queues,
- [{description, "adding mirrors to queues"},
- {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
- {requires, mirror_queue_slave_sup},
- {enables, routing_ready}]}).
-
-behaviour(supervisor2).
-export([start/0, start_link/0, start_child/2]).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b6b97f6d..b98dbd46 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -42,7 +42,7 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
--export([format_stderr/2]).
+-export([format_stderr/2, with_local_io/1]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
@@ -57,6 +57,7 @@
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
-export([pget/2, pget/3, pget_or_die/2]).
+-export([format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -164,6 +165,7 @@
-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
+-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
@@ -205,6 +207,7 @@
-spec(pget/2 :: (term(), [term()]) -> term()).
-spec(pget/3 :: (term(), [term()], term()) -> term()).
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
+-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-endif.
@@ -603,6 +606,17 @@ format_stderr(Fmt, Args) ->
end,
ok.
+%% Execute Fun using the IO system of the local node (i.e. the node on
+%% which the code is executing).
+with_local_io(Fun) ->
+ GL = group_leader(),
+ group_leader(whereis(user), self()),
+ try
+ Fun()
+ after
+ group_leader(GL, self())
+ end.
+
manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
Iterate(fun (App, Acc) ->
case Do(App) of
@@ -919,3 +933,24 @@ pget_or_die(K, P) ->
undefined -> exit({error, key_missing, K});
V -> V
end.
+
+format_message_queue(_Opt, MQ) ->
+ Len = priority_queue:len(MQ),
+ {Len,
+ case Len > 100 of
+ false -> priority_queue:to_list(MQ);
+ true -> {summary,
+ orddict:to_list(
+ lists:foldl(
+ fun ({P, V}, Counts) ->
+ orddict:update_counter(
+ {P, format_message_queue_entry(V)}, 1, Counts)
+ end, orddict:new(), priority_queue:to_list(MQ)))}
+ end}.
+
+format_message_queue_entry(V) when is_atom(V) ->
+ V;
+format_message_queue_entry(V) when is_tuple(V) ->
+ list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]);
+format_message_queue_entry(_V) ->
+ '_'.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8d5c8646..ab553a8b 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -23,7 +23,8 @@
empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
create_cluster_nodes_config/1, read_cluster_nodes_config/0,
record_running_nodes/0, read_previously_running_nodes/0,
- delete_previously_running_nodes/0, running_nodes_filename/0]).
+ delete_previously_running_nodes/0, running_nodes_filename/0,
+ is_disc_node/0]).
-export([table_names/0]).
@@ -65,6 +66,7 @@
-spec(read_previously_running_nodes/0 :: () -> [node()]).
-spec(delete_previously_running_nodes/0 :: () -> 'ok').
-spec(running_nodes_filename/0 :: () -> file:filename()).
+-spec(is_disc_node/0 :: () -> boolean()).
-endif.
@@ -115,13 +117,47 @@ force_cluster(ClusterNodes) ->
cluster(ClusterNodes, Force) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+
+ %% Wipe mnesia if we're changing type from disc to ram
+ case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
+ {true, false} -> error_logger:warning_msg(
+ "changing node type; wiping mnesia...~n~n"),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema);
+ _ -> ok
+ end,
+
+ %% Pre-emptively leave the cluster
+ %%
+ %% We're trying to handle the following two cases:
+ %% 1. We have a two-node cluster, where both nodes are disc nodes.
+ %% One node is re-clustered as a ram node. When it tries to
+ %% re-join the cluster, but before it has time to update its
+ %% tables definitions, the other node will order it to re-create
+ %% its disc tables. So, we need to leave the cluster before we
+ %% can join it again.
+ %% 2. We have a two-node cluster, where both nodes are disc nodes.
+ %% One node is forcefully reset (so, the other node thinks its
+ %% still a part of the cluster). The reset node is re-clustered
+ %% as a ram node. Same as above, we need to leave the cluster
+ %% before we can join it. But, since we don't know if we're in a
+ %% cluster or not, we just pre-emptively leave it before joining.
+ ProperClusterNodes = ClusterNodes -- [node()],
+ try
+ ok = leave_cluster(ProperClusterNodes, ProperClusterNodes)
+ catch
+ {error, {no_running_cluster_nodes, _, _}} when Force ->
+ ok
+ end,
+
+ %% Join the cluster
+ start_mnesia(),
try
ok = init_db(ClusterNodes, Force,
fun maybe_upgrade_local_or_record_desired/0),
ok = create_cluster_nodes_config(ClusterNodes)
after
- mnesia:stop()
+ stop_mnesia()
end,
ok.
@@ -158,10 +194,13 @@ nodes_of_type(Type) ->
%% This function should return the nodes of a certain type (ram,
%% disc or disc_only) in the current cluster. The type of nodes
%% is determined when the cluster is initially configured.
- %% Specifically, we check whether a certain table, which we know
- %% will be written to disk on a disc node, is stored on disk or in
- %% RAM.
- mnesia:table_info(rabbit_durable_exchange, Type).
+ mnesia:table_info(schema, Type).
+
+%% The tables aren't supposed to be on disk on a ram node
+table_definitions(disc) ->
+ table_definitions();
+table_definitions(ram) ->
+ [{Tab, copy_type_to_ram(TabDef)} || {Tab, TabDef} <- table_definitions()].
table_definitions() ->
[{rabbit_user,
@@ -218,8 +257,6 @@ table_definitions() ->
{type, ordered_set},
{match, #topic_trie_binding{trie_binding = trie_binding_match(),
_='_'}}]},
- %% Consider the implications to nodes_of_type/1 before altering
- %% the next entry.
{rabbit_durable_exchange,
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
@@ -341,7 +378,11 @@ check_table_content(Tab, TabDef) ->
end.
check_tables(Fun) ->
- case [Error || {Tab, TabDef} <- table_definitions(),
+ case [Error || {Tab, TabDef} <- table_definitions(
+ case is_disc_node() of
+ true -> disc;
+ false -> ram
+ end),
case Fun(Tab, TabDef) of
ok -> Error = none, false;
{error, Error} -> true
@@ -442,30 +483,47 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir)} of
- {[], false} ->
+ WantDiscNode = should_be_disc_node(ClusterNodes),
+ WasDiscNode = is_disc_node(),
+ %% We create a new db (on disk, or in ram) in the first
+ %% two cases and attempt to upgrade the in the other two
+ case {Nodes, WasDiscNode, WantDiscNode} of
+ {[], _, false} ->
+ %% New ram node; start from scratch
+ ok = create_schema(ram);
+ {[], false, true} ->
%% Nothing there at all, start from scratch
- ok = create_schema();
- {[], true} ->
+ ok = create_schema(disc);
+ {[], true, true} ->
%% We're the first node up
case rabbit_upgrade:maybe_upgrade_local() of
ok -> ensure_schema_integrity();
version_not_available -> ok = schema_ok_or_move()
- end,
- ok;
- {[AnotherNode|_], _} ->
+ end;
+ {[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
ensure_version_ok(
rpc:call(AnotherNode, rabbit_version, recorded, [])),
- IsDiskNode = ClusterNodes == [] orelse
- lists:member(node(), ClusterNodes),
+ {CopyType, CopyTypeAlt} =
+ case WantDiscNode of
+ true -> {disc, disc_copies};
+ false -> {ram, ram_copies}
+ end,
ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, disc_copies),
- ok = create_local_table_copies(case IsDiskNode of
- true -> disc;
- false -> ram
- end),
+ ok = create_local_table_copy(schema, CopyTypeAlt),
+ ok = create_local_table_copies(CopyType),
+
ok = SecondaryPostMnesiaFun(),
+ %% We've taken down mnesia, so ram nodes will need
+ %% to re-sync
+ case is_disc_node() of
+ false -> start_mnesia(),
+ mnesia:change_config(extra_db_nodes,
+ ProperClusterNodes),
+ wait_for_replicated_tables();
+ true -> ok
+ end,
+
ensure_schema_integrity(),
ok
end;
@@ -496,7 +554,7 @@ schema_ok_or_move() ->
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
- ok = create_schema()
+ ok = create_schema(disc)
end.
ensure_version_ok({ok, DiscVersion}) ->
@@ -508,18 +566,27 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_version:record_desired().
-create_schema() ->
- mnesia:stop(),
- rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
- cannot_create_schema),
- rabbit_misc:ensure_ok(mnesia:start(),
- cannot_start_mnesia),
- ok = create_tables(),
+create_schema(Type) ->
+ stop_mnesia(),
+ case Type of
+ disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
+ cannot_create_schema);
+ ram -> %% remove the disc schema since this is a ram node
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema)
+ end,
+ start_mnesia(),
+ ok = create_tables(Type),
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
+is_disc_node() -> mnesia:system_info(use_dir).
+
+should_be_disc_node(ClusterNodes) ->
+ ClusterNodes == [] orelse lists:member(node(), ClusterNodes).
+
move_db() ->
- mnesia:stop(),
+ stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
BackupDir = lists:flatten(
@@ -537,14 +604,16 @@ move_db() ->
MnesiaDir, BackupDir, Reason}})
end,
ensure_mnesia_dir(),
- rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ start_mnesia(),
ok.
copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
rabbit_misc:recursive_copy(dir(), Destination).
-create_tables() ->
+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
lists:foreach(fun ({Tab, TabDef}) ->
TabDef1 = proplists:delete(match, TabDef),
case mnesia:create_table(Tab, TabDef1) of
@@ -554,9 +623,13 @@ create_tables() ->
Tab, TabDef1, Reason}})
end
end,
- table_definitions()),
+ table_definitions(Type)),
ok.
+copy_type_to_ram(TabDef) ->
+ [{disc_copies, []}, {ram_copies, [node()]}
+ | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
+
table_has_copy_type(TabDef, DiscType) ->
lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
@@ -586,7 +659,7 @@ create_local_table_copies(Type) ->
end,
ok = create_local_table_copy(Tab, StorageType)
end,
- table_definitions()),
+ table_definitions(Type)),
ok.
create_local_table_copy(Tab, Type) ->
@@ -622,14 +695,14 @@ reset(Force) ->
true -> ok;
false ->
ensure_mnesia_dir(),
- rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ start_mnesia(),
{Nodes, RunningNodes} =
try
ok = init(),
{all_clustered_nodes() -- [Node],
running_clustered_nodes() -- [Node]}
after
- mnesia:stop()
+ stop_mnesia()
end,
leave_cluster(Nodes, RunningNodes),
rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
@@ -652,6 +725,7 @@ leave_cluster(Nodes, RunningNodes) ->
[schema, node()]) of
{atomic, ok} -> true;
{badrpc, nodedown} -> false;
+ {aborted, {node_not_running, _}} -> false;
{aborted, Reason} ->
throw({error, {failed_to_leave_cluster,
Nodes, RunningNodes, Reason}})
@@ -662,3 +736,11 @@ leave_cluster(Nodes, RunningNodes) ->
false -> throw({error, {no_running_cluster_nodes,
Nodes, RunningNodes}})
end.
+
+start_mnesia() ->
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ ensure_mnesia_running().
+
+stop_mnesia() ->
+ stopped = mnesia:stop(),
+ ensure_mnesia_not_running().
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 3f4162cd..e90e1281 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -23,13 +23,14 @@
client_ref/1, close_all_indicated/1,
write/3, read/2, contains/2, remove/2, sync/3]).
--export([sync/1, set_maximum_since_use/2,
- has_readers/2, combine_files/3, delete_file/2]). %% internal
+-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
+ delete_file/2]). %% internal
-export([transform_dir/3, force_recovery/2]). %% upgrade
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, prioritise_call/3, prioritise_cast/2,
+ prioritise_info/2, format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -153,7 +154,6 @@
-spec(sync/3 ::
([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
--spec(sync/1 :: (server()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
@@ -443,9 +443,6 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
-sync(Server) ->
- gen_server2:cast(Server, sync).
-
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -682,7 +679,6 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- sync -> 8;
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
{set_maximum_since_use, _Age} -> 8;
@@ -690,6 +686,12 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ sync -> 8;
+ _ -> 0
+ end.
+
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
@@ -773,9 +775,6 @@ handle_cast({sync, MsgIds, K},
true -> noreply(State #msstate { on_sync = [K | Syncs] })
end;
-handle_cast(sync, State) ->
- noreply(internal_sync(State));
-
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
@@ -799,6 +798,9 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
+handle_info(sync, State) ->
+ noreply(internal_sync(State));
+
handle_info(timeout, State) ->
noreply(internal_sync(State));
@@ -836,6 +838,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
@@ -863,13 +867,13 @@ next_state(State = #msstate { on_sync = Syncs,
end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]),
+ TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync),
State #msstate { sync_timer_ref = TRef }.
stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
State;
stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State #msstate { sync_timer_ref = undefined }.
internal_sync(State = #msstate { current_file_handle = CurHdl,
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 1f30a2fc..78aeb2ef 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -60,24 +60,19 @@ notify_cluster() ->
%%--------------------------------------------------------------------
init([]) ->
- ok = net_kernel:monitor_nodes(true),
{ok, no_state}.
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast({rabbit_running_on, Node}, State) ->
- rabbit_log:info("node ~p up~n", [Node]),
+ rabbit_log:info("rabbit on ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
ok = rabbit_alarm:on_node_up(Node),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({nodedown, Node}, State) ->
- rabbit_log:info("node ~p down~n", [Node]),
- ok = handle_dead_rabbit(Node),
- {noreply, State};
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
ok = handle_dead_rabbit(Node),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bf89cdb2..636913b5 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -569,13 +569,13 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
add_to_journal(RelSeq, Action,
Segment = #segment { journal_entries = JEntries,
unacked = UnackedCount }) ->
- Segment1 = Segment #segment {
- journal_entries = add_to_journal(RelSeq, Action, JEntries) },
- case Action of
- del -> Segment1;
- ack -> Segment1 #segment { unacked = UnackedCount - 1 };
- ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 }
- end;
+ Segment #segment {
+ journal_entries = add_to_journal(RelSeq, Action, JEntries),
+ unacked = UnackedCount + case Action of
+ ?PUB -> +1;
+ del -> 0;
+ ack -> -1
+ end};
add_to_journal(RelSeq, Action, JEntries) ->
Val = case array:get(RelSeq, JEntries) of
@@ -1013,7 +1013,7 @@ add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
{[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>,
MsgId, expiry_to_binary(undefined)], Rest};
add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS, Rest>>) ->
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
{<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
Rest};
add_queue_ttl_segment(_) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index dffabf85..2dccc748 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -28,8 +28,6 @@
-export([process_channel_frame/5]). %% used by erlang-client
--export([emit_stats/1]).
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
@@ -70,7 +68,6 @@
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
--spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
@@ -126,9 +123,6 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
-emit_stats(Pid) ->
- gen_server:cast(Pid, emit_stats).
-
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -323,8 +317,8 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
catch Error -> {error, Error}
end),
mainloop(Deb, State);
-handle_other({'$gen_cast', emit_stats}, Deb, State) ->
- mainloop(Deb, internal_emit_stats(State));
+handle_other(emit_stats, Deb, State) ->
+ mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other(Other, _Deb, _State) ->
@@ -591,10 +585,8 @@ refuse_connection(Sock, Exception) ->
ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
connection_state = running}) ->
- Self = self(),
State#v1{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> emit_stats(Self) end)};
+ StatsTimer, self(), emit_stats)};
ensure_stats_timer(State) ->
State.
@@ -694,7 +686,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
rabbit_event:if_enabled(StatsTimer,
- fun() -> internal_emit_stats(State1) end),
+ fun() -> emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
@@ -923,6 +915,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}},
State1#v1.sock, 0, CloseMethod, Protocol),
State1.
-internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+emit_stats(State = #v1{stats_timer = StatsTimer}) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3ba87b00..6266b5ef 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -203,6 +203,42 @@ test_priority_queue() ->
{true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
test_priority_queue(Q15),
+ %% 1-element infinity priority Q
+ Q16 = priority_queue:in(foo, infinity, Q),
+ {true, false, 1, [{infinity, foo}], [foo]} = test_priority_queue(Q16),
+
+ %% add infinity to 0-priority Q
+ Q17 = priority_queue:in(foo, infinity, priority_queue:in(bar, Q)),
+ {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q17),
+
+ %% and the other way around
+ Q18 = priority_queue:in(bar, priority_queue:in(foo, infinity, Q)),
+ {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q18),
+
+ %% add infinity to mixed-priority Q
+ Q19 = priority_queue:in(qux, infinity, Q3),
+ {true, false, 3, [{infinity, qux}, {2, bar}, {1, foo}], [qux, bar, foo]} =
+ test_priority_queue(Q19),
+
+ %% merge the above with a negative priority Q
+ Q20 = priority_queue:join(Q19, Q4),
+ {true, false, 4, [{infinity, qux}, {2, bar}, {1, foo}, {-1, foo}],
+ [qux, bar, foo, foo]} = test_priority_queue(Q20),
+
+ %% merge two infinity priority queues
+ Q21 = priority_queue:join(priority_queue:in(foo, infinity, Q),
+ priority_queue:in(bar, infinity, Q)),
+ {true, false, 2, [{infinity, foo}, {infinity, bar}], [foo, bar]} =
+ test_priority_queue(Q21),
+
+ %% merge two mixed priority with infinity queues
+ Q22 = priority_queue:join(Q18, Q20),
+ {true, false, 6, [{infinity, foo}, {infinity, qux}, {2, bar}, {1, foo},
+ {0, bar}, {-1, foo}], [foo, qux, bar, foo, bar, foo]} =
+ test_priority_queue(Q22),
+
passed.
priority_queue_in_all(Q, L) ->
@@ -904,7 +940,6 @@ test_option_parser() ->
passed.
test_cluster_management() ->
-
%% 'cluster' and 'reset' should only work if the app is stopped
{error, _} = control_action(cluster, []),
{error, _} = control_action(reset, []),
@@ -952,13 +987,16 @@ test_cluster_management() ->
ok = control_action(reset, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ ok = assert_disc_node(),
ok = control_action(force_cluster, ["invalid1@invalid",
"invalid2@invalid"]),
+ ok = assert_ram_node(),
%% join a non-existing cluster as a ram node
ok = control_action(reset, []),
ok = control_action(force_cluster, ["invalid1@invalid",
"invalid2@invalid"]),
+ ok = assert_ram_node(),
SecondaryNode = rabbit_misc:makenode("hare"),
case net_adm:ping(SecondaryNode) of
@@ -977,15 +1015,18 @@ test_cluster_management2(SecondaryNode) ->
%% make a disk node
ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
+ ok = assert_disc_node(),
%% make a ram node
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
+ ok = assert_ram_node(),
%% join cluster as a ram node
ok = control_action(reset, []),
ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ ok = assert_ram_node(),
%% change cluster config while remaining in same cluster
ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
@@ -997,27 +1038,45 @@ test_cluster_management2(SecondaryNode) ->
"invalid2@invalid"]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ ok = assert_ram_node(),
- %% join empty cluster as a ram node
+ %% join empty cluster as a ram node (converts to disc)
ok = control_action(cluster, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ ok = assert_disc_node(),
- %% turn ram node into disk node
+ %% make a new ram node
ok = control_action(reset, []),
+ ok = control_action(force_cluster, [SecondaryNodeS]),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+ ok = assert_ram_node(),
+
+ %% turn ram node into disk node
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ ok = assert_disc_node(),
%% convert a disk node into a ram node
+ ok = assert_disc_node(),
ok = control_action(force_cluster, ["invalid1@invalid",
"invalid2@invalid"]),
+ ok = assert_ram_node(),
+
+ %% make a new disk node
+ ok = control_action(force_reset, []),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+ ok = assert_disc_node(),
%% turn a disk node into a ram node
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ ok = assert_ram_node(),
%% NB: this will log an inconsistent_database error, which is harmless
%% Turning cover on / off is OK even if we're not in general using cover,
@@ -1043,6 +1102,10 @@ test_cluster_management2(SecondaryNode) ->
{error, {no_running_cluster_nodes, _, _}} =
control_action(reset, []),
+ %% attempt to change type when no other node is alive
+ {error, {no_running_cluster_nodes, _, _}} =
+ control_action(cluster, [SecondaryNodeS]),
+
%% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
ok = control_action(start_app, []),
@@ -1224,7 +1287,7 @@ test_statistics_event_receiver(Pid) ->
test_statistics_receive_event(Ch, Matcher) ->
rabbit_channel:flush(Ch),
- rabbit_channel:emit_stats(Ch),
+ Ch ! emit_stats,
test_statistics_receive_event1(Ch, Matcher).
test_statistics_receive_event1(Ch, Matcher) ->
@@ -1581,6 +1644,18 @@ clean_logs(Files, Suffix) ->
end || File <- Files],
ok.
+assert_ram_node() ->
+ case rabbit_mnesia:is_disc_node() of
+ true -> exit('not_ram_node');
+ false -> ok
+ end.
+
+assert_disc_node() ->
+ case rabbit_mnesia:is_disc_node() of
+ true -> ok;
+ false -> exit('not_disc_node')
+ end.
+
delete_file(File) ->
case file:delete(File) of
ok -> ok;
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index a2abb1e5..9739f6b7 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -144,7 +144,7 @@ upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
AfterUs = rabbit_mnesia:read_previously_running_nodes(),
- case {is_disc_node(), AfterUs} of
+ case {is_disc_node_legacy(), AfterUs} of
{true, []} ->
primary;
{true, _} ->
@@ -182,12 +182,6 @@ upgrade_mode(AllNodes) ->
end
end.
-is_disc_node() ->
- %% This is pretty ugly but we can't start Mnesia and ask it (will hang),
- %% we can't look at the config file (may not include us even if we're a
- %% disc node).
- filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")).
-
die(Msg, Args) ->
%% We don't throw or exit here since that gets thrown
%% straight out into do_boot, generating an erl_crash.dump
@@ -218,7 +212,7 @@ force_tables() ->
secondary_upgrade(AllNodes) ->
%% must do this before we wipe out schema
- IsDiscNode = is_disc_node(),
+ IsDiscNode = is_disc_node_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
%% Note that we cluster with all nodes, rather than all disc nodes
@@ -282,6 +276,14 @@ lock_filename() -> lock_filename(dir()).
lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
backup_dir() -> dir() ++ "-upgrade-backup".
+is_disc_node_legacy() ->
+ %% This is pretty ugly but we can't start Mnesia and ask it (will
+ %% hang), we can't look at the config file (may not include us
+ %% even if we're a disc node). We also can't use
+ %% rabbit_mnesia:is_disc_node/0 because that will give false
+ %% postivies on Rabbit up to 2.5.1.
+ filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")).
+
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
info(Msg, Args) -> error_logger:info_msg(Msg, Args).
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index ec1ee9cd..405949ef 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -76,7 +76,6 @@
%% Internal exports
-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
-export([handle_cast/2]).
--export([delayed_restart/2]).
-define(DICT, dict).
@@ -157,9 +156,6 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
end;
check_childspecs(X) -> {error, {badarg, X}}.
-delayed_restart(Supervisor, RestartDetails) ->
- gen_server:cast(Supervisor, {delayed_restart, RestartDetails}).
-
%%% ---------------------------------------------------
%%%
%%% Initialize the supervisor.
@@ -355,12 +351,19 @@ handle_call(which_children, _From, State) ->
State#state.children),
{reply, Resp, State}.
+%%% Hopefully cause a function-clause as there is no API function
+%%% that utilizes cast.
+handle_cast(null, State) ->
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ []),
+
+ {noreply, State}.
-handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State)
when ?is_simple(State) ->
{ok, NState} = do_restart(RestartType, Reason, Child, State),
{noreply, NState};
-handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State) ->
case get_child(Child#child.name, State) of
{value, Child1} ->
{ok, NState} = do_restart(RestartType, Reason, Child1, State),
@@ -369,14 +372,6 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
{noreply, State}
end;
-%%% Hopefully cause a function-clause as there is no API function
-%%% that utilizes cast.
-handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
- []),
-
- {noreply, State}.
-
%%
%% Take care of terminated children.
%%
@@ -539,9 +534,9 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
{ok, NState} ->
{ok, NState};
{terminate, NState} ->
- {ok, _TRef} = timer:apply_after(
- trunc(Delay*1000), ?MODULE, delayed_restart,
- [self(), {{RestartType, Delay}, Reason, Child}]),
+ _TRef = erlang:send_after(trunc(Delay*1000), self(),
+ {delayed_restart,
+ {{RestartType, Delay}, Reason, Child}}),
{ok, state_del_child(Child, NState)}
end;
do_restart(permanent, Reason, Child, State) ->