authorMatthias Radestock <>2011-07-26 16:53:25 +0100
committerMatthias Radestock <>2011-07-26 16:53:25 +0100
commit076f9c4723b987e89520754aa5bf7859a3192b04 (patch)
parent4f0f940bbd406e36b9e6c1663cdfcda6704926db (diff)
parent5c8f4940bd021364f823d20c9a6fad843a3f5b3b (diff)
merge bug24296 into default
3 files changed, 445 insertions, 1 deletions
diff --git a/Makefile b/Makefile
index d8ef058e..495689fb 100644
--- a/Makefile
+++ b/Makefile
@@ -20,6 +20,8 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml,, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
+QC_MODULES := rabbit_backing_queue_qc
+QC_TRIALS ?= 100
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
@@ -45,8 +47,14 @@ ifndef USE_SPECS
USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8,4]), halt().')
+# PropEr needs to be installed for property checking
+USE_PROPER_QC:=$(shell erl -noshell -eval 'io:format({module, proper} =:= code:ensure_loaded(proper)), halt().')
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
-ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs)
+ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
@@ -69,6 +77,10 @@ define usage_dep
$(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl
+define boolean_macro
+$(if $(filter true,$(1)),-D$(2))
ifneq "$(SBIN_DIR)" ""
ifneq "$(TARGET_DIR)" ""
SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
@@ -165,6 +177,9 @@ run-tests: all
OUT=$$(echo "rabbit_tests:all_tests()." | $(ERL_CALL)) ; \
echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
+run-qc: all
+ $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS))
@@ -314,3 +329,4 @@ ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
-include $(DEPS_FILE)
+.PHONY: run-qc
diff --git a/quickcheck b/quickcheck
new file mode 100755
index 00000000..a36cf3ed
--- /dev/null
+++ b/quickcheck
@@ -0,0 +1,36 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -sname quickcheck
+%% A helper to test quickcheck properties on a running broker
+%% NodeStr is a local broker node name
+%% ModStr is the module containing quickcheck properties
+%% The number of trials is optional
+main([NodeStr, ModStr | TrialsStr]) ->
+ {ok, Hostname} = inet:gethostname(),
+ Node = list_to_atom(NodeStr ++ "@" ++ Hostname),
+ Mod = list_to_atom(ModStr),
+ Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr),
+ case rpc:call(Node, code, ensure_loaded, [proper]) of
+ {module, proper} ->
+ case rpc:call(Node, proper, module, [Mod] ++ Trials) of
+ [] -> ok;
+ _ -> quit(1)
+ end;
+ {badrpc, Reason} ->
+ io:format("Could not contact node ~p: ~p.~n", [Node, Reason]),
+ quit(2);
+ {error,nofile} ->
+ io:format("Module PropEr was not found on node ~p~n", [Node]),
+ quit(2)
+ end;
+main([]) ->
+ io:format("This script requires a node name and a module.~n").
+quit(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status)
+ end.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
new file mode 100644
index 00000000..d358a041
--- /dev/null
+++ b/src/rabbit_backing_queue_qc.erl
@@ -0,0 +1,392 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%% The Original Code is RabbitMQ.
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+-define(BQMOD, rabbit_variable_queue).
+-define(QUEUE_MAXLEN, 10000).
+-define(TIMEOUT_LIMIT, 100).
+-define(RECORD_INDEX(Key, Record),
+ proplists:get_value(Key, lists:zip(
+ record_info(fields, Record), lists:seq(2, record_info(size, Record))))).
+-export([initial_state/0, command/1, precondition/2, postcondition/3,
+ next_state/3]).
+-export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
+-record(state, {bqstate,
+ len, %% int
+ messages, %% queue of {msg_props, basic_msg}
+ acks, %% dict of acktag => {msg_props, basic_msg}
+ confirms}). %% set of msgid
+%% Initialise model
+initial_state() ->
+ #state{bqstate = qc_variable_queue_init(qc_test_queue()),
+ len = 0,
+ messages = queue:new(),
+ acks = orddict:new(),
+ confirms = gb_sets:new()}.
+%% Property
+prop_backing_queue_test() ->
+ ?FORALL(Cmds, commands(?MODULE, initial_state()),
+ backing_queue_test(Cmds)).
+backing_queue_test(Cmds) ->
+ {ok, FileSizeLimit} =
+ application:get_env(rabbit, msg_store_file_size_limit),
+ application:set_env(rabbit, msg_store_file_size_limit, 512,
+ infinity),
+ {ok, MaxJournal} =
+ application:get_env(rabbit, queue_index_max_journal_entries),
+ application:set_env(rabbit, queue_index_max_journal_entries, 128,
+ infinity),
+ {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds),
+ application:set_env(rabbit, msg_store_file_size_limit,
+ FileSizeLimit, infinity),
+ application:set_env(rabbit, queue_index_max_journal_entries,
+ MaxJournal, infinity),
+ ?BQMOD:delete_and_terminate(shutdown, BQ),
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
+%% Commands
+%% Command frequencies are tuned so that queues are normally reasonably
+%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
+%% and purging cause extreme queue lengths, so these have lower probabilities.
+%% Fetches are sufficiently frequent so that commands that need acktags
+%% get decent coverage.
+command(S) ->
+ frequency([{10, qc_publish(S)},
+ {1, qc_publish_delivered(S)},
+ {1, qc_publish_multiple(S)}, %% very slow
+ {15, qc_fetch(S)}, %% needed for ack and requeue
+ {15, qc_ack(S)},
+ {15, qc_requeue(S)},
+ {3, qc_set_ram_duration_target(S)},
+ {1, qc_ram_duration(S)},
+ {1, qc_drain_confirmed(S)},
+ {1, qc_dropwhile(S)},
+ {1, qc_is_empty(S)},
+ {1, qc_timeout(S)},
+ {1, qc_purge(S)}]).
+qc_publish(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, publish,
+ [qc_message(),
+ #message_properties{needs_confirming = frequency([{1, true},
+ {20, false}]),
+ expiry = choose(0, 10)},
+ self(), BQ]}.
+qc_publish_multiple(#state{bqstate = BQ}) ->
+ {call, ?MODULE, publish_multiple,
+ [qc_message(), #message_properties{}, BQ,
+ resize(?QUEUE_MAXLEN, pos_integer())]}.
+qc_publish_delivered(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, publish_delivered,
+ [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
+qc_fetch(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fetch, [boolean(), BQ]}.
+qc_ack(#state{bqstate = BQ, acks = Acks}) ->
+ {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}.
+qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
+ {call, ?BQMOD, requeue,
+ [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+qc_set_ram_duration_target(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, set_ram_duration_target,
+ [oneof([0, 1, 2, resize(1000, pos_integer()), infinity]), BQ]}.
+qc_ram_duration(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, ram_duration, [BQ]}.
+qc_drain_confirmed(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drain_confirmed, [BQ]}.
+qc_dropwhile(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
+qc_is_empty(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, is_empty, [BQ]}.
+qc_timeout(#state{bqstate = BQ}) ->
+ {call, ?MODULE, timeout, [BQ, ?TIMEOUT_LIMIT]}.
+qc_purge(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, purge, [BQ]}.
+%% Preconditions
+precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
+ when Fun =:= ack; Fun =:= requeue ->
+ orddict:size(Acks) > 0;
+precondition(#state{messages = Messages},
+ {call, ?BQMOD, publish_delivered, _Arg}) ->
+ queue:is_empty(Messages);
+precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
+ true;
+precondition(_S, {call, ?MODULE, timeout, _Arg}) ->
+ true;
+precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
+%% Model updates
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+ #state{len = Len, messages = Messages, confirms = Confirms} = S,
+ MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
+ NeedsConfirm =
+ {call, erlang, element,
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ S#state{bqstate = BQ,
+ len = Len + 1,
+ messages = queue:in({MsgProps, Msg}, Messages),
+ confirms = case eval(NeedsConfirm) of
+ true -> gb_sets:add(MsgId, Confirms);
+ _ -> Confirms
+ end};
+next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
+ #state{len = Len, messages = Messages} = S,
+ Messages1 = repeat(Messages, fun(Msgs) ->
+ queue:in({MsgProps, Msg}, Msgs)
+ end, Count),
+ S#state{bqstate = BQ,
+ len = Len + Count,
+ messages = Messages1};
+next_state(S, Res,
+ {call, ?BQMOD, publish_delivered,
+ [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
+ #state{confirms = Confirms, acks = Acks} = S,
+ AckTag = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
+ NeedsConfirm =
+ {call, erlang, element,
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ S#state{bqstate = BQ1,
+ confirms = case eval(NeedsConfirm) of
+ true -> gb_sets:add(MsgId, Confirms);
+ _ -> Confirms
+ end,
+ acks = case AckReq of
+ true -> orddict:append(AckTag, {MsgProps, Msg}, Acks);
+ false -> Acks
+ end
+ };
+next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
+ #state{len = Len, messages = Messages, acks = Acks} = S,
+ ResultInfo = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ AckTag = {call, erlang, element, [3, ResultInfo]},
+ S1 = S#state{bqstate = BQ1},
+ case queue:out(Messages) of
+ {empty, _M2} ->
+ S1;
+ {{value, MsgProp_Msg}, M2} ->
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)};
+ false ->
+ S2
+ end
+ end;
+next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
+ #state{acks = AcksState} = S,
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1,
+ acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
+ #state{len = Len, messages = Messages, acks = AcksState} = S,
+ BQ1 = {call, erlang, element, [2, Res]},
+ RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) ||
+ Key <- AcksArg]),
+ S#state{bqstate = BQ1,
+ len = Len + length(RequeueMsgs),
+ messages = queue:join(Messages, queue:from_list(RequeueMsgs)),
+ acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) ->
+ S#state{bqstate = BQ};
+next_state(S, Res, {call, ?BQMOD, ram_duration, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1};
+next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1};
+next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
+ #state{messages = Messages} = S,
+ Messages1 = drop_messages(Messages),
+ S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1};
+next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
+ S;
+next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
+ S#state{bqstate = BQ};
+next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1, len = 0, messages = queue:new()}.
+%% Postconditions
+postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
+ #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
+ case Res of
+ {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {_MsgProps, Msg} = queue:head(Messages),
+ MsgFetched =:= Msg andalso
+ not orddict:is_key(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ RemainingLen =:= Len - 1;
+ {empty, _BQ} ->
+ Len =:= 0
+ end;
+postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) ->
+ #state{acks = Acks, confirms = Confrms} = S,
+ not orddict:is_key(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms);
+postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) ->
+ {PurgeCount, _BQ} = Res,
+ Len =:= PurgeCount;
+postcondition(#state{len = Len},
+ {call, ?BQMOD, is_empty, _Args}, Res) ->
+ (Len =:= 0) =:= Res;
+postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
+ #state{confirms = Confirms} = S,
+ {ReportedConfirmed, _BQ} = Res,
+ lists:all(fun (M) ->
+ gb_sets:is_element(M, Confirms)
+ end, ReportedConfirmed);
+postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
+ ?BQMOD:len(BQ) =:= Len.
+%% Helpers
+repeat(Result, _Fun, 0) ->
+ Result;
+repeat(Result, Fun, Times) ->
+ repeat(Fun(Result), Fun, Times - 1).
+publish_multiple(Msg, MsgProps, BQ, Count) ->
+ repeat(BQ, fun(BQ1) ->
+ ?BQMOD:publish(Msg, MsgProps, self(), BQ1)
+ end, Count).
+timeout(BQ, 0) ->
+ BQ;
+timeout(BQ, AtMost) ->
+ case ?BQMOD:needs_timeout(BQ) of
+ false -> BQ;
+ _ -> timeout(?BQMOD:timeout(BQ), AtMost - 1)
+ end.
+qc_message_payload() ->
+ ?SIZED(Size, resize(Size * Size, binary())).
+qc_routing_key() ->
+ noshrink(binary(10)).
+qc_delivery_mode() ->
+ oneof([1, 2]).
+qc_message() ->
+ qc_message(qc_delivery_mode()).
+qc_message(DeliveryMode) ->
+ {call, rabbit_basic, message, [
+ qc_default_exchange(),
+ qc_routing_key(),
+ #'P_basic'{delivery_mode = DeliveryMode},
+ qc_message_payload()]}.
+qc_default_exchange() ->
+ {call, rabbit_misc, r, [<<>>, exchange, <<>>]}.
+qc_variable_queue_init(Q) ->
+ {call, ?BQMOD, init,
+ [Q, false, function(2, ok)]}.
+qc_test_q() ->
+ {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
+qc_test_queue() ->
+ qc_test_queue(boolean()).
+qc_test_queue(Durable) ->
+ #amqqueue{name = qc_test_q(),
+ durable = Durable,
+ auto_delete = false,
+ arguments = [],
+ pid = self()}.
+rand_choice([]) -> [];
+rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)].
+dropfun(Props) ->
+ Expiry = eval({call, erlang, element,
+ [?RECORD_INDEX(expiry, message_properties), Props]}),
+ Expiry =/= 0.
+drop_messages(Messages) ->
+ case queue:out(Messages) of
+ {empty, _} ->
+ Messages;
+ {{value, MsgProps_Msg}, M2} ->
+ MsgProps = {call, erlang, element, [1, MsgProps_Msg]},
+ case dropfun(MsgProps) of
+ true -> drop_messages(M2);
+ false -> Messages
+ end
+ end.